From c955c7015b5986af40b2253ac98846f4547e5e00 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 5 Feb 2014 21:40:20 -0400 Subject: lock document retrieval/put --- src/leap/mail/imap/soledadstore.py | 47 +++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 18 deletions(-) (limited to 'src') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 8e22f26..bfa53b6 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -128,6 +128,7 @@ class SoledadStore(ContentDedup): This will create docs in the local Soledad database. """ _last_uid_lock = threading.Lock() + _soledad_rw_lock = threading.Lock() implements(IMessageConsumer, IMessageStore) @@ -140,6 +141,10 @@ class SoledadStore(ContentDedup): """ self._soledad = soledad + self._CREATE_DOC_FUN = self._soledad.create_doc + self._PUT_DOC_FUN = self._soledad.put_doc + self._GET_DOC_FUN = self._soledad.get_doc + # IMessageStore # ------------------------------------------------------------------- @@ -224,7 +229,7 @@ class SoledadStore(ContentDedup): """ Errorback for write operations. """ - log.error("Error while processing item.") + log.msg("ERROR: Error while processing item.") log.msg(failure.getTraceBack()) while not queue.empty(): @@ -234,6 +239,7 @@ class SoledadStore(ContentDedup): self._consume_doc(doc_wrapper, d) + # FIXME this should not run the callback in the deferred thred @deferred_to_thread def _unset_new_dirty(self, doc_wrapper): """ @@ -248,7 +254,8 @@ class SoledadStore(ContentDedup): doc_wrapper.new = False doc_wrapper.dirty = False - @deferred_to_thread + # FIXME this should not run the callback in the deferred thred + #@deferred_to_thread def _consume_doc(self, doc_wrapper, deferred): """ Consume each document wrapper in a separate thread. @@ -273,6 +280,7 @@ class SoledadStore(ContentDedup): try: self._try_call(call, item) except Exception as exc: + logger.exception(exc) failed = exc continue if failed: @@ -315,11 +323,18 @@ class SoledadStore(ContentDedup): """ if call is None: return - try: - call(item) - except u1db_errors.RevisionConflict as exc: - logger.exception("Error: %r" % (exc,)) - raise exc + + with self._soledad_rw_lock: + if call == self._PUT_DOC_FUN: + doc_id = item.doc_id + doc = self._GET_DOC_FUN(doc_id) + doc.content = dict(item.content) + item = doc + try: + call(item) + except u1db_errors.RevisionConflict as exc: + logger.exception("Error: %r" % (exc,)) + raise exc def _get_calls_for_msg_parts(self, msg_wrapper): """ @@ -334,7 +349,7 @@ class SoledadStore(ContentDedup): call = None if msg_wrapper.new: - call = self._soledad.create_doc + call = self._CREATE_DOC_FUN # item is expected to be a MessagePartDoc for item in msg_wrapper.walk(): @@ -353,18 +368,17 @@ class SoledadStore(ContentDedup): # the flags doc. elif msg_wrapper.dirty: - call = self._soledad.put_doc + call = self._PUT_DOC_FUN # item is expected to be a MessagePartDoc for item in msg_wrapper.walk(): # XXX FIXME Give error if dirty and not doc_id !!! doc_id = item.doc_id # defend! if not doc_id: continue - doc = self._soledad.get_doc(doc_id) - doc.content = dict(item.content) + if item.part == MessagePartType.fdoc: logger.debug("PUT dirty fdoc") - yield doc, call + yield item, call # XXX also for linkage-doc !!! else: @@ -379,15 +393,12 @@ class SoledadStore(ContentDedup): :return: a tuple with recent-flags doc payload and callable :rtype: tuple """ - call = self._soledad.put_doc - rdoc = self._soledad.get_doc(rflags_wrapper.doc_id) + call = self._CREATE_DOC_FUN payload = rflags_wrapper.content - logger.debug("Saving RFLAGS to Soledad...") - if payload: - rdoc.content = payload - yield rdoc, call + logger.debug("Saving RFLAGS to Soledad...") + yield payload, call def _get_mbox_document(self, mbox): """ -- cgit v1.2.3