summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/soledadstore.py
diff options
context:
space:
mode:
authorTomás Touceda <chiiph@leap.se>2014-02-17 12:45:19 -0300
committerTomás Touceda <chiiph@leap.se>2014-02-17 12:45:19 -0300
commit32e3c5ddaa5df30a573762e273f3a12f7eb3c255 (patch)
tree0924be918d990628e73eb2059fb6eb1200748b7c /src/leap/mail/imap/soledadstore.py
parent7828c517ae162de4676a71e05f77339598acd6f7 (diff)
parent985ff0a78a8df0eafb7789383f711b9e5ceb1cb6 (diff)
Merge remote-tracking branch 'refs/remotes/kali/bug/separate_deferreds_threads' into develop
Diffstat (limited to 'src/leap/mail/imap/soledadstore.py')
-rw-r--r--src/leap/mail/imap/soledadstore.py369
1 files changed, 245 insertions, 124 deletions
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
index 8e22f26..732fe03 100644
--- a/src/leap/mail/imap/soledadstore.py
+++ b/src/leap/mail/imap/soledadstore.py
@@ -20,14 +20,14 @@ A MessageStore that writes to Soledad.
import logging
import threading
+from collections import defaultdict
from itertools import chain
from u1db import errors as u1db_errors
-from twisted.internet import defer
from twisted.python import log
from zope.interface import implements
-from leap.common.check import leap_assert_type
+from leap.common.check import leap_assert_type, leap_assert
from leap.mail.decorators import deferred_to_thread
from leap.mail.imap.messageparts import MessagePartType
from leap.mail.imap.messageparts import MessageWrapper
@@ -35,15 +35,13 @@ from leap.mail.imap.messageparts import RecentFlagsDoc
from leap.mail.imap.fields import fields
from leap.mail.imap.interfaces import IMessageStore
from leap.mail.messageflow import IMessageConsumer
-from leap.mail.utils import first
+from leap.mail.utils import first, empty, accumulator_queue
logger = logging.getLogger(__name__)
# TODO
-# [ ] Delete original message from the incoming queue after all successful
-# writes.
-# [ ] Implement a retry queue.
+# [ ] Implement a retry queue?
# [ ] Consider journaling of operations.
@@ -86,10 +84,12 @@ class ContentDedup(object):
if not header_docs:
return False
- if len(header_docs) != 1:
- logger.warning("Found more than one copy of chash %s!"
- % (chash,))
- logger.debug("Found header doc with that hash! Skipping save!")
+ # FIXME enable only to debug this problem.
+ #if len(header_docs) != 1:
+ #logger.warning("Found more than one copy of chash %s!"
+ #% (chash,))
+
+ #logger.debug("Found header doc with that hash! Skipping save!")
return True
def _content_does_exist(self, doc):
@@ -110,10 +110,11 @@ class ContentDedup(object):
if not attach_docs:
return False
- if len(attach_docs) != 1:
- logger.warning("Found more than one copy of phash %s!"
- % (phash,))
- logger.debug("Found attachment doc with that hash! Skipping save!")
+ # FIXME enable only to debug this problem
+ #if len(attach_docs) != 1:
+ #logger.warning("Found more than one copy of phash %s!"
+ #% (phash,))
+ #logger.debug("Found attachment doc with that hash! Skipping save!")
return True
@@ -121,13 +122,26 @@ class MsgWriteError(Exception):
"""
Raised if any exception is found while saving message parts.
"""
+ pass
+
+
+"""
+A lock per document.
+"""
+# TODO should bound the space of this!!!
+# http://stackoverflow.com/a/2437645/1157664
+# Setting this to twice the number of threads in the threadpool
+# should be safe.
+put_locks = defaultdict(lambda: threading.Lock())
class SoledadStore(ContentDedup):
"""
This will create docs in the local Soledad database.
"""
- _last_uid_lock = threading.Lock()
+ _soledad_rw_lock = threading.Lock()
+ _remove_lock = threading.Lock()
+ _mbox_doc_locks = defaultdict(lambda: threading.Lock())
implements(IMessageConsumer, IMessageStore)
@@ -138,8 +152,20 @@ class SoledadStore(ContentDedup):
:param soledad: the soledad instance
:type soledad: Soledad
"""
+ from twisted.internet import reactor
+ self.reactor = reactor
+
self._soledad = soledad
+ self._CREATE_DOC_FUN = self._soledad.create_doc
+ self._PUT_DOC_FUN = self._soledad.put_doc
+ self._GET_DOC_FUN = self._soledad.get_doc
+
+ # we instantiate an accumulator to batch the notifications
+ self.docs_notify_queue = accumulator_queue(
+ lambda item: reactor.callFromThread(self._unset_new_dirty, item),
+ 20)
+
# IMessageStore
# -------------------------------------------------------------------
@@ -194,47 +220,32 @@ class SoledadStore(ContentDedup):
# IMessageConsumer
- # It's not thread-safe to defer this to a different thread
+ # TODO should handle the delete case
+ # TODO should handle errors better
+ # TODO could generalize this method into a generic consumer
+ # and only implement `process` here
def consume(self, queue):
"""
Creates a new document in soledad db.
- :param queue: queue to get item from, with content of the document
- to be inserted.
- :type queue: Queue
- """
- # TODO should delete the original message from incoming only after
- # the writes are done.
- # TODO should handle the delete case
- # TODO should handle errors
- # TODO could generalize this method into a generic consumer
- # and only implement `process` here
-
- def docWriteCallBack(doc_wrapper):
- """
- Callback for a successful write of a document wrapper.
- """
- if isinstance(doc_wrapper, MessageWrapper):
- # If everything went well, we can unset the new flag
- # in the source store (memory store)
- self._unset_new_dirty(doc_wrapper)
-
- def docWriteErrorBack(failure):
- """
- Errorback for write operations.
- """
- log.error("Error while processing item.")
- log.msg(failure.getTraceBack())
-
- while not queue.empty():
- doc_wrapper = queue.get()
- d = defer.Deferred()
- d.addCallbacks(docWriteCallBack, docWriteErrorBack)
-
- self._consume_doc(doc_wrapper, d)
+ :param queue: a tuple of queues to get item from, with content of the
+ document to be inserted.
+ :type queue: tuple of Queues
+ """
+ new, dirty = queue
+ while not new.empty():
+ doc_wrapper = new.get()
+ self.reactor.callInThread(self._consume_doc, doc_wrapper,
+ self.docs_notify_queue)
+ while not dirty.empty():
+ doc_wrapper = dirty.get()
+ self.reactor.callInThread(self._consume_doc, doc_wrapper,
+ self.docs_notify_queue)
+
+ # Queue empty, flush the notifications queue.
+ self.docs_notify_queue(None, flush=True)
- @deferred_to_thread
def _unset_new_dirty(self, doc_wrapper):
"""
Unset the `new` and `dirty` flags for this document wrapper in the
@@ -243,56 +254,76 @@ class SoledadStore(ContentDedup):
:param doc_wrapper: a MessageWrapper instance
:type doc_wrapper: MessageWrapper
"""
- # XXX debug msg id/mbox?
- logger.info("unsetting new flag!")
- doc_wrapper.new = False
- doc_wrapper.dirty = False
+ if isinstance(doc_wrapper, MessageWrapper):
+ # XXX still needed for debug quite often
+ #logger.info("unsetting new flag!")
+ doc_wrapper.new = False
+ doc_wrapper.dirty = False
@deferred_to_thread
- def _consume_doc(self, doc_wrapper, deferred):
+ def _consume_doc(self, doc_wrapper, notify_queue):
"""
Consume each document wrapper in a separate thread.
+ We pass an instance of an accumulator that handles the notifications
+ to the memorystore when the write has been done.
:param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance
:type doc_wrapper: MessageWrapper or RecentFlagsDoc
- :param deferred: a deferred that will be fired when the write operation
- has finished, either calling its callback or its
- errback depending on whether it succeed.
- :type deferred: Deferred
+ :param notify_queue: a callable that handles the writeback
+ notifications to the memstore.
+ :type notify_queue: callable
"""
- items = self._process(doc_wrapper)
+ def queueNotifyBack(failed, doc_wrapper):
+ if failed:
+ log.msg("There was an error writing the mesage...")
+ else:
+ notify_queue(doc_wrapper)
+
+ def doSoledadCalls(items):
+ # we prime the generator, that should return the
+ # message or flags wrapper item in the first place.
+ doc_wrapper = items.next()
+ failed = self._soledad_write_document_parts(items)
+ queueNotifyBack(failed, doc_wrapper)
- # we prime the generator, that should return the
- # message or flags wrapper item in the first place.
- doc_wrapper = items.next()
+ doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper))
- # From here, we unpack the subpart items and
- # the right soledad call.
+ #
+ # SoledadStore specific methods.
+ #
+
+ def _soledad_write_document_parts(self, items):
+ """
+ Write the document parts to soledad in a separate thread.
+
+ :param items: the iterator through the different document wrappers
+ payloads.
+ :type items: iterator
+ :return: whether the write was successful or not
+ :rtype: bool
+ """
failed = False
for item, call in items:
+ if empty(item):
+ continue
try:
self._try_call(call, item)
except Exception as exc:
- failed = exc
+ logger.debug("ITEM WAS: %s" % str(item))
+ logger.debug("ITEM CONTENT WAS: %s" % str(item.content))
+ logger.exception(exc)
+ failed = True
continue
- if failed:
- deferred.errback(MsgWriteError(
- "There was an error writing the mesage"))
- else:
- deferred.callback(doc_wrapper)
-
- #
- # SoledadStore specific methods.
- #
+ return failed
- def _process(self, doc_wrapper):
+ def _iter_wrapper_subparts(self, doc_wrapper):
"""
Return an iterator that will yield the doc_wrapper in the first place,
followed by the subparts item and the proper call type for every
item in the queue, if any.
- :param queue: the queue from where we'll pick item.
- :type queue: Queue
+ :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance
+ :type doc_wrapper: MessageWrapper or RecentFlagsDoc
"""
if isinstance(doc_wrapper, MessageWrapper):
return chain((doc_wrapper,),
@@ -315,11 +346,38 @@ class SoledadStore(ContentDedup):
"""
if call is None:
return
- try:
- call(item)
- except u1db_errors.RevisionConflict as exc:
- logger.exception("Error: %r" % (exc,))
- raise exc
+
+ if call == self._PUT_DOC_FUN:
+ doc_id = item.doc_id
+ with put_locks[doc_id]:
+ doc = self._GET_DOC_FUN(doc_id)
+
+ if doc is None:
+ logger.warning("BUG! Dirty doc but could not "
+ "find document %s" % (doc_id,))
+ return
+
+ doc.content = dict(item.content)
+
+ item = doc
+ try:
+ call(item)
+ except u1db_errors.RevisionConflict as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
+ except Exception as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
+
+ else:
+ try:
+ call(item)
+ except u1db_errors.RevisionConflict as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
+ except Exception as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
def _get_calls_for_msg_parts(self, msg_wrapper):
"""
@@ -334,7 +392,7 @@ class SoledadStore(ContentDedup):
call = None
if msg_wrapper.new:
- call = self._soledad.create_doc
+ call = self._CREATE_DOC_FUN
# item is expected to be a MessagePartDoc
for item in msg_wrapper.walk():
@@ -353,18 +411,18 @@ class SoledadStore(ContentDedup):
# the flags doc.
elif msg_wrapper.dirty:
- call = self._soledad.put_doc
+ call = self._PUT_DOC_FUN
# item is expected to be a MessagePartDoc
for item in msg_wrapper.walk():
# XXX FIXME Give error if dirty and not doc_id !!!
doc_id = item.doc_id # defend!
if not doc_id:
+ logger.warning("Dirty item but no doc_id!")
continue
- doc = self._soledad.get_doc(doc_id)
- doc.content = dict(item.content)
+
if item.part == MessagePartType.fdoc:
- logger.debug("PUT dirty fdoc")
- yield doc, call
+ #logger.debug("PUT dirty fdoc")
+ yield item, call
# XXX also for linkage-doc !!!
else:
@@ -379,17 +437,16 @@ class SoledadStore(ContentDedup):
:return: a tuple with recent-flags doc payload and callable
:rtype: tuple
"""
- call = self._soledad.put_doc
- rdoc = self._soledad.get_doc(rflags_wrapper.doc_id)
+ call = self._CREATE_DOC_FUN
payload = rflags_wrapper.content
- logger.debug("Saving RFLAGS to Soledad...")
-
if payload:
- rdoc.content = payload
- yield rdoc, call
+ logger.debug("Saving RFLAGS to Soledad...")
+ yield payload, call
- def _get_mbox_document(self, mbox):
+ # Mbox documents and attributes
+
+ def get_mbox_document(self, mbox):
"""
Return mailbox document.
@@ -399,15 +456,80 @@ class SoledadStore(ContentDedup):
the query failed.
:rtype: SoledadDocument or None.
"""
+ with self._mbox_doc_locks[mbox]:
+ return self._get_mbox_document(mbox)
+
+ def _get_mbox_document(self, mbox):
+ """
+ Helper for returning the mailbox document.
+ """
try:
query = self._soledad.get_from_index(
fields.TYPE_MBOX_IDX,
fields.TYPE_MBOX_VAL, mbox)
if query:
return query.pop()
+ else:
+ logger.error("Could not find mbox document for %r" %
+ (self.mbox,))
except Exception as exc:
logger.exception("Unhandled error %r" % exc)
+ def get_mbox_closed(self, mbox):
+ """
+ Return the closed attribute for a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: bool
+ """
+ mbox_doc = self.get_mbox_document()
+ return mbox_doc.content.get(fields.CLOSED_KEY, False)
+
+ def set_mbox_closed(self, mbox, closed):
+ """
+ Set the closed attribute for a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param closed: the value to be set
+ :type closed: bool
+ """
+ leap_assert(isinstance(closed, bool), "closed needs to be boolean")
+ with self._mbox_doc_locks[mbox]:
+ mbox_doc = self._get_mbox_document(mbox)
+ if mbox_doc is None:
+ logger.error(
+ "Could not find mbox document for %r" % (mbox,))
+ return
+ mbox_doc.content[fields.CLOSED_KEY] = closed
+ self._soledad.put_doc(mbox_doc)
+
+ def write_last_uid(self, mbox, value):
+ """
+ Write the `last_uid` integer to the proper mailbox document
+ in Soledad.
+ This is called from the deferred triggered by
+ memorystore.increment_last_soledad_uid, which is expected to
+ run in a separate thread.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param value: the value to set
+ :type value: int
+ """
+ leap_assert_type(value, int)
+ key = fields.LAST_UID_KEY
+
+ # XXX change for a lock related to the mbox document
+ # itself.
+ with self._mbox_doc_locks[mbox]:
+ mbox_doc = self._get_mbox_document(mbox)
+ old_val = mbox_doc.content[key]
+ if value > old_val:
+ mbox_doc.content[key] = value
+ self._soledad.put_doc(mbox_doc)
+
def get_flags_doc(self, mbox, uid):
"""
Return the SoledadDocument for the given mbox and uid.
@@ -416,12 +538,16 @@ class SoledadStore(ContentDedup):
:type mbox: str or unicode
:param uid: the UID for the message
:type uid: int
+ :rtype: SoledadDocument or None
"""
result = None
try:
flag_docs = self._soledad.get_from_index(
fields.TYPE_MBOX_UID_IDX,
fields.TYPE_FLAGS_VAL, mbox, str(uid))
+ if len(flag_docs) != 1:
+ logger.warning("More than one flag doc for %r:%s" %
+ (mbox, uid))
result = first(flag_docs)
except Exception as exc:
# ugh! Something's broken down there!
@@ -430,36 +556,25 @@ class SoledadStore(ContentDedup):
finally:
return result
- def write_last_uid(self, mbox, value):
+ def get_headers_doc(self, chash):
"""
- Write the `last_uid` integer to the proper mailbox document
- in Soledad.
- This is called from the deferred triggered by
- memorystore.increment_last_soledad_uid, which is expected to
- run in a separate thread.
+ Return the document that keeps the headers for a message
+ indexed by its content-hash.
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param value: the value to set
- :type value: int
+ :param chash: the content-hash to retrieve the document from.
+ :type chash: str or unicode
+ :rtype: SoledadDocument or None
"""
- leap_assert_type(value, int)
- key = fields.LAST_UID_KEY
-
- with self._last_uid_lock:
- mbox_doc = self._get_mbox_document(mbox)
- old_val = mbox_doc.content[key]
- if value < old_val:
- logger.error("%r:%s Tried to write a UID lesser than what's "
- "stored!" % (mbox, value))
- mbox_doc.content[key] = value
- self._soledad.put_doc(mbox_doc)
+ head_docs = self._soledad.get_from_index(
+ fields.TYPE_C_HASH_IDX,
+ fields.TYPE_HEADERS_VAL, str(chash))
+ return first(head_docs)
# deleted messages
def deleted_iter(self, mbox):
"""
- Get an iterator for the SoledadDocuments for messages
+ Get an iterator for the the doc_id for SoledadDocuments for messages
with \\Deleted flag for a given mailbox.
:param mbox: the mailbox
@@ -467,11 +582,10 @@ class SoledadStore(ContentDedup):
:return: iterator through deleted message docs
:rtype: iterable
"""
- return (doc for doc in self._soledad.get_from_index(
+ return [doc.doc_id for doc in self._soledad.get_from_index(
fields.TYPE_MBOX_DEL_IDX,
- fields.TYPE_FLAGS_VAL, mbox, '1'))
+ fields.TYPE_FLAGS_VAL, mbox, '1')]
- # TODO can deferToThread this?
def remove_all_deleted(self, mbox):
"""
Remove from Soledad all messages flagged as deleted for a given
@@ -481,7 +595,14 @@ class SoledadStore(ContentDedup):
:type mbox: str or unicode
"""
deleted = []
- for doc in self.deleted_iter(mbox):
- deleted.append(doc.content[fields.UID_KEY])
- self._soledad.delete_doc(doc)
+ for doc_id in self.deleted_iter(mbox):
+ with self._remove_lock:
+ doc = self._soledad.get_doc(doc_id)
+ if doc is not None:
+ self._soledad.delete_doc(doc)
+ try:
+ deleted.append(doc.content[fields.UID_KEY])
+ except TypeError:
+ # empty content
+ pass
return deleted