summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/soledadstore.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/imap/soledadstore.py')
-rw-r--r--src/leap/mail/imap/soledadstore.py620
1 files changed, 620 insertions, 0 deletions
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
new file mode 100644
index 0000000..f3de8eb
--- /dev/null
+++ b/src/leap/mail/imap/soledadstore.py
@@ -0,0 +1,620 @@
+# -*- coding: utf-8 -*-
+# soledadstore.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+A MessageStore that writes to Soledad.
+"""
+import logging
+import threading
+
+from collections import defaultdict
+from itertools import chain
+
+from u1db import errors as u1db_errors
+from twisted.python import log
+from zope.interface import implements
+
+from leap.common.check import leap_assert_type, leap_assert
+from leap.mail.decorators import deferred_to_thread
+from leap.mail.imap.messageparts import MessagePartType
+from leap.mail.imap.messageparts import MessageWrapper
+from leap.mail.imap.messageparts import RecentFlagsDoc
+from leap.mail.imap.fields import fields
+from leap.mail.imap.interfaces import IMessageStore
+from leap.mail.messageflow import IMessageConsumer
+from leap.mail.utils import first, empty, accumulator_queue
+
+logger = logging.getLogger(__name__)
+
+
+# TODO
+# [ ] Implement a retry queue?
+# [ ] Consider journaling of operations.
+
+
+class ContentDedup(object):
+ """
+ Message deduplication.
+
+ We do a query for the content hashes before writing to our beloved
+ sqlcipher backend of Soledad. This means, by now, that:
+
+ 1. We will not store the same body/attachment twice, only the hash of it.
+ 2. We will not store the same message header twice, only the hash of it.
+
+ The first case is useful if you are always receiving the same old memes
+ from unwary friends that still have not discovered that 4chan is the
+ generator of the internet. The second will save your day if you have
+ initiated session with the same account in two different machines. I also
+ wonder why would you do that, but let's respect each other choices, like
+ with the religious celebrations, and assume that one day we'll be able
+ to run Bitmask in completely free phones. Yes, I mean that, the whole GSM
+ Stack.
+ """
+ # TODO refactor using unique_query
+
+ def _header_does_exist(self, doc):
+ """
+ Check whether we already have a header document for this
+ content hash in our database.
+
+ :param doc: tentative header for document
+ :type doc: dict
+ :returns: True if it exists, False otherwise.
+ """
+ if not doc:
+ return False
+ chash = doc[fields.CONTENT_HASH_KEY]
+ header_docs = self._soledad.get_from_index(
+ fields.TYPE_C_HASH_IDX,
+ fields.TYPE_HEADERS_VAL, str(chash))
+ if not header_docs:
+ return False
+
+ # FIXME enable only to debug this problem.
+ #if len(header_docs) != 1:
+ #logger.warning("Found more than one copy of chash %s!"
+ #% (chash,))
+
+ #logger.debug("Found header doc with that hash! Skipping save!")
+ return True
+
+ def _content_does_exist(self, doc):
+ """
+ Check whether we already have a content document for a payload
+ with this hash in our database.
+
+ :param doc: tentative content for document
+ :type doc: dict
+ :returns: True if it exists, False otherwise.
+ """
+ if not doc:
+ return False
+ phash = doc[fields.PAYLOAD_HASH_KEY]
+ attach_docs = self._soledad.get_from_index(
+ fields.TYPE_P_HASH_IDX,
+ fields.TYPE_CONTENT_VAL, str(phash))
+ if not attach_docs:
+ return False
+
+ # FIXME enable only to debug this problem
+ #if len(attach_docs) != 1:
+ #logger.warning("Found more than one copy of phash %s!"
+ #% (phash,))
+ #logger.debug("Found attachment doc with that hash! Skipping save!")
+ return True
+
+
+class MsgWriteError(Exception):
+ """
+ Raised if any exception is found while saving message parts.
+ """
+ pass
+
+
+"""
+A lock per document.
+"""
+# TODO should bound the space of this!!!
+# http://stackoverflow.com/a/2437645/1157664
+# Setting this to twice the number of threads in the threadpool
+# should be safe.
+put_locks = defaultdict(lambda: threading.Lock())
+mbox_doc_locks = defaultdict(lambda: threading.Lock())
+
+
+class SoledadStore(ContentDedup):
+ """
+ This will create docs in the local Soledad database.
+ """
+ _remove_lock = threading.Lock()
+
+ implements(IMessageConsumer, IMessageStore)
+
+ def __init__(self, soledad):
+ """
+ Initialize the permanent store that writes to Soledad database.
+
+ :param soledad: the soledad instance
+ :type soledad: Soledad
+ """
+ from twisted.internet import reactor
+ self.reactor = reactor
+
+ self._soledad = soledad
+
+ self._CREATE_DOC_FUN = self._soledad.create_doc
+ self._PUT_DOC_FUN = self._soledad.put_doc
+ self._GET_DOC_FUN = self._soledad.get_doc
+
+ # we instantiate an accumulator to batch the notifications
+ self.docs_notify_queue = accumulator_queue(
+ lambda item: reactor.callFromThread(self._unset_new_dirty, item),
+ 20)
+
+ # IMessageStore
+
+ # -------------------------------------------------------------------
+ # We are not yet using this interface, but it would make sense
+ # to implement it.
+
+ def create_message(self, mbox, uid, message):
+ """
+ Create the passed message into this SoledadStore.
+
+ :param mbox: the mbox this message belongs.
+ :type mbox: str or unicode
+ :param uid: the UID that identifies this message in this mailbox.
+ :type uid: int
+ :param message: a IMessageContainer implementor.
+ """
+ raise NotImplementedError()
+
+ def put_message(self, mbox, uid, message):
+ """
+ Put the passed existing message into this SoledadStore.
+
+ :param mbox: the mbox this message belongs.
+ :type mbox: str or unicode
+ :param uid: the UID that identifies this message in this mailbox.
+ :type uid: int
+ :param message: a IMessageContainer implementor.
+ """
+ raise NotImplementedError()
+
+ def remove_message(self, mbox, uid):
+ """
+ Remove the given message from this SoledadStore.
+
+ :param mbox: the mbox this message belongs.
+ :type mbox: str or unicode
+ :param uid: the UID that identifies this message in this mailbox.
+ :type uid: int
+ """
+ raise NotImplementedError()
+
+ def get_message(self, mbox, uid):
+ """
+ Get a IMessageContainer for the given mbox and uid combination.
+
+ :param mbox: the mbox this message belongs.
+ :type mbox: str or unicode
+ :param uid: the UID that identifies this message in this mailbox.
+ :type uid: int
+ """
+ raise NotImplementedError()
+
+ # IMessageConsumer
+
+ # TODO should handle the delete case
+ # TODO should handle errors better
+ # TODO could generalize this method into a generic consumer
+ # and only implement `process` here
+
+ def consume(self, queue):
+ """
+ Creates a new document in soledad db.
+
+ :param queue: a tuple of queues to get item from, with content of the
+ document to be inserted.
+ :type queue: tuple of Queues
+ """
+ new, dirty = queue
+ while not new.empty():
+ doc_wrapper = new.get()
+ self.reactor.callInThread(self._consume_doc, doc_wrapper,
+ self.docs_notify_queue)
+ while not dirty.empty():
+ doc_wrapper = dirty.get()
+ self.reactor.callInThread(self._consume_doc, doc_wrapper,
+ self.docs_notify_queue)
+
+ # Queue empty, flush the notifications queue.
+ self.docs_notify_queue(None, flush=True)
+
+ def _unset_new_dirty(self, doc_wrapper):
+ """
+ Unset the `new` and `dirty` flags for this document wrapper in the
+ memory store.
+
+ :param doc_wrapper: a MessageWrapper instance
+ :type doc_wrapper: MessageWrapper
+ """
+ if isinstance(doc_wrapper, MessageWrapper):
+ # XXX still needed for debug quite often
+ #logger.info("unsetting new flag!")
+ doc_wrapper.new = False
+ doc_wrapper.dirty = False
+
+ @deferred_to_thread
+ def _consume_doc(self, doc_wrapper, notify_queue):
+ """
+ Consume each document wrapper in a separate thread.
+ We pass an instance of an accumulator that handles the notifications
+ to the memorystore when the write has been done.
+
+ :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance
+ :type doc_wrapper: MessageWrapper or RecentFlagsDoc
+ :param notify_queue: a callable that handles the writeback
+ notifications to the memstore.
+ :type notify_queue: callable
+ """
+ def queueNotifyBack(failed, doc_wrapper):
+ if failed:
+ log.msg("There was an error writing the mesage...")
+ else:
+ notify_queue(doc_wrapper)
+
+ def doSoledadCalls(items):
+ # we prime the generator, that should return the
+ # message or flags wrapper item in the first place.
+ try:
+ doc_wrapper = items.next()
+ except StopIteration:
+ pass
+ else:
+ failed = self._soledad_write_document_parts(items)
+ queueNotifyBack(failed, doc_wrapper)
+
+ doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper))
+
+ #
+ # SoledadStore specific methods.
+ #
+
+ def _soledad_write_document_parts(self, items):
+ """
+ Write the document parts to soledad in a separate thread.
+
+ :param items: the iterator through the different document wrappers
+ payloads.
+ :type items: iterator
+ :return: whether the write was successful or not
+ :rtype: bool
+ """
+ failed = False
+ for item, call in items:
+ if empty(item):
+ continue
+ try:
+ self._try_call(call, item)
+ except Exception as exc:
+ logger.debug("ITEM WAS: %s" % repr(item))
+ if hasattr(item, 'content'):
+ logger.debug("ITEM CONTENT WAS: %s" %
+ repr(item.content))
+ logger.exception(exc)
+ failed = True
+ continue
+ return failed
+
+ def _iter_wrapper_subparts(self, doc_wrapper):
+ """
+ Return an iterator that will yield the doc_wrapper in the first place,
+ followed by the subparts item and the proper call type for every
+ item in the queue, if any.
+
+ :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance
+ :type doc_wrapper: MessageWrapper or RecentFlagsDoc
+ """
+ if isinstance(doc_wrapper, MessageWrapper):
+ return chain((doc_wrapper,),
+ self._get_calls_for_msg_parts(doc_wrapper))
+ elif isinstance(doc_wrapper, RecentFlagsDoc):
+ return chain((doc_wrapper,),
+ self._get_calls_for_rflags_doc(doc_wrapper))
+ else:
+ logger.warning("CANNOT PROCESS ITEM!")
+ return (i for i in [])
+
+ def _try_call(self, call, item):
+ """
+ Try to invoke a given call with item as a parameter.
+
+ :param call: the function to call
+ :type call: callable
+ :param item: the payload to pass to the call as argument
+ :type item: object
+ """
+ if call is None:
+ return
+
+ if call == self._PUT_DOC_FUN:
+ doc_id = item.doc_id
+ if doc_id is None:
+ logger.warning("BUG! Dirty doc but has no doc_id!")
+ return
+ with put_locks[doc_id]:
+ doc = self._GET_DOC_FUN(doc_id)
+
+ if doc is None:
+ logger.warning("BUG! Dirty doc but could not "
+ "find document %s" % (doc_id,))
+ return
+
+ doc.content = dict(item.content)
+
+ item = doc
+ try:
+ call(item)
+ except u1db_errors.RevisionConflict as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
+ except Exception as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
+
+ else:
+ try:
+ call(item)
+ except u1db_errors.RevisionConflict as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
+ except Exception as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
+
+ def _get_calls_for_msg_parts(self, msg_wrapper):
+ """
+ Generator that return the proper call type for a given item.
+
+ :param msg_wrapper: A MessageWrapper
+ :type msg_wrapper: IMessageContainer
+ :return: a generator of tuples with recent-flags doc payload
+ and callable
+ :rtype: generator
+ """
+ call = None
+
+ if msg_wrapper.new:
+ call = self._CREATE_DOC_FUN
+
+ # item is expected to be a MessagePartDoc
+ for item in msg_wrapper.walk():
+ if item.part == MessagePartType.fdoc:
+ yield dict(item.content), call
+
+ elif item.part == MessagePartType.hdoc:
+ if not self._header_does_exist(item.content):
+ yield dict(item.content), call
+
+ elif item.part == MessagePartType.cdoc:
+ if not self._content_does_exist(item.content):
+ yield dict(item.content), call
+
+ # For now, the only thing that will be dirty is
+ # the flags doc.
+
+ elif msg_wrapper.dirty:
+ call = self._PUT_DOC_FUN
+ # item is expected to be a MessagePartDoc
+ for item in msg_wrapper.walk():
+ # XXX FIXME Give error if dirty and not doc_id !!!
+ doc_id = item.doc_id # defend!
+ if not doc_id:
+ logger.warning("Dirty item but no doc_id!")
+ continue
+
+ if item.part == MessagePartType.fdoc:
+ #logger.debug("PUT dirty fdoc")
+ yield item, call
+
+ # XXX also for linkage-doc !!!
+ else:
+ logger.error("Cannot delete documents yet from the queue...!")
+
+ def _get_calls_for_rflags_doc(self, rflags_wrapper):
+ """
+ We always put these documents.
+
+ :param rflags_wrapper: A wrapper around recent flags doc.
+ :type rflags_wrapper: RecentFlagsWrapper
+ :return: a tuple with recent-flags doc payload and callable
+ :rtype: tuple
+ """
+ call = self._PUT_DOC_FUN
+
+ payload = rflags_wrapper.content
+ if payload:
+ logger.debug("Saving RFLAGS to Soledad...")
+ yield rflags_wrapper, call
+
+ # Mbox documents and attributes
+
+ def get_mbox_document(self, mbox):
+ """
+ Return mailbox document.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :return: A SoledadDocument containing this mailbox, or None if
+ the query failed.
+ :rtype: SoledadDocument or None.
+ """
+ with mbox_doc_locks[mbox]:
+ return self._get_mbox_document(mbox)
+
+ def _get_mbox_document(self, mbox):
+ """
+ Helper for returning the mailbox document.
+ """
+ try:
+ query = self._soledad.get_from_index(
+ fields.TYPE_MBOX_IDX,
+ fields.TYPE_MBOX_VAL, mbox)
+ if query:
+ return query.pop()
+ else:
+ logger.error("Could not find mbox document for %r" %
+ (mbox,))
+ except Exception as exc:
+ logger.exception("Unhandled error %r" % exc)
+
+ def get_mbox_closed(self, mbox):
+ """
+ Return the closed attribute for a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: bool
+ """
+ mbox_doc = self.get_mbox_document()
+ return mbox_doc.content.get(fields.CLOSED_KEY, False)
+
+ def set_mbox_closed(self, mbox, closed):
+ """
+ Set the closed attribute for a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param closed: the value to be set
+ :type closed: bool
+ """
+ leap_assert(isinstance(closed, bool), "closed needs to be boolean")
+ with mbox_doc_locks[mbox]:
+ mbox_doc = self._get_mbox_document(mbox)
+ if mbox_doc is None:
+ logger.error(
+ "Could not find mbox document for %r" % (mbox,))
+ return
+ mbox_doc.content[fields.CLOSED_KEY] = closed
+ self._soledad.put_doc(mbox_doc)
+
+ def write_last_uid(self, mbox, value):
+ """
+ Write the `last_uid` integer to the proper mailbox document
+ in Soledad.
+ This is called from the deferred triggered by
+ memorystore.increment_last_soledad_uid, which is expected to
+ run in a separate thread.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param value: the value to set
+ :type value: int
+ """
+ leap_assert_type(value, int)
+ key = fields.LAST_UID_KEY
+
+ # XXX use accumulator to reduce number of hits
+ with mbox_doc_locks[mbox]:
+ mbox_doc = self._get_mbox_document(mbox)
+ old_val = mbox_doc.content[key]
+ if value > old_val:
+ mbox_doc.content[key] = value
+ try:
+ self._soledad.put_doc(mbox_doc)
+ except Exception as exc:
+ logger.error("Error while setting last_uid for %r"
+ % (mbox,))
+ logger.exception(exc)
+
+ def get_flags_doc(self, mbox, uid):
+ """
+ Return the SoledadDocument for the given mbox and uid.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the UID for the message
+ :type uid: int
+ :rtype: SoledadDocument or None
+ """
+ result = None
+ try:
+ flag_docs = self._soledad.get_from_index(
+ fields.TYPE_MBOX_UID_IDX,
+ fields.TYPE_FLAGS_VAL, mbox, str(uid))
+ if len(flag_docs) != 1:
+ logger.warning("More than one flag doc for %r:%s" %
+ (mbox, uid))
+ result = first(flag_docs)
+ except Exception as exc:
+ # ugh! Something's broken down there!
+ logger.warning("ERROR while getting flags for UID: %s" % uid)
+ logger.exception(exc)
+ finally:
+ return result
+
+ def get_headers_doc(self, chash):
+ """
+ Return the document that keeps the headers for a message
+ indexed by its content-hash.
+
+ :param chash: the content-hash to retrieve the document from.
+ :type chash: str or unicode
+ :rtype: SoledadDocument or None
+ """
+ head_docs = self._soledad.get_from_index(
+ fields.TYPE_C_HASH_IDX,
+ fields.TYPE_HEADERS_VAL, str(chash))
+ return first(head_docs)
+
+ # deleted messages
+
+ def deleted_iter(self, mbox):
+ """
+ Get an iterator for the the doc_id for SoledadDocuments for messages
+ with \\Deleted flag for a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :return: iterator through deleted message docs
+ :rtype: iterable
+ """
+ return [doc.doc_id for doc in self._soledad.get_from_index(
+ fields.TYPE_MBOX_DEL_IDX,
+ fields.TYPE_FLAGS_VAL, mbox, '1')]
+
+ def remove_all_deleted(self, mbox):
+ """
+ Remove from Soledad all messages flagged as deleted for a given
+ mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ """
+ deleted = []
+ for doc_id in self.deleted_iter(mbox):
+ with self._remove_lock:
+ doc = self._soledad.get_doc(doc_id)
+ if doc is not None:
+ self._soledad.delete_doc(doc)
+ try:
+ deleted.append(doc.content[fields.UID_KEY])
+ except TypeError:
+ # empty content
+ pass
+ return deleted