diff options
Diffstat (limited to 'mail')
| -rw-r--r-- | mail/src/leap/mail/imap/messageparts.py | 262 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/messages.py | 423 | 
2 files changed, 286 insertions, 399 deletions
| diff --git a/mail/src/leap/mail/imap/messageparts.py b/mail/src/leap/mail/imap/messageparts.py new file mode 100644 index 0000000..a47ea1d --- /dev/null +++ b/mail/src/leap/mail/imap/messageparts.py @@ -0,0 +1,262 @@ +# -*- coding: utf-8 -*- +# messageparts.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +MessagePart implementation. Used from LeapMessage. +""" +import logging +import re +import StringIO + +from enum import Enum +from zope.interface import implements +from twisted.mail import imap4 + +from leap.common.decorators import memoized_method +from leap.common.mail import get_email_charset +from leap.mail.imap.fields import fields +from leap.mail.utils import first + +MessagePartType = Enum("hdoc", "fdoc", "cdoc") + + +logger = logging.getLogger(__name__) + + +CHARSET_PATTERN = r"""charset=([\w-]+)""" +CHARSET_RE = re.compile(CHARSET_PATTERN, re.IGNORECASE) + + +class MessagePart(object): +    """ +    IMessagePart implementor. +    It takes a subpart message and is able to find +    the inner parts. + +    Excusatio non petita: see the interface documentation. +    """ + +    implements(imap4.IMessagePart) + +    def __init__(self, soledad, part_map): +        """ +        Initializes the MessagePart. + +        :param part_map: a dictionary containing the parts map for this +                         message +        :type part_map: dict +        """ +        # TODO +        # It would be good to pass the uid/mailbox also +        # for references while debugging. + +        # We have a problem on bulk moves, and is +        # that when the fetch on the new mailbox is done +        # the parts maybe are not complete. +        # So we should be able to fail with empty +        # docs until we solve that. The ideal would be +        # to gather the results of the deferred operations +        # to signal the operation is complete. +        #leap_assert(part_map, "part map dict cannot be null") +        self._soledad = soledad +        self._pmap = part_map + +    def getSize(self): +        """ +        Return the total size, in octets, of this message part. + +        :return: size of the message, in octets +        :rtype: int +        """ +        if not self._pmap: +            return 0 +        size = self._pmap.get('size', None) +        if not size: +            logger.error("Message part cannot find size in the partmap") +        return size + +    def getBodyFile(self): +        """ +        Retrieve a file object containing only the body of this message. + +        :return: file-like object opened for reading +        :rtype: StringIO +        """ +        fd = StringIO.StringIO() +        if self._pmap: +            multi = self._pmap.get('multi') +            if not multi: +                phash = self._pmap.get("phash", None) +            else: +                pmap = self._pmap.get('part_map') +                first_part = pmap.get('1', None) +                if first_part: +                    phash = first_part['phash'] + +            if not phash: +                logger.warning("Could not find phash for this subpart!") +                payload = str("") +            else: +                payload = self._get_payload_from_document(phash) + +        else: +            logger.warning("Message with no part_map!") +            payload = str("") + +        if payload: +            content_type = self._get_ctype_from_document(phash) +            charset = first(CHARSET_RE.findall(content_type)) +            logger.debug("Got charset from header: %s" % (charset,)) +            if not charset: +                charset = self._get_charset(payload) +            try: +                payload = payload.encode(charset) +            except UnicodeError as exc: +                logger.error("Unicode error {0}".format(exc)) +                payload = payload.encode(charset, 'replace') + +        fd.write(payload) +        fd.seek(0) +        return fd + +    # TODO cache the phash retrieval +    def _get_payload_from_document(self, phash): +        """ +        Gets the message payload from the content document. + +        :param phash: the payload hash to retrieve by. +        :type phash: basestring +        """ +        cdocs = self._soledad.get_from_index( +            fields.TYPE_P_HASH_IDX, +            fields.TYPE_CONTENT_VAL, str(phash)) + +        cdoc = first(cdocs) +        if not cdoc: +            logger.warning( +                "Could not find the content doc " +                "for phash %s" % (phash,)) +        payload = cdoc.content.get(fields.RAW_KEY, "") +        return payload + +    # TODO cache the pahash retrieval +    def _get_ctype_from_document(self, phash): +        """ +        Gets the content-type from the content document. + +        :param phash: the payload hash to retrieve by. +        :type phash: basestring +        """ +        cdocs = self._soledad.get_from_index( +            fields.TYPE_P_HASH_IDX, +            fields.TYPE_CONTENT_VAL, str(phash)) + +        cdoc = first(cdocs) +        if not cdoc: +            logger.warning( +                "Could not find the content doc " +                "for phash %s" % (phash,)) +        ctype = cdoc.content.get('ctype', "") +        return ctype + +    @memoized_method +    def _get_charset(self, stuff): +        # TODO put in a common class with LeapMessage +        """ +        Gets (guesses?) the charset of a payload. + +        :param stuff: the stuff to guess about. +        :type stuff: basestring +        :returns: charset +        """ +        # XXX existential doubt 2. shouldn't we make the scope +        # of the decorator somewhat more persistent? +        # ah! yes! and put memory bounds. +        return get_email_charset(unicode(stuff)) + +    def getHeaders(self, negate, *names): +        """ +        Retrieve a group of message headers. + +        :param names: The names of the headers to retrieve or omit. +        :type names: tuple of str + +        :param negate: If True, indicates that the headers listed in names +                       should be omitted from the return value, rather +                       than included. +        :type negate: bool + +        :return: A mapping of header field names to header field values +        :rtype: dict +        """ +        if not self._pmap: +            logger.warning("No pmap in Subpart!") +            return {} +        headers = dict(self._pmap.get("headers", [])) + +        # twisted imap server expects *some* headers to be lowercase +        # We could use a CaseInsensitiveDict here... +        headers = dict( +            (str(key), str(value)) if key.lower() != "content-type" +            else (str(key.lower()), str(value)) +            for (key, value) in headers.items()) + +        names = map(lambda s: s.upper(), names) +        if negate: +            cond = lambda key: key.upper() not in names +        else: +            cond = lambda key: key.upper() in names + +        # unpack and filter original dict by negate-condition +        filter_by_cond = [ +            map(str, (key, val)) for +            key, val in headers.items() +            if cond(key)] +        filtered = dict(filter_by_cond) +        return filtered + +    def isMultipart(self): +        """ +        Return True if this message is multipart. +        """ +        if not self._pmap: +            logger.warning("Could not get part map!") +            return False +        multi = self._pmap.get("multi", False) +        return multi + +    def getSubPart(self, part): +        """ +        Retrieve a MIME submessage + +        :type part: C{int} +        :param part: The number of the part to retrieve, indexed from 0. +        :raise IndexError: Raised if the specified part does not exist. +        :raise TypeError: Raised if this message is not multipart. +        :rtype: Any object implementing C{IMessagePart}. +        :return: The specified sub-part. +        """ +        if not self.isMultipart(): +            raise TypeError +        sub_pmap = self._pmap.get("part_map", {}) +        try: +            part_map = sub_pmap[str(part + 1)] +        except KeyError: +            logger.debug("getSubpart for %s: KeyError" % (part,)) +            raise IndexError + +        # XXX check for validity +        return MessagePart(self._soledad, part_map) diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index ef0b0a1..67e5a41 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -24,13 +24,12 @@ import time  import threading  import StringIO -from collections import defaultdict, namedtuple +from collections import defaultdict  from functools import partial  from twisted.mail import imap4  from twisted.internet import defer  from twisted.python import log -from u1db import errors as u1db_errors  from zope.interface import implements  from zope.proxy import sameProxiedObjects @@ -38,13 +37,12 @@ from leap.common.check import leap_assert, leap_assert_type  from leap.common.decorators import memoized_method  from leap.common.mail import get_email_charset  from leap.mail import walk -from leap.mail.utils import first, find_charset +from leap.mail.utils import first, find_charset, lowerdict  from leap.mail.decorators import deferred  from leap.mail.imap.index import IndexedDB  from leap.mail.imap.fields import fields, WithMsgFields  from leap.mail.imap.memorystore import MessageDict  from leap.mail.imap.parser import MailParser, MBoxParser -from leap.mail.messageflow import IMessageConsumer  logger = logging.getLogger(__name__) @@ -52,29 +50,18 @@ logger = logging.getLogger(__name__)  # [ ] Add ref to incoming message during add_msg  # [ ] Add linked-from info. +#     * Need a new type of documents: linkage info. +#     * HDOCS are linked from FDOCs (ref to chash) +#     * CDOCS are linked from HDOCS (ref to chash) +  # [ ] Delete incoming mail only after successful write!  # [ ] Remove UID from syncable db. Store only those indexes locally. +CHARSET_PATTERN = r"""charset=([\w-]+)""" +MSGID_PATTERN = r"""<([\w@.]+)>""" -# XXX no longer needed, since i'm using proxies instead of direct weakrefs -def maybe_call(thing): -    """ -    Return the same thing, or the result of its invocation if it is a callable. -    """ -    return thing() if callable(thing) else thing - - -def lowerdict(_dict): -    """ -    Return a dict with the keys in lowercase. - -    :param _dict: the dict to convert -    :rtype: dict -    """ -    # TODO should properly implement a CaseInsensitive dict. -    # Look into requests code. -    return dict((key.lower(), value) -                for key, value in _dict.items()) +CHARSET_RE = re.compile(CHARSET_PATTERN, re.IGNORECASE) +MSGID_RE = re.compile(MSGID_PATTERN)  def try_unique_query(curried): @@ -102,232 +89,6 @@ def try_unique_query(curried):      except Exception as exc:          logger.exception("Unhandled error %r" % exc) -MSGID_PATTERN = r"""<([\w@.]+)>""" -MSGID_RE = re.compile(MSGID_PATTERN) - - -class MessagePart(object): -    """ -    IMessagePart implementor. -    It takes a subpart message and is able to find -    the inner parts. - -    Excusatio non petita: see the interface documentation. -    """ - -    implements(imap4.IMessagePart) - -    def __init__(self, soledad, part_map): -        """ -        Initializes the MessagePart. - -        :param part_map: a dictionary containing the parts map for this -                         message -        :type part_map: dict -        """ -        # TODO -        # It would be good to pass the uid/mailbox also -        # for references while debugging. - -        # We have a problem on bulk moves, and is -        # that when the fetch on the new mailbox is done -        # the parts maybe are not complete. -        # So we should be able to fail with empty -        # docs until we solve that. The ideal would be -        # to gather the results of the deferred operations -        # to signal the operation is complete. -        #leap_assert(part_map, "part map dict cannot be null") -        self._soledad = soledad -        self._pmap = part_map - -    def getSize(self): -        """ -        Return the total size, in octets, of this message part. - -        :return: size of the message, in octets -        :rtype: int -        """ -        if not self._pmap: -            return 0 -        size = self._pmap.get('size', None) -        if not size: -            logger.error("Message part cannot find size in the partmap") -        return size - -    def getBodyFile(self): -        """ -        Retrieve a file object containing only the body of this message. - -        :return: file-like object opened for reading -        :rtype: StringIO -        """ -        fd = StringIO.StringIO() -        if self._pmap: -            multi = self._pmap.get('multi') -            if not multi: -                phash = self._pmap.get("phash", None) -            else: -                pmap = self._pmap.get('part_map') -                first_part = pmap.get('1', None) -                if first_part: -                    phash = first_part['phash'] - -            if not phash: -                logger.warning("Could not find phash for this subpart!") -                payload = str("") -            else: -                payload = self._get_payload_from_document(phash) - -        else: -            logger.warning("Message with no part_map!") -            payload = str("") - -        if payload: -            content_type = self._get_ctype_from_document(phash) -            charset = find_charset(content_type) -            logger.debug("Got charset from header: %s" % (charset,)) -            if charset is None: -                charset = self._get_charset(payload) -                logger.debug("Got charset: %s" % (charset,)) -            try: -                payload = payload.encode(charset) -            except (UnicodeEncodeError, UnicodeDecodeError) as e: -                logger.error("Unicode error, using 'replace'. {0!r}".format(e)) -                payload = payload.encode(charset, 'replace') - -        fd.write(payload) -        fd.seek(0) -        return fd - -    # TODO cache the phash retrieval -    def _get_payload_from_document(self, phash): -        """ -        Gets the message payload from the content document. - -        :param phash: the payload hash to retrieve by. -        :type phash: basestring -        """ -        cdocs = self._soledad.get_from_index( -            fields.TYPE_P_HASH_IDX, -            fields.TYPE_CONTENT_VAL, str(phash)) - -        cdoc = first(cdocs) -        if not cdoc: -            logger.warning( -                "Could not find the content doc " -                "for phash %s" % (phash,)) -        payload = cdoc.content.get(fields.RAW_KEY, "") -        return payload - -    # TODO cache the pahash retrieval -    def _get_ctype_from_document(self, phash): -        """ -        Gets the content-type from the content document. - -        :param phash: the payload hash to retrieve by. -        :type phash: basestring -        """ -        cdocs = self._soledad.get_from_index( -            fields.TYPE_P_HASH_IDX, -            fields.TYPE_CONTENT_VAL, str(phash)) - -        cdoc = first(cdocs) -        if not cdoc: -            logger.warning( -                "Could not find the content doc " -                "for phash %s" % (phash,)) -        ctype = cdoc.content.get('ctype', "") -        return ctype - -    @memoized_method -    def _get_charset(self, stuff): -        # TODO put in a common class with LeapMessage -        """ -        Gets (guesses?) the charset of a payload. - -        :param stuff: the stuff to guess about. -        :type stuff: basestring -        :returns: charset -        """ -        # XXX existential doubt 2. shouldn't we make the scope -        # of the decorator somewhat more persistent? -        # ah! yes! and put memory bounds. -        return get_email_charset(unicode(stuff)) - -    def getHeaders(self, negate, *names): -        """ -        Retrieve a group of message headers. - -        :param names: The names of the headers to retrieve or omit. -        :type names: tuple of str - -        :param negate: If True, indicates that the headers listed in names -                       should be omitted from the return value, rather -                       than included. -        :type negate: bool - -        :return: A mapping of header field names to header field values -        :rtype: dict -        """ -        if not self._pmap: -            logger.warning("No pmap in Subpart!") -            return {} -        headers = dict(self._pmap.get("headers", [])) - -        # twisted imap server expects *some* headers to be lowercase -        # We could use a CaseInsensitiveDict here... -        headers = dict( -            (str(key), str(value)) if key.lower() != "content-type" -            else (str(key.lower()), str(value)) -            for (key, value) in headers.items()) - -        names = map(lambda s: s.upper(), names) -        if negate: -            cond = lambda key: key.upper() not in names -        else: -            cond = lambda key: key.upper() in names - -        # unpack and filter original dict by negate-condition -        filter_by_cond = [ -            map(str, (key, val)) for -            key, val in headers.items() -            if cond(key)] -        filtered = dict(filter_by_cond) -        return filtered - -    def isMultipart(self): -        """ -        Return True if this message is multipart. -        """ -        if not self._pmap: -            logger.warning("Could not get part map!") -            return False -        multi = self._pmap.get("multi", False) -        return multi - -    def getSubPart(self, part): -        """ -        Retrieve a MIME submessage - -        :type part: C{int} -        :param part: The number of the part to retrieve, indexed from 0. -        :raise IndexError: Raised if the specified part does not exist. -        :raise TypeError: Raised if this message is not multipart. -        :rtype: Any object implementing C{IMessagePart}. -        :return: The specified sub-part. -        """ -        if not self.isMultipart(): -            raise TypeError -        sub_pmap = self._pmap.get("part_map", {}) -        try: -            part_map = sub_pmap[str(part + 1)] -        except KeyError: -            logger.debug("getSubpart for %s: KeyError" % (part,)) -            raise IndexError - -        # XXX check for validity -        return MessagePart(self._soledad, part_map) -  class LeapMessage(fields, MailParser, MBoxParser):      """ @@ -380,7 +141,7 @@ class LeapMessage(fields, MailParser, MBoxParser):              if not fdoc:                  fdoc = self._get_flags_doc()              if fdoc: -                fdoc_content = maybe_call(fdoc.content) +                fdoc_content = fdoc.content                  self.__chash = fdoc_content.get(                      fields.CONTENT_HASH_KEY, None)              return fdoc @@ -404,7 +165,7 @@ class LeapMessage(fields, MailParser, MBoxParser):          if not self._fdoc:              return None          if not self.__chash and self._fdoc: -            self.__chash = maybe_call(self._fdoc.content).get( +            self.__chash = self._fdoc.content.get(                  fields.CONTENT_HASH_KEY, None)          return self.__chash @@ -444,7 +205,7 @@ class LeapMessage(fields, MailParser, MBoxParser):          flags = []          fdoc = self._fdoc          if fdoc: -            flags = maybe_call(fdoc.content).get(self.FLAGS_KEY, None) +            flags = fdoc.content.get(self.FLAGS_KEY, None)          msgcol = self._collection @@ -557,12 +318,12 @@ class LeapMessage(fields, MailParser, MBoxParser):                  charset = self._get_charset(body)              try:                  body = body.encode(charset) -            except UnicodeError as e: -                logger.error("Unicode error {0}".format(e)) +            except UnicodeError as exc: +                logger.error("Unicode error {0}".format(exc))                  logger.debug("Attempted to encode with: %s" % charset)                  try:                      body = body.encode(charset, 'replace') -                except UnicodeError as e: +                except UnicodeError as exc:                      try:                          body = body.encode('utf-8', 'replace')                      except: @@ -601,7 +362,7 @@ class LeapMessage(fields, MailParser, MBoxParser):          """          size = None          if self._fdoc: -            fdoc_content = maybe_call(self._fdoc.content) +            fdoc_content = self._fdoc.content              size = fdoc_content.get(self.SIZE_KEY, False)          else:              logger.warning("No FLAGS doc for %s:%s" % (self._mbox, @@ -667,7 +428,7 @@ class LeapMessage(fields, MailParser, MBoxParser):          Return the headers dict for this message.          """          if self._hdoc is not None: -            hdoc_content = maybe_call(self._hdoc.content) +            hdoc_content = self._hdoc.content              headers = hdoc_content.get(self.HEADERS_KEY, {})              return headers @@ -682,7 +443,7 @@ class LeapMessage(fields, MailParser, MBoxParser):          Return True if this message is multipart.          """          if self._fdoc: -            fdoc_content = maybe_call(self._fdoc.content) +            fdoc_content = self._fdoc.content              is_multipart = fdoc_content.get(self.MULTIPART_KEY, False)              return is_multipart          else: @@ -725,7 +486,7 @@ class LeapMessage(fields, MailParser, MBoxParser):              logger.warning("Tried to get part but no HDOC found!")              return None -        hdoc_content = maybe_call(self._hdoc.content) +        hdoc_content = self._hdoc.content          pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {})          return pmap[str(part)] @@ -762,7 +523,7 @@ class LeapMessage(fields, MailParser, MBoxParser):          Return the document that keeps the body for this          message.          """ -        hdoc_content = maybe_call(self._hdoc.content) +        hdoc_content = self._hdoc.content          body_phash = hdoc_content.get(              fields.BODY_KEY, None)          if not body_phash: @@ -801,7 +562,7 @@ class LeapMessage(fields, MailParser, MBoxParser):          :return: The content value indexed by C{key} or None          :rtype: str          """ -        return maybe_call(self._fdoc.content).get(key, None) +        return self._fdoc.content.get(key, None)      # setters @@ -874,143 +635,7 @@ class LeapMessage(fields, MailParser, MBoxParser):          return self._fdoc is not None -class ContentDedup(object): -    """ -    Message deduplication. - -    We do a query for the content hashes before writing to our beloved -    sqlcipher backend of Soledad. This means, by now, that: - -    1. We will not store the same attachment twice, only the hash of it. -    2. We will not store the same message body twice, only the hash of it. - -    The first case is useful if you are always receiving the same old memes -    from unwary friends that still have not discovered that 4chan is the -    generator of the internet. The second will save your day if you have -    initiated session with the same account in two different machines. I also -    wonder why would you do that, but let's respect each other choices, like -    with the religious celebrations, and assume that one day we'll be able -    to run Bitmask in completely free phones. Yes, I mean that, the whole GSM -    Stack. -    """ - -    def _content_does_exist(self, doc): -        """ -        Check whether we already have a content document for a payload -        with this hash in our database. - -        :param doc: tentative body document -        :type doc: dict -        :returns: True if that happens, False otherwise. -        """ -        if not doc: -            return False -        phash = doc[fields.PAYLOAD_HASH_KEY] -        attach_docs = self._soledad.get_from_index( -            fields.TYPE_P_HASH_IDX, -            fields.TYPE_CONTENT_VAL, str(phash)) -        if not attach_docs: -            return False - -        if len(attach_docs) != 1: -            logger.warning("Found more than one copy of phash %s!" -                           % (phash,)) -        logger.debug("Found attachment doc with that hash! Skipping save!") -        return True - - -SoledadWriterPayload = namedtuple( -    'SoledadWriterPayload', ['mode', 'payload']) - -# TODO we could consider using enum here: -# https://pypi.python.org/pypi/enum - -SoledadWriterPayload.CREATE = 1 -SoledadWriterPayload.PUT = 2 -SoledadWriterPayload.CONTENT_CREATE = 3 - - -""" -SoledadDocWriter was used to avoid writing to the db from multiple threads. -Its use here has been deprecated in favor of a local rw_lock in the client. -But we might want to reuse in in the near future to implement priority queues. -""" - - -class SoledadDocWriter(object): -    """ -    This writer will create docs serially in the local soledad database. -    """ - -    implements(IMessageConsumer) - -    def __init__(self, soledad): -        """ -        Initialize the writer. - -        :param soledad: the soledad instance -        :type soledad: Soledad -        """ -        self._soledad = soledad - -    def _get_call_for_item(self, item): -        """ -        Return the proper call type for a given item. - -        :param item: one of the types defined under the -                     attributes of SoledadWriterPayload -        :type item: int -        """ -        call = None -        payload = item.payload - -        if item.mode == SoledadWriterPayload.CREATE: -            call = self._soledad.create_doc -        elif (item.mode == SoledadWriterPayload.CONTENT_CREATE -              and not self._content_does_exist(payload)): -                call = self._soledad.create_doc -        elif item.mode == SoledadWriterPayload.PUT: -            call = self._soledad.put_doc -        return call - -    def _process(self, queue): -        """ -        Return the item and the proper call type for the next -        item in the queue if any. - -        :param queue: the queue from where we'll pick item. -        :type queue: Queue -        """ -        item = queue.get() -        call = self._get_call_for_item(item) -        return item, call - -    def consume(self, queue): -        """ -        Creates a new document in soledad db. - -        :param queue: queue to get item from, with content of the document -                      to be inserted. -        :type queue: Queue -        """ -        empty = queue.empty() -        while not empty: -            item, call = self._process(queue) - -            if call: -                # XXX should handle the delete case -                # should handle errors -                try: -                    call(item.payload) -                except u1db_errors.RevisionConflict as exc: -                    logger.error("Error: %r" % (exc,)) -                    raise exc - -            empty = queue.empty() - - -class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, -                        ContentDedup): +class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):      """      A collection of messages, surprisingly. @@ -1360,7 +985,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,          # TODO ---- add reference to original doc, to be deleted          # after writes are done.          msg_container = MessageDict(fd, hd, cdocs) -        self._memstore.put(self.mbox, uid, msg_container) +        self._memstore.create_message(self.mbox, uid, msg_container)      def _remove_cb(self, result):          return result | 
