summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap
diff options
context:
space:
mode:
authorTomás Touceda <chiiph@leap.se>2014-02-20 17:11:03 -0300
committerTomás Touceda <chiiph@leap.se>2014-02-20 17:11:03 -0300
commit66c5602b77547ec24674f5e40c1d244f28ff5a49 (patch)
tree91ef04803c589af26ef872563ab8fdf63d4bc516 /src/leap/mail/imap
parent6f98424d41b38d5e56f54f312a6de871e3a82daa (diff)
parent976ec85451bef3fd380f69c64e803d7740d7dae4 (diff)
Merge remote-tracking branch 'refs/remotes/kali/feature/speedup-select' into develop
Diffstat (limited to 'src/leap/mail/imap')
-rw-r--r--src/leap/mail/imap/account.py1
-rw-r--r--src/leap/mail/imap/mailbox.py46
-rw-r--r--src/leap/mail/imap/memorystore.py19
-rw-r--r--src/leap/mail/imap/messages.py55
-rw-r--r--src/leap/mail/imap/soledadstore.py44
5 files changed, 108 insertions, 57 deletions
diff --git a/src/leap/mail/imap/account.py b/src/leap/mail/imap/account.py
index 1b5d4a0..ede63d3 100644
--- a/src/leap/mail/imap/account.py
+++ b/src/leap/mail/imap/account.py
@@ -119,6 +119,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
:rtype: SoledadDocument
"""
+ # XXX use soledadstore instead ...;
doc = self._soledad.get_from_index(
self.TYPE_MBOX_IDX, self.MBOX_KEY,
self._parse_mailbox_name(name))
diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py
index 57505f0..59b2b40 100644
--- a/src/leap/mail/imap/mailbox.py
+++ b/src/leap/mail/imap/mailbox.py
@@ -132,13 +132,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:param rw: read-and-write flag for this mailbox
:type rw: int
"""
- logger.debug("Initializing mailbox %r" % (mbox,))
leap_assert(mbox, "Need a mailbox name to initialize")
leap_assert(soledad, "Need a soledad instance to initialize")
- # XXX should move to wrapper
- #leap_assert(isinstance(soledad._db, SQLCipherDatabase),
- #"soledad._db must be an instance of SQLCipherDatabase")
+ from twisted.internet import reactor
+ self.reactor = reactor
self.mbox = self._parse_mailbox_name(mbox)
self.rw = rw
@@ -149,6 +147,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
self.messages = MessageCollection(
mbox=mbox, soledad=self._soledad, memstore=self._memstore)
+ self._uidvalidity = None
+
# XXX careful with this get/set (it would be
# hitting db unconditionally, move to memstore too)
# Now it's returning a fixed amount of flags from mem
@@ -161,12 +161,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
self.prime_last_uid_to_memstore()
self.prime_flag_docs_to_memstore()
- from twisted.internet import reactor
- self.reactor = reactor
-
# purge memstore from empty fdocs.
self._memstore.purge_fdoc_store(mbox)
- logger.debug("DONE initializing mailbox %r" % (mbox,))
@property
def listeners(self):
@@ -339,8 +335,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:return: unique validity identifier
:rtype: int
"""
- mbox = self._get_mbox_doc()
- return mbox.content.get(self.CREATED_KEY, 1)
+ if self._uidvalidity is None:
+ mbox = self._get_mbox_doc()
+ self._uidvalidity = mbox.content.get(self.CREATED_KEY, 1)
+ return self._uidvalidity
def getUID(self, message):
"""
@@ -652,15 +650,37 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
about
:type messages_asked: MessageSet
- :param uid: If true, the IDs are UIDs. They are message sequence IDs
+ :param uid: If 1, the IDs are UIDs. They are message sequence IDs
otherwise.
- :type uid: bool
+ :type uid: int
:return: A tuple of two-tuples of message sequence numbers and
flagsPart, which is a only a partial implementation of
MessagePart.
:rtype: tuple
"""
+ d = defer.Deferred()
+ self.reactor.callInThread(self._do_fetch_flags, messages_asked, uid, d)
+ if PROFILE_CMD:
+ do_profile_cmd(d, "FETCH-ALL-FLAGS")
+ return d
+
+ # called in thread
+ def _do_fetch_flags(self, messages_asked, uid, d):
+ """
+ :param messages_asked: IDs of the messages to retrieve information
+ about
+ :type messages_asked: MessageSet
+
+ :param uid: If 1, the IDs are UIDs. They are message sequence IDs
+ otherwise.
+ :type uid: int
+ :param d: deferred whose callback will be called with result.
+ :type d: Deferred
+
+ :rtype: A tuple of two-tuples of message sequence numbers and
+ flagsPart
+ """
class flagsPart(object):
def __init__(self, uid, flags):
self.uid = uid
@@ -678,7 +698,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
all_flags = self._memstore.all_flags(self.mbox)
result = ((msgid, flagsPart(
msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg)
- return result
+ self.reactor.callFromThread(d.callback, result)
def fetch_headers(self, messages_asked, uid):
"""
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
index f23a234..6206468 100644
--- a/src/leap/mail/imap/memorystore.py
+++ b/src/leap/mail/imap/memorystore.py
@@ -364,9 +364,11 @@ class MemoryStore(object):
# Update memory store size
# XXX this should use [mbox][uid]
- key = mbox, uid
- self._sizes[key] = size.get_size(self._fdoc_store[key])
+ # TODO --- this has to be deferred to thread,
# TODO add hdoc and cdocs sizes too
+ # it's slowing things down here.
+ #key = mbox, uid
+ #self._sizes[key] = size.get_size(self._fdoc_store[key])
def purge_fdoc_store(self, mbox):
"""
@@ -504,12 +506,16 @@ class MemoryStore(object):
if key in self._sizes:
del self._sizes[key]
self._known_uids[mbox].discard(uid)
+ except KeyError:
+ pass
except Exception as exc:
logger.error("error while removing message!")
logger.exception(exc)
try:
with self._fdoc_docid_lock:
del self._fdoc_id_store[mbox][uid]
+ except KeyError:
+ pass
except Exception as exc:
logger.error("error while removing message!")
logger.exception(exc)
@@ -724,17 +730,16 @@ class MemoryStore(object):
:type mbox: str or unicode
:rtype: dict
"""
- flags_dict = {}
+ fdict = {}
uids = self.get_uids(mbox)
- fdoc_store = self._fdoc_store[mbox]
+ fstore = self._fdoc_store[mbox]
for uid in uids:
try:
- flags = fdoc_store[uid][fields.FLAGS_KEY]
- flags_dict[uid] = flags
+ fdict[uid] = fstore[uid][fields.FLAGS_KEY]
except KeyError:
continue
- return flags_dict
+ return fdict
def all_headers(self, mbox):
"""
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index fc1ec55..9f7f6e2 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -77,7 +77,7 @@ def try_unique_query(curried):
# TODO we could take action, like trigger a background
# process to kill dupes.
name = getattr(curried, 'expected', 'doc')
- logger.debug(
+ logger.warning(
"More than one %s found for this mbox, "
"we got a duplicate!!" % (name,))
return query.pop()
@@ -683,8 +683,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# TODO we would abstract this to a SoledadProperty class
- _rdoc_lock = threading.Lock()
- _rdoc_property_lock = threading.Lock()
+ _rdoc_lock = defaultdict(lambda: threading.Lock())
+ _rdoc_write_lock = defaultdict(lambda: threading.Lock())
+ _rdoc_read_lock = defaultdict(lambda: threading.Lock())
+ _rdoc_property_lock = defaultdict(lambda: threading.Lock())
+
+ _initialized = {}
def __init__(self, mbox=None, soledad=None, memstore=None):
"""
@@ -725,10 +729,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
self.memstore = memstore
self.__rflags = None
- self.initialize_db()
- # ensure that we have a recent-flags and a hdocs-sec doc
- self._get_or_create_rdoc()
+ if not self._initialized.get(mbox, False):
+ try:
+ self.initialize_db()
+ # ensure that we have a recent-flags doc
+ self._get_or_create_rdoc()
+ except Exception:
+ logger.debug("Error initializing %r" % (mbox,))
+ else:
+ self._initialized[mbox] = True
from twisted.internet import reactor
self.reactor = reactor
@@ -749,12 +759,14 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
Try to retrieve the recent-flags doc for this MessageCollection,
and create one if not found.
"""
- rdoc = self._get_recent_doc()
- if not rdoc:
- rdoc = self._get_empty_doc(self.RECENT_DOC)
- if self.mbox != fields.INBOX_VAL:
- rdoc[fields.MBOX_KEY] = self.mbox
- self._soledad.create_doc(rdoc)
+ # XXX should move this to memstore too
+ with self._rdoc_write_lock[self.mbox]:
+ rdoc = self._get_recent_doc_from_soledad()
+ if rdoc is None:
+ rdoc = self._get_empty_doc(self.RECENT_DOC)
+ if self.mbox != fields.INBOX_VAL:
+ rdoc[fields.MBOX_KEY] = self.mbox
+ self._soledad.create_doc(rdoc)
@deferred_to_thread
def _do_parse(self, raw):
@@ -972,12 +984,14 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
return self.__rflags
if self.memstore is not None:
- with self._rdoc_lock:
+ with self._rdoc_lock[self.mbox]:
rflags = self.memstore.get_recent_flags(self.mbox)
if not rflags:
# not loaded in the memory store yet.
# let's fetch them from soledad...
- rdoc = self._get_recent_doc()
+ rdoc = self._get_recent_doc_from_soledad()
+ if rdoc is None:
+ return set([])
rflags = set(rdoc.content.get(
fields.RECENTFLAGS_KEY, []))
# ...and cache them now.
@@ -997,8 +1011,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
_get_recent_flags, _set_recent_flags,
doc="Set of UIDs with the recent flag for this mailbox.")
- # XXX change naming, indicate soledad query.
- def _get_recent_doc(self):
+ def _get_recent_doc_from_soledad(self):
"""
Get recent-flags document from Soledad for this mailbox.
:rtype: SoledadDocument or None
@@ -1008,8 +1021,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
fields.TYPE_MBOX_IDX,
fields.TYPE_RECENT_VAL, self.mbox)
curried.expected = "rdoc"
- rdoc = try_unique_query(curried)
- return rdoc
+ with self._rdoc_read_lock[self.mbox]:
+ return try_unique_query(curried)
# Property-set modification (protected by a different
# lock to give atomicity to the read/write operation)
@@ -1021,7 +1034,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
:param uids: the uids to unset
:type uid: sequence
"""
- with self._rdoc_property_lock:
+ with self._rdoc_property_lock[self.mbox]:
self.recent_flags.difference_update(
set(uids))
@@ -1034,7 +1047,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
:param uid: the uid to unset
:type uid: int
"""
- with self._rdoc_property_lock:
+ with self._rdoc_property_lock[self.mbox]:
self.recent_flags.difference_update(
set([uid]))
@@ -1046,7 +1059,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
:param uid: the uid to set
:type uid: int
"""
- with self._rdoc_property_lock:
+ with self._rdoc_property_lock[self.mbox]:
self.recent_flags = self.recent_flags.union(
set([uid]))
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
index 732fe03..25f00bb 100644
--- a/src/leap/mail/imap/soledadstore.py
+++ b/src/leap/mail/imap/soledadstore.py
@@ -133,15 +133,14 @@ A lock per document.
# Setting this to twice the number of threads in the threadpool
# should be safe.
put_locks = defaultdict(lambda: threading.Lock())
+mbox_doc_locks = defaultdict(lambda: threading.Lock())
class SoledadStore(ContentDedup):
"""
This will create docs in the local Soledad database.
"""
- _soledad_rw_lock = threading.Lock()
_remove_lock = threading.Lock()
- _mbox_doc_locks = defaultdict(lambda: threading.Lock())
implements(IMessageConsumer, IMessageStore)
@@ -282,9 +281,13 @@ class SoledadStore(ContentDedup):
def doSoledadCalls(items):
# we prime the generator, that should return the
# message or flags wrapper item in the first place.
- doc_wrapper = items.next()
- failed = self._soledad_write_document_parts(items)
- queueNotifyBack(failed, doc_wrapper)
+ try:
+ doc_wrapper = items.next()
+ except StopIteration:
+ pass
+ else:
+ failed = self._soledad_write_document_parts(items)
+ queueNotifyBack(failed, doc_wrapper)
doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper))
@@ -309,8 +312,10 @@ class SoledadStore(ContentDedup):
try:
self._try_call(call, item)
except Exception as exc:
- logger.debug("ITEM WAS: %s" % str(item))
- logger.debug("ITEM CONTENT WAS: %s" % str(item.content))
+ logger.debug("ITEM WAS: %s" % repr(item))
+ if hasattr(item, 'content'):
+ logger.debug("ITEM CONTENT WAS: %s" %
+ repr(item.content))
logger.exception(exc)
failed = True
continue
@@ -349,6 +354,9 @@ class SoledadStore(ContentDedup):
if call == self._PUT_DOC_FUN:
doc_id = item.doc_id
+ if doc_id is None:
+ logger.warning("BUG! Dirty doc but has no doc_id!")
+ return
with put_locks[doc_id]:
doc = self._GET_DOC_FUN(doc_id)
@@ -437,12 +445,12 @@ class SoledadStore(ContentDedup):
:return: a tuple with recent-flags doc payload and callable
:rtype: tuple
"""
- call = self._CREATE_DOC_FUN
+ call = self._PUT_DOC_FUN
payload = rflags_wrapper.content
if payload:
logger.debug("Saving RFLAGS to Soledad...")
- yield payload, call
+ yield rflags_wrapper, call
# Mbox documents and attributes
@@ -456,7 +464,7 @@ class SoledadStore(ContentDedup):
the query failed.
:rtype: SoledadDocument or None.
"""
- with self._mbox_doc_locks[mbox]:
+ with mbox_doc_locks[mbox]:
return self._get_mbox_document(mbox)
def _get_mbox_document(self, mbox):
@@ -471,7 +479,7 @@ class SoledadStore(ContentDedup):
return query.pop()
else:
logger.error("Could not find mbox document for %r" %
- (self.mbox,))
+ (mbox,))
except Exception as exc:
logger.exception("Unhandled error %r" % exc)
@@ -496,7 +504,7 @@ class SoledadStore(ContentDedup):
:type closed: bool
"""
leap_assert(isinstance(closed, bool), "closed needs to be boolean")
- with self._mbox_doc_locks[mbox]:
+ with mbox_doc_locks[mbox]:
mbox_doc = self._get_mbox_document(mbox)
if mbox_doc is None:
logger.error(
@@ -521,14 +529,18 @@ class SoledadStore(ContentDedup):
leap_assert_type(value, int)
key = fields.LAST_UID_KEY
- # XXX change for a lock related to the mbox document
- # itself.
- with self._mbox_doc_locks[mbox]:
+ # XXX use accumulator to reduce number of hits
+ with mbox_doc_locks[mbox]:
mbox_doc = self._get_mbox_document(mbox)
old_val = mbox_doc.content[key]
if value > old_val:
mbox_doc.content[key] = value
- self._soledad.put_doc(mbox_doc)
+ try:
+ self._soledad.put_doc(mbox_doc)
+ except Exception as exc:
+ logger.error("Error while setting last_uid for %r"
+ % (mbox,))
+ logger.exception(exc)
def get_flags_doc(self, mbox, uid):
"""