summaryrefslogtreecommitdiff
path: root/src/leap/bitmask/mail/adaptors/soledad.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/bitmask/mail/adaptors/soledad.py')
-rw-r--r--src/leap/bitmask/mail/adaptors/soledad.py1268
1 files changed, 1268 insertions, 0 deletions
diff --git a/src/leap/bitmask/mail/adaptors/soledad.py b/src/leap/bitmask/mail/adaptors/soledad.py
new file mode 100644
index 0000000..ca8f741
--- /dev/null
+++ b/src/leap/bitmask/mail/adaptors/soledad.py
@@ -0,0 +1,1268 @@
+# soledad.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Soledadad MailAdaptor module.
+"""
+import logging
+import re
+
+from collections import defaultdict
+from email import message_from_string
+
+from twisted.internet import defer
+from twisted.python import log
+from zope.interface import implements
+from leap.soledad.common import l2db
+
+from leap.common.check import leap_assert, leap_assert_type
+
+from leap.mail import constants
+from leap.mail import walk
+from leap.mail.adaptors import soledad_indexes as indexes
+from leap.mail.constants import INBOX_NAME
+from leap.mail.adaptors import models
+from leap.mail.imap.mailbox import normalize_mailbox
+from leap.mail.utils import lowerdict, first
+from leap.mail.utils import stringify_parts_map
+from leap.mail.interfaces import IMailAdaptor, IMessageWrapper
+
+from leap.soledad.common.document import SoledadDocument
+
+
+logger = logging.getLogger(__name__)
+
+# TODO
+# [ ] Convenience function to create mail specifying subject, date, etc?
+
+
+_MSGID_PATTERN = r"""<([\w@.]+)>"""
+_MSGID_RE = re.compile(_MSGID_PATTERN)
+
+
+class DuplicatedDocumentError(Exception):
+ """
+ Raised when a duplicated document is detected.
+ """
+ pass
+
+
+def cleanup_deferred_locks():
+ """
+ Need to use this from within trial to cleanup the reactor before
+ each run.
+ """
+ SoledadDocumentWrapper._k_locks = defaultdict(defer.DeferredLock)
+
+
+class SoledadDocumentWrapper(models.DocumentWrapper):
+ """
+ A Wrapper object that can be manipulated, passed around, and serialized in
+ a format that the Soledad Store understands.
+
+ It ensures atomicity of the document operations on creation, update and
+ deletion.
+ """
+ # TODO we could also use a _dirty flag (in models)
+ # TODO add a get_count() method ??? -- that is extended over l2db.
+
+ # We keep a dictionary with DeferredLocks, that will be
+ # unique to every subclass of SoledadDocumentWrapper.
+ _k_locks = defaultdict(defer.DeferredLock)
+
+ @classmethod
+ def _get_klass_lock(cls):
+ """
+ Get a DeferredLock that is unique for this subclass name.
+ Used to lock the access to indexes in the `get_or_create` call
+ for a particular DocumentWrapper.
+ """
+ return cls._k_locks[cls.__name__]
+
+ def __init__(self, doc_id=None, future_doc_id=None, **kwargs):
+ self._doc_id = doc_id
+ self._future_doc_id = future_doc_id
+ self._lock = defer.DeferredLock()
+ super(SoledadDocumentWrapper, self).__init__(**kwargs)
+
+ @property
+ def doc_id(self):
+ return self._doc_id
+
+ @property
+ def future_doc_id(self):
+ return self._future_doc_id
+
+ def set_future_doc_id(self, doc_id):
+ self._future_doc_id = doc_id
+
+ def create(self, store, is_copy=False):
+ """
+ Create the documents for this wrapper.
+ Since this method will not check for duplication, the
+ responsibility of avoiding duplicates is left to the caller.
+
+ You might be interested in using `get_or_create` classmethod
+ instead (that's the preferred way of creating documents from
+ the wrapper object).
+
+ :return: a deferred that will fire when the underlying
+ Soledad document has been created.
+ :rtype: Deferred
+ """
+ leap_assert(self._doc_id is None,
+ "This document already has a doc_id!")
+
+ def update_doc_id(doc):
+ self._doc_id = doc.doc_id
+ self.set_future_doc_id(None)
+ return doc
+
+ def update_wrapper(failure):
+ # In the case of some copies (for instance, from one folder to
+ # another and back to the original folder), the document that we
+ # want to insert already exists. In this case, putting it
+ # and overwriting the document with that doc_id is the right thing
+ # to do.
+ failure.trap(l2db.errors.RevisionConflict)
+ self._doc_id = self.future_doc_id
+ self._future_doc_id = None
+ return self.update(store)
+
+ if self.future_doc_id is None:
+ d = store.create_doc(self.serialize())
+ else:
+ d = store.create_doc(self.serialize(),
+ doc_id=self.future_doc_id)
+ d.addCallback(update_doc_id)
+
+ if is_copy:
+ d.addErrback(update_wrapper)
+ else:
+ d.addErrback(self._catch_revision_conflict, self.future_doc_id)
+ return d
+
+ def update(self, store):
+ """
+ Update the documents for this wrapper.
+
+ :return: a deferred that will fire when the underlying
+ Soledad document has been updated.
+ :rtype: Deferred
+ """
+ # the deferred lock guards against revision conflicts
+ return self._lock.run(self._update, store)
+
+ def _update(self, store):
+ leap_assert(self._doc_id is not None,
+ "Need to create doc before updating")
+
+ def update_and_put_doc(doc):
+ doc.content.update(self.serialize())
+ d = store.put_doc(doc)
+ d.addErrback(self._catch_revision_conflict, doc.doc_id)
+ return d
+
+ d = store.get_doc(self._doc_id)
+ d.addCallback(update_and_put_doc)
+ return d
+
+ def _catch_revision_conflict(self, failure, doc_id):
+ # XXX We can have some RevisionConflicts if we try
+ # to put the docs that are already there.
+ # This can happen right now when creating/saving the cdocs
+ # during a copy. Instead of catching and ignoring this
+ # error, we should mark them in the copy so there is no attempt to
+ # create/update them.
+ failure.trap(l2db.errors.RevisionConflict)
+ logger.debug("Got conflict while putting %s" % doc_id)
+
+ def delete(self, store):
+ """
+ Delete the documents for this wrapper.
+
+ :return: a deferred that will fire when the underlying
+ Soledad document has been deleted.
+ :rtype: Deferred
+ """
+ # the deferred lock guards against conflicts while updating
+ return self._lock.run(self._delete, store)
+
+ def _delete(self, store):
+ leap_assert(self._doc_id is not None,
+ "Need to create doc before deleting")
+ # XXX might want to flag this DocumentWrapper to avoid
+ # updating it by mistake. This could go in models.DocumentWrapper
+
+ def delete_doc(doc):
+ return store.delete_doc(doc)
+
+ d = store.get_doc(self._doc_id)
+ d.addCallback(delete_doc)
+ return d
+
+ @classmethod
+ def get_or_create(cls, store, index, value):
+ """
+ Get a unique DocumentWrapper by index, or create a new one if the
+ matching query does not exist.
+
+ :param index: the primary index for the model.
+ :type index: str
+ :param value: the value to query the primary index.
+ :type value: str
+
+ :return: a deferred that will be fired with the SoledadDocumentWrapper
+ matching the index query, either existing or just created.
+ :rtype: Deferred
+ """
+ return cls._get_klass_lock().run(
+ cls._get_or_create, store, index, value)
+
+ @classmethod
+ def _get_or_create(cls, store, index, value):
+ # TODO shorten this method.
+ assert store is not None
+ assert index is not None
+ assert value is not None
+
+ def get_main_index():
+ try:
+ return cls.model.__meta__.index
+ except AttributeError:
+ raise RuntimeError("The model is badly defined")
+
+ # TODO separate into another method?
+ def try_to_get_doc_from_index(indexes):
+ values = []
+ idx_def = dict(indexes)[index]
+ if len(idx_def) == 1:
+ values = [value]
+ else:
+ main_index = get_main_index()
+ fields = cls.model.serialize()
+ for field in idx_def:
+ if field == main_index:
+ values.append(value)
+ else:
+ values.append(fields[field])
+ d = store.get_from_index(index, *values)
+ return d
+
+ def get_first_doc_if_any(docs):
+ if not docs:
+ return None
+ if len(docs) > 1:
+ raise DuplicatedDocumentError
+ return docs[0]
+
+ def wrap_existing_or_create_new(doc):
+ if doc:
+ return cls(doc_id=doc.doc_id, **doc.content)
+ else:
+ return create_and_wrap_new_doc()
+
+ def create_and_wrap_new_doc():
+ # XXX use closure to store indexes instead of
+ # querying for them again.
+ d = store.list_indexes()
+ d.addCallback(get_wrapper_instance_from_index)
+ d.addCallback(return_wrapper_when_created)
+ return d
+
+ def get_wrapper_instance_from_index(indexes):
+ init_values = {}
+ idx_def = dict(indexes)[index]
+ if len(idx_def) == 1:
+ init_value = {idx_def[0]: value}
+ return cls(**init_value)
+ main_index = get_main_index()
+ fields = cls.model.serialize()
+ for field in idx_def:
+ if field == main_index:
+ init_values[field] = value
+ else:
+ init_values[field] = fields[field]
+ return cls(**init_values)
+
+ def return_wrapper_when_created(wrapper):
+ d = wrapper.create(store)
+ d.addCallback(lambda doc: wrapper)
+ return d
+
+ d = store.list_indexes()
+ d.addCallback(try_to_get_doc_from_index)
+ d.addCallback(get_first_doc_if_any)
+ d.addCallback(wrap_existing_or_create_new)
+ return d
+
+ @classmethod
+ def get_all(cls, store):
+ """
+ Get a collection of wrappers around all the documents belonging
+ to this kind.
+
+ For this to work, the model.__meta__ needs to include a tuple with
+ the index to be used for listing purposes, and which is the field to be
+ used to query the index.
+
+ Note that this method only supports indexes of a single field at the
+ moment. It also might be too expensive to return all the documents
+ matching the query, so handle with care.
+
+ class __meta__(object):
+ index = "name"
+ list_index = ("by-type", "type_")
+
+ :return: a deferred that will be fired with an iterable containing
+ as many SoledadDocumentWrapper are matching the index defined
+ in the model as the `list_index`.
+ :rtype: Deferred
+ """
+ # TODO LIST (get_all)
+ # [ ] extend support to indexes with n-ples
+ # [ ] benchmark the cost of querying and returning indexes in a big
+ # database. This might badly need pagination before being put to
+ # serious use.
+ return cls._get_klass_lock().run(cls._get_all, store)
+
+ @classmethod
+ def _get_all(cls, store):
+ try:
+ list_index, list_attr = cls.model.__meta__.list_index
+ except AttributeError:
+ raise RuntimeError("The model is badly defined: no list_index")
+ try:
+ index_value = getattr(cls.model, list_attr)
+ except AttributeError:
+ raise RuntimeError("The model is badly defined: "
+ "no attribute matching list_index")
+
+ def wrap_docs(docs):
+ return (cls(doc_id=doc.doc_id, **doc.content) for doc in docs)
+
+ d = store.get_from_index(list_index, index_value)
+ d.addCallback(wrap_docs)
+ return d
+
+ def __repr__(self):
+ try:
+ idx = getattr(self, self.model.__meta__.index)
+ except AttributeError:
+ idx = ""
+ return "<%s: %s (%s)>" % (self.__class__.__name__,
+ idx, self._doc_id)
+
+
+#
+# Message documents
+#
+
+class FlagsDocWrapper(SoledadDocumentWrapper):
+
+ class model(models.SerializableModel):
+ type_ = "flags"
+ chash = ""
+
+ mbox_uuid = ""
+ seen = False
+ deleted = False
+ recent = False
+ flags = []
+ tags = []
+ size = 0
+ multi = False
+
+ class __meta__(object):
+ index = "mbox"
+
+ def set_mbox_uuid(self, mbox_uuid):
+ # XXX raise error if already created, should use copy instead
+ mbox_uuid = mbox_uuid.replace('-', '_')
+ new_id = constants.FDOCID.format(mbox_uuid=mbox_uuid, chash=self.chash)
+ self._future_doc_id = new_id
+ self.mbox_uuid = mbox_uuid
+
+ def get_flags(self):
+ """
+ Get the flags for this message (as a tuple of strings, not unicode).
+ """
+ return map(str, self.flags)
+
+
+class HeaderDocWrapper(SoledadDocumentWrapper):
+
+ class model(models.SerializableModel):
+ type_ = "head"
+ chash = ""
+
+ date = ""
+ subject = ""
+ headers = {}
+ part_map = {}
+ body = "" # link to phash of body
+ msgid = ""
+ multi = False
+
+ class __meta__(object):
+ index = "chash"
+
+
+class ContentDocWrapper(SoledadDocumentWrapper):
+
+ class model(models.SerializableModel):
+ type_ = "cnt"
+ phash = ""
+
+ ctype = "" # XXX index by ctype too?
+ lkf = [] # XXX not implemented yet!
+ raw = ""
+
+ content_disposition = ""
+ content_transfer_encoding = ""
+ content_type = ""
+
+ class __meta__(object):
+ index = "phash"
+
+
+class MetaMsgDocWrapper(SoledadDocumentWrapper):
+
+ class model(models.SerializableModel):
+ type_ = "meta"
+ fdoc = ""
+ hdoc = ""
+ cdocs = []
+
+ def set_mbox_uuid(self, mbox_uuid):
+ # XXX raise error if already created, should use copy instead
+ mbox_uuid = mbox_uuid.replace('-', '_')
+ chash = re.findall(constants.FDOCID_CHASH_RE, self.fdoc)[0]
+ new_id = constants.METAMSGID.format(mbox_uuid=mbox_uuid, chash=chash)
+ new_fdoc_id = constants.FDOCID.format(mbox_uuid=mbox_uuid, chash=chash)
+ self._future_doc_id = new_id
+ self.fdoc = new_fdoc_id
+
+
+class MessageWrapper(object):
+
+ # This could benefit of a DeferredLock to create/update all the
+ # documents at the same time maybe, and defend against concurrent updates?
+
+ implements(IMessageWrapper)
+
+ def __init__(self, mdoc, fdoc, hdoc, cdocs=None, is_copy=False):
+ """
+ Need at least a metamsg-document, a flag-document and a header-document
+ to instantiate a MessageWrapper. Content-documents can be retrieved
+ lazily.
+
+ cdocs, if any, should be a dictionary in which the keys are ascending
+ integers, beginning at one, and the values are dictionaries with the
+ content of the content-docs.
+
+ is_copy, if set to True, will only attempt to create mdoc and fdoc
+ (because hdoc and cdocs are supposed to exist already)
+ """
+ self._is_copy = is_copy
+
+ def get_doc_wrapper(doc, cls):
+ if isinstance(doc, SoledadDocument):
+ doc_id = doc.doc_id
+ doc = doc.content
+ else:
+ doc_id = None
+ if not doc:
+ doc = {}
+ return cls(doc_id=doc_id, **doc)
+
+ self.mdoc = get_doc_wrapper(mdoc, MetaMsgDocWrapper)
+
+ self.fdoc = get_doc_wrapper(fdoc, FlagsDocWrapper)
+ self.fdoc.set_future_doc_id(self.mdoc.fdoc)
+
+ self.hdoc = get_doc_wrapper(hdoc, HeaderDocWrapper)
+ self.hdoc.set_future_doc_id(self.mdoc.hdoc)
+
+ if cdocs is None:
+ cdocs = {}
+ cdocs_keys = cdocs.keys()
+ assert sorted(cdocs_keys) == range(1, len(cdocs_keys) + 1)
+ self.cdocs = dict([
+ (key, get_doc_wrapper(doc, ContentDocWrapper))
+ for (key, doc) in cdocs.items()])
+ for doc_id, cdoc in zip(self.mdoc.cdocs, self.cdocs.values()):
+ if cdoc.raw == "":
+ log.msg("Empty raw field in cdoc %s" % doc_id)
+ cdoc.set_future_doc_id(doc_id)
+
+ def create(self, store, notify_just_mdoc=False, pending_inserts_dict=None):
+ """
+ Create all the parts for this message in the store.
+
+ :param store: an instance of Soledad
+
+ :param notify_just_mdoc:
+ if set to True, this method will return *only* the deferred
+ corresponding to the creation of the meta-message document.
+ Be warned that in that case there will be no record of failures
+ when creating the other part-documents.
+
+ Otherwise, this method will return a deferred that will wait for
+ the creation of all the part documents.
+
+ Setting this flag to True is mostly a convenient workaround for the
+ fact that massive serial appends will take too much time, and in
+ most of the cases the MUA will only switch to the mailbox where the
+ appends have happened after a certain time, which in most of the
+ times will be enough to have all the queued insert operations
+ finished.
+ :type notify_just_mdoc: bool
+ :param pending_inserts_dict:
+ a dictionary with the pending inserts ids.
+ :type pending_inserts_dict: dict
+
+ :return: a deferred whose callback will be called when either all the
+ part documents have been written, or just the metamsg-doc,
+ depending on the value of the notify_just_mdoc flag
+ :rtype: defer.Deferred
+ """
+ if pending_inserts_dict is None:
+ pending_inserts_dict = {}
+
+ leap_assert(self.cdocs,
+ "Need non empty cdocs to create the "
+ "MessageWrapper documents")
+ leap_assert(self.mdoc.doc_id is None,
+ "Cannot create: mdoc has a doc_id")
+ leap_assert(self.fdoc.doc_id is None,
+ "Cannot create: fdoc has a doc_id")
+
+ def unblock_pending_insert(result):
+ if pending_inserts_dict:
+ ci_headers = lowerdict(self.hdoc.headers)
+ msgid = ci_headers.get('message-id', None)
+ try:
+ d = pending_inserts_dict[msgid]
+ d.callback(msgid)
+ except KeyError:
+ pass
+ return result
+
+ # TODO check that the doc_ids in the mdoc are coherent
+ self.d = []
+
+ mdoc_created = self.mdoc.create(store, is_copy=self._is_copy)
+ fdoc_created = self.fdoc.create(store, is_copy=self._is_copy)
+
+ mdoc_created.addErrback(lambda f: log.err(f))
+ fdoc_created.addErrback(lambda f: log.err(f))
+
+ self.d.append(mdoc_created)
+ self.d.append(fdoc_created)
+
+ if not self._is_copy:
+ if self.hdoc.doc_id is None:
+ self.d.append(self.hdoc.create(store))
+ for cdoc in self.cdocs.values():
+ if cdoc.doc_id is not None:
+ # we could be just linking to an existing
+ # content-doc.
+ continue
+ self.d.append(cdoc.create(store))
+
+ def log_all_inserted(result):
+ log.msg("All parts inserted for msg!")
+ return result
+
+ self.all_inserted_d = defer.gatherResults(self.d, consumeErrors=True)
+ self.all_inserted_d.addCallback(log_all_inserted)
+ self.all_inserted_d.addCallback(unblock_pending_insert)
+ self.all_inserted_d.addErrback(lambda failure: log.err(failure))
+
+ if notify_just_mdoc:
+ return mdoc_created
+ else:
+ return self.all_inserted_d
+
+ def update(self, store):
+ """
+ Update the only mutable parts, which are within the flags document.
+ """
+ return self.fdoc.update(store)
+
+ def delete(self, store):
+ # TODO
+ # Eventually this would have to do the duplicate search or send for the
+ # garbage collector. At least mdoc and t the mdoc and fdoc can be
+ # unlinked.
+ d = []
+ if self.mdoc.doc_id:
+ d.append(self.mdoc.delete(store))
+ d.append(self.fdoc.delete(store))
+ return defer.gatherResults(d)
+
+ def copy(self, store, new_mbox_uuid):
+ """
+ Return a copy of this MessageWrapper in a new mailbox.
+
+ :param store: an instance of Soledad, or anything that behaves alike.
+ :param new_mbox_uuid: the uuid of the mailbox where we are copying this
+ message to.
+ :type new_mbox_uuid: str
+ :rtype: MessageWrapper
+ """
+ new_mdoc = self.mdoc.serialize()
+ new_fdoc = self.fdoc.serialize()
+
+ # the future doc_ids is properly set because we modified
+ # the pointers in mdoc, which has precedence.
+ new_wrapper = MessageWrapper(new_mdoc, new_fdoc, None, None,
+ is_copy=True)
+ new_wrapper.hdoc = self.hdoc
+ new_wrapper.cdocs = self.cdocs
+ new_wrapper.set_mbox_uuid(new_mbox_uuid)
+
+ # XXX could flag so that it only creates mdoc/fdoc...
+
+ d = new_wrapper.create(store)
+ d.addCallback(lambda result: new_wrapper)
+ d.addErrback(lambda failure: log.err(failure))
+ return d
+
+ def set_mbox_uuid(self, mbox_uuid):
+ """
+ Set the mailbox for this wrapper.
+ This method should only be used before the Documents for the
+ MessageWrapper have been created, will raise otherwise.
+ """
+ mbox_uuid = mbox_uuid.replace('-', '_')
+ self.mdoc.set_mbox_uuid(mbox_uuid)
+ self.fdoc.set_mbox_uuid(mbox_uuid)
+
+ def set_flags(self, flags):
+ # TODO serialize the get + update
+ if flags is None:
+ flags = tuple()
+ leap_assert_type(flags, tuple)
+ self.fdoc.flags = list(flags)
+ self.fdoc.deleted = "\\Deleted" in flags
+ self.fdoc.seen = "\\Seen" in flags
+ self.fdoc.recent = "\\Recent" in flags
+
+ def set_tags(self, tags):
+ # TODO serialize the get + update
+ if tags is None:
+ tags = tuple()
+ leap_assert_type(tags, tuple)
+ self.fdoc.tags = list(tags)
+
+ def set_date(self, date):
+ # XXX assert valid date format
+ self.hdoc.date = date
+
+ def get_subpart_dict(self, index):
+ """
+ :param index: the part to lookup, 1-indexed
+ :type index: int
+ :rtype: dict
+ """
+ return self.hdoc.part_map[str(index)]
+
+ def get_subpart_indexes(self):
+ return self.hdoc.part_map.keys()
+
+ def get_body(self, store):
+ """
+ :rtype: deferred
+ """
+ body_phash = self.hdoc.body
+ if body_phash:
+ d = store.get_doc('C-' + body_phash)
+ d.addCallback(lambda doc: ContentDocWrapper(**doc.content))
+ return d
+ elif self.cdocs:
+ return self.cdocs[1]
+ else:
+ return ''
+
+#
+# Mailboxes
+#
+
+
+class MailboxWrapper(SoledadDocumentWrapper):
+
+ class model(models.SerializableModel):
+ type_ = "mbox"
+ mbox = INBOX_NAME
+ uuid = None
+ flags = []
+ recent = []
+ created = 1
+ closed = False
+ subscribed = False
+
+ class __meta__(object):
+ index = "mbox"
+ list_index = (indexes.TYPE_IDX, 'type_')
+
+
+#
+# Soledad Adaptor
+#
+
+class SoledadIndexMixin(object):
+ """
+ This will need a class attribute `indexes`, that is a dictionary containing
+ the index definitions for the underlying l2db store underlying soledad.
+
+ It needs to be in the following format:
+ {'index-name': ['field1', 'field2']}
+
+ You can also add a class attribute `wait_for_indexes` to any class
+ inheriting from this Mixin, that should be a list of strings representing
+ the methods that need to wait until the indexes have been initialized
+ before being able to work properly.
+ """
+ # TODO move this mixin to soledad itself
+ # so that each application can pass a set of indexes for their data model.
+
+ # TODO could have a wrapper class for indexes, supporting introspection
+ # and __getattr__
+
+ # TODO make this an interface?
+
+ indexes = {}
+ wait_for_indexes = []
+ store_ready = False
+
+ def initialize_store(self, store):
+ """
+ Initialize the indexes in the database.
+
+ :param store: store
+ :returns: a Deferred that will fire when the store is correctly
+ initialized.
+ :rtype: deferred
+ """
+ # TODO I think we *should* get another deferredLock in here, but
+ # global to the soledad namespace, to protect from several points
+ # initializing soledad indexes at the same time.
+ self._wait_for_indexes()
+
+ d = self._init_indexes(store)
+ d.addCallback(self._restore_waiting_methods)
+ return d
+
+ def _init_indexes(self, store):
+ """
+ Initialize the database indexes.
+ """
+ leap_assert(store, "Cannot init indexes with null soledad")
+ leap_assert_type(self.indexes, dict)
+
+ def _create_index(name, expression):
+ return store.create_index(name, *expression)
+
+ def init_idexes(indexes):
+ deferreds = []
+ db_indexes = dict(indexes)
+ # Loop through the indexes we expect to find.
+ for name, expression in self.indexes.items():
+ if name not in db_indexes:
+ # The index does not yet exist.
+ d = _create_index(name, expression)
+ deferreds.append(d)
+ elif expression != db_indexes[name]:
+ # The index exists but the definition is not what expected,
+ # so we delete it and add the proper index expression.
+ d = store.delete_index(name)
+ d.addCallback(
+ lambda _: _create_index(name, *expression))
+ deferreds.append(d)
+ return defer.gatherResults(deferreds, consumeErrors=True)
+
+ def store_ready(whatever):
+ self.store_ready = True
+ return whatever
+
+ self.deferred_indexes = store.list_indexes()
+ self.deferred_indexes.addCallback(init_idexes)
+ self.deferred_indexes.addCallback(store_ready)
+ return self.deferred_indexes
+
+ def _wait_for_indexes(self):
+ """
+ Make the marked methods to wait for the indexes to be ready.
+ Heavily based on
+ http://blogs.fluidinfo.com/terry/2009/05/11/a-mixin-class-allowing-python-__init__-methods-to-work-with-twisted-deferreds/
+
+ :param methods: methods that need to wait for the indexes to be ready
+ :type methods: tuple(str)
+ """
+ leap_assert_type(self.wait_for_indexes, list)
+ methods = self.wait_for_indexes
+
+ self.waiting = []
+ self.stored = {}
+
+ def makeWrapper(method):
+ def wrapper(*args, **kw):
+ d = defer.Deferred()
+ d.addCallback(lambda _: self.stored[method](*args, **kw))
+ self.waiting.append(d)
+ return d
+ return wrapper
+
+ for method in methods:
+ self.stored[method] = getattr(self, method)
+ setattr(self, method, makeWrapper(method))
+
+ def _restore_waiting_methods(self, _):
+ for method in self.stored:
+ setattr(self, method, self.stored[method])
+ for d in self.waiting:
+ d.callback(None)
+
+
+class SoledadMailAdaptor(SoledadIndexMixin):
+
+ implements(IMailAdaptor)
+ store = None
+
+ indexes = indexes.MAIL_INDEXES
+ wait_for_indexes = ['get_or_create_mbox', 'update_mbox', 'get_all_mboxes']
+
+ mboxwrapper_klass = MailboxWrapper
+
+ def __init__(self):
+ SoledadIndexMixin.__init__(self)
+
+ # Message handling
+
+ def get_msg_from_string(self, MessageClass, raw_msg):
+ """
+ Get an instance of a MessageClass initialized with a MessageWrapper
+ that contains all the parts obtained from parsing the raw string for
+ the message.
+
+ :param MessageClass: any Message class that can be initialized passing
+ an instance of an IMessageWrapper implementor.
+ :type MessageClass: type
+ :param raw_msg: a string containing the raw email message.
+ :type raw_msg: str
+ :rtype: MessageClass instance.
+ """
+ assert(MessageClass is not None)
+ mdoc, fdoc, hdoc, cdocs = _split_into_parts(raw_msg)
+ return self.get_msg_from_docs(
+ MessageClass, mdoc, fdoc, hdoc, cdocs)
+
+ def get_msg_from_docs(self, MessageClass, mdoc, fdoc, hdoc, cdocs=None,
+ uid=None):
+ """
+ Get an instance of a MessageClass initialized with a MessageWrapper
+ that contains the passed part documents.
+
+ This is not the recommended way of obtaining a message, unless you know
+ how to take care of ensuring the internal consistency between the part
+ documents, or unless you are glueing together the part documents that
+ have been previously generated by `get_msg_from_string`.
+
+ :param MessageClass: any Message class that can be initialized passing
+ an instance of an IMessageWrapper implementor.
+ :type MessageClass: type
+ :param fdoc: a dictionary containing values from which a
+ FlagsDocWrapper can be initialized
+ :type fdoc: dict
+ :param hdoc: a dictionary containing values from which a
+ HeaderDocWrapper can be initialized
+ :type hdoc: dict
+ :param cdocs: None, or a dictionary mapping integers (1-indexed) to
+ dicts from where a ContentDocWrapper can be initialized.
+ :type cdocs: dict, or None
+
+ :rtype: MessageClass instance.
+ """
+ assert(MessageClass is not None)
+ return MessageClass(MessageWrapper(mdoc, fdoc, hdoc, cdocs), uid=uid)
+
+ def get_msg_from_mdoc_id(self, MessageClass, store, mdoc_id,
+ uid=None, get_cdocs=False):
+
+ def wrap_meta_doc(doc):
+ cls = MetaMsgDocWrapper
+ return cls(doc_id=doc.doc_id, **doc.content)
+
+ def get_part_docs_from_mdoc_wrapper(wrapper):
+ d_docs = []
+ d_docs.append(store.get_doc(wrapper.fdoc))
+ d_docs.append(store.get_doc(wrapper.hdoc))
+ for cdoc in wrapper.cdocs:
+ d_docs.append(store.get_doc(cdoc))
+
+ def add_mdoc(doc_list):
+ return [wrapper.serialize()] + doc_list
+
+ d = defer.gatherResults(d_docs)
+ d.addCallback(add_mdoc)
+ return d
+
+ def get_parts_doc_from_mdoc_id():
+ mbox = re.findall(constants.METAMSGID_MBOX_RE, mdoc_id)[0]
+ chash = re.findall(constants.METAMSGID_CHASH_RE, mdoc_id)[0]
+
+ def _get_fdoc_id_from_mdoc_id():
+ return constants.FDOCID.format(mbox_uuid=mbox, chash=chash)
+
+ def _get_hdoc_id_from_mdoc_id():
+ return constants.HDOCID.format(mbox_uuid=mbox, chash=chash)
+
+ d_docs = []
+ fdoc_id = _get_fdoc_id_from_mdoc_id()
+ hdoc_id = _get_hdoc_id_from_mdoc_id()
+
+ d_docs.append(store.get_doc(mdoc_id))
+ d_docs.append(store.get_doc(fdoc_id))
+ d_docs.append(store.get_doc(hdoc_id))
+
+ d = defer.gatherResults(d_docs)
+ return d
+
+ def _err_log_failure_part_docs(failure):
+ # See https://leap.se/code/issues/7495.
+ # This avoids blocks, but the real cause still needs to be
+ # isolated (0.9.0rc3) -- kali
+ log.msg("BUG ---------------------------------------------------")
+ log.msg("BUG: Error while retrieving part docs for mdoc id %s" %
+ mdoc_id)
+ log.err(failure)
+ log.msg("BUG (please report above info) ------------------------")
+ return []
+
+ def _err_log_cannot_find_msg(failure):
+ log.msg("BUG: Error while getting msg (uid=%s)" % uid)
+ return None
+
+ if get_cdocs:
+ d = store.get_doc(mdoc_id)
+ d.addCallback(wrap_meta_doc)
+ d.addCallback(get_part_docs_from_mdoc_wrapper)
+ d.addErrback(_err_log_failure_part_docs)
+
+ else:
+ d = get_parts_doc_from_mdoc_id()
+
+ d.addCallback(self._get_msg_from_variable_doc_list,
+ msg_class=MessageClass, uid=uid)
+ d.addErrback(_err_log_cannot_find_msg)
+ return d
+
+ def _get_msg_from_variable_doc_list(self, doc_list, msg_class, uid=None):
+ if len(doc_list) == 3:
+ mdoc, fdoc, hdoc = doc_list
+ cdocs = None
+ elif len(doc_list) > 3:
+ # XXX is this case used?
+ mdoc, fdoc, hdoc = doc_list[:3]
+ cdocs = dict(enumerate(doc_list[3:], 1))
+ return self.get_msg_from_docs(
+ msg_class, mdoc, fdoc, hdoc, cdocs, uid=uid)
+
+ def get_flags_from_mdoc_id(self, store, mdoc_id):
+ """
+ # XXX stuff here...
+ """
+ mbox = re.findall(constants.METAMSGID_MBOX_RE, mdoc_id)[0]
+ chash = re.findall(constants.METAMSGID_CHASH_RE, mdoc_id)[0]
+
+ def _get_fdoc_id_from_mdoc_id():
+ return constants.FDOCID.format(mbox_uuid=mbox, chash=chash)
+
+ fdoc_id = _get_fdoc_id_from_mdoc_id()
+
+ def wrap_fdoc(doc):
+ if not doc:
+ return
+ cls = FlagsDocWrapper
+ return cls(doc_id=doc.doc_id, **doc.content)
+
+ def get_flags(fdoc_wrapper):
+ if not fdoc_wrapper:
+ return []
+ return fdoc_wrapper.get_flags()
+
+ d = store.get_doc(fdoc_id)
+ d.addCallback(wrap_fdoc)
+ d.addCallback(get_flags)
+ return d
+
+ def create_msg(self, store, msg):
+ """
+ :param store: an instance of soledad, or anything that behaves alike
+ :param msg: a Message object.
+
+ :return: a Deferred that is fired when all the underlying documents
+ have been created.
+ :rtype: defer.Deferred
+ """
+ wrapper = msg.get_wrapper()
+ return wrapper.create(store)
+
+ def update_msg(self, store, msg):
+ """
+ :param msg: a Message object.
+ :param store: an instance of soledad, or anything that behaves alike
+ :return: a Deferred that is fired when all the underlying documents
+ have been updated (actually, it's only the fdoc that's allowed
+ to update).
+ :rtype: defer.Deferred
+ """
+ wrapper = msg.get_wrapper()
+ return wrapper.update(store)
+
+ # batch deletion
+
+ def del_all_flagged_messages(self, store, mbox_uuid):
+ """
+ Delete all messages flagged as deleted.
+ """
+ def err(failure):
+ log.err(failure)
+
+ def delete_fdoc_and_mdoc_flagged(fdocs):
+ # low level here, not using the wrappers...
+ # get meta doc ids from the flag doc ids
+ fdoc_ids = [doc.doc_id for doc in fdocs]
+ mdoc_ids = map(lambda s: "M" + s[1:], fdoc_ids)
+
+ def delete_all_docs(mdocs, fdocs):
+ mdocs = list(mdocs)
+ doc_ids = [m.doc_id for m in mdocs]
+ _d = []
+ docs = mdocs + fdocs
+ for doc in docs:
+ _d.append(store.delete_doc(doc))
+ d = defer.gatherResults(_d)
+ # return the mdocs ids only
+ d.addCallback(lambda _: doc_ids)
+ return d
+
+ d = store.get_docs(mdoc_ids)
+ d.addCallback(delete_all_docs, fdocs)
+ d.addErrback(err)
+ return d
+
+ type_ = FlagsDocWrapper.model.type_
+ uuid = mbox_uuid.replace('-', '_')
+ deleted_index = indexes.TYPE_MBOX_DEL_IDX
+
+ d = store.get_from_index(deleted_index, type_, uuid, "1")
+ d.addCallbacks(delete_fdoc_and_mdoc_flagged, err)
+ return d
+
+ # count messages
+
+ def get_count_unseen(self, store, mbox_uuid):
+ """
+ Get the number of unseen messages for a given mailbox.
+
+ :param store: instance of Soledad.
+ :param mbox_uuid: the uuid for this mailbox.
+ :rtype: int
+ """
+ type_ = FlagsDocWrapper.model.type_
+ uuid = mbox_uuid.replace('-', '_')
+
+ unseen_index = indexes.TYPE_MBOX_SEEN_IDX
+
+ d = store.get_count_from_index(unseen_index, type_, uuid, "0")
+ d.addErrback(self._errback)
+ return d
+
+ def get_count_recent(self, store, mbox_uuid):
+ """
+ Get the number of recent messages for a given mailbox.
+
+ :param store: instance of Soledad.
+ :param mbox_uuid: the uuid for this mailbox.
+ :rtype: int
+ """
+ type_ = FlagsDocWrapper.model.type_
+ uuid = mbox_uuid.replace('-', '_')
+
+ recent_index = indexes.TYPE_MBOX_RECENT_IDX
+
+ d = store.get_count_from_index(recent_index, type_, uuid, "1")
+ d.addErrback(self._errback)
+ return d
+
+ # search api
+
+ def get_mdoc_id_from_msgid(self, store, mbox_uuid, msgid):
+ """
+ Get the UID for a message with the passed msgid (the one in the headers
+ msg-id).
+ This is used by the MUA to retrieve the recently saved draft.
+ """
+ type_ = HeaderDocWrapper.model.type_
+ uuid = mbox_uuid.replace('-', '_')
+
+ msgid_index = indexes.TYPE_MSGID_IDX
+
+ def get_mdoc_id(hdoc):
+ if not hdoc:
+ log.msg("Could not find a HDOC with MSGID %s" % msgid)
+ return None
+ hdoc = hdoc[0]
+ mdoc_id = hdoc.doc_id.replace("H-", "M-%s-" % uuid)
+ return mdoc_id
+
+ d = store.get_from_index(msgid_index, type_, msgid)
+ d.addCallback(get_mdoc_id)
+ return d
+
+ # Mailbox handling
+
+ def get_or_create_mbox(self, store, name):
+ """
+ Get the mailbox with the given name, or create one if it does not
+ exist.
+
+ :param store: instance of Soledad
+ :param name: the name of the mailbox
+ :type name: str
+ """
+ index = indexes.TYPE_MBOX_IDX
+ mbox = normalize_mailbox(name)
+ return MailboxWrapper.get_or_create(store, index, mbox)
+
+ def update_mbox(self, store, mbox_wrapper):
+ """
+ Update the documents for a given mailbox.
+ :param mbox_wrapper: MailboxWrapper instance
+ :type mbox_wrapper: MailboxWrapper
+ :return: a Deferred that will be fired when the mailbox documents
+ have been updated.
+ :rtype: defer.Deferred
+ """
+ leap_assert_type(mbox_wrapper, SoledadDocumentWrapper)
+ return mbox_wrapper.update(store)
+
+ def delete_mbox(self, store, mbox_wrapper):
+ leap_assert_type(mbox_wrapper, SoledadDocumentWrapper)
+ return mbox_wrapper.delete(store)
+
+ def get_all_mboxes(self, store):
+ """
+ Retrieve a list with wrappers for all the mailboxes.
+
+ :return: a deferred that will be fired with a list of all the
+ MailboxWrappers found.
+ :rtype: defer.Deferred
+ """
+ return MailboxWrapper.get_all(store)
+
+ def _errback(self, failure):
+ log.err(failure)
+
+
+def _split_into_parts(raw):
+ # TODO signal that we can delete the original message!-----
+ # when all the processing is done.
+ # TODO add the linked-from info !
+ # TODO add reference to the original message?
+ # TODO populate Default FLAGS/TAGS (unseen?)
+ # TODO seed propely the content_docs with defaults??
+
+ msg, chash, multi = _parse_msg(raw)
+ size = len(msg.as_string())
+
+ parts_map = walk.get_tree(msg)
+ cdocs_list = list(walk.get_raw_docs(msg))
+ cdocs_phashes = [c['phash'] for c in cdocs_list]
+ body_phash = walk.get_body_phash(msg)
+
+ mdoc = _build_meta_doc(chash, cdocs_phashes)
+ fdoc = _build_flags_doc(chash, size, multi)
+ hdoc = _build_headers_doc(msg, chash, body_phash, parts_map)
+
+ # The MessageWrapper expects a dict, one-indexed
+ cdocs = dict(enumerate(cdocs_list, 1))
+
+ return mdoc, fdoc, hdoc, cdocs
+
+
+def _parse_msg(raw):
+ msg = message_from_string(raw)
+ chash = walk.get_hash(raw)
+ multi = msg.is_multipart()
+ return msg, chash, multi
+
+
+def _build_meta_doc(chash, cdocs_phashes):
+ _mdoc = MetaMsgDocWrapper()
+ # FIXME passing the inbox name because we don't have the uuid at this
+ # point.
+
+ _mdoc.fdoc = constants.FDOCID.format(mbox_uuid=INBOX_NAME, chash=chash)
+ _mdoc.hdoc = constants.HDOCID.format(chash=chash)
+ _mdoc.cdocs = [constants.CDOCID.format(phash=p) for p in cdocs_phashes]
+
+ return _mdoc.serialize()
+
+
+def _build_flags_doc(chash, size, multi):
+ _fdoc = FlagsDocWrapper(chash=chash, size=size, multi=multi)
+ return _fdoc.serialize()
+
+
+def _build_headers_doc(msg, chash, body_phash, parts_map):
+ """
+ Assemble a headers document from the original parsed message, the
+ content-hash, and the parts map.
+
+ It takes into account possibly repeated headers.
+ """
+ headers = defaultdict(list)
+ for k, v in msg.items():
+ headers[k].append(v)
+ # "fix" for repeated headers (as in "Received:"
+ for k, v in headers.items():
+ newline = "\n%s: " % (k.lower(),)
+ headers[k] = newline.join(v)
+
+ lower_headers = lowerdict(dict(headers))
+ msgid = first(_MSGID_RE.findall(
+ lower_headers.get('message-id', '')))
+
+ _hdoc = HeaderDocWrapper(
+ chash=chash, headers=headers, body=body_phash,
+ msgid=msgid)
+
+ def copy_attr(headers, key, doc):
+ if key in headers:
+ setattr(doc, key, headers[key])
+
+ copy_attr(lower_headers, "subject", _hdoc)
+ copy_attr(lower_headers, "date", _hdoc)
+
+ hdoc = _hdoc.serialize()
+ # add some of the attr from the parts map to header doc
+ for key in parts_map:
+ if key in ('body', 'multi', 'part_map'):
+ hdoc[key] = parts_map[key]
+ return stringify_parts_map(hdoc)