diff options
author | Kali Kaneko <kali@leap.se> | 2013-12-27 02:06:44 -0400 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2014-01-08 20:32:08 -0400 |
commit | 72d07af0986d926af8bcd9b5435e0fa0f008db12 (patch) | |
tree | f42448e1d5d36c6ff0bdcc5743462fde7cd573ec | |
parent | 9fe076c87370030bcdd715c766c7d3515634edb7 (diff) |
First stage of the storage schema rewrite.
* Separates between flags, docs, body and attachment docs.
* Implement IMessageCopier interface: move and have fun!
This little change is known to push forward our beloved architect
emotional rollercoster.
* Message deduplication.
* It also fixes a hidden bug that was rendering the multipart mime
interface useless
(yes, the "True" parameter in the parsestr method).
* Does not handle well nested attachs, includes dirty workaround that
flattens them.
* Includes chiiph's patch for rc2:
* return deferred from addMessage
* convert StringIO types to string
* remove unneeded yields from the chain of deferreds in fetcher
-rw-r--r-- | mail/changes/feature_split_message_docs | 6 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/fetch.py | 7 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/fields.py | 49 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/index.py | 4 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/mailbox.py | 103 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/messages.py | 831 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/parser.py | 24 |
7 files changed, 808 insertions, 216 deletions
diff --git a/mail/changes/feature_split_message_docs b/mail/changes/feature_split_message_docs new file mode 100644 index 0000000..231c36e --- /dev/null +++ b/mail/changes/feature_split_message_docs @@ -0,0 +1,6 @@ + o Defer costly operations to a pool of threads. + o Split the internal representation of messages into four distinct documents: + 1) Flags 2) Headers 3) Body 4) Attachments. + o Add deduplication ability to the save operation, for body and attachments. + o Add IMessageCopier interface to mailbox implementation, so bulk moves + are costless. Closes: #4654 diff --git a/mail/src/leap/mail/imap/fetch.py b/mail/src/leap/mail/imap/fetch.py index 0b31c3b..fdf1412 100644 --- a/mail/src/leap/mail/imap/fetch.py +++ b/mail/src/leap/mail/imap/fetch.py @@ -412,13 +412,13 @@ class LeapIncomingMail(object): # decrypt or fail gracefully try: - decrdata, valid_sig = yield self._decrypt_and_verify_data( + decrdata, valid_sig = self._decrypt_and_verify_data( encdata, senderPubkey) except keymanager_errors.DecryptError as e: logger.warning('Failed to decrypt encrypted message (%s). ' 'Storing message without modifications.' % str(e)) # Bailing out! - yield (msg, False) + return (msg, False) # decrypted successully, now fix encoding and parse try: @@ -441,7 +441,7 @@ class LeapIncomingMail(object): # all ok, replace payload by unencrypted payload msg.set_payload(decrmsg.get_payload()) - yield (msg, valid_sig) + return (msg, valid_sig) def _maybe_decrypt_inline_encrypted_msg(self, origmsg, encoding, senderPubkey): @@ -527,6 +527,7 @@ class LeapIncomingMail(object): """ log.msg('adding message to local db') doc, data = msgtuple + if isinstance(data, list): data = data[0] diff --git a/mail/src/leap/mail/imap/fields.py b/mail/src/leap/mail/imap/fields.py index 96b937e..40817cd 100644 --- a/mail/src/leap/mail/imap/fields.py +++ b/mail/src/leap/mail/imap/fields.py @@ -25,18 +25,35 @@ class WithMsgFields(object): Container class for class-attributes to be shared by several message-related classes. """ - # Internal representation of Message - DATE_KEY = "date" - HEADERS_KEY = "headers" - FLAGS_KEY = "flags" - MBOX_KEY = "mbox" + # indexing CONTENT_HASH_KEY = "chash" - RAW_KEY = "raw" - SUBJECT_KEY = "subject" + PAYLOAD_HASH_KEY = "phash" + + # Internal representation of Message + + # flags doc UID_KEY = "uid" + MBOX_KEY = "mbox" + SEEN_KEY = "seen" + RECENT_KEY = "recent" + FLAGS_KEY = "flags" MULTIPART_KEY = "multi" SIZE_KEY = "size" + # headers + HEADERS_KEY = "headers" + NUM_PARTS_KEY = "numparts" + PARTS_MAP_KEY = "partmap" + DATE_KEY = "date" + SUBJECT_KEY = "subject" + + # attachment + PART_NUMBER_KEY = "part" + RAW_KEY = "raw" + + # content + BODY_KEY = "body" + # Mailbox specific keys CLOSED_KEY = "closed" CREATED_KEY = "created" @@ -55,10 +72,6 @@ class WithMsgFields(object): INBOX_VAL = "inbox" - # Flags for SoledadDocument for indexing. - SEEN_KEY = "seen" - RECENT_KEY = "recent" - # Flags in Mailbox and Message SEEN_FLAG = "\\Seen" RECENT_FLAG = "\\Recent" @@ -82,7 +95,9 @@ class WithMsgFields(object): TYPE_SUBS_IDX = 'by-type-and-subscribed' TYPE_MBOX_SEEN_IDX = 'by-type-and-mbox-and-seen' TYPE_MBOX_RECT_IDX = 'by-type-and-mbox-and-recent' - TYPE_HASH_IDX = 'by-type-and-hash' + TYPE_C_HASH_IDX = 'by-type-and-contenthash' + TYPE_C_HASH_PART_IDX = 'by-type-and-contenthash-and-partnumber' + TYPE_P_HASH_IDX = 'by-type-and-payloadhash' # Tomas created the `recent and seen index`, but the semantic is not too # correct since the recent flag is volatile. @@ -90,7 +105,9 @@ class WithMsgFields(object): KTYPE = TYPE_KEY MBOX_VAL = TYPE_MBOX_VAL - HASH_VAL = CONTENT_HASH_KEY + CHASH_VAL = CONTENT_HASH_KEY + PHASH_VAL = PAYLOAD_HASH_KEY + PART_VAL = PART_NUMBER_KEY INDEXES = { # generic @@ -102,7 +119,11 @@ class WithMsgFields(object): TYPE_SUBS_IDX: [KTYPE, 'bool(subscribed)'], # content, headers doc - TYPE_HASH_IDX: [KTYPE, HASH_VAL], + TYPE_C_HASH_IDX: [KTYPE, CHASH_VAL], + # attachment docs + TYPE_C_HASH_PART_IDX: [KTYPE, CHASH_VAL, PART_VAL], + # attachment payload dedup + TYPE_P_HASH_IDX: [KTYPE, PHASH_VAL], # messages TYPE_MBOX_SEEN_IDX: [KTYPE, MBOX_VAL, 'bool(seen)'], diff --git a/mail/src/leap/mail/imap/index.py b/mail/src/leap/mail/imap/index.py index 2280d86..5f0919a 100644 --- a/mail/src/leap/mail/imap/index.py +++ b/mail/src/leap/mail/imap/index.py @@ -21,7 +21,7 @@ import logging from leap.common.check import leap_assert, leap_assert_type -from leap.mail.imap.account import SoledadBackedAccount +from leap.mail.imap.fields import fields logger = logging.getLogger(__name__) @@ -54,7 +54,7 @@ class IndexedDB(object): db_indexes = dict() if self._soledad is not None: db_indexes = dict(self._soledad.list_indexes()) - for name, expression in SoledadBackedAccount.INDEXES.items(): + for name, expression in fields.INDEXES.items(): if name not in db_indexes: # The index does not yet exist. self._soledad.create_index(name, *expression) diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py index 09c06a2..5ea6f55 100644 --- a/mail/src/leap/mail/imap/mailbox.py +++ b/mail/src/leap/mail/imap/mailbox.py @@ -17,7 +17,13 @@ """ Soledad Mailbox. """ +import copy +import threading import logging +import time +import StringIO +import cStringIO + from collections import defaultdict from twisted.internet import defer @@ -45,9 +51,14 @@ class SoledadMailbox(WithMsgFields, MBoxParser): The low-level database methods are contained in MessageCollection class, which we instantiate and make accessible in the `messages` attribute. """ - implements(imap4.IMailboxInfo, imap4.IMailbox, imap4.ICloseableMailbox) + implements( + imap4.IMailbox, + imap4.IMailboxInfo, + imap4.ICloseableMailbox, + imap4.IMessageCopier) + # XXX should finish the implementation of IMailboxListener - # XXX should implement IMessageCopier too + # XXX should implement ISearchableMailbox too messages = None _closed = False @@ -65,6 +76,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): CMD_UNSEEN = "UNSEEN" _listeners = defaultdict(set) + next_uid_lock = threading.Lock() def __init__(self, mbox, soledad=None, rw=1): """ @@ -284,8 +296,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :rtype: int """ - self.last_uid += 1 - return self.last_uid + with self.next_uid_lock: + self.last_uid += 1 + return self.last_uid def getMessageCount(self): """ @@ -366,6 +379,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :return: a deferred that evals to None """ + if isinstance(message, (cStringIO.OutputType, StringIO.StringIO)): + message = message.getvalue() # XXX we should treat the message as an IMessage from here leap_assert_type(message, basestring) uid_next = self.getUIDNext() @@ -375,11 +390,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser): else: flags = tuple(str(flag) for flag in flags) - d = self._do_add_messages(message, flags, date, uid_next) + d = self._do_add_message(message, flags, date, uid_next) d.addCallback(self._notify_new) + return d @deferred - def _do_add_messages(self, message, flags, date, uid_next): + def _do_add_message(self, message, flags, date, uid_next): """ Calls to the messageCollection add_msg method (deferred to thread). Invoked from addMessage. @@ -420,28 +436,21 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # we should postpone the removal self._soledad.delete_doc(self._get_mbox()) + @deferred def expunge(self): """ Remove all messages flagged \\Deleted """ if not self.isWriteable(): raise imap4.ReadOnlyMailbox - delete = [] deleted = [] - - for m in self.messages.get_all_docs(): - # XXX should operate with LeapMessages instead, - # so we don't expose the implementation. - # (so, iterate for m in self.messages) - if self.DELETED_FLAG in m.content[self.FLAGS_KEY]: - delete.append(m) - for m in delete: - deleted.append(m.content) - self.messages.remove(m) - - # XXX should return the UIDs of the deleted messages - # more generically - return [x for x in range(len(deleted))] + for m in self.messages: + if self.DELETED_FLAG in m.getFlags(): + self.messages.remove(m) + # XXX this would ve more efficient if we can just pass + # a sequence of uids. + deleted.append(m.getUID()) + return deleted @deferred def fetch(self, messages, uid): @@ -510,6 +519,17 @@ class SoledadMailbox(WithMsgFields, MBoxParser): session is the first session to be notified about a message, then that message SHOULD be considered recent. """ + # TODO this fucker, for the sake of correctness, is messing with + # the whole collection of flag docs. + + # Possible ways of action: + # 1. Ignore it, we want fun. + # 2. Trigger it with a delay + # 3. Route it through a queue with lesser priority than the + # regularar writer. + + # hmm let's try 2. in a quickndirty way... + time.sleep(1) log.msg('unsetting recent flags...') for msg in self.messages.get_recent(): msg.removeFlags((fields.RECENT_FLAG,)) @@ -570,6 +590,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): for msg_id in messages: log.msg("MSG ID = %s" % msg_id) msg = self.messages.get_msg_by_uid(msg_id) + if not msg: + return result if mode == 1: msg.addFlags(flags) elif mode == -1: @@ -589,15 +611,36 @@ class SoledadMailbox(WithMsgFields, MBoxParser): self.expunge() self.closed = True - #@deferred - #def copy(self, messageObject): - #""" - #Copy the given message object into this mailbox. - #""" - # XXX should just: - # 1. Get the message._fdoc - # 2. Change the UID to UIDNext for this mailbox - # 3. Add implements IMessageCopier + # IMessageCopier + + @deferred + def copy(self, messageObject): + """ + Copy the given message object into this mailbox. + """ + uid_next = self.getUIDNext() + msg = messageObject + + # XXX should use a public api instead + fdoc = msg._fdoc + if not fdoc: + logger.debug("Tried to copy a MSG with no fdoc") + return + + new_fdoc = copy.deepcopy(fdoc.content) + new_fdoc[self.UID_KEY] = uid_next + new_fdoc[self.MBOX_KEY] = self.mbox + + d = self._do_add_doc(new_fdoc) + d.addCallback(self._notify_new) + + @deferred + def _do_add_doc(self, doc): + """ + Defers the adding of a new doc. + :param doc: document to be created in soledad. + """ + self._soledad.create_doc(doc) # convenience fun diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index b0d5da2..c69c023 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -29,9 +29,9 @@ from zope.interface import implements from zope.proxy import sameProxiedObjects from leap.common.check import leap_assert, leap_assert_type +from leap.common.decorators import memoized_method from leap.common.mail import get_email_charset from leap.mail.decorators import deferred -from leap.mail.imap.account import SoledadBackedAccount from leap.mail.imap.index import IndexedDB from leap.mail.imap.fields import fields, WithMsgFields from leap.mail.imap.parser import MailParser, MBoxParser @@ -40,6 +40,181 @@ from leap.mail.messageflow import IMessageConsumer, MessageProducer logger = logging.getLogger(__name__) +def first(things): + """ + Return the head of a collection. + """ + try: + return things[0] + except (IndexError, TypeError): + return None + + +class MessageBody(object): + """ + IMessagePart implementor for the main + body of a multipart message. + + Excusatio non petita: see the interface documentation. + """ + + implements(imap4.IMessagePart) + + def __init__(self, fdoc, bdoc): + self._fdoc = fdoc + self._bdoc = bdoc + + def getSize(self): + return len(self._bdoc.content[fields.BODY_KEY]) + + def getBodyFile(self): + fd = StringIO.StringIO() + + if self._bdoc: + body = self._bdoc.content[fields.BODY_KEY] + else: + body = "" + charset = self._get_charset(body) + try: + body = body.encode(charset) + except (UnicodeEncodeError, UnicodeDecodeError) as e: + logger.error("Unicode error {0}".format(e)) + body = body.encode(charset, 'replace') + fd.write(body) + fd.seek(0) + return fd + + @memoized_method + def _get_charset(self, stuff): + return get_email_charset(unicode(stuff)) + + def getHeaders(self, negate, *names): + return {} + + def isMultipart(self): + return False + + def getSubPart(self, part): + return None + + +class MessageAttachment(object): + + implements(imap4.IMessagePart) + + def __init__(self, msg): + """ + Initializes the messagepart with a Message instance. + :param msg: a message instance + :type msg: Message + """ + self._msg = msg + + def getSize(self): + """ + Return the total size, in octets, of this message part. + + :return: size of the message, in octets + :rtype: int + """ + if not self._msg: + return 0 + return len(self._msg.as_string()) + + def getBodyFile(self): + """ + Retrieve a file object containing only the body of this message. + + :return: file-like object opened for reading + :rtype: StringIO + """ + fd = StringIO.StringIO() + if self._msg: + body = self._msg.get_payload() + else: + logger.debug("Empty message!") + body = "" + + # XXX should only do the dance if we're sure it's + # content/text-plain!!! + #charset = self._get_charset(body) + #try: + #body = body.encode(charset) + #except (UnicodeEncodeError, UnicodeDecodeError) as e: + #logger.error("Unicode error {0}".format(e)) + #body = body.encode(charset, 'replace') + fd.write(body) + fd.seek(0) + return fd + + @memoized_method + def _get_charset(self, stuff): + # TODO put in a common class with LeapMessage + """ + Gets (guesses?) the charset of a payload. + + :param stuff: the stuff to guess about. + :type stuff: basestring + :returns: charset + """ + # XXX existential doubt 1. wouldn't be smarter to + # peek into the mail headers? + # XXX existential doubt 2. shouldn't we make the scope + # of the decorator somewhat more persistent? + # ah! yes! and put memory bounds. + return get_email_charset(unicode(stuff)) + + def getHeaders(self, negate, *names): + """ + Retrieve a group of message headers. + + :param names: The names of the headers to retrieve or omit. + :type names: tuple of str + + :param negate: If True, indicates that the headers listed in names + should be omitted from the return value, rather + than included. + :type negate: bool + + :return: A mapping of header field names to header field values + :rtype: dict + """ + if not self._msg: + return {} + headers = dict(self._msg.items()) + names = map(lambda s: s.upper(), names) + if negate: + cond = lambda key: key.upper() not in names + else: + cond = lambda key: key.upper() in names + + # unpack and filter original dict by negate-condition + filter_by_cond = [ + map(str, (key, val)) for + key, val in headers.items() + if cond(key)] + return dict(filter_by_cond) + + def isMultipart(self): + """ + Return True if this message is multipart. + """ + return self._msg.is_multipart() + + def getSubPart(self, part): + """ + Retrieve a MIME submessage + + :type part: C{int} + :param part: The number of the part to retrieve, indexed from 0. + :raise IndexError: Raised if the specified part does not exist. + :raise TypeError: Raised if this message is not multipart. + :rtype: Any object implementing C{IMessagePart}. + :return: The specified sub-part. + """ + return self._msg.get_payload() + + class LeapMessage(fields, MailParser, MBoxParser): implements(imap4.IMessage) @@ -59,25 +234,21 @@ class LeapMessage(fields, MailParser, MBoxParser): self._soledad = soledad self._uid = int(uid) self._mbox = self._parse_mailbox_name(mbox) - self._chash = None - self.__cdoc = None + self.__chash = None + self.__bdoc = None @property def _fdoc(self): """ An accessor to the flags document. """ - return self._get_flags_doc() - - @property - def _cdoc(self): - """ - An accessor to the content document. - """ - if not self.__cdoc: - self.__cdoc = self._get_content_doc() - return self.__cdoc + if all(map(bool, (self._uid, self._mbox))): + fdoc = self._get_flags_doc() + if fdoc: + self.__chash = fdoc.content.get( + fields.CONTENT_HASH_KEY, None) + return fdoc @property def _chash(self): @@ -86,7 +257,26 @@ class LeapMessage(fields, MailParser, MBoxParser): """ if not self._fdoc: return None - return self._fdoc.content.get(fields.CONTENT_HASH_KEY, None) + if not self.__chash and self._fdoc: + self.__chash = self._fdoc.content.get( + fields.CONTENT_HASH_KEY, None) + return self.__chash + + @property + def _hdoc(self): + """ + An accessor to the headers document. + """ + return self._get_headers_doc() + + @property + def _bdoc(self): + """ + An accessor to the body document. + """ + if not self.__bdoc: + self.__bdoc = self._get_body_doc() + return self.__bdoc # IMessage implementation @@ -110,9 +300,9 @@ class LeapMessage(fields, MailParser, MBoxParser): return [] flags = [] - flag_doc = self._fdoc - if flag_doc: - flags = flag_doc.content.get(self.FLAGS_KEY, None) + fdoc = self._fdoc + if fdoc: + flags = fdoc.content.get(self.FLAGS_KEY, None) if flags: flags = map(str, flags) return tuple(flags) @@ -180,7 +370,7 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: C{str} :return: An RFC822-formatted date string. """ - return str(self._cdoc.content.get(self.DATE_KEY, '')) + return str(self._hdoc.content.get(self.DATE_KEY, '')) # # IMessagePart @@ -197,25 +387,38 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: StringIO """ fd = StringIO.StringIO() + bdoc = self._bdoc + if bdoc: + body = self._bdoc.content.get(self.BODY_KEY, "") + else: + body = "" - cdoc = self._cdoc - content = cdoc.content.get(self.RAW_KEY, '') - charset = get_email_charset( - unicode(cdoc.content.get(self.RAW_KEY, ''))) + charset = self._get_charset(body) try: - content = content.encode(charset) + body = body.encode(charset) except (UnicodeEncodeError, UnicodeDecodeError) as e: logger.error("Unicode error {0}".format(e)) - content = content.encode(charset, 'replace') - - raw = self._get_raw_msg() - msg = self._get_parsed_msg(raw) - body = msg.get_payload() + body = body.encode(charset, 'replace') fd.write(body) - # XXX SHOULD use a separate BODY FIELD ... fd.seek(0) return fd + @memoized_method + def _get_charset(self, stuff): + """ + Gets (guesses?) the charset of a payload. + + :param stuff: the stuff to guess about. + :type stuff: basestring + :returns: charset + """ + # XXX existential doubt 1. wouldn't be smarter to + # peek into the mail headers? + # XXX existential doubt 2. shouldn't we make the scope + # of the decorator somewhat more persistent? + # ah! yes! and put memory bounds. + return get_email_charset(unicode(stuff)) + def getSize(self): """ Return the total size, in octets, of this message. @@ -223,19 +426,17 @@ class LeapMessage(fields, MailParser, MBoxParser): :return: size of the message, in octets :rtype: int """ - size = self._cdoc.content.get(self.SIZE_KEY, False) + size = None + if self._fdoc: + size = self._fdoc.content.get(self.SIZE_KEY, False) + else: + logger.warning("No FLAGS doc for %s:%s" % (self._mbox, + self._uid)) if not size: # XXX fallback, should remove when all migrated. size = self.getBodyFile().len return size - def _get_headers(self): - """ - Return the headers dict stored in this message document. - """ - # XXX get from the headers doc - return self._cdoc.content.get(self.HEADERS_KEY, {}) - def getHeaders(self, negate, *names): """ Retrieve a group of message headers. @@ -252,26 +453,49 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: dict """ headers = self._get_headers() + if not headers: + return {'content-type': ''} names = map(lambda s: s.upper(), names) if negate: cond = lambda key: key.upper() not in names else: cond = lambda key: key.upper() in names + head = copy.deepcopy(dict(headers.items())) + + # twisted imap server expects headers to be lowercase + head = dict( + map(str, (key, value)) if key.lower() != "content-type" + else map(str, (key.lower(), value)) + for (key, value) in head.items()) + # unpack and filter original dict by negate-condition - filter_by_cond = [ - map(str, (key, val)) for - key, val in headers.items() - if cond(key)] + filter_by_cond = [(key, val) for key, val in head.items() if cond(key)] return dict(filter_by_cond) + def _get_headers(self): + """ + Return the headers dict for this message. + """ + if self._hdoc is not None: + return self._hdoc.content.get(self.HEADERS_KEY, {}) + else: + logger.warning( + "No HEADERS doc for msg %s:%s" % ( + self._mbox, + self._uid)) + def isMultipart(self): """ Return True if this message is multipart. """ - if self._cdoc: - retval = self._fdoc.content.get(self.MULTIPART_KEY, False) - return retval + if self._fdoc: + return self._fdoc.content.get(self.MULTIPART_KEY, False) + else: + logger.warning( + "No FLAGS doc for msg %s:%s" % ( + self.mbox, + self.uid)) def getSubPart(self, part): """ @@ -284,12 +508,22 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: Any object implementing C{IMessagePart}. :return: The specified sub-part. """ + logger.debug("Getting subpart: %s" % part) if not self.isMultipart(): raise TypeError - msg = self._get_parsed_msg() - # XXX should wrap IMessagePart - return msg.get_payload()[part] + if part == 0: + # Let's get the first part, which + # is really the body. + return MessageBody(self._fdoc, self._bdoc) + + attach_doc = self._get_attachment_doc(part) + if not attach_doc: + # so long and thanks for all the fish + logger.debug("...not today") + raise IndexError + msg_part = self._get_parsed_msg(attach_doc.content[self.RAW_KEY]) + return MessageAttachment(msg_part) # # accessors @@ -301,32 +535,87 @@ class LeapMessage(fields, MailParser, MBoxParser): message. """ flag_docs = self._soledad.get_from_index( - SoledadBackedAccount.TYPE_MBOX_UID_IDX, + fields.TYPE_MBOX_UID_IDX, fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid)) - flag_doc = flag_docs[0] if flag_docs else None - return flag_doc + return first(flag_docs) - def _get_content_doc(self): + def _get_headers_doc(self): """ - Return the document that keeps the flags for this + Return the document that keeps the headers for this + message. + """ + head_docs = self._soledad.get_from_index( + fields.TYPE_C_HASH_IDX, + fields.TYPE_HEADERS_VAL, str(self._chash)) + return first(head_docs) + + def _get_body_doc(self): + """ + Return the document that keeps the body for this message. """ - cont_docs = self._soledad.get_from_index( - SoledadBackedAccount.TYPE_HASH_IDX, - fields.TYPE_MESSAGE_VAL, self._content_hash, str(self._uid)) - cont_doc = cont_docs[0] if cont_docs else None - return cont_doc + body_docs = self._soledad.get_from_index( + fields.TYPE_C_HASH_IDX, + fields.TYPE_MESSAGE_VAL, str(self._chash)) + return first(body_docs) + + def _get_num_parts(self): + """ + Return the number of parts for a multipart message. + """ + if not self.isMultipart(): + raise TypeError( + "Tried to get num parts in a non-multipart message") + if not self._hdoc: + return None + return self._hdoc.content.get(fields.NUM_PARTS_KEY, 2) + + def _get_attachment_doc(self, part): + """ + Return the document that keeps the headers for this + message. + + :param part: the part number for the multipart message. + :type part: int + """ + if not self._hdoc: + return None + try: + phash = self._hdoc.content[self.PARTS_MAP_KEY][str(part)] + except KeyError: + # this is the remnant of a debug session until + # I found that the index is actually a string... + # It should be safe to just raise the KeyError now, + # but leaving it here while the blood is fresh... + logger.warning("We expected a phash in the " + "index %s, but noone found" % (part, )) + logger.debug(self._hdoc.content[self.PARTS_MAP_KEY]) + return None + attach_docs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_ATTACHMENT_VAL, str(phash)) + + # The following is true for the fist owner. + # We could use this relationship to flag the "owner" + # and orphan when we delete it. + + #attach_docs = self._soledad.get_from_index( + #fields.TYPE_C_HASH_PART_IDX, + #fields.TYPE_ATTACHMENT_VAL, str(self._chash), str(part)) + return first(attach_docs) def _get_raw_msg(self): """ Return the raw msg. :rtype: basestring """ - return self._cdoc.content.get(self.RAW_KEY, '') + # TODO deprecate this. + return self._bdoc.content.get(self.RAW_KEY, '') def __getitem__(self, key): """ - Return the content of the message document. + Return an item from the content of the flags document, + for convenience. :param key: The key :type key: str @@ -334,14 +623,73 @@ class LeapMessage(fields, MailParser, MBoxParser): :return: The content value indexed by C{key} or None :rtype: str """ - return self._cdoc.content.get(key, None) + return self._fdoc.content.get(key, None) + + # setters + + # XXX to be used in the messagecopier interface?! + + def set_uid(self, uid): + """ + Set new uid for this message. + + :param uid: the new uid + :type uid: basestring + """ + # XXX dangerous! lock? + self._uid = uid + d = self._fdoc + d.content[self.UID_KEY] = uid + self._soledad.put_doc(d) + + def set_mbox(self, mbox): + """ + Set new mbox for this message. + + :param mbox: the new mbox + :type mbox: basestring + """ + # XXX dangerous! lock? + self._mbox = mbox + d = self._fdoc + d.content[self.MBOX_KEY] = mbox + self._soledad.put_doc(d) + + # destructor + + @deferred + def remove(self): + """ + Remove all docs associated with this message. + """ + # XXX this would ve more efficient if we can just pass + # a sequence of uids. + + # XXX For the moment we are only removing the flags and headers + # docs. The rest we leave there polluting your hard disk, + # until we think about a good way of deorphaning. + # Maybe a crawler of unreferenced docs. + + fd = self._get_flags_doc() + hd = self._get_headers_doc() + #bd = self._get_body_doc() + #docs = [fd, hd, bd] + + docs = [fd, hd] + + #for pn in range(self._get_num_parts()[1:]): + #ad = self._get_attachment_doc(pn) + #docs.append(ad) + + for d in filter(None, docs): + self._soledad.delete_doc(d) def does_exist(self): """ - Return True if there is actually a message for this + Return True if there is actually a flags message for this UID and mbox. """ - return bool(self._fdoc) + return self._fdoc is not None SoledadWriterPayload = namedtuple( @@ -349,6 +697,8 @@ SoledadWriterPayload = namedtuple( SoledadWriterPayload.CREATE = 1 SoledadWriterPayload.PUT = 2 +SoledadWriterPayload.BODY_CREATE = 3 +SoledadWriterPayload.ATTACHMENT_CREATE = 4 class SoledadDocWriter(object): @@ -378,20 +728,98 @@ class SoledadDocWriter(object): empty = queue.empty() while not empty: item = queue.get() + call = None + payload = item.payload + if item.mode == SoledadWriterPayload.CREATE: call = self._soledad.create_doc + elif item.mode == SoledadWriterPayload.BODY_CREATE: + if not self._body_does_exist(payload): + call = self._soledad.create_doc + elif item.mode == SoledadWriterPayload.ATTACHMENT_CREATE: + if not self._attachment_does_exist(payload): + call = self._soledad.create_doc elif item.mode == SoledadWriterPayload.PUT: call = self._soledad.put_doc - # should handle errors - try: - call(item.payload) - except u1db_errors.RevisionConflict as exc: - logger.error("Error: %r" % (exc,)) - raise exc + # XXX delete? + + if call: + # should handle errors + try: + call(item.payload) + except u1db_errors.RevisionConflict as exc: + logger.error("Error: %r" % (exc,)) + raise exc empty = queue.empty() + """ + Message deduplication. + + We do a query for the content hashes before writing to our beloved + slcipher backend of Soledad. This means, by now, that: + + 1. We will not store the same attachment twice, only the hash of it. + 2. We will not store the same message body twice, only the hash of it. + + The first case is useful if you are always receiving the same old memes + from unwary friends that still have not discovered that 4chan is the + generator of the internet. The second will save your day if you have + initiated session with the same account in two different machines. I also + wonder why would you do that, but let's respect each other choices, like + with the religious celebrations, and assume that one day we'll be able + to run Bitmask in completely free phones. Yes, I mean that, the whole GSM + Stack. + """ + + def _body_does_exist(self, doc): + """ + Check whether we already have a body payload with this hash in our + database. + + :param doc: tentative body document + :type doc: dict + :returns: True if that happens, False otherwise. + """ + if not doc: + return False + chash = doc[fields.CONTENT_HASH_KEY] + body_docs = self._soledad.get_from_index( + fields.TYPE_C_HASH_IDX, + fields.TYPE_MESSAGE_VAL, str(chash)) + if not body_docs: + return False + if len(body_docs) != 1: + logger.warning("Found more than one copy of chash %s!" + % (chash,)) + logger.debug("Found body doc with that hash! Skipping save!") + return True + + def _attachment_does_exist(self, doc): + """ + Check whether we already have an attachment payload with this hash + in our database. + + :param doc: tentative body document + :type doc: dict + :returns: True if that happens, False otherwise. + """ + if not doc: + return False + phash = doc[fields.PAYLOAD_HASH_KEY] + attach_docs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_ATTACHMENT_VAL, str(phash)) + if not attach_docs: + return False + + if len(attach_docs) != 1: + logger.warning("Found more than one copy of phash %s!" + % (phash,)) + logger.debug("Found attachment doc with that hash! Skipping save!") + return True + class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): """ @@ -402,35 +830,62 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): database. """ # XXX this should be able to produce a MessageSet methinks - - EMPTY_MSG = { - fields.TYPE_KEY: fields.TYPE_MESSAGE_VAL, - fields.UID_KEY: 1, - fields.MBOX_KEY: fields.INBOX_VAL, - - fields.SUBJECT_KEY: "", - fields.DATE_KEY: "", - fields.RAW_KEY: "", - - # XXX should separate headers into another doc - fields.HEADERS_KEY: {}, + # could validate these kinds of objects turning them + # into a template for the class. + FLAGS_DOC = "FLAGS" + HEADERS_DOC = "HEADERS" + ATTACHMENT_DOC = "ATTACHMENT" + BODY_DOC = "BODY" + + templates = { + + FLAGS_DOC: { + fields.TYPE_KEY: fields.TYPE_FLAGS_VAL, + fields.UID_KEY: 1, + fields.MBOX_KEY: fields.INBOX_VAL, + + fields.SEEN_KEY: False, + fields.RECENT_KEY: True, + fields.FLAGS_KEY: [], + fields.MULTIPART_KEY: False, + fields.SIZE_KEY: 0 + }, + + HEADERS_DOC: { + fields.TYPE_KEY: fields.TYPE_HEADERS_VAL, + fields.CONTENT_HASH_KEY: "", + + fields.HEADERS_KEY: {}, + fields.NUM_PARTS_KEY: 0, + fields.PARTS_MAP_KEY: {}, + fields.DATE_KEY: "", + fields.SUBJECT_KEY: "" + }, + + ATTACHMENT_DOC: { + fields.TYPE_KEY: fields.TYPE_ATTACHMENT_VAL, + fields.PART_NUMBER_KEY: 0, + fields.CONTENT_HASH_KEY: "", + fields.PAYLOAD_HASH_KEY: "", + + fields.RAW_KEY: "" + }, + + BODY_DOC: { + fields.TYPE_KEY: fields.TYPE_MESSAGE_VAL, + fields.CONTENT_HASH_KEY: "", + + fields.BODY_KEY: "", + + # this should not be needed, + # but let's keep the raw msg for some time + # until we are sure we can reconstruct + # the original msg from our disection. + fields.RAW_KEY: "", + + } } - EMPTY_FLAGS = { - fields.TYPE_KEY: fields.TYPE_FLAGS_VAL, - fields.UID_KEY: 1, - fields.MBOX_KEY: fields.INBOX_VAL, - - fields.FLAGS_KEY: [], - fields.SEEN_KEY: False, - fields.RECENT_KEY: True, - fields.MULTIPART_KEY: False, - } - - # get from SoledadBackedAccount the needed index-related constants - INDEXES = SoledadBackedAccount.INDEXES - TYPE_IDX = SoledadBackedAccount.TYPE_IDX - def __init__(self, mbox=None, soledad=None): """ Constructor for MessageCollection. @@ -465,23 +920,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): SoledadDocWriter(soledad), period=0.05) - def _get_empty_msg(self): + def _get_empty_doc(self, _type=FLAGS_DOC): """ - Returns an empty message. - - :return: a dict containing a default empty message + Returns an empty doc for storing different message parts. + Defaults to returning a template for a flags document. + :return: a dict with the template :rtype: dict """ - return copy.deepcopy(self.EMPTY_MSG) - - def _get_empty_flags_doc(self): - """ - Returns an empty doc for storing flags. - - :return: - :rtype: - """ - return copy.deepcopy(self.EMPTY_FLAGS) + if not _type in self.templates.keys(): + raise TypeError("Improper type passed to _get_empty_doc") + return copy.deepcopy(self.templates[_type]) @deferred def add_msg(self, raw, subject=None, flags=None, date=None, uid=1): @@ -509,52 +957,107 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): flags = tuple() leap_assert_type(flags, tuple) - content_doc = self._get_empty_msg() - flags_doc = self._get_empty_flags_doc() - - content_doc[self.MBOX_KEY] = self.mbox - flags_doc[self.MBOX_KEY] = self.mbox - # ...should get a sanity check here. - content_doc[self.UID_KEY] = uid - flags_doc[self.UID_KEY] = uid - - if flags: - flags_doc[self.FLAGS_KEY] = map(self._stringify, flags) - flags_doc[self.SEEN_KEY] = self.SEEN_FLAG in flags + # docs for flags, headers, and body + fd, hd, bd = map( + lambda t: self._get_empty_doc(t), + (self.FLAGS_DOC, self.HEADERS_DOC, self.BODY_DOC)) msg = self._get_parsed_msg(raw) headers = dict(msg) - - logger.debug("adding. is multipart:%s" % msg.is_multipart()) - flags_doc[self.MULTIPART_KEY] = msg.is_multipart() - # XXX get lower case for keys? - # XXX get headers doc - content_doc[self.HEADERS_KEY] = headers - # set subject based on message headers and eventually replace by - # subject given as param - if self.SUBJECT_FIELD in headers: - content_doc[self.SUBJECT_KEY] = headers[self.SUBJECT_FIELD] - if subject is not None: - content_doc[self.SUBJECT_KEY] = subject - - # XXX could separate body into its own doc - # but should also separate multiparts - # that should be wrapped in MessagePart - content_doc[self.RAW_KEY] = self._stringify(raw) - content_doc[self.SIZE_KEY] = len(raw) - + raw_str = msg.as_string() + chash = self._get_hash(msg) + multi = msg.is_multipart() + + attaches = [] + inner_parts = [] + + if multi: + # XXX should walk down recursively + # in a better way. but fixing this quick + # to have an rc. + # XXX should pick the content-type in txt + body = first(msg.get_payload()).get_payload() + if isinstance(body, list): + # allowing one nesting level for now... + body, rest = body[0].get_payload(), body[1:] + for p in rest: + inner_parts.append(p) + else: + body = msg.get_payload() + logger.debug("adding msg (multipart:%s)" % multi) + + # flags doc --------------------------------------- + fd[self.MBOX_KEY] = self.mbox + fd[self.UID_KEY] = uid + fd[self.CONTENT_HASH_KEY] = chash + fd[self.MULTIPART_KEY] = multi + fd[self.SIZE_KEY] = len(raw_str) + if flags: + fd[self.FLAGS_KEY] = map(self._stringify, flags) + fd[self.SEEN_KEY] = self.SEEN_FLAG in flags + fd[self.RECENT_KEY] = self.RECENT_FLAG in flags + + # headers doc ---------------------------------------- + hd[self.CONTENT_HASH_KEY] = chash + hd[self.HEADERS_KEY] = headers + if not subject and self.SUBJECT_FIELD in headers: + hd[self.SUBJECT_KEY] = headers[self.SUBJECT_FIELD] + else: + hd[self.SUBJECT_KEY] = subject if not date and self.DATE_FIELD in headers: - content_doc[self.DATE_KEY] = headers[self.DATE_FIELD] + hd[self.DATE_KEY] = headers[self.DATE_FIELD] else: - content_doc[self.DATE_KEY] = date - - logger.debug('enqueuing message for write') - + hd[self.DATE_KEY] = date + if multi: + hd[self.NUM_PARTS_KEY] = len(msg.get_payload()) + + # body doc + bd[self.CONTENT_HASH_KEY] = chash + bd[self.BODY_KEY] = body + # in an ideal world, we would not need to save a copy of the + # raw message. But we'll keep it until we can be sure that + # we can rebuild the original message from the parts. + bd[self.RAW_KEY] = raw_str + + docs = [fd, hd] + + # attachment docs + if multi: + outer_parts = msg.get_payload() + parts = outer_parts + inner_parts + + # skip first part, we already got it in body + to_attach = ((i, m) for i, m in enumerate(parts) if i > 0) + for index, part_msg in to_attach: + att_doc = self._get_empty_doc(self.ATTACHMENT_DOC) + att_doc[self.PART_NUMBER_KEY] = index + att_doc[self.CONTENT_HASH_KEY] = chash + phash = self._get_hash(part_msg) + att_doc[self.PAYLOAD_HASH_KEY] = phash + att_doc[self.RAW_KEY] = part_msg.as_string() + + # keep a pointer to the payload hash in the + # headers doc, under the parts_map + hd[self.PARTS_MAP_KEY][str(index)] = phash + attaches.append(att_doc) + + # Saving ... ------------------------------- + # ok, there we go... + logger.debug('enqueuing message docs for write') ptuple = SoledadWriterPayload + + # first, regular docs: flags and headers + for doc in docs: + self.soledad_writer.put(ptuple( + mode=ptuple.CREATE, payload=doc)) + # second, try to create body doc. self.soledad_writer.put(ptuple( - mode=ptuple.CREATE, payload=content_doc)) - self.soledad_writer.put(ptuple( - mode=ptuple.CREATE, payload=flags_doc)) + mode=ptuple.BODY_CREATE, payload=bd)) + # and last, but not least, try to create + # attachment docs if not already there. + for at in attaches: + self.soledad_writer.put(ptuple( + mode=ptuple.ATTACHMENT_CREATE, payload=at)) def remove(self, msg): """ @@ -563,8 +1066,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :param msg: a Leapmessage instance :type msg: LeapMessage """ - # XXX remove - #self._soledad.delete_doc(msg) msg.remove() # getters @@ -596,14 +1097,14 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :rtype: list of SoledadDocument """ if _type not in fields.__dict__.values(): - raise TypeError("Wrong type passed to get_all") + raise TypeError("Wrong type passed to get_all_docs") if sameProxiedObjects(self._soledad, None): logger.warning('Tried to get messages but soledad is None!') return [] all_docs = [doc for doc in self._soledad.get_from_index( - SoledadBackedAccount.TYPE_MBOX_IDX, + fields.TYPE_MBOX_IDX, _type, self.mbox)] # inneficient, but first let's grok it and then @@ -618,8 +1119,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): """ all_uids = (doc.content[self.UID_KEY] for doc in self._soledad.get_from_index( - SoledadBackedAccount.TYPE_MBOX_IDX, - self.TYPE_FLAGS_VAL, self.mbox)) + fields.TYPE_MBOX_IDX, + fields.TYPE_FLAGS_VAL, self.mbox)) return (u for u in sorted(all_uids)) def count(self): @@ -629,7 +1130,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :rtype: int """ count = self._soledad.get_count_from_index( - SoledadBackedAccount.TYPE_MBOX_IDX, + fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox) return count @@ -645,8 +1146,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): """ return (doc.content[self.UID_KEY] for doc in self._soledad.get_from_index( - SoledadBackedAccount.TYPE_MBOX_SEEN_IDX, - self.TYPE_FLAGS_VAL, self.mbox, '0')) + fields.TYPE_MBOX_SEEN_IDX, + fields.TYPE_FLAGS_VAL, self.mbox, '0')) def count_unseen(self): """ @@ -656,8 +1157,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :rtype: int """ count = self._soledad.get_count_from_index( - SoledadBackedAccount.TYPE_MBOX_SEEN_IDX, - self.TYPE_FLAGS_VAL, self.mbox, '0') + fields.TYPE_MBOX_SEEN_IDX, + fields.TYPE_FLAGS_VAL, self.mbox, '0') return count def get_unseen(self): @@ -681,8 +1182,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): """ return (doc.content[self.UID_KEY] for doc in self._soledad.get_from_index( - SoledadBackedAccount.TYPE_MBOX_RECT_IDX, - self.TYPE_FLAGS_VAL, self.mbox, '1')) + fields.TYPE_MBOX_RECT_IDX, + fields.TYPE_FLAGS_VAL, self.mbox, '1')) def get_recent(self): """ @@ -702,8 +1203,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :rtype: int """ count = self._soledad.get_count_from_index( - SoledadBackedAccount.TYPE_MBOX_RECT_IDX, - self.TYPE_FLAGS_VAL, self.mbox, '1') + fields.TYPE_MBOX_RECT_IDX, + fields.TYPE_FLAGS_VAL, self.mbox, '1') return count def __len__(self): @@ -731,5 +1232,5 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): return u"<MessageCollection: mbox '%s' (%s)>" % ( self.mbox, self.count()) - # XXX should implement __eq__ also !!! --- use a hash - # of content for that, will be used for dedup. + # XXX should implement __eq__ also !!! + # --- use the content hash for that, will be used for dedup. diff --git a/mail/src/leap/mail/imap/parser.py b/mail/src/leap/mail/imap/parser.py index 1ae19c0..306dcf0 100644 --- a/mail/src/leap/mail/imap/parser.py +++ b/mail/src/leap/mail/imap/parser.py @@ -19,10 +19,14 @@ Mail parser mixins. """ import cStringIO import StringIO +import hashlib import re +from email.message import Message from email.parser import Parser +from leap.common.check import leap_assert_type + class MailParser(object): """ @@ -34,16 +38,30 @@ class MailParser(object): """ self._parser = Parser() - def _get_parsed_msg(self, raw): + def _get_parsed_msg(self, raw, headersonly=False): """ Return a parsed Message. :param raw: the raw string to parse :type raw: basestring, or StringIO object + + :param headersonly: True for parsing only the headers. + :type headersonly: bool """ - msg = self._get_parser_fun(raw)(raw, True) + msg = self._get_parser_fun(raw)(raw, headersonly=headersonly) return msg + def _get_hash(self, msg): + """ + Returns a hash of the string representation of the raw message, + suitable for indexing the inmutable pieces. + + :param msg: a Message object + :type msg: Message + """ + leap_assert_type(msg, Message) + return hashlib.sha256(msg.as_string()).hexdigest() + def _get_parser_fun(self, o): """ Retunn the proper parser function for an object. @@ -67,6 +85,8 @@ class MailParser(object): :param o: object :type o: object """ + # XXX Maybe we don't need no more, we're using + # msg.as_string() if isinstance(o, (cStringIO.OutputType, StringIO.StringIO)): return o.getvalue() else: |