diff options
Diffstat (limited to 'src/leap/mail/imap/messages.py')
-rw-r--r-- | src/leap/mail/imap/messages.py | 1104 |
1 files changed, 378 insertions, 726 deletions
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index 34304ea..25fc55f 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -20,17 +20,15 @@ LeapMessage and MessageCollection. import copy import logging import re -import time import threading import StringIO -from collections import defaultdict, namedtuple +from collections import defaultdict from functools import partial from twisted.mail import imap4 from twisted.internet import defer from twisted.python import log -from u1db import errors as u1db_errors from zope.interface import implements from zope.proxy import sameProxiedObjects @@ -38,33 +36,30 @@ from leap.common.check import leap_assert, leap_assert_type from leap.common.decorators import memoized_method from leap.common.mail import get_email_charset from leap.mail import walk -from leap.mail.utils import first, find_charset -from leap.mail.decorators import deferred +from leap.mail.utils import first, find_charset, lowerdict, empty +from leap.mail.utils import stringify_parts_map +from leap.mail.decorators import deferred_to_thread from leap.mail.imap.index import IndexedDB from leap.mail.imap.fields import fields, WithMsgFields +from leap.mail.imap.memorystore import MessageWrapper +from leap.mail.imap.messageparts import MessagePart from leap.mail.imap.parser import MailParser, MBoxParser -from leap.mail.messageflow import IMessageConsumer logger = logging.getLogger(__name__) # TODO ------------------------------------------------------------ +# [ ] Add ref to incoming message during add_msg # [ ] Add linked-from info. +# * Need a new type of documents: linkage info. +# * HDOCS are linked from FDOCs (ref to chash) +# * CDOCS are linked from HDOCS (ref to chash) + # [ ] Delete incoming mail only after successful write! # [ ] Remove UID from syncable db. Store only those indexes locally. - -def lowerdict(_dict): - """ - Return a dict with the keys in lowercase. - - :param _dict: the dict to convert - :rtype: dict - """ - # TODO should properly implement a CaseInsensitive dict. - # Look into requests code. - return dict((key.lower(), value) - for key, value in _dict.items()) +MSGID_PATTERN = r"""<([\w@.]+)>""" +MSGID_RE = re.compile(MSGID_PATTERN) def try_unique_query(curried): @@ -92,232 +87,6 @@ def try_unique_query(curried): except Exception as exc: logger.exception("Unhandled error %r" % exc) -MSGID_PATTERN = r"""<([\w@.]+)>""" -MSGID_RE = re.compile(MSGID_PATTERN) - - -class MessagePart(object): - """ - IMessagePart implementor. - It takes a subpart message and is able to find - the inner parts. - - Excusatio non petita: see the interface documentation. - """ - - implements(imap4.IMessagePart) - - def __init__(self, soledad, part_map): - """ - Initializes the MessagePart. - - :param part_map: a dictionary containing the parts map for this - message - :type part_map: dict - """ - # TODO - # It would be good to pass the uid/mailbox also - # for references while debugging. - - # We have a problem on bulk moves, and is - # that when the fetch on the new mailbox is done - # the parts maybe are not complete. - # So we should be able to fail with empty - # docs until we solve that. The ideal would be - # to gather the results of the deferred operations - # to signal the operation is complete. - #leap_assert(part_map, "part map dict cannot be null") - self._soledad = soledad - self._pmap = part_map - - def getSize(self): - """ - Return the total size, in octets, of this message part. - - :return: size of the message, in octets - :rtype: int - """ - if not self._pmap: - return 0 - size = self._pmap.get('size', None) - if not size: - logger.error("Message part cannot find size in the partmap") - return size - - def getBodyFile(self): - """ - Retrieve a file object containing only the body of this message. - - :return: file-like object opened for reading - :rtype: StringIO - """ - fd = StringIO.StringIO() - if self._pmap: - multi = self._pmap.get('multi') - if not multi: - phash = self._pmap.get("phash", None) - else: - pmap = self._pmap.get('part_map') - first_part = pmap.get('1', None) - if first_part: - phash = first_part['phash'] - - if not phash: - logger.warning("Could not find phash for this subpart!") - payload = str("") - else: - payload = self._get_payload_from_document(phash) - - else: - logger.warning("Message with no part_map!") - payload = str("") - - if payload: - content_type = self._get_ctype_from_document(phash) - charset = find_charset(content_type) - logger.debug("Got charset from header: %s" % (charset,)) - if charset is None: - charset = self._get_charset(payload) - logger.debug("Got charset: %s" % (charset,)) - try: - payload = payload.encode(charset) - except (UnicodeEncodeError, UnicodeDecodeError) as e: - logger.error("Unicode error, using 'replace'. {0!r}".format(e)) - payload = payload.encode(charset, 'replace') - - fd.write(payload) - fd.seek(0) - return fd - - # TODO cache the phash retrieval - def _get_payload_from_document(self, phash): - """ - Gets the message payload from the content document. - - :param phash: the payload hash to retrieve by. - :type phash: basestring - """ - cdocs = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(phash)) - - cdoc = first(cdocs) - if not cdoc: - logger.warning( - "Could not find the content doc " - "for phash %s" % (phash,)) - payload = cdoc.content.get(fields.RAW_KEY, "") - return payload - - # TODO cache the pahash retrieval - def _get_ctype_from_document(self, phash): - """ - Gets the content-type from the content document. - - :param phash: the payload hash to retrieve by. - :type phash: basestring - """ - cdocs = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(phash)) - - cdoc = first(cdocs) - if not cdoc: - logger.warning( - "Could not find the content doc " - "for phash %s" % (phash,)) - ctype = cdoc.content.get('ctype', "") - return ctype - - @memoized_method - def _get_charset(self, stuff): - # TODO put in a common class with LeapMessage - """ - Gets (guesses?) the charset of a payload. - - :param stuff: the stuff to guess about. - :type stuff: basestring - :returns: charset - """ - # XXX existential doubt 2. shouldn't we make the scope - # of the decorator somewhat more persistent? - # ah! yes! and put memory bounds. - return get_email_charset(unicode(stuff)) - - def getHeaders(self, negate, *names): - """ - Retrieve a group of message headers. - - :param names: The names of the headers to retrieve or omit. - :type names: tuple of str - - :param negate: If True, indicates that the headers listed in names - should be omitted from the return value, rather - than included. - :type negate: bool - - :return: A mapping of header field names to header field values - :rtype: dict - """ - if not self._pmap: - logger.warning("No pmap in Subpart!") - return {} - headers = dict(self._pmap.get("headers", [])) - - # twisted imap server expects *some* headers to be lowercase - # We could use a CaseInsensitiveDict here... - headers = dict( - (str(key), str(value)) if key.lower() != "content-type" - else (str(key.lower()), str(value)) - for (key, value) in headers.items()) - - names = map(lambda s: s.upper(), names) - if negate: - cond = lambda key: key.upper() not in names - else: - cond = lambda key: key.upper() in names - - # unpack and filter original dict by negate-condition - filter_by_cond = [ - map(str, (key, val)) for - key, val in headers.items() - if cond(key)] - filtered = dict(filter_by_cond) - return filtered - - def isMultipart(self): - """ - Return True if this message is multipart. - """ - if not self._pmap: - logger.warning("Could not get part map!") - return False - multi = self._pmap.get("multi", False) - return multi - - def getSubPart(self, part): - """ - Retrieve a MIME submessage - - :type part: C{int} - :param part: The number of the part to retrieve, indexed from 0. - :raise IndexError: Raised if the specified part does not exist. - :raise TypeError: Raised if this message is not multipart. - :rtype: Any object implementing C{IMessagePart}. - :return: The specified sub-part. - """ - if not self.isMultipart(): - raise TypeError - sub_pmap = self._pmap.get("part_map", {}) - try: - part_map = sub_pmap[str(part + 1)] - except KeyError: - logger.debug("getSubpart for %s: KeyError" % (part,)) - raise IndexError - - # XXX check for validity - return MessagePart(self._soledad, part_map) - class LeapMessage(fields, MailParser, MBoxParser): """ @@ -328,12 +97,14 @@ class LeapMessage(fields, MailParser, MBoxParser): """ # TODO this has to change. - # Should index primarily by chash, and keep a local-lonly + # Should index primarily by chash, and keep a local-only # UID table. implements(imap4.IMessage) - def __init__(self, soledad, uid, mbox, collection=None): + flags_lock = threading.Lock() + + def __init__(self, soledad, uid, mbox, collection=None, container=None): """ Initializes a LeapMessage. @@ -342,32 +113,54 @@ class LeapMessage(fields, MailParser, MBoxParser): :param uid: the UID for the message. :type uid: int or basestring :param mbox: the mbox this message belongs to - :type mbox: basestring + :type mbox: str or unicode :param collection: a reference to the parent collection object :type collection: MessageCollection + :param container: a IMessageContainer implementor instance + :type container: IMessageContainer """ MailParser.__init__(self) self._soledad = soledad self._uid = int(uid) self._mbox = self._parse_mailbox_name(mbox) self._collection = collection + self._container = container self.__chash = None self.__bdoc = None + # XXX make these properties public + @property def _fdoc(self): """ An accessor to the flags document. """ if all(map(bool, (self._uid, self._mbox))): - fdoc = self._get_flags_doc() + fdoc = None + if self._container is not None: + fdoc = self._container.fdoc + if not fdoc: + fdoc = self._get_flags_doc() if fdoc: - self.__chash = fdoc.content.get( + fdoc_content = fdoc.content + self.__chash = fdoc_content.get( fields.CONTENT_HASH_KEY, None) return fdoc @property + def _hdoc(self): + """ + An accessor to the headers document. + """ + if self._container is not None: + hdoc = self._container.hdoc + if hdoc and not empty(hdoc.content): + return hdoc + # XXX cache this into the memory store !!! + return self._get_headers_doc() + + @property def _chash(self): """ An accessor to the content hash for this message. @@ -380,13 +173,6 @@ class LeapMessage(fields, MailParser, MBoxParser): return self.__chash @property - def _hdoc(self): - """ - An accessor to the headers document. - """ - return self._get_headers_doc() - - @property def _bdoc(self): """ An accessor to the body document. @@ -415,39 +201,33 @@ class LeapMessage(fields, MailParser, MBoxParser): :return: The flags, represented as strings :rtype: tuple """ - if self._uid is None: - return [] uid = self._uid - flags = [] + flags = set([]) fdoc = self._fdoc if fdoc: - flags = fdoc.content.get(self.FLAGS_KEY, None) + flags = set(fdoc.content.get(self.FLAGS_KEY, None)) msgcol = self._collection # We treat the recent flag specially: gotten from # a mailbox-level document. if msgcol and uid in msgcol.recent_flags: - flags.append(fields.RECENT_FLAG) + flags.add(fields.RECENT_FLAG) if flags: flags = map(str, flags) return tuple(flags) - # setFlags, addFlags, removeFlags are not in the interface spec - # but we use them with store command. + # setFlags not in the interface spec but we use it with store command. - def setFlags(self, flags): + def setFlags(self, flags, mode): """ Sets the flags for this message - Returns a SoledadDocument that needs to be updated by the caller. - :param flags: the flags to update in the message. :type flags: tuple of str - - :return: a SoledadDocument instance - :rtype: SoledadDocument + :param mode: the mode for setting. 1 is append, -1 is remove, 0 set. + :type mode: int """ leap_assert(isinstance(flags, tuple), "flags need to be a tuple") log.msg('setting flags: %s (%s)' % (self._uid, flags)) @@ -458,42 +238,36 @@ class LeapMessage(fields, MailParser, MBoxParser): "Could not find FDOC for %s:%s while setting flags!" % (self._mbox, self._uid)) return - doc.content[self.FLAGS_KEY] = flags - doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags - doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags - self._soledad.put_doc(doc) - - def addFlags(self, flags): - """ - Adds flags to this message. - - Returns a SoledadDocument that needs to be updated by the caller. - :param flags: the flags to add to the message. - :type flags: tuple of str - - :return: a SoledadDocument instance - :rtype: SoledadDocument - """ - leap_assert(isinstance(flags, tuple), "flags need to be a tuple") - oldflags = self.getFlags() - self.setFlags(tuple(set(flags + oldflags))) - - def removeFlags(self, flags): - """ - Remove flags from this message. - - Returns a SoledadDocument that needs to be updated by the caller. - - :param flags: the flags to be removed from the message. - :type flags: tuple of str - - :return: a SoledadDocument instance - :rtype: SoledadDocument - """ - leap_assert(isinstance(flags, tuple), "flags need to be a tuple") - oldflags = self.getFlags() - self.setFlags(tuple(set(oldflags) - set(flags))) + APPEND = 1 + REMOVE = -1 + SET = 0 + + with self.flags_lock: + current = doc.content[self.FLAGS_KEY] + if mode == APPEND: + newflags = tuple(set(tuple(current) + flags)) + elif mode == REMOVE: + newflags = tuple(set(current).difference(set(flags))) + elif mode == SET: + newflags = flags + + # We could defer this, but I think it's better + # to put it under the lock... + doc.content[self.FLAGS_KEY] = newflags + doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags + doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags + + if self._collection.memstore is not None: + log.msg("putting message in collection") + self._collection.memstore.put_message( + self._mbox, self._uid, + MessageWrapper(fdoc=doc.content, new=False, dirty=True, + docs_id={'fdoc': doc.doc_id})) + else: + # fallback for non-memstore initializations. + self._soledad.put_doc(doc) + return map(str, newflags) def getInternalDate(self): """ @@ -519,29 +293,42 @@ class LeapMessage(fields, MailParser, MBoxParser): :return: file-like object opened for reading :rtype: StringIO """ + def write_fd(body): + fd.write(body) + fd.seek(0) + return fd + # TODO refactor with getBodyFile in MessagePart + fd = StringIO.StringIO() - bdoc = self._bdoc - if bdoc: - body = self._bdoc.content.get(self.RAW_KEY, "") - content_type = bdoc.content.get('content-type', "") + + if self._bdoc is not None: + bdoc_content = self._bdoc.content + if empty(bdoc_content): + logger.warning("No BDOC content found for message!!!") + return write_fd("") + + body = bdoc_content.get(self.RAW_KEY, "") + content_type = bdoc_content.get('content-type', "") charset = find_charset(content_type) + logger.debug('got charset from content-type: %s' % charset) if charset is None: charset = self._get_charset(body) try: - body = body.encode(charset) - except UnicodeError as e: - logger.error("Unicode error, using 'replace'. {0!r}".format(e)) + if isinstance(body, unicode): + body = body.encode(charset) + except UnicodeError as exc: + logger.error( + "Unicode error, using 'replace'. {0!r}".format(exc)) + logger.debug("Attempted to encode with: %s" % charset) body = body.encode(charset, 'replace') + finally: + return write_fd(body) # We are still returning funky characters from here. else: logger.warning("No BDOC found for message.") - body = str("") - - fd.write(body) - fd.seek(0) - return fd + return write_fd("") @memoized_method def _get_charset(self, stuff): @@ -552,11 +339,10 @@ class LeapMessage(fields, MailParser, MBoxParser): :type stuff: basestring :returns: charset """ - # TODO get from subpart headers - # XXX existential doubt 2. shouldn't we make the scope + # XXX shouldn't we make the scope # of the decorator somewhat more persistent? # ah! yes! and put memory bounds. - return get_email_charset(unicode(stuff)) + return get_email_charset(stuff) def getSize(self): """ @@ -567,7 +353,8 @@ class LeapMessage(fields, MailParser, MBoxParser): """ size = None if self._fdoc: - size = self._fdoc.content.get(self.SIZE_KEY, False) + fdoc_content = self._fdoc.content + size = fdoc_content.get(self.SIZE_KEY, False) else: logger.warning("No FLAGS doc for %s:%s" % (self._mbox, self._uid)) @@ -592,6 +379,8 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: dict """ # TODO split in smaller methods + # XXX refactor together with MessagePart method + headers = self._get_headers() if not headers: logger.warning("No headers found") @@ -608,11 +397,10 @@ class LeapMessage(fields, MailParser, MBoxParser): # default to most likely standard charset = find_charset(headers, "utf-8") - - # twisted imap server expects *some* headers to be lowercase - # XXX refactor together with MessagePart method headers2 = dict() for key, value in headers.items(): + # twisted imap server expects *some* headers to be lowercase + # We could use a CaseInsensitiveDict here... if key.lower() == "content-type": key = key.lower() @@ -621,10 +409,13 @@ class LeapMessage(fields, MailParser, MBoxParser): if not isinstance(value, str): value = value.encode(charset, 'replace') + if value.endswith(";"): + # bastards + value = value[:-1] + # filter original dict by negate-condition if cond(key): headers2[key] = value - return headers2 def _get_headers(self): @@ -632,7 +423,8 @@ class LeapMessage(fields, MailParser, MBoxParser): Return the headers dict for this message. """ if self._hdoc is not None: - headers = self._hdoc.content.get(self.HEADERS_KEY, {}) + hdoc_content = self._hdoc.content + headers = hdoc_content.get(self.HEADERS_KEY, {}) return headers else: @@ -646,7 +438,8 @@ class LeapMessage(fields, MailParser, MBoxParser): Return True if this message is multipart. """ if self._fdoc: - is_multipart = self._fdoc.content.get(self.MULTIPART_KEY, False) + fdoc_content = self._fdoc.content + is_multipart = fdoc_content.get(self.MULTIPART_KEY, False) return is_multipart else: logger.warning( @@ -688,9 +481,15 @@ class LeapMessage(fields, MailParser, MBoxParser): logger.warning("Tried to get part but no HDOC found!") return None - pmap = self._hdoc.content.get(fields.PARTS_MAP_KEY, {}) + hdoc_content = self._hdoc.content + pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {}) + + # remember, lads, soledad is using strings in its keys, + # not integers! return pmap[str(part)] + # XXX moved to memory store + # move the rest too. ------------------------------------------ def _get_flags_doc(self): """ Return the document that keeps the flags for this @@ -724,16 +523,31 @@ class LeapMessage(fields, MailParser, MBoxParser): Return the document that keeps the body for this message. """ - body_phash = self._hdoc.content.get( + hdoc_content = self._hdoc.content + body_phash = hdoc_content.get( fields.BODY_KEY, None) if not body_phash: logger.warning("No body phash for this document!") return None - body_docs = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(body_phash)) - return first(body_docs) + # XXX get from memstore too... + # if memstore: memstore.get_phrash + # memstore should keep a dict with weakrefs to the + # phash doc... + + if self._container is not None: + bdoc = self._container.memstore.get_cdoc_from_phash(body_phash) + if not empty(bdoc) and not empty(bdoc.content): + return bdoc + + # no memstore, or no body doc found there + if self._soledad: + body_docs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(body_phash)) + return first(body_docs) + else: + logger.error("No phash in container, and no soledad found!") def __getitem__(self, key): """ @@ -748,216 +562,19 @@ class LeapMessage(fields, MailParser, MBoxParser): """ return self._fdoc.content.get(key, None) - # setters - - # XXX to be used in the messagecopier interface?! - - def set_uid(self, uid): - """ - Set new uid for this message. - - :param uid: the new uid - :type uid: basestring - """ - # XXX dangerous! lock? - self._uid = uid - d = self._fdoc - d.content[self.UID_KEY] = uid - self._soledad.put_doc(d) - - def set_mbox(self, mbox): - """ - Set new mbox for this message. - - :param mbox: the new mbox - :type mbox: basestring - """ - # XXX dangerous! lock? - self._mbox = mbox - d = self._fdoc - d.content[self.MBOX_KEY] = mbox - self._soledad.put_doc(d) - - # destructor - - @deferred - def remove(self): - """ - Remove all docs associated with this message. - """ - # XXX For the moment we are only removing the flags and headers - # docs. The rest we leave there polluting your hard disk, - # until we think about a good way of deorphaning. - # Maybe a crawler of unreferenced docs. - - # XXX implement elijah's idea of using a PUT document as a - # token to ensure consistency in the removal. - - uid = self._uid - - fd = self._get_flags_doc() - #hd = self._get_headers_doc() - #bd = self._get_body_doc() - #docs = [fd, hd, bd] - - docs = [fd] - - for d in filter(None, docs): - try: - self._soledad.delete_doc(d) - except Exception as exc: - logger.error(exc) - return uid - def does_exist(self): """ - Return True if there is actually a flags message for this + Return True if there is actually a flags document for this UID and mbox. """ - return self._fdoc is not None - - -class ContentDedup(object): - """ - Message deduplication. - - We do a query for the content hashes before writing to our beloved - sqlcipher backend of Soledad. This means, by now, that: - - 1. We will not store the same attachment twice, only the hash of it. - 2. We will not store the same message body twice, only the hash of it. - - The first case is useful if you are always receiving the same old memes - from unwary friends that still have not discovered that 4chan is the - generator of the internet. The second will save your day if you have - initiated session with the same account in two different machines. I also - wonder why would you do that, but let's respect each other choices, like - with the religious celebrations, and assume that one day we'll be able - to run Bitmask in completely free phones. Yes, I mean that, the whole GSM - Stack. - """ - - def _content_does_exist(self, doc): - """ - Check whether we already have a content document for a payload - with this hash in our database. - - :param doc: tentative body document - :type doc: dict - :returns: True if that happens, False otherwise. - """ - if not doc: - return False - phash = doc[fields.PAYLOAD_HASH_KEY] - attach_docs = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(phash)) - if not attach_docs: - return False - - if len(attach_docs) != 1: - logger.warning("Found more than one copy of phash %s!" - % (phash,)) - logger.debug("Found attachment doc with that hash! Skipping save!") - return True - - -SoledadWriterPayload = namedtuple( - 'SoledadWriterPayload', ['mode', 'payload']) - -# TODO we could consider using enum here: -# https://pypi.python.org/pypi/enum - -SoledadWriterPayload.CREATE = 1 -SoledadWriterPayload.PUT = 2 -SoledadWriterPayload.CONTENT_CREATE = 3 - - -""" -SoledadDocWriter was used to avoid writing to the db from multiple threads. -Its use here has been deprecated in favor of a local rw_lock in the client. -But we might want to reuse in in the near future to implement priority queues. -""" - - -class SoledadDocWriter(object): - """ - This writer will create docs serially in the local soledad database. - """ - - implements(IMessageConsumer) - - def __init__(self, soledad): - """ - Initialize the writer. - - :param soledad: the soledad instance - :type soledad: Soledad - """ - self._soledad = soledad - - def _get_call_for_item(self, item): - """ - Return the proper call type for a given item. - - :param item: one of the types defined under the - attributes of SoledadWriterPayload - :type item: int - """ - call = None - payload = item.payload - - if item.mode == SoledadWriterPayload.CREATE: - call = self._soledad.create_doc - elif (item.mode == SoledadWriterPayload.CONTENT_CREATE - and not self._content_does_exist(payload)): - call = self._soledad.create_doc - elif item.mode == SoledadWriterPayload.PUT: - call = self._soledad.put_doc - return call - - def _process(self, queue): - """ - Return the item and the proper call type for the next - item in the queue if any. + return not empty(self._fdoc) - :param queue: the queue from where we'll pick item. - :type queue: Queue - """ - item = queue.get() - call = self._get_call_for_item(item) - return item, call - - def consume(self, queue): - """ - Creates a new document in soledad db. - - :param queue: queue to get item from, with content of the document - to be inserted. - :type queue: Queue - """ - empty = queue.empty() - while not empty: - item, call = self._process(queue) - - if call: - # XXX should handle the delete case - # should handle errors - try: - call(item.payload) - except u1db_errors.RevisionConflict as exc: - logger.error("Error: %r" % (exc,)) - raise exc - empty = queue.empty() - - -class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, - ContentDedup): +class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): """ A collection of messages, surprisingly. - It is tied to a selected mailbox name that is passed to constructor. + It is tied to a selected mailbox name that is passed to its constructor. Implements a filter query over the messages contained in a soledad database. """ @@ -1058,7 +675,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, _hdocset_lock = threading.Lock() _hdocset_property_lock = threading.Lock() - def __init__(self, mbox=None, soledad=None): + def __init__(self, mbox=None, soledad=None, memstore=None): """ Constructor for MessageCollection. @@ -1068,13 +685,18 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, MessageCollection for each mailbox, instead of treating them as a property of each message. + We are passed an instance of MemoryStore, the same for the + SoledadBackedAccount, that we use as a read cache and a buffer + for writes. + :param mbox: the name of the mailbox. It is the name with which we filter the query over the - messages database + messages database. :type mbox: str - :param soledad: Soledad database :type soledad: Soledad instance + :param memstore: a MemoryStore instance + :type memstore: MemoryStore """ MailParser.__init__(self) leap_assert(mbox, "Need a mailbox name to initialize") @@ -1084,15 +706,22 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, leap_assert(soledad, "Need a soledad instance to initialize") # okay, all in order, keep going... + self.mbox = self._parse_mailbox_name(mbox) + + # XXX get a SoledadStore passed instead self._soledad = soledad + self.memstore = memstore + self.__rflags = None self.__hdocset = None self.initialize_db() # ensure that we have a recent-flags and a hdocs-sec doc self._get_or_create_rdoc() - self._get_or_create_hdocset() + + # Not for now... + #self._get_or_create_hdocset() def _get_empty_doc(self, _type=FLAGS_DOC): """ @@ -1210,17 +839,21 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, :type chash: basestring :return: False, if it does not exist, or UID. """ - exist = self._get_fdoc_from_chash(chash) + exist = False + if self.memstore is not None: + exist = self.memstore.get_fdoc_from_chash(chash, self.mbox) + + if not exist: + exist = self._get_fdoc_from_chash(chash) if exist: return exist.content.get(fields.UID_KEY, "unknown-uid") else: return False - @deferred - def add_msg(self, raw, subject=None, flags=None, date=None, uid=1): + def add_msg(self, raw, subject=None, flags=None, date=None, uid=None, + notify_on_disk=False): """ Creates a new message document. - Here lives the magic of the leap mail. Well, in soledad, really. :param raw: the raw message :type raw: str @@ -1236,27 +869,63 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, :param uid: the message uid for this mailbox :type uid: int - """ - # TODO signal that we can delete the original message!----- - # when all the processing is done. - - # TODO add the linked-from info ! + :return: a deferred that will be fired with the message + uid when the adding succeed. + :rtype: deferred + """ logger.debug('adding message') if flags is None: flags = tuple() leap_assert_type(flags, tuple) + d = defer.Deferred() + self._do_add_msg(raw, flags, subject, date, notify_on_disk, d) + return d + + # We SHOULD defer this (or the heavy load here) to the thread pool, + # but it gives troubles with the QSocketNotifier used by Qt... + def _do_add_msg(self, raw, flags, subject, date, notify_on_disk, observer): + """ + Helper that creates a new message document. + Here lives the magic of the leap mail. Well, in soledad, really. + + See `add_msg` docstring for parameter info. + + :param observer: a deferred that will be fired with the message + uid when the adding succeed. + :type observer: deferred + """ + # TODO signal that we can delete the original message!----- + # when all the processing is done. + + # TODO add the linked-from info ! + # TODO add reference to the original message + # parse msg, chash, size, multi = self._do_parse(raw) - # check for uniqueness. - if self._fdoc_already_exists(chash): - logger.warning("We already have that message in this mailbox.") - # note that this operation will leave holes in the UID sequence, - # but we're gonna change that all the same for a local-only table. - # so not touch it by the moment. - return False + # check for uniqueness -------------------------------- + # XXX profiler says that this test is costly. + # So we probably should just do an in-memory check and + # move the complete check to the soledad writer? + # Watch out! We're reserving a UID right after this! + existing_uid = self._fdoc_already_exists(chash) + if existing_uid: + logger.warning("We already have that message in this " + "mailbox, unflagging as deleted") + uid = existing_uid + msg = self.get_msg_by_uid(uid) + msg.setFlags((fields.DELETED_FLAG,), -1) + + # XXX if this is deferred to thread again we should not use + # the callback in the deferred thread, but return and + # call the callback from the caller fun... + observer.callback(uid) + return + + uid = self.memstore.increment_last_soledad_uid(self.mbox) + logger.info("ADDING MSG WITH UID: %s" % uid) fd = self._populate_flags(flags, uid, chash, size, multi) hd = self._populate_headr(msg, chash, subject, date) @@ -1273,48 +942,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, hd[key] = parts_map[key] del parts_map - # Saving ---------------------------------------- - self.set_recent_flag(uid) - - # first, regular docs: flags and headers - self._soledad.create_doc(fd) - # XXX should check for content duplication on headers too - # but with chash. !!! - hdoc = self._soledad.create_doc(hd) - # We add the newly created hdoc to the fast-access set of - # headers documents associated with the mailbox. - self.add_hdocset_docid(hdoc.doc_id) - - # and last, but not least, try to create - # content docs if not already there. - cdocs = walk.get_raw_docs(msg, parts) - for cdoc in cdocs: - if not self._content_does_exist(cdoc): - self._soledad.create_doc(cdoc) + hd = stringify_parts_map(hd) - def _remove_cb(self, result): - return result - - def remove_all_deleted(self): - """ - Removes all messages flagged as deleted. - """ - delete_deferl = [] - for msg in self.get_deleted(): - delete_deferl.append(msg.remove()) - d1 = defer.gatherResults(delete_deferl, consumeErrors=True) - d1.addCallback(self._remove_cb) - return d1 + # The MessageContainer expects a dict, one-indexed + cdocs = dict(enumerate(walk.get_raw_docs(msg, parts), 1)) - def remove(self, msg): - """ - Remove a given msg. - :param msg: the message to be removed - :type msg: LeapMessage - """ - d = msg.remove() - d.addCallback(self._remove_cb) - return d + self.set_recent_flag(uid) + msg_container = MessageWrapper(fd, hd, cdocs) + self.memstore.create_message(self.mbox, uid, msg_container, + observer=observer, + notify_on_disk=notify_on_disk) # # getters: specific queries @@ -1326,32 +963,59 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, """ An accessor for the recent-flags set for this mailbox. """ - if not self.__rflags: + # XXX check if we should remove this + if self.__rflags is not None: + return self.__rflags + + if self.memstore is not None: with self._rdoc_lock: - rdoc = self._get_recent_doc() - self.__rflags = set(rdoc.content.get( - fields.RECENTFLAGS_KEY, [])) - return self.__rflags + rflags = self.memstore.get_recent_flags(self.mbox) + if not rflags: + # not loaded in the memory store yet. + # let's fetch them from soledad... + rdoc = self._get_recent_doc() + rflags = set(rdoc.content.get( + fields.RECENTFLAGS_KEY, [])) + # ...and cache them now. + self.memstore.load_recent_flags( + self.mbox, + {'doc_id': rdoc.doc_id, 'set': rflags}) + return rflags + + #else: + # fallback for cases without memory store + #with self._rdoc_lock: + #rdoc = self._get_recent_doc() + #self.__rflags = set(rdoc.content.get( + #fields.RECENTFLAGS_KEY, [])) + #return self.__rflags def _set_recent_flags(self, value): """ Setter for the recent-flags set for this mailbox. """ - with self._rdoc_lock: - rdoc = self._get_recent_doc() - newv = set(value) - self.__rflags = newv - rdoc.content[fields.RECENTFLAGS_KEY] = list(newv) - # XXX should deferLater 0 it? - self._soledad.put_doc(rdoc) + if self.memstore is not None: + self.memstore.set_recent_flags(self.mbox, value) + + #else: + # fallback for cases without memory store + #with self._rdoc_lock: + #rdoc = self._get_recent_doc() + #newv = set(value) + #self.__rflags = newv + #rdoc.content[fields.RECENTFLAGS_KEY] = list(newv) + # XXX should deferLater 0 it? + #self._soledad.put_doc(rdoc) recent_flags = property( _get_recent_flags, _set_recent_flags, doc="Set of UIDs with the recent flag for this mailbox.") + # XXX change naming, indicate soledad query. def _get_recent_doc(self): """ - Get recent-flags document for this mailbox. + Get recent-flags document from Soledad for this mailbox. + :rtype: SoledadDocument or None """ curried = partial( self._soledad.get_from_index, @@ -1367,100 +1031,39 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, def unset_recent_flags(self, uids): """ Unset Recent flag for a sequence of uids. + + :param uids: the uids to unset + :type uid: sequence """ with self._rdoc_property_lock: - self.recent_flags = self.recent_flags.difference( + self.recent_flags.difference_update( set(uids)) + # Individual flags operations + def unset_recent_flag(self, uid): """ Unset Recent flag for a given uid. + + :param uid: the uid to unset + :type uid: int """ with self._rdoc_property_lock: - self.recent_flags = self.recent_flags.difference( + self.recent_flags.difference_update( set([uid])) + @deferred_to_thread def set_recent_flag(self, uid): """ Set Recent flag for a given uid. + + :param uid: the uid to set + :type uid: int """ with self._rdoc_property_lock: self.recent_flags = self.recent_flags.union( set([uid])) - # headers-docs-set - - def _get_hdocset(self): - """ - An accessor for the hdocs-set for this mailbox. - """ - if not self.__hdocset: - with self._hdocset_lock: - hdocset_doc = self._get_hdocset_doc() - value = set(hdocset_doc.content.get( - fields.HDOCS_SET_KEY, [])) - self.__hdocset = value - return self.__hdocset - - def _set_hdocset(self, value): - """ - Setter for the hdocs-set for this mailbox. - """ - with self._hdocset_lock: - hdocset_doc = self._get_hdocset_doc() - newv = set(value) - self.__hdocset = newv - hdocset_doc.content[fields.HDOCS_SET_KEY] = list(newv) - # XXX should deferLater 0 it? - self._soledad.put_doc(hdocset_doc) - - _hdocset = property( - _get_hdocset, _set_hdocset, - doc="Set of Document-IDs for the headers docs associated " - "with this mailbox.") - - def _get_hdocset_doc(self): - """ - Get hdocs-set document for this mailbox. - """ - curried = partial( - self._soledad.get_from_index, - fields.TYPE_MBOX_IDX, - fields.TYPE_HDOCS_SET_VAL, self.mbox) - curried.expected = "hdocset" - hdocset_doc = try_unique_query(curried) - return hdocset_doc - - # Property-set modification (protected by a different - # lock to give atomicity to the read/write operation) - - def remove_hdocset_docids(self, docids): - """ - Remove the given document IDs from the set of - header-documents associated with this mailbox. - """ - with self._hdocset_property_lock: - self._hdocset = self._hdocset.difference( - set(docids)) - - def remove_hdocset_docid(self, docid): - """ - Remove the given document ID from the set of - header-documents associated with this mailbox. - """ - with self._hdocset_property_lock: - self._hdocset = self._hdocset.difference( - set([docid])) - - def add_hdocset_docid(self, docid): - """ - Add the given document ID to the set of - header-documents associated with this mailbox. - """ - with self._hdocset_property_lock: - self._hdocset = self._hdocset.union( - set([docid])) - # individual doc getters, message layer. def _get_fdoc_from_chash(self, chash): @@ -1499,7 +1102,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, return None return fdoc.content.get(fields.UID_KEY, None) - @deferred + @deferred_to_thread def _get_uid_from_msgid(self, msgid): """ Return a UID for a given message-id. @@ -1515,24 +1118,83 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # and we cannot find it otherwise. This seems to be enough. # XXX do a deferLater instead ?? - time.sleep(0.3) + # XXX is this working? return self._get_uid_from_msgidCb(msgid) + def set_flags(self, mbox, messages, flags, mode, observer): + """ + Set flags for a sequence of messages. + + :param mbox: the mbox this message belongs to + :type mbox: str or unicode + :param messages: the messages to iterate through + :type messages: sequence + :flags: the flags to be set + :type flags: tuple + :param mode: the mode for setting. 1 is append, -1 is remove, 0 set. + :type mode: int + :param observer: a deferred that will be called with the dictionary + mapping UIDs to flags after the operation has been + done. + :type observer: deferred + """ + # XXX we could defer *this* to thread pool, and gather results... + # XXX use deferredList + + deferreds = [] + for msg_id in messages: + deferreds.append( + self._set_flag_for_uid(msg_id, flags, mode)) + + def notify(result): + observer.callback(dict(result)) + d1 = defer.gatherResults(deferreds, consumeErrors=True) + d1.addCallback(notify) + + @deferred_to_thread + def _set_flag_for_uid(self, msg_id, flags, mode): + """ + Run the set_flag operation in the thread pool. + """ + log.msg("MSG ID = %s" % msg_id) + msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True) + if msg is not None: + return msg_id, msg.setFlags(flags, mode) + # getters: generic for a mailbox - def get_msg_by_uid(self, uid): + def get_msg_by_uid(self, uid, mem_only=False, flags_only=False): """ Retrieves a LeapMessage by UID. This is used primarity in the Mailbox fetch and store methods. :param uid: the message uid to query by :type uid: int + :param mem_only: a flag that indicates whether this Message should + pass a reference to soledad to retrieve missing pieces + or not. + :type mem_only: bool + :param flags_only: whether the message should carry only a reference + to the flags document. + :type flags_only: bool :return: A LeapMessage instance matching the query, or None if not found. :rtype: LeapMessage """ - msg = LeapMessage(self._soledad, uid, self.mbox, collection=self) + msg_container = self.memstore.get_message(self.mbox, uid, flags_only) + if msg_container is not None: + if mem_only: + msg = LeapMessage(None, uid, self.mbox, collection=self, + container=msg_container) + else: + # We pass a reference to soledad just to be able to retrieve + # missing parts that cannot be found in the container, like + # the content docs after a copy. + msg = LeapMessage(self._soledad, uid, self.mbox, + collection=self, container=msg_container) + else: + msg = LeapMessage(self._soledad, uid, self.mbox, collection=self) if not msg.does_exist(): return None return msg @@ -1564,40 +1226,50 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # FIXME ---------------------------------------------- return sorted(all_docs, key=lambda item: item.content['uid']) - def all_uid_iter(self): + def all_soledad_uid_iter(self): """ - Return an iterator trhough the UIDs of all messages, sorted in + Return an iterator through the UIDs of all messages, sorted in ascending order. """ - # XXX we should get this from the uid table, local-only - all_uids = (doc.content[self.UID_KEY] for doc in - self._soledad.get_from_index( - fields.TYPE_MBOX_IDX, - fields.TYPE_FLAGS_VAL, self.mbox)) - return (u for u in sorted(all_uids)) + db_uids = set([doc.content[self.UID_KEY] for doc in + self._soledad.get_from_index( + fields.TYPE_MBOX_IDX, + fields.TYPE_FLAGS_VAL, self.mbox)]) + return db_uids - def reset_last_uid(self, param): + def all_uid_iter(self): """ - Set the last uid to the highest uid found. - Used while expunging, passed as a callback. + Return an iterator through the UIDs of all messages, from memory. """ - try: - self.last_uid = max(self.all_uid_iter()) + 1 - except ValueError: - # empty sequence - pass - return param + if self.memstore is not None: + mem_uids = self.memstore.get_uids(self.mbox) + soledad_known_uids = self.memstore.get_soledad_known_uids( + self.mbox) + combined = tuple(set(mem_uids).union(soledad_known_uids)) + return combined + # XXX MOVE to memstore def all_flags(self): """ Return a dict with all flags documents for this mailbox. """ + # XXX get all from memstore and cache it there + # FIXME should get all uids, get them fro memstore, + # and get only the missing ones from disk. + all_flags = dict((( doc.content[self.UID_KEY], doc.content[self.FLAGS_KEY]) for doc in self._soledad.get_from_index( fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox))) + if self.memstore is not None: + uids = self.memstore.get_uids(self.mbox) + docs = ((uid, self.memstore.get_message(self.mbox, uid)) + for uid in uids) + for uid, doc in docs: + all_flags[uid] = doc.fdoc.content[self.FLAGS_KEY] + return all_flags def all_flags_chash(self): @@ -1630,9 +1302,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, :rtype: int """ + # XXX We should cache this in memstore too until next write... count = self._soledad.get_count_from_index( fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox) + if self.memstore is not None: + count += self.memstore.count_new() return count # unseen messages @@ -1674,6 +1349,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # recent messages + # XXX take it from memstore def count_recent(self): """ Count all messages with the `Recent` flag. @@ -1686,30 +1362,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, """ return len(self.recent_flags) - # deleted messages - - def deleted_iter(self): - """ - Get an iterator for the message UIDs with `deleted` flag. - - :return: iterator through deleted message docs - :rtype: iterable - """ - return (doc.content[self.UID_KEY] for doc in - self._soledad.get_from_index( - fields.TYPE_MBOX_DEL_IDX, - fields.TYPE_FLAGS_VAL, self.mbox, '1')) - - def get_deleted(self): - """ - Get all messages with the `Deleted` flag. - - :returns: a generator of LeapMessages - :rtype: generator - """ - return (LeapMessage(self._soledad, docid, self.mbox) - for docid in self.deleted_iter()) - def __len__(self): """ Returns the number of messages on this mailbox. |