From f869b7eecab67d07a23dfb8b2931b3844f7523e3 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 11 Feb 2014 01:45:20 -0400 Subject: fine grained locks for puts --- src/leap/mail/imap/messages.py | 35 +++++++++++++++++++++------------ src/leap/mail/imap/soledadstore.py | 40 +++++++++++++++++++++++++++++++++----- 2 files changed, 58 insertions(+), 17 deletions(-) (limited to 'src') diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index 8b6d3f3..de5dd1f 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -88,6 +88,13 @@ def try_unique_query(curried): logger.exception("Unhandled error %r" % exc) +""" +A dictionary that keeps one lock per mbox and uid. +""" +# XXX too much overhead? +fdoc_locks = defaultdict(lambda: defaultdict(lambda: threading.Lock())) + + class LeapMessage(fields, MailParser, MBoxParser): """ The main representation of a message. @@ -102,8 +109,6 @@ class LeapMessage(fields, MailParser, MBoxParser): implements(imap4.IMessage) - flags_lock = threading.Lock() - def __init__(self, soledad, uid, mbox, collection=None, container=None): """ Initializes a LeapMessage. @@ -129,6 +134,9 @@ class LeapMessage(fields, MailParser, MBoxParser): self.__chash = None self.__bdoc = None + from twisted.internet import reactor + self.reactor = reactor + # XXX make these properties public @property @@ -238,20 +246,21 @@ class LeapMessage(fields, MailParser, MBoxParser): :type mode: int """ leap_assert(isinstance(flags, tuple), "flags need to be a tuple") - log.msg('setting flags: %s (%s)' % (self._uid, flags)) + #log.msg('setting flags: %s (%s)' % (self._uid, flags)) - doc = self.fdoc - if not doc: - logger.warning( - "Could not find FDOC for %s:%s while setting flags!" % - (self._mbox, self._uid)) - return + mbox, uid = self._mbox, self._uid APPEND = 1 REMOVE = -1 SET = 0 - with self.flags_lock: + with fdoc_locks[mbox][uid]: + doc = self.fdoc + if not doc: + logger.warning( + "Could not find FDOC for %r:%s while setting flags!" % + (mbox, uid)) + return current = doc.content[self.FLAGS_KEY] if mode == APPEND: newflags = tuple(set(tuple(current) + flags)) @@ -733,6 +742,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # ensure that we have a recent-flags and a hdocs-sec doc self._get_or_create_rdoc() + from twisted.internet import reactor + self.reactor = reactor + def _get_empty_doc(self, _type=FLAGS_DOC): """ Returns an empty doc for storing different message parts. @@ -877,7 +889,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): uid when the adding succeed. :rtype: deferred """ - logger.debug('adding message') + logger.debug('Adding message') if flags is None: flags = tuple() leap_assert_type(flags, tuple) @@ -921,7 +933,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): msg = self.get_msg_by_uid(uid) # TODO this cannot be deferred, this has to block. - #reactor.callLater(0, msg.setFlags, (fields.DELETED_FLAG,), -1) msg.setFlags((fields.DELETED_FLAG,), -1) reactor.callLater(0, observer.callback, uid) return diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 667e64d..9d19857 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -20,6 +20,7 @@ A MessageStore that writes to Soledad. import logging import threading +from collections import defaultdict from itertools import chain from u1db import errors as u1db_errors @@ -123,6 +124,17 @@ class MsgWriteError(Exception): """ Raised if any exception is found while saving message parts. """ + pass + + +""" +A lock per document. +""" +# TODO should bound the space of this!!! +# http://stackoverflow.com/a/2437645/1157664 +# Setting this to twice the number of threads in the threadpool +# should be safe. +put_locks = defaultdict(lambda: threading.Lock()) class SoledadStore(ContentDedup): @@ -142,6 +154,8 @@ class SoledadStore(ContentDedup): :type soledad: Soledad """ from twisted.internet import reactor + self.reactor = reactor + self._soledad = soledad self._CREATE_DOC_FUN = self._soledad.create_doc @@ -326,9 +340,9 @@ class SoledadStore(ContentDedup): if call is None: return - with self._soledad_rw_lock: - if call == self._PUT_DOC_FUN: - doc_id = item.doc_id + if call == self._PUT_DOC_FUN: + doc_id = item.doc_id + with put_locks[doc_id]: doc = self._GET_DOC_FUN(doc_id) if doc is None: @@ -337,13 +351,26 @@ class SoledadStore(ContentDedup): return doc.content = dict(item.content) + item = doc + try: + call(item) + except u1db_errors.RevisionConflict as exc: + logger.exception("Error: %r" % (exc,)) + raise exc + except Exception as exc: + logger.exception("Error: %r" % (exc,)) + raise exc + else: try: call(item) except u1db_errors.RevisionConflict as exc: logger.exception("Error: %r" % (exc,)) raise exc + except Exception as exc: + logger.exception("Error: %r" % (exc,)) + raise exc def _get_calls_for_msg_parts(self, msg_wrapper): """ @@ -383,10 +410,11 @@ class SoledadStore(ContentDedup): # XXX FIXME Give error if dirty and not doc_id !!! doc_id = item.doc_id # defend! if not doc_id: + logger.warning("Dirty item but no doc_id!") continue if item.part == MessagePartType.fdoc: - logger.debug("PUT dirty fdoc") + #logger.debug("PUT dirty fdoc") yield item, call # XXX also for linkage-doc !!! @@ -443,6 +471,9 @@ class SoledadStore(ContentDedup): flag_docs = self._soledad.get_from_index( fields.TYPE_MBOX_UID_IDX, fields.TYPE_FLAGS_VAL, mbox, str(uid)) + if len(flag_docs) != 1: + logger.warning("More than one flag doc for %r:%s" % + (mbox, uid)) result = first(flag_docs) except Exception as exc: # ugh! Something's broken down there! @@ -506,7 +537,6 @@ class SoledadStore(ContentDedup): fields.TYPE_MBOX_DEL_IDX, fields.TYPE_FLAGS_VAL, mbox, '1')) - # TODO can deferToThread this? def remove_all_deleted(self, mbox): """ Remove from Soledad all messages flagged as deleted for a given -- cgit v1.2.3