From da9b210c4bd16d67b4b47b299df7913b2d2f1066 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 7 Jan 2014 11:34:08 -0400 Subject: Second stage of the new year's storage rewrite. * documents of only three types: * flags * headers * content * add algorithm for walking the parsed message tree. * treat special cases like a multipart with a single part. * modify add_msg to use the walk routine * modify twisted interfaces to use the new storage schema. * tests for different multipart cases * fix multipart detection typo in the fetch This is a merge proposal for the 0.5.0-rc3. known bugs ---------- Some things are still know not to work well at this point (some cases of multipart messages do not display the bodies). IMAP server also is left in a bad internal state after a logout/login. --- mail/src/leap/mail/decorators.py | 5 + mail/src/leap/mail/imap/fetch.py | 2 +- mail/src/leap/mail/imap/fields.py | 26 +- mail/src/leap/mail/imap/messages.py | 722 +++++++++++---------- mail/src/leap/mail/imap/service/imap.py | 2 + .../mail/imap/tests/rfc822.multi-minimal.message | 16 + mail/src/leap/mail/imap/tests/rfc822.plain.message | 66 ++ mail/src/leap/mail/imap/tests/walktree.py | 117 ++++ mail/src/leap/mail/walk.py | 160 +++++ 9 files changed, 768 insertions(+), 348 deletions(-) create mode 100644 mail/src/leap/mail/imap/tests/rfc822.multi-minimal.message create mode 100644 mail/src/leap/mail/imap/tests/rfc822.plain.message create mode 100644 mail/src/leap/mail/imap/tests/walktree.py create mode 100644 mail/src/leap/mail/walk.py diff --git a/mail/src/leap/mail/decorators.py b/mail/src/leap/mail/decorators.py index 024a139..d5eac97 100644 --- a/mail/src/leap/mail/decorators.py +++ b/mail/src/leap/mail/decorators.py @@ -27,6 +27,11 @@ from twisted.internet.threads import deferToThread logger = logging.getLogger(__name__) +# TODO +# Should write a helper to be able to pass a timeout argument. +# See this answer: http://stackoverflow.com/a/19019648/1157664 +# And the notes by glyph and jpcalderone + def deferred(f): """ Decorator, for deferring methods to Threads. diff --git a/mail/src/leap/mail/imap/fetch.py b/mail/src/leap/mail/imap/fetch.py index cb200be..604a2ea 100644 --- a/mail/src/leap/mail/imap/fetch.py +++ b/mail/src/leap/mail/imap/fetch.py @@ -404,7 +404,7 @@ class LeapIncomingMail(object): """ log.msg('decrypting multipart encrypted msg') msg = copy.deepcopy(msg) - self._multipart_sanity_check(msg) + self._msg_multipart_sanity_check(msg) # parse message and get encrypted content pgpencmsg = msg.get_payload()[1] diff --git a/mail/src/leap/mail/imap/fields.py b/mail/src/leap/mail/imap/fields.py index bc536fe..2545adf 100644 --- a/mail/src/leap/mail/imap/fields.py +++ b/mail/src/leap/mail/imap/fields.py @@ -43,17 +43,17 @@ class WithMsgFields(object): # headers HEADERS_KEY = "headers" - NUM_PARTS_KEY = "numparts" - PARTS_MAP_KEY = "partmap" DATE_KEY = "date" SUBJECT_KEY = "subject" - - # attachment - PART_NUMBER_KEY = "part" - RAW_KEY = "raw" + # XXX DELETE-ME + #NUM_PARTS_KEY = "numparts" # not needed?! + PARTS_MAP_KEY = "part_map" + BODY_KEY = "body" # link to phash of body # content - BODY_KEY = "body" + LINKED_FROM_KEY = "lkf" + RAW_KEY = "raw" + CTYPE_KEY = "ctype" # Mailbox specific keys CLOSED_KEY = "closed" @@ -65,11 +65,13 @@ class WithMsgFields(object): # Document Type, for indexing TYPE_KEY = "type" TYPE_MBOX_VAL = "mbox" - TYPE_MESSAGE_VAL = "msg" TYPE_FLAGS_VAL = "flags" TYPE_HEADERS_VAL = "head" - TYPE_ATTACHMENT_VAL = "attach" - # should add also a headers val + TYPE_CONTENT_VAL = "cnt" + + # XXX DEPRECATE + #TYPE_MESSAGE_VAL = "msg" + #TYPE_ATTACHMENT_VAL = "attach" INBOX_VAL = "inbox" @@ -109,7 +111,6 @@ class WithMsgFields(object): MBOX_VAL = TYPE_MBOX_VAL CHASH_VAL = CONTENT_HASH_KEY PHASH_VAL = PAYLOAD_HASH_KEY - PART_VAL = PART_NUMBER_KEY INDEXES = { # generic @@ -122,8 +123,7 @@ class WithMsgFields(object): # content, headers doc TYPE_C_HASH_IDX: [KTYPE, CHASH_VAL], - # attachment docs - TYPE_C_HASH_PART_IDX: [KTYPE, CHASH_VAL, PART_VAL], + # attachment payload dedup TYPE_P_HASH_IDX: [KTYPE, PHASH_VAL], diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index bfe913c..37e4311 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -33,6 +33,7 @@ from zope.proxy import sameProxiedObjects from leap.common.check import leap_assert, leap_assert_type from leap.common.decorators import memoized_method from leap.common.mail import get_email_charset +from leap.mail import walk from leap.mail.utils import first from leap.mail.decorators import deferred from leap.mail.imap.index import IndexedDB @@ -43,65 +44,58 @@ from leap.mail.messageflow import IMessageConsumer, MessageProducer logger = logging.getLogger(__name__) -class MessageBody(object): - """ - IMessagePart implementor for the main - body of a multipart message. - - Excusatio non petita: see the interface documentation. - """ +# TODO ------------------------------------------------------------ - implements(imap4.IMessagePart) - - def __init__(self, fdoc, bdoc): - self._fdoc = fdoc - self._bdoc = bdoc - - def getSize(self): - return len(self._bdoc.content[fields.BODY_KEY]) +# [ ] Add linked-from info. +# [ ] Delete incoming mail only after successful write! +# [ ] Remove UID from syncable db. Store only those indexes locally. +# [ ] Send patch to twisted for bug in imap4.py:5717 (content-type can be +# none? lower-case?) - def getBodyFile(self): - fd = StringIO.StringIO() - - if self._bdoc: - body = self._bdoc.content[fields.BODY_KEY] - else: - body = "" - charset = self._get_charset(body) - try: - body = body.encode(charset) - except (UnicodeEncodeError, UnicodeDecodeError) as e: - logger.error("Unicode error {0}".format(e)) - body = body.encode(charset, 'replace') - fd.write(body) - fd.seek(0) - return fd - - @memoized_method - def _get_charset(self, stuff): - return get_email_charset(unicode(stuff)) - - def getHeaders(self, negate, *names): - return {} +def lowerdict(_dict): + """ + Return a dict with the keys in lowercase. - def isMultipart(self): - return False + :param _dict: the dict to convert + :rtype: dict + """ + return dict((key.lower(), value) + for key, value in _dict.items()) - def getSubPart(self, part): - return None +class MessagePart(object): + """ + IMessagePart implementor. + It takes a subpart message and is able to find + the inner parts. -class MessageAttachment(object): + Excusatio non petita: see the interface documentation. + """ implements(imap4.IMessagePart) - def __init__(self, msg): + def __init__(self, soledad, part_map): """ - Initializes the messagepart with a Message instance. - :param msg: a message instance - :type msg: Message + Initializes the MessagePart. + + :param part_map: a dictionary containing the parts map for this + message + :type part_map: dict """ - self._msg = msg + # TODO + # It would be good to pass the uid/mailbox also + # for references while debugging. + + # We have a problem on bulk moves, and is + # that when the fetch on the new mailbox is done + # the parts maybe are not complete. + # So we should be able to fail with empty + # docs until we solve that. The ideal would be + # to gather the results of the deferred operations + # to signal the operation is complete. + #leap_assert(part_map, "part map dict cannot be null") + self._soledad = soledad + self._pmap = part_map def getSize(self): """ @@ -110,9 +104,12 @@ class MessageAttachment(object): :return: size of the message, in octets :rtype: int """ - if not self._msg: + if not self._pmap: return 0 - return len(self._msg.as_string()) + size = self._pmap.get('size', None) + if not size: + logger.error("Message part cannot find size in the partmap") + return size def getBodyFile(self): """ @@ -122,24 +119,91 @@ class MessageAttachment(object): :rtype: StringIO """ fd = StringIO.StringIO() - if self._msg: - body = self._msg.get_payload() + if self._pmap: + multi = self._pmap.get('multi') + if not multi: + phash = self._pmap.get("phash", None) + else: + pmap = self._pmap.get('part_map') + first_part = pmap.get('1', None) + if first_part: + phash = first_part['phash'] + + if not phash: + logger.warning("Could not find phash for this subpart!") + payload = str("") + else: + payload = self._get_payload_from_document(phash) + else: - logger.debug("Empty message!") - body = "" - - # XXX should only do the dance if we're sure it's - # content/text-plain!!! - #charset = self._get_charset(body) - #try: - #body = body.encode(charset) - #except (UnicodeEncodeError, UnicodeDecodeError) as e: - #logger.error("Unicode error {0}".format(e)) - #body = body.encode(charset, 'replace') - fd.write(body) + logger.warning("Message with no part_map!") + payload = str("") + + if payload: + #headers = self.getHeaders(True) + #headers = lowerdict(headers) + #content_type = headers.get('content-type', "") + content_type = self._get_ctype_from_document(phash) + charset_split = content_type.split('charset=') + # XXX fuck all this, use a regex! + if len(charset_split) > 1: + charset = charset_split[1] + if charset: + charset = charset.strip() + else: + charset = None + if not charset: + charset = self._get_charset(payload) + try: + payload = payload.encode(charset) + except (UnicodeEncodeError, UnicodeDecodeError) as e: + logger.error("Unicode error {0}".format(e)) + payload = payload.encode(charset, 'replace') + + fd.write(payload) fd.seek(0) return fd + # TODO cache the phash retrieval + def _get_payload_from_document(self, phash): + """ + Gets the message payload from the content document. + + :param phash: the payload hash to retrieve by. + :type phash: basestring + """ + cdocs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(phash)) + + cdoc = first(cdocs) + if not cdoc: + logger.warning( + "Could not find the content doc " + "for phash %s" % (phash,)) + payload = cdoc.content.get(fields.RAW_KEY, "") + return payload + + # TODO cache the pahash retrieval + def _get_ctype_from_document(self, phash): + """ + Gets the content-type from the content document. + + :param phash: the payload hash to retrieve by. + :type phash: basestring + """ + cdocs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(phash)) + + cdoc = first(cdocs) + if not cdoc: + logger.warning( + "Could not find the content doc " + "for phash %s" % (phash,)) + ctype = cdoc.content.get('ctype', "") + return ctype + @memoized_method def _get_charset(self, stuff): # TODO put in a common class with LeapMessage @@ -150,8 +214,6 @@ class MessageAttachment(object): :type stuff: basestring :returns: charset """ - # XXX existential doubt 1. wouldn't be smarter to - # peek into the mail headers? # XXX existential doubt 2. shouldn't we make the scope # of the decorator somewhat more persistent? # ah! yes! and put memory bounds. @@ -172,9 +234,17 @@ class MessageAttachment(object): :return: A mapping of header field names to header field values :rtype: dict """ - if not self._msg: + if not self._pmap: + logger.warning("No pmap in Subpart!") return {} - headers = dict(self._msg.items()) + headers = dict(self._pmap.get("headers", [])) + + # twisted imap server expects *some* headers to be lowercase + # We could use a CaseInsensitiveDict here... + headers = dict( + (str(key), str(value)) if key.lower() != "content-type" + else (str(key.lower()), str(value)) + for (key, value) in headers.items()) names = map(lambda s: s.upper(), names) if negate: @@ -187,13 +257,18 @@ class MessageAttachment(object): map(str, (key, val)) for key, val in headers.items() if cond(key)] - return dict(filter_by_cond) + filtered = dict(filter_by_cond) + return filtered def isMultipart(self): """ Return True if this message is multipart. """ - return self._msg.is_multipart() + if not self._pmap: + logger.warning("Could not get part map!") + return False + multi = self._pmap.get("multi", False) + return multi def getSubPart(self, part): """ @@ -206,10 +281,30 @@ class MessageAttachment(object): :rtype: Any object implementing C{IMessagePart}. :return: The specified sub-part. """ - return self._msg.get_payload() + if not self.isMultipart(): + raise TypeError + sub_pmap = self._pmap.get("part_map", {}) + try: + part_map = sub_pmap[str(part + 1)] + except KeyError: + logger.debug("getSubpart for %s: KeyError" % (part,)) + raise IndexError + + # XXX check for validity + return MessagePart(self._soledad, part_map) class LeapMessage(fields, MailParser, MBoxParser): + """ + The main representation of a message. + + It indexes the messages in one mailbox by a combination + of uid+mailbox name. + """ + + # TODO this has to change. + # Should index primarily by chash, and keep a local-lonly + # UID table. implements(imap4.IMessage) @@ -268,6 +363,8 @@ class LeapMessage(fields, MailParser, MBoxParser): """ An accessor to the body document. """ + if not self._hdoc: + return None if not self.__bdoc: self.__bdoc = self._get_body_doc() return self.__bdoc @@ -320,6 +417,11 @@ class LeapMessage(fields, MailParser, MBoxParser): log.msg('setting flags: %s' % (self._uid)) doc = self._fdoc + if not doc: + logger.warning( + "Could not find FDOC for %s:%s while setting flags!" % + (self._mbox, self._uid)) + return doc.content[self.FLAGS_KEY] = flags doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags doc.content[self.RECENT_KEY] = self.RECENT_FLAG in flags @@ -384,16 +486,25 @@ class LeapMessage(fields, MailParser, MBoxParser): fd = StringIO.StringIO() bdoc = self._bdoc if bdoc: - body = self._bdoc.content.get(self.BODY_KEY, "") + body = str(self._bdoc.content.get(self.RAW_KEY, "")) else: - body = "" + logger.warning("No BDOC found for message.") + body = str("") + + # XXX not needed, isn't it? ---- ivan? + #if bdoc: + #content_type = bdoc.content.get('content-type', "") + #charset = content_type.split('charset=')[1] + #if charset: + #charset = charset.strip() + #if not charset: + #charset = self._get_charset(body) + #try: + #body = str(body.encode(charset)) + #except (UnicodeEncodeError, UnicodeDecodeError) as e: + #logger.error("Unicode error {0}".format(e)) + #body = str(body.encode(charset, 'replace')) - charset = self._get_charset(body) - try: - body = body.encode(charset) - except (UnicodeEncodeError, UnicodeDecodeError) as e: - logger.error("Unicode error {0}".format(e)) - body = body.encode(charset, 'replace') fd.write(body) fd.seek(0) return fd @@ -407,8 +518,7 @@ class LeapMessage(fields, MailParser, MBoxParser): :type stuff: basestring :returns: charset """ - # XXX existential doubt 1. wouldn't be smarter to - # peek into the mail headers? + # TODO get from subpart headers # XXX existential doubt 2. shouldn't we make the scope # of the decorator somewhat more persistent? # ah! yes! and put memory bounds. @@ -447,9 +557,11 @@ class LeapMessage(fields, MailParser, MBoxParser): :return: A mapping of header field names to header field values :rtype: dict """ + # TODO split in smaller methods headers = self._get_headers() if not headers: - return {'content-type': ''} + logger.warning("No headers found") + return {str('content-type'): str('')} names = map(lambda s: s.upper(), names) if negate: @@ -457,16 +569,20 @@ class LeapMessage(fields, MailParser, MBoxParser): else: cond = lambda key: key.upper() in names - head = copy.deepcopy(dict(headers.items())) + if isinstance(headers, list): + headers = dict(headers) - # twisted imap server expects headers to be lowercase - head = dict( - (str(key), map(str, value)) if key.lower() != "content-type" - else (str(key.lower(), map(str, value))) - for (key, value) in head.items()) + # twisted imap server expects *some* headers to be lowercase + # XXX refactor together with MessagePart method + headers = dict( + (str(key), str(value)) if key.lower() != "content-type" + else (str(key.lower()), str(value)) + for (key, value) in headers.items()) # unpack and filter original dict by negate-condition - filter_by_cond = [(key, val) for key, val in head.items() if cond(key)] + filter_by_cond = [(key, val) for key, val + in headers.items() if cond(key)] + return dict(filter_by_cond) def _get_headers(self): @@ -474,7 +590,9 @@ class LeapMessage(fields, MailParser, MBoxParser): Return the headers dict for this message. """ if self._hdoc is not None: - return self._hdoc.content.get(self.HEADERS_KEY, {}) + headers = self._hdoc.content.get(self.HEADERS_KEY, {}) + return headers + else: logger.warning( "No HEADERS doc for msg %s:%s" % ( @@ -486,12 +604,13 @@ class LeapMessage(fields, MailParser, MBoxParser): Return True if this message is multipart. """ if self._fdoc: - return self._fdoc.content.get(self.MULTIPART_KEY, False) + is_multipart = self._fdoc.content.get(self.MULTIPART_KEY, False) + return is_multipart else: logger.warning( "No FLAGS doc for msg %s:%s" % ( - self.mbox, - self.uid)) + self._mbox, + self._uid)) def getSubPart(self, part): """ @@ -504,27 +623,33 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: Any object implementing C{IMessagePart}. :return: The specified sub-part. """ - logger.debug("Getting subpart: %s" % part) if not self.isMultipart(): raise TypeError - - if part == 0: - # Let's get the first part, which - # is really the body. - return MessageBody(self._fdoc, self._bdoc) - - attach_doc = self._get_attachment_doc(part) - if not attach_doc: - # so long and thanks for all the fish - logger.debug("...not today") + try: + pmap_dict = self._get_part_from_parts_map(part + 1) + except KeyError: + logger.debug("getSubpart for %s: KeyError" % (part,)) raise IndexError - msg_part = self._get_parsed_msg(attach_doc.content[self.RAW_KEY]) - return MessageAttachment(msg_part) + return MessagePart(self._soledad, pmap_dict) # # accessors # + def _get_part_from_parts_map(self, part): + """ + Get a part map from the headers doc + + :raises: KeyError if key does not exist + :rtype: dict + """ + if not self._hdoc: + logger.warning("Tried to get part but no HDOC found!") + return None + + pmap = self._hdoc.content.get(fields.PARTS_MAP_KEY, {}) + return pmap[str(part)] + def _get_flags_doc(self): """ Return the document that keeps the flags for this @@ -550,63 +675,16 @@ class LeapMessage(fields, MailParser, MBoxParser): Return the document that keeps the body for this message. """ - body_docs = self._soledad.get_from_index( - fields.TYPE_C_HASH_IDX, - fields.TYPE_MESSAGE_VAL, str(self._chash)) - return first(body_docs) - - def _get_num_parts(self): - """ - Return the number of parts for a multipart message. - """ - if not self.isMultipart(): - raise TypeError( - "Tried to get num parts in a non-multipart message") - if not self._hdoc: - return None - return self._hdoc.content.get(fields.NUM_PARTS_KEY, 2) - - def _get_attachment_doc(self, part): - """ - Return the document that keeps the headers for this - message. - - :param part: the part number for the multipart message. - :type part: int - """ - if not self._hdoc: - return None - try: - phash = self._hdoc.content[self.PARTS_MAP_KEY][str(part)] - except KeyError: - # this is the remnant of a debug session until - # I found that the index is actually a string... - # It should be safe to just raise the KeyError now, - # but leaving it here while the blood is fresh... - logger.warning("We expected a phash in the " - "index %s, but noone found" % (part, )) - logger.debug(self._hdoc.content[self.PARTS_MAP_KEY]) + body_phash = self._hdoc.content.get( + fields.BODY_KEY, None) + if not body_phash: + logger.warning("No body phash for this document!") return None - attach_docs = self._soledad.get_from_index( + body_docs = self._soledad.get_from_index( fields.TYPE_P_HASH_IDX, - fields.TYPE_ATTACHMENT_VAL, str(phash)) - - # The following is true for the fist owner. - # We could use this relationship to flag the "owner" - # and orphan when we delete it. + fields.TYPE_CONTENT_VAL, str(body_phash)) - #attach_docs = self._soledad.get_from_index( - #fields.TYPE_C_HASH_PART_IDX, - #fields.TYPE_ATTACHMENT_VAL, str(self._chash), str(part)) - return first(attach_docs) - - def _get_raw_msg(self): - """ - Return the raw msg. - :rtype: basestring - """ - # TODO deprecate this. - return self._bdoc.content.get(self.RAW_KEY, '') + return first(body_docs) def __getitem__(self, key): """ @@ -658,27 +736,22 @@ class LeapMessage(fields, MailParser, MBoxParser): """ Remove all docs associated with this message. """ - # XXX this would ve more efficient if we can just pass - # a sequence of uids. - # XXX For the moment we are only removing the flags and headers # docs. The rest we leave there polluting your hard disk, # until we think about a good way of deorphaning. # Maybe a crawler of unreferenced docs. + # XXX implement elijah's idea of using a PUT document as a + # token to ensure consistency in the removal. + uid = self._uid - print "removing...", uid fd = self._get_flags_doc() - hd = self._get_headers_doc() + #hd = self._get_headers_doc() #bd = self._get_body_doc() #docs = [fd, hd, bd] - docs = [fd, hd] - - #for pn in range(self._get_num_parts()[1:]): - #ad = self._get_attachment_doc(pn) - #docs.append(ad) + docs = [fd] for d in filter(None, docs): try: @@ -703,8 +776,7 @@ SoledadWriterPayload = namedtuple( SoledadWriterPayload.CREATE = 1 SoledadWriterPayload.PUT = 2 -SoledadWriterPayload.BODY_CREATE = 3 -SoledadWriterPayload.ATTACHMENT_CREATE = 4 +SoledadWriterPayload.CONTENT_CREATE = 3 class SoledadDocWriter(object): @@ -723,6 +795,38 @@ class SoledadDocWriter(object): """ self._soledad = soledad + def _get_call_for_item(self, item): + """ + Return the proper call type for a given item. + + :param item: one of the types defined under the + attributes of SoledadWriterPayload + :type item: int + """ + call = None + payload = item.payload + + if item.mode == SoledadWriterPayload.CREATE: + call = self._soledad.create_doc + elif (item.mode == SoledadWriterPayload.CONTENT_CREATE + and not self._content_does_exist(payload)): + call = self._soledad.create_doc + elif item.mode == SoledadWriterPayload.PUT: + call = self._soledad.put_doc + return call + + def _process(self, queue): + """ + Return the item and the proper call type for the next + item in the queue if any. + + :param queue: the queue from where we'll pick item. + :type queue: Queue + """ + item = queue.get() + call = self._get_call_for_item(item) + return item, call + def consume(self, queue): """ Creates a new document in soledad db. @@ -733,24 +837,10 @@ class SoledadDocWriter(object): """ empty = queue.empty() while not empty: - item = queue.get() - call = None - payload = item.payload - - if item.mode == SoledadWriterPayload.CREATE: - call = self._soledad.create_doc - elif item.mode == SoledadWriterPayload.BODY_CREATE: - if not self._body_does_exist(payload): - call = self._soledad.create_doc - elif item.mode == SoledadWriterPayload.ATTACHMENT_CREATE: - if not self._attachment_does_exist(payload): - call = self._soledad.create_doc - elif item.mode == SoledadWriterPayload.PUT: - call = self._soledad.put_doc - - # XXX delete? + item, call = self._process(queue) if call: + # XXX should handle the delete case # should handle errors try: call(item.payload) @@ -779,33 +869,10 @@ class SoledadDocWriter(object): Stack. """ - def _body_does_exist(self, doc): + def _content_does_exist(self, doc): """ - Check whether we already have a body payload with this hash in our - database. - - :param doc: tentative body document - :type doc: dict - :returns: True if that happens, False otherwise. - """ - if not doc: - return False - chash = doc[fields.CONTENT_HASH_KEY] - body_docs = self._soledad.get_from_index( - fields.TYPE_C_HASH_IDX, - fields.TYPE_MESSAGE_VAL, str(chash)) - if not body_docs: - return False - if len(body_docs) != 1: - logger.warning("Found more than one copy of chash %s!" - % (chash,)) - logger.debug("Found body doc with that hash! Skipping save!") - return True - - def _attachment_does_exist(self, doc): - """ - Check whether we already have an attachment payload with this hash - in our database. + Check whether we already have a content document for a payload + with this hash in our database. :param doc: tentative body document :type doc: dict @@ -816,7 +883,7 @@ class SoledadDocWriter(object): phash = doc[fields.PAYLOAD_HASH_KEY] attach_docs = self._soledad.get_from_index( fields.TYPE_P_HASH_IDX, - fields.TYPE_ATTACHMENT_VAL, str(phash)) + fields.TYPE_CONTENT_VAL, str(phash)) if not attach_docs: return False @@ -840,15 +907,15 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # into a template for the class. FLAGS_DOC = "FLAGS" HEADERS_DOC = "HEADERS" - ATTACHMENT_DOC = "ATTACHMENT" - BODY_DOC = "BODY" + CONTENT_DOC = "CONTENT" templates = { FLAGS_DOC: { fields.TYPE_KEY: fields.TYPE_FLAGS_VAL, - fields.UID_KEY: 1, + fields.UID_KEY: 1, # XXX moe to a local table fields.MBOX_KEY: fields.INBOX_VAL, + fields.CONTENT_HASH_KEY: "", fields.SEEN_KEY: False, fields.RECENT_KEY: True, @@ -862,35 +929,28 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): fields.TYPE_KEY: fields.TYPE_HEADERS_VAL, fields.CONTENT_HASH_KEY: "", + fields.DATE_KEY: "", + fields.SUBJECT_KEY: "", + fields.HEADERS_KEY: {}, - fields.NUM_PARTS_KEY: 0, fields.PARTS_MAP_KEY: {}, - fields.DATE_KEY: "", - fields.SUBJECT_KEY: "" }, - ATTACHMENT_DOC: { - fields.TYPE_KEY: fields.TYPE_ATTACHMENT_VAL, - fields.PART_NUMBER_KEY: 0, - fields.CONTENT_HASH_KEY: "", + CONTENT_DOC: { + fields.TYPE_KEY: fields.TYPE_CONTENT_VAL, fields.PAYLOAD_HASH_KEY: "", + fields.LINKED_FROM_KEY: [], + fields.CTYPE_KEY: "", # should index by this too - fields.RAW_KEY: "" - }, - - BODY_DOC: { - fields.TYPE_KEY: fields.TYPE_MESSAGE_VAL, - fields.CONTENT_HASH_KEY: "", - - fields.BODY_KEY: "", - - # this should not be needed, - # but let's keep the raw msg for some time - # until we are sure we can reconstruct - # the original msg from our disection. + # should only get inmutable headers parts + # (for indexing) + fields.HEADERS_KEY: {}, fields.RAW_KEY: "", + fields.PARTS_MAP_KEY: {}, + fields.HEADERS_KEY: {}, + fields.MULTIPART_KEY: False, + }, - } } def __init__(self, mbox=None, soledad=None): @@ -938,128 +998,124 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): raise TypeError("Improper type passed to _get_empty_doc") return copy.deepcopy(self.templates[_type]) - @deferred - def add_msg(self, raw, subject=None, flags=None, date=None, uid=1): + def _do_parse(self, raw): """ - Creates a new message document. + Parse raw message and return it along with + relevant information about its outer level. :param raw: the raw message - :type raw: str - - :param subject: subject of the message. - :type subject: str - - :param flags: flags - :type flags: list - - :param date: the received date for the message - :type date: str - - :param uid: the message uid for this mailbox - :type uid: int + :type raw: StringIO or basestring + :return: msg, chash, size, multi + :rtype: tuple """ - # TODO: split in smaller methods - logger.debug('adding message') - if flags is None: - flags = tuple() - leap_assert_type(flags, tuple) - - # docs for flags, headers, and body - fd, hd, bd = map( - lambda t: self._get_empty_doc(t), - (self.FLAGS_DOC, self.HEADERS_DOC, self.BODY_DOC)) - msg = self._get_parsed_msg(raw) - headers = defaultdict(list) - for k, v in msg.items(): - headers[k].append(v) - raw_str = msg.as_string() chash = self._get_hash(msg) + size = len(msg.as_string()) multi = msg.is_multipart() + return msg, chash, size, multi - attaches = [] - inner_parts = [] - - if multi: - # XXX should walk down recursively - # in a better way. but fixing this quick - # to have an rc. - # XXX should pick the content-type in txt - body = first(msg.get_payload()).get_payload() - if isinstance(body, list): - # allowing one nesting level for now... - body, rest = body[0].get_payload(), body[1:] - for p in rest: - inner_parts.append(p) - else: - body = msg.get_payload() - logger.debug("adding msg with uid %s (multipart:%s)" % ( - uid, multi)) + def _populate_flags(self, flags, uid, chash, size, multi): + """ + Return a flags doc. + + XXX Missing DOC ----------- + """ + fd = self._get_empty_doc(self.FLAGS_DOC) - # flags doc --------------------------------------- fd[self.MBOX_KEY] = self.mbox fd[self.UID_KEY] = uid fd[self.CONTENT_HASH_KEY] = chash + fd[self.SIZE_KEY] = size fd[self.MULTIPART_KEY] = multi - fd[self.SIZE_KEY] = len(raw_str) if flags: fd[self.FLAGS_KEY] = map(self._stringify, flags) fd[self.SEEN_KEY] = self.SEEN_FLAG in flags fd[self.DEL_KEY] = self.DELETED_FLAG in flags fd[self.RECENT_KEY] = True # set always by default + return fd - # headers doc ---------------------------------------- + def _populate_headr(self, msg, chash, subject, date): + """ + Return a headers doc. + + XXX Missing DOC ----------- + """ + headers = defaultdict(list) + for k, v in msg.items(): + headers[k].append(v) + + # "fix" for repeated headers. + for k, v in headers.items(): + newline = "\n%s: " % (k,) + headers[k] = newline.join(v) + + hd = self._get_empty_doc(self.HEADERS_DOC) hd[self.CONTENT_HASH_KEY] = chash hd[self.HEADERS_KEY] = headers - print "headers" - import pprint - pprint.pprint(headers) - if not subject and self.SUBJECT_FIELD in headers: hd[self.SUBJECT_KEY] = first(headers[self.SUBJECT_FIELD]) else: hd[self.SUBJECT_KEY] = subject + if not date and self.DATE_FIELD in headers: hd[self.DATE_KEY] = first(headers[self.DATE_FIELD]) else: hd[self.DATE_KEY] = date - if multi: - # XXX fix for multipart nested case - hd[self.NUM_PARTS_KEY] = len(msg.get_payload()) - - # body doc - bd[self.CONTENT_HASH_KEY] = chash - bd[self.BODY_KEY] = body - # XXX in an ideal world, we would not need to save a copy of the - # raw message. But we'll keep it until we can be sure that - # we can rebuild the original message from the parts. - bd[self.RAW_KEY] = raw_str + return hd + + @deferred + def add_msg(self, raw, subject=None, flags=None, date=None, uid=1): + """ + Creates a new message document. + + :param raw: the raw message + :type raw: str + + :param subject: subject of the message. + :type subject: str + + :param flags: flags + :type flags: list + + :param date: the received date for the message + :type date: str + + :param uid: the message uid for this mailbox + :type uid: int + """ + # TODO signal that we can delete the original message!----- + # when all the processing is done. + + # TODO add the linked-from info ! + + logger.debug('adding message') + if flags is None: + flags = tuple() + leap_assert_type(flags, tuple) + + # parse + msg, chash, size, multi = self._do_parse(raw) + + fd = self._populate_flags(flags, uid, chash, size, multi) + hd = self._populate_headr(msg, chash, subject, date) + + parts = walk.get_parts(msg) + body_phash_fun = [walk.get_body_phash_simple, + walk.get_body_phash_multi][int(multi)] + body_phash = body_phash_fun(walk.get_payloads(msg)) + parts_map = walk.walk_msg_tree(parts, body_phash=body_phash) + + # add parts map to header doc + # (body, multi, part_map) + for key in parts_map: + hd[key] = parts_map[key] + del parts_map docs = [fd, hd] + cdocs = walk.get_raw_docs(msg, parts) - # attachment docs - if multi: - outer_parts = msg.get_payload() - parts = outer_parts + inner_parts - - # skip first part, we already got it in body - to_attach = ((i, m) for i, m in enumerate(parts) if i > 0) - for index, part_msg in to_attach: - att_doc = self._get_empty_doc(self.ATTACHMENT_DOC) - att_doc[self.PART_NUMBER_KEY] = index - att_doc[self.CONTENT_HASH_KEY] = chash - phash = self._get_hash(part_msg) - att_doc[self.PAYLOAD_HASH_KEY] = phash - att_doc[self.RAW_KEY] = part_msg.as_string() - - # keep a pointer to the payload hash in the - # headers doc, under the parts_map - hd[self.PARTS_MAP_KEY][str(index)] = phash - attaches.append(att_doc) - - # Saving ... ------------------------------- - # ok, there we go... + # Saving logger.debug('enqueuing message docs for write') ptuple = SoledadWriterPayload @@ -1067,14 +1123,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): for doc in docs: self.soledad_writer.put(ptuple( mode=ptuple.CREATE, payload=doc)) - # second, try to create body doc. - self.soledad_writer.put(ptuple( - mode=ptuple.BODY_CREATE, payload=bd)) + # and last, but not least, try to create - # attachment docs if not already there. - for at in attaches: + # content docs if not already there. + for cd in cdocs: self.soledad_writer.put(ptuple( - mode=ptuple.ATTACHMENT_CREATE, payload=at)) + mode=ptuple.CONTENT_CREATE, payload=cd)) def _remove_cb(self, result): return result diff --git a/mail/src/leap/mail/imap/service/imap.py b/mail/src/leap/mail/imap/service/imap.py index 26e14c3..234996d 100644 --- a/mail/src/leap/mail/imap/service/imap.py +++ b/mail/src/leap/mail/imap/service/imap.py @@ -87,6 +87,8 @@ class LeapIMAPServer(imap4.IMAP4Server): :param line: the line from the server, without the line delimiter. :type line: str """ + print "RECV: STATE (%s)" % self.state + if "login" in line.lower(): # avoid to log the pass, even though we are using a dummy auth # by now. diff --git a/mail/src/leap/mail/imap/tests/rfc822.multi-minimal.message b/mail/src/leap/mail/imap/tests/rfc822.multi-minimal.message new file mode 100644 index 0000000..582297c --- /dev/null +++ b/mail/src/leap/mail/imap/tests/rfc822.multi-minimal.message @@ -0,0 +1,16 @@ +Content-Type: multipart/mixed; boundary="===============6203542367371144092==" +MIME-Version: 1.0 +Subject: [TEST] 010 - Inceptos cum lorem risus congue +From: testmailbitmaskspam@gmail.com +To: test_c5@dev.bitmask.net + +--===============6203542367371144092== +Content-Type: text/plain; charset="us-ascii" +MIME-Version: 1.0 +Content-Transfer-Encoding: 7bit + +Howdy from python! +The subject: [TEST] 010 - Inceptos cum lorem risus congue +Current date & time: Wed Jan 8 16:36:21 2014 +Trying to attach: [] +--===============6203542367371144092==-- diff --git a/mail/src/leap/mail/imap/tests/rfc822.plain.message b/mail/src/leap/mail/imap/tests/rfc822.plain.message new file mode 100644 index 0000000..fc627c3 --- /dev/null +++ b/mail/src/leap/mail/imap/tests/rfc822.plain.message @@ -0,0 +1,66 @@ +From pyar-bounces@python.org.ar Wed Jan 8 14:46:02 2014 +Return-Path: +X-Spam-Checker-Version: SpamAssassin 3.3.2 (2011-06-06) on spamd2.riseup.net +X-Spam-Level: ** +X-Spam-Pyzor: Reported 0 times. +X-Spam-Status: No, score=2.1 required=8.0 tests=AM_TRUNCATED,CK_419SIZE, + CK_NAIVER_NO_DNS,CK_NAIVE_NO_DNS,ENV_FROM_DIFF0,HAS_REPLY_TO,LINK_NR_TOP, + NO_REAL_NAME,RDNS_NONE,RISEUP_SPEAR_C shortcircuit=no autolearn=disabled + version=3.3.2 +Delivered-To: kali@leap.se +Received: from mx1.riseup.net (mx1-pn.riseup.net [10.0.1.33]) + (using TLSv1 with cipher DHE-RSA-AES256-SHA (256/256 bits)) + (Client CN "*.riseup.net", Issuer "Gandi Standard SSL CA" (not verified)) + by vireo.riseup.net (Postfix) with ESMTPS id 6C39A8F + for ; Wed, 8 Jan 2014 18:46:02 +0000 (UTC) +Received: from pyar.usla.org.ar (unknown [190.228.30.157]) + by mx1.riseup.net (Postfix) with ESMTP id F244C533F4 + for ; Wed, 8 Jan 2014 10:46:01 -0800 (PST) +Received: from [127.0.0.1] (localhost [127.0.0.1]) + by pyar.usla.org.ar (Postfix) with ESMTP id CC51D26A4F + for ; Wed, 8 Jan 2014 15:46:00 -0300 (ART) +MIME-Version: 1.0 +Content-Type: text/plain; charset="iso-8859-1" +Content-Transfer-Encoding: quoted-printable +From: pyar-request@python.org.ar +To: kali@leap.se +Subject: confirm 0e47e4342e4d42508e8c283175b05b3377148ac2 +Reply-To: pyar-request@python.org.ar +Auto-Submitted: auto-replied +Message-ID: +Date: Wed, 08 Jan 2014 15:45:59 -0300 +Precedence: bulk +X-BeenThere: pyar@python.org.ar +X-Mailman-Version: 2.1.15 +List-Id: Python Argentina +X-List-Administrivia: yes +Errors-To: pyar-bounces@python.org.ar +Sender: "pyar" +X-Virus-Scanned: clamav-milter 0.97.8 at mx1 +X-Virus-Status: Clean + +Mailing list subscription confirmation notice for mailing list pyar + +We have received a request de kaliyuga@riseup.net for subscription of +your email address, "kaliyuga@riseup.net", to the pyar@python.org.ar +mailing list. To confirm that you want to be added to this mailing +list, simply reply to this message, keeping the Subject: header +intact. Or visit this web page: + + http://listas.python.org.ar/confirm/pyar/0e47e4342e4d42508e8c283175b05b= +3377148ac2 + + +Or include the following line -- and only the following line -- in a +message to pyar-request@python.org.ar: + + confirm 0e47e4342e4d42508e8c283175b05b3377148ac2 + +Note that simply sending a `reply' to this message should work from +most mail readers, since that usually leaves the Subject: line in the +right form (additional "Re:" text in the Subject: is okay). + +If you do not wish to be subscribed to this list, please simply +disregard this message. If you think you are being maliciously +subscribed to the list, or have any other questions, send them to +pyar-owner@python.org.ar. diff --git a/mail/src/leap/mail/imap/tests/walktree.py b/mail/src/leap/mail/imap/tests/walktree.py new file mode 100644 index 0000000..1626f65 --- /dev/null +++ b/mail/src/leap/mail/imap/tests/walktree.py @@ -0,0 +1,117 @@ +#t -*- coding: utf-8 -*- +# walktree.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for the walktree module. +""" +import os +from email import parser + +from leap.mail import walk as W + +DEBUG = os.environ.get("BITMASK_MAIL_DEBUG") + +p = parser.Parser() + +# TODO pass an argument of the type of message + +################################################## +# Input from hell + +#msg = p.parse(open('rfc822.multi-signed.message')) +#msg = p.parse(open('rfc822.plain.message')) +msg = p.parse(open('rfc822.multi-minimal.message')) +DO_CHECK = False +################################################# + +parts = W.get_parts(msg) + +if DEBUG: + def trim(item): + item = item[:10] + [trim(part["phash"]) for part in parts if part.get('phash', None)] + +raw_docs = list(W.get_raw_docs(msg, parts)) + +body_phash_fun = [W.get_body_phash_simple, + W.get_body_phash_multi][int(msg.is_multipart())] +body_phash = body_phash_fun(W.get_payloads(msg)) +parts_map = W.walk_msg_tree(parts, body_phash=body_phash) + + +# TODO add missing headers! +expected = { + 'body': '1ddfa80485', + 'multi': True, + 'part_map': { + 1: { + 'headers': {'Content-Disposition': 'inline', + 'Content-Type': 'multipart/mixed; ' + 'boundary="z0eOaCaDLjvTGF2l"'}, + 'multi': True, + 'part_map': {1: {'ctype': 'text/plain', + 'headers': [ + ('Content-Type', + 'text/plain; charset=utf-8'), + ('Content-Disposition', + 'inline'), + ('Content-Transfer-Encoding', + 'quoted-printable')], + 'multi': False, + 'parts': 1, + 'phash': '1ddfa80485', + 'size': 206}, + 2: {'ctype': 'text/plain', + 'headers': [('Content-Type', + 'text/plain; charset=us-ascii'), + ('Content-Disposition', + 'attachment; ' + 'filename="attach.txt"')], + 'multi': False, + 'parts': 1, + 'phash': '7a94e4d769', + 'size': 133}, + 3: {'ctype': 'application/octet-stream', + 'headers': [('Content-Type', + 'application/octet-stream'), + ('Content-Disposition', + 'attachment; filename="hack.ico"'), + ('Content-Transfer-Encoding', + 'base64')], + 'multi': False, + 'parts': 1, + 'phash': 'c42cccebbd', + 'size': 12736}}}, + 2: {'ctype': 'application/pgp-signature', + 'headers': [('Content-Type', 'application/pgp-signature')], + 'multi': False, + 'parts': 1, + 'phash': '8f49fbf749', + 'size': 877}}} + +if DEBUG and DO_CHECK: + # TODO turn this into a proper unittest + assert(parts_map == expected) + print "Structure: OK" + + +import pprint +print +print "RAW DOCS" +pprint.pprint(raw_docs) +print +print "PARTS MAP" +pprint.pprint(parts_map) diff --git a/mail/src/leap/mail/walk.py b/mail/src/leap/mail/walk.py new file mode 100644 index 0000000..820b8c7 --- /dev/null +++ b/mail/src/leap/mail/walk.py @@ -0,0 +1,160 @@ +# -*- coding: utf-8 -*- +# walk.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Utilities for walking along a message tree. +""" +import hashlib +import os + +from leap.mail.utils import first + +DEBUG = os.environ.get("BITMASK_MAIL_DEBUG") + +if DEBUG: + get_hash = lambda s: hashlib.sha256(s).hexdigest()[:10] +else: + get_hash = lambda s: hashlib.sha256(s).hexdigest() + + +""" +Get interesting message parts +""" +get_parts = lambda msg: [ + {'multi': part.is_multipart(), + 'ctype': part.get_content_type(), + 'size': len(part.as_string()), + 'parts': len(part.get_payload()) + if isinstance(part.get_payload(), list) + else 1, + 'headers': part.items(), + 'phash': get_hash(part.get_payload()) + if not part.is_multipart() else None} + for part in msg.walk()] + +""" +Utility lambda functions for getting the parts vector and the +payloads from the original message. +""" + +get_parts_vector = lambda parts: (x.get('parts', 1) for x in parts) +get_payloads = lambda msg: ((x.get_payload(), + dict(((str.lower(k), v) for k, v in (x.items())))) + for x in msg.walk()) + +get_body_phash_simple = lambda payloads: first( + [get_hash(payload) for payload, headers in payloads + if "text/plain" in headers.get('content-type')]) + +get_body_phash_multi = lambda payloads: (first( + [get_hash(payload) for payload, headers in payloads + if "text/plain" in headers.get('content-type')]) + or get_body_phash_simple(payloads)) + +""" +On getting the raw docs, we get also some of the headers to be able to +index the content. Here we remove any mutable part, as the the filename +in the content disposition. +""" + +get_raw_docs = lambda msg, parts: ( + {"type": "cnt", # type content they'll be + "raw": payload if not DEBUG else payload[:100], + "phash": get_hash(payload), + "content-disposition": first(headers.get( + 'content-disposition', '').split(';')), + "content-type": headers.get( + 'content-type', ''), + "content-transfer-encoding": headers.get( + 'content-transfer-type', '')} + for payload, headers in get_payloads(msg) + if not isinstance(payload, list)) + + +def walk_msg_tree(parts, body_phash=None): + """ + Take a list of interesting items of a message subparts structure, + and return a dict of dicts almost ready to be written to the content + documents that will be stored in Soledad. + + It walks down the subparts in the parsed message tree, and collapses + the leaf docuents into a wrapper document until no multipart submessages + are left. To achieve this, it iteratively calculates a wrapper vector of + all documents in the sequence that have more than one part and have unitary + documents to their right. To collapse a multipart, take as many + unitary documents as parts the submessage contains, and replace the object + in the sequence with the new wrapper document. + + :param parts: A list of dicts containing the interesting properties for + the message structure. Normally this has been generated by + doing a message walk. + :type parts: list of dicts. + :param body_phash: the payload hash of the body part, to be included + in the outer content doc for convenience. + :type body_phash: basestring or None + """ + # parts vector + pv = list(get_parts_vector(parts)) + + if len(parts) == 2: + inner_headers = parts[1].get("headers", None) + + if DEBUG: + print "parts vector: ", pv + print + + # wrappers vector + getwv = lambda pv: [True if pv[i] != 1 and pv[i + 1] == 1 else False + for i in range(len(pv) - 1)] + wv = getwv(pv) + + # do until no wrapper document is left + while any(wv): + wind = wv.index(True) # wrapper index + nsub = pv[wind] # number of subparts to pick + slic = parts[wind + 1:wind + 1 + nsub] # slice with subparts + + cwra = { + "multi": True, + "part_map": dict((index + 1, part) # content wrapper + for index, part in enumerate(slic)), + "headers": dict(parts[wind]['headers']) + } + + # remove subparts and substitue wrapper + map(lambda i: parts.remove(i), slic) + parts[wind] = cwra + + # refresh vectors for this iteration + pv = list(get_parts_vector(parts)) + wv = getwv(pv) + + outer = parts[0] + outer.pop('headers') + if not "part_map" in outer: + # we have a multipart with 1 part only, so kind of fix it + # although it would be prettier if I take this special case at + # the beginning of the walk. + pdoc = {"multi": True, + "part_map": {1: outer}} + pdoc["part_map"][1]["multi"] = False + if not pdoc["part_map"][1].get("phash", None): + pdoc["part_map"][1]["phash"] = body_phash + pdoc["part_map"][1]["headers"] = inner_headers + else: + pdoc = outer + pdoc["body"] = body_phash + return pdoc -- cgit v1.2.3