diff options
Diffstat (limited to 'mail/src/leap/mail/adaptors/soledad.py')
| -rw-r--r-- | mail/src/leap/mail/adaptors/soledad.py | 183 | 
1 files changed, 141 insertions, 42 deletions
| diff --git a/mail/src/leap/mail/adaptors/soledad.py b/mail/src/leap/mail/adaptors/soledad.py index c5cfce06..d99f6777 100644 --- a/mail/src/leap/mail/adaptors/soledad.py +++ b/mail/src/leap/mail/adaptors/soledad.py @@ -23,7 +23,6 @@ from functools import partial  from pycryptopp.hash import sha256  from twisted.internet import defer -from twisted.python import util  from zope.interface import implements  from leap.common.check import leap_assert, leap_assert_type @@ -56,6 +55,14 @@ class DuplicatedDocumentError(Exception):      pass +def cleanup_deferred_locks(): +    """ +    Need to use this from within trial to cleanup the reactor before +    each run. +    """ +    SoledadDocumentWrapper._k_locks = defaultdict(defer.DeferredLock) + +  class SoledadDocumentWrapper(models.DocumentWrapper):      """      A Wrapper object that can be manipulated, passed around, and serialized in @@ -526,7 +533,7 @@ class MessageWrapper(object):          This method should only be used before the Documents for the          MessageWrapper have been created, will raise otherwise.          """ -        mbox_uuid = mbox.uuid.replace('-', '_') +        mbox_uuid = mbox_uuid.replace('-', '_')          self.mdoc.set_mbox_uuid(mbox_uuid)          self.fdoc.set_mbox_uuid(mbox_uuid) @@ -536,6 +543,9 @@ class MessageWrapper(object):              flags = tuple()          leap_assert_type(flags, tuple)          self.fdoc.flags = list(flags) +        self.fdoc.deleted = "\\Deleted" in flags +        self.fdoc.seen = "\\Seen" in flags +        self.fdoc.recent = "\\Recent" in flags      def set_tags(self, tags):          # TODO serialize the get + update @@ -593,26 +603,30 @@ class MailboxWrapper(SoledadDocumentWrapper):  # Soledad Adaptor  # -# TODO make this an interface?  class SoledadIndexMixin(object):      """ -    this will need a class attribute `indexes`, that is a dictionary containing +    This will need a class attribute `indexes`, that is a dictionary containing      the index definitions for the underlying u1db store underlying soledad.      It needs to be in the following format:      {'index-name': ['field1', 'field2']} + +    You can also add a class attribute `wait_for_indexes` to any class +    inheriting from this Mixin, that should be a list of strings representing +    the methods that need to wait until the indexes have been initialized +    before being able to work properly.      """ +    # TODO move this mixin to soledad itself +    # so that each application can pass a set of indexes for their data model. +      # TODO could have a wrapper class for indexes, supporting introspection      # and __getattr__ -    indexes = {} -    store_ready = False -    _index_creation_deferreds = [] +    # TODO make this an interface? -    # TODO we might want to move this logic to soledad itself -    # so that each application can pass a set of indexes for their data model. -    # TODO check also the decorator used in keymanager for waiting for indexes -    # to be ready. +    indexes = {} +    wait_for_indexes = [] +    store_ready = False      def initialize_store(self, store):          """ @@ -626,47 +640,81 @@ class SoledadIndexMixin(object):          # TODO I think we *should* get another deferredLock in here, but          # global to the soledad namespace, to protect from several points          # initializing soledad indexes at the same time. +        self._wait_for_indexes() -        leap_assert(store, "Need a store") -        leap_assert_type(self.indexes, dict) +        d = self._init_indexes(store) +        d.addCallback(self._restore_waiting_methods) +        return d -        self._index_creation_deferreds = [] +    def _init_indexes(self, store): +        """ +        Initialize the database indexes. +        """ +        leap_assert(store, "Cannot init indexes with null soledad") +        leap_assert_type(self.indexes, dict)          def _create_index(name, expression):              return store.create_index(name, *expression) -        def _create_indexes(db_indexes): -            db_indexes = dict(db_indexes) - +        def init_idexes(indexes): +            deferreds = [] +            db_indexes = dict(indexes) +            # Loop through the indexes we expect to find.              for name, expression in self.indexes.items():                  if name not in db_indexes:                      # The index does not yet exist.                      d = _create_index(name, expression) -                    self._index_creation_deferreds.append(d) -                    continue - -                if expression == db_indexes[name]: -                    # The index exists and is up to date. -                    continue -                # The index exists but the definition is not what expected, so -                # we delete it and add the proper index expression. -                d1 = store.delete_index(name) -                d1.addCallback(lambda _: _create_index(name, expression)) -                self._index_creation_deferreds.append(d1) - -            all_created = defer.gatherResults( -                self._index_creation_deferreds, consumeErrors=True) -            all_created.addCallback(_on_indexes_created) -            return all_created - -        def _on_indexes_created(ignored): +                    deferreds.append(d) +                elif expression != db_indexes[name]: +                    # The index exists but the definition is not what expected, +                    # so we delete it and add the proper index expression. +                    d = store.delete_index(name) +                    d.addCallback( +                        lambda _: _create_index(name, *expression)) +                    deferreds.append(d) +            return defer.gatherResults(deferreds, consumeErrors=True) + +        def store_ready(whatever):              self.store_ready = True +            return whatever -        # Ask the database for currently existing indexes, and create them -        # if not found. -        d = store.list_indexes() -        d.addCallback(_create_indexes) -        return d +        self.deferred_indexes = store.list_indexes() +        self.deferred_indexes.addCallback(init_idexes) +        self.deferred_indexes.addCallback(store_ready) +        return self.deferred_indexes + +    def _wait_for_indexes(self): +        """ +        Make the marked methods to wait for the indexes to be ready. +        Heavily based on +        http://blogs.fluidinfo.com/terry/2009/05/11/a-mixin-class-allowing-python-__init__-methods-to-work-with-twisted-deferreds/ + +        :param methods: methods that need to wait for the indexes to be ready +        :type methods: tuple(str) +        """ +        leap_assert_type(self.wait_for_indexes, list) +        methods = self.wait_for_indexes + +        self.waiting = [] +        self.stored = {} + +        def makeWrapper(method): +            def wrapper(*args, **kw): +                d = defer.Deferred() +                d.addCallback(lambda _: self.stored[method](*args, **kw)) +                self.waiting.append(d) +                return d +            return wrapper + +        for method in methods: +            self.stored[method] = getattr(self, method) +            setattr(self, method, makeWrapper(method)) + +    def _restore_waiting_methods(self, _): +        for method in self.stored: +            setattr(self, method, self.stored[method]) +        for d in self.waiting: +            d.callback(None)  class SoledadMailAdaptor(SoledadIndexMixin): @@ -675,8 +723,18 @@ class SoledadMailAdaptor(SoledadIndexMixin):      store = None      indexes = indexes.MAIL_INDEXES +    wait_for_indexes = ['get_or_create_mbox', 'update_mbox', 'get_all_mboxes'] +      mboxwrapper_klass = MailboxWrapper +    def __init__(self): +        SoledadIndexMixin.__init__(self) + +    mboxwrapper_klass = MailboxWrapper + +    def __init__(self): +        SoledadIndexMixin.__init__(self) +      # Message handling      def get_msg_from_string(self, MessageClass, raw_msg): @@ -762,10 +820,10 @@ class SoledadMailAdaptor(SoledadIndexMixin):              chash = re.findall(constants.METAMSGID_CHASH_RE, mdoc_id)[0]              def _get_fdoc_id_from_mdoc_id(): -                return constants.FDOCID.format(mbox=mbox, chash=chash) +                return constants.FDOCID.format(mbox_uuid=mbox, chash=chash)              def _get_hdoc_id_from_mdoc_id(): -                return constants.HDOCID.format(mbox=mbox, chash=chash) +                return constants.HDOCID.format(mbox_uuid=mbox, chash=chash)              d_docs = []              fdoc_id = _get_fdoc_id_from_mdoc_id() @@ -816,6 +874,47 @@ class SoledadMailAdaptor(SoledadIndexMixin):          wrapper = msg.get_wrapper()          return wrapper.update(store) +    # batch deletion + +    def del_all_flagged_messages(self, store, mbox_uuid): +        """ +        Delete all messages flagged as deleted. +        """ +        def err(f): +            print "ERROR GETTING FROM INDEX" +            f.printTraceback() + +        def delete_fdoc_and_mdoc_flagged(fdocs): +            # low level here, not using the wrappers... +            # get meta doc ids from the flag doc ids +            fdoc_ids = [doc.doc_id for doc in fdocs] +            mdoc_ids = map(lambda s: "M" + s[1:], fdoc_ids) + +            def delete_all_docs(mdocs, fdocs): +                mdocs = list(mdocs) +                doc_ids = [m.doc_id for m in mdocs] +                _d = [] +                docs = mdocs + fdocs +                for doc in docs: +                    _d.append(store.delete_doc(doc)) +                d = defer.gatherResults(_d) +                # return the mdocs ids only +                d.addCallback(lambda _: doc_ids) +                return d + +            d = store.get_docs(mdoc_ids) +            d.addCallback(delete_all_docs, fdocs) +            d.addErrback(err) +            return d + +        type_ = FlagsDocWrapper.model.type_ +        uuid = mbox_uuid.replace('-', '_') +        deleted_index = indexes.TYPE_MBOX_DEL_IDX + +        d = store.get_from_index(deleted_index, type_, uuid, "1") +        d.addCallbacks(delete_fdoc_and_mdoc_flagged, err) +        return d +      # Mailbox handling      def get_or_create_mbox(self, store, name): | 
