summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/soledadstore.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/imap/soledadstore.py')
-rw-r--r--src/leap/mail/imap/soledadstore.py620
1 files changed, 0 insertions, 620 deletions
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
deleted file mode 100644
index f3de8eb..0000000
--- a/src/leap/mail/imap/soledadstore.py
+++ /dev/null
@@ -1,620 +0,0 @@
-# -*- coding: utf-8 -*-
-# soledadstore.py
-# Copyright (C) 2014 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-A MessageStore that writes to Soledad.
-"""
-import logging
-import threading
-
-from collections import defaultdict
-from itertools import chain
-
-from u1db import errors as u1db_errors
-from twisted.python import log
-from zope.interface import implements
-
-from leap.common.check import leap_assert_type, leap_assert
-from leap.mail.decorators import deferred_to_thread
-from leap.mail.imap.messageparts import MessagePartType
-from leap.mail.imap.messageparts import MessageWrapper
-from leap.mail.imap.messageparts import RecentFlagsDoc
-from leap.mail.imap.fields import fields
-from leap.mail.imap.interfaces import IMessageStore
-from leap.mail.messageflow import IMessageConsumer
-from leap.mail.utils import first, empty, accumulator_queue
-
-logger = logging.getLogger(__name__)
-
-
-# TODO
-# [ ] Implement a retry queue?
-# [ ] Consider journaling of operations.
-
-
-class ContentDedup(object):
- """
- Message deduplication.
-
- We do a query for the content hashes before writing to our beloved
- sqlcipher backend of Soledad. This means, by now, that:
-
- 1. We will not store the same body/attachment twice, only the hash of it.
- 2. We will not store the same message header twice, only the hash of it.
-
- The first case is useful if you are always receiving the same old memes
- from unwary friends that still have not discovered that 4chan is the
- generator of the internet. The second will save your day if you have
- initiated session with the same account in two different machines. I also
- wonder why would you do that, but let's respect each other choices, like
- with the religious celebrations, and assume that one day we'll be able
- to run Bitmask in completely free phones. Yes, I mean that, the whole GSM
- Stack.
- """
- # TODO refactor using unique_query
-
- def _header_does_exist(self, doc):
- """
- Check whether we already have a header document for this
- content hash in our database.
-
- :param doc: tentative header for document
- :type doc: dict
- :returns: True if it exists, False otherwise.
- """
- if not doc:
- return False
- chash = doc[fields.CONTENT_HASH_KEY]
- header_docs = self._soledad.get_from_index(
- fields.TYPE_C_HASH_IDX,
- fields.TYPE_HEADERS_VAL, str(chash))
- if not header_docs:
- return False
-
- # FIXME enable only to debug this problem.
- #if len(header_docs) != 1:
- #logger.warning("Found more than one copy of chash %s!"
- #% (chash,))
-
- #logger.debug("Found header doc with that hash! Skipping save!")
- return True
-
- def _content_does_exist(self, doc):
- """
- Check whether we already have a content document for a payload
- with this hash in our database.
-
- :param doc: tentative content for document
- :type doc: dict
- :returns: True if it exists, False otherwise.
- """
- if not doc:
- return False
- phash = doc[fields.PAYLOAD_HASH_KEY]
- attach_docs = self._soledad.get_from_index(
- fields.TYPE_P_HASH_IDX,
- fields.TYPE_CONTENT_VAL, str(phash))
- if not attach_docs:
- return False
-
- # FIXME enable only to debug this problem
- #if len(attach_docs) != 1:
- #logger.warning("Found more than one copy of phash %s!"
- #% (phash,))
- #logger.debug("Found attachment doc with that hash! Skipping save!")
- return True
-
-
-class MsgWriteError(Exception):
- """
- Raised if any exception is found while saving message parts.
- """
- pass
-
-
-"""
-A lock per document.
-"""
-# TODO should bound the space of this!!!
-# http://stackoverflow.com/a/2437645/1157664
-# Setting this to twice the number of threads in the threadpool
-# should be safe.
-put_locks = defaultdict(lambda: threading.Lock())
-mbox_doc_locks = defaultdict(lambda: threading.Lock())
-
-
-class SoledadStore(ContentDedup):
- """
- This will create docs in the local Soledad database.
- """
- _remove_lock = threading.Lock()
-
- implements(IMessageConsumer, IMessageStore)
-
- def __init__(self, soledad):
- """
- Initialize the permanent store that writes to Soledad database.
-
- :param soledad: the soledad instance
- :type soledad: Soledad
- """
- from twisted.internet import reactor
- self.reactor = reactor
-
- self._soledad = soledad
-
- self._CREATE_DOC_FUN = self._soledad.create_doc
- self._PUT_DOC_FUN = self._soledad.put_doc
- self._GET_DOC_FUN = self._soledad.get_doc
-
- # we instantiate an accumulator to batch the notifications
- self.docs_notify_queue = accumulator_queue(
- lambda item: reactor.callFromThread(self._unset_new_dirty, item),
- 20)
-
- # IMessageStore
-
- # -------------------------------------------------------------------
- # We are not yet using this interface, but it would make sense
- # to implement it.
-
- def create_message(self, mbox, uid, message):
- """
- Create the passed message into this SoledadStore.
-
- :param mbox: the mbox this message belongs.
- :type mbox: str or unicode
- :param uid: the UID that identifies this message in this mailbox.
- :type uid: int
- :param message: a IMessageContainer implementor.
- """
- raise NotImplementedError()
-
- def put_message(self, mbox, uid, message):
- """
- Put the passed existing message into this SoledadStore.
-
- :param mbox: the mbox this message belongs.
- :type mbox: str or unicode
- :param uid: the UID that identifies this message in this mailbox.
- :type uid: int
- :param message: a IMessageContainer implementor.
- """
- raise NotImplementedError()
-
- def remove_message(self, mbox, uid):
- """
- Remove the given message from this SoledadStore.
-
- :param mbox: the mbox this message belongs.
- :type mbox: str or unicode
- :param uid: the UID that identifies this message in this mailbox.
- :type uid: int
- """
- raise NotImplementedError()
-
- def get_message(self, mbox, uid):
- """
- Get a IMessageContainer for the given mbox and uid combination.
-
- :param mbox: the mbox this message belongs.
- :type mbox: str or unicode
- :param uid: the UID that identifies this message in this mailbox.
- :type uid: int
- """
- raise NotImplementedError()
-
- # IMessageConsumer
-
- # TODO should handle the delete case
- # TODO should handle errors better
- # TODO could generalize this method into a generic consumer
- # and only implement `process` here
-
- def consume(self, queue):
- """
- Creates a new document in soledad db.
-
- :param queue: a tuple of queues to get item from, with content of the
- document to be inserted.
- :type queue: tuple of Queues
- """
- new, dirty = queue
- while not new.empty():
- doc_wrapper = new.get()
- self.reactor.callInThread(self._consume_doc, doc_wrapper,
- self.docs_notify_queue)
- while not dirty.empty():
- doc_wrapper = dirty.get()
- self.reactor.callInThread(self._consume_doc, doc_wrapper,
- self.docs_notify_queue)
-
- # Queue empty, flush the notifications queue.
- self.docs_notify_queue(None, flush=True)
-
- def _unset_new_dirty(self, doc_wrapper):
- """
- Unset the `new` and `dirty` flags for this document wrapper in the
- memory store.
-
- :param doc_wrapper: a MessageWrapper instance
- :type doc_wrapper: MessageWrapper
- """
- if isinstance(doc_wrapper, MessageWrapper):
- # XXX still needed for debug quite often
- #logger.info("unsetting new flag!")
- doc_wrapper.new = False
- doc_wrapper.dirty = False
-
- @deferred_to_thread
- def _consume_doc(self, doc_wrapper, notify_queue):
- """
- Consume each document wrapper in a separate thread.
- We pass an instance of an accumulator that handles the notifications
- to the memorystore when the write has been done.
-
- :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance
- :type doc_wrapper: MessageWrapper or RecentFlagsDoc
- :param notify_queue: a callable that handles the writeback
- notifications to the memstore.
- :type notify_queue: callable
- """
- def queueNotifyBack(failed, doc_wrapper):
- if failed:
- log.msg("There was an error writing the mesage...")
- else:
- notify_queue(doc_wrapper)
-
- def doSoledadCalls(items):
- # we prime the generator, that should return the
- # message or flags wrapper item in the first place.
- try:
- doc_wrapper = items.next()
- except StopIteration:
- pass
- else:
- failed = self._soledad_write_document_parts(items)
- queueNotifyBack(failed, doc_wrapper)
-
- doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper))
-
- #
- # SoledadStore specific methods.
- #
-
- def _soledad_write_document_parts(self, items):
- """
- Write the document parts to soledad in a separate thread.
-
- :param items: the iterator through the different document wrappers
- payloads.
- :type items: iterator
- :return: whether the write was successful or not
- :rtype: bool
- """
- failed = False
- for item, call in items:
- if empty(item):
- continue
- try:
- self._try_call(call, item)
- except Exception as exc:
- logger.debug("ITEM WAS: %s" % repr(item))
- if hasattr(item, 'content'):
- logger.debug("ITEM CONTENT WAS: %s" %
- repr(item.content))
- logger.exception(exc)
- failed = True
- continue
- return failed
-
- def _iter_wrapper_subparts(self, doc_wrapper):
- """
- Return an iterator that will yield the doc_wrapper in the first place,
- followed by the subparts item and the proper call type for every
- item in the queue, if any.
-
- :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance
- :type doc_wrapper: MessageWrapper or RecentFlagsDoc
- """
- if isinstance(doc_wrapper, MessageWrapper):
- return chain((doc_wrapper,),
- self._get_calls_for_msg_parts(doc_wrapper))
- elif isinstance(doc_wrapper, RecentFlagsDoc):
- return chain((doc_wrapper,),
- self._get_calls_for_rflags_doc(doc_wrapper))
- else:
- logger.warning("CANNOT PROCESS ITEM!")
- return (i for i in [])
-
- def _try_call(self, call, item):
- """
- Try to invoke a given call with item as a parameter.
-
- :param call: the function to call
- :type call: callable
- :param item: the payload to pass to the call as argument
- :type item: object
- """
- if call is None:
- return
-
- if call == self._PUT_DOC_FUN:
- doc_id = item.doc_id
- if doc_id is None:
- logger.warning("BUG! Dirty doc but has no doc_id!")
- return
- with put_locks[doc_id]:
- doc = self._GET_DOC_FUN(doc_id)
-
- if doc is None:
- logger.warning("BUG! Dirty doc but could not "
- "find document %s" % (doc_id,))
- return
-
- doc.content = dict(item.content)
-
- item = doc
- try:
- call(item)
- except u1db_errors.RevisionConflict as exc:
- logger.exception("Error: %r" % (exc,))
- raise exc
- except Exception as exc:
- logger.exception("Error: %r" % (exc,))
- raise exc
-
- else:
- try:
- call(item)
- except u1db_errors.RevisionConflict as exc:
- logger.exception("Error: %r" % (exc,))
- raise exc
- except Exception as exc:
- logger.exception("Error: %r" % (exc,))
- raise exc
-
- def _get_calls_for_msg_parts(self, msg_wrapper):
- """
- Generator that return the proper call type for a given item.
-
- :param msg_wrapper: A MessageWrapper
- :type msg_wrapper: IMessageContainer
- :return: a generator of tuples with recent-flags doc payload
- and callable
- :rtype: generator
- """
- call = None
-
- if msg_wrapper.new:
- call = self._CREATE_DOC_FUN
-
- # item is expected to be a MessagePartDoc
- for item in msg_wrapper.walk():
- if item.part == MessagePartType.fdoc:
- yield dict(item.content), call
-
- elif item.part == MessagePartType.hdoc:
- if not self._header_does_exist(item.content):
- yield dict(item.content), call
-
- elif item.part == MessagePartType.cdoc:
- if not self._content_does_exist(item.content):
- yield dict(item.content), call
-
- # For now, the only thing that will be dirty is
- # the flags doc.
-
- elif msg_wrapper.dirty:
- call = self._PUT_DOC_FUN
- # item is expected to be a MessagePartDoc
- for item in msg_wrapper.walk():
- # XXX FIXME Give error if dirty and not doc_id !!!
- doc_id = item.doc_id # defend!
- if not doc_id:
- logger.warning("Dirty item but no doc_id!")
- continue
-
- if item.part == MessagePartType.fdoc:
- #logger.debug("PUT dirty fdoc")
- yield item, call
-
- # XXX also for linkage-doc !!!
- else:
- logger.error("Cannot delete documents yet from the queue...!")
-
- def _get_calls_for_rflags_doc(self, rflags_wrapper):
- """
- We always put these documents.
-
- :param rflags_wrapper: A wrapper around recent flags doc.
- :type rflags_wrapper: RecentFlagsWrapper
- :return: a tuple with recent-flags doc payload and callable
- :rtype: tuple
- """
- call = self._PUT_DOC_FUN
-
- payload = rflags_wrapper.content
- if payload:
- logger.debug("Saving RFLAGS to Soledad...")
- yield rflags_wrapper, call
-
- # Mbox documents and attributes
-
- def get_mbox_document(self, mbox):
- """
- Return mailbox document.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :return: A SoledadDocument containing this mailbox, or None if
- the query failed.
- :rtype: SoledadDocument or None.
- """
- with mbox_doc_locks[mbox]:
- return self._get_mbox_document(mbox)
-
- def _get_mbox_document(self, mbox):
- """
- Helper for returning the mailbox document.
- """
- try:
- query = self._soledad.get_from_index(
- fields.TYPE_MBOX_IDX,
- fields.TYPE_MBOX_VAL, mbox)
- if query:
- return query.pop()
- else:
- logger.error("Could not find mbox document for %r" %
- (mbox,))
- except Exception as exc:
- logger.exception("Unhandled error %r" % exc)
-
- def get_mbox_closed(self, mbox):
- """
- Return the closed attribute for a given mailbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :rtype: bool
- """
- mbox_doc = self.get_mbox_document()
- return mbox_doc.content.get(fields.CLOSED_KEY, False)
-
- def set_mbox_closed(self, mbox, closed):
- """
- Set the closed attribute for a given mailbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param closed: the value to be set
- :type closed: bool
- """
- leap_assert(isinstance(closed, bool), "closed needs to be boolean")
- with mbox_doc_locks[mbox]:
- mbox_doc = self._get_mbox_document(mbox)
- if mbox_doc is None:
- logger.error(
- "Could not find mbox document for %r" % (mbox,))
- return
- mbox_doc.content[fields.CLOSED_KEY] = closed
- self._soledad.put_doc(mbox_doc)
-
- def write_last_uid(self, mbox, value):
- """
- Write the `last_uid` integer to the proper mailbox document
- in Soledad.
- This is called from the deferred triggered by
- memorystore.increment_last_soledad_uid, which is expected to
- run in a separate thread.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param value: the value to set
- :type value: int
- """
- leap_assert_type(value, int)
- key = fields.LAST_UID_KEY
-
- # XXX use accumulator to reduce number of hits
- with mbox_doc_locks[mbox]:
- mbox_doc = self._get_mbox_document(mbox)
- old_val = mbox_doc.content[key]
- if value > old_val:
- mbox_doc.content[key] = value
- try:
- self._soledad.put_doc(mbox_doc)
- except Exception as exc:
- logger.error("Error while setting last_uid for %r"
- % (mbox,))
- logger.exception(exc)
-
- def get_flags_doc(self, mbox, uid):
- """
- Return the SoledadDocument for the given mbox and uid.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param uid: the UID for the message
- :type uid: int
- :rtype: SoledadDocument or None
- """
- result = None
- try:
- flag_docs = self._soledad.get_from_index(
- fields.TYPE_MBOX_UID_IDX,
- fields.TYPE_FLAGS_VAL, mbox, str(uid))
- if len(flag_docs) != 1:
- logger.warning("More than one flag doc for %r:%s" %
- (mbox, uid))
- result = first(flag_docs)
- except Exception as exc:
- # ugh! Something's broken down there!
- logger.warning("ERROR while getting flags for UID: %s" % uid)
- logger.exception(exc)
- finally:
- return result
-
- def get_headers_doc(self, chash):
- """
- Return the document that keeps the headers for a message
- indexed by its content-hash.
-
- :param chash: the content-hash to retrieve the document from.
- :type chash: str or unicode
- :rtype: SoledadDocument or None
- """
- head_docs = self._soledad.get_from_index(
- fields.TYPE_C_HASH_IDX,
- fields.TYPE_HEADERS_VAL, str(chash))
- return first(head_docs)
-
- # deleted messages
-
- def deleted_iter(self, mbox):
- """
- Get an iterator for the the doc_id for SoledadDocuments for messages
- with \\Deleted flag for a given mailbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :return: iterator through deleted message docs
- :rtype: iterable
- """
- return [doc.doc_id for doc in self._soledad.get_from_index(
- fields.TYPE_MBOX_DEL_IDX,
- fields.TYPE_FLAGS_VAL, mbox, '1')]
-
- def remove_all_deleted(self, mbox):
- """
- Remove from Soledad all messages flagged as deleted for a given
- mailbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- """
- deleted = []
- for doc_id in self.deleted_iter(mbox):
- with self._remove_lock:
- doc = self._soledad.get_doc(doc_id)
- if doc is not None:
- self._soledad.delete_doc(doc)
- try:
- deleted.append(doc.content[fields.UID_KEY])
- except TypeError:
- # empty content
- pass
- return deleted