diff options
| author | Kali Kaneko <kali@leap.se> | 2014-02-05 21:40:20 -0400 | 
|---|---|---|
| committer | Kali Kaneko <kali@leap.se> | 2014-02-17 11:37:03 -0400 | 
| commit | 354dbdff54c136a54d11e24ea7cfc88f360a4a50 (patch) | |
| tree | b126fba2a434535f4fbae5c4f7dacf88e980d566 | |
| parent | 44263b4aceb2b828b9823055a95c83d0e439042d (diff) | |
lock document retrieval/put
| -rw-r--r-- | mail/src/leap/mail/imap/soledadstore.py | 47 | 
1 files changed, 29 insertions, 18 deletions
| diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py index 8e22f26..bfa53b6 100644 --- a/mail/src/leap/mail/imap/soledadstore.py +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -128,6 +128,7 @@ class SoledadStore(ContentDedup):      This will create docs in the local Soledad database.      """      _last_uid_lock = threading.Lock() +    _soledad_rw_lock = threading.Lock()      implements(IMessageConsumer, IMessageStore) @@ -140,6 +141,10 @@ class SoledadStore(ContentDedup):          """          self._soledad = soledad +        self._CREATE_DOC_FUN = self._soledad.create_doc +        self._PUT_DOC_FUN = self._soledad.put_doc +        self._GET_DOC_FUN = self._soledad.get_doc +      # IMessageStore      # ------------------------------------------------------------------- @@ -224,7 +229,7 @@ class SoledadStore(ContentDedup):              """              Errorback for write operations.              """ -            log.error("Error while processing item.") +            log.msg("ERROR: Error while processing item.")              log.msg(failure.getTraceBack())          while not queue.empty(): @@ -234,6 +239,7 @@ class SoledadStore(ContentDedup):              self._consume_doc(doc_wrapper, d) +    # FIXME this should not run the callback in the deferred thred      @deferred_to_thread      def _unset_new_dirty(self, doc_wrapper):          """ @@ -248,7 +254,8 @@ class SoledadStore(ContentDedup):          doc_wrapper.new = False          doc_wrapper.dirty = False -    @deferred_to_thread +    # FIXME this should not run the callback in the deferred thred +    #@deferred_to_thread      def _consume_doc(self, doc_wrapper, deferred):          """          Consume each document wrapper in a separate thread. @@ -273,6 +280,7 @@ class SoledadStore(ContentDedup):              try:                  self._try_call(call, item)              except Exception as exc: +                logger.exception(exc)                  failed = exc                  continue          if failed: @@ -315,11 +323,18 @@ class SoledadStore(ContentDedup):          """          if call is None:              return -        try: -            call(item) -        except u1db_errors.RevisionConflict as exc: -            logger.exception("Error: %r" % (exc,)) -            raise exc + +        with self._soledad_rw_lock: +            if call == self._PUT_DOC_FUN: +                doc_id = item.doc_id +                doc = self._GET_DOC_FUN(doc_id) +                doc.content = dict(item.content) +                item = doc +            try: +                call(item) +            except u1db_errors.RevisionConflict as exc: +                logger.exception("Error: %r" % (exc,)) +                raise exc      def _get_calls_for_msg_parts(self, msg_wrapper):          """ @@ -334,7 +349,7 @@ class SoledadStore(ContentDedup):          call = None          if msg_wrapper.new: -            call = self._soledad.create_doc +            call = self._CREATE_DOC_FUN              # item is expected to be a MessagePartDoc              for item in msg_wrapper.walk(): @@ -353,18 +368,17 @@ class SoledadStore(ContentDedup):          # the flags doc.          elif msg_wrapper.dirty: -            call = self._soledad.put_doc +            call = self._PUT_DOC_FUN              # item is expected to be a MessagePartDoc              for item in msg_wrapper.walk():                  # XXX FIXME Give error if dirty and not doc_id !!!                  doc_id = item.doc_id  # defend!                  if not doc_id:                      continue -                doc = self._soledad.get_doc(doc_id) -                doc.content = dict(item.content) +                  if item.part == MessagePartType.fdoc:                      logger.debug("PUT dirty fdoc") -                    yield doc, call +                    yield item, call                  # XXX also for linkage-doc !!!          else: @@ -379,15 +393,12 @@ class SoledadStore(ContentDedup):          :return: a tuple with recent-flags doc payload and callable          :rtype: tuple          """ -        call = self._soledad.put_doc -        rdoc = self._soledad.get_doc(rflags_wrapper.doc_id) +        call = self._CREATE_DOC_FUN          payload = rflags_wrapper.content -        logger.debug("Saving RFLAGS to Soledad...") -          if payload: -            rdoc.content = payload -            yield rdoc, call +            logger.debug("Saving RFLAGS to Soledad...") +            yield payload, call      def _get_mbox_document(self, mbox):          """ | 
