summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/messages.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/imap/messages.py')
-rw-r--r--src/leap/mail/imap/messages.py1384
1 files changed, 1384 insertions, 0 deletions
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
new file mode 100644
index 0000000..b0b2f95
--- /dev/null
+++ b/src/leap/mail/imap/messages.py
@@ -0,0 +1,1384 @@
+# -*- coding: utf-8 -*-
+# messages.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+LeapMessage and MessageCollection.
+"""
+import copy
+import logging
+import re
+import threading
+import StringIO
+
+from collections import defaultdict
+from email import message_from_string
+from functools import partial
+
+from pycryptopp.hash import sha256
+from twisted.mail import imap4
+from twisted.internet import defer
+from zope.interface import implements
+from zope.proxy import sameProxiedObjects
+
+from leap.common.check import leap_assert, leap_assert_type
+from leap.common.decorators import memoized_method
+from leap.common.mail import get_email_charset
+from leap.mail import walk
+from leap.mail.utils import first, find_charset, lowerdict, empty
+from leap.mail.utils import stringify_parts_map
+from leap.mail.decorators import deferred_to_thread
+from leap.mail.imap.index import IndexedDB
+from leap.mail.imap.fields import fields, WithMsgFields
+from leap.mail.imap.memorystore import MessageWrapper
+from leap.mail.imap.messageparts import MessagePart, MessagePartDoc
+from leap.mail.imap.parser import MBoxParser
+
+logger = logging.getLogger(__name__)
+
+# TODO ------------------------------------------------------------
+
+# [ ] Add ref to incoming message during add_msg
+# [ ] Add linked-from info.
+# * Need a new type of documents: linkage info.
+# * HDOCS are linked from FDOCs (ref to chash)
+# * CDOCS are linked from HDOCS (ref to chash)
+
+# [ ] Delete incoming mail only after successful write!
+# [ ] Remove UID from syncable db. Store only those indexes locally.
+
+MSGID_PATTERN = r"""<([\w@.]+)>"""
+MSGID_RE = re.compile(MSGID_PATTERN)
+
+
+def try_unique_query(curried):
+ """
+ Try to execute a query that is expected to have a
+ single outcome, and log a warning if more than one document found.
+
+ :param curried: a curried function
+ :type curried: callable
+ """
+ leap_assert(callable(curried), "A callable is expected")
+ try:
+ query = curried()
+ if query:
+ if len(query) > 1:
+ # TODO we could take action, like trigger a background
+ # process to kill dupes.
+ name = getattr(curried, 'expected', 'doc')
+ logger.warning(
+ "More than one %s found for this mbox, "
+ "we got a duplicate!!" % (name,))
+ return query.pop()
+ else:
+ return None
+ except Exception as exc:
+ logger.exception("Unhandled error %r" % exc)
+
+
+"""
+A dictionary that keeps one lock per mbox and uid.
+"""
+# XXX too much overhead?
+fdoc_locks = defaultdict(lambda: defaultdict(lambda: threading.Lock()))
+
+
+class LeapMessage(fields, MBoxParser):
+ """
+ The main representation of a message.
+
+ It indexes the messages in one mailbox by a combination
+ of uid+mailbox name.
+ """
+
+ # TODO this has to change.
+ # Should index primarily by chash, and keep a local-only
+ # UID table.
+
+ implements(imap4.IMessage)
+
+ def __init__(self, soledad, uid, mbox, collection=None, container=None):
+ """
+ Initializes a LeapMessage.
+
+ :param soledad: a Soledad instance
+ :type soledad: Soledad
+ :param uid: the UID for the message.
+ :type uid: int or basestring
+ :param mbox: the mbox this message belongs to
+ :type mbox: str or unicode
+ :param collection: a reference to the parent collection object
+ :type collection: MessageCollection
+ :param container: a IMessageContainer implementor instance
+ :type container: IMessageContainer
+ """
+ self._soledad = soledad
+ self._uid = int(uid) if uid is not None else None
+ self._mbox = self._parse_mailbox_name(mbox)
+ self._collection = collection
+ self._container = container
+
+ self.__chash = None
+ self.__bdoc = None
+
+ from twisted.internet import reactor
+ self.reactor = reactor
+
+ # XXX make these properties public
+
+ @property
+ def fdoc(self):
+ """
+ An accessor to the flags document.
+ """
+ if all(map(bool, (self._uid, self._mbox))):
+ fdoc = None
+ if self._container is not None:
+ fdoc = self._container.fdoc
+ if not fdoc:
+ fdoc = self._get_flags_doc()
+ if fdoc:
+ fdoc_content = fdoc.content
+ self.__chash = fdoc_content.get(
+ fields.CONTENT_HASH_KEY, None)
+ return fdoc
+
+ @property
+ def hdoc(self):
+ """
+ An accessor to the headers document.
+ """
+ container = self._container
+ if container is not None:
+ hdoc = self._container.hdoc
+ if hdoc and not empty(hdoc.content):
+ return hdoc
+ hdoc = self._get_headers_doc()
+
+ if container and not empty(hdoc.content):
+ # mem-cache it
+ hdoc_content = hdoc.content
+ chash = hdoc_content.get(fields.CONTENT_HASH_KEY)
+ hdocs = {chash: hdoc_content}
+ container.memstore.load_header_docs(hdocs)
+ return hdoc
+
+ @property
+ def chash(self):
+ """
+ An accessor to the content hash for this message.
+ """
+ if not self.fdoc:
+ return None
+ if not self.__chash and self.fdoc:
+ self.__chash = self.fdoc.content.get(
+ fields.CONTENT_HASH_KEY, None)
+ return self.__chash
+
+ @property
+ def bdoc(self):
+ """
+ An accessor to the body document.
+ """
+ if not self.hdoc:
+ return None
+ if not self.__bdoc:
+ self.__bdoc = self._get_body_doc()
+ return self.__bdoc
+
+ # IMessage implementation
+
+ def getUID(self):
+ """
+ Retrieve the unique identifier associated with this Message.
+
+ :return: uid for this message
+ :rtype: int
+ """
+ return self._uid
+
+ def getFlags(self):
+ """
+ Retrieve the flags associated with this Message.
+
+ :return: The flags, represented as strings
+ :rtype: tuple
+ """
+ uid = self._uid
+
+ flags = set([])
+ fdoc = self.fdoc
+ if fdoc:
+ flags = set(fdoc.content.get(self.FLAGS_KEY, None))
+
+ msgcol = self._collection
+
+ # We treat the recent flag specially: gotten from
+ # a mailbox-level document.
+ if msgcol and uid in msgcol.recent_flags:
+ flags.add(fields.RECENT_FLAG)
+ if flags:
+ flags = map(str, flags)
+ return tuple(flags)
+
+ # setFlags not in the interface spec but we use it with store command.
+
+ def setFlags(self, flags, mode):
+ """
+ Sets the flags for this message
+
+ :param flags: the flags to update in the message.
+ :type flags: tuple of str
+ :param mode: the mode for setting. 1 is append, -1 is remove, 0 set.
+ :type mode: int
+ """
+ leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
+ mbox, uid = self._mbox, self._uid
+
+ APPEND = 1
+ REMOVE = -1
+ SET = 0
+
+ with fdoc_locks[mbox][uid]:
+ doc = self.fdoc
+ if not doc:
+ logger.warning(
+ "Could not find FDOC for %r:%s while setting flags!" %
+ (mbox, uid))
+ return
+ current = doc.content[self.FLAGS_KEY]
+ if mode == APPEND:
+ newflags = tuple(set(tuple(current) + flags))
+ elif mode == REMOVE:
+ newflags = tuple(set(current).difference(set(flags)))
+ elif mode == SET:
+ newflags = flags
+ new_fdoc = {
+ self.FLAGS_KEY: newflags,
+ self.SEEN_KEY: self.SEEN_FLAG in newflags,
+ self.DEL_KEY: self.DELETED_FLAG in newflags}
+ self._collection.memstore.update_flags(mbox, uid, new_fdoc)
+
+ return map(str, newflags)
+
+ def getInternalDate(self):
+ """
+ Retrieve the date internally associated with this message
+
+ According to the spec, this is NOT the date and time in the
+ RFC-822 header, but rather a date and time that reflects when the
+ message was received.
+
+ * In SMTP, date and time of final delivery.
+ * In COPY, internal date/time of the source message.
+ * In APPEND, date/time specified.
+
+ :return: An RFC822-formatted date string.
+ :rtype: str
+ """
+ date = self.hdoc.content.get(fields.DATE_KEY, '')
+ return date
+
+ #
+ # IMessagePart
+ #
+
+ # XXX we should implement this interface too for the subparts
+ # so we allow nested parts...
+
+ def getBodyFile(self):
+ """
+ Retrieve a file object containing only the body of this message.
+
+ :return: file-like object opened for reading
+ :rtype: StringIO
+ """
+ def write_fd(body):
+ fd.write(body)
+ fd.seek(0)
+ return fd
+
+ # TODO refactor with getBodyFile in MessagePart
+
+ fd = StringIO.StringIO()
+
+ if self.bdoc is not None:
+ bdoc_content = self.bdoc.content
+ if empty(bdoc_content):
+ logger.warning("No BDOC content found for message!!!")
+ return write_fd("")
+
+ body = bdoc_content.get(self.RAW_KEY, "")
+ content_type = bdoc_content.get('content-type', "")
+ charset = find_charset(content_type)
+ if charset is None:
+ charset = self._get_charset(body)
+ try:
+ if isinstance(body, unicode):
+ body = body.encode(charset)
+ except UnicodeError as exc:
+ logger.error(
+ "Unicode error, using 'replace'. {0!r}".format(exc))
+ logger.debug("Attempted to encode with: %s" % charset)
+ body = body.encode(charset, 'replace')
+ finally:
+ return write_fd(body)
+
+ # We are still returning funky characters from here.
+ else:
+ logger.warning("No BDOC found for message.")
+ return write_fd("")
+
+ @memoized_method
+ def _get_charset(self, stuff):
+ """
+ Gets (guesses?) the charset of a payload.
+
+ :param stuff: the stuff to guess about.
+ :type stuff: basestring
+ :returns: charset
+ """
+ # XXX shouldn't we make the scope
+ # of the decorator somewhat more persistent?
+ # ah! yes! and put memory bounds.
+ return get_email_charset(stuff)
+
+ def getSize(self):
+ """
+ Return the total size, in octets, of this message.
+
+ :return: size of the message, in octets
+ :rtype: int
+ """
+ size = None
+ if self.fdoc is not None:
+ fdoc_content = self.fdoc.content
+ size = fdoc_content.get(self.SIZE_KEY, False)
+ else:
+ logger.warning("No FLAGS doc for %s:%s" % (self._mbox,
+ self._uid))
+ if not size:
+ # XXX fallback, should remove when all migrated.
+ size = self.getBodyFile().len
+ return size
+
+ def getHeaders(self, negate, *names):
+ """
+ Retrieve a group of message headers.
+
+ :param names: The names of the headers to retrieve or omit.
+ :type names: tuple of str
+
+ :param negate: If True, indicates that the headers listed in names
+ should be omitted from the return value, rather
+ than included.
+ :type negate: bool
+
+ :return: A mapping of header field names to header field values
+ :rtype: dict
+ """
+ # TODO split in smaller methods
+ # XXX refactor together with MessagePart method
+
+ headers = self._get_headers()
+ if not headers:
+ logger.warning("No headers found")
+ return {str('content-type'): str('')}
+
+ names = map(lambda s: s.upper(), names)
+ if negate:
+ cond = lambda key: key.upper() not in names
+ else:
+ cond = lambda key: key.upper() in names
+
+ if isinstance(headers, list):
+ headers = dict(headers)
+
+ # default to most likely standard
+ charset = find_charset(headers, "utf-8")
+ headers2 = dict()
+ for key, value in headers.items():
+ # twisted imap server expects *some* headers to be lowercase
+ # We could use a CaseInsensitiveDict here...
+ if key.lower() == "content-type":
+ key = key.lower()
+
+ if not isinstance(key, str):
+ key = key.encode(charset, 'replace')
+ if not isinstance(value, str):
+ value = value.encode(charset, 'replace')
+
+ if value.endswith(";"):
+ # bastards
+ value = value[:-1]
+
+ # filter original dict by negate-condition
+ if cond(key):
+ headers2[key] = value
+ return headers2
+
+ def _get_headers(self):
+ """
+ Return the headers dict for this message.
+ """
+ if self.hdoc is not None:
+ hdoc_content = self.hdoc.content
+ headers = hdoc_content.get(self.HEADERS_KEY, {})
+ return headers
+
+ else:
+ logger.warning(
+ "No HEADERS doc for msg %s:%s" % (
+ self._mbox,
+ self._uid))
+
+ def isMultipart(self):
+ """
+ Return True if this message is multipart.
+ """
+ if self.fdoc:
+ fdoc_content = self.fdoc.content
+ is_multipart = fdoc_content.get(self.MULTIPART_KEY, False)
+ return is_multipart
+ else:
+ logger.warning(
+ "No FLAGS doc for msg %s:%s" % (
+ self._mbox,
+ self._uid))
+
+ def getSubPart(self, part):
+ """
+ Retrieve a MIME submessage
+
+ :type part: C{int}
+ :param part: The number of the part to retrieve, indexed from 0.
+ :raise IndexError: Raised if the specified part does not exist.
+ :raise TypeError: Raised if this message is not multipart.
+ :rtype: Any object implementing C{IMessagePart}.
+ :return: The specified sub-part.
+ """
+ if not self.isMultipart():
+ raise TypeError
+ try:
+ pmap_dict = self._get_part_from_parts_map(part + 1)
+ except KeyError:
+ raise IndexError
+ return MessagePart(self._soledad, pmap_dict)
+
+ #
+ # accessors
+ #
+
+ def _get_part_from_parts_map(self, part):
+ """
+ Get a part map from the headers doc
+
+ :raises: KeyError if key does not exist
+ :rtype: dict
+ """
+ if not self.hdoc:
+ logger.warning("Tried to get part but no HDOC found!")
+ return None
+
+ hdoc_content = self.hdoc.content
+ pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {})
+
+ # remember, lads, soledad is using strings in its keys,
+ # not integers!
+ return pmap[str(part)]
+
+ # XXX moved to memory store
+ # move the rest too. ------------------------------------------
+ def _get_flags_doc(self):
+ """
+ Return the document that keeps the flags for this
+ message.
+ """
+ result = {}
+ try:
+ flag_docs = self._soledad.get_from_index(
+ fields.TYPE_MBOX_UID_IDX,
+ fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid))
+ result = first(flag_docs)
+ except Exception as exc:
+ # ugh! Something's broken down there!
+ logger.warning("ERROR while getting flags for UID: %s" % self._uid)
+ logger.exception(exc)
+ finally:
+ return result
+
+ # TODO move to soledadstore instead of accessing soledad directly
+ def _get_headers_doc(self):
+ """
+ Return the document that keeps the headers for this
+ message.
+ """
+ head_docs = self._soledad.get_from_index(
+ fields.TYPE_C_HASH_IDX,
+ fields.TYPE_HEADERS_VAL, str(self.chash))
+ return first(head_docs)
+
+ # TODO move to soledadstore instead of accessing soledad directly
+ def _get_body_doc(self):
+ """
+ Return the document that keeps the body for this
+ message.
+ """
+ hdoc_content = self.hdoc.content
+ body_phash = hdoc_content.get(
+ fields.BODY_KEY, None)
+ if not body_phash:
+ logger.warning("No body phash for this document!")
+ return None
+
+ # XXX get from memstore too...
+ # if memstore: memstore.get_phrash
+ # memstore should keep a dict with weakrefs to the
+ # phash doc...
+
+ if self._container is not None:
+ bdoc = self._container.memstore.get_cdoc_from_phash(body_phash)
+ if not empty(bdoc) and not empty(bdoc.content):
+ return bdoc
+
+ # no memstore, or no body doc found there
+ if self._soledad:
+ body_docs = self._soledad.get_from_index(
+ fields.TYPE_P_HASH_IDX,
+ fields.TYPE_CONTENT_VAL, str(body_phash))
+ return first(body_docs)
+ else:
+ logger.error("No phash in container, and no soledad found!")
+
+ def __getitem__(self, key):
+ """
+ Return an item from the content of the flags document,
+ for convenience.
+
+ :param key: The key
+ :type key: str
+
+ :return: The content value indexed by C{key} or None
+ :rtype: str
+ """
+ return self.fdoc.content.get(key, None)
+
+ def does_exist(self):
+ """
+ Return True if there is actually a flags document for this
+ UID and mbox.
+ """
+ return not empty(self.fdoc)
+
+
+class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
+ """
+ A collection of messages, surprisingly.
+
+ It is tied to a selected mailbox name that is passed to its constructor.
+ Implements a filter query over the messages contained in a soledad
+ database.
+ """
+
+ # XXX this should be able to produce a MessageSet methinks
+ # could validate these kinds of objects turning them
+ # into a template for the class.
+ FLAGS_DOC = "FLAGS"
+ HEADERS_DOC = "HEADERS"
+ CONTENT_DOC = "CONTENT"
+ """
+ RECENT_DOC is a document that stores a list of the UIDs
+ with the recent flag for this mailbox. It deserves a special treatment
+ because:
+ (1) it cannot be set by the user
+ (2) it's a flag that we set inmediately after a fetch, which is quite
+ often.
+ (3) we need to be able to set/unset it in batches without doing a single
+ write for each element in the sequence.
+ """
+ RECENT_DOC = "RECENT"
+ """
+ HDOCS_SET_DOC is a document that stores a set of the Document-IDs
+ (the u1db index) for all the headers documents for a given mailbox.
+ We use it to prefetch massively all the headers for a mailbox.
+ This is the second massive query, after fetching all the FLAGS, that
+ a MUA will do in a case where we do not have local disk cache.
+ """
+ HDOCS_SET_DOC = "HDOCS_SET"
+
+ templates = {
+
+ # Message Level
+
+ FLAGS_DOC: {
+ fields.TYPE_KEY: fields.TYPE_FLAGS_VAL,
+ fields.UID_KEY: 1, # XXX moe to a local table
+ fields.MBOX_KEY: fields.INBOX_VAL,
+ fields.CONTENT_HASH_KEY: "",
+
+ fields.SEEN_KEY: False,
+ fields.DEL_KEY: False,
+ fields.FLAGS_KEY: [],
+ fields.MULTIPART_KEY: False,
+ fields.SIZE_KEY: 0
+ },
+
+ HEADERS_DOC: {
+ fields.TYPE_KEY: fields.TYPE_HEADERS_VAL,
+ fields.CONTENT_HASH_KEY: "",
+
+ fields.DATE_KEY: "",
+ fields.SUBJECT_KEY: "",
+
+ fields.HEADERS_KEY: {},
+ fields.PARTS_MAP_KEY: {},
+ },
+
+ CONTENT_DOC: {
+ fields.TYPE_KEY: fields.TYPE_CONTENT_VAL,
+ fields.PAYLOAD_HASH_KEY: "",
+ fields.LINKED_FROM_KEY: [],
+ fields.CTYPE_KEY: "", # should index by this too
+
+ # should only get inmutable headers parts
+ # (for indexing)
+ fields.HEADERS_KEY: {},
+ fields.RAW_KEY: "",
+ fields.PARTS_MAP_KEY: {},
+ fields.HEADERS_KEY: {},
+ fields.MULTIPART_KEY: False,
+ },
+
+ # Mailbox Level
+
+ RECENT_DOC: {
+ fields.TYPE_KEY: fields.TYPE_RECENT_VAL,
+ fields.MBOX_KEY: fields.INBOX_VAL,
+ fields.RECENTFLAGS_KEY: [],
+ },
+
+ HDOCS_SET_DOC: {
+ fields.TYPE_KEY: fields.TYPE_HDOCS_SET_VAL,
+ fields.MBOX_KEY: fields.INBOX_VAL,
+ fields.HDOCS_SET_KEY: [],
+ }
+
+
+ }
+
+ # Different locks for wrapping both the u1db document getting/setting
+ # and the property getting/settting in an atomic operation.
+
+ # TODO we would abstract this to a SoledadProperty class
+
+ _rdoc_lock = defaultdict(lambda: threading.Lock())
+ _rdoc_write_lock = defaultdict(lambda: threading.Lock())
+ _rdoc_read_lock = defaultdict(lambda: threading.Lock())
+ _rdoc_property_lock = defaultdict(lambda: threading.Lock())
+
+ _initialized = {}
+
+ def __init__(self, mbox=None, soledad=None, memstore=None):
+ """
+ Constructor for MessageCollection.
+
+ On initialization, we ensure that we have a document for
+ storing the recent flags. The nature of this flag make us wanting
+ to store the set of the UIDs with this flag at the level of the
+ MessageCollection for each mailbox, instead of treating them
+ as a property of each message.
+
+ We are passed an instance of MemoryStore, the same for the
+ SoledadBackedAccount, that we use as a read cache and a buffer
+ for writes.
+
+ :param mbox: the name of the mailbox. It is the name
+ with which we filter the query over the
+ messages database.
+ :type mbox: str
+ :param soledad: Soledad database
+ :type soledad: Soledad instance
+ :param memstore: a MemoryStore instance
+ :type memstore: MemoryStore
+ """
+ leap_assert(mbox, "Need a mailbox name to initialize")
+ leap_assert(mbox.strip() != "", "mbox cannot be blank space")
+ leap_assert(isinstance(mbox, (str, unicode)),
+ "mbox needs to be a string")
+ leap_assert(soledad, "Need a soledad instance to initialize")
+
+ # okay, all in order, keep going...
+
+ self.mbox = self._parse_mailbox_name(mbox)
+
+ # XXX get a SoledadStore passed instead
+ self._soledad = soledad
+ self.memstore = memstore
+
+ self.__rflags = None
+
+ if not self._initialized.get(mbox, False):
+ try:
+ self.initialize_db()
+ # ensure that we have a recent-flags doc
+ self._get_or_create_rdoc()
+ except Exception:
+ logger.debug("Error initializing %r" % (mbox,))
+ else:
+ self._initialized[mbox] = True
+
+ from twisted.internet import reactor
+ self.reactor = reactor
+
+ def _get_empty_doc(self, _type=FLAGS_DOC):
+ """
+ Returns an empty doc for storing different message parts.
+ Defaults to returning a template for a flags document.
+ :return: a dict with the template
+ :rtype: dict
+ """
+ if not _type in self.templates.keys():
+ raise TypeError("Improper type passed to _get_empty_doc")
+ return copy.deepcopy(self.templates[_type])
+
+ def _get_or_create_rdoc(self):
+ """
+ Try to retrieve the recent-flags doc for this MessageCollection,
+ and create one if not found.
+ """
+ # XXX should move this to memstore too
+ with self._rdoc_write_lock[self.mbox]:
+ rdoc = self._get_recent_doc_from_soledad()
+ if rdoc is None:
+ rdoc = self._get_empty_doc(self.RECENT_DOC)
+ if self.mbox != fields.INBOX_VAL:
+ rdoc[fields.MBOX_KEY] = self.mbox
+ self._soledad.create_doc(rdoc)
+
+ @deferred_to_thread
+ def _do_parse(self, raw):
+ """
+ Parse raw message and return it along with
+ relevant information about its outer level.
+
+ This is done in a separate thread, and the callback is passed
+ to `_do_add_msg` method.
+
+ :param raw: the raw message
+ :type raw: StringIO or basestring
+ :return: msg, parts, chash, size, multi
+ :rtype: tuple
+ """
+ msg = message_from_string(raw)
+ parts = walk.get_parts(msg)
+ size = len(raw)
+ chash = sha256.SHA256(raw).hexdigest()
+ multi = msg.is_multipart()
+ return msg, parts, chash, size, multi
+
+ def _populate_flags(self, flags, uid, chash, size, multi):
+ """
+ Return a flags doc.
+
+ XXX Missing DOC -----------
+ """
+ fd = self._get_empty_doc(self.FLAGS_DOC)
+
+ fd[self.MBOX_KEY] = self.mbox
+ fd[self.UID_KEY] = uid
+ fd[self.CONTENT_HASH_KEY] = chash
+ fd[self.SIZE_KEY] = size
+ fd[self.MULTIPART_KEY] = multi
+ if flags:
+ fd[self.FLAGS_KEY] = flags
+ fd[self.SEEN_KEY] = self.SEEN_FLAG in flags
+ fd[self.DEL_KEY] = self.DELETED_FLAG in flags
+ fd[self.RECENT_KEY] = True # set always by default
+ return fd
+
+ def _populate_headr(self, msg, chash, subject, date):
+ """
+ Return a headers doc.
+
+ XXX Missing DOC -----------
+ """
+ headers = defaultdict(list)
+ for k, v in msg.items():
+ headers[k].append(v)
+
+ # "fix" for repeated headers.
+ for k, v in headers.items():
+ newline = "\n%s: " % (k,)
+ headers[k] = newline.join(v)
+
+ lower_headers = lowerdict(headers)
+ msgid = first(MSGID_RE.findall(
+ lower_headers.get('message-id', '')))
+
+ hd = self._get_empty_doc(self.HEADERS_DOC)
+ hd[self.CONTENT_HASH_KEY] = chash
+ hd[self.HEADERS_KEY] = headers
+ hd[self.MSGID_KEY] = msgid
+
+ if not subject and self.SUBJECT_FIELD in headers:
+ hd[self.SUBJECT_KEY] = headers[self.SUBJECT_FIELD]
+ else:
+ hd[self.SUBJECT_KEY] = subject
+
+ if not date and self.DATE_FIELD in headers:
+ hd[self.DATE_KEY] = headers[self.DATE_FIELD]
+ else:
+ hd[self.DATE_KEY] = date
+ return hd
+
+ def _fdoc_already_exists(self, chash):
+ """
+ Check whether we can find a flags doc for this mailbox with the
+ given content-hash. It enforces that we can only have the same maessage
+ listed once for a a given mailbox.
+
+ :param chash: the content-hash to check about.
+ :type chash: basestring
+ :return: False, if it does not exist, or UID.
+ """
+ exist = False
+ exist = self.memstore.get_fdoc_from_chash(chash, self.mbox)
+
+ if not exist:
+ exist = self._get_fdoc_from_chash(chash)
+ if exist and exist.content is not None:
+ return exist.content.get(fields.UID_KEY, "unknown-uid")
+ else:
+ return False
+
+ def add_msg(self, raw, subject=None, flags=None, date=None, uid=None,
+ notify_on_disk=False):
+ """
+ Creates a new message document.
+
+ :param raw: the raw message
+ :type raw: str
+
+ :param subject: subject of the message.
+ :type subject: str
+
+ :param flags: flags
+ :type flags: list
+
+ :param date: the received date for the message
+ :type date: str
+
+ :param uid: the message uid for this mailbox
+ :type uid: int
+
+ :return: a deferred that will be fired with the message
+ uid when the adding succeed.
+ :rtype: deferred
+ """
+ if flags is None:
+ flags = tuple()
+ leap_assert_type(flags, tuple)
+
+ observer = defer.Deferred()
+ d = self._do_parse(raw)
+ d.addCallback(lambda result: self.reactor.callInThread(
+ self._do_add_msg, result, flags, subject, date,
+ notify_on_disk, observer))
+ return observer
+
+ # Called in thread
+ def _do_add_msg(self, parse_result, flags, subject,
+ date, notify_on_disk, observer):
+ """
+ Helper that creates a new message document.
+ Here lives the magic of the leap mail. Well, in soledad, really.
+
+ See `add_msg` docstring for parameter info.
+
+ :param parse_result: a tuple with the results of `self._do_parse`
+ :type parse_result: tuple
+ :param observer: a deferred that will be fired with the message
+ uid when the adding succeed.
+ :type observer: deferred
+ """
+ # TODO signal that we can delete the original message!-----
+ # when all the processing is done.
+
+ # TODO add the linked-from info !
+ # TODO add reference to the original message
+
+ msg, parts, chash, size, multi = parse_result
+
+ # check for uniqueness --------------------------------
+ # Watch out! We're reserving a UID right after this!
+ existing_uid = self._fdoc_already_exists(chash)
+ if existing_uid:
+ msg = self.get_msg_by_uid(existing_uid)
+
+ # We can say the observer that we're done
+ self.reactor.callFromThread(observer.callback, existing_uid)
+ msg.setFlags((fields.DELETED_FLAG,), -1)
+ return
+
+ uid = self.memstore.increment_last_soledad_uid(self.mbox)
+
+ # We can say the observer that we're done at this point, but
+ # before that we should make sure it has no serious consequences
+ # if we're issued, for instance, a fetch command right after...
+ #self.reactor.callFromThread(observer.callback, uid)
+ # if we did the notify, we need to invalidate the deferred
+ # so not to try to fire it twice.
+ #observer = None
+
+ fd = self._populate_flags(flags, uid, chash, size, multi)
+ hd = self._populate_headr(msg, chash, subject, date)
+
+ body_phash_fun = [walk.get_body_phash_simple,
+ walk.get_body_phash_multi][int(multi)]
+ body_phash = body_phash_fun(walk.get_payloads(msg))
+ parts_map = walk.walk_msg_tree(parts, body_phash=body_phash)
+
+ # add parts map to header doc
+ # (body, multi, part_map)
+ for key in parts_map:
+ hd[key] = parts_map[key]
+ del parts_map
+
+ hd = stringify_parts_map(hd)
+
+ # The MessageContainer expects a dict, one-indexed
+ cdocs = dict(enumerate(walk.get_raw_docs(msg, parts), 1))
+
+ self.set_recent_flag(uid)
+ msg_container = MessageWrapper(fd, hd, cdocs)
+ self.memstore.create_message(
+ self.mbox, uid, msg_container,
+ observer=observer, notify_on_disk=notify_on_disk)
+
+ #
+ # getters: specific queries
+ #
+
+ # recent flags
+
+ def _get_recent_flags(self):
+ """
+ An accessor for the recent-flags set for this mailbox.
+ """
+ # XXX check if we should remove this
+ if self.__rflags is not None:
+ return self.__rflags
+
+ if self.memstore is not None:
+ with self._rdoc_lock[self.mbox]:
+ rflags = self.memstore.get_recent_flags(self.mbox)
+ if not rflags:
+ # not loaded in the memory store yet.
+ # let's fetch them from soledad...
+ rdoc = self._get_recent_doc_from_soledad()
+ if rdoc is None:
+ return set([])
+ rflags = set(rdoc.content.get(
+ fields.RECENTFLAGS_KEY, []))
+ # ...and cache them now.
+ self.memstore.load_recent_flags(
+ self.mbox,
+ {'doc_id': rdoc.doc_id, 'set': rflags})
+ return rflags
+
+ def _set_recent_flags(self, value):
+ """
+ Setter for the recent-flags set for this mailbox.
+ """
+ if self.memstore is not None:
+ self.memstore.set_recent_flags(self.mbox, value)
+
+ recent_flags = property(
+ _get_recent_flags, _set_recent_flags,
+ doc="Set of UIDs with the recent flag for this mailbox.")
+
+ def _get_recent_doc_from_soledad(self):
+ """
+ Get recent-flags document from Soledad for this mailbox.
+ :rtype: SoledadDocument or None
+ """
+ curried = partial(
+ self._soledad.get_from_index,
+ fields.TYPE_MBOX_IDX,
+ fields.TYPE_RECENT_VAL, self.mbox)
+ curried.expected = "rdoc"
+ with self._rdoc_read_lock[self.mbox]:
+ return try_unique_query(curried)
+
+ # Property-set modification (protected by a different
+ # lock to give atomicity to the read/write operation)
+
+ def unset_recent_flags(self, uids):
+ """
+ Unset Recent flag for a sequence of uids.
+
+ :param uids: the uids to unset
+ :type uid: sequence
+ """
+ with self._rdoc_property_lock[self.mbox]:
+ self.recent_flags.difference_update(
+ set(uids))
+
+ # Individual flags operations
+
+ def unset_recent_flag(self, uid):
+ """
+ Unset Recent flag for a given uid.
+
+ :param uid: the uid to unset
+ :type uid: int
+ """
+ with self._rdoc_property_lock[self.mbox]:
+ self.recent_flags.difference_update(
+ set([uid]))
+
+ @deferred_to_thread
+ def set_recent_flag(self, uid):
+ """
+ Set Recent flag for a given uid.
+
+ :param uid: the uid to set
+ :type uid: int
+ """
+ with self._rdoc_property_lock[self.mbox]:
+ self.recent_flags = self.recent_flags.union(
+ set([uid]))
+
+ # individual doc getters, message layer.
+
+ def _get_fdoc_from_chash(self, chash):
+ """
+ Return a flags document for this mailbox with a given chash.
+
+ :return: A SoledadDocument containing the Flags Document, or None if
+ the query failed.
+ :rtype: SoledadDocument or None.
+ """
+ curried = partial(
+ self._soledad.get_from_index,
+ fields.TYPE_MBOX_C_HASH_IDX,
+ fields.TYPE_FLAGS_VAL, self.mbox, chash)
+ curried.expected = "fdoc"
+ fdoc = try_unique_query(curried)
+ if fdoc is not None:
+ return fdoc
+ else:
+ # probably this should be the other way round,
+ # ie, try fist on memstore...
+ cf = self.memstore._chash_fdoc_store
+ fdoc = cf[chash][self.mbox]
+ # hey, I just needed to wrap fdoc thing into
+ # a "content" attribute, look a better way...
+ if not empty(fdoc):
+ return MessagePartDoc(
+ new=None, dirty=None, part=None,
+ store=None, doc_id=None,
+ content=fdoc)
+
+ def _get_uid_from_msgidCb(self, msgid):
+ hdoc = None
+ curried = partial(
+ self._soledad.get_from_index,
+ fields.TYPE_MSGID_IDX,
+ fields.TYPE_HEADERS_VAL, msgid)
+ curried.expected = "hdoc"
+ hdoc = try_unique_query(curried)
+
+ # XXX this is only a quick hack to avoid regression
+ # on the "multiple copies of the draft" issue, but
+ # this is currently broken since it's not efficient to
+ # look for this. Should lookup better.
+ # FIXME!
+
+ if hdoc is not None:
+ hdoc_dict = hdoc.content
+
+ else:
+ hdocstore = self.memstore._hdoc_store
+ match = [x for _, x in hdocstore.items() if x['msgid'] == msgid]
+ hdoc_dict = first(match)
+
+ if hdoc_dict is None:
+ logger.warning("Could not find hdoc for msgid %s"
+ % (msgid,))
+ return None
+ msg_chash = hdoc_dict.get(fields.CONTENT_HASH_KEY)
+
+ fdoc = self._get_fdoc_from_chash(msg_chash)
+ if not fdoc:
+ logger.warning("Could not find fdoc for msgid %s"
+ % (msgid,))
+ return None
+ return fdoc.content.get(fields.UID_KEY, None)
+
+ @deferred_to_thread
+ def _get_uid_from_msgid(self, msgid):
+ """
+ Return a UID for a given message-id.
+
+ It first gets the headers-doc for that msg-id, and
+ it found it queries the flags doc for the current mailbox
+ for the matching content-hash.
+
+ :return: A UID, or None
+ """
+ # We need to wait a little bit, cause in some of the cases
+ # the query is received right after we've saved the document,
+ # and we cannot find it otherwise. This seems to be enough.
+
+ # XXX do a deferLater instead ??
+ # XXX is this working?
+ return self._get_uid_from_msgidCb(msgid)
+
+ @deferred_to_thread
+ def set_flags(self, mbox, messages, flags, mode, observer):
+ """
+ Set flags for a sequence of messages.
+
+ :param mbox: the mbox this message belongs to
+ :type mbox: str or unicode
+ :param messages: the messages to iterate through
+ :type messages: sequence
+ :flags: the flags to be set
+ :type flags: tuple
+ :param mode: the mode for setting. 1 is append, -1 is remove, 0 set.
+ :type mode: int
+ :param observer: a deferred that will be called with the dictionary
+ mapping UIDs to flags after the operation has been
+ done.
+ :type observer: deferred
+ """
+ reactor = self.reactor
+ getmsg = self.get_msg_by_uid
+
+ def set_flags(uid, flags, mode):
+ msg = getmsg(uid, mem_only=True, flags_only=True)
+ if msg is not None:
+ return uid, msg.setFlags(flags, mode)
+
+ setted_flags = [set_flags(uid, flags, mode) for uid in messages]
+ result = dict(filter(None, setted_flags))
+
+ reactor.callFromThread(observer.callback, result)
+
+ # getters: generic for a mailbox
+
+ def get_msg_by_uid(self, uid, mem_only=False, flags_only=False):
+ """
+ Retrieves a LeapMessage by UID.
+ This is used primarity in the Mailbox fetch and store methods.
+
+ :param uid: the message uid to query by
+ :type uid: int
+ :param mem_only: a flag that indicates whether this Message should
+ pass a reference to soledad to retrieve missing pieces
+ or not.
+ :type mem_only: bool
+ :param flags_only: whether the message should carry only a reference
+ to the flags document.
+ :type flags_only: bool
+
+ :return: A LeapMessage instance matching the query,
+ or None if not found.
+ :rtype: LeapMessage
+ """
+ msg_container = self.memstore.get_message(
+ self.mbox, uid, flags_only=flags_only)
+
+ if msg_container is not None:
+ if mem_only:
+ msg = LeapMessage(None, uid, self.mbox, collection=self,
+ container=msg_container)
+ else:
+ # We pass a reference to soledad just to be able to retrieve
+ # missing parts that cannot be found in the container, like
+ # the content docs after a copy.
+ msg = LeapMessage(self._soledad, uid, self.mbox,
+ collection=self, container=msg_container)
+ else:
+ msg = LeapMessage(self._soledad, uid, self.mbox, collection=self)
+
+ if not msg.does_exist():
+ return None
+ return msg
+
+ def get_all_docs(self, _type=fields.TYPE_FLAGS_VAL):
+ """
+ Get all documents for the selected mailbox of the
+ passed type. By default, it returns the flag docs.
+
+ If you want acess to the content, use __iter__ instead
+
+ :return: a list of u1db documents
+ :rtype: list of SoledadDocument
+ """
+ if _type not in fields.__dict__.values():
+ raise TypeError("Wrong type passed to get_all_docs")
+
+ if sameProxiedObjects(self._soledad, None):
+ logger.warning('Tried to get messages but soledad is None!')
+ return []
+
+ all_docs = [doc for doc in self._soledad.get_from_index(
+ fields.TYPE_MBOX_IDX,
+ _type, self.mbox)]
+
+ # inneficient, but first let's grok it and then
+ # let's worry about efficiency.
+ # XXX FIXINDEX -- should implement order by in soledad
+ # FIXME ----------------------------------------------
+ return sorted(all_docs, key=lambda item: item.content['uid'])
+
+ def all_soledad_uid_iter(self):
+ """
+ Return an iterator through the UIDs of all messages, sorted in
+ ascending order.
+ """
+ db_uids = set([doc.content[self.UID_KEY] for doc in
+ self._soledad.get_from_index(
+ fields.TYPE_MBOX_IDX,
+ fields.TYPE_FLAGS_VAL, self.mbox)
+ if not empty(doc)])
+ return db_uids
+
+ def all_uid_iter(self):
+ """
+ Return an iterator through the UIDs of all messages, from memory.
+ """
+ mem_uids = self.memstore.get_uids(self.mbox)
+ soledad_known_uids = self.memstore.get_soledad_known_uids(
+ self.mbox)
+ combined = tuple(set(mem_uids).union(soledad_known_uids))
+ return combined
+
+ def get_all_soledad_flag_docs(self):
+ """
+ Return a dict with the content of all the flag documents
+ in soledad store for the given mbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: dict
+ """
+ # XXX we really could return a reduced version with
+ # just {'uid': (flags-tuple,) since the prefetch is
+ # only oriented to get the flag tuples.
+ all_docs = [(
+ doc.content[self.UID_KEY],
+ dict(doc.content))
+ for doc in
+ self._soledad.get_from_index(
+ fields.TYPE_MBOX_IDX,
+ fields.TYPE_FLAGS_VAL, self.mbox)
+ if not empty(doc.content)]
+ all_flags = dict(all_docs)
+ return all_flags
+
+ def all_headers(self):
+ """
+ Return a dict with all the header documents for this
+ mailbox.
+
+ :rtype: dict
+ """
+ return self.memstore.all_headers(self.mbox)
+
+ def count(self):
+ """
+ Return the count of messages for this mailbox.
+
+ :rtype: int
+ """
+ return self.memstore.count(self.mbox)
+
+ # unseen messages
+
+ def unseen_iter(self):
+ """
+ Get an iterator for the message UIDs with no `seen` flag
+ for this mailbox.
+
+ :return: iterator through unseen message doc UIDs
+ :rtype: iterable
+ """
+ return self.memstore.unseen_iter(self.mbox)
+
+ def count_unseen(self):
+ """
+ Count all messages with the `Unseen` flag.
+
+ :returns: count
+ :rtype: int
+ """
+ return len(list(self.unseen_iter()))
+
+ def get_unseen(self):
+ """
+ Get all messages with the `Unseen` flag
+
+ :returns: a list of LeapMessages
+ :rtype: list
+ """
+ return [LeapMessage(self._soledad, docid, self.mbox)
+ for docid in self.unseen_iter()]
+
+ # recent messages
+
+ # XXX take it from memstore
+ def count_recent(self):
+ """
+ Count all messages with the `Recent` flag.
+ It just retrieves the length of the recent_flags set,
+ which is stored in a specific type of document for
+ this collection.
+
+ :returns: count
+ :rtype: int
+ """
+ return len(self.recent_flags)
+
+ def __len__(self):
+ """
+ Returns the number of messages on this mailbox.
+
+ :rtype: int
+ """
+ return self.count()
+
+ def __iter__(self):
+ """
+ Returns an iterator over all messages.
+
+ :returns: iterator of dicts with content for all messages.
+ :rtype: iterable
+ """
+ return (LeapMessage(self._soledad, docuid, self.mbox)
+ for docuid in self.all_uid_iter())
+
+ def __repr__(self):
+ """
+ Representation string for this object.
+ """
+ return u"<MessageCollection: mbox '%s' (%s)>" % (
+ self.mbox, self.count())
+
+ # XXX should implement __eq__ also !!!
+ # use chash...