summaryrefslogtreecommitdiff
path: root/src/leap/mail
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-02-11 01:45:20 -0400
committerKali Kaneko <kali@leap.se>2014-02-17 11:39:48 -0400
commitf869b7eecab67d07a23dfb8b2931b3844f7523e3 (patch)
tree42aded16d0059ffa0636540d0a4b8a9a4f0eb7ed /src/leap/mail
parent4338368aa2ba0efaee742e9000e21b81af34d3db (diff)
fine grained locks for puts
Diffstat (limited to 'src/leap/mail')
-rw-r--r--src/leap/mail/imap/messages.py35
-rw-r--r--src/leap/mail/imap/soledadstore.py40
2 files changed, 58 insertions, 17 deletions
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index 8b6d3f3..de5dd1f 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -88,6 +88,13 @@ def try_unique_query(curried):
logger.exception("Unhandled error %r" % exc)
+"""
+A dictionary that keeps one lock per mbox and uid.
+"""
+# XXX too much overhead?
+fdoc_locks = defaultdict(lambda: defaultdict(lambda: threading.Lock()))
+
+
class LeapMessage(fields, MailParser, MBoxParser):
"""
The main representation of a message.
@@ -102,8 +109,6 @@ class LeapMessage(fields, MailParser, MBoxParser):
implements(imap4.IMessage)
- flags_lock = threading.Lock()
-
def __init__(self, soledad, uid, mbox, collection=None, container=None):
"""
Initializes a LeapMessage.
@@ -129,6 +134,9 @@ class LeapMessage(fields, MailParser, MBoxParser):
self.__chash = None
self.__bdoc = None
+ from twisted.internet import reactor
+ self.reactor = reactor
+
# XXX make these properties public
@property
@@ -238,20 +246,21 @@ class LeapMessage(fields, MailParser, MBoxParser):
:type mode: int
"""
leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
- log.msg('setting flags: %s (%s)' % (self._uid, flags))
+ #log.msg('setting flags: %s (%s)' % (self._uid, flags))
- doc = self.fdoc
- if not doc:
- logger.warning(
- "Could not find FDOC for %s:%s while setting flags!" %
- (self._mbox, self._uid))
- return
+ mbox, uid = self._mbox, self._uid
APPEND = 1
REMOVE = -1
SET = 0
- with self.flags_lock:
+ with fdoc_locks[mbox][uid]:
+ doc = self.fdoc
+ if not doc:
+ logger.warning(
+ "Could not find FDOC for %r:%s while setting flags!" %
+ (mbox, uid))
+ return
current = doc.content[self.FLAGS_KEY]
if mode == APPEND:
newflags = tuple(set(tuple(current) + flags))
@@ -733,6 +742,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# ensure that we have a recent-flags and a hdocs-sec doc
self._get_or_create_rdoc()
+ from twisted.internet import reactor
+ self.reactor = reactor
+
def _get_empty_doc(self, _type=FLAGS_DOC):
"""
Returns an empty doc for storing different message parts.
@@ -877,7 +889,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
uid when the adding succeed.
:rtype: deferred
"""
- logger.debug('adding message')
+ logger.debug('Adding message')
if flags is None:
flags = tuple()
leap_assert_type(flags, tuple)
@@ -921,7 +933,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
msg = self.get_msg_by_uid(uid)
# TODO this cannot be deferred, this has to block.
- #reactor.callLater(0, msg.setFlags, (fields.DELETED_FLAG,), -1)
msg.setFlags((fields.DELETED_FLAG,), -1)
reactor.callLater(0, observer.callback, uid)
return
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
index 667e64d..9d19857 100644
--- a/src/leap/mail/imap/soledadstore.py
+++ b/src/leap/mail/imap/soledadstore.py
@@ -20,6 +20,7 @@ A MessageStore that writes to Soledad.
import logging
import threading
+from collections import defaultdict
from itertools import chain
from u1db import errors as u1db_errors
@@ -123,6 +124,17 @@ class MsgWriteError(Exception):
"""
Raised if any exception is found while saving message parts.
"""
+ pass
+
+
+"""
+A lock per document.
+"""
+# TODO should bound the space of this!!!
+# http://stackoverflow.com/a/2437645/1157664
+# Setting this to twice the number of threads in the threadpool
+# should be safe.
+put_locks = defaultdict(lambda: threading.Lock())
class SoledadStore(ContentDedup):
@@ -142,6 +154,8 @@ class SoledadStore(ContentDedup):
:type soledad: Soledad
"""
from twisted.internet import reactor
+ self.reactor = reactor
+
self._soledad = soledad
self._CREATE_DOC_FUN = self._soledad.create_doc
@@ -326,9 +340,9 @@ class SoledadStore(ContentDedup):
if call is None:
return
- with self._soledad_rw_lock:
- if call == self._PUT_DOC_FUN:
- doc_id = item.doc_id
+ if call == self._PUT_DOC_FUN:
+ doc_id = item.doc_id
+ with put_locks[doc_id]:
doc = self._GET_DOC_FUN(doc_id)
if doc is None:
@@ -337,13 +351,26 @@ class SoledadStore(ContentDedup):
return
doc.content = dict(item.content)
+
item = doc
+ try:
+ call(item)
+ except u1db_errors.RevisionConflict as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
+ except Exception as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
+ else:
try:
call(item)
except u1db_errors.RevisionConflict as exc:
logger.exception("Error: %r" % (exc,))
raise exc
+ except Exception as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
def _get_calls_for_msg_parts(self, msg_wrapper):
"""
@@ -383,10 +410,11 @@ class SoledadStore(ContentDedup):
# XXX FIXME Give error if dirty and not doc_id !!!
doc_id = item.doc_id # defend!
if not doc_id:
+ logger.warning("Dirty item but no doc_id!")
continue
if item.part == MessagePartType.fdoc:
- logger.debug("PUT dirty fdoc")
+ #logger.debug("PUT dirty fdoc")
yield item, call
# XXX also for linkage-doc !!!
@@ -443,6 +471,9 @@ class SoledadStore(ContentDedup):
flag_docs = self._soledad.get_from_index(
fields.TYPE_MBOX_UID_IDX,
fields.TYPE_FLAGS_VAL, mbox, str(uid))
+ if len(flag_docs) != 1:
+ logger.warning("More than one flag doc for %r:%s" %
+ (mbox, uid))
result = first(flag_docs)
except Exception as exc:
# ugh! Something's broken down there!
@@ -506,7 +537,6 @@ class SoledadStore(ContentDedup):
fields.TYPE_MBOX_DEL_IDX,
fields.TYPE_FLAGS_VAL, mbox, '1'))
- # TODO can deferToThread this?
def remove_all_deleted(self, mbox):
"""
Remove from Soledad all messages flagged as deleted for a given