From 6ede495b94501a4cbdfd985dcdf4be4f582bbb9b Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 25 Nov 2014 15:04:26 +0100 Subject: Serializable Models + Soledad Adaptor --- src/leap/mail/imap/messages.py | 484 ++++++++++++++--------------------------- 1 file changed, 158 insertions(+), 326 deletions(-) (limited to 'src/leap/mail/imap/messages.py') diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index c761091..d47c8eb 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # messages.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -19,30 +19,25 @@ LeapMessage and MessageCollection. """ import copy import logging -import re import threading import StringIO from collections import defaultdict -from email import message_from_string from functools import partial -from pycryptopp.hash import sha256 from twisted.mail import imap4 -from twisted.internet import defer, reactor +from twisted.internet import reactor from zope.interface import implements from zope.proxy import sameProxiedObjects from leap.common.check import leap_assert, leap_assert_type from leap.common.decorators import memoized_method from leap.common.mail import get_email_charset -from leap.mail import walk -from leap.mail.utils import first, find_charset, lowerdict, empty -from leap.mail.utils import stringify_parts_map -from leap.mail.decorators import deferred_to_thread +from leap.mail.adaptors import soledad_indexes as indexes +from leap.mail.constants import INBOX_NAME +from leap.mail.utils import find_charset, empty from leap.mail.imap.index import IndexedDB from leap.mail.imap.fields import fields, WithMsgFields -from leap.mail.imap.memorystore import MessageWrapper from leap.mail.imap.messageparts import MessagePart, MessagePartDoc from leap.mail.imap.parser import MBoxParser @@ -59,9 +54,6 @@ logger = logging.getLogger(__name__) # [ ] Delete incoming mail only after successful write! # [ ] Remove UID from syncable db. Store only those indexes locally. -MSGID_PATTERN = r"""<([\w@.]+)>""" -MSGID_RE = re.compile(MSGID_PATTERN) - def try_unique_query(curried): """ @@ -90,28 +82,18 @@ def try_unique_query(curried): logger.exception("Unhandled error %r" % exc) -""" -A dictionary that keeps one lock per mbox and uid. -""" -# XXX too much overhead? -fdoc_locks = defaultdict(lambda: defaultdict(lambda: threading.Lock())) +# FIXME remove-me +#fdoc_locks = defaultdict(lambda: defaultdict(lambda: threading.Lock())) -class LeapMessage(fields, MBoxParser): +class IMAPMessage(fields, MBoxParser): """ The main representation of a message. - - It indexes the messages in one mailbox by a combination - of uid+mailbox name. """ - # TODO this has to change. - # Should index primarily by chash, and keep a local-only - # UID table. - implements(imap4.IMessage) - def __init__(self, soledad, uid, mbox, collection=None, container=None): + def __init__(self, soledad, uid, mbox): """ Initializes a LeapMessage. @@ -129,76 +111,73 @@ class LeapMessage(fields, MBoxParser): self._soledad = soledad self._uid = int(uid) if uid is not None else None self._mbox = self._parse_mailbox_name(mbox) - self._collection = collection - self._container = container self.__chash = None self.__bdoc = None - # XXX make these properties public - - # XXX FIXME ------ the documents can be - # deferreds too.... niice. - - @property - def fdoc(self): - """ - An accessor to the flags document. - """ - if all(map(bool, (self._uid, self._mbox))): - fdoc = None - if self._container is not None: - fdoc = self._container.fdoc - if not fdoc: - fdoc = self._get_flags_doc() - if fdoc: - fdoc_content = fdoc.content - self.__chash = fdoc_content.get( - fields.CONTENT_HASH_KEY, None) - return fdoc - - @property - def hdoc(self): - """ - An accessor to the headers document. - """ - container = self._container - if container is not None: - hdoc = self._container.hdoc - if hdoc and not empty(hdoc.content): - return hdoc - hdoc = self._get_headers_doc() - - if container and not empty(hdoc.content): + # TODO collection and container are deprecated. + + # TODO move to adaptor + + #@property + #def fdoc(self): + #""" + #An accessor to the flags document. + #""" + #if all(map(bool, (self._uid, self._mbox))): + #fdoc = None + #if self._container is not None: + #fdoc = self._container.fdoc + #if not fdoc: + #fdoc = self._get_flags_doc() + #if fdoc: + #fdoc_content = fdoc.content + #self.__chash = fdoc_content.get( + #fields.CONTENT_HASH_KEY, None) + #return fdoc +# + #@property + #def hdoc(self): + #""" + #An accessor to the headers document. + #""" + #container = self._container + #if container is not None: + #hdoc = self._container.hdoc + #if hdoc and not empty(hdoc.content): + #return hdoc + #hdoc = self._get_headers_doc() +# + #if container and not empty(hdoc.content): # mem-cache it - hdoc_content = hdoc.content - chash = hdoc_content.get(fields.CONTENT_HASH_KEY) - hdocs = {chash: hdoc_content} - container.memstore.load_header_docs(hdocs) - return hdoc - - @property - def chash(self): - """ - An accessor to the content hash for this message. - """ - if not self.fdoc: - return None - if not self.__chash and self.fdoc: - self.__chash = self.fdoc.content.get( - fields.CONTENT_HASH_KEY, None) - return self.__chash - - @property - def bdoc(self): - """ - An accessor to the body document. - """ - if not self.hdoc: - return None - if not self.__bdoc: - self.__bdoc = self._get_body_doc() - return self.__bdoc + #hdoc_content = hdoc.content + #chash = hdoc_content.get(fields.CONTENT_HASH_KEY) + #hdocs = {chash: hdoc_content} + #container.memstore.load_header_docs(hdocs) + #return hdoc +# + #@property + #def chash(self): + #""" + #An accessor to the content hash for this message. + #""" + #if not self.fdoc: + #return None + #if not self.__chash and self.fdoc: + #self.__chash = self.fdoc.content.get( + #fields.CONTENT_HASH_KEY, None) + #return self.__chash + + #@property + #def bdoc(self): + #""" + #An accessor to the body document. + #""" + #if not self.hdoc: + #return None + #if not self.__bdoc: + #self.__bdoc = self._get_body_doc() + #return self.__bdoc # IMessage implementation @@ -209,8 +188,13 @@ class LeapMessage(fields, MBoxParser): :return: uid for this message :rtype: int """ + # TODO ----> return lookup in local sqlcipher table. return self._uid + # -------------------------------------------------------------- + # TODO -- from here on, all the methods should be proxied to the + # instance of leap.mail.mail.Message + def getFlags(self): """ Retrieve the flags associated with this Message. @@ -253,25 +237,24 @@ class LeapMessage(fields, MBoxParser): REMOVE = -1 SET = 0 - with fdoc_locks[mbox][uid]: - doc = self.fdoc - if not doc: - logger.warning( - "Could not find FDOC for %r:%s while setting flags!" % - (mbox, uid)) - return - current = doc.content[self.FLAGS_KEY] - if mode == APPEND: - newflags = tuple(set(tuple(current) + flags)) - elif mode == REMOVE: - newflags = tuple(set(current).difference(set(flags))) - elif mode == SET: - newflags = flags - new_fdoc = { - self.FLAGS_KEY: newflags, - self.SEEN_KEY: self.SEEN_FLAG in newflags, - self.DEL_KEY: self.DELETED_FLAG in newflags} - self._collection.memstore.update_flags(mbox, uid, new_fdoc) + doc = self.fdoc + if not doc: + logger.warning( + "Could not find FDOC for %r:%s while setting flags!" % + (mbox, uid)) + return + current = doc.content[self.FLAGS_KEY] + if mode == APPEND: + newflags = tuple(set(tuple(current) + flags)) + elif mode == REMOVE: + newflags = tuple(set(current).difference(set(flags))) + elif mode == SET: + newflags = flags + new_fdoc = { + self.FLAGS_KEY: newflags, + self.SEEN_KEY: self.SEEN_FLAG in newflags, + self.DEL_KEY: self.DELETED_FLAG in newflags} + self._collection.memstore.update_flags(mbox, uid, new_fdoc) return map(str, newflags) @@ -371,9 +354,9 @@ class LeapMessage(fields, MBoxParser): else: logger.warning("No FLAGS doc for %s:%s" % (self._mbox, self._uid)) - if not size: + #if not size: # XXX fallback, should remove when all migrated. - size = self.getBodyFile().len + #size = self.getBodyFile().len return size def getHeaders(self, negate, *names): @@ -395,6 +378,9 @@ class LeapMessage(fields, MBoxParser): # XXX refactor together with MessagePart method headers = self._get_headers() + + # XXX keep this in the imap imessage implementation, + # because the server impl. expects content-type to be present. if not headers: logger.warning("No headers found") return {str('content-type'): str('')} @@ -614,64 +600,23 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): (the u1db index) for all the headers documents for a given mailbox. We use it to prefetch massively all the headers for a mailbox. This is the second massive query, after fetching all the FLAGS, that - a MUA will do in a case where we do not have local disk cache. + a typical IMAP MUA will do in a case where we do not have local disk cache. """ HDOCS_SET_DOC = "HDOCS_SET" templates = { - # Message Level - - FLAGS_DOC: { - fields.TYPE_KEY: fields.TYPE_FLAGS_VAL, - fields.UID_KEY: 1, # XXX moe to a local table - fields.MBOX_KEY: fields.INBOX_VAL, - fields.CONTENT_HASH_KEY: "", - - fields.SEEN_KEY: False, - fields.DEL_KEY: False, - fields.FLAGS_KEY: [], - fields.MULTIPART_KEY: False, - fields.SIZE_KEY: 0 - }, - - HEADERS_DOC: { - fields.TYPE_KEY: fields.TYPE_HEADERS_VAL, - fields.CONTENT_HASH_KEY: "", - - fields.DATE_KEY: "", - fields.SUBJECT_KEY: "", - - fields.HEADERS_KEY: {}, - fields.PARTS_MAP_KEY: {}, - }, - - CONTENT_DOC: { - fields.TYPE_KEY: fields.TYPE_CONTENT_VAL, - fields.PAYLOAD_HASH_KEY: "", - fields.LINKED_FROM_KEY: [], - fields.CTYPE_KEY: "", # should index by this too - - # should only get inmutable headers parts - # (for indexing) - fields.HEADERS_KEY: {}, - fields.RAW_KEY: "", - fields.PARTS_MAP_KEY: {}, - fields.HEADERS_KEY: {}, - fields.MULTIPART_KEY: False, - }, - # Mailbox Level RECENT_DOC: { - fields.TYPE_KEY: fields.TYPE_RECENT_VAL, - fields.MBOX_KEY: fields.INBOX_VAL, + "type": indexes.RECENT, + "mbox": INBOX_NAME, fields.RECENTFLAGS_KEY: [], }, HDOCS_SET_DOC: { - fields.TYPE_KEY: fields.TYPE_HDOCS_SET_VAL, - fields.MBOX_KEY: fields.INBOX_VAL, + "type": indexes.HDOCS_SET, + "mbox": INBOX_NAME, fields.HDOCS_SET_KEY: [], } @@ -681,8 +626,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): # Different locks for wrapping both the u1db document getting/setting # and the property getting/settting in an atomic operation. - # TODO we would abstract this to a SoledadProperty class - + # TODO --- deprecate ! --- use SoledadDocumentWrapper + locks _rdoc_lock = defaultdict(lambda: threading.Lock()) _rdoc_write_lock = defaultdict(lambda: threading.Lock()) _rdoc_read_lock = defaultdict(lambda: threading.Lock()) @@ -764,81 +708,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): rdoc[fields.MBOX_KEY] = self.mbox self._soledad.create_doc(rdoc) - @deferred_to_thread - def _do_parse(self, raw): - """ - Parse raw message and return it along with - relevant information about its outer level. - - This is done in a separate thread, and the callback is passed - to `_do_add_msg` method. + # -------------------------------------------------------------------- - :param raw: the raw message - :type raw: StringIO or basestring - :return: msg, parts, chash, size, multi - :rtype: tuple - """ - msg = message_from_string(raw) - parts = walk.get_parts(msg) - size = len(raw) - chash = sha256.SHA256(raw).hexdigest() - multi = msg.is_multipart() - return msg, parts, chash, size, multi - - def _populate_flags(self, flags, uid, chash, size, multi): - """ - Return a flags doc. - - XXX Missing DOC ----------- - """ - fd = self._get_empty_doc(self.FLAGS_DOC) - - fd[self.MBOX_KEY] = self.mbox - fd[self.UID_KEY] = uid - fd[self.CONTENT_HASH_KEY] = chash - fd[self.SIZE_KEY] = size - fd[self.MULTIPART_KEY] = multi - if flags: - fd[self.FLAGS_KEY] = flags - fd[self.SEEN_KEY] = self.SEEN_FLAG in flags - fd[self.DEL_KEY] = self.DELETED_FLAG in flags - fd[self.RECENT_KEY] = True # set always by default - return fd - - def _populate_headr(self, msg, chash, subject, date): - """ - Return a headers doc. - - XXX Missing DOC ----------- - """ - headers = defaultdict(list) - for k, v in msg.items(): - headers[k].append(v) - - # "fix" for repeated headers. - for k, v in headers.items(): - newline = "\n%s: " % (k,) - headers[k] = newline.join(v) - - lower_headers = lowerdict(headers) - msgid = first(MSGID_RE.findall( - lower_headers.get('message-id', ''))) - - hd = self._get_empty_doc(self.HEADERS_DOC) - hd[self.CONTENT_HASH_KEY] = chash - hd[self.HEADERS_KEY] = headers - hd[self.MSGID_KEY] = msgid - - if not subject and self.SUBJECT_FIELD in headers: - hd[self.SUBJECT_KEY] = headers[self.SUBJECT_FIELD] - else: - hd[self.SUBJECT_KEY] = subject - - if not date and self.DATE_FIELD in headers: - hd[self.DATE_KEY] = headers[self.DATE_FIELD] - else: - hd[self.DATE_KEY] = date - return hd + # ----------------------------------------------------------------------- def _fdoc_already_exists(self, chash): """ @@ -885,86 +757,41 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): flags = tuple() leap_assert_type(flags, tuple) - # TODO return soledad deferred instead - observer = defer.Deferred() - d = self._do_parse(raw) - d.addCallback(lambda result: reactor.callInThread( - self._do_add_msg, result, flags, subject, date, - notify_on_disk, observer)) - return observer + # TODO ---- proxy to MessageCollection addMessage + + #observer = defer.Deferred() + #d = self._do_parse(raw) + #d.addCallback(lambda result: reactor.callInThread( + #self._do_add_msg, result, flags, subject, date, + #notify_on_disk, observer)) + #return observer + + # TODO --------------------------------------------------- + # move this to leap.mail.adaptors.soledad - # Called in thread def _do_add_msg(self, parse_result, flags, subject, date, notify_on_disk, observer): """ - Helper that creates a new message document. - Here lives the magic of the leap mail. Well, in soledad, really. - - See `add_msg` docstring for parameter info. - - :param parse_result: a tuple with the results of `self._do_parse` - :type parse_result: tuple - :param observer: a deferred that will be fired with the message - uid when the adding succeed. - :type observer: deferred """ - # TODO signal that we can delete the original message!----- - # when all the processing is done. - - # TODO add the linked-from info ! - # TODO add reference to the original message - msg, parts, chash, size, multi = parse_result + # XXX move to SoledadAdaptor write operation ... ??? # check for uniqueness -------------------------------- # Watch out! We're reserving a UID right after this! existing_uid = self._fdoc_already_exists(chash) if existing_uid: msg = self.get_msg_by_uid(existing_uid) - - # We can say the observer that we're done - # TODO return soledad deferred instead reactor.callFromThread(observer.callback, existing_uid) msg.setFlags((fields.DELETED_FLAG,), -1) return + # TODO move UID autoincrement to MessageCollection.addMessage(mailbox) # TODO S2 -- get FUCKING UID from autoincremental table - uid = self.memstore.increment_last_soledad_uid(self.mbox) - - # We can say the observer that we're done at this point, but - # before that we should make sure it has no serious consequences - # if we're issued, for instance, a fetch command right after... - # reactor.callFromThread(observer.callback, uid) - # if we did the notify, we need to invalidate the deferred - # so not to try to fire it twice. - # observer = None - - fd = self._populate_flags(flags, uid, chash, size, multi) - hd = self._populate_headr(msg, chash, subject, date) - - body_phash_fun = [walk.get_body_phash_simple, - walk.get_body_phash_multi][int(multi)] - body_phash = body_phash_fun(walk.get_payloads(msg)) - parts_map = walk.walk_msg_tree(parts, body_phash=body_phash) - - # add parts map to header doc - # (body, multi, part_map) - for key in parts_map: - hd[key] = parts_map[key] - del parts_map + #uid = self.memstore.increment_last_soledad_uid(self.mbox) + #self.set_recent_flag(uid) - hd = stringify_parts_map(hd) - # The MessageContainer expects a dict, one-indexed - cdocs = dict(enumerate(walk.get_raw_docs(msg, parts), 1)) - - self.set_recent_flag(uid) - msg_container = MessageWrapper(fd, hd, cdocs) - - # TODO S1 -- just pass this to memstore and return that deferred. - self.memstore.create_message( - self.mbox, uid, msg_container, - observer=observer, notify_on_disk=notify_on_disk) + # ------------------------------------------------------------ # # getters: specific queries @@ -1073,6 +900,10 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): the query failed. :rtype: SoledadDocument or None. """ + # USED from: + # [ ] duplicated fdoc detection + # [ ] _get_uid_from_msgidCb + # FIXME ----- use deferreds. curried = partial( self._soledad.get_from_index, @@ -1205,51 +1036,52 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): if msg_container is not None: if mem_only: - msg = LeapMessage(None, uid, self.mbox, collection=self, + msg = IMAPMessage(None, uid, self.mbox, collection=self, container=msg_container) else: # We pass a reference to soledad just to be able to retrieve # missing parts that cannot be found in the container, like # the content docs after a copy. - msg = LeapMessage(self._soledad, uid, self.mbox, + msg = IMAPMessage(self._soledad, uid, self.mbox, collection=self, container=msg_container) else: - msg = LeapMessage(self._soledad, uid, self.mbox, collection=self) + msg = IMAPMessage(self._soledad, uid, self.mbox, collection=self) if not msg.does_exist(): return None return msg - def get_all_docs(self, _type=fields.TYPE_FLAGS_VAL): - """ - Get all documents for the selected mailbox of the - passed type. By default, it returns the flag docs. - - If you want acess to the content, use __iter__ instead - - :return: a Deferred, that will fire with a list of u1db documents - :rtype: Deferred (promise of list of SoledadDocument) - """ - if _type not in fields.__dict__.values(): - raise TypeError("Wrong type passed to get_all_docs") - + # FIXME --- used where ? --------------------------------------------- + #def get_all_docs(self, _type=fields.TYPE_FLAGS_VAL): + #""" + #Get all documents for the selected mailbox of the + #passed type. By default, it returns the flag docs. +# + #If you want acess to the content, use __iter__ instead +# + #:return: a Deferred, that will fire with a list of u1db documents + #:rtype: Deferred (promise of list of SoledadDocument) + #""" + #if _type not in fields.__dict__.values(): + #raise TypeError("Wrong type passed to get_all_docs") +# # FIXME ----- either raise or return a deferred wrapper. - if sameProxiedObjects(self._soledad, None): - logger.warning('Tried to get messages but soledad is None!') - return [] - - def get_sorted_docs(docs): - all_docs = [doc for doc in docs] + #if sameProxiedObjects(self._soledad, None): + #logger.warning('Tried to get messages but soledad is None!') + #return [] +# + #def get_sorted_docs(docs): + #all_docs = [doc for doc in docs] # inneficient, but first let's grok it and then # let's worry about efficiency. # XXX FIXINDEX -- should implement order by in soledad # FIXME ---------------------------------------------- - return sorted(all_docs, key=lambda item: item.content['uid']) - - d = self._soledad.get_from_index( - fields.TYPE_MBOX_IDX, _type, self.mbox) - d.addCallback(get_sorted_docs) - return d + #return sorted(all_docs, key=lambda item: item.content['uid']) +# + #d = self._soledad.get_from_index( + #fields.TYPE_MBOX_IDX, _type, self.mbox) + #d.addCallback(get_sorted_docs) + #return d def all_soledad_uid_iter(self): """ @@ -1350,7 +1182,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): :returns: a list of LeapMessages :rtype: list """ - return [LeapMessage(self._soledad, docid, self.mbox, collection=self) + return [IMAPMessage(self._soledad, docid, self.mbox, collection=self) for docid in self.unseen_iter()] # recent messages @@ -1384,7 +1216,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): :returns: iterator of dicts with content for all messages. :rtype: iterable """ - return (LeapMessage(self._soledad, docuid, self.mbox, collection=self) + return (IMAPMessage(self._soledad, docuid, self.mbox, collection=self) for docuid in self.all_uid_iter()) def __repr__(self): -- cgit v1.2.3