diff options
| -rw-r--r-- | mail/changes/feature_split_message_docs | 6 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/fetch.py | 7 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/fields.py | 49 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/index.py | 4 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/mailbox.py | 103 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/messages.py | 831 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/parser.py | 24 | 
7 files changed, 808 insertions, 216 deletions
| diff --git a/mail/changes/feature_split_message_docs b/mail/changes/feature_split_message_docs new file mode 100644 index 0000000..231c36e --- /dev/null +++ b/mail/changes/feature_split_message_docs @@ -0,0 +1,6 @@ +  o Defer costly operations to a pool of threads. +  o Split the internal representation of messages into four distinct documents: +    1) Flags 2) Headers 3) Body 4) Attachments. +  o Add deduplication ability to the save operation, for body and attachments. +  o Add IMessageCopier interface to mailbox implementation, so bulk moves +    are costless. Closes: #4654 diff --git a/mail/src/leap/mail/imap/fetch.py b/mail/src/leap/mail/imap/fetch.py index 0b31c3b..fdf1412 100644 --- a/mail/src/leap/mail/imap/fetch.py +++ b/mail/src/leap/mail/imap/fetch.py @@ -412,13 +412,13 @@ class LeapIncomingMail(object):          # decrypt or fail gracefully          try: -            decrdata, valid_sig = yield self._decrypt_and_verify_data( +            decrdata, valid_sig = self._decrypt_and_verify_data(                  encdata, senderPubkey)          except keymanager_errors.DecryptError as e:              logger.warning('Failed to decrypt encrypted message (%s). '                             'Storing message without modifications.' % str(e))              # Bailing out! -            yield (msg, False) +            return (msg, False)          # decrypted successully, now fix encoding and parse          try: @@ -441,7 +441,7 @@ class LeapIncomingMail(object):          # all ok, replace payload by unencrypted payload          msg.set_payload(decrmsg.get_payload()) -        yield (msg, valid_sig) +        return (msg, valid_sig)      def _maybe_decrypt_inline_encrypted_msg(self, origmsg, encoding,                                              senderPubkey): @@ -527,6 +527,7 @@ class LeapIncomingMail(object):          """          log.msg('adding message to local db')          doc, data = msgtuple +          if isinstance(data, list):              data = data[0] diff --git a/mail/src/leap/mail/imap/fields.py b/mail/src/leap/mail/imap/fields.py index 96b937e..40817cd 100644 --- a/mail/src/leap/mail/imap/fields.py +++ b/mail/src/leap/mail/imap/fields.py @@ -25,18 +25,35 @@ class WithMsgFields(object):      Container class for class-attributes to be shared by      several message-related classes.      """ -    # Internal representation of Message -    DATE_KEY = "date" -    HEADERS_KEY = "headers" -    FLAGS_KEY = "flags" -    MBOX_KEY = "mbox" +    # indexing      CONTENT_HASH_KEY = "chash" -    RAW_KEY = "raw" -    SUBJECT_KEY = "subject" +    PAYLOAD_HASH_KEY = "phash" + +    # Internal representation of Message + +    # flags doc      UID_KEY = "uid" +    MBOX_KEY = "mbox" +    SEEN_KEY = "seen" +    RECENT_KEY = "recent" +    FLAGS_KEY = "flags"      MULTIPART_KEY = "multi"      SIZE_KEY = "size" +    # headers +    HEADERS_KEY = "headers" +    NUM_PARTS_KEY = "numparts" +    PARTS_MAP_KEY = "partmap" +    DATE_KEY = "date" +    SUBJECT_KEY = "subject" + +    # attachment +    PART_NUMBER_KEY = "part" +    RAW_KEY = "raw" + +    # content +    BODY_KEY = "body" +      # Mailbox specific keys      CLOSED_KEY = "closed"      CREATED_KEY = "created" @@ -55,10 +72,6 @@ class WithMsgFields(object):      INBOX_VAL = "inbox" -    # Flags for SoledadDocument for indexing. -    SEEN_KEY = "seen" -    RECENT_KEY = "recent" -      # Flags in Mailbox and Message      SEEN_FLAG = "\\Seen"      RECENT_FLAG = "\\Recent" @@ -82,7 +95,9 @@ class WithMsgFields(object):      TYPE_SUBS_IDX = 'by-type-and-subscribed'      TYPE_MBOX_SEEN_IDX = 'by-type-and-mbox-and-seen'      TYPE_MBOX_RECT_IDX = 'by-type-and-mbox-and-recent' -    TYPE_HASH_IDX = 'by-type-and-hash' +    TYPE_C_HASH_IDX = 'by-type-and-contenthash' +    TYPE_C_HASH_PART_IDX = 'by-type-and-contenthash-and-partnumber' +    TYPE_P_HASH_IDX = 'by-type-and-payloadhash'      # Tomas created the `recent and seen index`, but the semantic is not too      # correct since the recent flag is volatile. @@ -90,7 +105,9 @@ class WithMsgFields(object):      KTYPE = TYPE_KEY      MBOX_VAL = TYPE_MBOX_VAL -    HASH_VAL = CONTENT_HASH_KEY +    CHASH_VAL = CONTENT_HASH_KEY +    PHASH_VAL = PAYLOAD_HASH_KEY +    PART_VAL = PART_NUMBER_KEY      INDEXES = {          # generic @@ -102,7 +119,11 @@ class WithMsgFields(object):          TYPE_SUBS_IDX: [KTYPE, 'bool(subscribed)'],          # content, headers doc -        TYPE_HASH_IDX: [KTYPE, HASH_VAL], +        TYPE_C_HASH_IDX: [KTYPE, CHASH_VAL], +        # attachment docs +        TYPE_C_HASH_PART_IDX: [KTYPE, CHASH_VAL, PART_VAL], +        # attachment payload dedup +        TYPE_P_HASH_IDX: [KTYPE, PHASH_VAL],          # messages          TYPE_MBOX_SEEN_IDX: [KTYPE, MBOX_VAL, 'bool(seen)'], diff --git a/mail/src/leap/mail/imap/index.py b/mail/src/leap/mail/imap/index.py index 2280d86..5f0919a 100644 --- a/mail/src/leap/mail/imap/index.py +++ b/mail/src/leap/mail/imap/index.py @@ -21,7 +21,7 @@ import logging  from leap.common.check import leap_assert, leap_assert_type -from leap.mail.imap.account import SoledadBackedAccount +from leap.mail.imap.fields import fields  logger = logging.getLogger(__name__) @@ -54,7 +54,7 @@ class IndexedDB(object):          db_indexes = dict()          if self._soledad is not None:              db_indexes = dict(self._soledad.list_indexes()) -        for name, expression in SoledadBackedAccount.INDEXES.items(): +        for name, expression in fields.INDEXES.items():              if name not in db_indexes:                  # The index does not yet exist.                  self._soledad.create_index(name, *expression) diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py index 09c06a2..5ea6f55 100644 --- a/mail/src/leap/mail/imap/mailbox.py +++ b/mail/src/leap/mail/imap/mailbox.py @@ -17,7 +17,13 @@  """  Soledad Mailbox.  """ +import copy +import threading  import logging +import time +import StringIO +import cStringIO +  from collections import defaultdict  from twisted.internet import defer @@ -45,9 +51,14 @@ class SoledadMailbox(WithMsgFields, MBoxParser):      The low-level database methods are contained in MessageCollection class,      which we instantiate and make accessible in the `messages` attribute.      """ -    implements(imap4.IMailboxInfo, imap4.IMailbox, imap4.ICloseableMailbox) +    implements( +        imap4.IMailbox, +        imap4.IMailboxInfo, +        imap4.ICloseableMailbox, +        imap4.IMessageCopier) +      # XXX should finish the implementation of IMailboxListener -    # XXX should implement IMessageCopier too +    # XXX should implement ISearchableMailbox too      messages = None      _closed = False @@ -65,6 +76,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):      CMD_UNSEEN = "UNSEEN"      _listeners = defaultdict(set) +    next_uid_lock = threading.Lock()      def __init__(self, mbox, soledad=None, rw=1):          """ @@ -284,8 +296,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          :rtype: int          """ -        self.last_uid += 1 -        return self.last_uid +        with self.next_uid_lock: +            self.last_uid += 1 +            return self.last_uid      def getMessageCount(self):          """ @@ -366,6 +379,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          :return: a deferred that evals to None          """ +        if isinstance(message, (cStringIO.OutputType, StringIO.StringIO)): +            message = message.getvalue()          # XXX we should treat the message as an IMessage from here          leap_assert_type(message, basestring)          uid_next = self.getUIDNext() @@ -375,11 +390,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          else:              flags = tuple(str(flag) for flag in flags) -        d = self._do_add_messages(message, flags, date, uid_next) +        d = self._do_add_message(message, flags, date, uid_next)          d.addCallback(self._notify_new) +        return d      @deferred -    def _do_add_messages(self, message, flags, date, uid_next): +    def _do_add_message(self, message, flags, date, uid_next):          """          Calls to the messageCollection add_msg method (deferred to thread).          Invoked from addMessage. @@ -420,28 +436,21 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          # we should postpone the removal          self._soledad.delete_doc(self._get_mbox()) +    @deferred      def expunge(self):          """          Remove all messages flagged \\Deleted          """          if not self.isWriteable():              raise imap4.ReadOnlyMailbox -        delete = []          deleted = [] - -        for m in self.messages.get_all_docs(): -            # XXX should operate with LeapMessages instead, -            # so we don't expose the implementation. -            # (so, iterate for m in self.messages) -            if self.DELETED_FLAG in m.content[self.FLAGS_KEY]: -                delete.append(m) -        for m in delete: -            deleted.append(m.content) -            self.messages.remove(m) - -        # XXX should return the UIDs of the deleted messages -        # more generically -        return [x for x in range(len(deleted))] +        for m in self.messages: +            if self.DELETED_FLAG in m.getFlags(): +                self.messages.remove(m) +                # XXX this would ve more efficient if we can just pass +                # a sequence of uids. +                deleted.append(m.getUID()) +        return deleted      @deferred      def fetch(self, messages, uid): @@ -510,6 +519,17 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          session is the first session to be notified about a message,          then that message SHOULD be considered recent.          """ +        # TODO this fucker, for the sake of correctness, is messing with +        # the whole collection of flag docs. + +        # Possible ways of action: +        # 1. Ignore it, we want fun. +        # 2. Trigger it with a delay +        # 3. Route it through a queue with lesser priority than the +        #    regularar writer. + +        # hmm let's try 2. in a quickndirty way... +        time.sleep(1)          log.msg('unsetting recent flags...')          for msg in self.messages.get_recent():              msg.removeFlags((fields.RECENT_FLAG,)) @@ -570,6 +590,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          for msg_id in messages:              log.msg("MSG ID = %s" % msg_id)              msg = self.messages.get_msg_by_uid(msg_id) +            if not msg: +                return result              if mode == 1:                  msg.addFlags(flags)              elif mode == -1: @@ -589,15 +611,36 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          self.expunge()          self.closed = True -    #@deferred -    #def copy(self, messageObject): -        #""" -        #Copy the given message object into this mailbox. -        #""" -        # XXX should just: -        # 1. Get the message._fdoc -        # 2. Change the UID to UIDNext for this mailbox -        # 3. Add implements IMessageCopier +    # IMessageCopier + +    @deferred +    def copy(self, messageObject): +        """ +        Copy the given message object into this mailbox. +        """ +        uid_next = self.getUIDNext() +        msg = messageObject + +        # XXX should use a public api instead +        fdoc = msg._fdoc +        if not fdoc: +            logger.debug("Tried to copy a MSG with no fdoc") +            return + +        new_fdoc = copy.deepcopy(fdoc.content) +        new_fdoc[self.UID_KEY] = uid_next +        new_fdoc[self.MBOX_KEY] = self.mbox + +        d = self._do_add_doc(new_fdoc) +        d.addCallback(self._notify_new) + +    @deferred +    def _do_add_doc(self, doc): +        """ +        Defers the adding of a new doc. +        :param doc: document to be created in soledad. +        """ +        self._soledad.create_doc(doc)      # convenience fun diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index b0d5da2..c69c023 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -29,9 +29,9 @@ from zope.interface import implements  from zope.proxy import sameProxiedObjects  from leap.common.check import leap_assert, leap_assert_type +from leap.common.decorators import memoized_method  from leap.common.mail import get_email_charset  from leap.mail.decorators import deferred -from leap.mail.imap.account import SoledadBackedAccount  from leap.mail.imap.index import IndexedDB  from leap.mail.imap.fields import fields, WithMsgFields  from leap.mail.imap.parser import MailParser, MBoxParser @@ -40,6 +40,181 @@ from leap.mail.messageflow import IMessageConsumer, MessageProducer  logger = logging.getLogger(__name__) +def first(things): +    """ +    Return the head of a collection. +    """ +    try: +        return things[0] +    except (IndexError, TypeError): +        return None + + +class MessageBody(object): +    """ +    IMessagePart implementor for the main +    body of a multipart message. + +    Excusatio non petita: see the interface documentation. +    """ + +    implements(imap4.IMessagePart) + +    def __init__(self, fdoc, bdoc): +        self._fdoc = fdoc +        self._bdoc = bdoc + +    def getSize(self): +        return len(self._bdoc.content[fields.BODY_KEY]) + +    def getBodyFile(self): +        fd = StringIO.StringIO() + +        if self._bdoc: +            body = self._bdoc.content[fields.BODY_KEY] +        else: +            body = "" +        charset = self._get_charset(body) +        try: +            body = body.encode(charset) +        except (UnicodeEncodeError, UnicodeDecodeError) as e: +            logger.error("Unicode error {0}".format(e)) +            body = body.encode(charset, 'replace') +        fd.write(body) +        fd.seek(0) +        return fd + +    @memoized_method +    def _get_charset(self, stuff): +        return get_email_charset(unicode(stuff)) + +    def getHeaders(self, negate, *names): +        return {} + +    def isMultipart(self): +        return False + +    def getSubPart(self, part): +        return None + + +class MessageAttachment(object): + +    implements(imap4.IMessagePart) + +    def __init__(self, msg): +        """ +        Initializes the messagepart with a Message instance. +        :param msg: a message instance +        :type msg: Message +        """ +        self._msg = msg + +    def getSize(self): +        """ +        Return the total size, in octets, of this message part. + +        :return: size of the message, in octets +        :rtype: int +        """ +        if not self._msg: +            return 0 +        return len(self._msg.as_string()) + +    def getBodyFile(self): +        """ +        Retrieve a file object containing only the body of this message. + +        :return: file-like object opened for reading +        :rtype: StringIO +        """ +        fd = StringIO.StringIO() +        if self._msg: +            body = self._msg.get_payload() +        else: +            logger.debug("Empty message!") +            body = "" + +        # XXX should only do the dance if we're sure it's +        # content/text-plain!!! +        #charset = self._get_charset(body) +        #try: +            #body = body.encode(charset) +        #except (UnicodeEncodeError, UnicodeDecodeError) as e: +            #logger.error("Unicode error {0}".format(e)) +            #body = body.encode(charset, 'replace') +        fd.write(body) +        fd.seek(0) +        return fd + +    @memoized_method +    def _get_charset(self, stuff): +        # TODO put in a common class with LeapMessage +        """ +        Gets (guesses?) the charset of a payload. + +        :param stuff: the stuff to guess about. +        :type stuff: basestring +        :returns: charset +        """ +        # XXX existential doubt 1. wouldn't be smarter to +        # peek into the mail headers? +        # XXX existential doubt 2. shouldn't we make the scope +        # of the decorator somewhat more persistent? +        # ah! yes! and put memory bounds. +        return get_email_charset(unicode(stuff)) + +    def getHeaders(self, negate, *names): +        """ +        Retrieve a group of message headers. + +        :param names: The names of the headers to retrieve or omit. +        :type names: tuple of str + +        :param negate: If True, indicates that the headers listed in names +                       should be omitted from the return value, rather +                       than included. +        :type negate: bool + +        :return: A mapping of header field names to header field values +        :rtype: dict +        """ +        if not self._msg: +            return {} +        headers = dict(self._msg.items()) +        names = map(lambda s: s.upper(), names) +        if negate: +            cond = lambda key: key.upper() not in names +        else: +            cond = lambda key: key.upper() in names + +        # unpack and filter original dict by negate-condition +        filter_by_cond = [ +            map(str, (key, val)) for +            key, val in headers.items() +            if cond(key)] +        return dict(filter_by_cond) + +    def isMultipart(self): +        """ +        Return True if this message is multipart. +        """ +        return self._msg.is_multipart() + +    def getSubPart(self, part): +        """ +        Retrieve a MIME submessage + +        :type part: C{int} +        :param part: The number of the part to retrieve, indexed from 0. +        :raise IndexError: Raised if the specified part does not exist. +        :raise TypeError: Raised if this message is not multipart. +        :rtype: Any object implementing C{IMessagePart}. +        :return: The specified sub-part. +        """ +        return self._msg.get_payload() + +  class LeapMessage(fields, MailParser, MBoxParser):      implements(imap4.IMessage) @@ -59,25 +234,21 @@ class LeapMessage(fields, MailParser, MBoxParser):          self._soledad = soledad          self._uid = int(uid)          self._mbox = self._parse_mailbox_name(mbox) -        self._chash = None -        self.__cdoc = None +        self.__chash = None +        self.__bdoc = None      @property      def _fdoc(self):          """          An accessor to the flags document.          """ -        return self._get_flags_doc() - -    @property -    def _cdoc(self): -        """ -        An accessor to the content document. -        """ -        if not self.__cdoc: -            self.__cdoc = self._get_content_doc() -        return self.__cdoc +        if all(map(bool, (self._uid, self._mbox))): +            fdoc = self._get_flags_doc() +            if fdoc: +                self.__chash = fdoc.content.get( +                    fields.CONTENT_HASH_KEY, None) +            return fdoc      @property      def _chash(self): @@ -86,7 +257,26 @@ class LeapMessage(fields, MailParser, MBoxParser):          """          if not self._fdoc:              return None -        return self._fdoc.content.get(fields.CONTENT_HASH_KEY, None) +        if not self.__chash and self._fdoc: +            self.__chash = self._fdoc.content.get( +                fields.CONTENT_HASH_KEY, None) +        return self.__chash + +    @property +    def _hdoc(self): +        """ +        An accessor to the headers document. +        """ +        return self._get_headers_doc() + +    @property +    def _bdoc(self): +        """ +        An accessor to the body document. +        """ +        if not self.__bdoc: +            self.__bdoc = self._get_body_doc() +        return self.__bdoc      # IMessage implementation @@ -110,9 +300,9 @@ class LeapMessage(fields, MailParser, MBoxParser):              return []          flags = [] -        flag_doc = self._fdoc -        if flag_doc: -            flags = flag_doc.content.get(self.FLAGS_KEY, None) +        fdoc = self._fdoc +        if fdoc: +            flags = fdoc.content.get(self.FLAGS_KEY, None)          if flags:              flags = map(str, flags)          return tuple(flags) @@ -180,7 +370,7 @@ class LeapMessage(fields, MailParser, MBoxParser):          :rtype: C{str}          :return: An RFC822-formatted date string.          """ -        return str(self._cdoc.content.get(self.DATE_KEY, '')) +        return str(self._hdoc.content.get(self.DATE_KEY, ''))      #      # IMessagePart @@ -197,25 +387,38 @@ class LeapMessage(fields, MailParser, MBoxParser):          :rtype: StringIO          """          fd = StringIO.StringIO() +        bdoc = self._bdoc +        if bdoc: +            body = self._bdoc.content.get(self.BODY_KEY, "") +        else: +            body = "" -        cdoc = self._cdoc -        content = cdoc.content.get(self.RAW_KEY, '') -        charset = get_email_charset( -            unicode(cdoc.content.get(self.RAW_KEY, ''))) +        charset = self._get_charset(body)          try: -            content = content.encode(charset) +            body = body.encode(charset)          except (UnicodeEncodeError, UnicodeDecodeError) as e:              logger.error("Unicode error {0}".format(e)) -            content = content.encode(charset, 'replace') - -        raw = self._get_raw_msg() -        msg = self._get_parsed_msg(raw) -        body = msg.get_payload() +            body = body.encode(charset, 'replace')          fd.write(body) -        # XXX SHOULD use a separate BODY FIELD ...          fd.seek(0)          return fd +    @memoized_method +    def _get_charset(self, stuff): +        """ +        Gets (guesses?) the charset of a payload. + +        :param stuff: the stuff to guess about. +        :type stuff: basestring +        :returns: charset +        """ +        # XXX existential doubt 1. wouldn't be smarter to +        # peek into the mail headers? +        # XXX existential doubt 2. shouldn't we make the scope +        # of the decorator somewhat more persistent? +        # ah! yes! and put memory bounds. +        return get_email_charset(unicode(stuff)) +      def getSize(self):          """          Return the total size, in octets, of this message. @@ -223,19 +426,17 @@ class LeapMessage(fields, MailParser, MBoxParser):          :return: size of the message, in octets          :rtype: int          """ -        size = self._cdoc.content.get(self.SIZE_KEY, False) +        size = None +        if self._fdoc: +            size = self._fdoc.content.get(self.SIZE_KEY, False) +        else: +            logger.warning("No FLAGS doc for %s:%s" % (self._mbox, +                                                       self._uid))          if not size:              # XXX fallback, should remove when all migrated.              size = self.getBodyFile().len          return size -    def _get_headers(self): -        """ -        Return the headers dict stored in this message document. -        """ -        # XXX get from the headers doc -        return self._cdoc.content.get(self.HEADERS_KEY, {}) -      def getHeaders(self, negate, *names):          """          Retrieve a group of message headers. @@ -252,26 +453,49 @@ class LeapMessage(fields, MailParser, MBoxParser):          :rtype: dict          """          headers = self._get_headers() +        if not headers: +            return {'content-type': ''}          names = map(lambda s: s.upper(), names)          if negate:              cond = lambda key: key.upper() not in names          else:              cond = lambda key: key.upper() in names +        head = copy.deepcopy(dict(headers.items())) + +            # twisted imap server expects headers to be lowercase +        head = dict( +            map(str, (key, value)) if key.lower() != "content-type" +            else map(str, (key.lower(), value)) +            for (key, value) in head.items()) +          # unpack and filter original dict by negate-condition -        filter_by_cond = [ -            map(str, (key, val)) for -            key, val in headers.items() -            if cond(key)] +        filter_by_cond = [(key, val) for key, val in head.items() if cond(key)]          return dict(filter_by_cond) +    def _get_headers(self): +        """ +        Return the headers dict for this message. +        """ +        if self._hdoc is not None: +            return self._hdoc.content.get(self.HEADERS_KEY, {}) +        else: +            logger.warning( +                "No HEADERS doc for msg %s:%s" % ( +                    self._mbox, +                    self._uid)) +      def isMultipart(self):          """          Return True if this message is multipart.          """ -        if self._cdoc: -            retval = self._fdoc.content.get(self.MULTIPART_KEY, False) -            return retval +        if self._fdoc: +            return self._fdoc.content.get(self.MULTIPART_KEY, False) +        else: +            logger.warning( +                "No FLAGS doc for msg %s:%s" % ( +                    self.mbox, +                    self.uid))      def getSubPart(self, part):          """ @@ -284,12 +508,22 @@ class LeapMessage(fields, MailParser, MBoxParser):          :rtype: Any object implementing C{IMessagePart}.          :return: The specified sub-part.          """ +        logger.debug("Getting subpart: %s" % part)          if not self.isMultipart():              raise TypeError -        msg = self._get_parsed_msg() -        # XXX should wrap IMessagePart -        return msg.get_payload()[part] +        if part == 0: +            # Let's get the first part, which +            # is really the body. +            return MessageBody(self._fdoc, self._bdoc) + +        attach_doc = self._get_attachment_doc(part) +        if not attach_doc: +            # so long and thanks for all the fish +            logger.debug("...not today") +            raise IndexError +        msg_part = self._get_parsed_msg(attach_doc.content[self.RAW_KEY]) +        return MessageAttachment(msg_part)      #      # accessors @@ -301,32 +535,87 @@ class LeapMessage(fields, MailParser, MBoxParser):          message.          """          flag_docs = self._soledad.get_from_index( -            SoledadBackedAccount.TYPE_MBOX_UID_IDX, +            fields.TYPE_MBOX_UID_IDX,              fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid)) -        flag_doc = flag_docs[0] if flag_docs else None -        return flag_doc +        return first(flag_docs) -    def _get_content_doc(self): +    def _get_headers_doc(self):          """ -        Return the document that keeps the flags for this +        Return the document that keeps the headers for this +        message. +        """ +        head_docs = self._soledad.get_from_index( +            fields.TYPE_C_HASH_IDX, +            fields.TYPE_HEADERS_VAL, str(self._chash)) +        return first(head_docs) + +    def _get_body_doc(self): +        """ +        Return the document that keeps the body for this          message.          """ -        cont_docs = self._soledad.get_from_index( -            SoledadBackedAccount.TYPE_HASH_IDX, -            fields.TYPE_MESSAGE_VAL, self._content_hash, str(self._uid)) -        cont_doc = cont_docs[0] if cont_docs else None -        return cont_doc +        body_docs = self._soledad.get_from_index( +            fields.TYPE_C_HASH_IDX, +            fields.TYPE_MESSAGE_VAL, str(self._chash)) +        return first(body_docs) + +    def _get_num_parts(self): +        """ +        Return the number of parts for a multipart message. +        """ +        if not self.isMultipart(): +            raise TypeError( +                "Tried to get num parts in a non-multipart message") +        if not self._hdoc: +            return None +        return self._hdoc.content.get(fields.NUM_PARTS_KEY, 2) + +    def _get_attachment_doc(self, part): +        """ +        Return the document that keeps the headers for this +        message. + +        :param part: the part number for the multipart message. +        :type part: int +        """ +        if not self._hdoc: +            return None +        try: +            phash = self._hdoc.content[self.PARTS_MAP_KEY][str(part)] +        except KeyError: +            # this is the remnant of a debug session until +            # I found that the index is actually a string... +            # It should be safe to just raise the KeyError now, +            # but leaving it here while the blood is fresh... +            logger.warning("We expected a phash in the " +                           "index %s, but noone found" % (part, )) +            logger.debug(self._hdoc.content[self.PARTS_MAP_KEY]) +            return None +        attach_docs = self._soledad.get_from_index( +            fields.TYPE_P_HASH_IDX, +            fields.TYPE_ATTACHMENT_VAL, str(phash)) + +        # The following is true for the fist owner. +        # We could use this relationship to flag the "owner" +        # and orphan when we delete it. + +        #attach_docs = self._soledad.get_from_index( +            #fields.TYPE_C_HASH_PART_IDX, +            #fields.TYPE_ATTACHMENT_VAL, str(self._chash), str(part)) +        return first(attach_docs)      def _get_raw_msg(self):          """          Return the raw msg.          :rtype: basestring          """ -        return self._cdoc.content.get(self.RAW_KEY, '') +        # TODO deprecate this. +        return self._bdoc.content.get(self.RAW_KEY, '')      def __getitem__(self, key):          """ -        Return the content of the message document. +        Return an item from the content of the flags document, +        for convenience.          :param key: The key          :type key: str @@ -334,14 +623,73 @@ class LeapMessage(fields, MailParser, MBoxParser):          :return: The content value indexed by C{key} or None          :rtype: str          """ -        return self._cdoc.content.get(key, None) +        return self._fdoc.content.get(key, None) + +    # setters + +    # XXX to be used in the messagecopier interface?! + +    def set_uid(self, uid): +        """ +        Set new uid for this message. + +        :param uid: the new uid +        :type uid: basestring +        """ +        # XXX dangerous! lock? +        self._uid = uid +        d = self._fdoc +        d.content[self.UID_KEY] = uid +        self._soledad.put_doc(d) + +    def set_mbox(self, mbox): +        """ +        Set new mbox for this message. + +        :param mbox: the new mbox +        :type mbox: basestring +        """ +        # XXX dangerous! lock? +        self._mbox = mbox +        d = self._fdoc +        d.content[self.MBOX_KEY] = mbox +        self._soledad.put_doc(d) + +    # destructor + +    @deferred +    def remove(self): +        """ +        Remove all docs associated with this message. +        """ +        # XXX this would ve more efficient if we can just pass +        # a sequence of uids. + +        # XXX For the moment we are only removing the flags and headers +        # docs. The rest we leave there polluting your hard disk, +        # until we think about a good way of deorphaning. +        # Maybe a crawler of unreferenced docs. + +        fd = self._get_flags_doc() +        hd = self._get_headers_doc() +        #bd = self._get_body_doc() +        #docs = [fd, hd, bd] + +        docs = [fd, hd] + +        #for pn in range(self._get_num_parts()[1:]): +            #ad = self._get_attachment_doc(pn) +            #docs.append(ad) + +        for d in filter(None, docs): +            self._soledad.delete_doc(d)      def does_exist(self):          """ -        Return True if there is actually a message for this +        Return True if there is actually a flags message for this          UID and mbox.          """ -        return bool(self._fdoc) +        return self._fdoc is not None  SoledadWriterPayload = namedtuple( @@ -349,6 +697,8 @@ SoledadWriterPayload = namedtuple(  SoledadWriterPayload.CREATE = 1  SoledadWriterPayload.PUT = 2 +SoledadWriterPayload.BODY_CREATE = 3 +SoledadWriterPayload.ATTACHMENT_CREATE = 4  class SoledadDocWriter(object): @@ -378,20 +728,98 @@ class SoledadDocWriter(object):          empty = queue.empty()          while not empty:              item = queue.get() +            call = None +            payload = item.payload +              if item.mode == SoledadWriterPayload.CREATE:                  call = self._soledad.create_doc +            elif item.mode == SoledadWriterPayload.BODY_CREATE: +                if not self._body_does_exist(payload): +                    call = self._soledad.create_doc +            elif item.mode == SoledadWriterPayload.ATTACHMENT_CREATE: +                if not self._attachment_does_exist(payload): +                    call = self._soledad.create_doc              elif item.mode == SoledadWriterPayload.PUT:                  call = self._soledad.put_doc -            # should handle errors -            try: -                call(item.payload) -            except u1db_errors.RevisionConflict as exc: -                logger.error("Error: %r" % (exc,)) -                raise exc +            # XXX delete? + +            if call: +                # should handle errors +                try: +                    call(item.payload) +                except u1db_errors.RevisionConflict as exc: +                    logger.error("Error: %r" % (exc,)) +                    raise exc              empty = queue.empty() +    """ +    Message deduplication. + +    We do a query for the content hashes before writing to our beloved +    slcipher backend of Soledad. This means, by now, that: + +    1. We will not store the same attachment twice, only the hash of it. +    2. We will not store the same message body twice, only the hash of it. + +    The first case is useful if you are always receiving the same old memes +    from unwary friends that still have not discovered that 4chan is the +    generator of the internet. The second will save your day if you have +    initiated session with the same account in two different machines. I also +    wonder why would you do that, but let's respect each other choices, like +    with the religious celebrations, and assume that one day we'll be able +    to run Bitmask in completely free phones. Yes, I mean that, the whole GSM +    Stack. +    """ + +    def _body_does_exist(self, doc): +        """ +        Check whether we already have a body payload with this hash in our +        database. + +        :param doc: tentative body document +        :type doc: dict +        :returns: True if that happens, False otherwise. +        """ +        if not doc: +            return False +        chash = doc[fields.CONTENT_HASH_KEY] +        body_docs = self._soledad.get_from_index( +            fields.TYPE_C_HASH_IDX, +            fields.TYPE_MESSAGE_VAL, str(chash)) +        if not body_docs: +            return False +        if len(body_docs) != 1: +            logger.warning("Found more than one copy of chash %s!" +                           % (chash,)) +        logger.debug("Found body doc with that hash! Skipping save!") +        return True + +    def _attachment_does_exist(self, doc): +        """ +        Check whether we already have an attachment payload with this hash +        in our database. + +        :param doc: tentative body document +        :type doc: dict +        :returns: True if that happens, False otherwise. +        """ +        if not doc: +            return False +        phash = doc[fields.PAYLOAD_HASH_KEY] +        attach_docs = self._soledad.get_from_index( +            fields.TYPE_P_HASH_IDX, +            fields.TYPE_ATTACHMENT_VAL, str(phash)) +        if not attach_docs: +            return False + +        if len(attach_docs) != 1: +            logger.warning("Found more than one copy of phash %s!" +                           % (phash,)) +        logger.debug("Found attachment doc with that hash! Skipping save!") +        return True +  class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):      """ @@ -402,35 +830,62 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):      database.      """      # XXX this should be able to produce a MessageSet methinks - -    EMPTY_MSG = { -        fields.TYPE_KEY: fields.TYPE_MESSAGE_VAL, -        fields.UID_KEY: 1, -        fields.MBOX_KEY: fields.INBOX_VAL, - -        fields.SUBJECT_KEY: "", -        fields.DATE_KEY: "", -        fields.RAW_KEY: "", - -        # XXX should separate headers into another doc -        fields.HEADERS_KEY: {}, +    # could validate these kinds of objects turning them +    # into a template for the class. +    FLAGS_DOC = "FLAGS" +    HEADERS_DOC = "HEADERS" +    ATTACHMENT_DOC = "ATTACHMENT" +    BODY_DOC = "BODY" + +    templates = { + +        FLAGS_DOC: { +            fields.TYPE_KEY: fields.TYPE_FLAGS_VAL, +            fields.UID_KEY: 1, +            fields.MBOX_KEY: fields.INBOX_VAL, + +            fields.SEEN_KEY: False, +            fields.RECENT_KEY: True, +            fields.FLAGS_KEY: [], +            fields.MULTIPART_KEY: False, +            fields.SIZE_KEY: 0 +        }, + +        HEADERS_DOC: { +            fields.TYPE_KEY: fields.TYPE_HEADERS_VAL, +            fields.CONTENT_HASH_KEY: "", + +            fields.HEADERS_KEY: {}, +            fields.NUM_PARTS_KEY: 0, +            fields.PARTS_MAP_KEY: {}, +            fields.DATE_KEY: "", +            fields.SUBJECT_KEY: "" +        }, + +        ATTACHMENT_DOC: { +            fields.TYPE_KEY: fields.TYPE_ATTACHMENT_VAL, +            fields.PART_NUMBER_KEY: 0, +            fields.CONTENT_HASH_KEY:  "", +            fields.PAYLOAD_HASH_KEY: "", + +            fields.RAW_KEY: "" +        }, + +        BODY_DOC: { +            fields.TYPE_KEY: fields.TYPE_MESSAGE_VAL, +            fields.CONTENT_HASH_KEY: "", + +            fields.BODY_KEY: "", + +            # this should not be needed, +            # but let's keep the raw msg for some time +            # until we are sure we can reconstruct +            # the original msg from our disection. +            fields.RAW_KEY: "", + +        }      } -    EMPTY_FLAGS = { -        fields.TYPE_KEY: fields.TYPE_FLAGS_VAL, -        fields.UID_KEY: 1, -        fields.MBOX_KEY: fields.INBOX_VAL, - -        fields.FLAGS_KEY: [], -        fields.SEEN_KEY: False, -        fields.RECENT_KEY: True, -        fields.MULTIPART_KEY: False, -    } - -    # get from SoledadBackedAccount the needed index-related constants -    INDEXES = SoledadBackedAccount.INDEXES -    TYPE_IDX = SoledadBackedAccount.TYPE_IDX -      def __init__(self, mbox=None, soledad=None):          """          Constructor for MessageCollection. @@ -465,23 +920,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):              SoledadDocWriter(soledad),              period=0.05) -    def _get_empty_msg(self): +    def _get_empty_doc(self, _type=FLAGS_DOC):          """ -        Returns an empty message. - -        :return: a dict containing a default empty message +        Returns an empty doc for storing different message parts. +        Defaults to returning a template for a flags document. +        :return: a dict with the template          :rtype: dict          """ -        return copy.deepcopy(self.EMPTY_MSG) - -    def _get_empty_flags_doc(self): -        """ -        Returns an empty doc for storing flags. - -        :return: -        :rtype: -        """ -        return copy.deepcopy(self.EMPTY_FLAGS) +        if not _type in self.templates.keys(): +            raise TypeError("Improper type passed to _get_empty_doc") +        return copy.deepcopy(self.templates[_type])      @deferred      def add_msg(self, raw, subject=None, flags=None, date=None, uid=1): @@ -509,52 +957,107 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):              flags = tuple()          leap_assert_type(flags, tuple) -        content_doc = self._get_empty_msg() -        flags_doc = self._get_empty_flags_doc() - -        content_doc[self.MBOX_KEY] = self.mbox -        flags_doc[self.MBOX_KEY] = self.mbox -        # ...should get a sanity check here. -        content_doc[self.UID_KEY] = uid -        flags_doc[self.UID_KEY] = uid - -        if flags: -            flags_doc[self.FLAGS_KEY] = map(self._stringify, flags) -            flags_doc[self.SEEN_KEY] = self.SEEN_FLAG in flags +        # docs for flags, headers, and body +        fd, hd, bd = map( +            lambda t: self._get_empty_doc(t), +            (self.FLAGS_DOC, self.HEADERS_DOC, self.BODY_DOC))          msg = self._get_parsed_msg(raw)          headers = dict(msg) - -        logger.debug("adding. is multipart:%s" % msg.is_multipart()) -        flags_doc[self.MULTIPART_KEY] = msg.is_multipart() -        # XXX get lower case for keys? -        # XXX get headers doc -        content_doc[self.HEADERS_KEY] = headers -        # set subject based on message headers and eventually replace by -        # subject given as param -        if self.SUBJECT_FIELD in headers: -            content_doc[self.SUBJECT_KEY] = headers[self.SUBJECT_FIELD] -        if subject is not None: -            content_doc[self.SUBJECT_KEY] = subject - -        # XXX could separate body into its own doc -        # but should also separate multiparts -        # that should be wrapped in MessagePart -        content_doc[self.RAW_KEY] = self._stringify(raw) -        content_doc[self.SIZE_KEY] = len(raw) - +        raw_str = msg.as_string() +        chash = self._get_hash(msg) +        multi = msg.is_multipart() + +        attaches = [] +        inner_parts = [] + +        if multi: +            # XXX should walk down recursively +            # in a better way.  but fixing this quick +            # to have an rc. +            # XXX should pick the content-type in txt +            body = first(msg.get_payload()).get_payload() +            if isinstance(body, list): +                # allowing one nesting level for now... +                body, rest = body[0].get_payload(), body[1:] +                for p in rest: +                    inner_parts.append(p) +        else: +            body = msg.get_payload() +        logger.debug("adding msg (multipart:%s)" % multi) + +        # flags doc --------------------------------------- +        fd[self.MBOX_KEY] = self.mbox +        fd[self.UID_KEY] = uid +        fd[self.CONTENT_HASH_KEY] = chash +        fd[self.MULTIPART_KEY] = multi +        fd[self.SIZE_KEY] = len(raw_str) +        if flags: +            fd[self.FLAGS_KEY] = map(self._stringify, flags) +            fd[self.SEEN_KEY] = self.SEEN_FLAG in flags +            fd[self.RECENT_KEY] = self.RECENT_FLAG in flags + +        # headers doc ---------------------------------------- +        hd[self.CONTENT_HASH_KEY] = chash +        hd[self.HEADERS_KEY] = headers +        if not subject and self.SUBJECT_FIELD in headers: +            hd[self.SUBJECT_KEY] = headers[self.SUBJECT_FIELD] +        else: +            hd[self.SUBJECT_KEY] = subject          if not date and self.DATE_FIELD in headers: -            content_doc[self.DATE_KEY] = headers[self.DATE_FIELD] +            hd[self.DATE_KEY] = headers[self.DATE_FIELD]          else: -            content_doc[self.DATE_KEY] = date - -        logger.debug('enqueuing message for write') - +            hd[self.DATE_KEY] = date +        if multi: +            hd[self.NUM_PARTS_KEY] = len(msg.get_payload()) + +        # body doc +        bd[self.CONTENT_HASH_KEY] = chash +        bd[self.BODY_KEY] = body +        # in an ideal world, we would not need to save a copy of the +        # raw message. But we'll keep it until we can be sure that +        # we can rebuild the original message from the parts. +        bd[self.RAW_KEY] = raw_str + +        docs = [fd, hd] + +        # attachment docs +        if multi: +            outer_parts = msg.get_payload() +            parts = outer_parts + inner_parts + +            # skip first part, we already got it in body +            to_attach = ((i, m) for i, m in enumerate(parts) if i > 0) +            for index, part_msg in to_attach: +                att_doc = self._get_empty_doc(self.ATTACHMENT_DOC) +                att_doc[self.PART_NUMBER_KEY] = index +                att_doc[self.CONTENT_HASH_KEY] = chash +                phash = self._get_hash(part_msg) +                att_doc[self.PAYLOAD_HASH_KEY] = phash +                att_doc[self.RAW_KEY] = part_msg.as_string() + +                # keep a pointer to the payload hash in the +                # headers doc, under the parts_map +                hd[self.PARTS_MAP_KEY][str(index)] = phash +                attaches.append(att_doc) + +        # Saving ... ------------------------------- +        # ok, there we go... +        logger.debug('enqueuing message docs for write')          ptuple = SoledadWriterPayload + +        # first, regular docs: flags and headers +        for doc in docs: +            self.soledad_writer.put(ptuple( +                mode=ptuple.CREATE, payload=doc)) +        # second, try to create body doc.          self.soledad_writer.put(ptuple( -            mode=ptuple.CREATE, payload=content_doc)) -        self.soledad_writer.put(ptuple( -            mode=ptuple.CREATE, payload=flags_doc)) +            mode=ptuple.BODY_CREATE, payload=bd)) +        # and last, but not least, try to create +        # attachment docs if not already there. +        for at in attaches: +            self.soledad_writer.put(ptuple( +                mode=ptuple.ATTACHMENT_CREATE, payload=at))      def remove(self, msg):          """ @@ -563,8 +1066,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          :param msg: a  Leapmessage instance          :type msg: LeapMessage          """ -        # XXX remove -        #self._soledad.delete_doc(msg)          msg.remove()      # getters @@ -596,14 +1097,14 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          :rtype: list of SoledadDocument          """          if _type not in fields.__dict__.values(): -            raise TypeError("Wrong type passed to get_all") +            raise TypeError("Wrong type passed to get_all_docs")          if sameProxiedObjects(self._soledad, None):              logger.warning('Tried to get messages but soledad is None!')              return []          all_docs = [doc for doc in self._soledad.get_from_index( -            SoledadBackedAccount.TYPE_MBOX_IDX, +            fields.TYPE_MBOX_IDX,              _type, self.mbox)]          # inneficient, but first let's grok it and then @@ -618,8 +1119,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          """          all_uids = (doc.content[self.UID_KEY] for doc in                      self._soledad.get_from_index( -                        SoledadBackedAccount.TYPE_MBOX_IDX, -                        self.TYPE_FLAGS_VAL, self.mbox)) +                        fields.TYPE_MBOX_IDX, +                        fields.TYPE_FLAGS_VAL, self.mbox))          return (u for u in sorted(all_uids))      def count(self): @@ -629,7 +1130,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          :rtype: int          """          count = self._soledad.get_count_from_index( -            SoledadBackedAccount.TYPE_MBOX_IDX, +            fields.TYPE_MBOX_IDX,              fields.TYPE_FLAGS_VAL, self.mbox)          return count @@ -645,8 +1146,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          """          return (doc.content[self.UID_KEY] for doc in                  self._soledad.get_from_index( -                    SoledadBackedAccount.TYPE_MBOX_SEEN_IDX, -                    self.TYPE_FLAGS_VAL, self.mbox, '0')) +                    fields.TYPE_MBOX_SEEN_IDX, +                    fields.TYPE_FLAGS_VAL, self.mbox, '0'))      def count_unseen(self):          """ @@ -656,8 +1157,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          :rtype: int          """          count = self._soledad.get_count_from_index( -            SoledadBackedAccount.TYPE_MBOX_SEEN_IDX, -            self.TYPE_FLAGS_VAL, self.mbox, '0') +            fields.TYPE_MBOX_SEEN_IDX, +            fields.TYPE_FLAGS_VAL, self.mbox, '0')          return count      def get_unseen(self): @@ -681,8 +1182,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          """          return (doc.content[self.UID_KEY] for doc in                  self._soledad.get_from_index( -                    SoledadBackedAccount.TYPE_MBOX_RECT_IDX, -                    self.TYPE_FLAGS_VAL, self.mbox, '1')) +                    fields.TYPE_MBOX_RECT_IDX, +                    fields.TYPE_FLAGS_VAL, self.mbox, '1'))      def get_recent(self):          """ @@ -702,8 +1203,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          :rtype: int          """          count = self._soledad.get_count_from_index( -            SoledadBackedAccount.TYPE_MBOX_RECT_IDX, -            self.TYPE_FLAGS_VAL, self.mbox, '1') +            fields.TYPE_MBOX_RECT_IDX, +            fields.TYPE_FLAGS_VAL, self.mbox, '1')          return count      def __len__(self): @@ -731,5 +1232,5 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          return u"<MessageCollection: mbox '%s' (%s)>" % (              self.mbox, self.count()) -    # XXX should implement __eq__ also !!! --- use a hash -    # of content for that, will be used for dedup. +    # XXX should implement __eq__ also !!! +    # --- use the content hash for that, will be used for dedup. diff --git a/mail/src/leap/mail/imap/parser.py b/mail/src/leap/mail/imap/parser.py index 1ae19c0..306dcf0 100644 --- a/mail/src/leap/mail/imap/parser.py +++ b/mail/src/leap/mail/imap/parser.py @@ -19,10 +19,14 @@ Mail parser mixins.  """  import cStringIO  import StringIO +import hashlib  import re +from email.message import Message  from email.parser import Parser +from leap.common.check import leap_assert_type +  class MailParser(object):      """ @@ -34,16 +38,30 @@ class MailParser(object):          """          self._parser = Parser() -    def _get_parsed_msg(self, raw): +    def _get_parsed_msg(self, raw, headersonly=False):          """          Return a parsed Message.          :param raw: the raw string to parse          :type raw: basestring, or StringIO object + +        :param headersonly: True for parsing only the headers. +        :type headersonly: bool          """ -        msg = self._get_parser_fun(raw)(raw, True) +        msg = self._get_parser_fun(raw)(raw, headersonly=headersonly)          return msg +    def _get_hash(self, msg): +        """ +        Returns a hash of the string representation of the raw message, +        suitable for indexing the inmutable pieces. + +        :param msg: a Message object +        :type msg: Message +        """ +        leap_assert_type(msg, Message) +        return hashlib.sha256(msg.as_string()).hexdigest() +      def _get_parser_fun(self, o):          """          Retunn the proper parser function for an object. @@ -67,6 +85,8 @@ class MailParser(object):          :param o: object          :type o: object          """ +        # XXX Maybe we don't need no more, we're using +        # msg.as_string()          if isinstance(o, (cStringIO.OutputType, StringIO.StringIO)):              return o.getvalue()          else: | 
