summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTomás Touceda <chiiph@leap.se>2014-01-17 13:21:49 -0300
committerTomás Touceda <chiiph@leap.se>2014-01-17 13:21:49 -0300
commit6dee4ec9790f25335d18ec3b13b7df45d2a8c98f (patch)
treebfbb330f27069b0e4003e2f68a4f71c786b71d70
parente3692d50ca2fa4110ba37322b1f46d71d93ac135 (diff)
parent9ef1cd79397d811575826025b924c615e6ce2aa4 (diff)
Merge remote-tracking branch 'refs/remotes/kali/feature/separate-recent-flag-doc' into develop
-rw-r--r--src/leap/mail/imap/fields.py4
-rw-r--r--src/leap/mail/imap/mailbox.py130
-rw-r--r--src/leap/mail/imap/messages.py386
-rw-r--r--src/leap/mail/imap/service/imap.py38
4 files changed, 421 insertions, 137 deletions
diff --git a/src/leap/mail/imap/fields.py b/src/leap/mail/imap/fields.py
index 3d2ac92..886ee63 100644
--- a/src/leap/mail/imap/fields.py
+++ b/src/leap/mail/imap/fields.py
@@ -60,6 +60,8 @@ class WithMsgFields(object):
SUBSCRIBED_KEY = "subscribed"
RW_KEY = "rw"
LAST_UID_KEY = "lastuid"
+ RECENTFLAGS_KEY = "rct"
+ HDOCS_SET_KEY = "hdocset"
# Document Type, for indexing
TYPE_KEY = "type"
@@ -67,6 +69,8 @@ class WithMsgFields(object):
TYPE_FLAGS_VAL = "flags"
TYPE_HEADERS_VAL = "head"
TYPE_CONTENT_VAL = "cnt"
+ TYPE_RECENT_VAL = "rct"
+ TYPE_HDOCS_SET_VAL = "hdocset"
INBOX_VAL = "inbox"
diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py
index cf09bc4..b186e75 100644
--- a/src/leap/mail/imap/mailbox.py
+++ b/src/leap/mail/imap/mailbox.py
@@ -398,18 +398,19 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
flags = tuple(str(flag) for flag in flags)
d = self._do_add_message(message, flags=flags, date=date, uid=uid_next)
- d.addCallback(self._notify_new)
return d
- @deferred
def _do_add_message(self, message, flags, date, uid):
"""
Calls to the messageCollection add_msg method (deferred to thread).
Invoked from addMessage.
"""
- self.messages.add_msg(message, flags=flags, date=date, uid=uid)
+ d = self.messages.add_msg(message, flags=flags, date=date, uid=uid)
+ # XXX notify after batch APPEND?
+ d.addCallback(self.notify_new)
+ return d
- def _notify_new(self, *args):
+ def notify_new(self, *args):
"""
Notify of new messages to all the listeners.
@@ -463,8 +464,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
if not self.isWriteable():
raise imap4.ReadOnlyMailbox
d = self.messages.remove_all_deleted()
- d.addCallback(self.messages.reset_last_uid)
d.addCallback(self._expunge_cb)
+ d.addCallback(self.messages.reset_last_uid)
+
+ # XXX DEBUG -------------------
+ # FIXME !!!
+ # XXX should remove the hdocset too!!!
return d
def _bound_seq(self, messages_asked):
@@ -480,7 +485,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
iter(messages_asked)
except TypeError:
# looks like we cannot iterate
- messages_asked.last = self.last_uid
+ try:
+ messages_asked.last = self.last_uid
+ except ValueError:
+ pass
return messages_asked
def _filter_msg_seq(self, messages_asked):
@@ -516,8 +524,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:rtype: A tuple of two-tuples of message sequence numbers and
LeapMessage
"""
- from twisted.internet import reactor
-
# For the moment our UID is sequential, so we
# can treat them all the same.
# Change this to the flag that twisted expects when we
@@ -528,23 +534,14 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
messages_asked = self._bound_seq(messages_asked)
seq_messg = self._filter_msg_seq(messages_asked)
-
- def getmsg(msgid):
- if self.isWriteable():
- deferLater(reactor, 2, self._unset_recent_flag, messages_asked)
- return self.messages.get_msg_by_uid(msgid)
+ getmsg = lambda uid: self.messages.get_msg_by_uid(uid)
# for sequence numbers (uid = 0)
if sequence:
logger.debug("Getting msg by index: INEFFICIENT call!")
raise NotImplementedError
-
else:
result = ((msgid, getmsg(msgid)) for msgid in seq_messg)
-
- # this should really be called as a final callback of
- # the do_FETCH method...
- deferLater(reactor, 1, self._signal_unread_to_ui)
return result
@deferred
@@ -557,7 +554,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
Given how LEAP Mail is supposed to work without local cache,
this query is going to be quite common, and also we expect
it to be in the form 1:* at the beginning of a session, so
- it's not bad to fetch all the flags doc at once.
+ it's not bad to fetch all the FLAGS docs at once.
:param messages_asked: IDs of the messages to retrieve information
about
@@ -592,36 +589,55 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
return result
@deferred
- def _unset_recent_flag(self, message_uid):
+ def fetch_headers(self, messages_asked, uid):
"""
- Unsets `Recent` flag from a tuple of messages.
- Called from fetch.
+ A fast method to fetch all headers, tricking just the
+ needed subset of the MIME interface that's needed to satisfy
+ a generic HEADERS query.
- From RFC, about `Recent`:
+ Given how LEAP Mail is supposed to work without local cache,
+ this query is going to be quite common, and also we expect
+ it to be in the form 1:* at the beginning of a session, so
+ **MAYBE** it's not too bad to fetch all the HEADERS docs at once.
- Message is "recently" arrived in this mailbox. This session
- is the first session to have been notified about this
- message; if the session is read-write, subsequent sessions
- will not see \Recent set for this message. This flag can not
- be altered by the client.
+ :param messages_asked: IDs of the messages to retrieve information
+ about
+ :type messages_asked: MessageSet
- If it is not possible to determine whether or not this
- session is the first session to be notified about a message,
- then that message SHOULD be considered recent.
+ :param uid: If true, the IDs are UIDs. They are message sequence IDs
+ otherwise.
+ :type uid: bool
- :param message_uids: the sequence of msg ids to update.
- :type message_uids: sequence
+ :return: A tuple of two-tuples of message sequence numbers and
+ headersPart, which is a only a partial implementation of
+ MessagePart.
+ :rtype: tuple
"""
- # XXX deprecate this!
- # move to a mailbox-level call, and do it in batches!
+ class headersPart(object):
+ def __init__(self, uid, headers):
+ self.uid = uid
+ self.headers = headers
- log.msg('unsetting recent flag: %s' % message_uid)
- msg = self.messages.get_msg_by_uid(message_uid)
- msg.removeFlags((fields.RECENT_FLAG,))
- self._signal_unread_to_ui()
+ def getUID(self):
+ return self.uid
- @deferred
- def _signal_unread_to_ui(self):
+ def getHeaders(self, _):
+ return dict(
+ (str(key), str(value))
+ for key, value in
+ self.headers.items())
+
+ messages_asked = self._bound_seq(messages_asked)
+ seq_messg = self._filter_msg_seq(messages_asked)
+
+ all_chash = self.messages.all_flags_chash()
+ all_headers = self.messages.all_headers()
+ result = ((msgid, headersPart(
+ msgid, all_headers.get(all_chash.get(msgid, 'nil'), {})))
+ for msgid in seq_messg)
+ return result
+
+ def signal_unread_to_ui(self):
"""
Sends unread event to ui.
"""
@@ -658,7 +674,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:raise ReadOnlyMailbox: Raised if this mailbox is not open for
read-write.
"""
- from twisted.internet import reactor
# XXX implement also sequence (uid = 0)
# XXX we should prevent cclient from setting Recent flag.
leap_assert(not isinstance(flags, basestring),
@@ -686,8 +701,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
msg.setFlags(flags)
result[msg_id] = msg.getFlags()
+ # After changing flags, we want to signal again to the
+ # UI because the number of unread might have changed.
+ # Hoever, we should probably limit this to INBOX only?
# this should really be called as a final callback of
- # the do_FETCH method...
+ # the do_STORE method...
+ from twisted.internet import reactor
deferLater(reactor, 1, self._signal_unread_to_ui)
return result
@@ -741,6 +760,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
Copy the given message object into this mailbox.
"""
+ from twisted.internet import reactor
uid_next = self.getUIDNext()
msg = messageObject
@@ -753,17 +773,20 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
new_fdoc = copy.deepcopy(fdoc.content)
new_fdoc[self.UID_KEY] = uid_next
new_fdoc[self.MBOX_KEY] = self.mbox
+ self._do_add_doc(new_fdoc)
- d = self._do_add_doc(new_fdoc)
- # XXX notify should be done when all the
- # copies in the batch are finished.
- d.addCallback(self._notify_new)
+ # XXX should use a public api instead
+ hdoc = msg._hdoc
+ self.messages.add_hdocset_docid(hdoc.doc_id)
+
+ deferLater(reactor, 1, self.notify_new)
- @deferred
def _do_add_doc(self, doc):
"""
- Defers the adding of a new doc.
+ Defer the adding of a new doc.
+
:param doc: document to be created in soledad.
+ :type doc: dict
"""
self._soledad.create_doc(doc)
@@ -771,12 +794,19 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
def deleteAllDocs(self):
"""
- Deletes all docs in this mailbox
+ Delete all docs in this mailbox
"""
docs = self.messages.get_all_docs()
for doc in docs:
self.messages._soledad.delete_doc(doc)
+ def unset_recent_flags(self, uids):
+ """
+ Unset Recent flag for a sequence of UIDs.
+ """
+ seq_messg = self._bound_seq(uids)
+ self.messages.unset_recent_flags(seq_messg)
+
def __repr__(self):
"""
Representation string for this mailbox.
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index 1b996b6..7a21009 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -21,9 +21,11 @@ import copy
import logging
import re
import time
+import threading
import StringIO
from collections import defaultdict, namedtuple
+from functools import partial
from twisted.mail import imap4
from twisted.internet import defer
@@ -41,7 +43,7 @@ from leap.mail.decorators import deferred
from leap.mail.imap.index import IndexedDB
from leap.mail.imap.fields import fields, WithMsgFields
from leap.mail.imap.parser import MailParser, MBoxParser
-from leap.mail.messageflow import IMessageConsumer, MessageProducer
+from leap.mail.messageflow import IMessageConsumer
logger = logging.getLogger(__name__)
@@ -65,6 +67,31 @@ def lowerdict(_dict):
for key, value in _dict.items())
+def try_unique_query(curried):
+ """
+ Try to execute a query that is expected to have a
+ single outcome, and log a warning if more than one document found.
+
+ :param curried: a curried function
+ :type curried: callable
+ """
+ leap_assert(callable(curried), "A callable is expected")
+ try:
+ query = curried()
+ if query:
+ if len(query) > 1:
+ # TODO we could take action, like trigger a background
+ # process to kill dupes.
+ name = getattr(curried, 'expected', 'doc')
+ logger.warning(
+ "More than one %s found for this mbox, "
+ "we got a duplicate!!" % (name,))
+ return query.pop()
+ else:
+ return None
+ except Exception as exc:
+ logger.exception("Unhandled error %r" % exc)
+
CHARSET_PATTERN = r"""charset=([\w-]+)"""
MSGID_PATTERN = r"""<([\w@.]+)>"""
@@ -308,7 +335,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
implements(imap4.IMessage)
- def __init__(self, soledad, uid, mbox):
+ def __init__(self, soledad, uid, mbox, collection=None):
"""
Initializes a LeapMessage.
@@ -318,11 +345,14 @@ class LeapMessage(fields, MailParser, MBoxParser):
:type uid: int or basestring
:param mbox: the mbox this message belongs to
:type mbox: basestring
+ :param collection: a reference to the parent collection object
+ :type collection: MessageCollection
"""
MailParser.__init__(self)
self._soledad = soledad
self._uid = int(uid)
self._mbox = self._parse_mailbox_name(mbox)
+ self._collection = collection
self.__chash = None
self.__bdoc = None
@@ -373,7 +403,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
def getUID(self):
"""
- Retrieve the unique identifier associated with this message
+ Retrieve the unique identifier associated with this Message.
:return: uid for this message
:rtype: int
@@ -382,18 +412,26 @@ class LeapMessage(fields, MailParser, MBoxParser):
def getFlags(self):
"""
- Retrieve the flags associated with this message
+ Retrieve the flags associated with this Message.
:return: The flags, represented as strings
:rtype: tuple
"""
if self._uid is None:
return []
+ uid = self._uid
flags = []
fdoc = self._fdoc
if fdoc:
flags = fdoc.content.get(self.FLAGS_KEY, None)
+
+ msgcol = self._collection
+
+ # We treat the recent flag specially: gotten from
+ # a mailbox-level document.
+ if msgcol and uid in msgcol.recent_flags:
+ flags.append(fields.RECENT_FLAG)
if flags:
flags = map(str, flags)
return tuple(flags)
@@ -414,7 +452,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
:rtype: SoledadDocument
"""
leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
- log.msg('setting flags: %s' % (self._uid))
+ log.msg('setting flags: %s (%s)' % (self._uid, flags))
doc = self._fdoc
if not doc:
@@ -424,7 +462,6 @@ class LeapMessage(fields, MailParser, MBoxParser):
return
doc.content[self.FLAGS_KEY] = flags
doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
- doc.content[self.RECENT_KEY] = self.RECENT_FLAG in flags
doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags
self._soledad.put_doc(doc)
@@ -927,9 +964,30 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
FLAGS_DOC = "FLAGS"
HEADERS_DOC = "HEADERS"
CONTENT_DOC = "CONTENT"
+ """
+ RECENT_DOC is a document that stores a list of the UIDs
+ with the recent flag for this mailbox. It deserves a special treatment
+ because:
+ (1) it cannot be set by the user
+ (2) it's a flag that we set inmediately after a fetch, which is quite
+ often.
+ (3) we need to be able to set/unset it in batches without doing a single
+ write for each element in the sequence.
+ """
+ RECENT_DOC = "RECENT"
+ """
+ HDOCS_SET_DOC is a document that stores a set of the Document-IDs
+ (the u1db index) for all the headers documents for a given mailbox.
+ We use it to prefetch massively all the headers for a mailbox.
+ This is the second massive query, after fetching all the FLAGS, that
+ a MUA will do in a case where we do not have local disk cache.
+ """
+ HDOCS_SET_DOC = "HDOCS_SET"
templates = {
+ # Message Level
+
FLAGS_DOC: {
fields.TYPE_KEY: fields.TYPE_FLAGS_VAL,
fields.UID_KEY: 1, # XXX moe to a local table
@@ -937,7 +995,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
fields.CONTENT_HASH_KEY: "",
fields.SEEN_KEY: False,
- fields.RECENT_KEY: True,
fields.DEL_KEY: False,
fields.FLAGS_KEY: [],
fields.MULTIPART_KEY: False,
@@ -970,12 +1027,36 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
fields.MULTIPART_KEY: False,
},
+ # Mailbox Level
+
+ RECENT_DOC: {
+ fields.TYPE_KEY: fields.TYPE_RECENT_VAL,
+ fields.MBOX_KEY: fields.INBOX_VAL,
+ fields.RECENTFLAGS_KEY: [],
+ },
+
+ HDOCS_SET_DOC: {
+ fields.TYPE_KEY: fields.TYPE_HDOCS_SET_VAL,
+ fields.MBOX_KEY: fields.INBOX_VAL,
+ fields.HDOCS_SET_KEY: [],
+ }
+
+
}
+ _rdoc_lock = threading.Lock()
+ _hdocset_lock = threading.Lock()
+
def __init__(self, mbox=None, soledad=None):
"""
Constructor for MessageCollection.
+ On initialization, we ensure that we have a document for
+ storing the recent flags. The nature of this flag make us wanting
+ to store the set of the UIDs with this flag at the level of the
+ MessageCollection for each mailbox, instead of treating them
+ as a property of each message.
+
:param mbox: the name of the mailbox. It is the name
with which we filter the query over the
messages database
@@ -994,17 +1075,13 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
# okay, all in order, keep going...
self.mbox = self._parse_mailbox_name(mbox)
self._soledad = soledad
+ self.__rflags = None
+ self.__hdocset = None
self.initialize_db()
- # I think of someone like nietzsche when reading this
-
- # this will be the producer that will enqueue the content
- # to be processed serially by the consumer (the writer). We just
- # need to `put` the new material on its plate.
-
- self.soledad_writer = MessageProducer(
- SoledadDocWriter(soledad),
- period=0.02)
+ # ensure that we have a recent-flags and a hdocs-sec doc
+ self._get_or_create_rdoc()
+ self._get_or_create_hdocset()
def _get_empty_doc(self, _type=FLAGS_DOC):
"""
@@ -1017,6 +1094,30 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
raise TypeError("Improper type passed to _get_empty_doc")
return copy.deepcopy(self.templates[_type])
+ def _get_or_create_rdoc(self):
+ """
+ Try to retrieve the recent-flags doc for this MessageCollection,
+ and create one if not found.
+ """
+ rdoc = self._get_recent_doc()
+ if not rdoc:
+ rdoc = self._get_empty_doc(self.RECENT_DOC)
+ if self.mbox != fields.INBOX_VAL:
+ rdoc[fields.MBOX_KEY] = self.mbox
+ self._soledad.create_doc(rdoc)
+
+ def _get_or_create_hdocset(self):
+ """
+ Try to retrieve the hdocs-set doc for this MessageCollection,
+ and create one if not found.
+ """
+ hdocset = self._get_hdocset_doc()
+ if not hdocset:
+ hdocset = self._get_empty_doc(self.HDOCS_SET_DOC)
+ if self.mbox != fields.INBOX_VAL:
+ hdocset[fields.MBOX_KEY] = self.mbox
+ self._soledad.create_doc(hdocset)
+
def _do_parse(self, raw):
"""
Parse raw message and return it along with
@@ -1161,14 +1262,17 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
hd[key] = parts_map[key]
del parts_map
- # Saving
+ # Saving ----------------------------------------
+ self.set_recent_flag(uid)
# first, regular docs: flags and headers
self._soledad.create_doc(fd)
-
# XXX should check for content duplication on headers too
# but with chash. !!!
- self._soledad.create_doc(hd)
+ hdoc = self._soledad.create_doc(hd)
+ # We add the newly created hdoc to the fast-access set of
+ # headers documents associated with the mailbox.
+ self.add_hdocset_docid(hdoc.doc_id)
# and last, but not least, try to create
# content docs if not already there.
@@ -1201,7 +1305,141 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
d.addCallback(self._remove_cb)
return d
+ #
# getters: specific queries
+ #
+
+ # recent flags
+
+ def _get_recent_flags(self):
+ """
+ An accessor for the recent-flags set for this mailbox.
+ """
+ if not self.__rflags:
+ rdoc = self._get_recent_doc()
+ self.__rflags = set(rdoc.content.get(
+ fields.RECENTFLAGS_KEY, []))
+ return self.__rflags
+
+ def _set_recent_flags(self, value):
+ """
+ Setter for the recent-flags set for this mailbox.
+ """
+ rdoc = self._get_recent_doc()
+ newv = set(value)
+ self.__rflags = newv
+
+ with self._rdoc_lock:
+ rdoc.content[fields.RECENTFLAGS_KEY] = list(newv)
+ # XXX should deferLater 0 it?
+ self._soledad.put_doc(rdoc)
+
+ recent_flags = property(
+ _get_recent_flags, _set_recent_flags,
+ doc="Set of UIDs with the recent flag for this mailbox.")
+
+ def unset_recent_flags(self, uids):
+ """
+ Unset Recent flag for a sequence of uids.
+ """
+ self.recent_flags = self.recent_flags.difference(
+ set(uids))
+
+ def unset_recent_flag(self, uid):
+ """
+ Unset Recent flag for a given uid.
+ """
+ self.recent_flags = self.recent_flags.difference(
+ set([uid]))
+
+ def set_recent_flag(self, uid):
+ """
+ Set Recent flag for a given uid.
+ """
+ self.recent_flags = self.recent_flags.union(
+ set([uid]))
+
+ def _get_recent_doc(self):
+ """
+ Get recent-flags document for this mailbox.
+ """
+ curried = partial(
+ self._soledad.get_from_index,
+ fields.TYPE_MBOX_IDX,
+ fields.TYPE_RECENT_VAL, self.mbox)
+ curried.expected = "rdoc"
+ with self._rdoc_lock:
+ return try_unique_query(curried)
+
+ # headers-docs-set
+
+ def _get_hdocset(self):
+ """
+ An accessor for the hdocs-set for this mailbox.
+ """
+ if not self.__hdocset:
+ hdocset_doc = self._get_hdocset_doc()
+ value = set(hdocset_doc.content.get(
+ fields.HDOCS_SET_KEY, []))
+ self.__hdocset = value
+ return self.__hdocset
+
+ def _set_hdocset(self, value):
+ """
+ Setter for the hdocs-set for this mailbox.
+ """
+ hdocset_doc = self._get_hdocset_doc()
+ newv = set(value)
+ self.__hdocset = newv
+
+ with self._hdocset_lock:
+ hdocset_doc.content[fields.HDOCS_SET_KEY] = list(newv)
+ # XXX should deferLater 0 it?
+ self._soledad.put_doc(hdocset_doc)
+
+ _hdocset = property(
+ _get_hdocset, _set_hdocset,
+ doc="Set of Document-IDs for the headers docs associated "
+ "with this mailbox.")
+
+ def _get_hdocset_doc(self):
+ """
+ Get hdocs-set document for this mailbox.
+ """
+ curried = partial(
+ self._soledad.get_from_index,
+ fields.TYPE_MBOX_IDX,
+ fields.TYPE_HDOCS_SET_VAL, self.mbox)
+ curried.expected = "hdocset"
+ with self._hdocset_lock:
+ hdocset_doc = try_unique_query(curried)
+ return hdocset_doc
+
+ def remove_hdocset_docids(self, docids):
+ """
+ Remove the given document IDs from the set of
+ header-documents associated with this mailbox.
+ """
+ self._hdocset = self._hdocset.difference(
+ set(docids))
+
+ def remove_hdocset_docid(self, docid):
+ """
+ Remove the given document ID from the set of
+ header-documents associated with this mailbox.
+ """
+ self._hdocset = self._hdocset.difference(
+ set([docid]))
+
+ def add_hdocset_docid(self, docid):
+ """
+ Add the given document ID to the set of
+ header-documents associated with this mailbox.
+ """
+ hdocset = self._hdocset
+ self._hdocset = hdocset.union(set([docid]))
+
+ # individual doc getters, message layer.
def _get_fdoc_from_chash(self, chash):
"""
@@ -1211,39 +1449,21 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
the query failed.
:rtype: SoledadDocument or None.
"""
- try:
- query = self._soledad.get_from_index(
- fields.TYPE_MBOX_C_HASH_IDX,
- fields.TYPE_FLAGS_VAL, self.mbox, chash)
- if query:
- if len(query) > 1:
- logger.warning(
- "More than one fdoc found for this chash, "
- "we got a duplicate!!")
- # XXX we could take action, like trigger a background
- # process to kill dupes.
- return query.pop()
- else:
- return None
- except Exception as exc:
- logger.exception("Unhandled error %r" % exc)
+ curried = partial(
+ self._soledad.get_from_index,
+ fields.TYPE_MBOX_C_HASH_IDX,
+ fields.TYPE_FLAGS_VAL, self.mbox, chash)
+ curried.expected = "fdoc"
+ return try_unique_query(curried)
def _get_uid_from_msgidCb(self, msgid):
hdoc = None
- try:
- query = self._soledad.get_from_index(
- fields.TYPE_MSGID_IDX,
- fields.TYPE_HEADERS_VAL, msgid)
- if query:
- if len(query) > 1:
- logger.warning(
- "More than one hdoc found for this msgid, "
- "we got a duplicate!!")
- # XXX we could take action, like trigger a background
- # process to kill dupes.
- hdoc = query.pop()
- except Exception as exc:
- logger.exception("Unhandled error %r" % exc)
+ curried = partial(
+ self._soledad.get_from_index,
+ fields.TYPE_MSGID_IDX,
+ fields.TYPE_HEADERS_VAL, msgid)
+ curried.expected = "hdoc"
+ hdoc = try_unique_query(curried)
if hdoc is None:
logger.warning("Could not find hdoc for msgid %s"
@@ -1272,13 +1492,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
# the query is received right after we've saved the document,
# and we cannot find it otherwise. This seems to be enough.
- # Doing a sleep since we'll be calling this in a secondary thread,
- # but we'll should be able to collect the results after a
- # reactor.callLater.
- # Maybe we can implement something like NOT_DONE_YET in the web
- # framework, and return from the callback?
- # See: http://jcalderone.livejournal.com/50226.html
- # reactor.callLater(0.3, self._get_uid_from_msgidCb, msgid)
+ # XXX do a deferLater instead ??
time.sleep(0.3)
return self._get_uid_from_msgidCb(msgid)
@@ -1287,6 +1501,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
def get_msg_by_uid(self, uid):
"""
Retrieves a LeapMessage by UID.
+ This is used primarity in the Mailbox fetch and store methods.
:param uid: the message uid to query by
:type uid: int
@@ -1295,7 +1510,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
or None if not found.
:rtype: LeapMessage
"""
- msg = LeapMessage(self._soledad, uid, self.mbox)
+ msg = LeapMessage(self._soledad, uid, self.mbox, collection=self)
if not msg.does_exist():
return None
return msg
@@ -1324,6 +1539,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
# inneficient, but first let's grok it and then
# let's worry about efficiency.
# XXX FIXINDEX -- should implement order by in soledad
+ # FIXME ----------------------------------------------
return sorted(all_docs, key=lambda item: item.content['uid'])
def all_uid_iter(self):
@@ -1362,6 +1578,30 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
fields.TYPE_FLAGS_VAL, self.mbox)))
return all_flags
+ def all_flags_chash(self):
+ """
+ Return a dict with the content-hash for all flag documents
+ for this mailbox.
+ """
+ all_flags_chash = dict(((
+ doc.content[self.UID_KEY],
+ doc.content[self.CONTENT_HASH_KEY]) for doc in
+ self._soledad.get_from_index(
+ fields.TYPE_MBOX_IDX,
+ fields.TYPE_FLAGS_VAL, self.mbox)))
+ return all_flags_chash
+
+ def all_headers(self):
+ """
+ Return a dict with all the headers documents for this
+ mailbox.
+ """
+ all_headers = dict(((
+ doc.content[self.CONTENT_HASH_KEY],
+ doc.content[self.HEADERS_KEY]) for doc in
+ self._soledad.get_docs(self._hdocset)))
+ return all_headers
+
def count(self):
"""
Return the count of messages for this mailbox.
@@ -1412,39 +1652,17 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
# recent messages
- def recent_iter(self):
- """
- Get an iterator for the message UIDs with `recent` flag.
-
- :return: iterator through recent message docs
- :rtype: iterable
- """
- return (doc.content[self.UID_KEY] for doc in
- self._soledad.get_from_index(
- fields.TYPE_MBOX_RECT_IDX,
- fields.TYPE_FLAGS_VAL, self.mbox, '1'))
-
- def get_recent(self):
- """
- Get all messages with the `Recent` flag.
-
- :returns: a list of LeapMessages
- :rtype: list
- """
- return [LeapMessage(self._soledad, docid, self.mbox)
- for docid in self.recent_iter()]
-
def count_recent(self):
"""
Count all messages with the `Recent` flag.
+ It just retrieves the length of the recent_flags set,
+ which is stored in a specific type of document for
+ this collection.
:returns: count
:rtype: int
"""
- count = self._soledad.get_count_from_index(
- fields.TYPE_MBOX_RECT_IDX,
- fields.TYPE_FLAGS_VAL, self.mbox, '1')
- return count
+ return len(self.recent_flags)
# deleted messages
@@ -1496,4 +1714,4 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
self.mbox, self.count())
# XXX should implement __eq__ also !!!
- # --- use the content hash for that, will be used for dedup.
+ # use chash...
diff --git a/src/leap/mail/imap/service/imap.py b/src/leap/mail/imap/service/imap.py
index 6e03456..a1d3ab7 100644
--- a/src/leap/mail/imap/service/imap.py
+++ b/src/leap/mail/imap/service/imap.py
@@ -24,6 +24,7 @@ import logging
from twisted.internet.protocol import ServerFactory
from twisted.internet.defer import maybeDeferred
from twisted.internet.error import CannotListenError
+from twisted.internet.task import deferLater
from twisted.mail import imap4
from twisted.python import log
from twisted import cred
@@ -116,16 +117,18 @@ class LeapIMAPServer(imap4.IMAP4Server):
Overwritten fetch dispatcher to use the fast fetch_flags
method
"""
+ from twisted.internet import reactor
log.msg("LEAP Overwritten fetch...")
if not query:
self.sendPositiveResponse(tag, 'FETCH complete')
return # XXX ???
+ print "QUERY ", query
+ print query[0]
+
cbFetch = self._IMAP4Server__cbFetch
ebFetch = self._IMAP4Server__ebFetch
- print "QUERY: ", query
-
if len(query) == 1 and str(query[0]) == "flags":
self._oldTimeout = self.setTimeout(None)
# no need to call iter, we get a generator
@@ -134,6 +137,14 @@ class LeapIMAPServer(imap4.IMAP4Server):
).addCallback(
cbFetch, tag, query, uid
).addErrback(ebFetch, tag)
+ elif len(query) == 1 and str(query[0]) == "rfc822.header":
+ self._oldTimeout = self.setTimeout(None)
+ # no need to call iter, we get a generator
+ maybeDeferred(
+ self.mbox.fetch_headers, messages, uid=uid
+ ).addCallback(
+ cbFetch, tag, query, uid
+ ).addErrback(ebFetch, tag)
else:
self._oldTimeout = self.setTimeout(None)
# no need to call iter, we get a generator
@@ -141,11 +152,32 @@ class LeapIMAPServer(imap4.IMAP4Server):
self.mbox.fetch, messages, uid=uid
).addCallback(
cbFetch, tag, query, uid
- ).addErrback(ebFetch, tag)
+ ).addErrback(
+ ebFetch, tag)
+
+ deferLater(reactor,
+ 2, self.mbox.unset_recent_flags, messages)
+ deferLater(reactor, 1, self.mbox.signal_unread_to_ui)
select_FETCH = (do_FETCH, imap4.IMAP4Server.arg_seqset,
imap4.IMAP4Server.arg_fetchatt)
+ def do_COPY(self, tag, messages, mailbox, uid=0):
+ from twisted.internet import reactor
+ imap4.IMAP4Server.do_COPY(self, tag, messages, mailbox, uid)
+ deferLater(reactor,
+ 2, self.mbox.unset_recent_flags, messages)
+ deferLater(reactor, 1, self.mbox.signal_unread_to_ui)
+
+ select_COPY = (do_COPY, imap4.IMAP4Server.arg_seqset,
+ imap4.IMAP4Server.arg_astring)
+
+ def notifyNew(self, ignored):
+ """
+ Notify new messages to listeners.
+ """
+ self.mbox.notify_new()
+
def _cbSelectWork(self, mbox, cmdName, tag):
"""
Callback for selectWork, patched to avoid conformance errors due to