diff options
Diffstat (limited to 'mail/src')
| -rw-r--r-- | mail/src/leap/mail/imap/messages.py | 35 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/soledadstore.py | 40 | 
2 files changed, 58 insertions, 17 deletions
| diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index 8b6d3f3..de5dd1f 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -88,6 +88,13 @@ def try_unique_query(curried):          logger.exception("Unhandled error %r" % exc) +""" +A dictionary that keeps one lock per mbox and uid. +""" +# XXX too much overhead? +fdoc_locks = defaultdict(lambda: defaultdict(lambda: threading.Lock())) + +  class LeapMessage(fields, MailParser, MBoxParser):      """      The main representation of a message. @@ -102,8 +109,6 @@ class LeapMessage(fields, MailParser, MBoxParser):      implements(imap4.IMessage) -    flags_lock = threading.Lock() -      def __init__(self, soledad, uid, mbox, collection=None, container=None):          """          Initializes a LeapMessage. @@ -129,6 +134,9 @@ class LeapMessage(fields, MailParser, MBoxParser):          self.__chash = None          self.__bdoc = None +        from twisted.internet import reactor +        self.reactor = reactor +      # XXX make these properties public      @property @@ -238,20 +246,21 @@ class LeapMessage(fields, MailParser, MBoxParser):          :type mode: int          """          leap_assert(isinstance(flags, tuple), "flags need to be a tuple") -        log.msg('setting flags: %s (%s)' % (self._uid, flags)) +        #log.msg('setting flags: %s (%s)' % (self._uid, flags)) -        doc = self.fdoc -        if not doc: -            logger.warning( -                "Could not find FDOC for %s:%s while setting flags!" % -                (self._mbox, self._uid)) -            return +        mbox, uid = self._mbox, self._uid          APPEND = 1          REMOVE = -1          SET = 0 -        with self.flags_lock: +        with fdoc_locks[mbox][uid]: +            doc = self.fdoc +            if not doc: +                logger.warning( +                    "Could not find FDOC for %r:%s while setting flags!" % +                    (mbox, uid)) +                return              current = doc.content[self.FLAGS_KEY]              if mode == APPEND:                  newflags = tuple(set(tuple(current) + flags)) @@ -733,6 +742,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          # ensure that we have a recent-flags and a hdocs-sec doc          self._get_or_create_rdoc() +        from twisted.internet import reactor +        self.reactor = reactor +      def _get_empty_doc(self, _type=FLAGS_DOC):          """          Returns an empty doc for storing different message parts. @@ -877,7 +889,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):                   uid when the adding succeed.          :rtype: deferred          """ -        logger.debug('adding message') +        logger.debug('Adding message')          if flags is None:              flags = tuple()          leap_assert_type(flags, tuple) @@ -921,7 +933,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):              msg = self.get_msg_by_uid(uid)              # TODO this cannot be deferred, this has to block. -            #reactor.callLater(0, msg.setFlags, (fields.DELETED_FLAG,), -1)              msg.setFlags((fields.DELETED_FLAG,), -1)              reactor.callLater(0, observer.callback, uid)              return diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py index 667e64d..9d19857 100644 --- a/mail/src/leap/mail/imap/soledadstore.py +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -20,6 +20,7 @@ A MessageStore that writes to Soledad.  import logging  import threading +from collections import defaultdict  from itertools import chain  from u1db import errors as u1db_errors @@ -123,6 +124,17 @@ class MsgWriteError(Exception):      """      Raised if any exception is found while saving message parts.      """ +    pass + + +""" +A lock per document. +""" +# TODO should bound the space of this!!! +# http://stackoverflow.com/a/2437645/1157664 +# Setting this to twice the number of threads in the threadpool +# should be safe. +put_locks = defaultdict(lambda: threading.Lock())  class SoledadStore(ContentDedup): @@ -142,6 +154,8 @@ class SoledadStore(ContentDedup):          :type soledad: Soledad          """          from twisted.internet import reactor +        self.reactor = reactor +          self._soledad = soledad          self._CREATE_DOC_FUN = self._soledad.create_doc @@ -326,9 +340,9 @@ class SoledadStore(ContentDedup):          if call is None:              return -        with self._soledad_rw_lock: -            if call == self._PUT_DOC_FUN: -                doc_id = item.doc_id +        if call == self._PUT_DOC_FUN: +            doc_id = item.doc_id +            with put_locks[doc_id]:                  doc = self._GET_DOC_FUN(doc_id)                  if doc is None: @@ -337,13 +351,26 @@ class SoledadStore(ContentDedup):                      return                  doc.content = dict(item.content) +                  item = doc +                try: +                    call(item) +                except u1db_errors.RevisionConflict as exc: +                    logger.exception("Error: %r" % (exc,)) +                    raise exc +                except Exception as exc: +                    logger.exception("Error: %r" % (exc,)) +                    raise exc +        else:              try:                  call(item)              except u1db_errors.RevisionConflict as exc:                  logger.exception("Error: %r" % (exc,))                  raise exc +            except Exception as exc: +                logger.exception("Error: %r" % (exc,)) +                raise exc      def _get_calls_for_msg_parts(self, msg_wrapper):          """ @@ -383,10 +410,11 @@ class SoledadStore(ContentDedup):                  # XXX FIXME Give error if dirty and not doc_id !!!                  doc_id = item.doc_id  # defend!                  if not doc_id: +                    logger.warning("Dirty item but no doc_id!")                      continue                  if item.part == MessagePartType.fdoc: -                    logger.debug("PUT dirty fdoc") +                    #logger.debug("PUT dirty fdoc")                      yield item, call                  # XXX also for linkage-doc !!! @@ -443,6 +471,9 @@ class SoledadStore(ContentDedup):              flag_docs = self._soledad.get_from_index(                  fields.TYPE_MBOX_UID_IDX,                  fields.TYPE_FLAGS_VAL, mbox, str(uid)) +            if len(flag_docs) != 1: +                logger.warning("More than one flag doc for %r:%s" % +                               (mbox, uid))              result = first(flag_docs)          except Exception as exc:              # ugh! Something's broken down there! @@ -506,7 +537,6 @@ class SoledadStore(ContentDedup):                  fields.TYPE_MBOX_DEL_IDX,                  fields.TYPE_FLAGS_VAL, mbox, '1')) -    # TODO can deferToThread this?      def remove_all_deleted(self, mbox):          """          Remove from Soledad all messages flagged as deleted for a given | 
