path: root/src/leap/mail/imap/
diff options
authorTomás Touceda <>2014-01-09 00:22:21 -0300
committerTomás Touceda <>2014-01-09 00:22:21 -0300
commitc2cccbcd4225049b35ea2c7be4bbe899532136ca (patch)
treed3ec191d5605d2803535048c84b7b9cec3cd227a /src/leap/mail/imap/
parent3bccd3323a36f58357d68465c07595b6807e9015 (diff)
parent3bd8a7b669ac490b8f8e6c33550963c37069e383 (diff)
Merge remote-tracking branch 'refs/remotes/kali/feature/split_message_docs' into develop
Diffstat (limited to 'src/leap/mail/imap/')
1 files changed, 1346 insertions, 0 deletions
diff --git a/src/leap/mail/imap/ b/src/leap/mail/imap/
new file mode 100644
index 0000000..37e4311
--- /dev/null
+++ b/src/leap/mail/imap/
@@ -0,0 +1,1346 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2013 LEAP
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <>.
+LeapMessage and MessageCollection.
+import copy
+import logging
+import StringIO
+from collections import defaultdict, namedtuple
+from twisted.mail import imap4
+from twisted.internet import defer
+from twisted.python import log
+from u1db import errors as u1db_errors
+from zope.interface import implements
+from zope.proxy import sameProxiedObjects
+from leap.common.check import leap_assert, leap_assert_type
+from leap.common.decorators import memoized_method
+from leap.common.mail import get_email_charset
+from leap.mail import walk
+from leap.mail.utils import first
+from leap.mail.decorators import deferred
+from leap.mail.imap.index import IndexedDB
+from leap.mail.imap.fields import fields, WithMsgFields
+from leap.mail.imap.parser import MailParser, MBoxParser
+from leap.mail.messageflow import IMessageConsumer, MessageProducer
+logger = logging.getLogger(__name__)
+# TODO ------------------------------------------------------------
+# [ ] Add linked-from info.
+# [ ] Delete incoming mail only after successful write!
+# [ ] Remove UID from syncable db. Store only those indexes locally.
+# [ ] Send patch to twisted for bug in (content-type can be
+# none? lower-case?)
+def lowerdict(_dict):
+ """
+ Return a dict with the keys in lowercase.
+ :param _dict: the dict to convert
+ :rtype: dict
+ """
+ return dict((key.lower(), value)
+ for key, value in _dict.items())
+class MessagePart(object):
+ """
+ IMessagePart implementor.
+ It takes a subpart message and is able to find
+ the inner parts.
+ Excusatio non petita: see the interface documentation.
+ """
+ implements(imap4.IMessagePart)
+ def __init__(self, soledad, part_map):
+ """
+ Initializes the MessagePart.
+ :param part_map: a dictionary containing the parts map for this
+ message
+ :type part_map: dict
+ """
+ # TODO
+ # It would be good to pass the uid/mailbox also
+ # for references while debugging.
+ # We have a problem on bulk moves, and is
+ # that when the fetch on the new mailbox is done
+ # the parts maybe are not complete.
+ # So we should be able to fail with empty
+ # docs until we solve that. The ideal would be
+ # to gather the results of the deferred operations
+ # to signal the operation is complete.
+ #leap_assert(part_map, "part map dict cannot be null")
+ self._soledad = soledad
+ self._pmap = part_map
+ def getSize(self):
+ """
+ Return the total size, in octets, of this message part.
+ :return: size of the message, in octets
+ :rtype: int
+ """
+ if not self._pmap:
+ return 0
+ size = self._pmap.get('size', None)
+ if not size:
+ logger.error("Message part cannot find size in the partmap")
+ return size
+ def getBodyFile(self):
+ """
+ Retrieve a file object containing only the body of this message.
+ :return: file-like object opened for reading
+ :rtype: StringIO
+ """
+ fd = StringIO.StringIO()
+ if self._pmap:
+ multi = self._pmap.get('multi')
+ if not multi:
+ phash = self._pmap.get("phash", None)
+ else:
+ pmap = self._pmap.get('part_map')
+ first_part = pmap.get('1', None)
+ if first_part:
+ phash = first_part['phash']
+ if not phash:
+ logger.warning("Could not find phash for this subpart!")
+ payload = str("")
+ else:
+ payload = self._get_payload_from_document(phash)
+ else:
+ logger.warning("Message with no part_map!")
+ payload = str("")
+ if payload:
+ #headers = self.getHeaders(True)
+ #headers = lowerdict(headers)
+ #content_type = headers.get('content-type', "")
+ content_type = self._get_ctype_from_document(phash)
+ charset_split = content_type.split('charset=')
+ # XXX fuck all this, use a regex!
+ if len(charset_split) > 1:
+ charset = charset_split[1]
+ if charset:
+ charset = charset.strip()
+ else:
+ charset = None
+ if not charset:
+ charset = self._get_charset(payload)
+ try:
+ payload = payload.encode(charset)
+ except (UnicodeEncodeError, UnicodeDecodeError) as e:
+ logger.error("Unicode error {0}".format(e))
+ payload = payload.encode(charset, 'replace')
+ fd.write(payload)
+ return fd
+ # TODO cache the phash retrieval
+ def _get_payload_from_document(self, phash):
+ """
+ Gets the message payload from the content document.
+ :param phash: the payload hash to retrieve by.
+ :type phash: basestring
+ """
+ cdocs = self._soledad.get_from_index(
+ fields.TYPE_P_HASH_IDX,
+ fields.TYPE_CONTENT_VAL, str(phash))
+ cdoc = first(cdocs)
+ if not cdoc:
+ logger.warning(
+ "Could not find the content doc "
+ "for phash %s" % (phash,))
+ payload = cdoc.content.get(fields.RAW_KEY, "")
+ return payload
+ # TODO cache the pahash retrieval
+ def _get_ctype_from_document(self, phash):
+ """
+ Gets the content-type from the content document.
+ :param phash: the payload hash to retrieve by.
+ :type phash: basestring
+ """
+ cdocs = self._soledad.get_from_index(
+ fields.TYPE_P_HASH_IDX,
+ fields.TYPE_CONTENT_VAL, str(phash))
+ cdoc = first(cdocs)
+ if not cdoc:
+ logger.warning(
+ "Could not find the content doc "
+ "for phash %s" % (phash,))
+ ctype = cdoc.content.get('ctype', "")
+ return ctype
+ @memoized_method
+ def _get_charset(self, stuff):
+ # TODO put in a common class with LeapMessage
+ """
+ Gets (guesses?) the charset of a payload.
+ :param stuff: the stuff to guess about.
+ :type stuff: basestring
+ :returns: charset
+ """
+ # XXX existential doubt 2. shouldn't we make the scope
+ # of the decorator somewhat more persistent?
+ # ah! yes! and put memory bounds.
+ return get_email_charset(unicode(stuff))
+ def getHeaders(self, negate, *names):
+ """
+ Retrieve a group of message headers.
+ :param names: The names of the headers to retrieve or omit.
+ :type names: tuple of str
+ :param negate: If True, indicates that the headers listed in names
+ should be omitted from the return value, rather
+ than included.
+ :type negate: bool
+ :return: A mapping of header field names to header field values
+ :rtype: dict
+ """
+ if not self._pmap:
+ logger.warning("No pmap in Subpart!")
+ return {}
+ headers = dict(self._pmap.get("headers", []))
+ # twisted imap server expects *some* headers to be lowercase
+ # We could use a CaseInsensitiveDict here...
+ headers = dict(
+ (str(key), str(value)) if key.lower() != "content-type"
+ else (str(key.lower()), str(value))
+ for (key, value) in headers.items())
+ names = map(lambda s: s.upper(), names)
+ if negate:
+ cond = lambda key: key.upper() not in names
+ else:
+ cond = lambda key: key.upper() in names
+ # unpack and filter original dict by negate-condition
+ filter_by_cond = [
+ map(str, (key, val)) for
+ key, val in headers.items()
+ if cond(key)]
+ filtered = dict(filter_by_cond)
+ return filtered
+ def isMultipart(self):
+ """
+ Return True if this message is multipart.
+ """
+ if not self._pmap:
+ logger.warning("Could not get part map!")
+ return False
+ multi = self._pmap.get("multi", False)
+ return multi
+ def getSubPart(self, part):
+ """
+ Retrieve a MIME submessage
+ :type part: C{int}
+ :param part: The number of the part to retrieve, indexed from 0.
+ :raise IndexError: Raised if the specified part does not exist.
+ :raise TypeError: Raised if this message is not multipart.
+ :rtype: Any object implementing C{IMessagePart}.
+ :return: The specified sub-part.
+ """
+ if not self.isMultipart():
+ raise TypeError
+ sub_pmap = self._pmap.get("part_map", {})
+ try:
+ part_map = sub_pmap[str(part + 1)]
+ except KeyError:
+ logger.debug("getSubpart for %s: KeyError" % (part,))
+ raise IndexError
+ # XXX check for validity
+ return MessagePart(self._soledad, part_map)
+class LeapMessage(fields, MailParser, MBoxParser):
+ """
+ The main representation of a message.
+ It indexes the messages in one mailbox by a combination
+ of uid+mailbox name.
+ """
+ # TODO this has to change.
+ # Should index primarily by chash, and keep a local-lonly
+ # UID table.
+ implements(imap4.IMessage)
+ def __init__(self, soledad, uid, mbox):
+ """
+ Initializes a LeapMessage.
+ :param soledad: a Soledad instance
+ :type soledad: Soledad
+ :param uid: the UID for the message.
+ :type uid: int or basestring
+ :param mbox: the mbox this message belongs to
+ :type mbox: basestring
+ """
+ MailParser.__init__(self)
+ self._soledad = soledad
+ self._uid = int(uid)
+ self._mbox = self._parse_mailbox_name(mbox)
+ self.__chash = None
+ self.__bdoc = None
+ @property
+ def _fdoc(self):
+ """
+ An accessor to the flags document.
+ """
+ if all(map(bool, (self._uid, self._mbox))):
+ fdoc = self._get_flags_doc()
+ if fdoc:
+ self.__chash = fdoc.content.get(
+ fields.CONTENT_HASH_KEY, None)
+ return fdoc
+ @property
+ def _chash(self):
+ """
+ An accessor to the content hash for this message.
+ """
+ if not self._fdoc:
+ return None
+ if not self.__chash and self._fdoc:
+ self.__chash = self._fdoc.content.get(
+ fields.CONTENT_HASH_KEY, None)
+ return self.__chash
+ @property
+ def _hdoc(self):
+ """
+ An accessor to the headers document.
+ """
+ return self._get_headers_doc()
+ @property
+ def _bdoc(self):
+ """
+ An accessor to the body document.
+ """
+ if not self._hdoc:
+ return None
+ if not self.__bdoc:
+ self.__bdoc = self._get_body_doc()
+ return self.__bdoc
+ # IMessage implementation
+ def getUID(self):
+ """
+ Retrieve the unique identifier associated with this message
+ :return: uid for this message
+ :rtype: int
+ """
+ return self._uid
+ def getFlags(self):
+ """
+ Retrieve the flags associated with this message
+ :return: The flags, represented as strings
+ :rtype: tuple
+ """
+ if self._uid is None:
+ return []
+ flags = []
+ fdoc = self._fdoc
+ if fdoc:
+ flags = fdoc.content.get(self.FLAGS_KEY, None)
+ if flags:
+ flags = map(str, flags)
+ return tuple(flags)
+ # setFlags, addFlags, removeFlags are not in the interface spec
+ # but we use them with store command.
+ def setFlags(self, flags):
+ """
+ Sets the flags for this message
+ Returns a SoledadDocument that needs to be updated by the caller.
+ :param flags: the flags to update in the message.
+ :type flags: tuple of str
+ :return: a SoledadDocument instance
+ :rtype: SoledadDocument
+ """
+ leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
+ log.msg('setting flags: %s' % (self._uid))
+ doc = self._fdoc
+ if not doc:
+ logger.warning(
+ "Could not find FDOC for %s:%s while setting flags!" %
+ (self._mbox, self._uid))
+ return
+ doc.content[self.FLAGS_KEY] = flags
+ doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
+ doc.content[self.RECENT_KEY] = self.RECENT_FLAG in flags
+ doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags
+ self._soledad.put_doc(doc)
+ def addFlags(self, flags):
+ """
+ Adds flags to this message.
+ Returns a SoledadDocument that needs to be updated by the caller.
+ :param flags: the flags to add to the message.
+ :type flags: tuple of str
+ :return: a SoledadDocument instance
+ :rtype: SoledadDocument
+ """
+ leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
+ oldflags = self.getFlags()
+ self.setFlags(tuple(set(flags + oldflags)))
+ def removeFlags(self, flags):
+ """
+ Remove flags from this message.
+ Returns a SoledadDocument that needs to be updated by the caller.
+ :param flags: the flags to be removed from the message.
+ :type flags: tuple of str
+ :return: a SoledadDocument instance
+ :rtype: SoledadDocument
+ """
+ leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
+ oldflags = self.getFlags()
+ self.setFlags(tuple(set(oldflags) - set(flags)))
+ def getInternalDate(self):
+ """
+ Retrieve the date internally associated with this message
+ :rtype: C{str}
+ :return: An RFC822-formatted date string.
+ """
+ return str(self._hdoc.content.get(self.DATE_KEY, ''))
+ #
+ # IMessagePart
+ #
+ # XXX we should implement this interface too for the subparts
+ # so we allow nested parts...
+ def getBodyFile(self):
+ """
+ Retrieve a file object containing only the body of this message.
+ :return: file-like object opened for reading
+ :rtype: StringIO
+ """
+ fd = StringIO.StringIO()
+ bdoc = self._bdoc
+ if bdoc:
+ body = str(self._bdoc.content.get(self.RAW_KEY, ""))
+ else:
+ logger.warning("No BDOC found for message.")
+ body = str("")
+ # XXX not needed, isn't it? ---- ivan?
+ #if bdoc:
+ #content_type = bdoc.content.get('content-type', "")
+ #charset = content_type.split('charset=')[1]
+ #if charset:
+ #charset = charset.strip()
+ #if not charset:
+ #charset = self._get_charset(body)
+ #try:
+ #body = str(body.encode(charset))
+ #except (UnicodeEncodeError, UnicodeDecodeError) as e:
+ #logger.error("Unicode error {0}".format(e))
+ #body = str(body.encode(charset, 'replace'))
+ fd.write(body)
+ return fd
+ @memoized_method
+ def _get_charset(self, stuff):
+ """
+ Gets (guesses?) the charset of a payload.
+ :param stuff: the stuff to guess about.
+ :type stuff: basestring
+ :returns: charset
+ """
+ # TODO get from subpart headers
+ # XXX existential doubt 2. shouldn't we make the scope
+ # of the decorator somewhat more persistent?
+ # ah! yes! and put memory bounds.
+ return get_email_charset(unicode(stuff))
+ def getSize(self):
+ """
+ Return the total size, in octets, of this message.
+ :return: size of the message, in octets
+ :rtype: int
+ """
+ size = None
+ if self._fdoc:
+ size = self._fdoc.content.get(self.SIZE_KEY, False)
+ else:
+ logger.warning("No FLAGS doc for %s:%s" % (self._mbox,
+ self._uid))
+ if not size:
+ # XXX fallback, should remove when all migrated.
+ size = self.getBodyFile().len
+ return size
+ def getHeaders(self, negate, *names):
+ """
+ Retrieve a group of message headers.
+ :param names: The names of the headers to retrieve or omit.
+ :type names: tuple of str
+ :param negate: If True, indicates that the headers listed in names
+ should be omitted from the return value, rather
+ than included.
+ :type negate: bool
+ :return: A mapping of header field names to header field values
+ :rtype: dict
+ """
+ # TODO split in smaller methods
+ headers = self._get_headers()
+ if not headers:
+ logger.warning("No headers found")
+ return {str('content-type'): str('')}
+ names = map(lambda s: s.upper(), names)
+ if negate:
+ cond = lambda key: key.upper() not in names
+ else:
+ cond = lambda key: key.upper() in names
+ if isinstance(headers, list):
+ headers = dict(headers)
+ # twisted imap server expects *some* headers to be lowercase
+ # XXX refactor together with MessagePart method
+ headers = dict(
+ (str(key), str(value)) if key.lower() != "content-type"
+ else (str(key.lower()), str(value))
+ for (key, value) in headers.items())
+ # unpack and filter original dict by negate-condition
+ filter_by_cond = [(key, val) for key, val
+ in headers.items() if cond(key)]
+ return dict(filter_by_cond)
+ def _get_headers(self):
+ """
+ Return the headers dict for this message.
+ """
+ if self._hdoc is not None:
+ headers = self._hdoc.content.get(self.HEADERS_KEY, {})
+ return headers
+ else:
+ logger.warning(
+ "No HEADERS doc for msg %s:%s" % (
+ self._mbox,
+ self._uid))
+ def isMultipart(self):
+ """
+ Return True if this message is multipart.
+ """
+ if self._fdoc:
+ is_multipart = self._fdoc.content.get(self.MULTIPART_KEY, False)
+ return is_multipart
+ else:
+ logger.warning(
+ "No FLAGS doc for msg %s:%s" % (
+ self._mbox,
+ self._uid))
+ def getSubPart(self, part):
+ """
+ Retrieve a MIME submessage
+ :type part: C{int}
+ :param part: The number of the part to retrieve, indexed from 0.
+ :raise IndexError: Raised if the specified part does not exist.
+ :raise TypeError: Raised if this message is not multipart.
+ :rtype: Any object implementing C{IMessagePart}.
+ :return: The specified sub-part.
+ """
+ if not self.isMultipart():
+ raise TypeError
+ try:
+ pmap_dict = self._get_part_from_parts_map(part + 1)
+ except KeyError:
+ logger.debug("getSubpart for %s: KeyError" % (part,))
+ raise IndexError
+ return MessagePart(self._soledad, pmap_dict)
+ #
+ # accessors
+ #
+ def _get_part_from_parts_map(self, part):
+ """
+ Get a part map from the headers doc
+ :raises: KeyError if key does not exist
+ :rtype: dict
+ """
+ if not self._hdoc:
+ logger.warning("Tried to get part but no HDOC found!")
+ return None
+ pmap = self._hdoc.content.get(fields.PARTS_MAP_KEY, {})
+ return pmap[str(part)]
+ def _get_flags_doc(self):
+ """
+ Return the document that keeps the flags for this
+ message.
+ """
+ flag_docs = self._soledad.get_from_index(
+ fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid))
+ return first(flag_docs)
+ def _get_headers_doc(self):
+ """
+ Return the document that keeps the headers for this
+ message.
+ """
+ head_docs = self._soledad.get_from_index(
+ fields.TYPE_C_HASH_IDX,
+ fields.TYPE_HEADERS_VAL, str(self._chash))
+ return first(head_docs)
+ def _get_body_doc(self):
+ """
+ Return the document that keeps the body for this
+ message.
+ """
+ body_phash = self._hdoc.content.get(
+ fields.BODY_KEY, None)
+ if not body_phash:
+ logger.warning("No body phash for this document!")
+ return None
+ body_docs = self._soledad.get_from_index(
+ fields.TYPE_P_HASH_IDX,
+ fields.TYPE_CONTENT_VAL, str(body_phash))
+ return first(body_docs)
+ def __getitem__(self, key):
+ """
+ Return an item from the content of the flags document,
+ for convenience.
+ :param key: The key
+ :type key: str
+ :return: The content value indexed by C{key} or None
+ :rtype: str
+ """
+ return self._fdoc.content.get(key, None)
+ # setters
+ # XXX to be used in the messagecopier interface?!
+ def set_uid(self, uid):
+ """
+ Set new uid for this message.
+ :param uid: the new uid
+ :type uid: basestring
+ """
+ # XXX dangerous! lock?
+ self._uid = uid
+ d = self._fdoc
+ d.content[self.UID_KEY] = uid
+ self._soledad.put_doc(d)
+ def set_mbox(self, mbox):
+ """
+ Set new mbox for this message.
+ :param mbox: the new mbox
+ :type mbox: basestring
+ """
+ # XXX dangerous! lock?
+ self._mbox = mbox
+ d = self._fdoc
+ d.content[self.MBOX_KEY] = mbox
+ self._soledad.put_doc(d)
+ # destructor
+ @deferred
+ def remove(self):
+ """
+ Remove all docs associated with this message.
+ """
+ # XXX For the moment we are only removing the flags and headers
+ # docs. The rest we leave there polluting your hard disk,
+ # until we think about a good way of deorphaning.
+ # Maybe a crawler of unreferenced docs.
+ # XXX implement elijah's idea of using a PUT document as a
+ # token to ensure consistency in the removal.
+ uid = self._uid
+ fd = self._get_flags_doc()
+ #hd = self._get_headers_doc()
+ #bd = self._get_body_doc()
+ #docs = [fd, hd, bd]
+ docs = [fd]
+ for d in filter(None, docs):
+ try:
+ self._soledad.delete_doc(d)
+ except Exception as exc:
+ logger.error(exc)
+ return uid
+ def does_exist(self):
+ """
+ Return True if there is actually a flags message for this
+ UID and mbox.
+ """
+ return self._fdoc is not None
+SoledadWriterPayload = namedtuple(
+ 'SoledadWriterPayload', ['mode', 'payload'])
+# TODO we could consider using enum here:
+SoledadWriterPayload.CREATE = 1
+SoledadWriterPayload.PUT = 2
+SoledadWriterPayload.CONTENT_CREATE = 3
+class SoledadDocWriter(object):
+ """
+ This writer will create docs serially in the local soledad database.
+ """
+ implements(IMessageConsumer)
+ def __init__(self, soledad):
+ """
+ Initialize the writer.
+ :param soledad: the soledad instance
+ :type soledad: Soledad
+ """
+ self._soledad = soledad
+ def _get_call_for_item(self, item):
+ """
+ Return the proper call type for a given item.
+ :param item: one of the types defined under the
+ attributes of SoledadWriterPayload
+ :type item: int
+ """
+ call = None
+ payload = item.payload
+ if item.mode == SoledadWriterPayload.CREATE:
+ call = self._soledad.create_doc
+ elif (item.mode == SoledadWriterPayload.CONTENT_CREATE
+ and not self._content_does_exist(payload)):
+ call = self._soledad.create_doc
+ elif item.mode == SoledadWriterPayload.PUT:
+ call = self._soledad.put_doc
+ return call
+ def _process(self, queue):
+ """
+ Return the item and the proper call type for the next
+ item in the queue if any.
+ :param queue: the queue from where we'll pick item.
+ :type queue: Queue
+ """
+ item = queue.get()
+ call = self._get_call_for_item(item)
+ return item, call
+ def consume(self, queue):
+ """
+ Creates a new document in soledad db.
+ :param queue: queue to get item from, with content of the document
+ to be inserted.
+ :type queue: Queue
+ """
+ empty = queue.empty()
+ while not empty:
+ item, call = self._process(queue)
+ if call:
+ # XXX should handle the delete case
+ # should handle errors
+ try:
+ call(item.payload)
+ except u1db_errors.RevisionConflict as exc:
+ logger.error("Error: %r" % (exc,))
+ raise exc
+ empty = queue.empty()
+ """
+ Message deduplication.
+ We do a query for the content hashes before writing to our beloved
+ sqlcipher backend of Soledad. This means, by now, that:
+ 1. We will not store the same attachment twice, only the hash of it.
+ 2. We will not store the same message body twice, only the hash of it.
+ The first case is useful if you are always receiving the same old memes
+ from unwary friends that still have not discovered that 4chan is the
+ generator of the internet. The second will save your day if you have
+ initiated session with the same account in two different machines. I also
+ wonder why would you do that, but let's respect each other choices, like
+ with the religious celebrations, and assume that one day we'll be able
+ to run Bitmask in completely free phones. Yes, I mean that, the whole GSM
+ Stack.
+ """
+ def _content_does_exist(self, doc):
+ """
+ Check whether we already have a content document for a payload
+ with this hash in our database.
+ :param doc: tentative body document
+ :type doc: dict
+ :returns: True if that happens, False otherwise.
+ """
+ if not doc:
+ return False
+ phash = doc[fields.PAYLOAD_HASH_KEY]
+ attach_docs = self._soledad.get_from_index(
+ fields.TYPE_P_HASH_IDX,
+ fields.TYPE_CONTENT_VAL, str(phash))
+ if not attach_docs:
+ return False
+ if len(attach_docs) != 1:
+ logger.warning("Found more than one copy of phash %s!"
+ % (phash,))
+ logger.debug("Found attachment doc with that hash! Skipping save!")
+ return True
+class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
+ """
+ A collection of messages, surprisingly.
+ It is tied to a selected mailbox name that is passed to constructor.
+ Implements a filter query over the messages contained in a soledad
+ database.
+ """
+ # XXX this should be able to produce a MessageSet methinks
+ # could validate these kinds of objects turning them
+ # into a template for the class.
+ templates = {
+ fields.TYPE_KEY: fields.TYPE_FLAGS_VAL,
+ fields.UID_KEY: 1, # XXX moe to a local table
+ fields.MBOX_KEY: fields.INBOX_VAL,
+ fields.CONTENT_HASH_KEY: "",
+ fields.SEEN_KEY: False,
+ fields.RECENT_KEY: True,
+ fields.DEL_KEY: False,
+ fields.FLAGS_KEY: [],
+ fields.MULTIPART_KEY: False,
+ fields.SIZE_KEY: 0
+ },
+ fields.TYPE_KEY: fields.TYPE_HEADERS_VAL,
+ fields.CONTENT_HASH_KEY: "",
+ fields.DATE_KEY: "",
+ fields.SUBJECT_KEY: "",
+ fields.HEADERS_KEY: {},
+ fields.PARTS_MAP_KEY: {},
+ },
+ fields.TYPE_KEY: fields.TYPE_CONTENT_VAL,
+ fields.PAYLOAD_HASH_KEY: "",
+ fields.LINKED_FROM_KEY: [],
+ fields.CTYPE_KEY: "", # should index by this too
+ # should only get inmutable headers parts
+ # (for indexing)
+ fields.HEADERS_KEY: {},
+ fields.RAW_KEY: "",
+ fields.PARTS_MAP_KEY: {},
+ fields.HEADERS_KEY: {},
+ fields.MULTIPART_KEY: False,
+ },
+ }
+ def __init__(self, mbox=None, soledad=None):
+ """
+ Constructor for MessageCollection.
+ :param mbox: the name of the mailbox. It is the name
+ with which we filter the query over the
+ messages database
+ :type mbox: str
+ :param soledad: Soledad database
+ :type soledad: Soledad instance
+ """
+ MailParser.__init__(self)
+ leap_assert(mbox, "Need a mailbox name to initialize")
+ leap_assert(mbox.strip() != "", "mbox cannot be blank space")
+ leap_assert(isinstance(mbox, (str, unicode)),
+ "mbox needs to be a string")
+ leap_assert(soledad, "Need a soledad instance to initialize")
+ # okay, all in order, keep going...
+ self.mbox = self._parse_mailbox_name(mbox)
+ self._soledad = soledad
+ self.initialize_db()
+ # I think of someone like nietzsche when reading this
+ # this will be the producer that will enqueue the content
+ # to be processed serially by the consumer (the writer). We just
+ # need to `put` the new material on its plate.
+ self.soledad_writer = MessageProducer(
+ SoledadDocWriter(soledad),
+ period=0.02)
+ def _get_empty_doc(self, _type=FLAGS_DOC):
+ """
+ Returns an empty doc for storing different message parts.
+ Defaults to returning a template for a flags document.
+ :return: a dict with the template
+ :rtype: dict
+ """
+ if not _type in self.templates.keys():
+ raise TypeError("Improper type passed to _get_empty_doc")
+ return copy.deepcopy(self.templates[_type])
+ def _do_parse(self, raw):
+ """
+ Parse raw message and return it along with
+ relevant information about its outer level.
+ :param raw: the raw message
+ :type raw: StringIO or basestring
+ :return: msg, chash, size, multi
+ :rtype: tuple
+ """
+ msg = self._get_parsed_msg(raw)
+ chash = self._get_hash(msg)
+ size = len(msg.as_string())
+ multi = msg.is_multipart()
+ return msg, chash, size, multi
+ def _populate_flags(self, flags, uid, chash, size, multi):
+ """
+ Return a flags doc.
+ XXX Missing DOC -----------
+ """
+ fd = self._get_empty_doc(self.FLAGS_DOC)
+ fd[self.MBOX_KEY] = self.mbox
+ fd[self.UID_KEY] = uid
+ fd[self.CONTENT_HASH_KEY] = chash
+ fd[self.SIZE_KEY] = size
+ fd[self.MULTIPART_KEY] = multi
+ if flags:
+ fd[self.FLAGS_KEY] = map(self._stringify, flags)
+ fd[self.SEEN_KEY] = self.SEEN_FLAG in flags
+ fd[self.DEL_KEY] = self.DELETED_FLAG in flags
+ fd[self.RECENT_KEY] = True # set always by default
+ return fd
+ def _populate_headr(self, msg, chash, subject, date):
+ """
+ Return a headers doc.
+ XXX Missing DOC -----------
+ """
+ headers = defaultdict(list)
+ for k, v in msg.items():
+ headers[k].append(v)
+ # "fix" for repeated headers.
+ for k, v in headers.items():
+ newline = "\n%s: " % (k,)
+ headers[k] = newline.join(v)
+ hd = self._get_empty_doc(self.HEADERS_DOC)
+ hd[self.CONTENT_HASH_KEY] = chash
+ hd[self.HEADERS_KEY] = headers
+ if not subject and self.SUBJECT_FIELD in headers:
+ hd[self.SUBJECT_KEY] = first(headers[self.SUBJECT_FIELD])
+ else:
+ hd[self.SUBJECT_KEY] = subject
+ if not date and self.DATE_FIELD in headers:
+ hd[self.DATE_KEY] = first(headers[self.DATE_FIELD])
+ else:
+ hd[self.DATE_KEY] = date
+ return hd
+ @deferred
+ def add_msg(self, raw, subject=None, flags=None, date=None, uid=1):
+ """
+ Creates a new message document.
+ :param raw: the raw message
+ :type raw: str
+ :param subject: subject of the message.
+ :type subject: str
+ :param flags: flags
+ :type flags: list
+ :param date: the received date for the message
+ :type date: str
+ :param uid: the message uid for this mailbox
+ :type uid: int
+ """
+ # TODO signal that we can delete the original message!-----
+ # when all the processing is done.
+ # TODO add the linked-from info !
+ logger.debug('adding message')
+ if flags is None:
+ flags = tuple()
+ leap_assert_type(flags, tuple)
+ # parse
+ msg, chash, size, multi = self._do_parse(raw)
+ fd = self._populate_flags(flags, uid, chash, size, multi)
+ hd = self._populate_headr(msg, chash, subject, date)
+ parts = walk.get_parts(msg)
+ body_phash_fun = [walk.get_body_phash_simple,
+ walk.get_body_phash_multi][int(multi)]
+ body_phash = body_phash_fun(walk.get_payloads(msg))
+ parts_map = walk.walk_msg_tree(parts, body_phash=body_phash)
+ # add parts map to header doc
+ # (body, multi, part_map)
+ for key in parts_map:
+ hd[key] = parts_map[key]
+ del parts_map
+ docs = [fd, hd]
+ cdocs = walk.get_raw_docs(msg, parts)
+ # Saving
+ logger.debug('enqueuing message docs for write')
+ ptuple = SoledadWriterPayload
+ # first, regular docs: flags and headers
+ for doc in docs:
+ self.soledad_writer.put(ptuple(
+ mode=ptuple.CREATE, payload=doc))
+ # and last, but not least, try to create
+ # content docs if not already there.
+ for cd in cdocs:
+ self.soledad_writer.put(ptuple(
+ mode=ptuple.CONTENT_CREATE, payload=cd))
+ def _remove_cb(self, result):
+ return result
+ def remove_all_deleted(self):
+ """
+ Removes all messages flagged as deleted.
+ """
+ delete_deferl = []
+ for msg in self.get_deleted():
+ delete_deferl.append(msg.remove())
+ d1 = defer.gatherResults(delete_deferl, consumeErrors=True)
+ d1.addCallback(self._remove_cb)
+ return d1
+ def remove(self, msg):
+ """
+ Remove a given msg.
+ :param msg: the message to be removed
+ :type msg: LeapMessage
+ """
+ d = msg.remove()
+ d.addCallback(self._remove_cb)
+ return d
+ # getters
+ def get_msg_by_uid(self, uid):
+ """
+ Retrieves a LeapMessage by UID.
+ :param uid: the message uid to query by
+ :type uid: int
+ :return: A LeapMessage instance matching the query,
+ or None if not found.
+ :rtype: LeapMessage
+ """
+ msg = LeapMessage(self._soledad, uid, self.mbox)
+ if not msg.does_exist():
+ return None
+ return msg
+ def get_all_docs(self, _type=fields.TYPE_FLAGS_VAL):
+ """
+ Get all documents for the selected mailbox of the
+ passed type. By default, it returns the flag docs.
+ If you want acess to the content, use __iter__ instead
+ :return: a list of u1db documents
+ :rtype: list of SoledadDocument
+ """
+ if _type not in fields.__dict__.values():
+ raise TypeError("Wrong type passed to get_all_docs")
+ if sameProxiedObjects(self._soledad, None):
+ logger.warning('Tried to get messages but soledad is None!')
+ return []
+ all_docs = [doc for doc in self._soledad.get_from_index(
+ fields.TYPE_MBOX_IDX,
+ _type, self.mbox)]
+ # inneficient, but first let's grok it and then
+ # let's worry about efficiency.
+ # XXX FIXINDEX -- should implement order by in soledad
+ return sorted(all_docs, key=lambda item: item.content['uid'])
+ def all_msg_iter(self):
+ """
+ Return an iterator trhough the UIDs of all messages, sorted in
+ ascending order.
+ """
+ all_uids = (doc.content[self.UID_KEY] for doc in
+ self._soledad.get_from_index(
+ fields.TYPE_MBOX_IDX,
+ fields.TYPE_FLAGS_VAL, self.mbox))
+ return (u for u in sorted(all_uids))
+ def count(self):
+ """
+ Return the count of messages for this mailbox.
+ :rtype: int
+ """
+ count = self._soledad.get_count_from_index(
+ fields.TYPE_MBOX_IDX,
+ fields.TYPE_FLAGS_VAL, self.mbox)
+ return count
+ # unseen messages
+ def unseen_iter(self):
+ """
+ Get an iterator for the message UIDs with no `seen` flag
+ for this mailbox.
+ :return: iterator through unseen message doc UIDs
+ :rtype: iterable
+ """
+ return (doc.content[self.UID_KEY] for doc in
+ self._soledad.get_from_index(
+ fields.TYPE_FLAGS_VAL, self.mbox, '0'))
+ def count_unseen(self):
+ """
+ Count all messages with the `Unseen` flag.
+ :returns: count
+ :rtype: int
+ """
+ count = self._soledad.get_count_from_index(
+ fields.TYPE_FLAGS_VAL, self.mbox, '0')
+ return count
+ def get_unseen(self):
+ """
+ Get all messages with the `Unseen` flag
+ :returns: a list of LeapMessages
+ :rtype: list
+ """
+ return [LeapMessage(self._soledad, docid, self.mbox)
+ for docid in self.unseen_iter()]
+ # recent messages
+ def recent_iter(self):
+ """
+ Get an iterator for the message UIDs with `recent` flag.
+ :return: iterator through recent message docs
+ :rtype: iterable
+ """
+ return (doc.content[self.UID_KEY] for doc in
+ self._soledad.get_from_index(
+ fields.TYPE_FLAGS_VAL, self.mbox, '1'))
+ def get_recent(self):
+ """
+ Get all messages with the `Recent` flag.
+ :returns: a list of LeapMessages
+ :rtype: list
+ """
+ return [LeapMessage(self._soledad, docid, self.mbox)
+ for docid in self.recent_iter()]
+ def count_recent(self):
+ """
+ Count all messages with the `Recent` flag.
+ :returns: count
+ :rtype: int
+ """
+ count = self._soledad.get_count_from_index(
+ fields.TYPE_FLAGS_VAL, self.mbox, '1')
+ return count
+ # deleted messages
+ def deleted_iter(self):
+ """
+ Get an iterator for the message UIDs with `deleted` flag.
+ :return: iterator through deleted message docs
+ :rtype: iterable
+ """
+ return (doc.content[self.UID_KEY] for doc in
+ self._soledad.get_from_index(
+ fields.TYPE_FLAGS_VAL, self.mbox, '1'))
+ def get_deleted(self):
+ """
+ Get all messages with the `Deleted` flag.
+ :returns: a generator of LeapMessages
+ :rtype: generator
+ """
+ return (LeapMessage(self._soledad, docid, self.mbox)
+ for docid in self.deleted_iter())
+ def __len__(self):
+ """
+ Returns the number of messages on this mailbox.
+ :rtype: int
+ """
+ return self.count()
+ def __iter__(self):
+ """
+ Returns an iterator over all messages.
+ :returns: iterator of dicts with content for all messages.
+ :rtype: iterable
+ """
+ return (LeapMessage(self._soledad, docuid, self.mbox)
+ for docuid in self.all_msg_iter())
+ def __repr__(self):
+ """
+ Representation string for this object.
+ """
+ return u"<MessageCollection: mbox '%s' (%s)>" % (
+ self.mbox, self.count())
+ # XXX should implement __eq__ also !!!
+ # --- use the content hash for that, will be used for dedup.