Diffstat (limited to 'src/leap/mail/imap/soledadstore.py')
-rw-r--r--    src/leap/mail/imap/soledadstore.py    487
1 file changed, 487 insertions, 0 deletions
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
new file mode 100644
index 0000000..8e22f26
--- /dev/null
+++ b/src/leap/mail/imap/soledadstore.py
@@ -0,0 +1,487 @@
+# -*- coding: utf-8 -*-
+# soledadstore.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+A MessageStore that writes to Soledad.
+"""
+import logging
+import threading
+
+from itertools import chain
+
+from u1db import errors as u1db_errors
+from twisted.internet import defer
+from twisted.python import log
+from zope.interface import implements
+
+from leap.common.check import leap_assert_type
+from leap.mail.decorators import deferred_to_thread
+from leap.mail.imap.messageparts import MessagePartType
+from leap.mail.imap.messageparts import MessageWrapper
+from leap.mail.imap.messageparts import RecentFlagsDoc
+from leap.mail.imap.fields import fields
+from leap.mail.imap.interfaces import IMessageStore
+from leap.mail.messageflow import IMessageConsumer
+from leap.mail.utils import first
+
+logger = logging.getLogger(__name__)
+
+
+# TODO
+# [ ] Delete original message from the incoming queue after all successful
+# writes.
+# [ ] Implement a retry queue.
+# [ ] Consider journaling of operations.
+
+
+class ContentDedup(object):
+ """
+ Message deduplication.
+
+ We do a query for the content hashes before writing to our beloved
+ sqlcipher backend of Soledad. This means, for now, that:
+
+ 1. We will not store the same body/attachment twice: only one copy is kept,
+    keyed by its content hash.
+ 2. We will not store the same message header twice: only one copy is kept,
+    keyed by its content hash.
+
+ The first case is useful if you are always receiving the same old memes
+ from unwary friends who still have not discovered that 4chan is the
+ generator of the internet. The second will save your day if you have
+ initiated a session with the same account on two different machines. I also
+ wonder why you would do that, but let's respect each other's choices, like
+ with religious celebrations, and assume that one day we'll be able
+ to run Bitmask on completely free phones. Yes, I mean that: the whole GSM
+ stack.
+ """
+ # TODO refactor using unique_query
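+ # Both checks below follow the same pattern: query the relevant index for
+ # the hash, treat an empty result as "does not exist", and warn if more
+ # than one copy is already stored.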
+
+ def _header_does_exist(self, doc):
+ """
+ Check whether we already have a header document for this
+ content hash in our database.
+
+ :param doc: tentative header document
+ :type doc: dict
+ :returns: True if it exists, False otherwise.
+ """
+ if not doc:
+ return False
+ chash = doc[fields.CONTENT_HASH_KEY]
+ header_docs = self._soledad.get_from_index(
+ fields.TYPE_C_HASH_IDX,
+ fields.TYPE_HEADERS_VAL, str(chash))
+ if not header_docs:
+ return False
+
+ if len(header_docs) != 1:
+ logger.warning("Found more than one copy of chash %s!"
+ % (chash,))
+ logger.debug("Found header doc with that hash! Skipping save!")
+ return True
+
+ def _content_does_exist(self, doc):
+ """
+ Check whether we already have a content document for a payload
+ with this hash in our database.
+
+ :param doc: tentative content document
+ :type doc: dict
+ :returns: True if it exists, False otherwise.
+ """
+ if not doc:
+ return False
+ phash = doc[fields.PAYLOAD_HASH_KEY]
+ attach_docs = self._soledad.get_from_index(
+ fields.TYPE_P_HASH_IDX,
+ fields.TYPE_CONTENT_VAL, str(phash))
+ if not attach_docs:
+ return False
+
+ if len(attach_docs) != 1:
+ logger.warning("Found more than one copy of phash %s!"
+ % (phash,))
+ logger.debug("Found attachment doc with that hash! Skipping save!")
+ return True
+
+
+class MsgWriteError(Exception):
+ """
+ Raised if any exception is found while saving message parts.
+ """
+
+
+class SoledadStore(ContentDedup):
+ """
+ This will create docs in the local Soledad database.
+ """
+ _last_uid_lock = threading.Lock()
+
+ implements(IMessageConsumer, IMessageStore)
+
+ def __init__(self, soledad):
+ """
+ Initialize the permanent store that writes to Soledad database.
+
+ :param soledad: the soledad instance
+ :type soledad: Soledad
+ """
+ self._soledad = soledad
+
+ # IMessageStore
+
+ # -------------------------------------------------------------------
+ # We are not yet using this interface, but it would make sense
+ # to implement it.
+
+ def create_message(self, mbox, uid, message):
+ """
+ Create the passed message into this SoledadStore.
+
+ :param mbox: the mbox this message belongs to.
+ :type mbox: str or unicode
+ :param uid: the UID that identifies this message in this mailbox.
+ :type uid: int
+ :param message: an IMessageContainer implementor.
+ """
+ raise NotImplementedError()
+
+ def put_message(self, mbox, uid, message):
+ """
+ Put the passed existing message into this SoledadStore.
+
+ :param mbox: the mbox this message belongs to.
+ :type mbox: str or unicode
+ :param uid: the UID that identifies this message in this mailbox.
+ :type uid: int
+ :param message: an IMessageContainer implementor.
+ """
+ raise NotImplementedError()
+
+ def remove_message(self, mbox, uid):
+ """
+ Remove the given message from this SoledadStore.
+
+ :param mbox: the mbox this message belongs to.
+ :type mbox: str or unicode
+ :param uid: the UID that identifies this message in this mailbox.
+ :type uid: int
+ """
+ raise NotImplementedError()
+
+ def get_message(self, mbox, uid):
+ """
+ Get a IMessageContainer for the given mbox and uid combination.
+
+ :param mbox: the mbox this message belongs to.
+ :type mbox: str or unicode
+ :param uid: the UID that identifies this message in this mailbox.
+ :type uid: int
+ """
+ raise NotImplementedError()
+
+ # IMessageConsumer
+
+ # It's not thread-safe to defer this to a different thread
+
+ def consume(self, queue):
+ """
+ Consume the queue, creating the corresponding documents in the soledad db.
+
+ :param queue: queue to get items from; each item carries the content
+ of a document to be inserted.
+ :type queue: Queue
+ """
+ # TODO should delete the original message from incoming only after
+ # the writes are done.
+ # TODO should handle the delete case
+ # TODO should handle errors
+ # TODO could generalize this method into a generic consumer
+ # and only implement `process` here
+
+ def docWriteCallBack(doc_wrapper):
+ """
+ Callback for a successful write of a document wrapper.
+ """
+ if isinstance(doc_wrapper, MessageWrapper):
+ # If everything went well, we can unset the new flag
+ # in the source store (memory store)
+ self._unset_new_dirty(doc_wrapper)
+
+ def docWriteErrorBack(failure):
+ """
+ Errorback for write operations.
+ """
+ log.error("Error while processing item.")
+ log.msg(failure.getTraceBack())
+
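+ # Drain the queue: each doc wrapper gets its own deferred, wired to the
+ # callback/errback above, and is handed to the threaded writer.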
+ while not queue.empty():
+ doc_wrapper = queue.get()
+ d = defer.Deferred()
+ d.addCallbacks(docWriteCallBack, docWriteErrorBack)
+
+ self._consume_doc(doc_wrapper, d)
+
+ @deferred_to_thread
+ def _unset_new_dirty(self, doc_wrapper):
+ """
+ Unset the `new` and `dirty` flags for this document wrapper in the
+ memory store.
+
+ :param doc_wrapper: a MessageWrapper instance
+ :type doc_wrapper: MessageWrapper
+ """
+ # XXX debug msg id/mbox?
+ logger.info("unsetting new flag!")
+ doc_wrapper.new = False
+ doc_wrapper.dirty = False
+
+ @deferred_to_thread
+ def _consume_doc(self, doc_wrapper, deferred):
+ """
+ Consume each document wrapper in a separate thread.
+
+ :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance
+ :type doc_wrapper: MessageWrapper or RecentFlagsDoc
+ :param deferred: a deferred that will be fired when the write operation
+ has finished, either calling its callback or its
+ errback depending on whether it succeeded.
+ :type deferred: Deferred
+ """
+ items = self._process(doc_wrapper)
+
+ # We prime the generator; it should yield the message
+ # or flags wrapper item in the first place.
+ doc_wrapper = items.next()
+
+ # From here on, we unpack the subpart items together with
+ # the right soledad call for each of them.
+ failed = False
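+ # Keep going on errors so every part gets a write attempt; any failure
+ # is reported through the errback once the loop has finished.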
+ for item, call in items:
+ try:
+ self._try_call(call, item)
+ except Exception as exc:
+ failed = exc
+ continue
+ if failed:
+ deferred.errback(MsgWriteError(
+ "There was an error writing the mesage"))
+ else:
+ deferred.callback(doc_wrapper)
+
+ #
+ # SoledadStore specific methods.
+ #
+
+ def _process(self, doc_wrapper):
+ """
+ Return an iterator that will yield the doc_wrapper in the first place,
+ followed by the subpart items and the proper call type for every
+ one of them, if any.
+
+ :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance
+ :type doc_wrapper: MessageWrapper or RecentFlagsDoc
+ """
+ if isinstance(doc_wrapper, MessageWrapper):
+ return chain((doc_wrapper,),
+ self._get_calls_for_msg_parts(doc_wrapper))
+ elif isinstance(doc_wrapper, RecentFlagsDoc):
+ return chain((doc_wrapper,),
+ self._get_calls_for_rflags_doc(doc_wrapper))
+ else:
+ logger.warning("CANNOT PROCESS ITEM!")
+ return (i for i in [])
+
+ def _try_call(self, call, item):
+ """
+ Try to invoke a given call with item as a parameter.
+
+ :param call: the function to call
+ :type call: callable
+ :param item: the payload to pass to the call as argument
+ :type item: object
+ """
+ if call is None:
+ return
+ try:
+ call(item)
+ except u1db_errors.RevisionConflict as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
+
+ def _get_calls_for_msg_parts(self, msg_wrapper):
+ """
+ Generator that yields the proper call type for each item.
+
+ :param msg_wrapper: A MessageWrapper
+ :type msg_wrapper: IMessageContainer
+ :return: a generator of tuples with message part doc payload
+ and callable
+ :rtype: generator
+ """
+ call = None
+
+ if msg_wrapper.new:
+ call = self._soledad.create_doc
+
+ # item is expected to be a MessagePartDoc
+ for item in msg_wrapper.walk():
+ if item.part == MessagePartType.fdoc:
+ yield dict(item.content), call
+
+ elif item.part == MessagePartType.hdoc:
+ if not self._header_does_exist(item.content):
+ yield dict(item.content), call
+
+ elif item.part == MessagePartType.cdoc:
+ if not self._content_does_exist(item.content):
+ yield dict(item.content), call
+
+ # For now, the only thing that will be dirty is
+ # the flags doc.
+
+ elif msg_wrapper.dirty:
+ call = self._soledad.put_doc
+ # item is expected to be a MessagePartDoc
+ for item in msg_wrapper.walk():
+ # XXX FIXME Give error if dirty and not doc_id !!!
+ doc_id = item.doc_id # defend!
+ if not doc_id:
+ continue
+ doc = self._soledad.get_doc(doc_id)
+ doc.content = dict(item.content)
+ if item.part == MessagePartType.fdoc:
+ logger.debug("PUT dirty fdoc")
+ yield doc, call
+
+ # XXX also for linkage-doc !!!
+ else:
+ logger.error("Cannot delete documents yet from the queue...!")
+
+ def _get_calls_for_rflags_doc(self, rflags_wrapper):
+ """
+ We always put these documents.
+
+ :param rflags_wrapper: A wrapper around recent flags doc.
+ :type rflags_wrapper: RecentFlagsDoc
+ :return: a generator that yields a tuple with the recent-flags doc
+ payload and the callable
+ :rtype: generator
+ """
+ call = self._soledad.put_doc
+ rdoc = self._soledad.get_doc(rflags_wrapper.doc_id)
+
+ payload = rflags_wrapper.content
+ logger.debug("Saving RFLAGS to Soledad...")
+
+ if payload:
+ rdoc.content = payload
+ yield rdoc, call
+
+ def _get_mbox_document(self, mbox):
+ """
+ Return mailbox document.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :return: A SoledadDocument containing this mailbox, or None if
+ the query failed.
+ :rtype: SoledadDocument or None.
+ """
+ try:
+ query = self._soledad.get_from_index(
+ fields.TYPE_MBOX_IDX,
+ fields.TYPE_MBOX_VAL, mbox)
+ if query:
+ return query.pop()
+ except Exception as exc:
+ logger.exception("Unhandled error %r" % exc)
+
+ def get_flags_doc(self, mbox, uid):
+ """
+ Return the SoledadDocument for the given mbox and uid.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the UID for the message
+ :type uid: int
+ """
+ result = None
+ try:
+ flag_docs = self._soledad.get_from_index(
+ fields.TYPE_MBOX_UID_IDX,
+ fields.TYPE_FLAGS_VAL, mbox, str(uid))
+ result = first(flag_docs)
+ except Exception as exc:
+ # ugh! Something's broken down there!
+ logger.warning("ERROR while getting flags for UID: %s" % uid)
+ logger.exception(exc)
+ finally:
+ return result
+
+ def write_last_uid(self, mbox, value):
+ """
+ Write the `last_uid` integer to the proper mailbox document
+ in Soledad.
+ This is called from the deferred triggered by
+ memorystore.increment_last_soledad_uid, which is expected to
+ run in a separate thread.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param value: the value to set
+ :type value: int
+ """
+ leap_assert_type(value, int)
+ key = fields.LAST_UID_KEY
+
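+ # Serialize the read-modify-write below so that concurrent writers
+ # cannot interleave and lose an update to `last_uid`.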
+ with self._last_uid_lock:
+ mbox_doc = self._get_mbox_document(mbox)
+ old_val = mbox_doc.content[key]
+ if value < old_val:
+ logger.error("%r:%s Tried to write a UID lesser than what's "
+ "stored!" % (mbox, value))
+ mbox_doc.content[key] = value
+ self._soledad.put_doc(mbox_doc)
+
+ # deleted messages
+
+ def deleted_iter(self, mbox):
+ """
+ Get an iterator over the SoledadDocuments for messages
+ with the \\Deleted flag in a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :return: iterator through deleted message docs
+ :rtype: iterable
+ """
+ return (doc for doc in self._soledad.get_from_index(
+ fields.TYPE_MBOX_DEL_IDX,
+ fields.TYPE_FLAGS_VAL, mbox, '1'))
+
+ # TODO can deferToThread this?
+ def remove_all_deleted(self, mbox):
+ """
+ Remove from Soledad all messages flagged as deleted for a given
+ mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ """
+ deleted = []
+ for doc in self.deleted_iter(mbox):
+ deleted.append(doc.content[fields.UID_KEY])
+ self._soledad.delete_doc(doc)
+ return deleted