summaryrefslogtreecommitdiff
path: root/src/leap/mail/adaptors
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/adaptors')
-rw-r--r--src/leap/mail/adaptors/soledad.py183
-rw-r--r--src/leap/mail/adaptors/soledad_indexes.py15
-rw-r--r--src/leap/mail/adaptors/tests/test_soledad_adaptor.py7
3 files changed, 151 insertions, 54 deletions
diff --git a/src/leap/mail/adaptors/soledad.py b/src/leap/mail/adaptors/soledad.py
index c5cfce0..d99f677 100644
--- a/src/leap/mail/adaptors/soledad.py
+++ b/src/leap/mail/adaptors/soledad.py
@@ -23,7 +23,6 @@ from functools import partial
from pycryptopp.hash import sha256
from twisted.internet import defer
-from twisted.python import util
from zope.interface import implements
from leap.common.check import leap_assert, leap_assert_type
@@ -56,6 +55,14 @@ class DuplicatedDocumentError(Exception):
pass
+def cleanup_deferred_locks():
+ """
+ Need to use this from within trial to cleanup the reactor before
+ each run.
+ """
+ SoledadDocumentWrapper._k_locks = defaultdict(defer.DeferredLock)
+
+
class SoledadDocumentWrapper(models.DocumentWrapper):
"""
A Wrapper object that can be manipulated, passed around, and serialized in
@@ -526,7 +533,7 @@ class MessageWrapper(object):
This method should only be used before the Documents for the
MessageWrapper have been created, will raise otherwise.
"""
- mbox_uuid = mbox.uuid.replace('-', '_')
+ mbox_uuid = mbox_uuid.replace('-', '_')
self.mdoc.set_mbox_uuid(mbox_uuid)
self.fdoc.set_mbox_uuid(mbox_uuid)
@@ -536,6 +543,9 @@ class MessageWrapper(object):
flags = tuple()
leap_assert_type(flags, tuple)
self.fdoc.flags = list(flags)
+ self.fdoc.deleted = "\\Deleted" in flags
+ self.fdoc.seen = "\\Seen" in flags
+ self.fdoc.recent = "\\Recent" in flags
def set_tags(self, tags):
# TODO serialize the get + update
@@ -593,26 +603,30 @@ class MailboxWrapper(SoledadDocumentWrapper):
# Soledad Adaptor
#
-# TODO make this an interface?
class SoledadIndexMixin(object):
"""
- this will need a class attribute `indexes`, that is a dictionary containing
+ This will need a class attribute `indexes`, that is a dictionary containing
the index definitions for the underlying u1db store underlying soledad.
It needs to be in the following format:
{'index-name': ['field1', 'field2']}
+
+ You can also add a class attribute `wait_for_indexes` to any class
+ inheriting from this Mixin, that should be a list of strings representing
+ the methods that need to wait until the indexes have been initialized
+ before being able to work properly.
"""
+ # TODO move this mixin to soledad itself
+ # so that each application can pass a set of indexes for their data model.
+
# TODO could have a wrapper class for indexes, supporting introspection
# and __getattr__
- indexes = {}
- store_ready = False
- _index_creation_deferreds = []
+ # TODO make this an interface?
- # TODO we might want to move this logic to soledad itself
- # so that each application can pass a set of indexes for their data model.
- # TODO check also the decorator used in keymanager for waiting for indexes
- # to be ready.
+ indexes = {}
+ wait_for_indexes = []
+ store_ready = False
def initialize_store(self, store):
"""
@@ -626,47 +640,81 @@ class SoledadIndexMixin(object):
# TODO I think we *should* get another deferredLock in here, but
# global to the soledad namespace, to protect from several points
# initializing soledad indexes at the same time.
+ self._wait_for_indexes()
- leap_assert(store, "Need a store")
- leap_assert_type(self.indexes, dict)
+ d = self._init_indexes(store)
+ d.addCallback(self._restore_waiting_methods)
+ return d
- self._index_creation_deferreds = []
+ def _init_indexes(self, store):
+ """
+ Initialize the database indexes.
+ """
+ leap_assert(store, "Cannot init indexes with null soledad")
+ leap_assert_type(self.indexes, dict)
def _create_index(name, expression):
return store.create_index(name, *expression)
- def _create_indexes(db_indexes):
- db_indexes = dict(db_indexes)
-
+ def init_idexes(indexes):
+ deferreds = []
+ db_indexes = dict(indexes)
+ # Loop through the indexes we expect to find.
for name, expression in self.indexes.items():
if name not in db_indexes:
# The index does not yet exist.
d = _create_index(name, expression)
- self._index_creation_deferreds.append(d)
- continue
-
- if expression == db_indexes[name]:
- # The index exists and is up to date.
- continue
- # The index exists but the definition is not what expected, so
- # we delete it and add the proper index expression.
- d1 = store.delete_index(name)
- d1.addCallback(lambda _: _create_index(name, expression))
- self._index_creation_deferreds.append(d1)
-
- all_created = defer.gatherResults(
- self._index_creation_deferreds, consumeErrors=True)
- all_created.addCallback(_on_indexes_created)
- return all_created
-
- def _on_indexes_created(ignored):
+ deferreds.append(d)
+ elif expression != db_indexes[name]:
+ # The index exists but the definition is not what expected,
+ # so we delete it and add the proper index expression.
+ d = store.delete_index(name)
+ d.addCallback(
+ lambda _: _create_index(name, *expression))
+ deferreds.append(d)
+ return defer.gatherResults(deferreds, consumeErrors=True)
+
+ def store_ready(whatever):
self.store_ready = True
+ return whatever
- # Ask the database for currently existing indexes, and create them
- # if not found.
- d = store.list_indexes()
- d.addCallback(_create_indexes)
- return d
+ self.deferred_indexes = store.list_indexes()
+ self.deferred_indexes.addCallback(init_idexes)
+ self.deferred_indexes.addCallback(store_ready)
+ return self.deferred_indexes
+
+ def _wait_for_indexes(self):
+ """
+ Make the marked methods to wait for the indexes to be ready.
+ Heavily based on
+ http://blogs.fluidinfo.com/terry/2009/05/11/a-mixin-class-allowing-python-__init__-methods-to-work-with-twisted-deferreds/
+
+ :param methods: methods that need to wait for the indexes to be ready
+ :type methods: tuple(str)
+ """
+ leap_assert_type(self.wait_for_indexes, list)
+ methods = self.wait_for_indexes
+
+ self.waiting = []
+ self.stored = {}
+
+ def makeWrapper(method):
+ def wrapper(*args, **kw):
+ d = defer.Deferred()
+ d.addCallback(lambda _: self.stored[method](*args, **kw))
+ self.waiting.append(d)
+ return d
+ return wrapper
+
+ for method in methods:
+ self.stored[method] = getattr(self, method)
+ setattr(self, method, makeWrapper(method))
+
+ def _restore_waiting_methods(self, _):
+ for method in self.stored:
+ setattr(self, method, self.stored[method])
+ for d in self.waiting:
+ d.callback(None)
class SoledadMailAdaptor(SoledadIndexMixin):
@@ -675,8 +723,18 @@ class SoledadMailAdaptor(SoledadIndexMixin):
store = None
indexes = indexes.MAIL_INDEXES
+ wait_for_indexes = ['get_or_create_mbox', 'update_mbox', 'get_all_mboxes']
+
mboxwrapper_klass = MailboxWrapper
+ def __init__(self):
+ SoledadIndexMixin.__init__(self)
+
+ mboxwrapper_klass = MailboxWrapper
+
+ def __init__(self):
+ SoledadIndexMixin.__init__(self)
+
# Message handling
def get_msg_from_string(self, MessageClass, raw_msg):
@@ -762,10 +820,10 @@ class SoledadMailAdaptor(SoledadIndexMixin):
chash = re.findall(constants.METAMSGID_CHASH_RE, mdoc_id)[0]
def _get_fdoc_id_from_mdoc_id():
- return constants.FDOCID.format(mbox=mbox, chash=chash)
+ return constants.FDOCID.format(mbox_uuid=mbox, chash=chash)
def _get_hdoc_id_from_mdoc_id():
- return constants.HDOCID.format(mbox=mbox, chash=chash)
+ return constants.HDOCID.format(mbox_uuid=mbox, chash=chash)
d_docs = []
fdoc_id = _get_fdoc_id_from_mdoc_id()
@@ -816,6 +874,47 @@ class SoledadMailAdaptor(SoledadIndexMixin):
wrapper = msg.get_wrapper()
return wrapper.update(store)
+ # batch deletion
+
+ def del_all_flagged_messages(self, store, mbox_uuid):
+ """
+ Delete all messages flagged as deleted.
+ """
+ def err(f):
+ print "ERROR GETTING FROM INDEX"
+ f.printTraceback()
+
+ def delete_fdoc_and_mdoc_flagged(fdocs):
+ # low level here, not using the wrappers...
+ # get meta doc ids from the flag doc ids
+ fdoc_ids = [doc.doc_id for doc in fdocs]
+ mdoc_ids = map(lambda s: "M" + s[1:], fdoc_ids)
+
+ def delete_all_docs(mdocs, fdocs):
+ mdocs = list(mdocs)
+ doc_ids = [m.doc_id for m in mdocs]
+ _d = []
+ docs = mdocs + fdocs
+ for doc in docs:
+ _d.append(store.delete_doc(doc))
+ d = defer.gatherResults(_d)
+ # return the mdocs ids only
+ d.addCallback(lambda _: doc_ids)
+ return d
+
+ d = store.get_docs(mdoc_ids)
+ d.addCallback(delete_all_docs, fdocs)
+ d.addErrback(err)
+ return d
+
+ type_ = FlagsDocWrapper.model.type_
+ uuid = mbox_uuid.replace('-', '_')
+ deleted_index = indexes.TYPE_MBOX_DEL_IDX
+
+ d = store.get_from_index(deleted_index, type_, uuid, "1")
+ d.addCallbacks(delete_fdoc_and_mdoc_flagged, err)
+ return d
+
# Mailbox handling
def get_or_create_mbox(self, store, name):
diff --git a/src/leap/mail/adaptors/soledad_indexes.py b/src/leap/mail/adaptors/soledad_indexes.py
index f3e990d..856dfb4 100644
--- a/src/leap/mail/adaptors/soledad_indexes.py
+++ b/src/leap/mail/adaptors/soledad_indexes.py
@@ -25,6 +25,7 @@ Soledad Indexes for Mail Documents.
TYPE = "type"
MBOX = "mbox"
+MBOX_UUID = "mbox_uuid"
FLAGS = "flags"
HEADERS = "head"
CONTENT = "cnt"
@@ -46,7 +47,7 @@ UID = "uid"
TYPE_IDX = 'by-type'
TYPE_MBOX_IDX = 'by-type-and-mbox'
-#TYPE_MBOX_UID_IDX = 'by-type-and-mbox-and-uid'
+TYPE_MBOX_UUID_IDX = 'by-type-and-mbox-uuid'
TYPE_SUBS_IDX = 'by-type-and-subscribed'
TYPE_MSGID_IDX = 'by-type-and-message-id'
TYPE_MBOX_SEEN_IDX = 'by-type-and-mbox-and-seen'
@@ -62,9 +63,6 @@ TYPE_P_HASH_IDX = 'by-type-and-payloadhash'
JUST_MAIL_IDX = "just-mail"
JUST_MAIL_COMPAT_IDX = "just-mail-compat"
-# Tomas created the `recent and seen index`, but the semantic is not too
-# correct since the recent flag is volatile --- XXX review and delete.
-#TYPE_MBOX_RECT_SEEN_IDX = 'by-type-and-mbox-and-recent-and-seen'
# TODO
# it would be nice to measure the cost of indexing
@@ -77,6 +75,7 @@ MAIL_INDEXES = {
# generic
TYPE_IDX: [TYPE],
TYPE_MBOX_IDX: [TYPE, MBOX],
+ TYPE_MBOX_UUID_IDX: [TYPE, MBOX_UUID],
# XXX deprecate 0.4.0
# TYPE_MBOX_UID_IDX: [TYPE, MBOX, UID],
@@ -97,11 +96,9 @@ MAIL_INDEXES = {
TYPE_P_HASH_IDX: [TYPE, PAYLOAD_HASH],
# messages
- TYPE_MBOX_SEEN_IDX: [TYPE, MBOX, 'bool(seen)'],
- TYPE_MBOX_RECT_IDX: [TYPE, MBOX, 'bool(recent)'],
- TYPE_MBOX_DEL_IDX: [TYPE, MBOX, 'bool(deleted)'],
- #TYPE_MBOX_RECT_SEEN_IDX: [TYPE, MBOX,
- #'bool(recent)', 'bool(seen)'],
+ TYPE_MBOX_SEEN_IDX: [TYPE, MBOX_UUID, 'bool(seen)'],
+ TYPE_MBOX_RECT_IDX: [TYPE, MBOX_UUID, 'bool(recent)'],
+ TYPE_MBOX_DEL_IDX: [TYPE, MBOX_UUID, 'bool(deleted)'],
# incoming queue
JUST_MAIL_IDX: [INCOMING_KEY,
diff --git a/src/leap/mail/adaptors/tests/test_soledad_adaptor.py b/src/leap/mail/adaptors/tests/test_soledad_adaptor.py
index 7bdeef5..3dc79fe 100644
--- a/src/leap/mail/adaptors/tests/test_soledad_adaptor.py
+++ b/src/leap/mail/adaptors/tests/test_soledad_adaptor.py
@@ -276,8 +276,9 @@ HERE = os.path.split(os.path.abspath(__file__))[0]
class TestMessageClass(object):
- def __init__(self, wrapper):
+ def __init__(self, wrapper, uid):
self.wrapper = wrapper
+ self.uid = uid
def get_wrapper(self):
return self.wrapper
@@ -322,9 +323,9 @@ class SoledadMailAdaptorTestCase(SoledadTestMixin):
self.assertTrue(msg.wrapper.cdocs is not None)
self.assertEquals(len(msg.wrapper.cdocs), 1)
self.assertEquals(msg.wrapper.fdoc.chash, chash)
- self.assertEquals(msg.wrapper.fdoc.size, 3834)
+ self.assertEquals(msg.wrapper.fdoc.size, 3837)
self.assertEquals(msg.wrapper.hdoc.chash, chash)
- self.assertEqual(msg.wrapper.hdoc.headers['subject'],
+ self.assertEqual(dict(msg.wrapper.hdoc.headers)['Subject'],
subject)
self.assertEqual(msg.wrapper.hdoc.subject, subject)
self.assertEqual(msg.wrapper.cdocs[1].phash, phash)