summary | refs | log | tree | commit | diff
path: root/src/leap/mail/imap/messages.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/imap/messages.py')
-rw-r--r-- src/leap/mail/imap/messages.py 722
1 files changed, 388 insertions, 334 deletions
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index bfe913c..37e4311 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -33,6 +33,7 @@ from zope.proxy import sameProxiedObjects
from leap.common.check import leap_assert, leap_assert_type
from leap.common.decorators import memoized_method
from leap.common.mail import get_email_charset
+from leap.mail import walk
from leap.mail.utils import first
from leap.mail.decorators import deferred
from leap.mail.imap.index import IndexedDB
@@ -43,65 +44,58 @@ from leap.mail.messageflow import IMessageConsumer, MessageProducer
logger = logging.getLogger(__name__)
-class MessageBody(object):
- """
- IMessagePart implementor for the main
- body of a multipart message.
-
- Excusatio non petita: see the interface documentation.
- """
+# TODO ------------------------------------------------------------
- implements(imap4.IMessagePart)
-
- def __init__(self, fdoc, bdoc):
- self._fdoc = fdoc
- self._bdoc = bdoc
-
- def getSize(self):
- return len(self._bdoc.content[fields.BODY_KEY])
+# [ ] Add linked-from info.
+# [ ] Delete incoming mail only after successful write!
+# [ ] Remove UID from syncable db. Store only those indexes locally.
+# [ ] Send patch to twisted for bug in imap4.py:5717 (content-type can be
+# none? lower-case?)
- def getBodyFile(self):
- fd = StringIO.StringIO()
-
- if self._bdoc:
- body = self._bdoc.content[fields.BODY_KEY]
- else:
- body = ""
- charset = self._get_charset(body)
- try:
- body = body.encode(charset)
- except (UnicodeEncodeError, UnicodeDecodeError) as e:
- logger.error("Unicode error {0}".format(e))
- body = body.encode(charset, 'replace')
- fd.write(body)
- fd.seek(0)
- return fd
-
- @memoized_method
- def _get_charset(self, stuff):
- return get_email_charset(unicode(stuff))
-
- def getHeaders(self, negate, *names):
- return {}
+def lowerdict(_dict):
+ """
+ Return a dict with the keys in lowercase.
- def isMultipart(self):
- return False
+ :param _dict: the dict to convert
+ :rtype: dict
+ """
+ return dict((key.lower(), value)
+ for key, value in _dict.items())
- def getSubPart(self, part):
- return None
+class MessagePart(object):
+ """
+ IMessagePart implementor.
+ It takes a subpart message and is able to find
+ the inner parts.
-class MessageAttachment(object):
+ Excusatio non petita: see the interface documentation.
+ """
implements(imap4.IMessagePart)
- def __init__(self, msg):
+ def __init__(self, soledad, part_map):
"""
- Initializes the messagepart with a Message instance.
- :param msg: a message instance
- :type msg: Message
+ Initializes the MessagePart.
+
+ :param part_map: a dictionary containing the parts map for this
+ message
+ :type part_map: dict
"""
- self._msg = msg
+ # TODO
+ # It would be good to pass the uid/mailbox also
+ # for references while debugging.
+
+ # We have a problem with bulk moves: when the
+ # fetch on the new mailbox is done, the parts
+ # may not be complete yet.
+ # So we should be able to fail gracefully with empty
+ # docs until we solve that. The ideal would be
+ # to gather the results of the deferred operations
+ # to signal that the operation is complete.
+ #leap_assert(part_map, "part map dict cannot be null")
+ self._soledad = soledad
+ self._pmap = part_map
def getSize(self):
"""
@@ -110,9 +104,12 @@ class MessageAttachment(object):
:return: size of the message, in octets
:rtype: int
"""
- if not self._msg:
+ if not self._pmap:
return 0
- return len(self._msg.as_string())
+ size = self._pmap.get('size', None)
+ if not size:
+ logger.error("Message part cannot find size in the partmap")
+ return size
def getBodyFile(self):
"""
@@ -122,24 +119,91 @@ class MessageAttachment(object):
:rtype: StringIO
"""
fd = StringIO.StringIO()
- if self._msg:
- body = self._msg.get_payload()
+ if self._pmap:
+ multi = self._pmap.get('multi')
+ if not multi:
+ phash = self._pmap.get("phash", None)
+ else:
+ pmap = self._pmap.get('part_map')
+ first_part = pmap.get('1', None)
+ if first_part:
+ phash = first_part['phash']
+
+ if not phash:
+ logger.warning("Could not find phash for this subpart!")
+ payload = str("")
+ else:
+ payload = self._get_payload_from_document(phash)
+
else:
- logger.debug("Empty message!")
- body = ""
-
- # XXX should only do the dance if we're sure it's
- # content/text-plain!!!
- #charset = self._get_charset(body)
- #try:
- #body = body.encode(charset)
- #except (UnicodeEncodeError, UnicodeDecodeError) as e:
- #logger.error("Unicode error {0}".format(e))
- #body = body.encode(charset, 'replace')
- fd.write(body)
+ logger.warning("Message with no part_map!")
+ payload = str("")
+
+ if payload:
+ #headers = self.getHeaders(True)
+ #headers = lowerdict(headers)
+ #content_type = headers.get('content-type', "")
+ content_type = self._get_ctype_from_document(phash)
+ charset_split = content_type.split('charset=')
+ # XXX this manual splitting is brittle; use a regex instead!
+ if len(charset_split) > 1:
+ charset = charset_split[1]
+ if charset:
+ charset = charset.strip()
+ else:
+ charset = None
+ if not charset:
+ charset = self._get_charset(payload)
+ try:
+ payload = payload.encode(charset)
+ except (UnicodeEncodeError, UnicodeDecodeError) as e:
+ logger.error("Unicode error {0}".format(e))
+ payload = payload.encode(charset, 'replace')
+
+ fd.write(payload)
fd.seek(0)
return fd
+ # TODO cache the phash retrieval
+    def _get_payload_from_document(self, phash):
+        """
+        Gets the message payload from the content document.
+
+        Looks up the content document indexed by the given payload
+        hash and returns what it stores under the raw key.
+
+        :param phash: the payload hash to retrieve by.
+        :type phash: basestring
+        :return: the raw payload, or an empty string if no content
+                 document is found for that hash.
+        :rtype: basestring
+        """
+        cdocs = self._soledad.get_from_index(
+            fields.TYPE_P_HASH_IDX,
+            fields.TYPE_CONTENT_VAL, str(phash))
+
+        cdoc = first(cdocs)
+        if not cdoc:
+            # Nothing to read from: hand back an empty payload instead
+            # of dereferencing the missing document.
+            logger.warning(
+                "Could not find the content doc "
+                "for phash %s" % (phash,))
+            return ""
+        return cdoc.content.get(fields.RAW_KEY, "")
+
+ # TODO cache the phash retrieval
+    def _get_ctype_from_document(self, phash):
+        """
+        Gets the content-type from the content document.
+
+        Looks up the content document indexed by the given payload
+        hash and returns what it stores under the 'ctype' key.
+
+        :param phash: the payload hash to retrieve by.
+        :type phash: basestring
+        :return: the stored content-type, or an empty string if no
+                 content document is found for that hash.
+        :rtype: basestring
+        """
+        cdocs = self._soledad.get_from_index(
+            fields.TYPE_P_HASH_IDX,
+            fields.TYPE_CONTENT_VAL, str(phash))
+
+        cdoc = first(cdocs)
+        if not cdoc:
+            # Nothing to read from: hand back an empty content-type
+            # instead of dereferencing the missing document.
+            logger.warning(
+                "Could not find the content doc "
+                "for phash %s" % (phash,))
+            return ""
+        return cdoc.content.get('ctype', "")
+
@memoized_method
def _get_charset(self, stuff):
# TODO put in a common class with LeapMessage
@@ -150,8 +214,6 @@ class MessageAttachment(object):
:type stuff: basestring
:returns: charset
"""
- # XXX existential doubt 1. wouldn't be smarter to
- # peek into the mail headers?
# XXX existential doubt 2. shouldn't we make the scope
# of the decorator somewhat more persistent?
# ah! yes! and put memory bounds.
@@ -172,9 +234,17 @@ class MessageAttachment(object):
:return: A mapping of header field names to header field values
:rtype: dict
"""
- if not self._msg:
+ if not self._pmap:
+ logger.warning("No pmap in Subpart!")
return {}
- headers = dict(self._msg.items())
+ headers = dict(self._pmap.get("headers", []))
+
+ # twisted imap server expects *some* headers to be lowercase
+ # We could use a CaseInsensitiveDict here...
+ headers = dict(
+ (str(key), str(value)) if key.lower() != "content-type"
+ else (str(key.lower()), str(value))
+ for (key, value) in headers.items())
names = map(lambda s: s.upper(), names)
if negate:
@@ -187,13 +257,18 @@ class MessageAttachment(object):
map(str, (key, val)) for
key, val in headers.items()
if cond(key)]
- return dict(filter_by_cond)
+ filtered = dict(filter_by_cond)
+ return filtered
def isMultipart(self):
"""
Return True if this message is multipart.
"""
- return self._msg.is_multipart()
+ if not self._pmap:
+ logger.warning("Could not get part map!")
+ return False
+ multi = self._pmap.get("multi", False)
+ return multi
def getSubPart(self, part):
"""
@@ -206,10 +281,30 @@ class MessageAttachment(object):
:rtype: Any object implementing C{IMessagePart}.
:return: The specified sub-part.
"""
- return self._msg.get_payload()
+ if not self.isMultipart():
+ raise TypeError
+ sub_pmap = self._pmap.get("part_map", {})
+ try:
+ part_map = sub_pmap[str(part + 1)]
+ except KeyError:
+ logger.debug("getSubpart for %s: KeyError" % (part,))
+ raise IndexError
+
+ # XXX check for validity
+ return MessagePart(self._soledad, part_map)
class LeapMessage(fields, MailParser, MBoxParser):
+ """
+ The main representation of a message.
+
+ It indexes the messages in one mailbox by a combination
+ of uid+mailbox name.
+ """
+
+ # TODO this has to change.
+ # Should index primarily by chash, and keep a local-only
+ # UID table.
implements(imap4.IMessage)
@@ -268,6 +363,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
"""
An accessor to the body document.
"""
+ if not self._hdoc:
+ return None
if not self.__bdoc:
self.__bdoc = self._get_body_doc()
return self.__bdoc
@@ -320,6 +417,11 @@ class LeapMessage(fields, MailParser, MBoxParser):
log.msg('setting flags: %s' % (self._uid))
doc = self._fdoc
+ if not doc:
+ logger.warning(
+ "Could not find FDOC for %s:%s while setting flags!" %
+ (self._mbox, self._uid))
+ return
doc.content[self.FLAGS_KEY] = flags
doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
doc.content[self.RECENT_KEY] = self.RECENT_FLAG in flags
@@ -384,16 +486,25 @@ class LeapMessage(fields, MailParser, MBoxParser):
fd = StringIO.StringIO()
bdoc = self._bdoc
if bdoc:
- body = self._bdoc.content.get(self.BODY_KEY, "")
+ body = str(self._bdoc.content.get(self.RAW_KEY, ""))
else:
- body = ""
+ logger.warning("No BDOC found for message.")
+ body = str("")
+
+ # XXX not needed, isn't it? ---- ivan?
+ #if bdoc:
+ #content_type = bdoc.content.get('content-type', "")
+ #charset = content_type.split('charset=')[1]
+ #if charset:
+ #charset = charset.strip()
+ #if not charset:
+ #charset = self._get_charset(body)
+ #try:
+ #body = str(body.encode(charset))
+ #except (UnicodeEncodeError, UnicodeDecodeError) as e:
+ #logger.error("Unicode error {0}".format(e))
+ #body = str(body.encode(charset, 'replace'))
- charset = self._get_charset(body)
- try:
- body = body.encode(charset)
- except (UnicodeEncodeError, UnicodeDecodeError) as e:
- logger.error("Unicode error {0}".format(e))
- body = body.encode(charset, 'replace')
fd.write(body)
fd.seek(0)
return fd
@@ -407,8 +518,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
:type stuff: basestring
:returns: charset
"""
- # XXX existential doubt 1. wouldn't be smarter to
- # peek into the mail headers?
+ # TODO get from subpart headers
# XXX existential doubt 2. shouldn't we make the scope
# of the decorator somewhat more persistent?
# ah! yes! and put memory bounds.
@@ -447,9 +557,11 @@ class LeapMessage(fields, MailParser, MBoxParser):
:return: A mapping of header field names to header field values
:rtype: dict
"""
+ # TODO split in smaller methods
headers = self._get_headers()
if not headers:
- return {'content-type': ''}
+ logger.warning("No headers found")
+ return {str('content-type'): str('')}
names = map(lambda s: s.upper(), names)
if negate:
@@ -457,16 +569,20 @@ class LeapMessage(fields, MailParser, MBoxParser):
else:
cond = lambda key: key.upper() in names
- head = copy.deepcopy(dict(headers.items()))
+ if isinstance(headers, list):
+ headers = dict(headers)
- # twisted imap server expects headers to be lowercase
- head = dict(
- (str(key), map(str, value)) if key.lower() != "content-type"
- else (str(key.lower(), map(str, value)))
- for (key, value) in head.items())
+ # twisted imap server expects *some* headers to be lowercase
+ # XXX refactor together with MessagePart method
+ headers = dict(
+ (str(key), str(value)) if key.lower() != "content-type"
+ else (str(key.lower()), str(value))
+ for (key, value) in headers.items())
# unpack and filter original dict by negate-condition
- filter_by_cond = [(key, val) for key, val in head.items() if cond(key)]
+ filter_by_cond = [(key, val) for key, val
+ in headers.items() if cond(key)]
+
return dict(filter_by_cond)
def _get_headers(self):
@@ -474,7 +590,9 @@ class LeapMessage(fields, MailParser, MBoxParser):
Return the headers dict for this message.
"""
if self._hdoc is not None:
- return self._hdoc.content.get(self.HEADERS_KEY, {})
+ headers = self._hdoc.content.get(self.HEADERS_KEY, {})
+ return headers
+
else:
logger.warning(
"No HEADERS doc for msg %s:%s" % (
@@ -486,12 +604,13 @@ class LeapMessage(fields, MailParser, MBoxParser):
Return True if this message is multipart.
"""
if self._fdoc:
- return self._fdoc.content.get(self.MULTIPART_KEY, False)
+ is_multipart = self._fdoc.content.get(self.MULTIPART_KEY, False)
+ return is_multipart
else:
logger.warning(
"No FLAGS doc for msg %s:%s" % (
- self.mbox,
- self.uid))
+ self._mbox,
+ self._uid))
def getSubPart(self, part):
"""
@@ -504,27 +623,33 @@ class LeapMessage(fields, MailParser, MBoxParser):
:rtype: Any object implementing C{IMessagePart}.
:return: The specified sub-part.
"""
- logger.debug("Getting subpart: %s" % part)
if not self.isMultipart():
raise TypeError
-
- if part == 0:
- # Let's get the first part, which
- # is really the body.
- return MessageBody(self._fdoc, self._bdoc)
-
- attach_doc = self._get_attachment_doc(part)
- if not attach_doc:
- # so long and thanks for all the fish
- logger.debug("...not today")
+ try:
+ pmap_dict = self._get_part_from_parts_map(part + 1)
+ except KeyError:
+ logger.debug("getSubpart for %s: KeyError" % (part,))
raise IndexError
- msg_part = self._get_parsed_msg(attach_doc.content[self.RAW_KEY])
- return MessageAttachment(msg_part)
+ return MessagePart(self._soledad, pmap_dict)
#
# accessors
#
+    def _get_part_from_parts_map(self, part):
+        """
+        Look up the map of a single part in the headers doc.
+
+        :param part: the key of the wanted part in the parts map; it
+                     is converted to a string before the lookup.
+        :return: the entry stored for that part, or None if there is
+                 no headers doc for this message.
+        :rtype: dict
+        :raises: KeyError if the parts map has no entry for the part.
+        """
+        if self._hdoc:
+            parts_map = self._hdoc.content.get(fields.PARTS_MAP_KEY, {})
+            return parts_map[str(part)]
+
+        logger.warning("Tried to get part but no HDOC found!")
+        return None
+
def _get_flags_doc(self):
"""
Return the document that keeps the flags for this
@@ -550,63 +675,16 @@ class LeapMessage(fields, MailParser, MBoxParser):
Return the document that keeps the body for this
message.
"""
- body_docs = self._soledad.get_from_index(
- fields.TYPE_C_HASH_IDX,
- fields.TYPE_MESSAGE_VAL, str(self._chash))
- return first(body_docs)
-
- def _get_num_parts(self):
- """
- Return the number of parts for a multipart message.
- """
- if not self.isMultipart():
- raise TypeError(
- "Tried to get num parts in a non-multipart message")
- if not self._hdoc:
- return None
- return self._hdoc.content.get(fields.NUM_PARTS_KEY, 2)
-
- def _get_attachment_doc(self, part):
- """
- Return the document that keeps the headers for this
- message.
-
- :param part: the part number for the multipart message.
- :type part: int
- """
- if not self._hdoc:
- return None
- try:
- phash = self._hdoc.content[self.PARTS_MAP_KEY][str(part)]
- except KeyError:
- # this is the remnant of a debug session until
- # I found that the index is actually a string...
- # It should be safe to just raise the KeyError now,
- # but leaving it here while the blood is fresh...
- logger.warning("We expected a phash in the "
- "index %s, but noone found" % (part, ))
- logger.debug(self._hdoc.content[self.PARTS_MAP_KEY])
+ body_phash = self._hdoc.content.get(
+ fields.BODY_KEY, None)
+ if not body_phash:
+ logger.warning("No body phash for this document!")
return None
- attach_docs = self._soledad.get_from_index(
+ body_docs = self._soledad.get_from_index(
fields.TYPE_P_HASH_IDX,
- fields.TYPE_ATTACHMENT_VAL, str(phash))
-
- # The following is true for the fist owner.
- # We could use this relationship to flag the "owner"
- # and orphan when we delete it.
+ fields.TYPE_CONTENT_VAL, str(body_phash))
- #attach_docs = self._soledad.get_from_index(
- #fields.TYPE_C_HASH_PART_IDX,
- #fields.TYPE_ATTACHMENT_VAL, str(self._chash), str(part))
- return first(attach_docs)
-
- def _get_raw_msg(self):
- """
- Return the raw msg.
- :rtype: basestring
- """
- # TODO deprecate this.
- return self._bdoc.content.get(self.RAW_KEY, '')
+ return first(body_docs)
def __getitem__(self, key):
"""
@@ -658,27 +736,22 @@ class LeapMessage(fields, MailParser, MBoxParser):
"""
Remove all docs associated with this message.
"""
- # XXX this would ve more efficient if we can just pass
- # a sequence of uids.
-
# XXX For the moment we are only removing the flags and headers
# docs. The rest we leave there polluting your hard disk,
# until we think about a good way of deorphaning.
# Maybe a crawler of unreferenced docs.
+ # XXX implement elijah's idea of using a PUT document as a
+ # token to ensure consistency in the removal.
+
uid = self._uid
- print "removing...", uid
fd = self._get_flags_doc()
- hd = self._get_headers_doc()
+ #hd = self._get_headers_doc()
#bd = self._get_body_doc()
#docs = [fd, hd, bd]
- docs = [fd, hd]
-
- #for pn in range(self._get_num_parts()[1:]):
- #ad = self._get_attachment_doc(pn)
- #docs.append(ad)
+ docs = [fd]
for d in filter(None, docs):
try:
@@ -703,8 +776,7 @@ SoledadWriterPayload = namedtuple(
SoledadWriterPayload.CREATE = 1
SoledadWriterPayload.PUT = 2
-SoledadWriterPayload.BODY_CREATE = 3
-SoledadWriterPayload.ATTACHMENT_CREATE = 4
+SoledadWriterPayload.CONTENT_CREATE = 3
class SoledadDocWriter(object):
@@ -723,6 +795,38 @@ class SoledadDocWriter(object):
"""
self._soledad = soledad
+    def _get_call_for_item(self, item):
+        """
+        Pick the soledad method that has to be used to write an item.
+
+        :param item: the queued item; its `mode` attribute is compared
+                     against the modes defined as attributes of
+                     SoledadWriterPayload, and its `payload` is the
+                     doc checked for already existing content.
+        :return: the soledad method to call with the payload, or None
+                 if nothing has to be written for this item.
+        """
+        doc, mode = item.payload, item.mode
+
+        if mode == SoledadWriterPayload.CREATE:
+            return self._soledad.create_doc
+
+        if mode == SoledadWriterPayload.PUT:
+            return self._soledad.put_doc
+
+        # content docs are only created if not already there
+        if (mode == SoledadWriterPayload.CONTENT_CREATE
+                and not self._content_does_exist(doc)):
+            return self._soledad.create_doc
+
+        return None
+
+    def _process(self, queue):
+        """
+        Take the next item from the queue and pair it with the call
+        that has to be used to write it.
+
+        :param queue: the queue from where we'll pick item.
+        :type queue: Queue
+        :return: a tuple of (item, call)
+        """
+        next_item = queue.get()
+        return next_item, self._get_call_for_item(next_item)
+
def consume(self, queue):
"""
Creates a new document in soledad db.
@@ -733,24 +837,10 @@ class SoledadDocWriter(object):
"""
empty = queue.empty()
while not empty:
- item = queue.get()
- call = None
- payload = item.payload
-
- if item.mode == SoledadWriterPayload.CREATE:
- call = self._soledad.create_doc
- elif item.mode == SoledadWriterPayload.BODY_CREATE:
- if not self._body_does_exist(payload):
- call = self._soledad.create_doc
- elif item.mode == SoledadWriterPayload.ATTACHMENT_CREATE:
- if not self._attachment_does_exist(payload):
- call = self._soledad.create_doc
- elif item.mode == SoledadWriterPayload.PUT:
- call = self._soledad.put_doc
-
- # XXX delete?
+ item, call = self._process(queue)
if call:
+ # XXX should handle the delete case
# should handle errors
try:
call(item.payload)
@@ -779,33 +869,10 @@ class SoledadDocWriter(object):
Stack.
"""
- def _body_does_exist(self, doc):
+ def _content_does_exist(self, doc):
"""
- Check whether we already have a body payload with this hash in our
- database.
-
- :param doc: tentative body document
- :type doc: dict
- :returns: True if that happens, False otherwise.
- """
- if not doc:
- return False
- chash = doc[fields.CONTENT_HASH_KEY]
- body_docs = self._soledad.get_from_index(
- fields.TYPE_C_HASH_IDX,
- fields.TYPE_MESSAGE_VAL, str(chash))
- if not body_docs:
- return False
- if len(body_docs) != 1:
- logger.warning("Found more than one copy of chash %s!"
- % (chash,))
- logger.debug("Found body doc with that hash! Skipping save!")
- return True
-
- def _attachment_does_exist(self, doc):
- """
- Check whether we already have an attachment payload with this hash
- in our database.
+ Check whether we already have a content document for a payload
+ with this hash in our database.
:param doc: tentative body document
:type doc: dict
@@ -816,7 +883,7 @@ class SoledadDocWriter(object):
phash = doc[fields.PAYLOAD_HASH_KEY]
attach_docs = self._soledad.get_from_index(
fields.TYPE_P_HASH_IDX,
- fields.TYPE_ATTACHMENT_VAL, str(phash))
+ fields.TYPE_CONTENT_VAL, str(phash))
if not attach_docs:
return False
@@ -840,15 +907,15 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# into a template for the class.
FLAGS_DOC = "FLAGS"
HEADERS_DOC = "HEADERS"
- ATTACHMENT_DOC = "ATTACHMENT"
- BODY_DOC = "BODY"
+ CONTENT_DOC = "CONTENT"
templates = {
FLAGS_DOC: {
fields.TYPE_KEY: fields.TYPE_FLAGS_VAL,
- fields.UID_KEY: 1,
+ fields.UID_KEY: 1, # XXX moe to a local table
fields.MBOX_KEY: fields.INBOX_VAL,
+ fields.CONTENT_HASH_KEY: "",
fields.SEEN_KEY: False,
fields.RECENT_KEY: True,
@@ -862,35 +929,28 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
fields.TYPE_KEY: fields.TYPE_HEADERS_VAL,
fields.CONTENT_HASH_KEY: "",
+ fields.DATE_KEY: "",
+ fields.SUBJECT_KEY: "",
+
fields.HEADERS_KEY: {},
- fields.NUM_PARTS_KEY: 0,
fields.PARTS_MAP_KEY: {},
- fields.DATE_KEY: "",
- fields.SUBJECT_KEY: ""
},
- ATTACHMENT_DOC: {
- fields.TYPE_KEY: fields.TYPE_ATTACHMENT_VAL,
- fields.PART_NUMBER_KEY: 0,
- fields.CONTENT_HASH_KEY: "",
+ CONTENT_DOC: {
+ fields.TYPE_KEY: fields.TYPE_CONTENT_VAL,
fields.PAYLOAD_HASH_KEY: "",
+ fields.LINKED_FROM_KEY: [],
+ fields.CTYPE_KEY: "", # should index by this too
- fields.RAW_KEY: ""
- },
-
- BODY_DOC: {
- fields.TYPE_KEY: fields.TYPE_MESSAGE_VAL,
- fields.CONTENT_HASH_KEY: "",
-
- fields.BODY_KEY: "",
-
- # this should not be needed,
- # but let's keep the raw msg for some time
- # until we are sure we can reconstruct
- # the original msg from our disection.
+ # should only get immutable headers parts
+ # (for indexing)
+ fields.HEADERS_KEY: {},
fields.RAW_KEY: "",
+ fields.PARTS_MAP_KEY: {},
+ fields.HEADERS_KEY: {},
+ fields.MULTIPART_KEY: False,
+ },
- }
}
def __init__(self, mbox=None, soledad=None):
@@ -938,128 +998,124 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
raise TypeError("Improper type passed to _get_empty_doc")
return copy.deepcopy(self.templates[_type])
- @deferred
- def add_msg(self, raw, subject=None, flags=None, date=None, uid=1):
+ def _do_parse(self, raw):
"""
- Creates a new message document.
+ Parse raw message and return it along with
+ relevant information about its outer level.
:param raw: the raw message
- :type raw: str
-
- :param subject: subject of the message.
- :type subject: str
-
- :param flags: flags
- :type flags: list
-
- :param date: the received date for the message
- :type date: str
-
- :param uid: the message uid for this mailbox
- :type uid: int
+ :type raw: StringIO or basestring
+ :return: msg, chash, size, multi
+ :rtype: tuple
"""
- # TODO: split in smaller methods
- logger.debug('adding message')
- if flags is None:
- flags = tuple()
- leap_assert_type(flags, tuple)
-
- # docs for flags, headers, and body
- fd, hd, bd = map(
- lambda t: self._get_empty_doc(t),
- (self.FLAGS_DOC, self.HEADERS_DOC, self.BODY_DOC))
-
msg = self._get_parsed_msg(raw)
- headers = defaultdict(list)
- for k, v in msg.items():
- headers[k].append(v)
- raw_str = msg.as_string()
chash = self._get_hash(msg)
+ size = len(msg.as_string())
multi = msg.is_multipart()
+ return msg, chash, size, multi
- attaches = []
- inner_parts = []
-
- if multi:
- # XXX should walk down recursively
- # in a better way. but fixing this quick
- # to have an rc.
- # XXX should pick the content-type in txt
- body = first(msg.get_payload()).get_payload()
- if isinstance(body, list):
- # allowing one nesting level for now...
- body, rest = body[0].get_payload(), body[1:]
- for p in rest:
- inner_parts.append(p)
- else:
- body = msg.get_payload()
- logger.debug("adding msg with uid %s (multipart:%s)" % (
- uid, multi))
+ def _populate_flags(self, flags, uid, chash, size, multi):
+ """
+ Return a flags doc.
+
+ XXX Missing DOC -----------
+ """
+ fd = self._get_empty_doc(self.FLAGS_DOC)
- # flags doc ---------------------------------------
fd[self.MBOX_KEY] = self.mbox
fd[self.UID_KEY] = uid
fd[self.CONTENT_HASH_KEY] = chash
+ fd[self.SIZE_KEY] = size
fd[self.MULTIPART_KEY] = multi
- fd[self.SIZE_KEY] = len(raw_str)
if flags:
fd[self.FLAGS_KEY] = map(self._stringify, flags)
fd[self.SEEN_KEY] = self.SEEN_FLAG in flags
fd[self.DEL_KEY] = self.DELETED_FLAG in flags
fd[self.RECENT_KEY] = True # set always by default
+ return fd
- # headers doc ----------------------------------------
+    def _populate_headr(self, msg, chash, subject, date):
+        """
+        Return a headers doc.
+
+        :param msg: the parsed message whose headers are collected.
+        :param chash: the content hash to store in the doc.
+        :param subject: subject to store; when empty, the value of the
+                        subject header of the message is used, if any.
+        :param date: date to store; when empty, the value of the date
+                     header of the message is used, if any.
+        :return: a headers doc populated from the message.
+        :rtype: dict
+        """
+        headers = defaultdict(list)
+        for k, v in msg.items():
+            headers[k].append(v)
+
+        # "fix" for repeated headers: the values of each header are
+        # joined into one string, so from here on every value in
+        # `headers` is a string and not a list.
+        for k, v in headers.items():
+            newline = "\n%s: " % (k,)
+            headers[k] = newline.join(v)
+
+        hd = self._get_empty_doc(self.HEADERS_DOC)
         hd[self.CONTENT_HASH_KEY] = chash
         hd[self.HEADERS_KEY] = headers
-        print "headers"
-        import pprint
-        pprint.pprint(headers)
-
         if not subject and self.SUBJECT_FIELD in headers:
-            hd[self.SUBJECT_KEY] = first(headers[self.SUBJECT_FIELD])
+            # The value is already a string here; taking first() of
+            # it would not give back the whole header value.
+            hd[self.SUBJECT_KEY] = headers[self.SUBJECT_FIELD]
         else:
             hd[self.SUBJECT_KEY] = subject
+
         if not date and self.DATE_FIELD in headers:
-            hd[self.DATE_KEY] = first(headers[self.DATE_FIELD])
+            hd[self.DATE_KEY] = headers[self.DATE_FIELD]
         else:
             hd[self.DATE_KEY] = date
-        if multi:
-            # XXX fix for multipart nested case
-            hd[self.NUM_PARTS_KEY] = len(msg.get_payload())
-
-        # body doc
-        bd[self.CONTENT_HASH_KEY] = chash
-        bd[self.BODY_KEY] = body
-        # XXX in an ideal world, we would not need to save a copy of the
-        # raw message. But we'll keep it until we can be sure that
-        # we can rebuild the original message from the parts.
-        bd[self.RAW_KEY] = raw_str
+        return hd
+
+ @deferred
+ def add_msg(self, raw, subject=None, flags=None, date=None, uid=1):
+ """
+ Creates a new message document.
+
+ :param raw: the raw message
+ :type raw: str
+
+ :param subject: subject of the message.
+ :type subject: str
+
+ :param flags: flags
+ :type flags: list
+
+ :param date: the received date for the message
+ :type date: str
+
+ :param uid: the message uid for this mailbox
+ :type uid: int
+ """
+ # TODO signal that we can delete the original message!-----
+ # when all the processing is done.
+
+ # TODO add the linked-from info !
+
+ logger.debug('adding message')
+ if flags is None:
+ flags = tuple()
+ leap_assert_type(flags, tuple)
+
+ # parse
+ msg, chash, size, multi = self._do_parse(raw)
+
+ fd = self._populate_flags(flags, uid, chash, size, multi)
+ hd = self._populate_headr(msg, chash, subject, date)
+
+ parts = walk.get_parts(msg)
+ body_phash_fun = [walk.get_body_phash_simple,
+ walk.get_body_phash_multi][int(multi)]
+ body_phash = body_phash_fun(walk.get_payloads(msg))
+ parts_map = walk.walk_msg_tree(parts, body_phash=body_phash)
+
+ # add parts map to header doc
+ # (body, multi, part_map)
+ for key in parts_map:
+ hd[key] = parts_map[key]
+ del parts_map
docs = [fd, hd]
+ cdocs = walk.get_raw_docs(msg, parts)
- # attachment docs
- if multi:
- outer_parts = msg.get_payload()
- parts = outer_parts + inner_parts
-
- # skip first part, we already got it in body
- to_attach = ((i, m) for i, m in enumerate(parts) if i > 0)
- for index, part_msg in to_attach:
- att_doc = self._get_empty_doc(self.ATTACHMENT_DOC)
- att_doc[self.PART_NUMBER_KEY] = index
- att_doc[self.CONTENT_HASH_KEY] = chash
- phash = self._get_hash(part_msg)
- att_doc[self.PAYLOAD_HASH_KEY] = phash
- att_doc[self.RAW_KEY] = part_msg.as_string()
-
- # keep a pointer to the payload hash in the
- # headers doc, under the parts_map
- hd[self.PARTS_MAP_KEY][str(index)] = phash
- attaches.append(att_doc)
-
- # Saving ... -------------------------------
- # ok, there we go...
+ # Saving
logger.debug('enqueuing message docs for write')
ptuple = SoledadWriterPayload
@@ -1067,14 +1123,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
for doc in docs:
self.soledad_writer.put(ptuple(
mode=ptuple.CREATE, payload=doc))
- # second, try to create body doc.
- self.soledad_writer.put(ptuple(
- mode=ptuple.BODY_CREATE, payload=bd))
+
# and last, but not least, try to create
- # attachment docs if not already there.
- for at in attaches:
+ # content docs if not already there.
+ for cd in cdocs:
self.soledad_writer.put(ptuple(
- mode=ptuple.ATTACHMENT_CREATE, payload=at))
+ mode=ptuple.CONTENT_CREATE, payload=cd))
def _remove_cb(self, result):
return result