summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-02-05 21:40:20 -0400
committerKali Kaneko <kali@leap.se>2014-02-17 11:37:03 -0400
commitc955c7015b5986af40b2253ac98846f4547e5e00 (patch)
tree479d94eba2a66ee0472a241cc8b65667cbb6916d /src
parent12ffea333922d99ee7f7b4ab2cd46cfcec6a0d05 (diff)
lock document retrieval/put
Diffstat (limited to 'src')
-rw-r--r--src/leap/mail/imap/soledadstore.py47
1 files changed, 29 insertions, 18 deletions
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
index 8e22f26..bfa53b6 100644
--- a/src/leap/mail/imap/soledadstore.py
+++ b/src/leap/mail/imap/soledadstore.py
@@ -128,6 +128,7 @@ class SoledadStore(ContentDedup):
This will create docs in the local Soledad database.
"""
_last_uid_lock = threading.Lock()
+ _soledad_rw_lock = threading.Lock()
implements(IMessageConsumer, IMessageStore)
@@ -140,6 +141,10 @@ class SoledadStore(ContentDedup):
"""
self._soledad = soledad
+ self._CREATE_DOC_FUN = self._soledad.create_doc
+ self._PUT_DOC_FUN = self._soledad.put_doc
+ self._GET_DOC_FUN = self._soledad.get_doc
+
# IMessageStore
# -------------------------------------------------------------------
@@ -224,7 +229,7 @@ class SoledadStore(ContentDedup):
"""
Errorback for write operations.
"""
- log.error("Error while processing item.")
+ log.msg("ERROR: Error while processing item.")
log.msg(failure.getTraceBack())
while not queue.empty():
@@ -234,6 +239,7 @@ class SoledadStore(ContentDedup):
self._consume_doc(doc_wrapper, d)
+ # FIXME this should not run the callback in the deferred thred
@deferred_to_thread
def _unset_new_dirty(self, doc_wrapper):
"""
@@ -248,7 +254,8 @@ class SoledadStore(ContentDedup):
doc_wrapper.new = False
doc_wrapper.dirty = False
- @deferred_to_thread
+ # FIXME this should not run the callback in the deferred thred
+ #@deferred_to_thread
def _consume_doc(self, doc_wrapper, deferred):
"""
Consume each document wrapper in a separate thread.
@@ -273,6 +280,7 @@ class SoledadStore(ContentDedup):
try:
self._try_call(call, item)
except Exception as exc:
+ logger.exception(exc)
failed = exc
continue
if failed:
@@ -315,11 +323,18 @@ class SoledadStore(ContentDedup):
"""
if call is None:
return
- try:
- call(item)
- except u1db_errors.RevisionConflict as exc:
- logger.exception("Error: %r" % (exc,))
- raise exc
+
+ with self._soledad_rw_lock:
+ if call == self._PUT_DOC_FUN:
+ doc_id = item.doc_id
+ doc = self._GET_DOC_FUN(doc_id)
+ doc.content = dict(item.content)
+ item = doc
+ try:
+ call(item)
+ except u1db_errors.RevisionConflict as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
def _get_calls_for_msg_parts(self, msg_wrapper):
"""
@@ -334,7 +349,7 @@ class SoledadStore(ContentDedup):
call = None
if msg_wrapper.new:
- call = self._soledad.create_doc
+ call = self._CREATE_DOC_FUN
# item is expected to be a MessagePartDoc
for item in msg_wrapper.walk():
@@ -353,18 +368,17 @@ class SoledadStore(ContentDedup):
# the flags doc.
elif msg_wrapper.dirty:
- call = self._soledad.put_doc
+ call = self._PUT_DOC_FUN
# item is expected to be a MessagePartDoc
for item in msg_wrapper.walk():
# XXX FIXME Give error if dirty and not doc_id !!!
doc_id = item.doc_id # defend!
if not doc_id:
continue
- doc = self._soledad.get_doc(doc_id)
- doc.content = dict(item.content)
+
if item.part == MessagePartType.fdoc:
logger.debug("PUT dirty fdoc")
- yield doc, call
+ yield item, call
# XXX also for linkage-doc !!!
else:
@@ -379,15 +393,12 @@ class SoledadStore(ContentDedup):
:return: a tuple with recent-flags doc payload and callable
:rtype: tuple
"""
- call = self._soledad.put_doc
- rdoc = self._soledad.get_doc(rflags_wrapper.doc_id)
+ call = self._CREATE_DOC_FUN
payload = rflags_wrapper.content
- logger.debug("Saving RFLAGS to Soledad...")
-
if payload:
- rdoc.content = payload
- yield rdoc, call
+ logger.debug("Saving RFLAGS to Soledad...")
+ yield payload, call
def _get_mbox_document(self, mbox):
"""