summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/soledadstore.py
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-02-11 01:45:20 -0400
committerKali Kaneko <kali@leap.se>2014-02-17 11:39:48 -0400
commitf869b7eecab67d07a23dfb8b2931b3844f7523e3 (patch)
tree42aded16d0059ffa0636540d0a4b8a9a4f0eb7ed /src/leap/mail/imap/soledadstore.py
parent4338368aa2ba0efaee742e9000e21b81af34d3db (diff)
fine grained locks for puts
Diffstat (limited to 'src/leap/mail/imap/soledadstore.py')
-rw-r--r--src/leap/mail/imap/soledadstore.py40
1 files changed, 35 insertions, 5 deletions
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
index 667e64d..9d19857 100644
--- a/src/leap/mail/imap/soledadstore.py
+++ b/src/leap/mail/imap/soledadstore.py
@@ -20,6 +20,7 @@ A MessageStore that writes to Soledad.
import logging
import threading
+from collections import defaultdict
from itertools import chain
from u1db import errors as u1db_errors
@@ -123,6 +124,17 @@ class MsgWriteError(Exception):
"""
Raised if any exception is found while saving message parts.
"""
+ pass
+
+
+"""
+A lock per document.
+"""
+# TODO should bound the space of this!!!
+# http://stackoverflow.com/a/2437645/1157664
+# Setting this to twice the number of threads in the threadpool
+# should be safe.
+put_locks = defaultdict(lambda: threading.Lock())
class SoledadStore(ContentDedup):
@@ -142,6 +154,8 @@ class SoledadStore(ContentDedup):
:type soledad: Soledad
"""
from twisted.internet import reactor
+ self.reactor = reactor
+
self._soledad = soledad
self._CREATE_DOC_FUN = self._soledad.create_doc
@@ -326,9 +340,9 @@ class SoledadStore(ContentDedup):
if call is None:
return
- with self._soledad_rw_lock:
- if call == self._PUT_DOC_FUN:
- doc_id = item.doc_id
+ if call == self._PUT_DOC_FUN:
+ doc_id = item.doc_id
+ with put_locks[doc_id]:
doc = self._GET_DOC_FUN(doc_id)
if doc is None:
@@ -337,13 +351,26 @@ class SoledadStore(ContentDedup):
return
doc.content = dict(item.content)
+
item = doc
+ try:
+ call(item)
+ except u1db_errors.RevisionConflict as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
+ except Exception as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
+ else:
try:
call(item)
except u1db_errors.RevisionConflict as exc:
logger.exception("Error: %r" % (exc,))
raise exc
+ except Exception as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
def _get_calls_for_msg_parts(self, msg_wrapper):
"""
@@ -383,10 +410,11 @@ class SoledadStore(ContentDedup):
# XXX FIXME Give error if dirty and not doc_id !!!
doc_id = item.doc_id # defend!
if not doc_id:
+ logger.warning("Dirty item but no doc_id!")
continue
if item.part == MessagePartType.fdoc:
- logger.debug("PUT dirty fdoc")
+ #logger.debug("PUT dirty fdoc")
yield item, call
# XXX also for linkage-doc !!!
@@ -443,6 +471,9 @@ class SoledadStore(ContentDedup):
flag_docs = self._soledad.get_from_index(
fields.TYPE_MBOX_UID_IDX,
fields.TYPE_FLAGS_VAL, mbox, str(uid))
+ if len(flag_docs) != 1:
+ logger.warning("More than one flag doc for %r:%s" %
+ (mbox, uid))
result = first(flag_docs)
except Exception as exc:
# ugh! Something's broken down there!
@@ -506,7 +537,6 @@ class SoledadStore(ContentDedup):
fields.TYPE_MBOX_DEL_IDX,
fields.TYPE_FLAGS_VAL, mbox, '1'))
- # TODO can deferToThread this?
def remove_all_deleted(self, mbox):
"""
Remove from Soledad all messages flagged as deleted for a given