diff options
Diffstat (limited to 'mail')
-rw-r--r-- | mail/src/leap/mail/imap/account.py | 1 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/mailbox.py | 46 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/memorystore.py | 19 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/messages.py | 55 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/soledadstore.py | 44 |
5 files changed, 108 insertions, 57 deletions
diff --git a/mail/src/leap/mail/imap/account.py b/mail/src/leap/mail/imap/account.py index 1b5d4a05..ede63d38 100644 --- a/mail/src/leap/mail/imap/account.py +++ b/mail/src/leap/mail/imap/account.py @@ -119,6 +119,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): :rtype: SoledadDocument """ + # XXX use soledadstore instead ...; doc = self._soledad.get_from_index( self.TYPE_MBOX_IDX, self.MBOX_KEY, self._parse_mailbox_name(name)) diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py index 57505f0f..59b2b409 100644 --- a/mail/src/leap/mail/imap/mailbox.py +++ b/mail/src/leap/mail/imap/mailbox.py @@ -132,13 +132,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :param rw: read-and-write flag for this mailbox :type rw: int """ - logger.debug("Initializing mailbox %r" % (mbox,)) leap_assert(mbox, "Need a mailbox name to initialize") leap_assert(soledad, "Need a soledad instance to initialize") - # XXX should move to wrapper - #leap_assert(isinstance(soledad._db, SQLCipherDatabase), - #"soledad._db must be an instance of SQLCipherDatabase") + from twisted.internet import reactor + self.reactor = reactor self.mbox = self._parse_mailbox_name(mbox) self.rw = rw @@ -149,6 +147,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): self.messages = MessageCollection( mbox=mbox, soledad=self._soledad, memstore=self._memstore) + self._uidvalidity = None + # XXX careful with this get/set (it would be # hitting db unconditionally, move to memstore too) # Now it's returning a fixed amount of flags from mem @@ -161,12 +161,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): self.prime_last_uid_to_memstore() self.prime_flag_docs_to_memstore() - from twisted.internet import reactor - self.reactor = reactor - # purge memstore from empty fdocs. self._memstore.purge_fdoc_store(mbox) - logger.debug("DONE initializing mailbox %r" % (mbox,)) @property def listeners(self): @@ -339,8 +335,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :return: unique validity identifier :rtype: int """ - mbox = self._get_mbox_doc() - return mbox.content.get(self.CREATED_KEY, 1) + if self._uidvalidity is None: + mbox = self._get_mbox_doc() + self._uidvalidity = mbox.content.get(self.CREATED_KEY, 1) + return self._uidvalidity def getUID(self, message): """ @@ -652,15 +650,37 @@ class SoledadMailbox(WithMsgFields, MBoxParser): about :type messages_asked: MessageSet - :param uid: If true, the IDs are UIDs. They are message sequence IDs + :param uid: If 1, the IDs are UIDs. They are message sequence IDs otherwise. - :type uid: bool + :type uid: int :return: A tuple of two-tuples of message sequence numbers and flagsPart, which is a only a partial implementation of MessagePart. :rtype: tuple """ + d = defer.Deferred() + self.reactor.callInThread(self._do_fetch_flags, messages_asked, uid, d) + if PROFILE_CMD: + do_profile_cmd(d, "FETCH-ALL-FLAGS") + return d + + # called in thread + def _do_fetch_flags(self, messages_asked, uid, d): + """ + :param messages_asked: IDs of the messages to retrieve information + about + :type messages_asked: MessageSet + + :param uid: If 1, the IDs are UIDs. They are message sequence IDs + otherwise. + :type uid: int + :param d: deferred whose callback will be called with result. + :type d: Deferred + + :rtype: A tuple of two-tuples of message sequence numbers and + flagsPart + """ class flagsPart(object): def __init__(self, uid, flags): self.uid = uid @@ -678,7 +698,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): all_flags = self._memstore.all_flags(self.mbox) result = ((msgid, flagsPart( msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg) - return result + self.reactor.callFromThread(d.callback, result) def fetch_headers(self, messages_asked, uid): """ diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index f23a2344..6206468d 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -364,9 +364,11 @@ class MemoryStore(object): # Update memory store size # XXX this should use [mbox][uid] - key = mbox, uid - self._sizes[key] = size.get_size(self._fdoc_store[key]) + # TODO --- this has to be deferred to thread, # TODO add hdoc and cdocs sizes too + # it's slowing things down here. + #key = mbox, uid + #self._sizes[key] = size.get_size(self._fdoc_store[key]) def purge_fdoc_store(self, mbox): """ @@ -504,12 +506,16 @@ class MemoryStore(object): if key in self._sizes: del self._sizes[key] self._known_uids[mbox].discard(uid) + except KeyError: + pass except Exception as exc: logger.error("error while removing message!") logger.exception(exc) try: with self._fdoc_docid_lock: del self._fdoc_id_store[mbox][uid] + except KeyError: + pass except Exception as exc: logger.error("error while removing message!") logger.exception(exc) @@ -724,17 +730,16 @@ class MemoryStore(object): :type mbox: str or unicode :rtype: dict """ - flags_dict = {} + fdict = {} uids = self.get_uids(mbox) - fdoc_store = self._fdoc_store[mbox] + fstore = self._fdoc_store[mbox] for uid in uids: try: - flags = fdoc_store[uid][fields.FLAGS_KEY] - flags_dict[uid] = flags + fdict[uid] = fstore[uid][fields.FLAGS_KEY] except KeyError: continue - return flags_dict + return fdict def all_headers(self, mbox): """ diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index fc1ec554..9f7f6e2f 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -77,7 +77,7 @@ def try_unique_query(curried): # TODO we could take action, like trigger a background # process to kill dupes. name = getattr(curried, 'expected', 'doc') - logger.debug( + logger.warning( "More than one %s found for this mbox, " "we got a duplicate!!" % (name,)) return query.pop() @@ -683,8 +683,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # TODO we would abstract this to a SoledadProperty class - _rdoc_lock = threading.Lock() - _rdoc_property_lock = threading.Lock() + _rdoc_lock = defaultdict(lambda: threading.Lock()) + _rdoc_write_lock = defaultdict(lambda: threading.Lock()) + _rdoc_read_lock = defaultdict(lambda: threading.Lock()) + _rdoc_property_lock = defaultdict(lambda: threading.Lock()) + + _initialized = {} def __init__(self, mbox=None, soledad=None, memstore=None): """ @@ -725,10 +729,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): self.memstore = memstore self.__rflags = None - self.initialize_db() - # ensure that we have a recent-flags and a hdocs-sec doc - self._get_or_create_rdoc() + if not self._initialized.get(mbox, False): + try: + self.initialize_db() + # ensure that we have a recent-flags doc + self._get_or_create_rdoc() + except Exception: + logger.debug("Error initializing %r" % (mbox,)) + else: + self._initialized[mbox] = True from twisted.internet import reactor self.reactor = reactor @@ -749,12 +759,14 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): Try to retrieve the recent-flags doc for this MessageCollection, and create one if not found. """ - rdoc = self._get_recent_doc() - if not rdoc: - rdoc = self._get_empty_doc(self.RECENT_DOC) - if self.mbox != fields.INBOX_VAL: - rdoc[fields.MBOX_KEY] = self.mbox - self._soledad.create_doc(rdoc) + # XXX should move this to memstore too + with self._rdoc_write_lock[self.mbox]: + rdoc = self._get_recent_doc_from_soledad() + if rdoc is None: + rdoc = self._get_empty_doc(self.RECENT_DOC) + if self.mbox != fields.INBOX_VAL: + rdoc[fields.MBOX_KEY] = self.mbox + self._soledad.create_doc(rdoc) @deferred_to_thread def _do_parse(self, raw): @@ -972,12 +984,14 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): return self.__rflags if self.memstore is not None: - with self._rdoc_lock: + with self._rdoc_lock[self.mbox]: rflags = self.memstore.get_recent_flags(self.mbox) if not rflags: # not loaded in the memory store yet. # let's fetch them from soledad... - rdoc = self._get_recent_doc() + rdoc = self._get_recent_doc_from_soledad() + if rdoc is None: + return set([]) rflags = set(rdoc.content.get( fields.RECENTFLAGS_KEY, [])) # ...and cache them now. @@ -997,8 +1011,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): _get_recent_flags, _set_recent_flags, doc="Set of UIDs with the recent flag for this mailbox.") - # XXX change naming, indicate soledad query. - def _get_recent_doc(self): + def _get_recent_doc_from_soledad(self): """ Get recent-flags document from Soledad for this mailbox. :rtype: SoledadDocument or None @@ -1008,8 +1021,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): fields.TYPE_MBOX_IDX, fields.TYPE_RECENT_VAL, self.mbox) curried.expected = "rdoc" - rdoc = try_unique_query(curried) - return rdoc + with self._rdoc_read_lock[self.mbox]: + return try_unique_query(curried) # Property-set modification (protected by a different # lock to give atomicity to the read/write operation) @@ -1021,7 +1034,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :param uids: the uids to unset :type uid: sequence """ - with self._rdoc_property_lock: + with self._rdoc_property_lock[self.mbox]: self.recent_flags.difference_update( set(uids)) @@ -1034,7 +1047,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :param uid: the uid to unset :type uid: int """ - with self._rdoc_property_lock: + with self._rdoc_property_lock[self.mbox]: self.recent_flags.difference_update( set([uid])) @@ -1046,7 +1059,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :param uid: the uid to set :type uid: int """ - with self._rdoc_property_lock: + with self._rdoc_property_lock[self.mbox]: self.recent_flags = self.recent_flags.union( set([uid])) diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py index 732fe035..25f00bb9 100644 --- a/mail/src/leap/mail/imap/soledadstore.py +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -133,15 +133,14 @@ A lock per document. # Setting this to twice the number of threads in the threadpool # should be safe. put_locks = defaultdict(lambda: threading.Lock()) +mbox_doc_locks = defaultdict(lambda: threading.Lock()) class SoledadStore(ContentDedup): """ This will create docs in the local Soledad database. """ - _soledad_rw_lock = threading.Lock() _remove_lock = threading.Lock() - _mbox_doc_locks = defaultdict(lambda: threading.Lock()) implements(IMessageConsumer, IMessageStore) @@ -282,9 +281,13 @@ class SoledadStore(ContentDedup): def doSoledadCalls(items): # we prime the generator, that should return the # message or flags wrapper item in the first place. - doc_wrapper = items.next() - failed = self._soledad_write_document_parts(items) - queueNotifyBack(failed, doc_wrapper) + try: + doc_wrapper = items.next() + except StopIteration: + pass + else: + failed = self._soledad_write_document_parts(items) + queueNotifyBack(failed, doc_wrapper) doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper)) @@ -309,8 +312,10 @@ class SoledadStore(ContentDedup): try: self._try_call(call, item) except Exception as exc: - logger.debug("ITEM WAS: %s" % str(item)) - logger.debug("ITEM CONTENT WAS: %s" % str(item.content)) + logger.debug("ITEM WAS: %s" % repr(item)) + if hasattr(item, 'content'): + logger.debug("ITEM CONTENT WAS: %s" % + repr(item.content)) logger.exception(exc) failed = True continue @@ -349,6 +354,9 @@ class SoledadStore(ContentDedup): if call == self._PUT_DOC_FUN: doc_id = item.doc_id + if doc_id is None: + logger.warning("BUG! Dirty doc but has no doc_id!") + return with put_locks[doc_id]: doc = self._GET_DOC_FUN(doc_id) @@ -437,12 +445,12 @@ class SoledadStore(ContentDedup): :return: a tuple with recent-flags doc payload and callable :rtype: tuple """ - call = self._CREATE_DOC_FUN + call = self._PUT_DOC_FUN payload = rflags_wrapper.content if payload: logger.debug("Saving RFLAGS to Soledad...") - yield payload, call + yield rflags_wrapper, call # Mbox documents and attributes @@ -456,7 +464,7 @@ class SoledadStore(ContentDedup): the query failed. :rtype: SoledadDocument or None. """ - with self._mbox_doc_locks[mbox]: + with mbox_doc_locks[mbox]: return self._get_mbox_document(mbox) def _get_mbox_document(self, mbox): @@ -471,7 +479,7 @@ class SoledadStore(ContentDedup): return query.pop() else: logger.error("Could not find mbox document for %r" % - (self.mbox,)) + (mbox,)) except Exception as exc: logger.exception("Unhandled error %r" % exc) @@ -496,7 +504,7 @@ class SoledadStore(ContentDedup): :type closed: bool """ leap_assert(isinstance(closed, bool), "closed needs to be boolean") - with self._mbox_doc_locks[mbox]: + with mbox_doc_locks[mbox]: mbox_doc = self._get_mbox_document(mbox) if mbox_doc is None: logger.error( @@ -521,14 +529,18 @@ class SoledadStore(ContentDedup): leap_assert_type(value, int) key = fields.LAST_UID_KEY - # XXX change for a lock related to the mbox document - # itself. - with self._mbox_doc_locks[mbox]: + # XXX use accumulator to reduce number of hits + with mbox_doc_locks[mbox]: mbox_doc = self._get_mbox_document(mbox) old_val = mbox_doc.content[key] if value > old_val: mbox_doc.content[key] = value - self._soledad.put_doc(mbox_doc) + try: + self._soledad.put_doc(mbox_doc) + except Exception as exc: + logger.error("Error while setting last_uid for %r" + % (mbox,)) + logger.exception(exc) def get_flags_doc(self, mbox, uid): """ |