From 62b0cd6301b7097dfa2776b677ab3c7d27f60d7b Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 26 Dec 2013 14:10:14 -0400 Subject: Split the near-2k loc file into more handy modules. ...aaaand not a single fuck was given that day! --- src/leap/mail/imap/messages.py | 735 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 735 insertions(+) create mode 100644 src/leap/mail/imap/messages.py (limited to 'src/leap/mail/imap/messages.py') diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py new file mode 100644 index 0000000..b0d5da2 --- /dev/null +++ b/src/leap/mail/imap/messages.py @@ -0,0 +1,735 @@ +# -*- coding: utf-8 -*- +# messages.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +LeapMessage and MessageCollection. +""" +import copy +import logging +import StringIO +from collections import namedtuple + +from twisted.mail import imap4 +from twisted.python import log +from u1db import errors as u1db_errors +from zope.interface import implements +from zope.proxy import sameProxiedObjects + +from leap.common.check import leap_assert, leap_assert_type +from leap.common.mail import get_email_charset +from leap.mail.decorators import deferred +from leap.mail.imap.account import SoledadBackedAccount +from leap.mail.imap.index import IndexedDB +from leap.mail.imap.fields import fields, WithMsgFields +from leap.mail.imap.parser import MailParser, MBoxParser +from leap.mail.messageflow import IMessageConsumer, MessageProducer + +logger = logging.getLogger(__name__) + + +class LeapMessage(fields, MailParser, MBoxParser): + + implements(imap4.IMessage) + + def __init__(self, soledad, uid, mbox): + """ + Initializes a LeapMessage. + + :param soledad: a Soledad instance + :type soledad: Soledad + :param uid: the UID for the message. + :type uid: int or basestring + :param mbox: the mbox this message belongs to + :type mbox: basestring + """ + MailParser.__init__(self) + self._soledad = soledad + self._uid = int(uid) + self._mbox = self._parse_mailbox_name(mbox) + self._chash = None + + self.__cdoc = None + + @property + def _fdoc(self): + """ + An accessor to the flags document. + """ + return self._get_flags_doc() + + @property + def _cdoc(self): + """ + An accessor to the content document. + """ + if not self.__cdoc: + self.__cdoc = self._get_content_doc() + return self.__cdoc + + @property + def _chash(self): + """ + An accessor to the content hash for this message. + """ + if not self._fdoc: + return None + return self._fdoc.content.get(fields.CONTENT_HASH_KEY, None) + + # IMessage implementation + + def getUID(self): + """ + Retrieve the unique identifier associated with this message + + :return: uid for this message + :rtype: int + """ + return self._uid + + def getFlags(self): + """ + Retrieve the flags associated with this message + + :return: The flags, represented as strings + :rtype: tuple + """ + if self._uid is None: + return [] + + flags = [] + flag_doc = self._fdoc + if flag_doc: + flags = flag_doc.content.get(self.FLAGS_KEY, None) + if flags: + flags = map(str, flags) + return tuple(flags) + + # setFlags, addFlags, removeFlags are not in the interface spec + # but we use them with store command. + + def setFlags(self, flags): + """ + Sets the flags for this message + + Returns a SoledadDocument that needs to be updated by the caller. + + :param flags: the flags to update in the message. + :type flags: tuple of str + + :return: a SoledadDocument instance + :rtype: SoledadDocument + """ + leap_assert(isinstance(flags, tuple), "flags need to be a tuple") + log.msg('setting flags: %s' % (self._uid)) + + doc = self._fdoc + doc.content[self.FLAGS_KEY] = flags + doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags + doc.content[self.RECENT_KEY] = self.RECENT_FLAG in flags + self._soledad.put_doc(doc) + + def addFlags(self, flags): + """ + Adds flags to this message. + + Returns a SoledadDocument that needs to be updated by the caller. + + :param flags: the flags to add to the message. + :type flags: tuple of str + + :return: a SoledadDocument instance + :rtype: SoledadDocument + """ + leap_assert(isinstance(flags, tuple), "flags need to be a tuple") + oldflags = self.getFlags() + self.setFlags(tuple(set(flags + oldflags))) + + def removeFlags(self, flags): + """ + Remove flags from this message. + + Returns a SoledadDocument that needs to be updated by the caller. + + :param flags: the flags to be removed from the message. + :type flags: tuple of str + + :return: a SoledadDocument instance + :rtype: SoledadDocument + """ + leap_assert(isinstance(flags, tuple), "flags need to be a tuple") + oldflags = self.getFlags() + self.setFlags(tuple(set(oldflags) - set(flags))) + + def getInternalDate(self): + """ + Retrieve the date internally associated with this message + + :rtype: C{str} + :return: An RFC822-formatted date string. + """ + return str(self._cdoc.content.get(self.DATE_KEY, '')) + + # + # IMessagePart + # + + # XXX we should implement this interface too for the subparts + # so we allow nested parts... + + def getBodyFile(self): + """ + Retrieve a file object containing only the body of this message. + + :return: file-like object opened for reading + :rtype: StringIO + """ + fd = StringIO.StringIO() + + cdoc = self._cdoc + content = cdoc.content.get(self.RAW_KEY, '') + charset = get_email_charset( + unicode(cdoc.content.get(self.RAW_KEY, ''))) + try: + content = content.encode(charset) + except (UnicodeEncodeError, UnicodeDecodeError) as e: + logger.error("Unicode error {0}".format(e)) + content = content.encode(charset, 'replace') + + raw = self._get_raw_msg() + msg = self._get_parsed_msg(raw) + body = msg.get_payload() + fd.write(body) + # XXX SHOULD use a separate BODY FIELD ... + fd.seek(0) + return fd + + def getSize(self): + """ + Return the total size, in octets, of this message. + + :return: size of the message, in octets + :rtype: int + """ + size = self._cdoc.content.get(self.SIZE_KEY, False) + if not size: + # XXX fallback, should remove when all migrated. + size = self.getBodyFile().len + return size + + def _get_headers(self): + """ + Return the headers dict stored in this message document. + """ + # XXX get from the headers doc + return self._cdoc.content.get(self.HEADERS_KEY, {}) + + def getHeaders(self, negate, *names): + """ + Retrieve a group of message headers. + + :param names: The names of the headers to retrieve or omit. + :type names: tuple of str + + :param negate: If True, indicates that the headers listed in names + should be omitted from the return value, rather + than included. + :type negate: bool + + :return: A mapping of header field names to header field values + :rtype: dict + """ + headers = self._get_headers() + names = map(lambda s: s.upper(), names) + if negate: + cond = lambda key: key.upper() not in names + else: + cond = lambda key: key.upper() in names + + # unpack and filter original dict by negate-condition + filter_by_cond = [ + map(str, (key, val)) for + key, val in headers.items() + if cond(key)] + return dict(filter_by_cond) + + def isMultipart(self): + """ + Return True if this message is multipart. + """ + if self._cdoc: + retval = self._fdoc.content.get(self.MULTIPART_KEY, False) + return retval + + def getSubPart(self, part): + """ + Retrieve a MIME submessage + + :type part: C{int} + :param part: The number of the part to retrieve, indexed from 0. + :raise IndexError: Raised if the specified part does not exist. + :raise TypeError: Raised if this message is not multipart. + :rtype: Any object implementing C{IMessagePart}. + :return: The specified sub-part. + """ + if not self.isMultipart(): + raise TypeError + + msg = self._get_parsed_msg() + # XXX should wrap IMessagePart + return msg.get_payload()[part] + + # + # accessors + # + + def _get_flags_doc(self): + """ + Return the document that keeps the flags for this + message. + """ + flag_docs = self._soledad.get_from_index( + SoledadBackedAccount.TYPE_MBOX_UID_IDX, + fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid)) + flag_doc = flag_docs[0] if flag_docs else None + return flag_doc + + def _get_content_doc(self): + """ + Return the document that keeps the flags for this + message. + """ + cont_docs = self._soledad.get_from_index( + SoledadBackedAccount.TYPE_HASH_IDX, + fields.TYPE_MESSAGE_VAL, self._content_hash, str(self._uid)) + cont_doc = cont_docs[0] if cont_docs else None + return cont_doc + + def _get_raw_msg(self): + """ + Return the raw msg. + :rtype: basestring + """ + return self._cdoc.content.get(self.RAW_KEY, '') + + def __getitem__(self, key): + """ + Return the content of the message document. + + :param key: The key + :type key: str + + :return: The content value indexed by C{key} or None + :rtype: str + """ + return self._cdoc.content.get(key, None) + + def does_exist(self): + """ + Return True if there is actually a message for this + UID and mbox. + """ + return bool(self._fdoc) + + +SoledadWriterPayload = namedtuple( + 'SoledadWriterPayload', ['mode', 'payload']) + +SoledadWriterPayload.CREATE = 1 +SoledadWriterPayload.PUT = 2 + + +class SoledadDocWriter(object): + """ + This writer will create docs serially in the local soledad database. + """ + + implements(IMessageConsumer) + + def __init__(self, soledad): + """ + Initialize the writer. + + :param soledad: the soledad instance + :type soledad: Soledad + """ + self._soledad = soledad + + def consume(self, queue): + """ + Creates a new document in soledad db. + + :param queue: queue to get item from, with content of the document + to be inserted. + :type queue: Queue + """ + empty = queue.empty() + while not empty: + item = queue.get() + if item.mode == SoledadWriterPayload.CREATE: + call = self._soledad.create_doc + elif item.mode == SoledadWriterPayload.PUT: + call = self._soledad.put_doc + + # should handle errors + try: + call(item.payload) + except u1db_errors.RevisionConflict as exc: + logger.error("Error: %r" % (exc,)) + raise exc + + empty = queue.empty() + + +class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): + """ + A collection of messages, surprisingly. + + It is tied to a selected mailbox name that is passed to constructor. + Implements a filter query over the messages contained in a soledad + database. + """ + # XXX this should be able to produce a MessageSet methinks + + EMPTY_MSG = { + fields.TYPE_KEY: fields.TYPE_MESSAGE_VAL, + fields.UID_KEY: 1, + fields.MBOX_KEY: fields.INBOX_VAL, + + fields.SUBJECT_KEY: "", + fields.DATE_KEY: "", + fields.RAW_KEY: "", + + # XXX should separate headers into another doc + fields.HEADERS_KEY: {}, + } + + EMPTY_FLAGS = { + fields.TYPE_KEY: fields.TYPE_FLAGS_VAL, + fields.UID_KEY: 1, + fields.MBOX_KEY: fields.INBOX_VAL, + + fields.FLAGS_KEY: [], + fields.SEEN_KEY: False, + fields.RECENT_KEY: True, + fields.MULTIPART_KEY: False, + } + + # get from SoledadBackedAccount the needed index-related constants + INDEXES = SoledadBackedAccount.INDEXES + TYPE_IDX = SoledadBackedAccount.TYPE_IDX + + def __init__(self, mbox=None, soledad=None): + """ + Constructor for MessageCollection. + + :param mbox: the name of the mailbox. It is the name + with which we filter the query over the + messages database + :type mbox: str + + :param soledad: Soledad database + :type soledad: Soledad instance + """ + MailParser.__init__(self) + leap_assert(mbox, "Need a mailbox name to initialize") + leap_assert(mbox.strip() != "", "mbox cannot be blank space") + leap_assert(isinstance(mbox, (str, unicode)), + "mbox needs to be a string") + leap_assert(soledad, "Need a soledad instance to initialize") + + # okay, all in order, keep going... + self.mbox = self._parse_mailbox_name(mbox) + self._soledad = soledad + self.initialize_db() + + # I think of someone like nietzsche when reading this + + # this will be the producer that will enqueue the content + # to be processed serially by the consumer (the writer). We just + # need to `put` the new material on its plate. + + self.soledad_writer = MessageProducer( + SoledadDocWriter(soledad), + period=0.05) + + def _get_empty_msg(self): + """ + Returns an empty message. + + :return: a dict containing a default empty message + :rtype: dict + """ + return copy.deepcopy(self.EMPTY_MSG) + + def _get_empty_flags_doc(self): + """ + Returns an empty doc for storing flags. + + :return: + :rtype: + """ + return copy.deepcopy(self.EMPTY_FLAGS) + + @deferred + def add_msg(self, raw, subject=None, flags=None, date=None, uid=1): + """ + Creates a new message document. + + :param raw: the raw message + :type raw: str + + :param subject: subject of the message. + :type subject: str + + :param flags: flags + :type flags: list + + :param date: the received date for the message + :type date: str + + :param uid: the message uid for this mailbox + :type uid: int + """ + # TODO: split in smaller methods + logger.debug('adding message') + if flags is None: + flags = tuple() + leap_assert_type(flags, tuple) + + content_doc = self._get_empty_msg() + flags_doc = self._get_empty_flags_doc() + + content_doc[self.MBOX_KEY] = self.mbox + flags_doc[self.MBOX_KEY] = self.mbox + # ...should get a sanity check here. + content_doc[self.UID_KEY] = uid + flags_doc[self.UID_KEY] = uid + + if flags: + flags_doc[self.FLAGS_KEY] = map(self._stringify, flags) + flags_doc[self.SEEN_KEY] = self.SEEN_FLAG in flags + + msg = self._get_parsed_msg(raw) + headers = dict(msg) + + logger.debug("adding. is multipart:%s" % msg.is_multipart()) + flags_doc[self.MULTIPART_KEY] = msg.is_multipart() + # XXX get lower case for keys? + # XXX get headers doc + content_doc[self.HEADERS_KEY] = headers + # set subject based on message headers and eventually replace by + # subject given as param + if self.SUBJECT_FIELD in headers: + content_doc[self.SUBJECT_KEY] = headers[self.SUBJECT_FIELD] + if subject is not None: + content_doc[self.SUBJECT_KEY] = subject + + # XXX could separate body into its own doc + # but should also separate multiparts + # that should be wrapped in MessagePart + content_doc[self.RAW_KEY] = self._stringify(raw) + content_doc[self.SIZE_KEY] = len(raw) + + if not date and self.DATE_FIELD in headers: + content_doc[self.DATE_KEY] = headers[self.DATE_FIELD] + else: + content_doc[self.DATE_KEY] = date + + logger.debug('enqueuing message for write') + + ptuple = SoledadWriterPayload + self.soledad_writer.put(ptuple( + mode=ptuple.CREATE, payload=content_doc)) + self.soledad_writer.put(ptuple( + mode=ptuple.CREATE, payload=flags_doc)) + + def remove(self, msg): + """ + Removes a message. + + :param msg: a Leapmessage instance + :type msg: LeapMessage + """ + # XXX remove + #self._soledad.delete_doc(msg) + msg.remove() + + # getters + + def get_msg_by_uid(self, uid): + """ + Retrieves a LeapMessage by UID. + + :param uid: the message uid to query by + :type uid: int + + :return: A LeapMessage instance matching the query, + or None if not found. + :rtype: LeapMessage + """ + msg = LeapMessage(self._soledad, uid, self.mbox) + if not msg.does_exist(): + return None + return msg + + def get_all_docs(self, _type=fields.TYPE_FLAGS_VAL): + """ + Get all documents for the selected mailbox of the + passed type. By default, it returns the flag docs. + + If you want acess to the content, use __iter__ instead + + :return: a list of u1db documents + :rtype: list of SoledadDocument + """ + if _type not in fields.__dict__.values(): + raise TypeError("Wrong type passed to get_all") + + if sameProxiedObjects(self._soledad, None): + logger.warning('Tried to get messages but soledad is None!') + return [] + + all_docs = [doc for doc in self._soledad.get_from_index( + SoledadBackedAccount.TYPE_MBOX_IDX, + _type, self.mbox)] + + # inneficient, but first let's grok it and then + # let's worry about efficiency. + # XXX FIXINDEX -- should implement order by in soledad + return sorted(all_docs, key=lambda item: item.content['uid']) + + def all_msg_iter(self): + """ + Return an iterator trhough the UIDs of all messages, sorted in + ascending order. + """ + all_uids = (doc.content[self.UID_KEY] for doc in + self._soledad.get_from_index( + SoledadBackedAccount.TYPE_MBOX_IDX, + self.TYPE_FLAGS_VAL, self.mbox)) + return (u for u in sorted(all_uids)) + + def count(self): + """ + Return the count of messages for this mailbox. + + :rtype: int + """ + count = self._soledad.get_count_from_index( + SoledadBackedAccount.TYPE_MBOX_IDX, + fields.TYPE_FLAGS_VAL, self.mbox) + return count + + # unseen messages + + def unseen_iter(self): + """ + Get an iterator for the message UIDs with no `seen` flag + for this mailbox. + + :return: iterator through unseen message doc UIDs + :rtype: iterable + """ + return (doc.content[self.UID_KEY] for doc in + self._soledad.get_from_index( + SoledadBackedAccount.TYPE_MBOX_SEEN_IDX, + self.TYPE_FLAGS_VAL, self.mbox, '0')) + + def count_unseen(self): + """ + Count all messages with the `Unseen` flag. + + :returns: count + :rtype: int + """ + count = self._soledad.get_count_from_index( + SoledadBackedAccount.TYPE_MBOX_SEEN_IDX, + self.TYPE_FLAGS_VAL, self.mbox, '0') + return count + + def get_unseen(self): + """ + Get all messages with the `Unseen` flag + + :returns: a list of LeapMessages + :rtype: list + """ + return [LeapMessage(self._soledad, docid, self.mbox) + for docid in self.unseen_iter()] + + # recent messages + + def recent_iter(self): + """ + Get an iterator for the message docs with `recent` flag. + + :return: iterator through recent message docs + :rtype: iterable + """ + return (doc.content[self.UID_KEY] for doc in + self._soledad.get_from_index( + SoledadBackedAccount.TYPE_MBOX_RECT_IDX, + self.TYPE_FLAGS_VAL, self.mbox, '1')) + + def get_recent(self): + """ + Get all messages with the `Recent` flag. + + :returns: a list of LeapMessages + :rtype: list + """ + return [LeapMessage(self._soledad, docid, self.mbox) + for docid in self.recent_iter()] + + def count_recent(self): + """ + Count all messages with the `Recent` flag. + + :returns: count + :rtype: int + """ + count = self._soledad.get_count_from_index( + SoledadBackedAccount.TYPE_MBOX_RECT_IDX, + self.TYPE_FLAGS_VAL, self.mbox, '1') + return count + + def __len__(self): + """ + Returns the number of messages on this mailbox. + + :rtype: int + """ + return self.count() + + def __iter__(self): + """ + Returns an iterator over all messages. + + :returns: iterator of dicts with content for all messages. + :rtype: iterable + """ + return (LeapMessage(self._soledad, docuid, self.mbox) + for docuid in self.all_msg_iter()) + + def __repr__(self): + """ + Representation string for this object. + """ + return u"" % ( + self.mbox, self.count()) + + # XXX should implement __eq__ also !!! --- use a hash + # of content for that, will be used for dedup. -- cgit v1.2.3 From 25a0aea875fd0d67238beed1237f7239474673ec Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 27 Dec 2013 02:06:44 -0400 Subject: First stage of the storage schema rewrite. * Separates between flags, docs, body and attachment docs. * Implement IMessageCopier interface: move and have fun! This little change is known to push forward our beloved architect emotional rollercoster. * Message deduplication. * It also fixes a hidden bug that was rendering the multipart mime interface useless (yes, the "True" parameter in the parsestr method). * Does not handle well nested attachs, includes dirty workaround that flattens them. * Includes chiiph's patch for rc2: * return deferred from addMessage * convert StringIO types to string * remove unneeded yields from the chain of deferreds in fetcher --- src/leap/mail/imap/messages.py | 831 +++++++++++++++++++++++++++++++++-------- 1 file changed, 666 insertions(+), 165 deletions(-) (limited to 'src/leap/mail/imap/messages.py') diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index b0d5da2..c69c023 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -29,9 +29,9 @@ from zope.interface import implements from zope.proxy import sameProxiedObjects from leap.common.check import leap_assert, leap_assert_type +from leap.common.decorators import memoized_method from leap.common.mail import get_email_charset from leap.mail.decorators import deferred -from leap.mail.imap.account import SoledadBackedAccount from leap.mail.imap.index import IndexedDB from leap.mail.imap.fields import fields, WithMsgFields from leap.mail.imap.parser import MailParser, MBoxParser @@ -40,6 +40,181 @@ from leap.mail.messageflow import IMessageConsumer, MessageProducer logger = logging.getLogger(__name__) +def first(things): + """ + Return the head of a collection. + """ + try: + return things[0] + except (IndexError, TypeError): + return None + + +class MessageBody(object): + """ + IMessagePart implementor for the main + body of a multipart message. + + Excusatio non petita: see the interface documentation. + """ + + implements(imap4.IMessagePart) + + def __init__(self, fdoc, bdoc): + self._fdoc = fdoc + self._bdoc = bdoc + + def getSize(self): + return len(self._bdoc.content[fields.BODY_KEY]) + + def getBodyFile(self): + fd = StringIO.StringIO() + + if self._bdoc: + body = self._bdoc.content[fields.BODY_KEY] + else: + body = "" + charset = self._get_charset(body) + try: + body = body.encode(charset) + except (UnicodeEncodeError, UnicodeDecodeError) as e: + logger.error("Unicode error {0}".format(e)) + body = body.encode(charset, 'replace') + fd.write(body) + fd.seek(0) + return fd + + @memoized_method + def _get_charset(self, stuff): + return get_email_charset(unicode(stuff)) + + def getHeaders(self, negate, *names): + return {} + + def isMultipart(self): + return False + + def getSubPart(self, part): + return None + + +class MessageAttachment(object): + + implements(imap4.IMessagePart) + + def __init__(self, msg): + """ + Initializes the messagepart with a Message instance. + :param msg: a message instance + :type msg: Message + """ + self._msg = msg + + def getSize(self): + """ + Return the total size, in octets, of this message part. + + :return: size of the message, in octets + :rtype: int + """ + if not self._msg: + return 0 + return len(self._msg.as_string()) + + def getBodyFile(self): + """ + Retrieve a file object containing only the body of this message. + + :return: file-like object opened for reading + :rtype: StringIO + """ + fd = StringIO.StringIO() + if self._msg: + body = self._msg.get_payload() + else: + logger.debug("Empty message!") + body = "" + + # XXX should only do the dance if we're sure it's + # content/text-plain!!! + #charset = self._get_charset(body) + #try: + #body = body.encode(charset) + #except (UnicodeEncodeError, UnicodeDecodeError) as e: + #logger.error("Unicode error {0}".format(e)) + #body = body.encode(charset, 'replace') + fd.write(body) + fd.seek(0) + return fd + + @memoized_method + def _get_charset(self, stuff): + # TODO put in a common class with LeapMessage + """ + Gets (guesses?) the charset of a payload. + + :param stuff: the stuff to guess about. + :type stuff: basestring + :returns: charset + """ + # XXX existential doubt 1. wouldn't be smarter to + # peek into the mail headers? + # XXX existential doubt 2. shouldn't we make the scope + # of the decorator somewhat more persistent? + # ah! yes! and put memory bounds. + return get_email_charset(unicode(stuff)) + + def getHeaders(self, negate, *names): + """ + Retrieve a group of message headers. + + :param names: The names of the headers to retrieve or omit. + :type names: tuple of str + + :param negate: If True, indicates that the headers listed in names + should be omitted from the return value, rather + than included. + :type negate: bool + + :return: A mapping of header field names to header field values + :rtype: dict + """ + if not self._msg: + return {} + headers = dict(self._msg.items()) + names = map(lambda s: s.upper(), names) + if negate: + cond = lambda key: key.upper() not in names + else: + cond = lambda key: key.upper() in names + + # unpack and filter original dict by negate-condition + filter_by_cond = [ + map(str, (key, val)) for + key, val in headers.items() + if cond(key)] + return dict(filter_by_cond) + + def isMultipart(self): + """ + Return True if this message is multipart. + """ + return self._msg.is_multipart() + + def getSubPart(self, part): + """ + Retrieve a MIME submessage + + :type part: C{int} + :param part: The number of the part to retrieve, indexed from 0. + :raise IndexError: Raised if the specified part does not exist. + :raise TypeError: Raised if this message is not multipart. + :rtype: Any object implementing C{IMessagePart}. + :return: The specified sub-part. + """ + return self._msg.get_payload() + + class LeapMessage(fields, MailParser, MBoxParser): implements(imap4.IMessage) @@ -59,25 +234,21 @@ class LeapMessage(fields, MailParser, MBoxParser): self._soledad = soledad self._uid = int(uid) self._mbox = self._parse_mailbox_name(mbox) - self._chash = None - self.__cdoc = None + self.__chash = None + self.__bdoc = None @property def _fdoc(self): """ An accessor to the flags document. """ - return self._get_flags_doc() - - @property - def _cdoc(self): - """ - An accessor to the content document. - """ - if not self.__cdoc: - self.__cdoc = self._get_content_doc() - return self.__cdoc + if all(map(bool, (self._uid, self._mbox))): + fdoc = self._get_flags_doc() + if fdoc: + self.__chash = fdoc.content.get( + fields.CONTENT_HASH_KEY, None) + return fdoc @property def _chash(self): @@ -86,7 +257,26 @@ class LeapMessage(fields, MailParser, MBoxParser): """ if not self._fdoc: return None - return self._fdoc.content.get(fields.CONTENT_HASH_KEY, None) + if not self.__chash and self._fdoc: + self.__chash = self._fdoc.content.get( + fields.CONTENT_HASH_KEY, None) + return self.__chash + + @property + def _hdoc(self): + """ + An accessor to the headers document. + """ + return self._get_headers_doc() + + @property + def _bdoc(self): + """ + An accessor to the body document. + """ + if not self.__bdoc: + self.__bdoc = self._get_body_doc() + return self.__bdoc # IMessage implementation @@ -110,9 +300,9 @@ class LeapMessage(fields, MailParser, MBoxParser): return [] flags = [] - flag_doc = self._fdoc - if flag_doc: - flags = flag_doc.content.get(self.FLAGS_KEY, None) + fdoc = self._fdoc + if fdoc: + flags = fdoc.content.get(self.FLAGS_KEY, None) if flags: flags = map(str, flags) return tuple(flags) @@ -180,7 +370,7 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: C{str} :return: An RFC822-formatted date string. """ - return str(self._cdoc.content.get(self.DATE_KEY, '')) + return str(self._hdoc.content.get(self.DATE_KEY, '')) # # IMessagePart @@ -197,25 +387,38 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: StringIO """ fd = StringIO.StringIO() + bdoc = self._bdoc + if bdoc: + body = self._bdoc.content.get(self.BODY_KEY, "") + else: + body = "" - cdoc = self._cdoc - content = cdoc.content.get(self.RAW_KEY, '') - charset = get_email_charset( - unicode(cdoc.content.get(self.RAW_KEY, ''))) + charset = self._get_charset(body) try: - content = content.encode(charset) + body = body.encode(charset) except (UnicodeEncodeError, UnicodeDecodeError) as e: logger.error("Unicode error {0}".format(e)) - content = content.encode(charset, 'replace') - - raw = self._get_raw_msg() - msg = self._get_parsed_msg(raw) - body = msg.get_payload() + body = body.encode(charset, 'replace') fd.write(body) - # XXX SHOULD use a separate BODY FIELD ... fd.seek(0) return fd + @memoized_method + def _get_charset(self, stuff): + """ + Gets (guesses?) the charset of a payload. + + :param stuff: the stuff to guess about. + :type stuff: basestring + :returns: charset + """ + # XXX existential doubt 1. wouldn't be smarter to + # peek into the mail headers? + # XXX existential doubt 2. shouldn't we make the scope + # of the decorator somewhat more persistent? + # ah! yes! and put memory bounds. + return get_email_charset(unicode(stuff)) + def getSize(self): """ Return the total size, in octets, of this message. @@ -223,19 +426,17 @@ class LeapMessage(fields, MailParser, MBoxParser): :return: size of the message, in octets :rtype: int """ - size = self._cdoc.content.get(self.SIZE_KEY, False) + size = None + if self._fdoc: + size = self._fdoc.content.get(self.SIZE_KEY, False) + else: + logger.warning("No FLAGS doc for %s:%s" % (self._mbox, + self._uid)) if not size: # XXX fallback, should remove when all migrated. size = self.getBodyFile().len return size - def _get_headers(self): - """ - Return the headers dict stored in this message document. - """ - # XXX get from the headers doc - return self._cdoc.content.get(self.HEADERS_KEY, {}) - def getHeaders(self, negate, *names): """ Retrieve a group of message headers. @@ -252,26 +453,49 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: dict """ headers = self._get_headers() + if not headers: + return {'content-type': ''} names = map(lambda s: s.upper(), names) if negate: cond = lambda key: key.upper() not in names else: cond = lambda key: key.upper() in names + head = copy.deepcopy(dict(headers.items())) + + # twisted imap server expects headers to be lowercase + head = dict( + map(str, (key, value)) if key.lower() != "content-type" + else map(str, (key.lower(), value)) + for (key, value) in head.items()) + # unpack and filter original dict by negate-condition - filter_by_cond = [ - map(str, (key, val)) for - key, val in headers.items() - if cond(key)] + filter_by_cond = [(key, val) for key, val in head.items() if cond(key)] return dict(filter_by_cond) + def _get_headers(self): + """ + Return the headers dict for this message. + """ + if self._hdoc is not None: + return self._hdoc.content.get(self.HEADERS_KEY, {}) + else: + logger.warning( + "No HEADERS doc for msg %s:%s" % ( + self._mbox, + self._uid)) + def isMultipart(self): """ Return True if this message is multipart. """ - if self._cdoc: - retval = self._fdoc.content.get(self.MULTIPART_KEY, False) - return retval + if self._fdoc: + return self._fdoc.content.get(self.MULTIPART_KEY, False) + else: + logger.warning( + "No FLAGS doc for msg %s:%s" % ( + self.mbox, + self.uid)) def getSubPart(self, part): """ @@ -284,12 +508,22 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: Any object implementing C{IMessagePart}. :return: The specified sub-part. """ + logger.debug("Getting subpart: %s" % part) if not self.isMultipart(): raise TypeError - msg = self._get_parsed_msg() - # XXX should wrap IMessagePart - return msg.get_payload()[part] + if part == 0: + # Let's get the first part, which + # is really the body. + return MessageBody(self._fdoc, self._bdoc) + + attach_doc = self._get_attachment_doc(part) + if not attach_doc: + # so long and thanks for all the fish + logger.debug("...not today") + raise IndexError + msg_part = self._get_parsed_msg(attach_doc.content[self.RAW_KEY]) + return MessageAttachment(msg_part) # # accessors @@ -301,32 +535,87 @@ class LeapMessage(fields, MailParser, MBoxParser): message. """ flag_docs = self._soledad.get_from_index( - SoledadBackedAccount.TYPE_MBOX_UID_IDX, + fields.TYPE_MBOX_UID_IDX, fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid)) - flag_doc = flag_docs[0] if flag_docs else None - return flag_doc + return first(flag_docs) - def _get_content_doc(self): + def _get_headers_doc(self): """ - Return the document that keeps the flags for this + Return the document that keeps the headers for this + message. + """ + head_docs = self._soledad.get_from_index( + fields.TYPE_C_HASH_IDX, + fields.TYPE_HEADERS_VAL, str(self._chash)) + return first(head_docs) + + def _get_body_doc(self): + """ + Return the document that keeps the body for this message. """ - cont_docs = self._soledad.get_from_index( - SoledadBackedAccount.TYPE_HASH_IDX, - fields.TYPE_MESSAGE_VAL, self._content_hash, str(self._uid)) - cont_doc = cont_docs[0] if cont_docs else None - return cont_doc + body_docs = self._soledad.get_from_index( + fields.TYPE_C_HASH_IDX, + fields.TYPE_MESSAGE_VAL, str(self._chash)) + return first(body_docs) + + def _get_num_parts(self): + """ + Return the number of parts for a multipart message. + """ + if not self.isMultipart(): + raise TypeError( + "Tried to get num parts in a non-multipart message") + if not self._hdoc: + return None + return self._hdoc.content.get(fields.NUM_PARTS_KEY, 2) + + def _get_attachment_doc(self, part): + """ + Return the document that keeps the headers for this + message. + + :param part: the part number for the multipart message. + :type part: int + """ + if not self._hdoc: + return None + try: + phash = self._hdoc.content[self.PARTS_MAP_KEY][str(part)] + except KeyError: + # this is the remnant of a debug session until + # I found that the index is actually a string... + # It should be safe to just raise the KeyError now, + # but leaving it here while the blood is fresh... + logger.warning("We expected a phash in the " + "index %s, but noone found" % (part, )) + logger.debug(self._hdoc.content[self.PARTS_MAP_KEY]) + return None + attach_docs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_ATTACHMENT_VAL, str(phash)) + + # The following is true for the fist owner. + # We could use this relationship to flag the "owner" + # and orphan when we delete it. + + #attach_docs = self._soledad.get_from_index( + #fields.TYPE_C_HASH_PART_IDX, + #fields.TYPE_ATTACHMENT_VAL, str(self._chash), str(part)) + return first(attach_docs) def _get_raw_msg(self): """ Return the raw msg. :rtype: basestring """ - return self._cdoc.content.get(self.RAW_KEY, '') + # TODO deprecate this. + return self._bdoc.content.get(self.RAW_KEY, '') def __getitem__(self, key): """ - Return the content of the message document. + Return an item from the content of the flags document, + for convenience. :param key: The key :type key: str @@ -334,14 +623,73 @@ class LeapMessage(fields, MailParser, MBoxParser): :return: The content value indexed by C{key} or None :rtype: str """ - return self._cdoc.content.get(key, None) + return self._fdoc.content.get(key, None) + + # setters + + # XXX to be used in the messagecopier interface?! + + def set_uid(self, uid): + """ + Set new uid for this message. + + :param uid: the new uid + :type uid: basestring + """ + # XXX dangerous! lock? + self._uid = uid + d = self._fdoc + d.content[self.UID_KEY] = uid + self._soledad.put_doc(d) + + def set_mbox(self, mbox): + """ + Set new mbox for this message. + + :param mbox: the new mbox + :type mbox: basestring + """ + # XXX dangerous! lock? + self._mbox = mbox + d = self._fdoc + d.content[self.MBOX_KEY] = mbox + self._soledad.put_doc(d) + + # destructor + + @deferred + def remove(self): + """ + Remove all docs associated with this message. + """ + # XXX this would ve more efficient if we can just pass + # a sequence of uids. + + # XXX For the moment we are only removing the flags and headers + # docs. The rest we leave there polluting your hard disk, + # until we think about a good way of deorphaning. + # Maybe a crawler of unreferenced docs. + + fd = self._get_flags_doc() + hd = self._get_headers_doc() + #bd = self._get_body_doc() + #docs = [fd, hd, bd] + + docs = [fd, hd] + + #for pn in range(self._get_num_parts()[1:]): + #ad = self._get_attachment_doc(pn) + #docs.append(ad) + + for d in filter(None, docs): + self._soledad.delete_doc(d) def does_exist(self): """ - Return True if there is actually a message for this + Return True if there is actually a flags message for this UID and mbox. """ - return bool(self._fdoc) + return self._fdoc is not None SoledadWriterPayload = namedtuple( @@ -349,6 +697,8 @@ SoledadWriterPayload = namedtuple( SoledadWriterPayload.CREATE = 1 SoledadWriterPayload.PUT = 2 +SoledadWriterPayload.BODY_CREATE = 3 +SoledadWriterPayload.ATTACHMENT_CREATE = 4 class SoledadDocWriter(object): @@ -378,20 +728,98 @@ class SoledadDocWriter(object): empty = queue.empty() while not empty: item = queue.get() + call = None + payload = item.payload + if item.mode == SoledadWriterPayload.CREATE: call = self._soledad.create_doc + elif item.mode == SoledadWriterPayload.BODY_CREATE: + if not self._body_does_exist(payload): + call = self._soledad.create_doc + elif item.mode == SoledadWriterPayload.ATTACHMENT_CREATE: + if not self._attachment_does_exist(payload): + call = self._soledad.create_doc elif item.mode == SoledadWriterPayload.PUT: call = self._soledad.put_doc - # should handle errors - try: - call(item.payload) - except u1db_errors.RevisionConflict as exc: - logger.error("Error: %r" % (exc,)) - raise exc + # XXX delete? + + if call: + # should handle errors + try: + call(item.payload) + except u1db_errors.RevisionConflict as exc: + logger.error("Error: %r" % (exc,)) + raise exc empty = queue.empty() + """ + Message deduplication. + + We do a query for the content hashes before writing to our beloved + slcipher backend of Soledad. This means, by now, that: + + 1. We will not store the same attachment twice, only the hash of it. + 2. We will not store the same message body twice, only the hash of it. + + The first case is useful if you are always receiving the same old memes + from unwary friends that still have not discovered that 4chan is the + generator of the internet. The second will save your day if you have + initiated session with the same account in two different machines. I also + wonder why would you do that, but let's respect each other choices, like + with the religious celebrations, and assume that one day we'll be able + to run Bitmask in completely free phones. Yes, I mean that, the whole GSM + Stack. + """ + + def _body_does_exist(self, doc): + """ + Check whether we already have a body payload with this hash in our + database. + + :param doc: tentative body document + :type doc: dict + :returns: True if that happens, False otherwise. + """ + if not doc: + return False + chash = doc[fields.CONTENT_HASH_KEY] + body_docs = self._soledad.get_from_index( + fields.TYPE_C_HASH_IDX, + fields.TYPE_MESSAGE_VAL, str(chash)) + if not body_docs: + return False + if len(body_docs) != 1: + logger.warning("Found more than one copy of chash %s!" + % (chash,)) + logger.debug("Found body doc with that hash! Skipping save!") + return True + + def _attachment_does_exist(self, doc): + """ + Check whether we already have an attachment payload with this hash + in our database. + + :param doc: tentative body document + :type doc: dict + :returns: True if that happens, False otherwise. + """ + if not doc: + return False + phash = doc[fields.PAYLOAD_HASH_KEY] + attach_docs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_ATTACHMENT_VAL, str(phash)) + if not attach_docs: + return False + + if len(attach_docs) != 1: + logger.warning("Found more than one copy of phash %s!" + % (phash,)) + logger.debug("Found attachment doc with that hash! Skipping save!") + return True + class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): """ @@ -402,35 +830,62 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): database. """ # XXX this should be able to produce a MessageSet methinks - - EMPTY_MSG = { - fields.TYPE_KEY: fields.TYPE_MESSAGE_VAL, - fields.UID_KEY: 1, - fields.MBOX_KEY: fields.INBOX_VAL, - - fields.SUBJECT_KEY: "", - fields.DATE_KEY: "", - fields.RAW_KEY: "", - - # XXX should separate headers into another doc - fields.HEADERS_KEY: {}, + # could validate these kinds of objects turning them + # into a template for the class. + FLAGS_DOC = "FLAGS" + HEADERS_DOC = "HEADERS" + ATTACHMENT_DOC = "ATTACHMENT" + BODY_DOC = "BODY" + + templates = { + + FLAGS_DOC: { + fields.TYPE_KEY: fields.TYPE_FLAGS_VAL, + fields.UID_KEY: 1, + fields.MBOX_KEY: fields.INBOX_VAL, + + fields.SEEN_KEY: False, + fields.RECENT_KEY: True, + fields.FLAGS_KEY: [], + fields.MULTIPART_KEY: False, + fields.SIZE_KEY: 0 + }, + + HEADERS_DOC: { + fields.TYPE_KEY: fields.TYPE_HEADERS_VAL, + fields.CONTENT_HASH_KEY: "", + + fields.HEADERS_KEY: {}, + fields.NUM_PARTS_KEY: 0, + fields.PARTS_MAP_KEY: {}, + fields.DATE_KEY: "", + fields.SUBJECT_KEY: "" + }, + + ATTACHMENT_DOC: { + fields.TYPE_KEY: fields.TYPE_ATTACHMENT_VAL, + fields.PART_NUMBER_KEY: 0, + fields.CONTENT_HASH_KEY: "", + fields.PAYLOAD_HASH_KEY: "", + + fields.RAW_KEY: "" + }, + + BODY_DOC: { + fields.TYPE_KEY: fields.TYPE_MESSAGE_VAL, + fields.CONTENT_HASH_KEY: "", + + fields.BODY_KEY: "", + + # this should not be needed, + # but let's keep the raw msg for some time + # until we are sure we can reconstruct + # the original msg from our disection. + fields.RAW_KEY: "", + + } } - EMPTY_FLAGS = { - fields.TYPE_KEY: fields.TYPE_FLAGS_VAL, - fields.UID_KEY: 1, - fields.MBOX_KEY: fields.INBOX_VAL, - - fields.FLAGS_KEY: [], - fields.SEEN_KEY: False, - fields.RECENT_KEY: True, - fields.MULTIPART_KEY: False, - } - - # get from SoledadBackedAccount the needed index-related constants - INDEXES = SoledadBackedAccount.INDEXES - TYPE_IDX = SoledadBackedAccount.TYPE_IDX - def __init__(self, mbox=None, soledad=None): """ Constructor for MessageCollection. @@ -465,23 +920,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): SoledadDocWriter(soledad), period=0.05) - def _get_empty_msg(self): + def _get_empty_doc(self, _type=FLAGS_DOC): """ - Returns an empty message. - - :return: a dict containing a default empty message + Returns an empty doc for storing different message parts. + Defaults to returning a template for a flags document. + :return: a dict with the template :rtype: dict """ - return copy.deepcopy(self.EMPTY_MSG) - - def _get_empty_flags_doc(self): - """ - Returns an empty doc for storing flags. - - :return: - :rtype: - """ - return copy.deepcopy(self.EMPTY_FLAGS) + if not _type in self.templates.keys(): + raise TypeError("Improper type passed to _get_empty_doc") + return copy.deepcopy(self.templates[_type]) @deferred def add_msg(self, raw, subject=None, flags=None, date=None, uid=1): @@ -509,52 +957,107 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): flags = tuple() leap_assert_type(flags, tuple) - content_doc = self._get_empty_msg() - flags_doc = self._get_empty_flags_doc() - - content_doc[self.MBOX_KEY] = self.mbox - flags_doc[self.MBOX_KEY] = self.mbox - # ...should get a sanity check here. - content_doc[self.UID_KEY] = uid - flags_doc[self.UID_KEY] = uid - - if flags: - flags_doc[self.FLAGS_KEY] = map(self._stringify, flags) - flags_doc[self.SEEN_KEY] = self.SEEN_FLAG in flags + # docs for flags, headers, and body + fd, hd, bd = map( + lambda t: self._get_empty_doc(t), + (self.FLAGS_DOC, self.HEADERS_DOC, self.BODY_DOC)) msg = self._get_parsed_msg(raw) headers = dict(msg) - - logger.debug("adding. is multipart:%s" % msg.is_multipart()) - flags_doc[self.MULTIPART_KEY] = msg.is_multipart() - # XXX get lower case for keys? - # XXX get headers doc - content_doc[self.HEADERS_KEY] = headers - # set subject based on message headers and eventually replace by - # subject given as param - if self.SUBJECT_FIELD in headers: - content_doc[self.SUBJECT_KEY] = headers[self.SUBJECT_FIELD] - if subject is not None: - content_doc[self.SUBJECT_KEY] = subject - - # XXX could separate body into its own doc - # but should also separate multiparts - # that should be wrapped in MessagePart - content_doc[self.RAW_KEY] = self._stringify(raw) - content_doc[self.SIZE_KEY] = len(raw) - + raw_str = msg.as_string() + chash = self._get_hash(msg) + multi = msg.is_multipart() + + attaches = [] + inner_parts = [] + + if multi: + # XXX should walk down recursively + # in a better way. but fixing this quick + # to have an rc. + # XXX should pick the content-type in txt + body = first(msg.get_payload()).get_payload() + if isinstance(body, list): + # allowing one nesting level for now... + body, rest = body[0].get_payload(), body[1:] + for p in rest: + inner_parts.append(p) + else: + body = msg.get_payload() + logger.debug("adding msg (multipart:%s)" % multi) + + # flags doc --------------------------------------- + fd[self.MBOX_KEY] = self.mbox + fd[self.UID_KEY] = uid + fd[self.CONTENT_HASH_KEY] = chash + fd[self.MULTIPART_KEY] = multi + fd[self.SIZE_KEY] = len(raw_str) + if flags: + fd[self.FLAGS_KEY] = map(self._stringify, flags) + fd[self.SEEN_KEY] = self.SEEN_FLAG in flags + fd[self.RECENT_KEY] = self.RECENT_FLAG in flags + + # headers doc ---------------------------------------- + hd[self.CONTENT_HASH_KEY] = chash + hd[self.HEADERS_KEY] = headers + if not subject and self.SUBJECT_FIELD in headers: + hd[self.SUBJECT_KEY] = headers[self.SUBJECT_FIELD] + else: + hd[self.SUBJECT_KEY] = subject if not date and self.DATE_FIELD in headers: - content_doc[self.DATE_KEY] = headers[self.DATE_FIELD] + hd[self.DATE_KEY] = headers[self.DATE_FIELD] else: - content_doc[self.DATE_KEY] = date - - logger.debug('enqueuing message for write') - + hd[self.DATE_KEY] = date + if multi: + hd[self.NUM_PARTS_KEY] = len(msg.get_payload()) + + # body doc + bd[self.CONTENT_HASH_KEY] = chash + bd[self.BODY_KEY] = body + # in an ideal world, we would not need to save a copy of the + # raw message. But we'll keep it until we can be sure that + # we can rebuild the original message from the parts. + bd[self.RAW_KEY] = raw_str + + docs = [fd, hd] + + # attachment docs + if multi: + outer_parts = msg.get_payload() + parts = outer_parts + inner_parts + + # skip first part, we already got it in body + to_attach = ((i, m) for i, m in enumerate(parts) if i > 0) + for index, part_msg in to_attach: + att_doc = self._get_empty_doc(self.ATTACHMENT_DOC) + att_doc[self.PART_NUMBER_KEY] = index + att_doc[self.CONTENT_HASH_KEY] = chash + phash = self._get_hash(part_msg) + att_doc[self.PAYLOAD_HASH_KEY] = phash + att_doc[self.RAW_KEY] = part_msg.as_string() + + # keep a pointer to the payload hash in the + # headers doc, under the parts_map + hd[self.PARTS_MAP_KEY][str(index)] = phash + attaches.append(att_doc) + + # Saving ... ------------------------------- + # ok, there we go... + logger.debug('enqueuing message docs for write') ptuple = SoledadWriterPayload + + # first, regular docs: flags and headers + for doc in docs: + self.soledad_writer.put(ptuple( + mode=ptuple.CREATE, payload=doc)) + # second, try to create body doc. self.soledad_writer.put(ptuple( - mode=ptuple.CREATE, payload=content_doc)) - self.soledad_writer.put(ptuple( - mode=ptuple.CREATE, payload=flags_doc)) + mode=ptuple.BODY_CREATE, payload=bd)) + # and last, but not least, try to create + # attachment docs if not already there. + for at in attaches: + self.soledad_writer.put(ptuple( + mode=ptuple.ATTACHMENT_CREATE, payload=at)) def remove(self, msg): """ @@ -563,8 +1066,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :param msg: a Leapmessage instance :type msg: LeapMessage """ - # XXX remove - #self._soledad.delete_doc(msg) msg.remove() # getters @@ -596,14 +1097,14 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :rtype: list of SoledadDocument """ if _type not in fields.__dict__.values(): - raise TypeError("Wrong type passed to get_all") + raise TypeError("Wrong type passed to get_all_docs") if sameProxiedObjects(self._soledad, None): logger.warning('Tried to get messages but soledad is None!') return [] all_docs = [doc for doc in self._soledad.get_from_index( - SoledadBackedAccount.TYPE_MBOX_IDX, + fields.TYPE_MBOX_IDX, _type, self.mbox)] # inneficient, but first let's grok it and then @@ -618,8 +1119,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): """ all_uids = (doc.content[self.UID_KEY] for doc in self._soledad.get_from_index( - SoledadBackedAccount.TYPE_MBOX_IDX, - self.TYPE_FLAGS_VAL, self.mbox)) + fields.TYPE_MBOX_IDX, + fields.TYPE_FLAGS_VAL, self.mbox)) return (u for u in sorted(all_uids)) def count(self): @@ -629,7 +1130,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :rtype: int """ count = self._soledad.get_count_from_index( - SoledadBackedAccount.TYPE_MBOX_IDX, + fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox) return count @@ -645,8 +1146,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): """ return (doc.content[self.UID_KEY] for doc in self._soledad.get_from_index( - SoledadBackedAccount.TYPE_MBOX_SEEN_IDX, - self.TYPE_FLAGS_VAL, self.mbox, '0')) + fields.TYPE_MBOX_SEEN_IDX, + fields.TYPE_FLAGS_VAL, self.mbox, '0')) def count_unseen(self): """ @@ -656,8 +1157,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :rtype: int """ count = self._soledad.get_count_from_index( - SoledadBackedAccount.TYPE_MBOX_SEEN_IDX, - self.TYPE_FLAGS_VAL, self.mbox, '0') + fields.TYPE_MBOX_SEEN_IDX, + fields.TYPE_FLAGS_VAL, self.mbox, '0') return count def get_unseen(self): @@ -681,8 +1182,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): """ return (doc.content[self.UID_KEY] for doc in self._soledad.get_from_index( - SoledadBackedAccount.TYPE_MBOX_RECT_IDX, - self.TYPE_FLAGS_VAL, self.mbox, '1')) + fields.TYPE_MBOX_RECT_IDX, + fields.TYPE_FLAGS_VAL, self.mbox, '1')) def get_recent(self): """ @@ -702,8 +1203,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :rtype: int """ count = self._soledad.get_count_from_index( - SoledadBackedAccount.TYPE_MBOX_RECT_IDX, - self.TYPE_FLAGS_VAL, self.mbox, '1') + fields.TYPE_MBOX_RECT_IDX, + fields.TYPE_FLAGS_VAL, self.mbox, '1') return count def __len__(self): @@ -731,5 +1232,5 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): return u"" % ( self.mbox, self.count()) - # XXX should implement __eq__ also !!! --- use a hash - # of content for that, will be used for dedup. + # XXX should implement __eq__ also !!! + # --- use the content hash for that, will be used for dedup. -- cgit v1.2.3 From a912729c4788d46d648a72126226741b63e0a37c Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 2 Jan 2014 17:14:03 -0400 Subject: add documentation to the decorator, fix errorback. * it also fixes the traceback in the errorback, thanks to chiiph, who reads documentation instead of whinning :D * other minor documentation corrections --- src/leap/mail/imap/messages.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'src/leap/mail/imap/messages.py') diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index c69c023..47c40d5 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -695,6 +695,9 @@ class LeapMessage(fields, MailParser, MBoxParser): SoledadWriterPayload = namedtuple( 'SoledadWriterPayload', ['mode', 'payload']) +# TODO we could consider using enum here: +# https://pypi.python.org/pypi/enum + SoledadWriterPayload.CREATE = 1 SoledadWriterPayload.PUT = 2 SoledadWriterPayload.BODY_CREATE = 3 @@ -758,7 +761,7 @@ class SoledadDocWriter(object): Message deduplication. We do a query for the content hashes before writing to our beloved - slcipher backend of Soledad. This means, by now, that: + sqlcipher backend of Soledad. This means, by now, that: 1. We will not store the same attachment twice, only the hash of it. 2. We will not store the same message body twice, only the hash of it. -- cgit v1.2.3 From 5585ff784940dee267576d097076de66797f9188 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 2 Jan 2014 16:08:09 -0400 Subject: fix tests after rewrite --- src/leap/mail/imap/messages.py | 94 +++++++++++++++++++++++++++++++++++------- 1 file changed, 78 insertions(+), 16 deletions(-) (limited to 'src/leap/mail/imap/messages.py') diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index 47c40d5..80411f9 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -20,9 +20,11 @@ LeapMessage and MessageCollection. import copy import logging import StringIO -from collections import namedtuple + +from collections import defaultdict, namedtuple from twisted.mail import imap4 +from twisted.internet import defer from twisted.python import log from u1db import errors as u1db_errors from zope.interface import implements @@ -182,6 +184,7 @@ class MessageAttachment(object): if not self._msg: return {} headers = dict(self._msg.items()) + names = map(lambda s: s.upper(), names) if negate: cond = lambda key: key.upper() not in names @@ -329,6 +332,7 @@ class LeapMessage(fields, MailParser, MBoxParser): doc.content[self.FLAGS_KEY] = flags doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags doc.content[self.RECENT_KEY] = self.RECENT_FLAG in flags + doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags self._soledad.put_doc(doc) def addFlags(self, flags): @@ -455,6 +459,7 @@ class LeapMessage(fields, MailParser, MBoxParser): headers = self._get_headers() if not headers: return {'content-type': ''} + names = map(lambda s: s.upper(), names) if negate: cond = lambda key: key.upper() not in names @@ -465,8 +470,8 @@ class LeapMessage(fields, MailParser, MBoxParser): # twisted imap server expects headers to be lowercase head = dict( - map(str, (key, value)) if key.lower() != "content-type" - else map(str, (key.lower(), value)) + (str(key), map(str, value)) if key.lower() != "content-type" + else (str(key.lower(), map(str, value))) for (key, value) in head.items()) # unpack and filter original dict by negate-condition @@ -670,6 +675,9 @@ class LeapMessage(fields, MailParser, MBoxParser): # until we think about a good way of deorphaning. # Maybe a crawler of unreferenced docs. + uid = self._uid + print "removing...", uid + fd = self._get_flags_doc() hd = self._get_headers_doc() #bd = self._get_body_doc() @@ -682,7 +690,11 @@ class LeapMessage(fields, MailParser, MBoxParser): #docs.append(ad) for d in filter(None, docs): - self._soledad.delete_doc(d) + try: + self._soledad.delete_doc(d) + except Exception as exc: + logger.error(exc) + return uid def does_exist(self): """ @@ -849,6 +861,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): fields.SEEN_KEY: False, fields.RECENT_KEY: True, + fields.DEL_KEY: False, fields.FLAGS_KEY: [], fields.MULTIPART_KEY: False, fields.SIZE_KEY: 0 @@ -921,7 +934,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): self.soledad_writer = MessageProducer( SoledadDocWriter(soledad), - period=0.05) + period=0.02) def _get_empty_doc(self, _type=FLAGS_DOC): """ @@ -966,7 +979,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): (self.FLAGS_DOC, self.HEADERS_DOC, self.BODY_DOC)) msg = self._get_parsed_msg(raw) - headers = dict(msg) + headers = defaultdict(list) + for k, v in msg.items(): + headers[k].append(v) raw_str = msg.as_string() chash = self._get_hash(msg) multi = msg.is_multipart() @@ -987,7 +1002,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): inner_parts.append(p) else: body = msg.get_payload() - logger.debug("adding msg (multipart:%s)" % multi) + logger.debug("adding msg with uid %s (multipart:%s)" % ( + uid, multi)) # flags doc --------------------------------------- fd[self.MBOX_KEY] = self.mbox @@ -998,26 +1014,33 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): if flags: fd[self.FLAGS_KEY] = map(self._stringify, flags) fd[self.SEEN_KEY] = self.SEEN_FLAG in flags - fd[self.RECENT_KEY] = self.RECENT_FLAG in flags + fd[self.DEL_KEY] = self.DELETED_FLAG in flags + fd[self.RECENT_KEY] = True # set always by default # headers doc ---------------------------------------- hd[self.CONTENT_HASH_KEY] = chash hd[self.HEADERS_KEY] = headers + + print "headers" + import pprint + pprint.pprint(headers) + if not subject and self.SUBJECT_FIELD in headers: - hd[self.SUBJECT_KEY] = headers[self.SUBJECT_FIELD] + hd[self.SUBJECT_KEY] = first(headers[self.SUBJECT_FIELD]) else: hd[self.SUBJECT_KEY] = subject if not date and self.DATE_FIELD in headers: - hd[self.DATE_KEY] = headers[self.DATE_FIELD] + hd[self.DATE_KEY] = first(headers[self.DATE_FIELD]) else: hd[self.DATE_KEY] = date if multi: + # XXX fix for multipart nested case hd[self.NUM_PARTS_KEY] = len(msg.get_payload()) # body doc bd[self.CONTENT_HASH_KEY] = chash bd[self.BODY_KEY] = body - # in an ideal world, we would not need to save a copy of the + # XXX in an ideal world, we would not need to save a copy of the # raw message. But we'll keep it until we can be sure that # we can rebuild the original message from the parts. bd[self.RAW_KEY] = raw_str @@ -1062,14 +1085,29 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): self.soledad_writer.put(ptuple( mode=ptuple.ATTACHMENT_CREATE, payload=at)) - def remove(self, msg): + def _remove_cb(self, result): + return result + + def remove_all_deleted(self): + """ + Removes all messages flagged as deleted. """ - Removes a message. + delete_deferl = [] + for msg in self.get_deleted(): + delete_deferl.append(msg.remove()) + d1 = defer.gatherResults(delete_deferl, consumeErrors=True) + d1.addCallback(self._remove_cb) + return d1 - :param msg: a Leapmessage instance + def remove(self, msg): + """ + Remove a given msg. + :param msg: the message to be removed :type msg: LeapMessage """ - msg.remove() + d = msg.remove() + d.addCallback(self._remove_cb) + return d # getters @@ -1178,7 +1216,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): def recent_iter(self): """ - Get an iterator for the message docs with `recent` flag. + Get an iterator for the message UIDs with `recent` flag. :return: iterator through recent message docs :rtype: iterable @@ -1210,6 +1248,30 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): fields.TYPE_FLAGS_VAL, self.mbox, '1') return count + # deleted messages + + def deleted_iter(self): + """ + Get an iterator for the message UIDs with `deleted` flag. + + :return: iterator through deleted message docs + :rtype: iterable + """ + return (doc.content[self.UID_KEY] for doc in + self._soledad.get_from_index( + fields.TYPE_MBOX_DEL_IDX, + fields.TYPE_FLAGS_VAL, self.mbox, '1')) + + def get_deleted(self): + """ + Get all messages with the `Deleted` flag. + + :returns: a generator of LeapMessages + :rtype: generator + """ + return (LeapMessage(self._soledad, docid, self.mbox) + for docid in self.deleted_iter()) + def __len__(self): """ Returns the number of messages on this mailbox. -- cgit v1.2.3 From a203337d155a6e7186980ef175642adc91d472fe Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 7 Jan 2014 14:23:25 -0400 Subject: move utility to its own --- src/leap/mail/imap/messages.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) (limited to 'src/leap/mail/imap/messages.py') diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index 80411f9..bfe913c 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -33,6 +33,7 @@ from zope.proxy import sameProxiedObjects from leap.common.check import leap_assert, leap_assert_type from leap.common.decorators import memoized_method from leap.common.mail import get_email_charset +from leap.mail.utils import first from leap.mail.decorators import deferred from leap.mail.imap.index import IndexedDB from leap.mail.imap.fields import fields, WithMsgFields @@ -42,16 +43,6 @@ from leap.mail.messageflow import IMessageConsumer, MessageProducer logger = logging.getLogger(__name__) -def first(things): - """ - Return the head of a collection. - """ - try: - return things[0] - except (IndexError, TypeError): - return None - - class MessageBody(object): """ IMessagePart implementor for the main -- cgit v1.2.3 From 4ba5d5b405e3c6a6bc997df2073ffc8ea3fa75a9 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 7 Jan 2014 11:34:08 -0400 Subject: Second stage of the new year's storage rewrite. * documents of only three types: * flags * headers * content * add algorithm for walking the parsed message tree. * treat special cases like a multipart with a single part. * modify add_msg to use the walk routine * modify twisted interfaces to use the new storage schema. * tests for different multipart cases * fix multipart detection typo in the fetch This is a merge proposal for the 0.5.0-rc3. known bugs ---------- Some things are still know not to work well at this point (some cases of multipart messages do not display the bodies). IMAP server also is left in a bad internal state after a logout/login. --- src/leap/mail/imap/messages.py | 722 ++++++++++++++++++++++------------------- 1 file changed, 388 insertions(+), 334 deletions(-) (limited to 'src/leap/mail/imap/messages.py') diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index bfe913c..37e4311 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -33,6 +33,7 @@ from zope.proxy import sameProxiedObjects from leap.common.check import leap_assert, leap_assert_type from leap.common.decorators import memoized_method from leap.common.mail import get_email_charset +from leap.mail import walk from leap.mail.utils import first from leap.mail.decorators import deferred from leap.mail.imap.index import IndexedDB @@ -43,65 +44,58 @@ from leap.mail.messageflow import IMessageConsumer, MessageProducer logger = logging.getLogger(__name__) -class MessageBody(object): - """ - IMessagePart implementor for the main - body of a multipart message. - - Excusatio non petita: see the interface documentation. - """ +# TODO ------------------------------------------------------------ - implements(imap4.IMessagePart) - - def __init__(self, fdoc, bdoc): - self._fdoc = fdoc - self._bdoc = bdoc - - def getSize(self): - return len(self._bdoc.content[fields.BODY_KEY]) +# [ ] Add linked-from info. +# [ ] Delete incoming mail only after successful write! +# [ ] Remove UID from syncable db. Store only those indexes locally. +# [ ] Send patch to twisted for bug in imap4.py:5717 (content-type can be +# none? lower-case?) - def getBodyFile(self): - fd = StringIO.StringIO() - - if self._bdoc: - body = self._bdoc.content[fields.BODY_KEY] - else: - body = "" - charset = self._get_charset(body) - try: - body = body.encode(charset) - except (UnicodeEncodeError, UnicodeDecodeError) as e: - logger.error("Unicode error {0}".format(e)) - body = body.encode(charset, 'replace') - fd.write(body) - fd.seek(0) - return fd - - @memoized_method - def _get_charset(self, stuff): - return get_email_charset(unicode(stuff)) - - def getHeaders(self, negate, *names): - return {} +def lowerdict(_dict): + """ + Return a dict with the keys in lowercase. - def isMultipart(self): - return False + :param _dict: the dict to convert + :rtype: dict + """ + return dict((key.lower(), value) + for key, value in _dict.items()) - def getSubPart(self, part): - return None +class MessagePart(object): + """ + IMessagePart implementor. + It takes a subpart message and is able to find + the inner parts. -class MessageAttachment(object): + Excusatio non petita: see the interface documentation. + """ implements(imap4.IMessagePart) - def __init__(self, msg): + def __init__(self, soledad, part_map): """ - Initializes the messagepart with a Message instance. - :param msg: a message instance - :type msg: Message + Initializes the MessagePart. + + :param part_map: a dictionary containing the parts map for this + message + :type part_map: dict """ - self._msg = msg + # TODO + # It would be good to pass the uid/mailbox also + # for references while debugging. + + # We have a problem on bulk moves, and is + # that when the fetch on the new mailbox is done + # the parts maybe are not complete. + # So we should be able to fail with empty + # docs until we solve that. The ideal would be + # to gather the results of the deferred operations + # to signal the operation is complete. + #leap_assert(part_map, "part map dict cannot be null") + self._soledad = soledad + self._pmap = part_map def getSize(self): """ @@ -110,9 +104,12 @@ class MessageAttachment(object): :return: size of the message, in octets :rtype: int """ - if not self._msg: + if not self._pmap: return 0 - return len(self._msg.as_string()) + size = self._pmap.get('size', None) + if not size: + logger.error("Message part cannot find size in the partmap") + return size def getBodyFile(self): """ @@ -122,24 +119,91 @@ class MessageAttachment(object): :rtype: StringIO """ fd = StringIO.StringIO() - if self._msg: - body = self._msg.get_payload() + if self._pmap: + multi = self._pmap.get('multi') + if not multi: + phash = self._pmap.get("phash", None) + else: + pmap = self._pmap.get('part_map') + first_part = pmap.get('1', None) + if first_part: + phash = first_part['phash'] + + if not phash: + logger.warning("Could not find phash for this subpart!") + payload = str("") + else: + payload = self._get_payload_from_document(phash) + else: - logger.debug("Empty message!") - body = "" - - # XXX should only do the dance if we're sure it's - # content/text-plain!!! - #charset = self._get_charset(body) - #try: - #body = body.encode(charset) - #except (UnicodeEncodeError, UnicodeDecodeError) as e: - #logger.error("Unicode error {0}".format(e)) - #body = body.encode(charset, 'replace') - fd.write(body) + logger.warning("Message with no part_map!") + payload = str("") + + if payload: + #headers = self.getHeaders(True) + #headers = lowerdict(headers) + #content_type = headers.get('content-type', "") + content_type = self._get_ctype_from_document(phash) + charset_split = content_type.split('charset=') + # XXX fuck all this, use a regex! + if len(charset_split) > 1: + charset = charset_split[1] + if charset: + charset = charset.strip() + else: + charset = None + if not charset: + charset = self._get_charset(payload) + try: + payload = payload.encode(charset) + except (UnicodeEncodeError, UnicodeDecodeError) as e: + logger.error("Unicode error {0}".format(e)) + payload = payload.encode(charset, 'replace') + + fd.write(payload) fd.seek(0) return fd + # TODO cache the phash retrieval + def _get_payload_from_document(self, phash): + """ + Gets the message payload from the content document. + + :param phash: the payload hash to retrieve by. + :type phash: basestring + """ + cdocs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(phash)) + + cdoc = first(cdocs) + if not cdoc: + logger.warning( + "Could not find the content doc " + "for phash %s" % (phash,)) + payload = cdoc.content.get(fields.RAW_KEY, "") + return payload + + # TODO cache the pahash retrieval + def _get_ctype_from_document(self, phash): + """ + Gets the content-type from the content document. + + :param phash: the payload hash to retrieve by. + :type phash: basestring + """ + cdocs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(phash)) + + cdoc = first(cdocs) + if not cdoc: + logger.warning( + "Could not find the content doc " + "for phash %s" % (phash,)) + ctype = cdoc.content.get('ctype', "") + return ctype + @memoized_method def _get_charset(self, stuff): # TODO put in a common class with LeapMessage @@ -150,8 +214,6 @@ class MessageAttachment(object): :type stuff: basestring :returns: charset """ - # XXX existential doubt 1. wouldn't be smarter to - # peek into the mail headers? # XXX existential doubt 2. shouldn't we make the scope # of the decorator somewhat more persistent? # ah! yes! and put memory bounds. @@ -172,9 +234,17 @@ class MessageAttachment(object): :return: A mapping of header field names to header field values :rtype: dict """ - if not self._msg: + if not self._pmap: + logger.warning("No pmap in Subpart!") return {} - headers = dict(self._msg.items()) + headers = dict(self._pmap.get("headers", [])) + + # twisted imap server expects *some* headers to be lowercase + # We could use a CaseInsensitiveDict here... + headers = dict( + (str(key), str(value)) if key.lower() != "content-type" + else (str(key.lower()), str(value)) + for (key, value) in headers.items()) names = map(lambda s: s.upper(), names) if negate: @@ -187,13 +257,18 @@ class MessageAttachment(object): map(str, (key, val)) for key, val in headers.items() if cond(key)] - return dict(filter_by_cond) + filtered = dict(filter_by_cond) + return filtered def isMultipart(self): """ Return True if this message is multipart. """ - return self._msg.is_multipart() + if not self._pmap: + logger.warning("Could not get part map!") + return False + multi = self._pmap.get("multi", False) + return multi def getSubPart(self, part): """ @@ -206,10 +281,30 @@ class MessageAttachment(object): :rtype: Any object implementing C{IMessagePart}. :return: The specified sub-part. """ - return self._msg.get_payload() + if not self.isMultipart(): + raise TypeError + sub_pmap = self._pmap.get("part_map", {}) + try: + part_map = sub_pmap[str(part + 1)] + except KeyError: + logger.debug("getSubpart for %s: KeyError" % (part,)) + raise IndexError + + # XXX check for validity + return MessagePart(self._soledad, part_map) class LeapMessage(fields, MailParser, MBoxParser): + """ + The main representation of a message. + + It indexes the messages in one mailbox by a combination + of uid+mailbox name. + """ + + # TODO this has to change. + # Should index primarily by chash, and keep a local-lonly + # UID table. implements(imap4.IMessage) @@ -268,6 +363,8 @@ class LeapMessage(fields, MailParser, MBoxParser): """ An accessor to the body document. """ + if not self._hdoc: + return None if not self.__bdoc: self.__bdoc = self._get_body_doc() return self.__bdoc @@ -320,6 +417,11 @@ class LeapMessage(fields, MailParser, MBoxParser): log.msg('setting flags: %s' % (self._uid)) doc = self._fdoc + if not doc: + logger.warning( + "Could not find FDOC for %s:%s while setting flags!" % + (self._mbox, self._uid)) + return doc.content[self.FLAGS_KEY] = flags doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags doc.content[self.RECENT_KEY] = self.RECENT_FLAG in flags @@ -384,16 +486,25 @@ class LeapMessage(fields, MailParser, MBoxParser): fd = StringIO.StringIO() bdoc = self._bdoc if bdoc: - body = self._bdoc.content.get(self.BODY_KEY, "") + body = str(self._bdoc.content.get(self.RAW_KEY, "")) else: - body = "" + logger.warning("No BDOC found for message.") + body = str("") + + # XXX not needed, isn't it? ---- ivan? + #if bdoc: + #content_type = bdoc.content.get('content-type', "") + #charset = content_type.split('charset=')[1] + #if charset: + #charset = charset.strip() + #if not charset: + #charset = self._get_charset(body) + #try: + #body = str(body.encode(charset)) + #except (UnicodeEncodeError, UnicodeDecodeError) as e: + #logger.error("Unicode error {0}".format(e)) + #body = str(body.encode(charset, 'replace')) - charset = self._get_charset(body) - try: - body = body.encode(charset) - except (UnicodeEncodeError, UnicodeDecodeError) as e: - logger.error("Unicode error {0}".format(e)) - body = body.encode(charset, 'replace') fd.write(body) fd.seek(0) return fd @@ -407,8 +518,7 @@ class LeapMessage(fields, MailParser, MBoxParser): :type stuff: basestring :returns: charset """ - # XXX existential doubt 1. wouldn't be smarter to - # peek into the mail headers? + # TODO get from subpart headers # XXX existential doubt 2. shouldn't we make the scope # of the decorator somewhat more persistent? # ah! yes! and put memory bounds. @@ -447,9 +557,11 @@ class LeapMessage(fields, MailParser, MBoxParser): :return: A mapping of header field names to header field values :rtype: dict """ + # TODO split in smaller methods headers = self._get_headers() if not headers: - return {'content-type': ''} + logger.warning("No headers found") + return {str('content-type'): str('')} names = map(lambda s: s.upper(), names) if negate: @@ -457,16 +569,20 @@ class LeapMessage(fields, MailParser, MBoxParser): else: cond = lambda key: key.upper() in names - head = copy.deepcopy(dict(headers.items())) + if isinstance(headers, list): + headers = dict(headers) - # twisted imap server expects headers to be lowercase - head = dict( - (str(key), map(str, value)) if key.lower() != "content-type" - else (str(key.lower(), map(str, value))) - for (key, value) in head.items()) + # twisted imap server expects *some* headers to be lowercase + # XXX refactor together with MessagePart method + headers = dict( + (str(key), str(value)) if key.lower() != "content-type" + else (str(key.lower()), str(value)) + for (key, value) in headers.items()) # unpack and filter original dict by negate-condition - filter_by_cond = [(key, val) for key, val in head.items() if cond(key)] + filter_by_cond = [(key, val) for key, val + in headers.items() if cond(key)] + return dict(filter_by_cond) def _get_headers(self): @@ -474,7 +590,9 @@ class LeapMessage(fields, MailParser, MBoxParser): Return the headers dict for this message. """ if self._hdoc is not None: - return self._hdoc.content.get(self.HEADERS_KEY, {}) + headers = self._hdoc.content.get(self.HEADERS_KEY, {}) + return headers + else: logger.warning( "No HEADERS doc for msg %s:%s" % ( @@ -486,12 +604,13 @@ class LeapMessage(fields, MailParser, MBoxParser): Return True if this message is multipart. """ if self._fdoc: - return self._fdoc.content.get(self.MULTIPART_KEY, False) + is_multipart = self._fdoc.content.get(self.MULTIPART_KEY, False) + return is_multipart else: logger.warning( "No FLAGS doc for msg %s:%s" % ( - self.mbox, - self.uid)) + self._mbox, + self._uid)) def getSubPart(self, part): """ @@ -504,27 +623,33 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: Any object implementing C{IMessagePart}. :return: The specified sub-part. """ - logger.debug("Getting subpart: %s" % part) if not self.isMultipart(): raise TypeError - - if part == 0: - # Let's get the first part, which - # is really the body. - return MessageBody(self._fdoc, self._bdoc) - - attach_doc = self._get_attachment_doc(part) - if not attach_doc: - # so long and thanks for all the fish - logger.debug("...not today") + try: + pmap_dict = self._get_part_from_parts_map(part + 1) + except KeyError: + logger.debug("getSubpart for %s: KeyError" % (part,)) raise IndexError - msg_part = self._get_parsed_msg(attach_doc.content[self.RAW_KEY]) - return MessageAttachment(msg_part) + return MessagePart(self._soledad, pmap_dict) # # accessors # + def _get_part_from_parts_map(self, part): + """ + Get a part map from the headers doc + + :raises: KeyError if key does not exist + :rtype: dict + """ + if not self._hdoc: + logger.warning("Tried to get part but no HDOC found!") + return None + + pmap = self._hdoc.content.get(fields.PARTS_MAP_KEY, {}) + return pmap[str(part)] + def _get_flags_doc(self): """ Return the document that keeps the flags for this @@ -550,63 +675,16 @@ class LeapMessage(fields, MailParser, MBoxParser): Return the document that keeps the body for this message. """ - body_docs = self._soledad.get_from_index( - fields.TYPE_C_HASH_IDX, - fields.TYPE_MESSAGE_VAL, str(self._chash)) - return first(body_docs) - - def _get_num_parts(self): - """ - Return the number of parts for a multipart message. - """ - if not self.isMultipart(): - raise TypeError( - "Tried to get num parts in a non-multipart message") - if not self._hdoc: - return None - return self._hdoc.content.get(fields.NUM_PARTS_KEY, 2) - - def _get_attachment_doc(self, part): - """ - Return the document that keeps the headers for this - message. - - :param part: the part number for the multipart message. - :type part: int - """ - if not self._hdoc: - return None - try: - phash = self._hdoc.content[self.PARTS_MAP_KEY][str(part)] - except KeyError: - # this is the remnant of a debug session until - # I found that the index is actually a string... - # It should be safe to just raise the KeyError now, - # but leaving it here while the blood is fresh... - logger.warning("We expected a phash in the " - "index %s, but noone found" % (part, )) - logger.debug(self._hdoc.content[self.PARTS_MAP_KEY]) + body_phash = self._hdoc.content.get( + fields.BODY_KEY, None) + if not body_phash: + logger.warning("No body phash for this document!") return None - attach_docs = self._soledad.get_from_index( + body_docs = self._soledad.get_from_index( fields.TYPE_P_HASH_IDX, - fields.TYPE_ATTACHMENT_VAL, str(phash)) - - # The following is true for the fist owner. - # We could use this relationship to flag the "owner" - # and orphan when we delete it. + fields.TYPE_CONTENT_VAL, str(body_phash)) - #attach_docs = self._soledad.get_from_index( - #fields.TYPE_C_HASH_PART_IDX, - #fields.TYPE_ATTACHMENT_VAL, str(self._chash), str(part)) - return first(attach_docs) - - def _get_raw_msg(self): - """ - Return the raw msg. - :rtype: basestring - """ - # TODO deprecate this. - return self._bdoc.content.get(self.RAW_KEY, '') + return first(body_docs) def __getitem__(self, key): """ @@ -658,27 +736,22 @@ class LeapMessage(fields, MailParser, MBoxParser): """ Remove all docs associated with this message. """ - # XXX this would ve more efficient if we can just pass - # a sequence of uids. - # XXX For the moment we are only removing the flags and headers # docs. The rest we leave there polluting your hard disk, # until we think about a good way of deorphaning. # Maybe a crawler of unreferenced docs. + # XXX implement elijah's idea of using a PUT document as a + # token to ensure consistency in the removal. + uid = self._uid - print "removing...", uid fd = self._get_flags_doc() - hd = self._get_headers_doc() + #hd = self._get_headers_doc() #bd = self._get_body_doc() #docs = [fd, hd, bd] - docs = [fd, hd] - - #for pn in range(self._get_num_parts()[1:]): - #ad = self._get_attachment_doc(pn) - #docs.append(ad) + docs = [fd] for d in filter(None, docs): try: @@ -703,8 +776,7 @@ SoledadWriterPayload = namedtuple( SoledadWriterPayload.CREATE = 1 SoledadWriterPayload.PUT = 2 -SoledadWriterPayload.BODY_CREATE = 3 -SoledadWriterPayload.ATTACHMENT_CREATE = 4 +SoledadWriterPayload.CONTENT_CREATE = 3 class SoledadDocWriter(object): @@ -723,6 +795,38 @@ class SoledadDocWriter(object): """ self._soledad = soledad + def _get_call_for_item(self, item): + """ + Return the proper call type for a given item. + + :param item: one of the types defined under the + attributes of SoledadWriterPayload + :type item: int + """ + call = None + payload = item.payload + + if item.mode == SoledadWriterPayload.CREATE: + call = self._soledad.create_doc + elif (item.mode == SoledadWriterPayload.CONTENT_CREATE + and not self._content_does_exist(payload)): + call = self._soledad.create_doc + elif item.mode == SoledadWriterPayload.PUT: + call = self._soledad.put_doc + return call + + def _process(self, queue): + """ + Return the item and the proper call type for the next + item in the queue if any. + + :param queue: the queue from where we'll pick item. + :type queue: Queue + """ + item = queue.get() + call = self._get_call_for_item(item) + return item, call + def consume(self, queue): """ Creates a new document in soledad db. @@ -733,24 +837,10 @@ class SoledadDocWriter(object): """ empty = queue.empty() while not empty: - item = queue.get() - call = None - payload = item.payload - - if item.mode == SoledadWriterPayload.CREATE: - call = self._soledad.create_doc - elif item.mode == SoledadWriterPayload.BODY_CREATE: - if not self._body_does_exist(payload): - call = self._soledad.create_doc - elif item.mode == SoledadWriterPayload.ATTACHMENT_CREATE: - if not self._attachment_does_exist(payload): - call = self._soledad.create_doc - elif item.mode == SoledadWriterPayload.PUT: - call = self._soledad.put_doc - - # XXX delete? + item, call = self._process(queue) if call: + # XXX should handle the delete case # should handle errors try: call(item.payload) @@ -779,33 +869,10 @@ class SoledadDocWriter(object): Stack. """ - def _body_does_exist(self, doc): + def _content_does_exist(self, doc): """ - Check whether we already have a body payload with this hash in our - database. - - :param doc: tentative body document - :type doc: dict - :returns: True if that happens, False otherwise. - """ - if not doc: - return False - chash = doc[fields.CONTENT_HASH_KEY] - body_docs = self._soledad.get_from_index( - fields.TYPE_C_HASH_IDX, - fields.TYPE_MESSAGE_VAL, str(chash)) - if not body_docs: - return False - if len(body_docs) != 1: - logger.warning("Found more than one copy of chash %s!" - % (chash,)) - logger.debug("Found body doc with that hash! Skipping save!") - return True - - def _attachment_does_exist(self, doc): - """ - Check whether we already have an attachment payload with this hash - in our database. + Check whether we already have a content document for a payload + with this hash in our database. :param doc: tentative body document :type doc: dict @@ -816,7 +883,7 @@ class SoledadDocWriter(object): phash = doc[fields.PAYLOAD_HASH_KEY] attach_docs = self._soledad.get_from_index( fields.TYPE_P_HASH_IDX, - fields.TYPE_ATTACHMENT_VAL, str(phash)) + fields.TYPE_CONTENT_VAL, str(phash)) if not attach_docs: return False @@ -840,15 +907,15 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # into a template for the class. FLAGS_DOC = "FLAGS" HEADERS_DOC = "HEADERS" - ATTACHMENT_DOC = "ATTACHMENT" - BODY_DOC = "BODY" + CONTENT_DOC = "CONTENT" templates = { FLAGS_DOC: { fields.TYPE_KEY: fields.TYPE_FLAGS_VAL, - fields.UID_KEY: 1, + fields.UID_KEY: 1, # XXX moe to a local table fields.MBOX_KEY: fields.INBOX_VAL, + fields.CONTENT_HASH_KEY: "", fields.SEEN_KEY: False, fields.RECENT_KEY: True, @@ -862,35 +929,28 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): fields.TYPE_KEY: fields.TYPE_HEADERS_VAL, fields.CONTENT_HASH_KEY: "", + fields.DATE_KEY: "", + fields.SUBJECT_KEY: "", + fields.HEADERS_KEY: {}, - fields.NUM_PARTS_KEY: 0, fields.PARTS_MAP_KEY: {}, - fields.DATE_KEY: "", - fields.SUBJECT_KEY: "" }, - ATTACHMENT_DOC: { - fields.TYPE_KEY: fields.TYPE_ATTACHMENT_VAL, - fields.PART_NUMBER_KEY: 0, - fields.CONTENT_HASH_KEY: "", + CONTENT_DOC: { + fields.TYPE_KEY: fields.TYPE_CONTENT_VAL, fields.PAYLOAD_HASH_KEY: "", + fields.LINKED_FROM_KEY: [], + fields.CTYPE_KEY: "", # should index by this too - fields.RAW_KEY: "" - }, - - BODY_DOC: { - fields.TYPE_KEY: fields.TYPE_MESSAGE_VAL, - fields.CONTENT_HASH_KEY: "", - - fields.BODY_KEY: "", - - # this should not be needed, - # but let's keep the raw msg for some time - # until we are sure we can reconstruct - # the original msg from our disection. + # should only get inmutable headers parts + # (for indexing) + fields.HEADERS_KEY: {}, fields.RAW_KEY: "", + fields.PARTS_MAP_KEY: {}, + fields.HEADERS_KEY: {}, + fields.MULTIPART_KEY: False, + }, - } } def __init__(self, mbox=None, soledad=None): @@ -938,128 +998,124 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): raise TypeError("Improper type passed to _get_empty_doc") return copy.deepcopy(self.templates[_type]) - @deferred - def add_msg(self, raw, subject=None, flags=None, date=None, uid=1): + def _do_parse(self, raw): """ - Creates a new message document. + Parse raw message and return it along with + relevant information about its outer level. :param raw: the raw message - :type raw: str - - :param subject: subject of the message. - :type subject: str - - :param flags: flags - :type flags: list - - :param date: the received date for the message - :type date: str - - :param uid: the message uid for this mailbox - :type uid: int + :type raw: StringIO or basestring + :return: msg, chash, size, multi + :rtype: tuple """ - # TODO: split in smaller methods - logger.debug('adding message') - if flags is None: - flags = tuple() - leap_assert_type(flags, tuple) - - # docs for flags, headers, and body - fd, hd, bd = map( - lambda t: self._get_empty_doc(t), - (self.FLAGS_DOC, self.HEADERS_DOC, self.BODY_DOC)) - msg = self._get_parsed_msg(raw) - headers = defaultdict(list) - for k, v in msg.items(): - headers[k].append(v) - raw_str = msg.as_string() chash = self._get_hash(msg) + size = len(msg.as_string()) multi = msg.is_multipart() + return msg, chash, size, multi - attaches = [] - inner_parts = [] - - if multi: - # XXX should walk down recursively - # in a better way. but fixing this quick - # to have an rc. - # XXX should pick the content-type in txt - body = first(msg.get_payload()).get_payload() - if isinstance(body, list): - # allowing one nesting level for now... - body, rest = body[0].get_payload(), body[1:] - for p in rest: - inner_parts.append(p) - else: - body = msg.get_payload() - logger.debug("adding msg with uid %s (multipart:%s)" % ( - uid, multi)) + def _populate_flags(self, flags, uid, chash, size, multi): + """ + Return a flags doc. + + XXX Missing DOC ----------- + """ + fd = self._get_empty_doc(self.FLAGS_DOC) - # flags doc --------------------------------------- fd[self.MBOX_KEY] = self.mbox fd[self.UID_KEY] = uid fd[self.CONTENT_HASH_KEY] = chash + fd[self.SIZE_KEY] = size fd[self.MULTIPART_KEY] = multi - fd[self.SIZE_KEY] = len(raw_str) if flags: fd[self.FLAGS_KEY] = map(self._stringify, flags) fd[self.SEEN_KEY] = self.SEEN_FLAG in flags fd[self.DEL_KEY] = self.DELETED_FLAG in flags fd[self.RECENT_KEY] = True # set always by default + return fd - # headers doc ---------------------------------------- + def _populate_headr(self, msg, chash, subject, date): + """ + Return a headers doc. + + XXX Missing DOC ----------- + """ + headers = defaultdict(list) + for k, v in msg.items(): + headers[k].append(v) + + # "fix" for repeated headers. + for k, v in headers.items(): + newline = "\n%s: " % (k,) + headers[k] = newline.join(v) + + hd = self._get_empty_doc(self.HEADERS_DOC) hd[self.CONTENT_HASH_KEY] = chash hd[self.HEADERS_KEY] = headers - print "headers" - import pprint - pprint.pprint(headers) - if not subject and self.SUBJECT_FIELD in headers: hd[self.SUBJECT_KEY] = first(headers[self.SUBJECT_FIELD]) else: hd[self.SUBJECT_KEY] = subject + if not date and self.DATE_FIELD in headers: hd[self.DATE_KEY] = first(headers[self.DATE_FIELD]) else: hd[self.DATE_KEY] = date - if multi: - # XXX fix for multipart nested case - hd[self.NUM_PARTS_KEY] = len(msg.get_payload()) - - # body doc - bd[self.CONTENT_HASH_KEY] = chash - bd[self.BODY_KEY] = body - # XXX in an ideal world, we would not need to save a copy of the - # raw message. But we'll keep it until we can be sure that - # we can rebuild the original message from the parts. - bd[self.RAW_KEY] = raw_str + return hd + + @deferred + def add_msg(self, raw, subject=None, flags=None, date=None, uid=1): + """ + Creates a new message document. + + :param raw: the raw message + :type raw: str + + :param subject: subject of the message. + :type subject: str + + :param flags: flags + :type flags: list + + :param date: the received date for the message + :type date: str + + :param uid: the message uid for this mailbox + :type uid: int + """ + # TODO signal that we can delete the original message!----- + # when all the processing is done. + + # TODO add the linked-from info ! + + logger.debug('adding message') + if flags is None: + flags = tuple() + leap_assert_type(flags, tuple) + + # parse + msg, chash, size, multi = self._do_parse(raw) + + fd = self._populate_flags(flags, uid, chash, size, multi) + hd = self._populate_headr(msg, chash, subject, date) + + parts = walk.get_parts(msg) + body_phash_fun = [walk.get_body_phash_simple, + walk.get_body_phash_multi][int(multi)] + body_phash = body_phash_fun(walk.get_payloads(msg)) + parts_map = walk.walk_msg_tree(parts, body_phash=body_phash) + + # add parts map to header doc + # (body, multi, part_map) + for key in parts_map: + hd[key] = parts_map[key] + del parts_map docs = [fd, hd] + cdocs = walk.get_raw_docs(msg, parts) - # attachment docs - if multi: - outer_parts = msg.get_payload() - parts = outer_parts + inner_parts - - # skip first part, we already got it in body - to_attach = ((i, m) for i, m in enumerate(parts) if i > 0) - for index, part_msg in to_attach: - att_doc = self._get_empty_doc(self.ATTACHMENT_DOC) - att_doc[self.PART_NUMBER_KEY] = index - att_doc[self.CONTENT_HASH_KEY] = chash - phash = self._get_hash(part_msg) - att_doc[self.PAYLOAD_HASH_KEY] = phash - att_doc[self.RAW_KEY] = part_msg.as_string() - - # keep a pointer to the payload hash in the - # headers doc, under the parts_map - hd[self.PARTS_MAP_KEY][str(index)] = phash - attaches.append(att_doc) - - # Saving ... ------------------------------- - # ok, there we go... + # Saving logger.debug('enqueuing message docs for write') ptuple = SoledadWriterPayload @@ -1067,14 +1123,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): for doc in docs: self.soledad_writer.put(ptuple( mode=ptuple.CREATE, payload=doc)) - # second, try to create body doc. - self.soledad_writer.put(ptuple( - mode=ptuple.BODY_CREATE, payload=bd)) + # and last, but not least, try to create - # attachment docs if not already there. - for at in attaches: + # content docs if not already there. + for cd in cdocs: self.soledad_writer.put(ptuple( - mode=ptuple.ATTACHMENT_CREATE, payload=at)) + mode=ptuple.CONTENT_CREATE, payload=cd)) def _remove_cb(self, result): return result -- cgit v1.2.3