diff options
Diffstat (limited to 'src/leap/mail/adaptors/soledad.py')
-rw-r--r-- | src/leap/mail/adaptors/soledad.py | 183 |
1 files changed, 141 insertions, 42 deletions
diff --git a/src/leap/mail/adaptors/soledad.py b/src/leap/mail/adaptors/soledad.py index c5cfce0..d99f677 100644 --- a/src/leap/mail/adaptors/soledad.py +++ b/src/leap/mail/adaptors/soledad.py @@ -23,7 +23,6 @@ from functools import partial from pycryptopp.hash import sha256 from twisted.internet import defer -from twisted.python import util from zope.interface import implements from leap.common.check import leap_assert, leap_assert_type @@ -56,6 +55,14 @@ class DuplicatedDocumentError(Exception): pass +def cleanup_deferred_locks(): + """ + Need to use this from within trial to cleanup the reactor before + each run. + """ + SoledadDocumentWrapper._k_locks = defaultdict(defer.DeferredLock) + + class SoledadDocumentWrapper(models.DocumentWrapper): """ A Wrapper object that can be manipulated, passed around, and serialized in @@ -526,7 +533,7 @@ class MessageWrapper(object): This method should only be used before the Documents for the MessageWrapper have been created, will raise otherwise. """ - mbox_uuid = mbox.uuid.replace('-', '_') + mbox_uuid = mbox_uuid.replace('-', '_') self.mdoc.set_mbox_uuid(mbox_uuid) self.fdoc.set_mbox_uuid(mbox_uuid) @@ -536,6 +543,9 @@ class MessageWrapper(object): flags = tuple() leap_assert_type(flags, tuple) self.fdoc.flags = list(flags) + self.fdoc.deleted = "\\Deleted" in flags + self.fdoc.seen = "\\Seen" in flags + self.fdoc.recent = "\\Recent" in flags def set_tags(self, tags): # TODO serialize the get + update @@ -593,26 +603,30 @@ class MailboxWrapper(SoledadDocumentWrapper): # Soledad Adaptor # -# TODO make this an interface? class SoledadIndexMixin(object): """ - this will need a class attribute `indexes`, that is a dictionary containing + This will need a class attribute `indexes`, that is a dictionary containing the index definitions for the underlying u1db store underlying soledad. It needs to be in the following format: {'index-name': ['field1', 'field2']} + + You can also add a class attribute `wait_for_indexes` to any class + inheriting from this Mixin, that should be a list of strings representing + the methods that need to wait until the indexes have been initialized + before being able to work properly. """ + # TODO move this mixin to soledad itself + # so that each application can pass a set of indexes for their data model. + # TODO could have a wrapper class for indexes, supporting introspection # and __getattr__ - indexes = {} - store_ready = False - _index_creation_deferreds = [] + # TODO make this an interface? - # TODO we might want to move this logic to soledad itself - # so that each application can pass a set of indexes for their data model. - # TODO check also the decorator used in keymanager for waiting for indexes - # to be ready. + indexes = {} + wait_for_indexes = [] + store_ready = False def initialize_store(self, store): """ @@ -626,47 +640,81 @@ class SoledadIndexMixin(object): # TODO I think we *should* get another deferredLock in here, but # global to the soledad namespace, to protect from several points # initializing soledad indexes at the same time. + self._wait_for_indexes() - leap_assert(store, "Need a store") - leap_assert_type(self.indexes, dict) + d = self._init_indexes(store) + d.addCallback(self._restore_waiting_methods) + return d - self._index_creation_deferreds = [] + def _init_indexes(self, store): + """ + Initialize the database indexes. + """ + leap_assert(store, "Cannot init indexes with null soledad") + leap_assert_type(self.indexes, dict) def _create_index(name, expression): return store.create_index(name, *expression) - def _create_indexes(db_indexes): - db_indexes = dict(db_indexes) - + def init_idexes(indexes): + deferreds = [] + db_indexes = dict(indexes) + # Loop through the indexes we expect to find. for name, expression in self.indexes.items(): if name not in db_indexes: # The index does not yet exist. d = _create_index(name, expression) - self._index_creation_deferreds.append(d) - continue - - if expression == db_indexes[name]: - # The index exists and is up to date. - continue - # The index exists but the definition is not what expected, so - # we delete it and add the proper index expression. - d1 = store.delete_index(name) - d1.addCallback(lambda _: _create_index(name, expression)) - self._index_creation_deferreds.append(d1) - - all_created = defer.gatherResults( - self._index_creation_deferreds, consumeErrors=True) - all_created.addCallback(_on_indexes_created) - return all_created - - def _on_indexes_created(ignored): + deferreds.append(d) + elif expression != db_indexes[name]: + # The index exists but the definition is not what expected, + # so we delete it and add the proper index expression. + d = store.delete_index(name) + d.addCallback( + lambda _: _create_index(name, *expression)) + deferreds.append(d) + return defer.gatherResults(deferreds, consumeErrors=True) + + def store_ready(whatever): self.store_ready = True + return whatever - # Ask the database for currently existing indexes, and create them - # if not found. - d = store.list_indexes() - d.addCallback(_create_indexes) - return d + self.deferred_indexes = store.list_indexes() + self.deferred_indexes.addCallback(init_idexes) + self.deferred_indexes.addCallback(store_ready) + return self.deferred_indexes + + def _wait_for_indexes(self): + """ + Make the marked methods to wait for the indexes to be ready. + Heavily based on + http://blogs.fluidinfo.com/terry/2009/05/11/a-mixin-class-allowing-python-__init__-methods-to-work-with-twisted-deferreds/ + + :param methods: methods that need to wait for the indexes to be ready + :type methods: tuple(str) + """ + leap_assert_type(self.wait_for_indexes, list) + methods = self.wait_for_indexes + + self.waiting = [] + self.stored = {} + + def makeWrapper(method): + def wrapper(*args, **kw): + d = defer.Deferred() + d.addCallback(lambda _: self.stored[method](*args, **kw)) + self.waiting.append(d) + return d + return wrapper + + for method in methods: + self.stored[method] = getattr(self, method) + setattr(self, method, makeWrapper(method)) + + def _restore_waiting_methods(self, _): + for method in self.stored: + setattr(self, method, self.stored[method]) + for d in self.waiting: + d.callback(None) class SoledadMailAdaptor(SoledadIndexMixin): @@ -675,8 +723,18 @@ class SoledadMailAdaptor(SoledadIndexMixin): store = None indexes = indexes.MAIL_INDEXES + wait_for_indexes = ['get_or_create_mbox', 'update_mbox', 'get_all_mboxes'] + mboxwrapper_klass = MailboxWrapper + def __init__(self): + SoledadIndexMixin.__init__(self) + + mboxwrapper_klass = MailboxWrapper + + def __init__(self): + SoledadIndexMixin.__init__(self) + # Message handling def get_msg_from_string(self, MessageClass, raw_msg): @@ -762,10 +820,10 @@ class SoledadMailAdaptor(SoledadIndexMixin): chash = re.findall(constants.METAMSGID_CHASH_RE, mdoc_id)[0] def _get_fdoc_id_from_mdoc_id(): - return constants.FDOCID.format(mbox=mbox, chash=chash) + return constants.FDOCID.format(mbox_uuid=mbox, chash=chash) def _get_hdoc_id_from_mdoc_id(): - return constants.HDOCID.format(mbox=mbox, chash=chash) + return constants.HDOCID.format(mbox_uuid=mbox, chash=chash) d_docs = [] fdoc_id = _get_fdoc_id_from_mdoc_id() @@ -816,6 +874,47 @@ class SoledadMailAdaptor(SoledadIndexMixin): wrapper = msg.get_wrapper() return wrapper.update(store) + # batch deletion + + def del_all_flagged_messages(self, store, mbox_uuid): + """ + Delete all messages flagged as deleted. + """ + def err(f): + print "ERROR GETTING FROM INDEX" + f.printTraceback() + + def delete_fdoc_and_mdoc_flagged(fdocs): + # low level here, not using the wrappers... + # get meta doc ids from the flag doc ids + fdoc_ids = [doc.doc_id for doc in fdocs] + mdoc_ids = map(lambda s: "M" + s[1:], fdoc_ids) + + def delete_all_docs(mdocs, fdocs): + mdocs = list(mdocs) + doc_ids = [m.doc_id for m in mdocs] + _d = [] + docs = mdocs + fdocs + for doc in docs: + _d.append(store.delete_doc(doc)) + d = defer.gatherResults(_d) + # return the mdocs ids only + d.addCallback(lambda _: doc_ids) + return d + + d = store.get_docs(mdoc_ids) + d.addCallback(delete_all_docs, fdocs) + d.addErrback(err) + return d + + type_ = FlagsDocWrapper.model.type_ + uuid = mbox_uuid.replace('-', '_') + deleted_index = indexes.TYPE_MBOX_DEL_IDX + + d = store.get_from_index(deleted_index, type_, uuid, "1") + d.addCallbacks(delete_fdoc_and_mdoc_flagged, err) + return d + # Mailbox handling def get_or_create_mbox(self, store, name): |