diff options
author | Kali Kaneko <kali@leap.se> | 2014-01-07 11:34:08 -0400 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2014-01-08 20:53:47 -0400 |
commit | 4ba5d5b405e3c6a6bc997df2073ffc8ea3fa75a9 (patch) | |
tree | 7519ccd4dec15240cc8a89ff34fdc61ee7236141 /src/leap/mail/imap/messages.py | |
parent | a203337d155a6e7186980ef175642adc91d472fe (diff) |
Second stage of the new year's storage rewrite.
* documents of only three types:
* flags
* headers
* content
* add algorithm for walking the parsed message tree.
* treat special cases like a multipart with a single part.
* modify add_msg to use the walk routine
* modify twisted interfaces to use the new storage schema.
* tests for different multipart cases
* fix multipart detection typo in the fetch
This is a merge proposal for the 0.5.0-rc3.
known bugs
----------
Some things are still know not to work well at this point
(some cases of multipart messages do not display the bodies).
IMAP server also is left in a bad internal state after a logout/login.
Diffstat (limited to 'src/leap/mail/imap/messages.py')
-rw-r--r-- | src/leap/mail/imap/messages.py | 722 |
1 files changed, 388 insertions, 334 deletions
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index bfe913c..37e4311 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -33,6 +33,7 @@ from zope.proxy import sameProxiedObjects from leap.common.check import leap_assert, leap_assert_type from leap.common.decorators import memoized_method from leap.common.mail import get_email_charset +from leap.mail import walk from leap.mail.utils import first from leap.mail.decorators import deferred from leap.mail.imap.index import IndexedDB @@ -43,65 +44,58 @@ from leap.mail.messageflow import IMessageConsumer, MessageProducer logger = logging.getLogger(__name__) -class MessageBody(object): - """ - IMessagePart implementor for the main - body of a multipart message. - - Excusatio non petita: see the interface documentation. - """ +# TODO ------------------------------------------------------------ - implements(imap4.IMessagePart) - - def __init__(self, fdoc, bdoc): - self._fdoc = fdoc - self._bdoc = bdoc - - def getSize(self): - return len(self._bdoc.content[fields.BODY_KEY]) +# [ ] Add linked-from info. +# [ ] Delete incoming mail only after successful write! +# [ ] Remove UID from syncable db. Store only those indexes locally. +# [ ] Send patch to twisted for bug in imap4.py:5717 (content-type can be +# none? lower-case?) - def getBodyFile(self): - fd = StringIO.StringIO() - - if self._bdoc: - body = self._bdoc.content[fields.BODY_KEY] - else: - body = "" - charset = self._get_charset(body) - try: - body = body.encode(charset) - except (UnicodeEncodeError, UnicodeDecodeError) as e: - logger.error("Unicode error {0}".format(e)) - body = body.encode(charset, 'replace') - fd.write(body) - fd.seek(0) - return fd - - @memoized_method - def _get_charset(self, stuff): - return get_email_charset(unicode(stuff)) - - def getHeaders(self, negate, *names): - return {} +def lowerdict(_dict): + """ + Return a dict with the keys in lowercase. - def isMultipart(self): - return False + :param _dict: the dict to convert + :rtype: dict + """ + return dict((key.lower(), value) + for key, value in _dict.items()) - def getSubPart(self, part): - return None +class MessagePart(object): + """ + IMessagePart implementor. + It takes a subpart message and is able to find + the inner parts. -class MessageAttachment(object): + Excusatio non petita: see the interface documentation. + """ implements(imap4.IMessagePart) - def __init__(self, msg): + def __init__(self, soledad, part_map): """ - Initializes the messagepart with a Message instance. - :param msg: a message instance - :type msg: Message + Initializes the MessagePart. + + :param part_map: a dictionary containing the parts map for this + message + :type part_map: dict """ - self._msg = msg + # TODO + # It would be good to pass the uid/mailbox also + # for references while debugging. + + # We have a problem on bulk moves, and is + # that when the fetch on the new mailbox is done + # the parts maybe are not complete. + # So we should be able to fail with empty + # docs until we solve that. The ideal would be + # to gather the results of the deferred operations + # to signal the operation is complete. + #leap_assert(part_map, "part map dict cannot be null") + self._soledad = soledad + self._pmap = part_map def getSize(self): """ @@ -110,9 +104,12 @@ class MessageAttachment(object): :return: size of the message, in octets :rtype: int """ - if not self._msg: + if not self._pmap: return 0 - return len(self._msg.as_string()) + size = self._pmap.get('size', None) + if not size: + logger.error("Message part cannot find size in the partmap") + return size def getBodyFile(self): """ @@ -122,24 +119,91 @@ class MessageAttachment(object): :rtype: StringIO """ fd = StringIO.StringIO() - if self._msg: - body = self._msg.get_payload() + if self._pmap: + multi = self._pmap.get('multi') + if not multi: + phash = self._pmap.get("phash", None) + else: + pmap = self._pmap.get('part_map') + first_part = pmap.get('1', None) + if first_part: + phash = first_part['phash'] + + if not phash: + logger.warning("Could not find phash for this subpart!") + payload = str("") + else: + payload = self._get_payload_from_document(phash) + else: - logger.debug("Empty message!") - body = "" - - # XXX should only do the dance if we're sure it's - # content/text-plain!!! - #charset = self._get_charset(body) - #try: - #body = body.encode(charset) - #except (UnicodeEncodeError, UnicodeDecodeError) as e: - #logger.error("Unicode error {0}".format(e)) - #body = body.encode(charset, 'replace') - fd.write(body) + logger.warning("Message with no part_map!") + payload = str("") + + if payload: + #headers = self.getHeaders(True) + #headers = lowerdict(headers) + #content_type = headers.get('content-type', "") + content_type = self._get_ctype_from_document(phash) + charset_split = content_type.split('charset=') + # XXX fuck all this, use a regex! + if len(charset_split) > 1: + charset = charset_split[1] + if charset: + charset = charset.strip() + else: + charset = None + if not charset: + charset = self._get_charset(payload) + try: + payload = payload.encode(charset) + except (UnicodeEncodeError, UnicodeDecodeError) as e: + logger.error("Unicode error {0}".format(e)) + payload = payload.encode(charset, 'replace') + + fd.write(payload) fd.seek(0) return fd + # TODO cache the phash retrieval + def _get_payload_from_document(self, phash): + """ + Gets the message payload from the content document. + + :param phash: the payload hash to retrieve by. + :type phash: basestring + """ + cdocs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(phash)) + + cdoc = first(cdocs) + if not cdoc: + logger.warning( + "Could not find the content doc " + "for phash %s" % (phash,)) + payload = cdoc.content.get(fields.RAW_KEY, "") + return payload + + # TODO cache the pahash retrieval + def _get_ctype_from_document(self, phash): + """ + Gets the content-type from the content document. + + :param phash: the payload hash to retrieve by. + :type phash: basestring + """ + cdocs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(phash)) + + cdoc = first(cdocs) + if not cdoc: + logger.warning( + "Could not find the content doc " + "for phash %s" % (phash,)) + ctype = cdoc.content.get('ctype', "") + return ctype + @memoized_method def _get_charset(self, stuff): # TODO put in a common class with LeapMessage @@ -150,8 +214,6 @@ class MessageAttachment(object): :type stuff: basestring :returns: charset """ - # XXX existential doubt 1. wouldn't be smarter to - # peek into the mail headers? # XXX existential doubt 2. shouldn't we make the scope # of the decorator somewhat more persistent? # ah! yes! and put memory bounds. @@ -172,9 +234,17 @@ class MessageAttachment(object): :return: A mapping of header field names to header field values :rtype: dict """ - if not self._msg: + if not self._pmap: + logger.warning("No pmap in Subpart!") return {} - headers = dict(self._msg.items()) + headers = dict(self._pmap.get("headers", [])) + + # twisted imap server expects *some* headers to be lowercase + # We could use a CaseInsensitiveDict here... + headers = dict( + (str(key), str(value)) if key.lower() != "content-type" + else (str(key.lower()), str(value)) + for (key, value) in headers.items()) names = map(lambda s: s.upper(), names) if negate: @@ -187,13 +257,18 @@ class MessageAttachment(object): map(str, (key, val)) for key, val in headers.items() if cond(key)] - return dict(filter_by_cond) + filtered = dict(filter_by_cond) + return filtered def isMultipart(self): """ Return True if this message is multipart. """ - return self._msg.is_multipart() + if not self._pmap: + logger.warning("Could not get part map!") + return False + multi = self._pmap.get("multi", False) + return multi def getSubPart(self, part): """ @@ -206,10 +281,30 @@ class MessageAttachment(object): :rtype: Any object implementing C{IMessagePart}. :return: The specified sub-part. """ - return self._msg.get_payload() + if not self.isMultipart(): + raise TypeError + sub_pmap = self._pmap.get("part_map", {}) + try: + part_map = sub_pmap[str(part + 1)] + except KeyError: + logger.debug("getSubpart for %s: KeyError" % (part,)) + raise IndexError + + # XXX check for validity + return MessagePart(self._soledad, part_map) class LeapMessage(fields, MailParser, MBoxParser): + """ + The main representation of a message. + + It indexes the messages in one mailbox by a combination + of uid+mailbox name. + """ + + # TODO this has to change. + # Should index primarily by chash, and keep a local-lonly + # UID table. implements(imap4.IMessage) @@ -268,6 +363,8 @@ class LeapMessage(fields, MailParser, MBoxParser): """ An accessor to the body document. """ + if not self._hdoc: + return None if not self.__bdoc: self.__bdoc = self._get_body_doc() return self.__bdoc @@ -320,6 +417,11 @@ class LeapMessage(fields, MailParser, MBoxParser): log.msg('setting flags: %s' % (self._uid)) doc = self._fdoc + if not doc: + logger.warning( + "Could not find FDOC for %s:%s while setting flags!" % + (self._mbox, self._uid)) + return doc.content[self.FLAGS_KEY] = flags doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags doc.content[self.RECENT_KEY] = self.RECENT_FLAG in flags @@ -384,16 +486,25 @@ class LeapMessage(fields, MailParser, MBoxParser): fd = StringIO.StringIO() bdoc = self._bdoc if bdoc: - body = self._bdoc.content.get(self.BODY_KEY, "") + body = str(self._bdoc.content.get(self.RAW_KEY, "")) else: - body = "" + logger.warning("No BDOC found for message.") + body = str("") + + # XXX not needed, isn't it? ---- ivan? + #if bdoc: + #content_type = bdoc.content.get('content-type', "") + #charset = content_type.split('charset=')[1] + #if charset: + #charset = charset.strip() + #if not charset: + #charset = self._get_charset(body) + #try: + #body = str(body.encode(charset)) + #except (UnicodeEncodeError, UnicodeDecodeError) as e: + #logger.error("Unicode error {0}".format(e)) + #body = str(body.encode(charset, 'replace')) - charset = self._get_charset(body) - try: - body = body.encode(charset) - except (UnicodeEncodeError, UnicodeDecodeError) as e: - logger.error("Unicode error {0}".format(e)) - body = body.encode(charset, 'replace') fd.write(body) fd.seek(0) return fd @@ -407,8 +518,7 @@ class LeapMessage(fields, MailParser, MBoxParser): :type stuff: basestring :returns: charset """ - # XXX existential doubt 1. wouldn't be smarter to - # peek into the mail headers? + # TODO get from subpart headers # XXX existential doubt 2. shouldn't we make the scope # of the decorator somewhat more persistent? # ah! yes! and put memory bounds. @@ -447,9 +557,11 @@ class LeapMessage(fields, MailParser, MBoxParser): :return: A mapping of header field names to header field values :rtype: dict """ + # TODO split in smaller methods headers = self._get_headers() if not headers: - return {'content-type': ''} + logger.warning("No headers found") + return {str('content-type'): str('')} names = map(lambda s: s.upper(), names) if negate: @@ -457,16 +569,20 @@ class LeapMessage(fields, MailParser, MBoxParser): else: cond = lambda key: key.upper() in names - head = copy.deepcopy(dict(headers.items())) + if isinstance(headers, list): + headers = dict(headers) - # twisted imap server expects headers to be lowercase - head = dict( - (str(key), map(str, value)) if key.lower() != "content-type" - else (str(key.lower(), map(str, value))) - for (key, value) in head.items()) + # twisted imap server expects *some* headers to be lowercase + # XXX refactor together with MessagePart method + headers = dict( + (str(key), str(value)) if key.lower() != "content-type" + else (str(key.lower()), str(value)) + for (key, value) in headers.items()) # unpack and filter original dict by negate-condition - filter_by_cond = [(key, val) for key, val in head.items() if cond(key)] + filter_by_cond = [(key, val) for key, val + in headers.items() if cond(key)] + return dict(filter_by_cond) def _get_headers(self): @@ -474,7 +590,9 @@ class LeapMessage(fields, MailParser, MBoxParser): Return the headers dict for this message. """ if self._hdoc is not None: - return self._hdoc.content.get(self.HEADERS_KEY, {}) + headers = self._hdoc.content.get(self.HEADERS_KEY, {}) + return headers + else: logger.warning( "No HEADERS doc for msg %s:%s" % ( @@ -486,12 +604,13 @@ class LeapMessage(fields, MailParser, MBoxParser): Return True if this message is multipart. """ if self._fdoc: - return self._fdoc.content.get(self.MULTIPART_KEY, False) + is_multipart = self._fdoc.content.get(self.MULTIPART_KEY, False) + return is_multipart else: logger.warning( "No FLAGS doc for msg %s:%s" % ( - self.mbox, - self.uid)) + self._mbox, + self._uid)) def getSubPart(self, part): """ @@ -504,27 +623,33 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: Any object implementing C{IMessagePart}. :return: The specified sub-part. """ - logger.debug("Getting subpart: %s" % part) if not self.isMultipart(): raise TypeError - - if part == 0: - # Let's get the first part, which - # is really the body. - return MessageBody(self._fdoc, self._bdoc) - - attach_doc = self._get_attachment_doc(part) - if not attach_doc: - # so long and thanks for all the fish - logger.debug("...not today") + try: + pmap_dict = self._get_part_from_parts_map(part + 1) + except KeyError: + logger.debug("getSubpart for %s: KeyError" % (part,)) raise IndexError - msg_part = self._get_parsed_msg(attach_doc.content[self.RAW_KEY]) - return MessageAttachment(msg_part) + return MessagePart(self._soledad, pmap_dict) # # accessors # + def _get_part_from_parts_map(self, part): + """ + Get a part map from the headers doc + + :raises: KeyError if key does not exist + :rtype: dict + """ + if not self._hdoc: + logger.warning("Tried to get part but no HDOC found!") + return None + + pmap = self._hdoc.content.get(fields.PARTS_MAP_KEY, {}) + return pmap[str(part)] + def _get_flags_doc(self): """ Return the document that keeps the flags for this @@ -550,63 +675,16 @@ class LeapMessage(fields, MailParser, MBoxParser): Return the document that keeps the body for this message. """ - body_docs = self._soledad.get_from_index( - fields.TYPE_C_HASH_IDX, - fields.TYPE_MESSAGE_VAL, str(self._chash)) - return first(body_docs) - - def _get_num_parts(self): - """ - Return the number of parts for a multipart message. - """ - if not self.isMultipart(): - raise TypeError( - "Tried to get num parts in a non-multipart message") - if not self._hdoc: - return None - return self._hdoc.content.get(fields.NUM_PARTS_KEY, 2) - - def _get_attachment_doc(self, part): - """ - Return the document that keeps the headers for this - message. - - :param part: the part number for the multipart message. - :type part: int - """ - if not self._hdoc: - return None - try: - phash = self._hdoc.content[self.PARTS_MAP_KEY][str(part)] - except KeyError: - # this is the remnant of a debug session until - # I found that the index is actually a string... - # It should be safe to just raise the KeyError now, - # but leaving it here while the blood is fresh... - logger.warning("We expected a phash in the " - "index %s, but noone found" % (part, )) - logger.debug(self._hdoc.content[self.PARTS_MAP_KEY]) + body_phash = self._hdoc.content.get( + fields.BODY_KEY, None) + if not body_phash: + logger.warning("No body phash for this document!") return None - attach_docs = self._soledad.get_from_index( + body_docs = self._soledad.get_from_index( fields.TYPE_P_HASH_IDX, - fields.TYPE_ATTACHMENT_VAL, str(phash)) - - # The following is true for the fist owner. - # We could use this relationship to flag the "owner" - # and orphan when we delete it. + fields.TYPE_CONTENT_VAL, str(body_phash)) - #attach_docs = self._soledad.get_from_index( - #fields.TYPE_C_HASH_PART_IDX, - #fields.TYPE_ATTACHMENT_VAL, str(self._chash), str(part)) - return first(attach_docs) - - def _get_raw_msg(self): - """ - Return the raw msg. - :rtype: basestring - """ - # TODO deprecate this. - return self._bdoc.content.get(self.RAW_KEY, '') + return first(body_docs) def __getitem__(self, key): """ @@ -658,27 +736,22 @@ class LeapMessage(fields, MailParser, MBoxParser): """ Remove all docs associated with this message. """ - # XXX this would ve more efficient if we can just pass - # a sequence of uids. - # XXX For the moment we are only removing the flags and headers # docs. The rest we leave there polluting your hard disk, # until we think about a good way of deorphaning. # Maybe a crawler of unreferenced docs. + # XXX implement elijah's idea of using a PUT document as a + # token to ensure consistency in the removal. + uid = self._uid - print "removing...", uid fd = self._get_flags_doc() - hd = self._get_headers_doc() + #hd = self._get_headers_doc() #bd = self._get_body_doc() #docs = [fd, hd, bd] - docs = [fd, hd] - - #for pn in range(self._get_num_parts()[1:]): - #ad = self._get_attachment_doc(pn) - #docs.append(ad) + docs = [fd] for d in filter(None, docs): try: @@ -703,8 +776,7 @@ SoledadWriterPayload = namedtuple( SoledadWriterPayload.CREATE = 1 SoledadWriterPayload.PUT = 2 -SoledadWriterPayload.BODY_CREATE = 3 -SoledadWriterPayload.ATTACHMENT_CREATE = 4 +SoledadWriterPayload.CONTENT_CREATE = 3 class SoledadDocWriter(object): @@ -723,6 +795,38 @@ class SoledadDocWriter(object): """ self._soledad = soledad + def _get_call_for_item(self, item): + """ + Return the proper call type for a given item. + + :param item: one of the types defined under the + attributes of SoledadWriterPayload + :type item: int + """ + call = None + payload = item.payload + + if item.mode == SoledadWriterPayload.CREATE: + call = self._soledad.create_doc + elif (item.mode == SoledadWriterPayload.CONTENT_CREATE + and not self._content_does_exist(payload)): + call = self._soledad.create_doc + elif item.mode == SoledadWriterPayload.PUT: + call = self._soledad.put_doc + return call + + def _process(self, queue): + """ + Return the item and the proper call type for the next + item in the queue if any. + + :param queue: the queue from where we'll pick item. + :type queue: Queue + """ + item = queue.get() + call = self._get_call_for_item(item) + return item, call + def consume(self, queue): """ Creates a new document in soledad db. @@ -733,24 +837,10 @@ class SoledadDocWriter(object): """ empty = queue.empty() while not empty: - item = queue.get() - call = None - payload = item.payload - - if item.mode == SoledadWriterPayload.CREATE: - call = self._soledad.create_doc - elif item.mode == SoledadWriterPayload.BODY_CREATE: - if not self._body_does_exist(payload): - call = self._soledad.create_doc - elif item.mode == SoledadWriterPayload.ATTACHMENT_CREATE: - if not self._attachment_does_exist(payload): - call = self._soledad.create_doc - elif item.mode == SoledadWriterPayload.PUT: - call = self._soledad.put_doc - - # XXX delete? + item, call = self._process(queue) if call: + # XXX should handle the delete case # should handle errors try: call(item.payload) @@ -779,33 +869,10 @@ class SoledadDocWriter(object): Stack. """ - def _body_does_exist(self, doc): + def _content_does_exist(self, doc): """ - Check whether we already have a body payload with this hash in our - database. - - :param doc: tentative body document - :type doc: dict - :returns: True if that happens, False otherwise. - """ - if not doc: - return False - chash = doc[fields.CONTENT_HASH_KEY] - body_docs = self._soledad.get_from_index( - fields.TYPE_C_HASH_IDX, - fields.TYPE_MESSAGE_VAL, str(chash)) - if not body_docs: - return False - if len(body_docs) != 1: - logger.warning("Found more than one copy of chash %s!" - % (chash,)) - logger.debug("Found body doc with that hash! Skipping save!") - return True - - def _attachment_does_exist(self, doc): - """ - Check whether we already have an attachment payload with this hash - in our database. + Check whether we already have a content document for a payload + with this hash in our database. :param doc: tentative body document :type doc: dict @@ -816,7 +883,7 @@ class SoledadDocWriter(object): phash = doc[fields.PAYLOAD_HASH_KEY] attach_docs = self._soledad.get_from_index( fields.TYPE_P_HASH_IDX, - fields.TYPE_ATTACHMENT_VAL, str(phash)) + fields.TYPE_CONTENT_VAL, str(phash)) if not attach_docs: return False @@ -840,15 +907,15 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # into a template for the class. FLAGS_DOC = "FLAGS" HEADERS_DOC = "HEADERS" - ATTACHMENT_DOC = "ATTACHMENT" - BODY_DOC = "BODY" + CONTENT_DOC = "CONTENT" templates = { FLAGS_DOC: { fields.TYPE_KEY: fields.TYPE_FLAGS_VAL, - fields.UID_KEY: 1, + fields.UID_KEY: 1, # XXX moe to a local table fields.MBOX_KEY: fields.INBOX_VAL, + fields.CONTENT_HASH_KEY: "", fields.SEEN_KEY: False, fields.RECENT_KEY: True, @@ -862,35 +929,28 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): fields.TYPE_KEY: fields.TYPE_HEADERS_VAL, fields.CONTENT_HASH_KEY: "", + fields.DATE_KEY: "", + fields.SUBJECT_KEY: "", + fields.HEADERS_KEY: {}, - fields.NUM_PARTS_KEY: 0, fields.PARTS_MAP_KEY: {}, - fields.DATE_KEY: "", - fields.SUBJECT_KEY: "" }, - ATTACHMENT_DOC: { - fields.TYPE_KEY: fields.TYPE_ATTACHMENT_VAL, - fields.PART_NUMBER_KEY: 0, - fields.CONTENT_HASH_KEY: "", + CONTENT_DOC: { + fields.TYPE_KEY: fields.TYPE_CONTENT_VAL, fields.PAYLOAD_HASH_KEY: "", + fields.LINKED_FROM_KEY: [], + fields.CTYPE_KEY: "", # should index by this too - fields.RAW_KEY: "" - }, - - BODY_DOC: { - fields.TYPE_KEY: fields.TYPE_MESSAGE_VAL, - fields.CONTENT_HASH_KEY: "", - - fields.BODY_KEY: "", - - # this should not be needed, - # but let's keep the raw msg for some time - # until we are sure we can reconstruct - # the original msg from our disection. + # should only get inmutable headers parts + # (for indexing) + fields.HEADERS_KEY: {}, fields.RAW_KEY: "", + fields.PARTS_MAP_KEY: {}, + fields.HEADERS_KEY: {}, + fields.MULTIPART_KEY: False, + }, - } } def __init__(self, mbox=None, soledad=None): @@ -938,128 +998,124 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): raise TypeError("Improper type passed to _get_empty_doc") return copy.deepcopy(self.templates[_type]) - @deferred - def add_msg(self, raw, subject=None, flags=None, date=None, uid=1): + def _do_parse(self, raw): """ - Creates a new message document. + Parse raw message and return it along with + relevant information about its outer level. :param raw: the raw message - :type raw: str - - :param subject: subject of the message. - :type subject: str - - :param flags: flags - :type flags: list - - :param date: the received date for the message - :type date: str - - :param uid: the message uid for this mailbox - :type uid: int + :type raw: StringIO or basestring + :return: msg, chash, size, multi + :rtype: tuple """ - # TODO: split in smaller methods - logger.debug('adding message') - if flags is None: - flags = tuple() - leap_assert_type(flags, tuple) - - # docs for flags, headers, and body - fd, hd, bd = map( - lambda t: self._get_empty_doc(t), - (self.FLAGS_DOC, self.HEADERS_DOC, self.BODY_DOC)) - msg = self._get_parsed_msg(raw) - headers = defaultdict(list) - for k, v in msg.items(): - headers[k].append(v) - raw_str = msg.as_string() chash = self._get_hash(msg) + size = len(msg.as_string()) multi = msg.is_multipart() + return msg, chash, size, multi - attaches = [] - inner_parts = [] - - if multi: - # XXX should walk down recursively - # in a better way. but fixing this quick - # to have an rc. - # XXX should pick the content-type in txt - body = first(msg.get_payload()).get_payload() - if isinstance(body, list): - # allowing one nesting level for now... - body, rest = body[0].get_payload(), body[1:] - for p in rest: - inner_parts.append(p) - else: - body = msg.get_payload() - logger.debug("adding msg with uid %s (multipart:%s)" % ( - uid, multi)) + def _populate_flags(self, flags, uid, chash, size, multi): + """ + Return a flags doc. + + XXX Missing DOC ----------- + """ + fd = self._get_empty_doc(self.FLAGS_DOC) - # flags doc --------------------------------------- fd[self.MBOX_KEY] = self.mbox fd[self.UID_KEY] = uid fd[self.CONTENT_HASH_KEY] = chash + fd[self.SIZE_KEY] = size fd[self.MULTIPART_KEY] = multi - fd[self.SIZE_KEY] = len(raw_str) if flags: fd[self.FLAGS_KEY] = map(self._stringify, flags) fd[self.SEEN_KEY] = self.SEEN_FLAG in flags fd[self.DEL_KEY] = self.DELETED_FLAG in flags fd[self.RECENT_KEY] = True # set always by default + return fd - # headers doc ---------------------------------------- + def _populate_headr(self, msg, chash, subject, date): + """ + Return a headers doc. + + XXX Missing DOC ----------- + """ + headers = defaultdict(list) + for k, v in msg.items(): + headers[k].append(v) + + # "fix" for repeated headers. + for k, v in headers.items(): + newline = "\n%s: " % (k,) + headers[k] = newline.join(v) + + hd = self._get_empty_doc(self.HEADERS_DOC) hd[self.CONTENT_HASH_KEY] = chash hd[self.HEADERS_KEY] = headers - print "headers" - import pprint - pprint.pprint(headers) - if not subject and self.SUBJECT_FIELD in headers: hd[self.SUBJECT_KEY] = first(headers[self.SUBJECT_FIELD]) else: hd[self.SUBJECT_KEY] = subject + if not date and self.DATE_FIELD in headers: hd[self.DATE_KEY] = first(headers[self.DATE_FIELD]) else: hd[self.DATE_KEY] = date - if multi: - # XXX fix for multipart nested case - hd[self.NUM_PARTS_KEY] = len(msg.get_payload()) - - # body doc - bd[self.CONTENT_HASH_KEY] = chash - bd[self.BODY_KEY] = body - # XXX in an ideal world, we would not need to save a copy of the - # raw message. But we'll keep it until we can be sure that - # we can rebuild the original message from the parts. - bd[self.RAW_KEY] = raw_str + return hd + + @deferred + def add_msg(self, raw, subject=None, flags=None, date=None, uid=1): + """ + Creates a new message document. + + :param raw: the raw message + :type raw: str + + :param subject: subject of the message. + :type subject: str + + :param flags: flags + :type flags: list + + :param date: the received date for the message + :type date: str + + :param uid: the message uid for this mailbox + :type uid: int + """ + # TODO signal that we can delete the original message!----- + # when all the processing is done. + + # TODO add the linked-from info ! + + logger.debug('adding message') + if flags is None: + flags = tuple() + leap_assert_type(flags, tuple) + + # parse + msg, chash, size, multi = self._do_parse(raw) + + fd = self._populate_flags(flags, uid, chash, size, multi) + hd = self._populate_headr(msg, chash, subject, date) + + parts = walk.get_parts(msg) + body_phash_fun = [walk.get_body_phash_simple, + walk.get_body_phash_multi][int(multi)] + body_phash = body_phash_fun(walk.get_payloads(msg)) + parts_map = walk.walk_msg_tree(parts, body_phash=body_phash) + + # add parts map to header doc + # (body, multi, part_map) + for key in parts_map: + hd[key] = parts_map[key] + del parts_map docs = [fd, hd] + cdocs = walk.get_raw_docs(msg, parts) - # attachment docs - if multi: - outer_parts = msg.get_payload() - parts = outer_parts + inner_parts - - # skip first part, we already got it in body - to_attach = ((i, m) for i, m in enumerate(parts) if i > 0) - for index, part_msg in to_attach: - att_doc = self._get_empty_doc(self.ATTACHMENT_DOC) - att_doc[self.PART_NUMBER_KEY] = index - att_doc[self.CONTENT_HASH_KEY] = chash - phash = self._get_hash(part_msg) - att_doc[self.PAYLOAD_HASH_KEY] = phash - att_doc[self.RAW_KEY] = part_msg.as_string() - - # keep a pointer to the payload hash in the - # headers doc, under the parts_map - hd[self.PARTS_MAP_KEY][str(index)] = phash - attaches.append(att_doc) - - # Saving ... ------------------------------- - # ok, there we go... + # Saving logger.debug('enqueuing message docs for write') ptuple = SoledadWriterPayload @@ -1067,14 +1123,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): for doc in docs: self.soledad_writer.put(ptuple( mode=ptuple.CREATE, payload=doc)) - # second, try to create body doc. - self.soledad_writer.put(ptuple( - mode=ptuple.BODY_CREATE, payload=bd)) + # and last, but not least, try to create - # attachment docs if not already there. - for at in attaches: + # content docs if not already there. + for cd in cdocs: self.soledad_writer.put(ptuple( - mode=ptuple.ATTACHMENT_CREATE, payload=at)) + mode=ptuple.CONTENT_CREATE, payload=cd)) def _remove_cb(self, result): return result |