summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/messages.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/imap/messages.py')
-rw-r--r--src/leap/mail/imap/messages.py1104
1 files changed, 378 insertions, 726 deletions
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index 34304ea..25fc55f 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -20,17 +20,15 @@ LeapMessage and MessageCollection.
import copy
import logging
import re
-import time
import threading
import StringIO
-from collections import defaultdict, namedtuple
+from collections import defaultdict
from functools import partial
from twisted.mail import imap4
from twisted.internet import defer
from twisted.python import log
-from u1db import errors as u1db_errors
from zope.interface import implements
from zope.proxy import sameProxiedObjects
@@ -38,33 +36,30 @@ from leap.common.check import leap_assert, leap_assert_type
from leap.common.decorators import memoized_method
from leap.common.mail import get_email_charset
from leap.mail import walk
-from leap.mail.utils import first, find_charset
-from leap.mail.decorators import deferred
+from leap.mail.utils import first, find_charset, lowerdict, empty
+from leap.mail.utils import stringify_parts_map
+from leap.mail.decorators import deferred_to_thread
from leap.mail.imap.index import IndexedDB
from leap.mail.imap.fields import fields, WithMsgFields
+from leap.mail.imap.memorystore import MessageWrapper
+from leap.mail.imap.messageparts import MessagePart
from leap.mail.imap.parser import MailParser, MBoxParser
-from leap.mail.messageflow import IMessageConsumer
logger = logging.getLogger(__name__)
# TODO ------------------------------------------------------------
+# [ ] Add ref to incoming message during add_msg
# [ ] Add linked-from info.
+# * Need a new type of documents: linkage info.
+# * HDOCS are linked from FDOCs (ref to chash)
+# * CDOCS are linked from HDOCS (ref to chash)
+
# [ ] Delete incoming mail only after successful write!
# [ ] Remove UID from syncable db. Store only those indexes locally.
-
-def lowerdict(_dict):
- """
- Return a dict with the keys in lowercase.
-
- :param _dict: the dict to convert
- :rtype: dict
- """
- # TODO should properly implement a CaseInsensitive dict.
- # Look into requests code.
- return dict((key.lower(), value)
- for key, value in _dict.items())
+MSGID_PATTERN = r"""<([\w@.]+)>"""
+MSGID_RE = re.compile(MSGID_PATTERN)
def try_unique_query(curried):
@@ -92,232 +87,6 @@ def try_unique_query(curried):
except Exception as exc:
logger.exception("Unhandled error %r" % exc)
-MSGID_PATTERN = r"""<([\w@.]+)>"""
-MSGID_RE = re.compile(MSGID_PATTERN)
-
-
-class MessagePart(object):
- """
- IMessagePart implementor.
- It takes a subpart message and is able to find
- the inner parts.
-
- Excusatio non petita: see the interface documentation.
- """
-
- implements(imap4.IMessagePart)
-
- def __init__(self, soledad, part_map):
- """
- Initializes the MessagePart.
-
- :param part_map: a dictionary containing the parts map for this
- message
- :type part_map: dict
- """
- # TODO
- # It would be good to pass the uid/mailbox also
- # for references while debugging.
-
- # We have a problem on bulk moves, and is
- # that when the fetch on the new mailbox is done
- # the parts maybe are not complete.
- # So we should be able to fail with empty
- # docs until we solve that. The ideal would be
- # to gather the results of the deferred operations
- # to signal the operation is complete.
- #leap_assert(part_map, "part map dict cannot be null")
- self._soledad = soledad
- self._pmap = part_map
-
- def getSize(self):
- """
- Return the total size, in octets, of this message part.
-
- :return: size of the message, in octets
- :rtype: int
- """
- if not self._pmap:
- return 0
- size = self._pmap.get('size', None)
- if not size:
- logger.error("Message part cannot find size in the partmap")
- return size
-
- def getBodyFile(self):
- """
- Retrieve a file object containing only the body of this message.
-
- :return: file-like object opened for reading
- :rtype: StringIO
- """
- fd = StringIO.StringIO()
- if self._pmap:
- multi = self._pmap.get('multi')
- if not multi:
- phash = self._pmap.get("phash", None)
- else:
- pmap = self._pmap.get('part_map')
- first_part = pmap.get('1', None)
- if first_part:
- phash = first_part['phash']
-
- if not phash:
- logger.warning("Could not find phash for this subpart!")
- payload = str("")
- else:
- payload = self._get_payload_from_document(phash)
-
- else:
- logger.warning("Message with no part_map!")
- payload = str("")
-
- if payload:
- content_type = self._get_ctype_from_document(phash)
- charset = find_charset(content_type)
- logger.debug("Got charset from header: %s" % (charset,))
- if charset is None:
- charset = self._get_charset(payload)
- logger.debug("Got charset: %s" % (charset,))
- try:
- payload = payload.encode(charset)
- except (UnicodeEncodeError, UnicodeDecodeError) as e:
- logger.error("Unicode error, using 'replace'. {0!r}".format(e))
- payload = payload.encode(charset, 'replace')
-
- fd.write(payload)
- fd.seek(0)
- return fd
-
- # TODO cache the phash retrieval
- def _get_payload_from_document(self, phash):
- """
- Gets the message payload from the content document.
-
- :param phash: the payload hash to retrieve by.
- :type phash: basestring
- """
- cdocs = self._soledad.get_from_index(
- fields.TYPE_P_HASH_IDX,
- fields.TYPE_CONTENT_VAL, str(phash))
-
- cdoc = first(cdocs)
- if not cdoc:
- logger.warning(
- "Could not find the content doc "
- "for phash %s" % (phash,))
- payload = cdoc.content.get(fields.RAW_KEY, "")
- return payload
-
- # TODO cache the pahash retrieval
- def _get_ctype_from_document(self, phash):
- """
- Gets the content-type from the content document.
-
- :param phash: the payload hash to retrieve by.
- :type phash: basestring
- """
- cdocs = self._soledad.get_from_index(
- fields.TYPE_P_HASH_IDX,
- fields.TYPE_CONTENT_VAL, str(phash))
-
- cdoc = first(cdocs)
- if not cdoc:
- logger.warning(
- "Could not find the content doc "
- "for phash %s" % (phash,))
- ctype = cdoc.content.get('ctype', "")
- return ctype
-
- @memoized_method
- def _get_charset(self, stuff):
- # TODO put in a common class with LeapMessage
- """
- Gets (guesses?) the charset of a payload.
-
- :param stuff: the stuff to guess about.
- :type stuff: basestring
- :returns: charset
- """
- # XXX existential doubt 2. shouldn't we make the scope
- # of the decorator somewhat more persistent?
- # ah! yes! and put memory bounds.
- return get_email_charset(unicode(stuff))
-
- def getHeaders(self, negate, *names):
- """
- Retrieve a group of message headers.
-
- :param names: The names of the headers to retrieve or omit.
- :type names: tuple of str
-
- :param negate: If True, indicates that the headers listed in names
- should be omitted from the return value, rather
- than included.
- :type negate: bool
-
- :return: A mapping of header field names to header field values
- :rtype: dict
- """
- if not self._pmap:
- logger.warning("No pmap in Subpart!")
- return {}
- headers = dict(self._pmap.get("headers", []))
-
- # twisted imap server expects *some* headers to be lowercase
- # We could use a CaseInsensitiveDict here...
- headers = dict(
- (str(key), str(value)) if key.lower() != "content-type"
- else (str(key.lower()), str(value))
- for (key, value) in headers.items())
-
- names = map(lambda s: s.upper(), names)
- if negate:
- cond = lambda key: key.upper() not in names
- else:
- cond = lambda key: key.upper() in names
-
- # unpack and filter original dict by negate-condition
- filter_by_cond = [
- map(str, (key, val)) for
- key, val in headers.items()
- if cond(key)]
- filtered = dict(filter_by_cond)
- return filtered
-
- def isMultipart(self):
- """
- Return True if this message is multipart.
- """
- if not self._pmap:
- logger.warning("Could not get part map!")
- return False
- multi = self._pmap.get("multi", False)
- return multi
-
- def getSubPart(self, part):
- """
- Retrieve a MIME submessage
-
- :type part: C{int}
- :param part: The number of the part to retrieve, indexed from 0.
- :raise IndexError: Raised if the specified part does not exist.
- :raise TypeError: Raised if this message is not multipart.
- :rtype: Any object implementing C{IMessagePart}.
- :return: The specified sub-part.
- """
- if not self.isMultipart():
- raise TypeError
- sub_pmap = self._pmap.get("part_map", {})
- try:
- part_map = sub_pmap[str(part + 1)]
- except KeyError:
- logger.debug("getSubpart for %s: KeyError" % (part,))
- raise IndexError
-
- # XXX check for validity
- return MessagePart(self._soledad, part_map)
-
class LeapMessage(fields, MailParser, MBoxParser):
"""
@@ -328,12 +97,14 @@ class LeapMessage(fields, MailParser, MBoxParser):
"""
# TODO this has to change.
- # Should index primarily by chash, and keep a local-lonly
+ # Should index primarily by chash, and keep a local-only
# UID table.
implements(imap4.IMessage)
- def __init__(self, soledad, uid, mbox, collection=None):
+ flags_lock = threading.Lock()
+
+ def __init__(self, soledad, uid, mbox, collection=None, container=None):
"""
Initializes a LeapMessage.
@@ -342,32 +113,54 @@ class LeapMessage(fields, MailParser, MBoxParser):
:param uid: the UID for the message.
:type uid: int or basestring
:param mbox: the mbox this message belongs to
- :type mbox: basestring
+ :type mbox: str or unicode
:param collection: a reference to the parent collection object
:type collection: MessageCollection
+ :param container: a IMessageContainer implementor instance
+ :type container: IMessageContainer
"""
MailParser.__init__(self)
self._soledad = soledad
self._uid = int(uid)
self._mbox = self._parse_mailbox_name(mbox)
self._collection = collection
+ self._container = container
self.__chash = None
self.__bdoc = None
+ # XXX make these properties public
+
@property
def _fdoc(self):
"""
An accessor to the flags document.
"""
if all(map(bool, (self._uid, self._mbox))):
- fdoc = self._get_flags_doc()
+ fdoc = None
+ if self._container is not None:
+ fdoc = self._container.fdoc
+ if not fdoc:
+ fdoc = self._get_flags_doc()
if fdoc:
- self.__chash = fdoc.content.get(
+ fdoc_content = fdoc.content
+ self.__chash = fdoc_content.get(
fields.CONTENT_HASH_KEY, None)
return fdoc
@property
+ def _hdoc(self):
+ """
+ An accessor to the headers document.
+ """
+ if self._container is not None:
+ hdoc = self._container.hdoc
+ if hdoc and not empty(hdoc.content):
+ return hdoc
+ # XXX cache this into the memory store !!!
+ return self._get_headers_doc()
+
+ @property
def _chash(self):
"""
An accessor to the content hash for this message.
@@ -380,13 +173,6 @@ class LeapMessage(fields, MailParser, MBoxParser):
return self.__chash
@property
- def _hdoc(self):
- """
- An accessor to the headers document.
- """
- return self._get_headers_doc()
-
- @property
def _bdoc(self):
"""
An accessor to the body document.
@@ -415,39 +201,33 @@ class LeapMessage(fields, MailParser, MBoxParser):
:return: The flags, represented as strings
:rtype: tuple
"""
- if self._uid is None:
- return []
uid = self._uid
- flags = []
+ flags = set([])
fdoc = self._fdoc
if fdoc:
- flags = fdoc.content.get(self.FLAGS_KEY, None)
+ flags = set(fdoc.content.get(self.FLAGS_KEY, None))
msgcol = self._collection
# We treat the recent flag specially: gotten from
# a mailbox-level document.
if msgcol and uid in msgcol.recent_flags:
- flags.append(fields.RECENT_FLAG)
+ flags.add(fields.RECENT_FLAG)
if flags:
flags = map(str, flags)
return tuple(flags)
- # setFlags, addFlags, removeFlags are not in the interface spec
- # but we use them with store command.
+ # setFlags not in the interface spec but we use it with store command.
- def setFlags(self, flags):
+ def setFlags(self, flags, mode):
"""
Sets the flags for this message
- Returns a SoledadDocument that needs to be updated by the caller.
-
:param flags: the flags to update in the message.
:type flags: tuple of str
-
- :return: a SoledadDocument instance
- :rtype: SoledadDocument
+ :param mode: the mode for setting. 1 is append, -1 is remove, 0 set.
+ :type mode: int
"""
leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
log.msg('setting flags: %s (%s)' % (self._uid, flags))
@@ -458,42 +238,36 @@ class LeapMessage(fields, MailParser, MBoxParser):
"Could not find FDOC for %s:%s while setting flags!" %
(self._mbox, self._uid))
return
- doc.content[self.FLAGS_KEY] = flags
- doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
- doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags
- self._soledad.put_doc(doc)
-
- def addFlags(self, flags):
- """
- Adds flags to this message.
-
- Returns a SoledadDocument that needs to be updated by the caller.
- :param flags: the flags to add to the message.
- :type flags: tuple of str
-
- :return: a SoledadDocument instance
- :rtype: SoledadDocument
- """
- leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
- oldflags = self.getFlags()
- self.setFlags(tuple(set(flags + oldflags)))
-
- def removeFlags(self, flags):
- """
- Remove flags from this message.
-
- Returns a SoledadDocument that needs to be updated by the caller.
-
- :param flags: the flags to be removed from the message.
- :type flags: tuple of str
-
- :return: a SoledadDocument instance
- :rtype: SoledadDocument
- """
- leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
- oldflags = self.getFlags()
- self.setFlags(tuple(set(oldflags) - set(flags)))
+ APPEND = 1
+ REMOVE = -1
+ SET = 0
+
+ with self.flags_lock:
+ current = doc.content[self.FLAGS_KEY]
+ if mode == APPEND:
+ newflags = tuple(set(tuple(current) + flags))
+ elif mode == REMOVE:
+ newflags = tuple(set(current).difference(set(flags)))
+ elif mode == SET:
+ newflags = flags
+
+ # We could defer this, but I think it's better
+ # to put it under the lock...
+ doc.content[self.FLAGS_KEY] = newflags
+ doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
+ doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags
+
+ if self._collection.memstore is not None:
+ log.msg("putting message in collection")
+ self._collection.memstore.put_message(
+ self._mbox, self._uid,
+ MessageWrapper(fdoc=doc.content, new=False, dirty=True,
+ docs_id={'fdoc': doc.doc_id}))
+ else:
+ # fallback for non-memstore initializations.
+ self._soledad.put_doc(doc)
+ return map(str, newflags)
def getInternalDate(self):
"""
@@ -519,29 +293,42 @@ class LeapMessage(fields, MailParser, MBoxParser):
:return: file-like object opened for reading
:rtype: StringIO
"""
+ def write_fd(body):
+ fd.write(body)
+ fd.seek(0)
+ return fd
+
# TODO refactor with getBodyFile in MessagePart
+
fd = StringIO.StringIO()
- bdoc = self._bdoc
- if bdoc:
- body = self._bdoc.content.get(self.RAW_KEY, "")
- content_type = bdoc.content.get('content-type', "")
+
+ if self._bdoc is not None:
+ bdoc_content = self._bdoc.content
+ if empty(bdoc_content):
+ logger.warning("No BDOC content found for message!!!")
+ return write_fd("")
+
+ body = bdoc_content.get(self.RAW_KEY, "")
+ content_type = bdoc_content.get('content-type', "")
charset = find_charset(content_type)
+ logger.debug('got charset from content-type: %s' % charset)
if charset is None:
charset = self._get_charset(body)
try:
- body = body.encode(charset)
- except UnicodeError as e:
- logger.error("Unicode error, using 'replace'. {0!r}".format(e))
+ if isinstance(body, unicode):
+ body = body.encode(charset)
+ except UnicodeError as exc:
+ logger.error(
+ "Unicode error, using 'replace'. {0!r}".format(exc))
+ logger.debug("Attempted to encode with: %s" % charset)
body = body.encode(charset, 'replace')
+ finally:
+ return write_fd(body)
# We are still returning funky characters from here.
else:
logger.warning("No BDOC found for message.")
- body = str("")
-
- fd.write(body)
- fd.seek(0)
- return fd
+ return write_fd("")
@memoized_method
def _get_charset(self, stuff):
@@ -552,11 +339,10 @@ class LeapMessage(fields, MailParser, MBoxParser):
:type stuff: basestring
:returns: charset
"""
- # TODO get from subpart headers
- # XXX existential doubt 2. shouldn't we make the scope
+ # XXX shouldn't we make the scope
# of the decorator somewhat more persistent?
# ah! yes! and put memory bounds.
- return get_email_charset(unicode(stuff))
+ return get_email_charset(stuff)
def getSize(self):
"""
@@ -567,7 +353,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
"""
size = None
if self._fdoc:
- size = self._fdoc.content.get(self.SIZE_KEY, False)
+ fdoc_content = self._fdoc.content
+ size = fdoc_content.get(self.SIZE_KEY, False)
else:
logger.warning("No FLAGS doc for %s:%s" % (self._mbox,
self._uid))
@@ -592,6 +379,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
:rtype: dict
"""
# TODO split in smaller methods
+ # XXX refactor together with MessagePart method
+
headers = self._get_headers()
if not headers:
logger.warning("No headers found")
@@ -608,11 +397,10 @@ class LeapMessage(fields, MailParser, MBoxParser):
# default to most likely standard
charset = find_charset(headers, "utf-8")
-
- # twisted imap server expects *some* headers to be lowercase
- # XXX refactor together with MessagePart method
headers2 = dict()
for key, value in headers.items():
+ # twisted imap server expects *some* headers to be lowercase
+ # We could use a CaseInsensitiveDict here...
if key.lower() == "content-type":
key = key.lower()
@@ -621,10 +409,13 @@ class LeapMessage(fields, MailParser, MBoxParser):
if not isinstance(value, str):
value = value.encode(charset, 'replace')
+ if value.endswith(";"):
+ # bastards
+ value = value[:-1]
+
# filter original dict by negate-condition
if cond(key):
headers2[key] = value
-
return headers2
def _get_headers(self):
@@ -632,7 +423,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
Return the headers dict for this message.
"""
if self._hdoc is not None:
- headers = self._hdoc.content.get(self.HEADERS_KEY, {})
+ hdoc_content = self._hdoc.content
+ headers = hdoc_content.get(self.HEADERS_KEY, {})
return headers
else:
@@ -646,7 +438,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
Return True if this message is multipart.
"""
if self._fdoc:
- is_multipart = self._fdoc.content.get(self.MULTIPART_KEY, False)
+ fdoc_content = self._fdoc.content
+ is_multipart = fdoc_content.get(self.MULTIPART_KEY, False)
return is_multipart
else:
logger.warning(
@@ -688,9 +481,15 @@ class LeapMessage(fields, MailParser, MBoxParser):
logger.warning("Tried to get part but no HDOC found!")
return None
- pmap = self._hdoc.content.get(fields.PARTS_MAP_KEY, {})
+ hdoc_content = self._hdoc.content
+ pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {})
+
+ # remember, lads, soledad is using strings in its keys,
+ # not integers!
return pmap[str(part)]
+ # XXX moved to memory store
+ # move the rest too. ------------------------------------------
def _get_flags_doc(self):
"""
Return the document that keeps the flags for this
@@ -724,16 +523,31 @@ class LeapMessage(fields, MailParser, MBoxParser):
Return the document that keeps the body for this
message.
"""
- body_phash = self._hdoc.content.get(
+ hdoc_content = self._hdoc.content
+ body_phash = hdoc_content.get(
fields.BODY_KEY, None)
if not body_phash:
logger.warning("No body phash for this document!")
return None
- body_docs = self._soledad.get_from_index(
- fields.TYPE_P_HASH_IDX,
- fields.TYPE_CONTENT_VAL, str(body_phash))
- return first(body_docs)
+ # XXX get from memstore too...
+ # if memstore: memstore.get_phrash
+ # memstore should keep a dict with weakrefs to the
+ # phash doc...
+
+ if self._container is not None:
+ bdoc = self._container.memstore.get_cdoc_from_phash(body_phash)
+ if not empty(bdoc) and not empty(bdoc.content):
+ return bdoc
+
+ # no memstore, or no body doc found there
+ if self._soledad:
+ body_docs = self._soledad.get_from_index(
+ fields.TYPE_P_HASH_IDX,
+ fields.TYPE_CONTENT_VAL, str(body_phash))
+ return first(body_docs)
+ else:
+ logger.error("No phash in container, and no soledad found!")
def __getitem__(self, key):
"""
@@ -748,216 +562,19 @@ class LeapMessage(fields, MailParser, MBoxParser):
"""
return self._fdoc.content.get(key, None)
- # setters
-
- # XXX to be used in the messagecopier interface?!
-
- def set_uid(self, uid):
- """
- Set new uid for this message.
-
- :param uid: the new uid
- :type uid: basestring
- """
- # XXX dangerous! lock?
- self._uid = uid
- d = self._fdoc
- d.content[self.UID_KEY] = uid
- self._soledad.put_doc(d)
-
- def set_mbox(self, mbox):
- """
- Set new mbox for this message.
-
- :param mbox: the new mbox
- :type mbox: basestring
- """
- # XXX dangerous! lock?
- self._mbox = mbox
- d = self._fdoc
- d.content[self.MBOX_KEY] = mbox
- self._soledad.put_doc(d)
-
- # destructor
-
- @deferred
- def remove(self):
- """
- Remove all docs associated with this message.
- """
- # XXX For the moment we are only removing the flags and headers
- # docs. The rest we leave there polluting your hard disk,
- # until we think about a good way of deorphaning.
- # Maybe a crawler of unreferenced docs.
-
- # XXX implement elijah's idea of using a PUT document as a
- # token to ensure consistency in the removal.
-
- uid = self._uid
-
- fd = self._get_flags_doc()
- #hd = self._get_headers_doc()
- #bd = self._get_body_doc()
- #docs = [fd, hd, bd]
-
- docs = [fd]
-
- for d in filter(None, docs):
- try:
- self._soledad.delete_doc(d)
- except Exception as exc:
- logger.error(exc)
- return uid
-
def does_exist(self):
"""
- Return True if there is actually a flags message for this
+ Return True if there is actually a flags document for this
UID and mbox.
"""
- return self._fdoc is not None
-
-
-class ContentDedup(object):
- """
- Message deduplication.
-
- We do a query for the content hashes before writing to our beloved
- sqlcipher backend of Soledad. This means, by now, that:
-
- 1. We will not store the same attachment twice, only the hash of it.
- 2. We will not store the same message body twice, only the hash of it.
-
- The first case is useful if you are always receiving the same old memes
- from unwary friends that still have not discovered that 4chan is the
- generator of the internet. The second will save your day if you have
- initiated session with the same account in two different machines. I also
- wonder why would you do that, but let's respect each other choices, like
- with the religious celebrations, and assume that one day we'll be able
- to run Bitmask in completely free phones. Yes, I mean that, the whole GSM
- Stack.
- """
-
- def _content_does_exist(self, doc):
- """
- Check whether we already have a content document for a payload
- with this hash in our database.
-
- :param doc: tentative body document
- :type doc: dict
- :returns: True if that happens, False otherwise.
- """
- if not doc:
- return False
- phash = doc[fields.PAYLOAD_HASH_KEY]
- attach_docs = self._soledad.get_from_index(
- fields.TYPE_P_HASH_IDX,
- fields.TYPE_CONTENT_VAL, str(phash))
- if not attach_docs:
- return False
-
- if len(attach_docs) != 1:
- logger.warning("Found more than one copy of phash %s!"
- % (phash,))
- logger.debug("Found attachment doc with that hash! Skipping save!")
- return True
-
-
-SoledadWriterPayload = namedtuple(
- 'SoledadWriterPayload', ['mode', 'payload'])
-
-# TODO we could consider using enum here:
-# https://pypi.python.org/pypi/enum
-
-SoledadWriterPayload.CREATE = 1
-SoledadWriterPayload.PUT = 2
-SoledadWriterPayload.CONTENT_CREATE = 3
-
-
-"""
-SoledadDocWriter was used to avoid writing to the db from multiple threads.
-Its use here has been deprecated in favor of a local rw_lock in the client.
-But we might want to reuse in in the near future to implement priority queues.
-"""
-
-
-class SoledadDocWriter(object):
- """
- This writer will create docs serially in the local soledad database.
- """
-
- implements(IMessageConsumer)
-
- def __init__(self, soledad):
- """
- Initialize the writer.
-
- :param soledad: the soledad instance
- :type soledad: Soledad
- """
- self._soledad = soledad
-
- def _get_call_for_item(self, item):
- """
- Return the proper call type for a given item.
-
- :param item: one of the types defined under the
- attributes of SoledadWriterPayload
- :type item: int
- """
- call = None
- payload = item.payload
-
- if item.mode == SoledadWriterPayload.CREATE:
- call = self._soledad.create_doc
- elif (item.mode == SoledadWriterPayload.CONTENT_CREATE
- and not self._content_does_exist(payload)):
- call = self._soledad.create_doc
- elif item.mode == SoledadWriterPayload.PUT:
- call = self._soledad.put_doc
- return call
-
- def _process(self, queue):
- """
- Return the item and the proper call type for the next
- item in the queue if any.
+ return not empty(self._fdoc)
- :param queue: the queue from where we'll pick item.
- :type queue: Queue
- """
- item = queue.get()
- call = self._get_call_for_item(item)
- return item, call
-
- def consume(self, queue):
- """
- Creates a new document in soledad db.
-
- :param queue: queue to get item from, with content of the document
- to be inserted.
- :type queue: Queue
- """
- empty = queue.empty()
- while not empty:
- item, call = self._process(queue)
-
- if call:
- # XXX should handle the delete case
- # should handle errors
- try:
- call(item.payload)
- except u1db_errors.RevisionConflict as exc:
- logger.error("Error: %r" % (exc,))
- raise exc
- empty = queue.empty()
-
-
-class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
- ContentDedup):
+class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
"""
A collection of messages, surprisingly.
- It is tied to a selected mailbox name that is passed to constructor.
+ It is tied to a selected mailbox name that is passed to its constructor.
Implements a filter query over the messages contained in a soledad
database.
"""
@@ -1058,7 +675,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
_hdocset_lock = threading.Lock()
_hdocset_property_lock = threading.Lock()
- def __init__(self, mbox=None, soledad=None):
+ def __init__(self, mbox=None, soledad=None, memstore=None):
"""
Constructor for MessageCollection.
@@ -1068,13 +685,18 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
MessageCollection for each mailbox, instead of treating them
as a property of each message.
+ We are passed an instance of MemoryStore, the same for the
+ SoledadBackedAccount, that we use as a read cache and a buffer
+ for writes.
+
:param mbox: the name of the mailbox. It is the name
with which we filter the query over the
- messages database
+ messages database.
:type mbox: str
-
:param soledad: Soledad database
:type soledad: Soledad instance
+ :param memstore: a MemoryStore instance
+ :type memstore: MemoryStore
"""
MailParser.__init__(self)
leap_assert(mbox, "Need a mailbox name to initialize")
@@ -1084,15 +706,22 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
leap_assert(soledad, "Need a soledad instance to initialize")
# okay, all in order, keep going...
+
self.mbox = self._parse_mailbox_name(mbox)
+
+ # XXX get a SoledadStore passed instead
self._soledad = soledad
+ self.memstore = memstore
+
self.__rflags = None
self.__hdocset = None
self.initialize_db()
# ensure that we have a recent-flags and a hdocs-sec doc
self._get_or_create_rdoc()
- self._get_or_create_hdocset()
+
+ # Not for now...
+ #self._get_or_create_hdocset()
def _get_empty_doc(self, _type=FLAGS_DOC):
"""
@@ -1210,17 +839,21 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
:type chash: basestring
:return: False, if it does not exist, or UID.
"""
- exist = self._get_fdoc_from_chash(chash)
+ exist = False
+ if self.memstore is not None:
+ exist = self.memstore.get_fdoc_from_chash(chash, self.mbox)
+
+ if not exist:
+ exist = self._get_fdoc_from_chash(chash)
if exist:
return exist.content.get(fields.UID_KEY, "unknown-uid")
else:
return False
- @deferred
- def add_msg(self, raw, subject=None, flags=None, date=None, uid=1):
+ def add_msg(self, raw, subject=None, flags=None, date=None, uid=None,
+ notify_on_disk=False):
"""
Creates a new message document.
- Here lives the magic of the leap mail. Well, in soledad, really.
:param raw: the raw message
:type raw: str
@@ -1236,27 +869,63 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
:param uid: the message uid for this mailbox
:type uid: int
- """
- # TODO signal that we can delete the original message!-----
- # when all the processing is done.
-
- # TODO add the linked-from info !
+ :return: a deferred that will be fired with the message
+ uid when the adding succeed.
+ :rtype: deferred
+ """
logger.debug('adding message')
if flags is None:
flags = tuple()
leap_assert_type(flags, tuple)
+ d = defer.Deferred()
+ self._do_add_msg(raw, flags, subject, date, notify_on_disk, d)
+ return d
+
+ # We SHOULD defer this (or the heavy load here) to the thread pool,
+ # but it gives troubles with the QSocketNotifier used by Qt...
+ def _do_add_msg(self, raw, flags, subject, date, notify_on_disk, observer):
+ """
+ Helper that creates a new message document.
+ Here lives the magic of the leap mail. Well, in soledad, really.
+
+ See `add_msg` docstring for parameter info.
+
+ :param observer: a deferred that will be fired with the message
+ uid when the adding succeed.
+ :type observer: deferred
+ """
+ # TODO signal that we can delete the original message!-----
+ # when all the processing is done.
+
+ # TODO add the linked-from info !
+ # TODO add reference to the original message
+
# parse
msg, chash, size, multi = self._do_parse(raw)
- # check for uniqueness.
- if self._fdoc_already_exists(chash):
- logger.warning("We already have that message in this mailbox.")
- # note that this operation will leave holes in the UID sequence,
- # but we're gonna change that all the same for a local-only table.
- # so not touch it by the moment.
- return False
+ # check for uniqueness --------------------------------
+ # XXX profiler says that this test is costly.
+ # So we probably should just do an in-memory check and
+ # move the complete check to the soledad writer?
+ # Watch out! We're reserving a UID right after this!
+ existing_uid = self._fdoc_already_exists(chash)
+ if existing_uid:
+ logger.warning("We already have that message in this "
+ "mailbox, unflagging as deleted")
+ uid = existing_uid
+ msg = self.get_msg_by_uid(uid)
+ msg.setFlags((fields.DELETED_FLAG,), -1)
+
+ # XXX if this is deferred to thread again we should not use
+ # the callback in the deferred thread, but return and
+ # call the callback from the caller fun...
+ observer.callback(uid)
+ return
+
+ uid = self.memstore.increment_last_soledad_uid(self.mbox)
+ logger.info("ADDING MSG WITH UID: %s" % uid)
fd = self._populate_flags(flags, uid, chash, size, multi)
hd = self._populate_headr(msg, chash, subject, date)
@@ -1273,48 +942,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
hd[key] = parts_map[key]
del parts_map
- # Saving ----------------------------------------
- self.set_recent_flag(uid)
-
- # first, regular docs: flags and headers
- self._soledad.create_doc(fd)
- # XXX should check for content duplication on headers too
- # but with chash. !!!
- hdoc = self._soledad.create_doc(hd)
- # We add the newly created hdoc to the fast-access set of
- # headers documents associated with the mailbox.
- self.add_hdocset_docid(hdoc.doc_id)
-
- # and last, but not least, try to create
- # content docs if not already there.
- cdocs = walk.get_raw_docs(msg, parts)
- for cdoc in cdocs:
- if not self._content_does_exist(cdoc):
- self._soledad.create_doc(cdoc)
+ hd = stringify_parts_map(hd)
- def _remove_cb(self, result):
- return result
-
- def remove_all_deleted(self):
- """
- Removes all messages flagged as deleted.
- """
- delete_deferl = []
- for msg in self.get_deleted():
- delete_deferl.append(msg.remove())
- d1 = defer.gatherResults(delete_deferl, consumeErrors=True)
- d1.addCallback(self._remove_cb)
- return d1
+ # The MessageContainer expects a dict, one-indexed
+ cdocs = dict(enumerate(walk.get_raw_docs(msg, parts), 1))
- def remove(self, msg):
- """
- Remove a given msg.
- :param msg: the message to be removed
- :type msg: LeapMessage
- """
- d = msg.remove()
- d.addCallback(self._remove_cb)
- return d
+ self.set_recent_flag(uid)
+ msg_container = MessageWrapper(fd, hd, cdocs)
+ self.memstore.create_message(self.mbox, uid, msg_container,
+ observer=observer,
+ notify_on_disk=notify_on_disk)
#
# getters: specific queries
@@ -1326,32 +963,59 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
"""
An accessor for the recent-flags set for this mailbox.
"""
- if not self.__rflags:
+ # XXX check if we should remove this
+ if self.__rflags is not None:
+ return self.__rflags
+
+ if self.memstore is not None:
with self._rdoc_lock:
- rdoc = self._get_recent_doc()
- self.__rflags = set(rdoc.content.get(
- fields.RECENTFLAGS_KEY, []))
- return self.__rflags
+ rflags = self.memstore.get_recent_flags(self.mbox)
+ if not rflags:
+ # not loaded in the memory store yet.
+ # let's fetch them from soledad...
+ rdoc = self._get_recent_doc()
+ rflags = set(rdoc.content.get(
+ fields.RECENTFLAGS_KEY, []))
+ # ...and cache them now.
+ self.memstore.load_recent_flags(
+ self.mbox,
+ {'doc_id': rdoc.doc_id, 'set': rflags})
+ return rflags
+
+ #else:
+ # fallback for cases without memory store
+ #with self._rdoc_lock:
+ #rdoc = self._get_recent_doc()
+ #self.__rflags = set(rdoc.content.get(
+ #fields.RECENTFLAGS_KEY, []))
+ #return self.__rflags
def _set_recent_flags(self, value):
"""
Setter for the recent-flags set for this mailbox.
"""
- with self._rdoc_lock:
- rdoc = self._get_recent_doc()
- newv = set(value)
- self.__rflags = newv
- rdoc.content[fields.RECENTFLAGS_KEY] = list(newv)
- # XXX should deferLater 0 it?
- self._soledad.put_doc(rdoc)
+ if self.memstore is not None:
+ self.memstore.set_recent_flags(self.mbox, value)
+
+ #else:
+ # fallback for cases without memory store
+ #with self._rdoc_lock:
+ #rdoc = self._get_recent_doc()
+ #newv = set(value)
+ #self.__rflags = newv
+ #rdoc.content[fields.RECENTFLAGS_KEY] = list(newv)
+ # XXX should deferLater 0 it?
+ #self._soledad.put_doc(rdoc)
recent_flags = property(
_get_recent_flags, _set_recent_flags,
doc="Set of UIDs with the recent flag for this mailbox.")
+ # XXX change naming, indicate soledad query.
def _get_recent_doc(self):
"""
- Get recent-flags document for this mailbox.
+ Get recent-flags document from Soledad for this mailbox.
+ :rtype: SoledadDocument or None
"""
curried = partial(
self._soledad.get_from_index,
@@ -1367,100 +1031,39 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
def unset_recent_flags(self, uids):
"""
Unset Recent flag for a sequence of uids.
+
+ :param uids: the uids to unset
+ :type uid: sequence
"""
with self._rdoc_property_lock:
- self.recent_flags = self.recent_flags.difference(
+ self.recent_flags.difference_update(
set(uids))
+ # Individual flags operations
+
def unset_recent_flag(self, uid):
"""
Unset Recent flag for a given uid.
+
+ :param uid: the uid to unset
+ :type uid: int
"""
with self._rdoc_property_lock:
- self.recent_flags = self.recent_flags.difference(
+ self.recent_flags.difference_update(
set([uid]))
+ @deferred_to_thread
def set_recent_flag(self, uid):
"""
Set Recent flag for a given uid.
+
+ :param uid: the uid to set
+ :type uid: int
"""
with self._rdoc_property_lock:
self.recent_flags = self.recent_flags.union(
set([uid]))
- # headers-docs-set
-
- def _get_hdocset(self):
- """
- An accessor for the hdocs-set for this mailbox.
- """
- if not self.__hdocset:
- with self._hdocset_lock:
- hdocset_doc = self._get_hdocset_doc()
- value = set(hdocset_doc.content.get(
- fields.HDOCS_SET_KEY, []))
- self.__hdocset = value
- return self.__hdocset
-
- def _set_hdocset(self, value):
- """
- Setter for the hdocs-set for this mailbox.
- """
- with self._hdocset_lock:
- hdocset_doc = self._get_hdocset_doc()
- newv = set(value)
- self.__hdocset = newv
- hdocset_doc.content[fields.HDOCS_SET_KEY] = list(newv)
- # XXX should deferLater 0 it?
- self._soledad.put_doc(hdocset_doc)
-
- _hdocset = property(
- _get_hdocset, _set_hdocset,
- doc="Set of Document-IDs for the headers docs associated "
- "with this mailbox.")
-
- def _get_hdocset_doc(self):
- """
- Get hdocs-set document for this mailbox.
- """
- curried = partial(
- self._soledad.get_from_index,
- fields.TYPE_MBOX_IDX,
- fields.TYPE_HDOCS_SET_VAL, self.mbox)
- curried.expected = "hdocset"
- hdocset_doc = try_unique_query(curried)
- return hdocset_doc
-
- # Property-set modification (protected by a different
- # lock to give atomicity to the read/write operation)
-
- def remove_hdocset_docids(self, docids):
- """
- Remove the given document IDs from the set of
- header-documents associated with this mailbox.
- """
- with self._hdocset_property_lock:
- self._hdocset = self._hdocset.difference(
- set(docids))
-
- def remove_hdocset_docid(self, docid):
- """
- Remove the given document ID from the set of
- header-documents associated with this mailbox.
- """
- with self._hdocset_property_lock:
- self._hdocset = self._hdocset.difference(
- set([docid]))
-
- def add_hdocset_docid(self, docid):
- """
- Add the given document ID to the set of
- header-documents associated with this mailbox.
- """
- with self._hdocset_property_lock:
- self._hdocset = self._hdocset.union(
- set([docid]))
-
# individual doc getters, message layer.
def _get_fdoc_from_chash(self, chash):
@@ -1499,7 +1102,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
return None
return fdoc.content.get(fields.UID_KEY, None)
- @deferred
+ @deferred_to_thread
def _get_uid_from_msgid(self, msgid):
"""
Return a UID for a given message-id.
@@ -1515,24 +1118,83 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
# and we cannot find it otherwise. This seems to be enough.
# XXX do a deferLater instead ??
- time.sleep(0.3)
+ # XXX is this working?
return self._get_uid_from_msgidCb(msgid)
+ def set_flags(self, mbox, messages, flags, mode, observer):
+ """
+ Set flags for a sequence of messages.
+
+ :param mbox: the mbox this message belongs to
+ :type mbox: str or unicode
+ :param messages: the messages to iterate through
+ :type messages: sequence
+ :flags: the flags to be set
+ :type flags: tuple
+ :param mode: the mode for setting. 1 is append, -1 is remove, 0 set.
+ :type mode: int
+ :param observer: a deferred that will be called with the dictionary
+ mapping UIDs to flags after the operation has been
+ done.
+ :type observer: deferred
+ """
+ # XXX we could defer *this* to thread pool, and gather results...
+ # XXX use deferredList
+
+ deferreds = []
+ for msg_id in messages:
+ deferreds.append(
+ self._set_flag_for_uid(msg_id, flags, mode))
+
+ def notify(result):
+ observer.callback(dict(result))
+ d1 = defer.gatherResults(deferreds, consumeErrors=True)
+ d1.addCallback(notify)
+
+ @deferred_to_thread
+ def _set_flag_for_uid(self, msg_id, flags, mode):
+ """
+ Run the set_flag operation in the thread pool.
+ """
+ log.msg("MSG ID = %s" % msg_id)
+ msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True)
+ if msg is not None:
+ return msg_id, msg.setFlags(flags, mode)
+
# getters: generic for a mailbox
- def get_msg_by_uid(self, uid):
+ def get_msg_by_uid(self, uid, mem_only=False, flags_only=False):
"""
Retrieves a LeapMessage by UID.
This is used primarity in the Mailbox fetch and store methods.
:param uid: the message uid to query by
:type uid: int
+ :param mem_only: a flag that indicates whether this Message should
+ pass a reference to soledad to retrieve missing pieces
+ or not.
+ :type mem_only: bool
+ :param flags_only: whether the message should carry only a reference
+ to the flags document.
+ :type flags_only: bool
:return: A LeapMessage instance matching the query,
or None if not found.
:rtype: LeapMessage
"""
- msg = LeapMessage(self._soledad, uid, self.mbox, collection=self)
+ msg_container = self.memstore.get_message(self.mbox, uid, flags_only)
+ if msg_container is not None:
+ if mem_only:
+ msg = LeapMessage(None, uid, self.mbox, collection=self,
+ container=msg_container)
+ else:
+ # We pass a reference to soledad just to be able to retrieve
+ # missing parts that cannot be found in the container, like
+ # the content docs after a copy.
+ msg = LeapMessage(self._soledad, uid, self.mbox,
+ collection=self, container=msg_container)
+ else:
+ msg = LeapMessage(self._soledad, uid, self.mbox, collection=self)
if not msg.does_exist():
return None
return msg
@@ -1564,40 +1226,50 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
# FIXME ----------------------------------------------
return sorted(all_docs, key=lambda item: item.content['uid'])
- def all_uid_iter(self):
+ def all_soledad_uid_iter(self):
"""
- Return an iterator trhough the UIDs of all messages, sorted in
+ Return an iterator through the UIDs of all messages, sorted in
ascending order.
"""
- # XXX we should get this from the uid table, local-only
- all_uids = (doc.content[self.UID_KEY] for doc in
- self._soledad.get_from_index(
- fields.TYPE_MBOX_IDX,
- fields.TYPE_FLAGS_VAL, self.mbox))
- return (u for u in sorted(all_uids))
+ db_uids = set([doc.content[self.UID_KEY] for doc in
+ self._soledad.get_from_index(
+ fields.TYPE_MBOX_IDX,
+ fields.TYPE_FLAGS_VAL, self.mbox)])
+ return db_uids
- def reset_last_uid(self, param):
+ def all_uid_iter(self):
"""
- Set the last uid to the highest uid found.
- Used while expunging, passed as a callback.
+ Return an iterator through the UIDs of all messages, from memory.
"""
- try:
- self.last_uid = max(self.all_uid_iter()) + 1
- except ValueError:
- # empty sequence
- pass
- return param
+ if self.memstore is not None:
+ mem_uids = self.memstore.get_uids(self.mbox)
+ soledad_known_uids = self.memstore.get_soledad_known_uids(
+ self.mbox)
+ combined = tuple(set(mem_uids).union(soledad_known_uids))
+ return combined
+ # XXX MOVE to memstore
def all_flags(self):
"""
Return a dict with all flags documents for this mailbox.
"""
+ # XXX get all from memstore and cache it there
+ # FIXME should get all uids, get them fro memstore,
+ # and get only the missing ones from disk.
+
all_flags = dict(((
doc.content[self.UID_KEY],
doc.content[self.FLAGS_KEY]) for doc in
self._soledad.get_from_index(
fields.TYPE_MBOX_IDX,
fields.TYPE_FLAGS_VAL, self.mbox)))
+ if self.memstore is not None:
+ uids = self.memstore.get_uids(self.mbox)
+ docs = ((uid, self.memstore.get_message(self.mbox, uid))
+ for uid in uids)
+ for uid, doc in docs:
+ all_flags[uid] = doc.fdoc.content[self.FLAGS_KEY]
+
return all_flags
def all_flags_chash(self):
@@ -1630,9 +1302,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
:rtype: int
"""
+ # XXX We should cache this in memstore too until next write...
count = self._soledad.get_count_from_index(
fields.TYPE_MBOX_IDX,
fields.TYPE_FLAGS_VAL, self.mbox)
+ if self.memstore is not None:
+ count += self.memstore.count_new()
return count
# unseen messages
@@ -1674,6 +1349,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
# recent messages
+ # XXX take it from memstore
def count_recent(self):
"""
Count all messages with the `Recent` flag.
@@ -1686,30 +1362,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
"""
return len(self.recent_flags)
- # deleted messages
-
- def deleted_iter(self):
- """
- Get an iterator for the message UIDs with `deleted` flag.
-
- :return: iterator through deleted message docs
- :rtype: iterable
- """
- return (doc.content[self.UID_KEY] for doc in
- self._soledad.get_from_index(
- fields.TYPE_MBOX_DEL_IDX,
- fields.TYPE_FLAGS_VAL, self.mbox, '1'))
-
- def get_deleted(self):
- """
- Get all messages with the `Deleted` flag.
-
- :returns: a generator of LeapMessages
- :rtype: generator
- """
- return (LeapMessage(self._soledad, docid, self.mbox)
- for docid in self.deleted_iter())
-
def __len__(self):
"""
Returns the number of messages on this mailbox.