diff options
| author | Tomás Touceda <chiiph@leap.se> | 2014-02-20 17:11:03 -0300 | 
|---|---|---|
| committer | Tomás Touceda <chiiph@leap.se> | 2014-02-20 17:11:03 -0300 | 
| commit | 9b20cd5042a3f0d351cadefb220b56b0bd6a9205 (patch) | |
| tree | ddcb5c0db87ced01d57e648f85507abcd6215a85 | |
| parent | 81a646792219f33e1331aa179dc6032b32026238 (diff) | |
| parent | cc5e8252a091560d9ea241e846b4a917ac3dc640 (diff) | |
Merge remote-tracking branch 'refs/remotes/kali/feature/speedup-select' into develop
| -rw-r--r-- | mail/src/leap/mail/imap/account.py | 1 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/mailbox.py | 46 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/memorystore.py | 19 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/messages.py | 55 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/soledadstore.py | 44 | 
5 files changed, 108 insertions, 57 deletions
| diff --git a/mail/src/leap/mail/imap/account.py b/mail/src/leap/mail/imap/account.py index 1b5d4a0..ede63d3 100644 --- a/mail/src/leap/mail/imap/account.py +++ b/mail/src/leap/mail/imap/account.py @@ -119,6 +119,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):          :rtype: SoledadDocument          """ +        # XXX use soledadstore instead ...;          doc = self._soledad.get_from_index(              self.TYPE_MBOX_IDX, self.MBOX_KEY,              self._parse_mailbox_name(name)) diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py index 57505f0..59b2b40 100644 --- a/mail/src/leap/mail/imap/mailbox.py +++ b/mail/src/leap/mail/imap/mailbox.py @@ -132,13 +132,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          :param rw: read-and-write flag for this mailbox          :type rw: int          """ -        logger.debug("Initializing mailbox %r" % (mbox,))          leap_assert(mbox, "Need a mailbox name to initialize")          leap_assert(soledad, "Need a soledad instance to initialize") -        # XXX should move to wrapper -        #leap_assert(isinstance(soledad._db, SQLCipherDatabase), -                    #"soledad._db must be an instance of SQLCipherDatabase") +        from twisted.internet import reactor +        self.reactor = reactor          self.mbox = self._parse_mailbox_name(mbox)          self.rw = rw @@ -149,6 +147,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          self.messages = MessageCollection(              mbox=mbox, soledad=self._soledad, memstore=self._memstore) +        self._uidvalidity = None +          # XXX careful with this get/set (it would be          # hitting db unconditionally, move to memstore too)          # Now it's returning a fixed amount of flags from mem @@ -161,12 +161,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):              self.prime_last_uid_to_memstore()              self.prime_flag_docs_to_memstore() -        from twisted.internet import reactor -        self.reactor = reactor -          # purge memstore from empty fdocs.          self._memstore.purge_fdoc_store(mbox) -        logger.debug("DONE initializing mailbox %r" % (mbox,))      @property      def listeners(self): @@ -339,8 +335,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          :return: unique validity identifier          :rtype: int          """ -        mbox = self._get_mbox_doc() -        return mbox.content.get(self.CREATED_KEY, 1) +        if self._uidvalidity is None: +            mbox = self._get_mbox_doc() +            self._uidvalidity = mbox.content.get(self.CREATED_KEY, 1) +        return self._uidvalidity      def getUID(self, message):          """ @@ -652,15 +650,37 @@ class SoledadMailbox(WithMsgFields, MBoxParser):                                 about          :type messages_asked: MessageSet -        :param uid: If true, the IDs are UIDs. They are message sequence IDs +        :param uid: If 1, the IDs are UIDs. They are message sequence IDs                      otherwise. -        :type uid: bool +        :type uid: int          :return: A tuple of two-tuples of message sequence numbers and                  flagsPart, which is a only a partial implementation of                  MessagePart.          :rtype: tuple          """ +        d = defer.Deferred() +        self.reactor.callInThread(self._do_fetch_flags, messages_asked, uid, d) +        if PROFILE_CMD: +            do_profile_cmd(d, "FETCH-ALL-FLAGS") +        return d + +    # called in thread +    def _do_fetch_flags(self, messages_asked, uid, d): +        """ +        :param messages_asked: IDs of the messages to retrieve information +                               about +        :type messages_asked: MessageSet + +        :param uid: If 1, the IDs are UIDs. They are message sequence IDs +                    otherwise. +        :type uid: int +        :param d: deferred whose callback will be called with result. +        :type d: Deferred + +        :rtype: A tuple of two-tuples of message sequence numbers and +                flagsPart +        """          class flagsPart(object):              def __init__(self, uid, flags):                  self.uid = uid @@ -678,7 +698,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          all_flags = self._memstore.all_flags(self.mbox)          result = ((msgid, flagsPart(              msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg) -        return result +        self.reactor.callFromThread(d.callback, result)      def fetch_headers(self, messages_asked, uid):          """ diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index f23a234..6206468 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -364,9 +364,11 @@ class MemoryStore(object):          # Update memory store size          # XXX this should use [mbox][uid] -        key = mbox, uid -        self._sizes[key] = size.get_size(self._fdoc_store[key]) +        # TODO --- this has to be deferred to thread,          # TODO add hdoc and cdocs sizes too +        # it's slowing things down here. +        #key = mbox, uid +        #self._sizes[key] = size.get_size(self._fdoc_store[key])      def purge_fdoc_store(self, mbox):          """ @@ -504,12 +506,16 @@ class MemoryStore(object):              if key in self._sizes:                  del self._sizes[key]              self._known_uids[mbox].discard(uid) +        except KeyError: +            pass          except Exception as exc:              logger.error("error while removing message!")              logger.exception(exc)          try:              with self._fdoc_docid_lock:                  del self._fdoc_id_store[mbox][uid] +        except KeyError: +            pass          except Exception as exc:              logger.error("error while removing message!")              logger.exception(exc) @@ -724,17 +730,16 @@ class MemoryStore(object):          :type mbox: str or unicode          :rtype: dict          """ -        flags_dict = {} +        fdict = {}          uids = self.get_uids(mbox) -        fdoc_store = self._fdoc_store[mbox] +        fstore = self._fdoc_store[mbox]          for uid in uids:              try: -                flags = fdoc_store[uid][fields.FLAGS_KEY] -                flags_dict[uid] = flags +                fdict[uid] = fstore[uid][fields.FLAGS_KEY]              except KeyError:                  continue -        return flags_dict +        return fdict      def all_headers(self, mbox):          """ diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index fc1ec55..9f7f6e2 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -77,7 +77,7 @@ def try_unique_query(curried):                  # TODO we could take action, like trigger a background                  # process to kill dupes.                  name = getattr(curried, 'expected', 'doc') -                logger.debug( +                logger.warning(                      "More than one %s found for this mbox, "                      "we got a duplicate!!" % (name,))              return query.pop() @@ -683,8 +683,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):      # TODO we would abstract this to a SoledadProperty class -    _rdoc_lock = threading.Lock() -    _rdoc_property_lock = threading.Lock() +    _rdoc_lock = defaultdict(lambda: threading.Lock()) +    _rdoc_write_lock = defaultdict(lambda: threading.Lock()) +    _rdoc_read_lock = defaultdict(lambda: threading.Lock()) +    _rdoc_property_lock = defaultdict(lambda: threading.Lock()) + +    _initialized = {}      def __init__(self, mbox=None, soledad=None, memstore=None):          """ @@ -725,10 +729,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          self.memstore = memstore          self.__rflags = None -        self.initialize_db() -        # ensure that we have a recent-flags and a hdocs-sec doc -        self._get_or_create_rdoc() +        if not self._initialized.get(mbox, False): +            try: +                self.initialize_db() +                # ensure that we have a recent-flags doc +                self._get_or_create_rdoc() +            except Exception: +                logger.debug("Error initializing %r" % (mbox,)) +            else: +                self._initialized[mbox] = True          from twisted.internet import reactor          self.reactor = reactor @@ -749,12 +759,14 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          Try to retrieve the recent-flags doc for this MessageCollection,          and create one if not found.          """ -        rdoc = self._get_recent_doc() -        if not rdoc: -            rdoc = self._get_empty_doc(self.RECENT_DOC) -            if self.mbox != fields.INBOX_VAL: -                rdoc[fields.MBOX_KEY] = self.mbox -            self._soledad.create_doc(rdoc) +        # XXX should move this to memstore too +        with self._rdoc_write_lock[self.mbox]: +            rdoc = self._get_recent_doc_from_soledad() +            if rdoc is None: +                rdoc = self._get_empty_doc(self.RECENT_DOC) +                if self.mbox != fields.INBOX_VAL: +                    rdoc[fields.MBOX_KEY] = self.mbox +                self._soledad.create_doc(rdoc)      @deferred_to_thread      def _do_parse(self, raw): @@ -972,12 +984,14 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):              return self.__rflags          if self.memstore is not None: -            with self._rdoc_lock: +            with self._rdoc_lock[self.mbox]:                  rflags = self.memstore.get_recent_flags(self.mbox)                  if not rflags:                      # not loaded in the memory store yet.                      # let's fetch them from soledad... -                    rdoc = self._get_recent_doc() +                    rdoc = self._get_recent_doc_from_soledad() +                    if rdoc is None: +                        return set([])                      rflags = set(rdoc.content.get(                          fields.RECENTFLAGS_KEY, []))                      # ...and cache them now. @@ -997,8 +1011,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          _get_recent_flags, _set_recent_flags,          doc="Set of UIDs with the recent flag for this mailbox.") -    # XXX change naming, indicate soledad query. -    def _get_recent_doc(self): +    def _get_recent_doc_from_soledad(self):          """          Get recent-flags document from Soledad for this mailbox.          :rtype: SoledadDocument or None @@ -1008,8 +1021,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):              fields.TYPE_MBOX_IDX,              fields.TYPE_RECENT_VAL, self.mbox)          curried.expected = "rdoc" -        rdoc = try_unique_query(curried) -        return rdoc +        with self._rdoc_read_lock[self.mbox]: +            return try_unique_query(curried)      # Property-set modification (protected by a different      # lock to give atomicity to the read/write operation) @@ -1021,7 +1034,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          :param uids: the uids to unset          :type uid: sequence          """ -        with self._rdoc_property_lock: +        with self._rdoc_property_lock[self.mbox]:              self.recent_flags.difference_update(                  set(uids)) @@ -1034,7 +1047,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          :param uid: the uid to unset          :type uid: int          """ -        with self._rdoc_property_lock: +        with self._rdoc_property_lock[self.mbox]:              self.recent_flags.difference_update(                  set([uid])) @@ -1046,7 +1059,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          :param uid: the uid to set          :type uid: int          """ -        with self._rdoc_property_lock: +        with self._rdoc_property_lock[self.mbox]:              self.recent_flags = self.recent_flags.union(                  set([uid])) diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py index 732fe03..25f00bb 100644 --- a/mail/src/leap/mail/imap/soledadstore.py +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -133,15 +133,14 @@ A lock per document.  # Setting this to twice the number of threads in the threadpool  # should be safe.  put_locks = defaultdict(lambda: threading.Lock()) +mbox_doc_locks = defaultdict(lambda: threading.Lock())  class SoledadStore(ContentDedup):      """      This will create docs in the local Soledad database.      """ -    _soledad_rw_lock = threading.Lock()      _remove_lock = threading.Lock() -    _mbox_doc_locks = defaultdict(lambda: threading.Lock())      implements(IMessageConsumer, IMessageStore) @@ -282,9 +281,13 @@ class SoledadStore(ContentDedup):          def doSoledadCalls(items):              # we prime the generator, that should return the              # message or flags wrapper item in the first place. -            doc_wrapper = items.next() -            failed = self._soledad_write_document_parts(items) -            queueNotifyBack(failed, doc_wrapper) +            try: +                doc_wrapper = items.next() +            except StopIteration: +                pass +            else: +                failed = self._soledad_write_document_parts(items) +                queueNotifyBack(failed, doc_wrapper)          doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper)) @@ -309,8 +312,10 @@ class SoledadStore(ContentDedup):              try:                  self._try_call(call, item)              except Exception as exc: -                logger.debug("ITEM WAS: %s" % str(item)) -                logger.debug("ITEM CONTENT WAS: %s" % str(item.content)) +                logger.debug("ITEM WAS: %s" % repr(item)) +                if hasattr(item, 'content'): +                        logger.debug("ITEM CONTENT WAS: %s" % +                                     repr(item.content))                  logger.exception(exc)                  failed = True                  continue @@ -349,6 +354,9 @@ class SoledadStore(ContentDedup):          if call == self._PUT_DOC_FUN:              doc_id = item.doc_id +            if doc_id is None: +                logger.warning("BUG! Dirty doc but has no doc_id!") +                return              with put_locks[doc_id]:                  doc = self._GET_DOC_FUN(doc_id) @@ -437,12 +445,12 @@ class SoledadStore(ContentDedup):          :return: a tuple with recent-flags doc payload and callable          :rtype: tuple          """ -        call = self._CREATE_DOC_FUN +        call = self._PUT_DOC_FUN          payload = rflags_wrapper.content          if payload:              logger.debug("Saving RFLAGS to Soledad...") -            yield payload, call +            yield rflags_wrapper, call      # Mbox documents and attributes @@ -456,7 +464,7 @@ class SoledadStore(ContentDedup):                   the query failed.          :rtype: SoledadDocument or None.          """ -        with self._mbox_doc_locks[mbox]: +        with mbox_doc_locks[mbox]:              return self._get_mbox_document(mbox)      def _get_mbox_document(self, mbox): @@ -471,7 +479,7 @@ class SoledadStore(ContentDedup):                  return query.pop()              else:                  logger.error("Could not find mbox document for %r" % -                            (self.mbox,)) +                            (mbox,))          except Exception as exc:              logger.exception("Unhandled error %r" % exc) @@ -496,7 +504,7 @@ class SoledadStore(ContentDedup):          :type closed: bool          """          leap_assert(isinstance(closed, bool), "closed needs to be boolean") -        with self._mbox_doc_locks[mbox]: +        with mbox_doc_locks[mbox]:              mbox_doc = self._get_mbox_document(mbox)              if mbox_doc is None:                  logger.error( @@ -521,14 +529,18 @@ class SoledadStore(ContentDedup):          leap_assert_type(value, int)          key = fields.LAST_UID_KEY -        # XXX change for a lock related to the mbox document -        # itself. -        with self._mbox_doc_locks[mbox]: +        # XXX use accumulator to reduce number of hits +        with mbox_doc_locks[mbox]:              mbox_doc = self._get_mbox_document(mbox)              old_val = mbox_doc.content[key]              if value > old_val:                  mbox_doc.content[key] = value -                self._soledad.put_doc(mbox_doc) +                try: +                    self._soledad.put_doc(mbox_doc) +                except Exception as exc: +                    logger.error("Error while setting last_uid for %r" +                                 % (mbox,)) +                    logger.exception(exc)      def get_flags_doc(self, mbox, uid):          """ | 
