diff options
Diffstat (limited to 'src/leap/mail/imap/soledadstore.py')
-rw-r--r-- | src/leap/mail/imap/soledadstore.py | 620 |
1 files changed, 620 insertions, 0 deletions
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py new file mode 100644 index 0000000..f3de8eb --- /dev/null +++ b/src/leap/mail/imap/soledadstore.py @@ -0,0 +1,620 @@ +# -*- coding: utf-8 -*- +# soledadstore.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +A MessageStore that writes to Soledad. +""" +import logging +import threading + +from collections import defaultdict +from itertools import chain + +from u1db import errors as u1db_errors +from twisted.python import log +from zope.interface import implements + +from leap.common.check import leap_assert_type, leap_assert +from leap.mail.decorators import deferred_to_thread +from leap.mail.imap.messageparts import MessagePartType +from leap.mail.imap.messageparts import MessageWrapper +from leap.mail.imap.messageparts import RecentFlagsDoc +from leap.mail.imap.fields import fields +from leap.mail.imap.interfaces import IMessageStore +from leap.mail.messageflow import IMessageConsumer +from leap.mail.utils import first, empty, accumulator_queue + +logger = logging.getLogger(__name__) + + +# TODO +# [ ] Implement a retry queue? +# [ ] Consider journaling of operations. + + +class ContentDedup(object): + """ + Message deduplication. + + We do a query for the content hashes before writing to our beloved + sqlcipher backend of Soledad. This means, by now, that: + + 1. We will not store the same body/attachment twice, only the hash of it. + 2. We will not store the same message header twice, only the hash of it. + + The first case is useful if you are always receiving the same old memes + from unwary friends that still have not discovered that 4chan is the + generator of the internet. The second will save your day if you have + initiated session with the same account in two different machines. I also + wonder why would you do that, but let's respect each other choices, like + with the religious celebrations, and assume that one day we'll be able + to run Bitmask in completely free phones. Yes, I mean that, the whole GSM + Stack. + """ + # TODO refactor using unique_query + + def _header_does_exist(self, doc): + """ + Check whether we already have a header document for this + content hash in our database. + + :param doc: tentative header for document + :type doc: dict + :returns: True if it exists, False otherwise. + """ + if not doc: + return False + chash = doc[fields.CONTENT_HASH_KEY] + header_docs = self._soledad.get_from_index( + fields.TYPE_C_HASH_IDX, + fields.TYPE_HEADERS_VAL, str(chash)) + if not header_docs: + return False + + # FIXME enable only to debug this problem. + #if len(header_docs) != 1: + #logger.warning("Found more than one copy of chash %s!" + #% (chash,)) + + #logger.debug("Found header doc with that hash! Skipping save!") + return True + + def _content_does_exist(self, doc): + """ + Check whether we already have a content document for a payload + with this hash in our database. + + :param doc: tentative content for document + :type doc: dict + :returns: True if it exists, False otherwise. + """ + if not doc: + return False + phash = doc[fields.PAYLOAD_HASH_KEY] + attach_docs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(phash)) + if not attach_docs: + return False + + # FIXME enable only to debug this problem + #if len(attach_docs) != 1: + #logger.warning("Found more than one copy of phash %s!" + #% (phash,)) + #logger.debug("Found attachment doc with that hash! Skipping save!") + return True + + +class MsgWriteError(Exception): + """ + Raised if any exception is found while saving message parts. + """ + pass + + +""" +A lock per document. +""" +# TODO should bound the space of this!!! +# http://stackoverflow.com/a/2437645/1157664 +# Setting this to twice the number of threads in the threadpool +# should be safe. +put_locks = defaultdict(lambda: threading.Lock()) +mbox_doc_locks = defaultdict(lambda: threading.Lock()) + + +class SoledadStore(ContentDedup): + """ + This will create docs in the local Soledad database. + """ + _remove_lock = threading.Lock() + + implements(IMessageConsumer, IMessageStore) + + def __init__(self, soledad): + """ + Initialize the permanent store that writes to Soledad database. + + :param soledad: the soledad instance + :type soledad: Soledad + """ + from twisted.internet import reactor + self.reactor = reactor + + self._soledad = soledad + + self._CREATE_DOC_FUN = self._soledad.create_doc + self._PUT_DOC_FUN = self._soledad.put_doc + self._GET_DOC_FUN = self._soledad.get_doc + + # we instantiate an accumulator to batch the notifications + self.docs_notify_queue = accumulator_queue( + lambda item: reactor.callFromThread(self._unset_new_dirty, item), + 20) + + # IMessageStore + + # ------------------------------------------------------------------- + # We are not yet using this interface, but it would make sense + # to implement it. + + def create_message(self, mbox, uid, message): + """ + Create the passed message into this SoledadStore. + + :param mbox: the mbox this message belongs. + :type mbox: str or unicode + :param uid: the UID that identifies this message in this mailbox. + :type uid: int + :param message: a IMessageContainer implementor. + """ + raise NotImplementedError() + + def put_message(self, mbox, uid, message): + """ + Put the passed existing message into this SoledadStore. + + :param mbox: the mbox this message belongs. + :type mbox: str or unicode + :param uid: the UID that identifies this message in this mailbox. + :type uid: int + :param message: a IMessageContainer implementor. + """ + raise NotImplementedError() + + def remove_message(self, mbox, uid): + """ + Remove the given message from this SoledadStore. + + :param mbox: the mbox this message belongs. + :type mbox: str or unicode + :param uid: the UID that identifies this message in this mailbox. + :type uid: int + """ + raise NotImplementedError() + + def get_message(self, mbox, uid): + """ + Get a IMessageContainer for the given mbox and uid combination. + + :param mbox: the mbox this message belongs. + :type mbox: str or unicode + :param uid: the UID that identifies this message in this mailbox. + :type uid: int + """ + raise NotImplementedError() + + # IMessageConsumer + + # TODO should handle the delete case + # TODO should handle errors better + # TODO could generalize this method into a generic consumer + # and only implement `process` here + + def consume(self, queue): + """ + Creates a new document in soledad db. + + :param queue: a tuple of queues to get item from, with content of the + document to be inserted. + :type queue: tuple of Queues + """ + new, dirty = queue + while not new.empty(): + doc_wrapper = new.get() + self.reactor.callInThread(self._consume_doc, doc_wrapper, + self.docs_notify_queue) + while not dirty.empty(): + doc_wrapper = dirty.get() + self.reactor.callInThread(self._consume_doc, doc_wrapper, + self.docs_notify_queue) + + # Queue empty, flush the notifications queue. + self.docs_notify_queue(None, flush=True) + + def _unset_new_dirty(self, doc_wrapper): + """ + Unset the `new` and `dirty` flags for this document wrapper in the + memory store. + + :param doc_wrapper: a MessageWrapper instance + :type doc_wrapper: MessageWrapper + """ + if isinstance(doc_wrapper, MessageWrapper): + # XXX still needed for debug quite often + #logger.info("unsetting new flag!") + doc_wrapper.new = False + doc_wrapper.dirty = False + + @deferred_to_thread + def _consume_doc(self, doc_wrapper, notify_queue): + """ + Consume each document wrapper in a separate thread. + We pass an instance of an accumulator that handles the notifications + to the memorystore when the write has been done. + + :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance + :type doc_wrapper: MessageWrapper or RecentFlagsDoc + :param notify_queue: a callable that handles the writeback + notifications to the memstore. + :type notify_queue: callable + """ + def queueNotifyBack(failed, doc_wrapper): + if failed: + log.msg("There was an error writing the mesage...") + else: + notify_queue(doc_wrapper) + + def doSoledadCalls(items): + # we prime the generator, that should return the + # message or flags wrapper item in the first place. + try: + doc_wrapper = items.next() + except StopIteration: + pass + else: + failed = self._soledad_write_document_parts(items) + queueNotifyBack(failed, doc_wrapper) + + doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper)) + + # + # SoledadStore specific methods. + # + + def _soledad_write_document_parts(self, items): + """ + Write the document parts to soledad in a separate thread. + + :param items: the iterator through the different document wrappers + payloads. + :type items: iterator + :return: whether the write was successful or not + :rtype: bool + """ + failed = False + for item, call in items: + if empty(item): + continue + try: + self._try_call(call, item) + except Exception as exc: + logger.debug("ITEM WAS: %s" % repr(item)) + if hasattr(item, 'content'): + logger.debug("ITEM CONTENT WAS: %s" % + repr(item.content)) + logger.exception(exc) + failed = True + continue + return failed + + def _iter_wrapper_subparts(self, doc_wrapper): + """ + Return an iterator that will yield the doc_wrapper in the first place, + followed by the subparts item and the proper call type for every + item in the queue, if any. + + :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance + :type doc_wrapper: MessageWrapper or RecentFlagsDoc + """ + if isinstance(doc_wrapper, MessageWrapper): + return chain((doc_wrapper,), + self._get_calls_for_msg_parts(doc_wrapper)) + elif isinstance(doc_wrapper, RecentFlagsDoc): + return chain((doc_wrapper,), + self._get_calls_for_rflags_doc(doc_wrapper)) + else: + logger.warning("CANNOT PROCESS ITEM!") + return (i for i in []) + + def _try_call(self, call, item): + """ + Try to invoke a given call with item as a parameter. + + :param call: the function to call + :type call: callable + :param item: the payload to pass to the call as argument + :type item: object + """ + if call is None: + return + + if call == self._PUT_DOC_FUN: + doc_id = item.doc_id + if doc_id is None: + logger.warning("BUG! Dirty doc but has no doc_id!") + return + with put_locks[doc_id]: + doc = self._GET_DOC_FUN(doc_id) + + if doc is None: + logger.warning("BUG! Dirty doc but could not " + "find document %s" % (doc_id,)) + return + + doc.content = dict(item.content) + + item = doc + try: + call(item) + except u1db_errors.RevisionConflict as exc: + logger.exception("Error: %r" % (exc,)) + raise exc + except Exception as exc: + logger.exception("Error: %r" % (exc,)) + raise exc + + else: + try: + call(item) + except u1db_errors.RevisionConflict as exc: + logger.exception("Error: %r" % (exc,)) + raise exc + except Exception as exc: + logger.exception("Error: %r" % (exc,)) + raise exc + + def _get_calls_for_msg_parts(self, msg_wrapper): + """ + Generator that return the proper call type for a given item. + + :param msg_wrapper: A MessageWrapper + :type msg_wrapper: IMessageContainer + :return: a generator of tuples with recent-flags doc payload + and callable + :rtype: generator + """ + call = None + + if msg_wrapper.new: + call = self._CREATE_DOC_FUN + + # item is expected to be a MessagePartDoc + for item in msg_wrapper.walk(): + if item.part == MessagePartType.fdoc: + yield dict(item.content), call + + elif item.part == MessagePartType.hdoc: + if not self._header_does_exist(item.content): + yield dict(item.content), call + + elif item.part == MessagePartType.cdoc: + if not self._content_does_exist(item.content): + yield dict(item.content), call + + # For now, the only thing that will be dirty is + # the flags doc. + + elif msg_wrapper.dirty: + call = self._PUT_DOC_FUN + # item is expected to be a MessagePartDoc + for item in msg_wrapper.walk(): + # XXX FIXME Give error if dirty and not doc_id !!! + doc_id = item.doc_id # defend! + if not doc_id: + logger.warning("Dirty item but no doc_id!") + continue + + if item.part == MessagePartType.fdoc: + #logger.debug("PUT dirty fdoc") + yield item, call + + # XXX also for linkage-doc !!! + else: + logger.error("Cannot delete documents yet from the queue...!") + + def _get_calls_for_rflags_doc(self, rflags_wrapper): + """ + We always put these documents. + + :param rflags_wrapper: A wrapper around recent flags doc. + :type rflags_wrapper: RecentFlagsWrapper + :return: a tuple with recent-flags doc payload and callable + :rtype: tuple + """ + call = self._PUT_DOC_FUN + + payload = rflags_wrapper.content + if payload: + logger.debug("Saving RFLAGS to Soledad...") + yield rflags_wrapper, call + + # Mbox documents and attributes + + def get_mbox_document(self, mbox): + """ + Return mailbox document. + + :param mbox: the mailbox + :type mbox: str or unicode + :return: A SoledadDocument containing this mailbox, or None if + the query failed. + :rtype: SoledadDocument or None. + """ + with mbox_doc_locks[mbox]: + return self._get_mbox_document(mbox) + + def _get_mbox_document(self, mbox): + """ + Helper for returning the mailbox document. + """ + try: + query = self._soledad.get_from_index( + fields.TYPE_MBOX_IDX, + fields.TYPE_MBOX_VAL, mbox) + if query: + return query.pop() + else: + logger.error("Could not find mbox document for %r" % + (mbox,)) + except Exception as exc: + logger.exception("Unhandled error %r" % exc) + + def get_mbox_closed(self, mbox): + """ + Return the closed attribute for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: bool + """ + mbox_doc = self.get_mbox_document() + return mbox_doc.content.get(fields.CLOSED_KEY, False) + + def set_mbox_closed(self, mbox, closed): + """ + Set the closed attribute for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :param closed: the value to be set + :type closed: bool + """ + leap_assert(isinstance(closed, bool), "closed needs to be boolean") + with mbox_doc_locks[mbox]: + mbox_doc = self._get_mbox_document(mbox) + if mbox_doc is None: + logger.error( + "Could not find mbox document for %r" % (mbox,)) + return + mbox_doc.content[fields.CLOSED_KEY] = closed + self._soledad.put_doc(mbox_doc) + + def write_last_uid(self, mbox, value): + """ + Write the `last_uid` integer to the proper mailbox document + in Soledad. + This is called from the deferred triggered by + memorystore.increment_last_soledad_uid, which is expected to + run in a separate thread. + + :param mbox: the mailbox + :type mbox: str or unicode + :param value: the value to set + :type value: int + """ + leap_assert_type(value, int) + key = fields.LAST_UID_KEY + + # XXX use accumulator to reduce number of hits + with mbox_doc_locks[mbox]: + mbox_doc = self._get_mbox_document(mbox) + old_val = mbox_doc.content[key] + if value > old_val: + mbox_doc.content[key] = value + try: + self._soledad.put_doc(mbox_doc) + except Exception as exc: + logger.error("Error while setting last_uid for %r" + % (mbox,)) + logger.exception(exc) + + def get_flags_doc(self, mbox, uid): + """ + Return the SoledadDocument for the given mbox and uid. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the UID for the message + :type uid: int + :rtype: SoledadDocument or None + """ + result = None + try: + flag_docs = self._soledad.get_from_index( + fields.TYPE_MBOX_UID_IDX, + fields.TYPE_FLAGS_VAL, mbox, str(uid)) + if len(flag_docs) != 1: + logger.warning("More than one flag doc for %r:%s" % + (mbox, uid)) + result = first(flag_docs) + except Exception as exc: + # ugh! Something's broken down there! + logger.warning("ERROR while getting flags for UID: %s" % uid) + logger.exception(exc) + finally: + return result + + def get_headers_doc(self, chash): + """ + Return the document that keeps the headers for a message + indexed by its content-hash. + + :param chash: the content-hash to retrieve the document from. + :type chash: str or unicode + :rtype: SoledadDocument or None + """ + head_docs = self._soledad.get_from_index( + fields.TYPE_C_HASH_IDX, + fields.TYPE_HEADERS_VAL, str(chash)) + return first(head_docs) + + # deleted messages + + def deleted_iter(self, mbox): + """ + Get an iterator for the the doc_id for SoledadDocuments for messages + with \\Deleted flag for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :return: iterator through deleted message docs + :rtype: iterable + """ + return [doc.doc_id for doc in self._soledad.get_from_index( + fields.TYPE_MBOX_DEL_IDX, + fields.TYPE_FLAGS_VAL, mbox, '1')] + + def remove_all_deleted(self, mbox): + """ + Remove from Soledad all messages flagged as deleted for a given + mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + """ + deleted = [] + for doc_id in self.deleted_iter(mbox): + with self._remove_lock: + doc = self._soledad.get_doc(doc_id) + if doc is not None: + self._soledad.delete_doc(doc) + try: + deleted.append(doc.content[fields.UID_KEY]) + except TypeError: + # empty content + pass + return deleted |