summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/messages.py
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-11-25 15:04:26 +0100
committerKali Kaneko <kali@leap.se>2015-02-11 14:05:42 -0400
commit6ede495b94501a4cbdfd985dcdf4be4f582bbb9b (patch)
tree8afc3622e5afe865285ec25a4da85b0f1a14ecb5 /src/leap/mail/imap/messages.py
parentea82f75f5465de47c4a838fbd1dfe8b2030fd842 (diff)
Serializable Models + Soledad Adaptor
Diffstat (limited to 'src/leap/mail/imap/messages.py')
-rw-r--r--src/leap/mail/imap/messages.py484
1 files changed, 158 insertions, 326 deletions
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index c761091..d47c8eb 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# messages.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013, 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -19,30 +19,25 @@ LeapMessage and MessageCollection.
"""
import copy
import logging
-import re
import threading
import StringIO
from collections import defaultdict
-from email import message_from_string
from functools import partial
-from pycryptopp.hash import sha256
from twisted.mail import imap4
-from twisted.internet import defer, reactor
+from twisted.internet import reactor
from zope.interface import implements
from zope.proxy import sameProxiedObjects
from leap.common.check import leap_assert, leap_assert_type
from leap.common.decorators import memoized_method
from leap.common.mail import get_email_charset
-from leap.mail import walk
-from leap.mail.utils import first, find_charset, lowerdict, empty
-from leap.mail.utils import stringify_parts_map
-from leap.mail.decorators import deferred_to_thread
+from leap.mail.adaptors import soledad_indexes as indexes
+from leap.mail.constants import INBOX_NAME
+from leap.mail.utils import find_charset, empty
from leap.mail.imap.index import IndexedDB
from leap.mail.imap.fields import fields, WithMsgFields
-from leap.mail.imap.memorystore import MessageWrapper
from leap.mail.imap.messageparts import MessagePart, MessagePartDoc
from leap.mail.imap.parser import MBoxParser
@@ -59,9 +54,6 @@ logger = logging.getLogger(__name__)
# [ ] Delete incoming mail only after successful write!
# [ ] Remove UID from syncable db. Store only those indexes locally.
-MSGID_PATTERN = r"""<([\w@.]+)>"""
-MSGID_RE = re.compile(MSGID_PATTERN)
-
def try_unique_query(curried):
"""
@@ -90,28 +82,18 @@ def try_unique_query(curried):
logger.exception("Unhandled error %r" % exc)
-"""
-A dictionary that keeps one lock per mbox and uid.
-"""
-# XXX too much overhead?
-fdoc_locks = defaultdict(lambda: defaultdict(lambda: threading.Lock()))
+# FIXME remove-me
+#fdoc_locks = defaultdict(lambda: defaultdict(lambda: threading.Lock()))
-class LeapMessage(fields, MBoxParser):
+class IMAPMessage(fields, MBoxParser):
"""
The main representation of a message.
-
- It indexes the messages in one mailbox by a combination
- of uid+mailbox name.
"""
- # TODO this has to change.
- # Should index primarily by chash, and keep a local-only
- # UID table.
-
implements(imap4.IMessage)
- def __init__(self, soledad, uid, mbox, collection=None, container=None):
+ def __init__(self, soledad, uid, mbox):
"""
Initializes a LeapMessage.
@@ -129,76 +111,73 @@ class LeapMessage(fields, MBoxParser):
self._soledad = soledad
self._uid = int(uid) if uid is not None else None
self._mbox = self._parse_mailbox_name(mbox)
- self._collection = collection
- self._container = container
self.__chash = None
self.__bdoc = None
- # XXX make these properties public
-
- # XXX FIXME ------ the documents can be
- # deferreds too.... niice.
-
- @property
- def fdoc(self):
- """
- An accessor to the flags document.
- """
- if all(map(bool, (self._uid, self._mbox))):
- fdoc = None
- if self._container is not None:
- fdoc = self._container.fdoc
- if not fdoc:
- fdoc = self._get_flags_doc()
- if fdoc:
- fdoc_content = fdoc.content
- self.__chash = fdoc_content.get(
- fields.CONTENT_HASH_KEY, None)
- return fdoc
-
- @property
- def hdoc(self):
- """
- An accessor to the headers document.
- """
- container = self._container
- if container is not None:
- hdoc = self._container.hdoc
- if hdoc and not empty(hdoc.content):
- return hdoc
- hdoc = self._get_headers_doc()
-
- if container and not empty(hdoc.content):
+ # TODO collection and container are deprecated.
+
+ # TODO move to adaptor
+
+ #@property
+ #def fdoc(self):
+ #"""
+ #An accessor to the flags document.
+ #"""
+ #if all(map(bool, (self._uid, self._mbox))):
+ #fdoc = None
+ #if self._container is not None:
+ #fdoc = self._container.fdoc
+ #if not fdoc:
+ #fdoc = self._get_flags_doc()
+ #if fdoc:
+ #fdoc_content = fdoc.content
+ #self.__chash = fdoc_content.get(
+ #fields.CONTENT_HASH_KEY, None)
+ #return fdoc
+#
+ #@property
+ #def hdoc(self):
+ #"""
+ #An accessor to the headers document.
+ #"""
+ #container = self._container
+ #if container is not None:
+ #hdoc = self._container.hdoc
+ #if hdoc and not empty(hdoc.content):
+ #return hdoc
+ #hdoc = self._get_headers_doc()
+#
+ #if container and not empty(hdoc.content):
# mem-cache it
- hdoc_content = hdoc.content
- chash = hdoc_content.get(fields.CONTENT_HASH_KEY)
- hdocs = {chash: hdoc_content}
- container.memstore.load_header_docs(hdocs)
- return hdoc
-
- @property
- def chash(self):
- """
- An accessor to the content hash for this message.
- """
- if not self.fdoc:
- return None
- if not self.__chash and self.fdoc:
- self.__chash = self.fdoc.content.get(
- fields.CONTENT_HASH_KEY, None)
- return self.__chash
-
- @property
- def bdoc(self):
- """
- An accessor to the body document.
- """
- if not self.hdoc:
- return None
- if not self.__bdoc:
- self.__bdoc = self._get_body_doc()
- return self.__bdoc
+ #hdoc_content = hdoc.content
+ #chash = hdoc_content.get(fields.CONTENT_HASH_KEY)
+ #hdocs = {chash: hdoc_content}
+ #container.memstore.load_header_docs(hdocs)
+ #return hdoc
+#
+ #@property
+ #def chash(self):
+ #"""
+ #An accessor to the content hash for this message.
+ #"""
+ #if not self.fdoc:
+ #return None
+ #if not self.__chash and self.fdoc:
+ #self.__chash = self.fdoc.content.get(
+ #fields.CONTENT_HASH_KEY, None)
+ #return self.__chash
+
+ #@property
+ #def bdoc(self):
+ #"""
+ #An accessor to the body document.
+ #"""
+ #if not self.hdoc:
+ #return None
+ #if not self.__bdoc:
+ #self.__bdoc = self._get_body_doc()
+ #return self.__bdoc
# IMessage implementation
@@ -209,8 +188,13 @@ class LeapMessage(fields, MBoxParser):
:return: uid for this message
:rtype: int
"""
+ # TODO ----> return lookup in local sqlcipher table.
return self._uid
+ # --------------------------------------------------------------
+ # TODO -- from here on, all the methods should be proxied to the
+ # instance of leap.mail.mail.Message
+
def getFlags(self):
"""
Retrieve the flags associated with this Message.
@@ -253,25 +237,24 @@ class LeapMessage(fields, MBoxParser):
REMOVE = -1
SET = 0
- with fdoc_locks[mbox][uid]:
- doc = self.fdoc
- if not doc:
- logger.warning(
- "Could not find FDOC for %r:%s while setting flags!" %
- (mbox, uid))
- return
- current = doc.content[self.FLAGS_KEY]
- if mode == APPEND:
- newflags = tuple(set(tuple(current) + flags))
- elif mode == REMOVE:
- newflags = tuple(set(current).difference(set(flags)))
- elif mode == SET:
- newflags = flags
- new_fdoc = {
- self.FLAGS_KEY: newflags,
- self.SEEN_KEY: self.SEEN_FLAG in newflags,
- self.DEL_KEY: self.DELETED_FLAG in newflags}
- self._collection.memstore.update_flags(mbox, uid, new_fdoc)
+ doc = self.fdoc
+ if not doc:
+ logger.warning(
+ "Could not find FDOC for %r:%s while setting flags!" %
+ (mbox, uid))
+ return
+ current = doc.content[self.FLAGS_KEY]
+ if mode == APPEND:
+ newflags = tuple(set(tuple(current) + flags))
+ elif mode == REMOVE:
+ newflags = tuple(set(current).difference(set(flags)))
+ elif mode == SET:
+ newflags = flags
+ new_fdoc = {
+ self.FLAGS_KEY: newflags,
+ self.SEEN_KEY: self.SEEN_FLAG in newflags,
+ self.DEL_KEY: self.DELETED_FLAG in newflags}
+ self._collection.memstore.update_flags(mbox, uid, new_fdoc)
return map(str, newflags)
@@ -371,9 +354,9 @@ class LeapMessage(fields, MBoxParser):
else:
logger.warning("No FLAGS doc for %s:%s" % (self._mbox,
self._uid))
- if not size:
+ #if not size:
# XXX fallback, should remove when all migrated.
- size = self.getBodyFile().len
+ #size = self.getBodyFile().len
return size
def getHeaders(self, negate, *names):
@@ -395,6 +378,9 @@ class LeapMessage(fields, MBoxParser):
# XXX refactor together with MessagePart method
headers = self._get_headers()
+
+ # XXX keep this in the imap imessage implementation,
+ # because the server impl. expects content-type to be present.
if not headers:
logger.warning("No headers found")
return {str('content-type'): str('')}
@@ -614,64 +600,23 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
(the u1db index) for all the headers documents for a given mailbox.
We use it to prefetch massively all the headers for a mailbox.
This is the second massive query, after fetching all the FLAGS, that
- a MUA will do in a case where we do not have local disk cache.
+ a typical IMAP MUA will do in a case where we do not have local disk cache.
"""
HDOCS_SET_DOC = "HDOCS_SET"
templates = {
- # Message Level
-
- FLAGS_DOC: {
- fields.TYPE_KEY: fields.TYPE_FLAGS_VAL,
- fields.UID_KEY: 1, # XXX moe to a local table
- fields.MBOX_KEY: fields.INBOX_VAL,
- fields.CONTENT_HASH_KEY: "",
-
- fields.SEEN_KEY: False,
- fields.DEL_KEY: False,
- fields.FLAGS_KEY: [],
- fields.MULTIPART_KEY: False,
- fields.SIZE_KEY: 0
- },
-
- HEADERS_DOC: {
- fields.TYPE_KEY: fields.TYPE_HEADERS_VAL,
- fields.CONTENT_HASH_KEY: "",
-
- fields.DATE_KEY: "",
- fields.SUBJECT_KEY: "",
-
- fields.HEADERS_KEY: {},
- fields.PARTS_MAP_KEY: {},
- },
-
- CONTENT_DOC: {
- fields.TYPE_KEY: fields.TYPE_CONTENT_VAL,
- fields.PAYLOAD_HASH_KEY: "",
- fields.LINKED_FROM_KEY: [],
- fields.CTYPE_KEY: "", # should index by this too
-
- # should only get inmutable headers parts
- # (for indexing)
- fields.HEADERS_KEY: {},
- fields.RAW_KEY: "",
- fields.PARTS_MAP_KEY: {},
- fields.HEADERS_KEY: {},
- fields.MULTIPART_KEY: False,
- },
-
# Mailbox Level
RECENT_DOC: {
- fields.TYPE_KEY: fields.TYPE_RECENT_VAL,
- fields.MBOX_KEY: fields.INBOX_VAL,
+ "type": indexes.RECENT,
+ "mbox": INBOX_NAME,
fields.RECENTFLAGS_KEY: [],
},
HDOCS_SET_DOC: {
- fields.TYPE_KEY: fields.TYPE_HDOCS_SET_VAL,
- fields.MBOX_KEY: fields.INBOX_VAL,
+ "type": indexes.HDOCS_SET,
+ "mbox": INBOX_NAME,
fields.HDOCS_SET_KEY: [],
}
@@ -681,8 +626,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
# Different locks for wrapping both the u1db document getting/setting
# and the property getting/settting in an atomic operation.
- # TODO we would abstract this to a SoledadProperty class
-
+ # TODO --- deprecate ! --- use SoledadDocumentWrapper + locks
_rdoc_lock = defaultdict(lambda: threading.Lock())
_rdoc_write_lock = defaultdict(lambda: threading.Lock())
_rdoc_read_lock = defaultdict(lambda: threading.Lock())
@@ -764,81 +708,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
rdoc[fields.MBOX_KEY] = self.mbox
self._soledad.create_doc(rdoc)
- @deferred_to_thread
- def _do_parse(self, raw):
- """
- Parse raw message and return it along with
- relevant information about its outer level.
-
- This is done in a separate thread, and the callback is passed
- to `_do_add_msg` method.
+ # --------------------------------------------------------------------
- :param raw: the raw message
- :type raw: StringIO or basestring
- :return: msg, parts, chash, size, multi
- :rtype: tuple
- """
- msg = message_from_string(raw)
- parts = walk.get_parts(msg)
- size = len(raw)
- chash = sha256.SHA256(raw).hexdigest()
- multi = msg.is_multipart()
- return msg, parts, chash, size, multi
-
- def _populate_flags(self, flags, uid, chash, size, multi):
- """
- Return a flags doc.
-
- XXX Missing DOC -----------
- """
- fd = self._get_empty_doc(self.FLAGS_DOC)
-
- fd[self.MBOX_KEY] = self.mbox
- fd[self.UID_KEY] = uid
- fd[self.CONTENT_HASH_KEY] = chash
- fd[self.SIZE_KEY] = size
- fd[self.MULTIPART_KEY] = multi
- if flags:
- fd[self.FLAGS_KEY] = flags
- fd[self.SEEN_KEY] = self.SEEN_FLAG in flags
- fd[self.DEL_KEY] = self.DELETED_FLAG in flags
- fd[self.RECENT_KEY] = True # set always by default
- return fd
-
- def _populate_headr(self, msg, chash, subject, date):
- """
- Return a headers doc.
-
- XXX Missing DOC -----------
- """
- headers = defaultdict(list)
- for k, v in msg.items():
- headers[k].append(v)
-
- # "fix" for repeated headers.
- for k, v in headers.items():
- newline = "\n%s: " % (k,)
- headers[k] = newline.join(v)
-
- lower_headers = lowerdict(headers)
- msgid = first(MSGID_RE.findall(
- lower_headers.get('message-id', '')))
-
- hd = self._get_empty_doc(self.HEADERS_DOC)
- hd[self.CONTENT_HASH_KEY] = chash
- hd[self.HEADERS_KEY] = headers
- hd[self.MSGID_KEY] = msgid
-
- if not subject and self.SUBJECT_FIELD in headers:
- hd[self.SUBJECT_KEY] = headers[self.SUBJECT_FIELD]
- else:
- hd[self.SUBJECT_KEY] = subject
-
- if not date and self.DATE_FIELD in headers:
- hd[self.DATE_KEY] = headers[self.DATE_FIELD]
- else:
- hd[self.DATE_KEY] = date
- return hd
+ # -----------------------------------------------------------------------
def _fdoc_already_exists(self, chash):
"""
@@ -885,86 +757,41 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
flags = tuple()
leap_assert_type(flags, tuple)
- # TODO return soledad deferred instead
- observer = defer.Deferred()
- d = self._do_parse(raw)
- d.addCallback(lambda result: reactor.callInThread(
- self._do_add_msg, result, flags, subject, date,
- notify_on_disk, observer))
- return observer
+ # TODO ---- proxy to MessageCollection addMessage
+
+ #observer = defer.Deferred()
+ #d = self._do_parse(raw)
+ #d.addCallback(lambda result: reactor.callInThread(
+ #self._do_add_msg, result, flags, subject, date,
+ #notify_on_disk, observer))
+ #return observer
+
+ # TODO ---------------------------------------------------
+ # move this to leap.mail.adaptors.soledad
- # Called in thread
def _do_add_msg(self, parse_result, flags, subject,
date, notify_on_disk, observer):
"""
- Helper that creates a new message document.
- Here lives the magic of the leap mail. Well, in soledad, really.
-
- See `add_msg` docstring for parameter info.
-
- :param parse_result: a tuple with the results of `self._do_parse`
- :type parse_result: tuple
- :param observer: a deferred that will be fired with the message
- uid when the adding succeed.
- :type observer: deferred
"""
- # TODO signal that we can delete the original message!-----
- # when all the processing is done.
-
- # TODO add the linked-from info !
- # TODO add reference to the original message
-
msg, parts, chash, size, multi = parse_result
+ # XXX move to SoledadAdaptor write operation ... ???
# check for uniqueness --------------------------------
# Watch out! We're reserving a UID right after this!
existing_uid = self._fdoc_already_exists(chash)
if existing_uid:
msg = self.get_msg_by_uid(existing_uid)
-
- # We can say the observer that we're done
- # TODO return soledad deferred instead
reactor.callFromThread(observer.callback, existing_uid)
msg.setFlags((fields.DELETED_FLAG,), -1)
return
+ # TODO move UID autoincrement to MessageCollection.addMessage(mailbox)
# TODO S2 -- get FUCKING UID from autoincremental table
- uid = self.memstore.increment_last_soledad_uid(self.mbox)
-
- # We can say the observer that we're done at this point, but
- # before that we should make sure it has no serious consequences
- # if we're issued, for instance, a fetch command right after...
- # reactor.callFromThread(observer.callback, uid)
- # if we did the notify, we need to invalidate the deferred
- # so not to try to fire it twice.
- # observer = None
-
- fd = self._populate_flags(flags, uid, chash, size, multi)
- hd = self._populate_headr(msg, chash, subject, date)
-
- body_phash_fun = [walk.get_body_phash_simple,
- walk.get_body_phash_multi][int(multi)]
- body_phash = body_phash_fun(walk.get_payloads(msg))
- parts_map = walk.walk_msg_tree(parts, body_phash=body_phash)
-
- # add parts map to header doc
- # (body, multi, part_map)
- for key in parts_map:
- hd[key] = parts_map[key]
- del parts_map
+ #uid = self.memstore.increment_last_soledad_uid(self.mbox)
+ #self.set_recent_flag(uid)
- hd = stringify_parts_map(hd)
- # The MessageContainer expects a dict, one-indexed
- cdocs = dict(enumerate(walk.get_raw_docs(msg, parts), 1))
-
- self.set_recent_flag(uid)
- msg_container = MessageWrapper(fd, hd, cdocs)
-
- # TODO S1 -- just pass this to memstore and return that deferred.
- self.memstore.create_message(
- self.mbox, uid, msg_container,
- observer=observer, notify_on_disk=notify_on_disk)
+ # ------------------------------------------------------------
#
# getters: specific queries
@@ -1073,6 +900,10 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
the query failed.
:rtype: SoledadDocument or None.
"""
+ # USED from:
+ # [ ] duplicated fdoc detection
+ # [ ] _get_uid_from_msgidCb
+
# FIXME ----- use deferreds.
curried = partial(
self._soledad.get_from_index,
@@ -1205,51 +1036,52 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
if msg_container is not None:
if mem_only:
- msg = LeapMessage(None, uid, self.mbox, collection=self,
+ msg = IMAPMessage(None, uid, self.mbox, collection=self,
container=msg_container)
else:
# We pass a reference to soledad just to be able to retrieve
# missing parts that cannot be found in the container, like
# the content docs after a copy.
- msg = LeapMessage(self._soledad, uid, self.mbox,
+ msg = IMAPMessage(self._soledad, uid, self.mbox,
collection=self, container=msg_container)
else:
- msg = LeapMessage(self._soledad, uid, self.mbox, collection=self)
+ msg = IMAPMessage(self._soledad, uid, self.mbox, collection=self)
if not msg.does_exist():
return None
return msg
- def get_all_docs(self, _type=fields.TYPE_FLAGS_VAL):
- """
- Get all documents for the selected mailbox of the
- passed type. By default, it returns the flag docs.
-
- If you want acess to the content, use __iter__ instead
-
- :return: a Deferred, that will fire with a list of u1db documents
- :rtype: Deferred (promise of list of SoledadDocument)
- """
- if _type not in fields.__dict__.values():
- raise TypeError("Wrong type passed to get_all_docs")
-
+ # FIXME --- used where ? ---------------------------------------------
+ #def get_all_docs(self, _type=fields.TYPE_FLAGS_VAL):
+ #"""
+ #Get all documents for the selected mailbox of the
+ #passed type. By default, it returns the flag docs.
+#
+ #If you want acess to the content, use __iter__ instead
+#
+ #:return: a Deferred, that will fire with a list of u1db documents
+ #:rtype: Deferred (promise of list of SoledadDocument)
+ #"""
+ #if _type not in fields.__dict__.values():
+ #raise TypeError("Wrong type passed to get_all_docs")
+#
# FIXME ----- either raise or return a deferred wrapper.
- if sameProxiedObjects(self._soledad, None):
- logger.warning('Tried to get messages but soledad is None!')
- return []
-
- def get_sorted_docs(docs):
- all_docs = [doc for doc in docs]
+ #if sameProxiedObjects(self._soledad, None):
+ #logger.warning('Tried to get messages but soledad is None!')
+ #return []
+#
+ #def get_sorted_docs(docs):
+ #all_docs = [doc for doc in docs]
# inneficient, but first let's grok it and then
# let's worry about efficiency.
# XXX FIXINDEX -- should implement order by in soledad
# FIXME ----------------------------------------------
- return sorted(all_docs, key=lambda item: item.content['uid'])
-
- d = self._soledad.get_from_index(
- fields.TYPE_MBOX_IDX, _type, self.mbox)
- d.addCallback(get_sorted_docs)
- return d
+ #return sorted(all_docs, key=lambda item: item.content['uid'])
+#
+ #d = self._soledad.get_from_index(
+ #fields.TYPE_MBOX_IDX, _type, self.mbox)
+ #d.addCallback(get_sorted_docs)
+ #return d
def all_soledad_uid_iter(self):
"""
@@ -1350,7 +1182,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
:returns: a list of LeapMessages
:rtype: list
"""
- return [LeapMessage(self._soledad, docid, self.mbox, collection=self)
+ return [IMAPMessage(self._soledad, docid, self.mbox, collection=self)
for docid in self.unseen_iter()]
# recent messages
@@ -1384,7 +1216,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
:returns: iterator of dicts with content for all messages.
:rtype: iterable
"""
- return (LeapMessage(self._soledad, docuid, self.mbox, collection=self)
+ return (IMAPMessage(self._soledad, docuid, self.mbox, collection=self)
for docuid in self.all_uid_iter())
def __repr__(self):