path: root/src/leap/mail/imap
author     Ivan Alejandro <ivanalejandro0@gmail.com>  2014-02-04 16:20:54 -0300
committer  Ivan Alejandro <ivanalejandro0@gmail.com>  2014-02-04 16:20:54 -0300
commit     74428b3176d286312f69e124d4d613c27a1ec93e (patch)
tree       eec0561e2184472dfedba3135a3d005efd478c34 /src/leap/mail/imap
parent     781bd2f4d2a047088d1a0ecd673a38c80ea0c0c0 (diff)
parent     23e28bae2c3cb74e00e29ee8add0b73adeb65c2b (diff)
Merge remote-tracking branch 'kali/feature/in-memory-store' into develop
Diffstat (limited to 'src/leap/mail/imap')
-rw-r--r--  src/leap/mail/imap/account.py                    13
-rw-r--r--  src/leap/mail/imap/fetch.py                       4
-rw-r--r--  src/leap/mail/imap/interfaces.py                 94
-rw-r--r--  src/leap/mail/imap/mailbox.py                   295
-rw-r--r--  src/leap/mail/imap/memorystore.py               961
-rw-r--r--  src/leap/mail/imap/messageparts.py              565
-rw-r--r--  src/leap/mail/imap/messages.py                 1104
-rw-r--r--  src/leap/mail/imap/server.py                    217
-rw-r--r--  src/leap/mail/imap/service/imap.py              189
-rw-r--r--  src/leap/mail/imap/service/manhole.py           130
-rw-r--r--  src/leap/mail/imap/soledadstore.py              487
-rwxr-xr-x  src/leap/mail/imap/tests/leap_tests_imap.zsh      4
-rw-r--r--  src/leap/mail/imap/tests/walktree.py             16
13 files changed, 3060 insertions, 1019 deletions
diff --git a/src/leap/mail/imap/account.py b/src/leap/mail/imap/account.py
index ce83079..f985c04 100644
--- a/src/leap/mail/imap/account.py
+++ b/src/leap/mail/imap/account.py
@@ -48,7 +48,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
selected = None
closed = False
- def __init__(self, account_name, soledad=None):
+ def __init__(self, account_name, soledad, memstore=None):
"""
Creates a SoledadAccountIndex that keeps track of the mailboxes
and subscriptions handled by this account.
@@ -57,7 +57,9 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
:type acct_name: str
:param soledad: a Soledad instance.
- :param soledad: Soledad
+ :type soledad: Soledad
+ :param memstore: a MemoryStore instance.
+ :type memstore: MemoryStore
"""
leap_assert(soledad, "Need a soledad instance to initialize")
leap_assert_type(soledad, Soledad)
@@ -67,6 +69,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
self._account_name = self._parse_mailbox_name(account_name)
self._soledad = soledad
+ self._memstore = memstore
self.initialize_db()
@@ -131,7 +134,8 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
if name not in self.mailboxes:
raise imap4.MailboxException("No such mailbox: %r" % name)
- return SoledadMailbox(name, soledad=self._soledad)
+ return SoledadMailbox(name, self._soledad,
+ memstore=self._memstore)
##
## IAccount
@@ -221,8 +225,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
self.selected = name
return SoledadMailbox(
- name, rw=readwrite,
- soledad=self._soledad)
+ name, self._soledad, self._memstore, readwrite)
def delete(self, name, force=False):
"""
diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py
index 817ad6a..40dadb3 100644
--- a/src/leap/mail/imap/fetch.py
+++ b/src/leap/mail/imap/fetch.py
@@ -45,7 +45,7 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL
from leap.common.mail import get_email_charset
from leap.keymanager import errors as keymanager_errors
from leap.keymanager.openpgp import OpenPGPKey
-from leap.mail.decorators import deferred
+from leap.mail.decorators import deferred_to_thread
from leap.mail.utils import json_loads
from leap.soledad.client import Soledad
from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY
@@ -199,7 +199,7 @@ class LeapIncomingMail(object):
logger.exception(failure.value)
traceback.print_tb(*sys.exc_info())
- @deferred
+ @deferred_to_thread
def _sync_soledad(self):
"""
Synchronizes with remote soledad.
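The `@deferred` decorator is replaced here by `@deferred_to_thread`, which runs the decorated call in the reactor's thread pool and returns a Deferred. A rough sketch of that pattern using the standard Twisted API; the actual leap.mail.decorators implementation may differ in details:

    # Hedged sketch of a defer-to-thread decorator, for illustration only.
    from functools import wraps
    from twisted.internet import threads

    def deferred_to_thread(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            # run the (blocking) call in the reactor thread pool and return
            # a Deferred that will fire with its result
            return threads.deferToThread(f, *args, **kwargs)
        return wrapper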
diff --git a/src/leap/mail/imap/interfaces.py b/src/leap/mail/imap/interfaces.py
new file mode 100644
index 0000000..c906278
--- /dev/null
+++ b/src/leap/mail/imap/interfaces.py
@@ -0,0 +1,94 @@
+# -*- coding: utf-8 -*-
+# interfaces.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Interfaces for the IMAP module.
+"""
+from zope.interface import Interface, Attribute
+
+
+class IMessageContainer(Interface):
+ """
+ I am a container around the different documents that a message
+ is split into.
+ """
+ fdoc = Attribute('The flags document for this message, if any.')
+ hdoc = Attribute('The headers document for this message, if any.')
+ cdocs = Attribute('The dict of content documents for this message, '
+ 'if any.')
+
+ def walk(self):
+ """
+ Return an iterator to the docs for all the parts.
+
+ :rtype: iterator
+ """
+
+
+class IMessageStore(Interface):
+ """
+ I represent a generic storage for LEAP Messages.
+ """
+
+ def create_message(self, mbox, uid, message):
+ """
+ Create the passed message in this IMessageStore.
+
+ :param mbox: the mbox this message belongs to.
+ :param uid: the UID that identifies this message in this mailbox.
+ :param message: an IMessageContainer implementor.
+ """
+
+ def put_message(self, mbox, uid, message):
+ """
+ Put the passed message into this IMessageStore.
+
+ :param mbox: the mbox this message belongs to.
+ :param uid: the UID that identifies this message in this mailbox.
+ :param message: an IMessageContainer implementor.
+ """
+
+ def remove_message(self, mbox, uid):
+ """
+ Remove the given message from this IMessageStore.
+
+ :param mbox: the mbox this message belongs to.
+ :param uid: the UID that identifies this message in this mailbox.
+ """
+
+ def get_message(self, mbox, uid):
+ """
+ Get an IMessageContainer for the given mbox and uid combination.
+
+ :param mbox: the mbox this message belongs to.
+ :param uid: the UID that identifies this message in this mailbox.
+ :return: IMessageContainer
+ """
+
+
+class IMessageStoreWriter(Interface):
+ """
+ I represent a storage that is able to write its contents to a
+ different IMessageStore.
+ """
+
+ def write_messages(self, store):
+ """
+ Write the documents in this IMessageStore to a different
+ storage. Usually this will be done from a MemoryStorage to a DbStorage.
+
+ :param store: another IMessageStore implementor.
+ """
diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py
index 0131ce0..c682578 100644
--- a/src/leap/mail/imap/mailbox.py
+++ b/src/leap/mail/imap/mailbox.py
@@ -22,6 +22,7 @@ import threading
import logging
import StringIO
import cStringIO
+import os
from collections import defaultdict
@@ -35,13 +36,21 @@ from zope.interface import implements
from leap.common import events as leap_events
from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL
from leap.common.check import leap_assert, leap_assert_type
-from leap.mail.decorators import deferred
+from leap.mail.decorators import deferred_to_thread
+from leap.mail.utils import empty
from leap.mail.imap.fields import WithMsgFields, fields
from leap.mail.imap.messages import MessageCollection
+from leap.mail.imap.messageparts import MessageWrapper
from leap.mail.imap.parser import MBoxParser
logger = logging.getLogger(__name__)
+"""
+If the environment variable `LEAP_SKIPNOTIFY` is set, we avoid
+notifying clients of new messages. Use during stress tests.
+"""
+NOTIFY_NEW = not os.environ.get('LEAP_SKIPNOTIFY', False)
+
class SoledadMailbox(WithMsgFields, MBoxParser):
"""
@@ -76,11 +85,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
CMD_UIDVALIDITY = "UIDVALIDITY"
CMD_UNSEEN = "UNSEEN"
+ # FIXME we should turn this into a datastructure with limited capacity
_listeners = defaultdict(set)
next_uid_lock = threading.Lock()
- def __init__(self, mbox, soledad=None, rw=1):
+ def __init__(self, mbox, soledad, memstore, rw=1):
"""
SoledadMailbox constructor. Needs to get passed a name, plus a
Soledad instance.
@@ -91,7 +101,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:param soledad: a Soledad instance.
:type soledad: Soledad
- :param rw: read-and-write flags
+ :param memstore: a MemoryStore instance
+ :type memstore: MemoryStore
+
+ :param rw: read-and-write flag for this mailbox
:type rw: int
"""
leap_assert(mbox, "Need a mailbox name to initialize")
@@ -105,13 +118,18 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
self.rw = rw
self._soledad = soledad
+ self._memstore = memstore
self.messages = MessageCollection(
- mbox=mbox, soledad=self._soledad)
+ mbox=mbox, soledad=self._soledad, memstore=self._memstore)
if not self.getFlags():
self.setFlags(self.INIT_FLAGS)
+ if self._memstore:
+ self.prime_known_uids_to_memstore()
+ self.prime_last_uid_to_memstore()
+
@property
def listeners(self):
"""
@@ -125,6 +143,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
return self._listeners[self.mbox]
+ # TODO this grows too crazily when many instances are fired, like
+ # during imaptest stress testing. Should have a queue of limited size
+ # instead.
def addListener(self, listener):
"""
Add a listener to the listeners queue.
@@ -134,6 +155,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:param listener: listener to add
:type listener: an object that implements IMailboxListener
"""
+ if not NOTIFY_NEW:
+ return
+
logger.debug('adding mailbox listener: %s' % listener)
self.listeners.add(listener)
@@ -146,6 +170,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
self.listeners.remove(listener)
+ # TODO move completely to soledadstore, under memstore responsibility.
def _get_mbox(self):
"""
Return mailbox document.
@@ -221,48 +246,38 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
def _get_last_uid(self):
"""
Return the last uid for this mailbox.
+ If we have a memory store, the last UID will be the highest
+ recorded UID in the message store, or a counter cached from
+ the mailbox document in soledad if this is higher.
:return: the last uid for messages in this mailbox
- :rtype: bool
+ :rtype: int
"""
- mbox = self._get_mbox()
- if not mbox:
- logger.error("We could not get a mbox!")
- # XXX It looks like it has been corrupted.
- # We need to be able to survive this.
- return None
- return mbox.content.get(self.LAST_UID_KEY, 1)
+ last = self._memstore.get_last_uid(self.mbox)
+ logger.debug("last uid for %s: %s (from memstore)" % (
+ repr(self.mbox), last))
+ return last
- def _set_last_uid(self, uid):
- """
- Sets the last uid for this mailbox.
+ last_uid = property(
+ _get_last_uid, doc="Last_UID attribute.")
- :param uid: the uid to be set
- :type uid: int
+ def prime_last_uid_to_memstore(self):
"""
- leap_assert(isinstance(uid, int), "uid has to be int")
- mbox = self._get_mbox()
- key = self.LAST_UID_KEY
-
- count = self.getMessageCount()
-
- # XXX safety-catch. If we do get duplicates,
- # we want to avoid further duplication.
-
- if uid >= count:
- value = uid
- else:
- # something is wrong,
- # just set the last uid
- # beyond the max msg count.
- logger.debug("WRONG uid < count. Setting last uid to %s", count)
- value = count
+ Prime memstore with last_uid value
+ """
+ set_exist = set(self.messages.all_uid_iter())
+ last = max(set_exist) if set_exist else 0
+ logger.info("Priming Soledad last_uid to %s" % (last,))
+ self._memstore.set_last_soledad_uid(self.mbox, last)
- mbox.content[key] = value
- self._soledad.put_doc(mbox)
+ def prime_known_uids_to_memstore(self):
+ """
+ Prime memstore with the set of all known uids.
- last_uid = property(
- _get_last_uid, _set_last_uid, doc="Last_UID attribute.")
+ We do this to be able to filter the requests efficiently.
+ """
+ known_uids = self.messages.all_soledad_uid_iter()
+ self._memstore.set_known_uids(self.mbox, known_uids)
def getUIDValidity(self):
"""
@@ -304,8 +319,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:rtype: int
"""
with self.next_uid_lock:
- self.last_uid += 1
- return self.last_uid
+ if self._memstore:
+ return self.last_uid + 1
+ else:
+ # XXX after lock, it should be safe to
+ # return just the increment here, and
+ # have a different method that actually increments
+ # the counter when really adding.
+ self.last_uid += 1
+ return self.last_uid
def getMessageCount(self):
"""
@@ -366,7 +388,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
if self.CMD_UIDNEXT in names:
r[self.CMD_UIDNEXT] = self.last_uid + 1
if self.CMD_UIDVALIDITY in names:
- r[self.CMD_UIDVALIDITY] = self.getUID()
+ r[self.CMD_UIDVALIDITY] = self.getUIDValidity()
if self.CMD_UNSEEN in names:
r[self.CMD_UNSEEN] = self.getUnseenCount()
return defer.succeed(r)
@@ -386,26 +408,26 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:return: a deferred that evals to None
"""
+ # TODO have a look at the cases for internal date in the rfc
if isinstance(message, (cStringIO.OutputType, StringIO.StringIO)):
message = message.getvalue()
- # XXX we should treat the message as an IMessage from here
+
+ # XXX we could treat the message as an IMessage from here
leap_assert_type(message, basestring)
- uid_next = self.getUIDNext()
- logger.debug('Adding msg with UID :%s' % uid_next)
if flags is None:
flags = tuple()
else:
flags = tuple(str(flag) for flag in flags)
- d = self._do_add_message(message, flags=flags, date=date, uid=uid_next)
+ d = self._do_add_message(message, flags=flags, date=date)
return d
- def _do_add_message(self, message, flags, date, uid):
+ def _do_add_message(self, message, flags, date):
"""
- Calls to the messageCollection add_msg method (deferred to thread).
+ Calls to the messageCollection add_msg method.
Invoked from addMessage.
"""
- d = self.messages.add_msg(message, flags=flags, date=date, uid=uid)
+ d = self.messages.add_msg(message, flags=flags, date=date)
# XXX Removing notify temporarily.
# This is interfering with imaptest results. I'm not clear if it's
# because we clutter the logging or because the set of listeners is
@@ -421,6 +443,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:param args: ignored.
"""
+ if not NOTIFY_NEW:
+ return
exists = self.getMessageCount()
recent = self.getRecentCount()
logger.debug("NOTIFY: there are %s messages, %s recent" % (
@@ -445,6 +469,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
# XXX removing the mailbox in situ for now,
# we should postpone the removal
+
+ # XXX move to memory store??
self._soledad.delete_doc(self._get_mbox())
def _close_cb(self, result):
@@ -467,13 +493,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
if not self.isWriteable():
raise imap4.ReadOnlyMailbox
- d = self.messages.remove_all_deleted()
- d.addCallback(self._expunge_cb)
- d.addCallback(self.messages.reset_last_uid)
-
- # XXX DEBUG -------------------
- # FIXME !!!
- # XXX should remove the hdocset too!!!
+ d = defer.Deferred()
+ return self._memstore.expunge(self.mbox, d)
+ self._memstore.expunge(self.mbox)
+ d.addCallback(self._expunge_cb, d)
return d
def _bound_seq(self, messages_asked):
@@ -509,7 +532,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
seq_messg = set_asked.intersection(set_exist)
return seq_messg
- @deferred
+ @deferred_to_thread
+ #@profile
def fetch(self, messages_asked, uid):
"""
Retrieve one or more messages in this mailbox.
@@ -548,7 +572,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
result = ((msgid, getmsg(msgid)) for msgid in seq_messg)
return result
- @deferred
+ @deferred_to_thread
def fetch_flags(self, messages_asked, uid):
"""
A fast method to fetch all flags, tricking just the
@@ -589,10 +613,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
all_flags = self.messages.all_flags()
result = ((msgid, flagsPart(
- msgid, all_flags[msgid])) for msgid in seq_messg)
+ msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg)
return result
- @deferred
+ @deferred_to_thread
def fetch_headers(self, messages_asked, uid):
"""
A fast method to fetch all headers, tricking just the
@@ -641,14 +665,16 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
for msgid in seq_messg)
return result
- def signal_unread_to_ui(self):
+ def signal_unread_to_ui(self, *args, **kwargs):
"""
Sends unread event to ui.
+
+ :param args: ignored
+ :param kwargs: ignored
"""
unseen = self.getUnseenCount()
leap_events.signal(IMAP_UNREAD_MAIL, str(unseen))
- @deferred
def store(self, messages_asked, flags, mode, uid):
"""
Sets the flags of one or more messages.
@@ -670,49 +696,43 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
otherwise they are message sequence IDs.
:type uid: bool
- :return: A dict mapping message sequence numbers to sequences of
- str representing the flags set on the message after this
- operation has been performed.
- :rtype: dict
+ :return: A deferred, that will be called with a dict mapping message
+ sequence numbers to sequences of str representing the flags
+ set on the message after this operation has been performed.
+ :rtype: deferred
:raise ReadOnlyMailbox: Raised if this mailbox is not open for
read-write.
"""
+ from twisted.internet import reactor
+ if not self.isWriteable():
+ log.msg('read only mailbox!')
+ raise imap4.ReadOnlyMailbox
+
+ d = defer.Deferred()
+ deferLater(reactor, 0, self._do_store, messages_asked, flags,
+ mode, uid, d)
+ return d
+
+ def _do_store(self, messages_asked, flags, mode, uid, observer):
+ """
+ Helper method, invoke set_flags method in the MessageCollection.
+
+ See the documentation for the `store` method for the parameters.
+
+ :param observer: a deferred that will be called with the dictionary
+ mapping UIDs to flags after the operation has been
+ done.
+ :type observer: deferred
+ """
# XXX implement also sequence (uid = 0)
- # XXX we should prevent cclient from setting Recent flag.
+ # XXX we should prevent cclient from setting Recent flag?
leap_assert(not isinstance(flags, basestring),
"flags cannot be a string")
flags = tuple(flags)
-
messages_asked = self._bound_seq(messages_asked)
seq_messg = self._filter_msg_seq(messages_asked)
-
- if not self.isWriteable():
- log.msg('read only mailbox!')
- raise imap4.ReadOnlyMailbox
-
- result = {}
- for msg_id in seq_messg:
- log.msg("MSG ID = %s" % msg_id)
- msg = self.messages.get_msg_by_uid(msg_id)
- if not msg:
- continue
- if mode == 1:
- msg.addFlags(flags)
- elif mode == -1:
- msg.removeFlags(flags)
- elif mode == 0:
- msg.setFlags(flags)
- result[msg_id] = msg.getFlags()
-
- # After changing flags, we want to signal again to the
- # UI because the number of unread might have changed.
- # Hoever, we should probably limit this to INBOX only?
- # this should really be called as a final callback of
- # the do_STORE method...
- from twisted.internet import reactor
- deferLater(reactor, 1, self.signal_unread_to_ui)
- return result
+ self.messages.set_flags(self.mbox, seq_messg, flags, mode, observer)
# ISearchableMailbox
@@ -760,44 +780,85 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
# IMessageCopier
- @deferred
- def copy(self, messageObject):
+ def copy(self, message):
"""
Copy the given message object into this mailbox.
+
+ :param message: an IMessage implementor
+ :type message: LeapMessage
+ :return: a deferred that will be fired with the message
+ uid when the copy succeed.
+ :rtype: Deferred
"""
from twisted.internet import reactor
- uid_next = self.getUIDNext()
- msg = messageObject
- # XXX DEBUG ----------------------------------------
- #print "copying MESSAGE from %s (%s) to %s (%s)" % (
- #msg._mbox, msg._uid, self.mbox, uid_next)
+ d = defer.Deferred()
+ # XXX this should not happen ... track it down,
+ # probably to FETCH...
+ if message is None:
+ log.msg("BUG: COPY found a None in passed message")
+ d.callback(None)
+ deferLater(reactor, 0, self._do_copy, message, d)
+ return d
+
+ def _do_copy(self, message, observer):
+ """
+ Call invoked from the deferLater in `copy`. This will
+ copy the flags and header documents, and pass them to the
+ `create_message` method in the MemoryStore, together with
+ the observer deferred that we've been passed along.
+
+ :param message: an IMessage implementor
+ :type message: LeapMessage
+ :param observer: the deferred that will fire with the
+ UID of the message
+ :type observer: Deferred
+ """
+ # XXX for clarity, this could be delegated to a
+ # MessageCollection mixin that implements copy too, and
+ # moved out of here.
+ msg = message
+ memstore = self._memstore
# XXX should use a public api instead
fdoc = msg._fdoc
+ hdoc = msg._hdoc
if not fdoc:
- logger.debug("Tried to copy a MSG with no fdoc")
+ logger.warning("Tried to copy a MSG with no fdoc")
return
-
new_fdoc = copy.deepcopy(fdoc.content)
- new_fdoc[self.UID_KEY] = uid_next
- new_fdoc[self.MBOX_KEY] = self.mbox
- self._do_add_doc(new_fdoc)
- # XXX should use a public api instead
- hdoc = msg._hdoc
- self.messages.add_hdocset_docid(hdoc.doc_id)
+ fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY]
- deferLater(reactor, 1, self.notify_new)
+ # XXX is this hitting the db??? --- probably.
+ # We should profile after the pre-fetch.
+ dest_fdoc = memstore.get_fdoc_from_chash(
+ fdoc_chash, self.mbox)
+ exist = dest_fdoc and not empty(dest_fdoc.content)
- def _do_add_doc(self, doc):
- """
- Defer the adding of a new doc.
+ if exist:
+ # Should we signal error on the callback?
+ logger.warning("Destination message already exists!")
- :param doc: document to be created in soledad.
- :type doc: dict
- """
- self._soledad.create_doc(doc)
+ # XXX I'm still not clear if we should raise the
+ # errback. This actually raises an ugly warning
+ # in some MUAs like Thunderbird. I guess the user does
+ # not deserve that.
+ observer.callback(True)
+ else:
+ mbox = self.mbox
+ uid_next = memstore.increment_last_soledad_uid(mbox)
+ new_fdoc[self.UID_KEY] = uid_next
+ new_fdoc[self.MBOX_KEY] = mbox
+
+ # FIXME set recent!
+
+ self._memstore.create_message(
+ self.mbox, uid_next,
+ MessageWrapper(
+ new_fdoc, hdoc.content),
+ observer=observer,
+ notify_on_disk=False)
# convenience fun
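After this change, `store()` and `copy()` follow an observer-deferred pattern: they return a Deferred right away, schedule the real work with deferLater, and the lower layer fires that Deferred once it finishes. A stripped-down sketch of the pattern, with illustrative names only (not the actual mailbox code):

    # Hedged sketch of the observer-deferred pattern used by store()/copy().
    from twisted.internet import defer, reactor
    from twisted.internet.task import deferLater

    def store_flags(collection, seq_messg, flags, mode):
        observer = defer.Deferred()
        # schedule the actual flag update outside the current call stack
        deferLater(reactor, 0, _do_store, collection, seq_messg, flags,
                   mode, observer)
        return observer

    def _do_store(collection, seq_messg, flags, mode, observer):
        # the collection is expected to fire `observer` with a dict mapping
        # UIDs to the resulting flags once the operation is done
        collection.set_flags(seq_messg, flags, mode, observer)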
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
new file mode 100644
index 0000000..195cef7
--- /dev/null
+++ b/src/leap/mail/imap/memorystore.py
@@ -0,0 +1,961 @@
+# -*- coding: utf-8 -*-
+# memorystore.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+In-memory transient store for a LEAPIMAPServer.
+"""
+import contextlib
+import logging
+import threading
+import weakref
+
+from collections import defaultdict
+from copy import copy
+
+from twisted.internet import defer
+from twisted.internet.task import LoopingCall
+from twisted.python import log
+from zope.interface import implements
+
+from leap.common.check import leap_assert_type
+from leap.mail import size
+from leap.mail.decorators import deferred_to_thread
+from leap.mail.utils import empty
+from leap.mail.messageflow import MessageProducer
+from leap.mail.imap import interfaces
+from leap.mail.imap.fields import fields
+from leap.mail.imap.messageparts import MessagePartType, MessagePartDoc
+from leap.mail.imap.messageparts import RecentFlagsDoc
+from leap.mail.imap.messageparts import MessageWrapper
+from leap.mail.imap.messageparts import ReferenciableDict
+
+logger = logging.getLogger(__name__)
+
+
+# The default period to do writebacks to the permanent
+# soledad storage, in seconds.
+SOLEDAD_WRITE_PERIOD = 10
+
+
+@contextlib.contextmanager
+def set_bool_flag(obj, att):
+ """
+ Set a boolean flag to True while we're doing our thing.
+ Just to let the world know.
+ """
+ setattr(obj, att, True)
+ try:
+ yield True
+ except RuntimeError as exc:
+ logger.exception(exc)
+ finally:
+ setattr(obj, att, False)
+
+
+class MemoryStore(object):
+ """
+ An in-memory store to where we can write the different parts that
+ we split the messages into and buffer them until we write them to the
+ permanent storage.
+
+ It uses MessageWrapper instances to represent the message-parts, which are
+ indexed by mailbox name and UID.
+
+ It can also be passed a permanent storage as a parameter (any implementor
+ of IMessageStore, in this case a SoledadStore). In that case, a periodic
+ dump of the messages stored in memory will be done. The period of the
+ writes to the permanent storage is controlled by the write_period parameter
+ in the constructor.
+ """
+ implements(interfaces.IMessageStore,
+ interfaces.IMessageStoreWriter)
+
+ # TODO We will want to index by chash when we transition to local-only
+ # UIDs.
+
+ WRITING_FLAG = "_writing"
+ _last_uid_lock = threading.Lock()
+
+ def __init__(self, permanent_store=None,
+ write_period=SOLEDAD_WRITE_PERIOD):
+ """
+ Initialize a MemoryStore.
+
+ :param permanent_store: a IMessageStore implementor to dump
+ messages to.
+ :type permanent_store: IMessageStore
+ :param write_period: the interval to dump messages to disk, in seconds.
+ :type write_period: int
+ """
+ self._permanent_store = permanent_store
+ self._write_period = write_period
+
+ # Internal Storage: messages
+ self._msg_store = {}
+
+ # Internal Storage: payload-hash
+ """
+ {'phash': weakref.proxy(dict)}
+ """
+ self._phash_store = {}
+
+ # Internal Storage: content-hash:fdoc
+ """
+ chash-fdoc-store keeps references to
+ the flag-documents indexed by content-hash.
+
+ {'chash': {'mbox-a': weakref.proxy(dict),
+ 'mbox-b': weakref.proxy(dict)}
+ }
+ """
+ self._chash_fdoc_store = {}
+
+ # Internal Storage: recent-flags store
+ """
+ recent-flags store keeps one dict per mailbox,
+ with the document-id of the u1db document
+ and the set of the UIDs that have the recent flag.
+
+ {'mbox-a': {'doc_id': 'deadbeef',
+ 'set': {1,2,3,4}
+ }
+ }
+ """
+ # TODO this will have to transition to content-hash
+ # indexes after we move to local-only UIDs.
+
+ self._rflags_store = defaultdict(
+ lambda: {'doc_id': None, 'set': set([])})
+
+ """
+ last-uid store keeps the count of the highest UID
+ per mailbox.
+
+ {'mbox-a': 42,
+ 'mbox-b': 23}
+ """
+ self._last_uid = {}
+
+ """
+ known-uids keeps the set of uids that soledad knows about for a given
+ mailbox
+
+ {'mbox-a': set([1,2,3])}
+ """
+ self._known_uids = defaultdict(set)
+
+ # New and dirty flags, to set MessageWrapper State.
+ self._new = set([])
+ self._new_deferreds = {}
+ self._dirty = set([])
+ self._rflags_dirty = set([])
+ self._dirty_deferreds = {}
+
+ # Flag for signaling we're busy writing to the disk storage.
+ setattr(self, self.WRITING_FLAG, False)
+
+ if self._permanent_store is not None:
+ # this producer spits its messages to the permanent store
+ # consumer using a queue. We will use that to put
+ # our messages to be written.
+ self.producer = MessageProducer(permanent_store,
+ period=0.1)
+ # looping call for dumping to SoledadStore
+ self._write_loop = LoopingCall(self.write_messages,
+ permanent_store)
+
+ # We can start the write loop right now, why wait?
+ self._start_write_loop()
+
+ def _start_write_loop(self):
+ """
+ Start loop for writing to disk database.
+ """
+ if not self._write_loop.running:
+ self._write_loop.start(self._write_period, now=True)
+
+ def _stop_write_loop(self):
+ """
+ Stop loop for writing to disk database.
+ """
+ if self._write_loop.running:
+ self._write_loop.stop()
+
+ # IMessageStore
+
+ # XXX this would work well for whole message operations.
+ # We would have to add a put_flags operation to modify only
+ # the flags doc (and set the dirty flag accordingly)
+
+ def create_message(self, mbox, uid, message, observer,
+ notify_on_disk=True):
+ """
+ Create the passed message into this MemoryStore.
+
+ By default we consider that any message is a new message.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the UID for the message
+ :type uid: int
+ :param message: a message to be added
+ :type message: MessageWrapper
+ :param observer: the deferred that will fire with the
+ UID of the message. If notify_on_disk is True,
+ this will happen when the message is written to
+ Soledad. Otherwise it will fire as soon as we've
+ added the message to the memory store.
+ :type observer: Deferred
+ :param notify_on_disk: whether the `observer` deferred should
+ wait until the message is written to disk to
+ be fired.
+ :type notify_on_disk: bool
+ """
+ log.msg("adding new doc to memstore %r (%r)" % (mbox, uid))
+ key = mbox, uid
+
+ self._add_message(mbox, uid, message, notify_on_disk)
+ self._new.add(key)
+
+ def log_add(result):
+ log.msg("message save: %s" % result)
+ return result
+ observer.addCallback(log_add)
+
+ if notify_on_disk:
+ # We store this deferred so we can keep track of the pending
+ # operations internally.
+ # TODO this should fire with the UID !!! -- change that in
+ # the soledad store code.
+ self._new_deferreds[key] = observer
+ if not notify_on_disk:
+ # Caller does not care, just fire and forget, so we pass
+ # a deferred that will immediately have its callback triggered.
+ observer.callback(uid)
+
+ def put_message(self, mbox, uid, message, notify_on_disk=True):
+ """
+ Put an existing message.
+
+ This will set the dirty flag on the MemoryStore.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the UID for the message
+ :type uid: int
+ :param message: a message to be added
+ :type message: MessageWrapper
+ :param notify_on_disk: whether the deferred that is returned should
+ wait until the message is written to disk to
+ be fired.
+ :type notify_on_disk: bool
+
+ :return: a Deferred. If notify_on_disk is True, it will be fired
+ when written to the db on disk.
+ Otherwise it will fire immediately.
+ :rtype: Deferred
+ """
+ key = mbox, uid
+ d = defer.Deferred()
+ d.addCallback(lambda result: log.msg("message PUT save: %s" % result))
+
+ self._dirty.add(key)
+ self._dirty_deferreds[key] = d
+ self._add_message(mbox, uid, message, notify_on_disk)
+ return d
+
+ def _add_message(self, mbox, uid, message, notify_on_disk=True):
+ """
+ Helper method, called by both create_message and put_message.
+ See those for parameter documentation.
+ """
+ # XXX have to differentiate between notify_new and notify_dirty
+ # TODO defaultdict the hell outa here...
+
+ key = mbox, uid
+ msg_dict = message.as_dict()
+
+ FDOC = MessagePartType.fdoc.key
+ HDOC = MessagePartType.hdoc.key
+ CDOCS = MessagePartType.cdocs.key
+ DOCS_ID = MessagePartType.docs_id.key
+
+ try:
+ store = self._msg_store[key]
+ except KeyError:
+ self._msg_store[key] = {FDOC: {},
+ HDOC: {},
+ CDOCS: {},
+ DOCS_ID: {}}
+ store = self._msg_store[key]
+
+ fdoc = msg_dict.get(FDOC, None)
+ if fdoc:
+ if not store.get(FDOC, None):
+ store[FDOC] = ReferenciableDict({})
+ store[FDOC].update(fdoc)
+
+ # content-hash indexing
+ chash = fdoc.get(fields.CONTENT_HASH_KEY)
+ chash_fdoc_store = self._chash_fdoc_store
+ if not chash in chash_fdoc_store:
+ chash_fdoc_store[chash] = {}
+
+ chash_fdoc_store[chash][mbox] = weakref.proxy(
+ store[FDOC])
+
+ hdoc = msg_dict.get(HDOC, None)
+ if hdoc is not None:
+ if not store.get(HDOC, None):
+ store[HDOC] = ReferenciableDict({})
+ store[HDOC].update(hdoc)
+
+ docs_id = msg_dict.get(DOCS_ID, None)
+ if docs_id:
+ if not store.get(DOCS_ID, None):
+ store[DOCS_ID] = {}
+ store[DOCS_ID].update(docs_id)
+
+ cdocs = message.cdocs
+ for cdoc_key in cdocs.keys():
+ if not store.get(CDOCS, None):
+ store[CDOCS] = {}
+
+ cdoc = cdocs[cdoc_key]
+ # first we make it weak-referenceable
+ referenciable_cdoc = ReferenciableDict(cdoc)
+ store[CDOCS][cdoc_key] = referenciable_cdoc
+ phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None)
+ if not phash:
+ continue
+ self._phash_store[phash] = weakref.proxy(referenciable_cdoc)
+
+ def prune(seq, store):
+ for key in seq:
+ if key in store and empty(store.get(key)):
+ store.pop(key)
+ prune((FDOC, HDOC, CDOCS, DOCS_ID), store)
+
+ def get_docid_for_fdoc(self, mbox, uid):
+ """
+ Return Soledad document id for the flags-doc for a given mbox and uid,
+ or None if no flags document could be found.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the message UID
+ :type uid: int
+ :rtype: unicode or None
+ """
+ fdoc = self._permanent_store.get_flags_doc(mbox, uid)
+ if empty(fdoc):
+ return None
+ doc_id = fdoc.doc_id
+ return doc_id
+
+ def get_message(self, mbox, uid, flags_only=False):
+ """
+ Get a MessageWrapper for the given mbox and uid combination.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the message UID
+ :type uid: int
+ :param flags_only: whether the message should carry only a reference
+ to the flags document.
+ :type flags_only: bool
+
+ :return: MessageWrapper or None
+ """
+ key = mbox, uid
+ FDOC = MessagePartType.fdoc.key
+
+ msg_dict = self._msg_store.get(key, None)
+ if empty(msg_dict):
+ return None
+ new, dirty = self._get_new_dirty_state(key)
+ if flags_only:
+ return MessageWrapper(fdoc=msg_dict[FDOC],
+ new=new, dirty=dirty,
+ memstore=weakref.proxy(self))
+ else:
+ return MessageWrapper(from_dict=msg_dict,
+ new=new, dirty=dirty,
+ memstore=weakref.proxy(self))
+
+ def remove_message(self, mbox, uid):
+ """
+ Remove a Message from this MemoryStore.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the message UID
+ :type uid: int
+ """
+ # XXX For the moment we are only removing the flags and headers
+ # docs. The rest we leave there polluting your hard disk,
+ # until we think about a good way of deorphaning.
+
+ # XXX implement elijah's idea of using a PUT document as a
+ # token to ensure consistency in the removal.
+
+ try:
+ key = mbox, uid
+ self._new.discard(key)
+ self._dirty.discard(key)
+ self._msg_store.pop(key, None)
+ except Exception as exc:
+ logger.exception(exc)
+
+ # IMessageStoreWriter
+
+ def write_messages(self, store):
+ """
+ Write the message documents in this MemoryStore to a different store.
+
+ :param store: the IMessageStore to write to
+ """
+ # For now, we pass if the queue is not empty, to avoid duplicate
+ # queuing.
+ # We would better use a flag to know when we've already enqueued an
+ # item.
+
+ # XXX this could return the deferred for all the enqueued operations
+
+ if not self.producer.is_queue_empty():
+ return
+
+ if any(map(lambda i: not empty(i), (self._new, self._dirty))):
+ logger.info("Writing messages to Soledad...")
+
+ # TODO change for a lock, and make sure the property access
+ # is acquired
+ with set_bool_flag(self, self.WRITING_FLAG):
+ for rflags_doc_wrapper in self.all_rdocs_iter():
+ self.producer.push(rflags_doc_wrapper)
+ for msg_wrapper in self.all_new_dirty_msg_iter():
+ self.producer.push(msg_wrapper)
+
+ # MemoryStore specific methods.
+
+ def get_uids(self, mbox):
+ """
+ Get all uids for a given mbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: list
+ """
+ all_keys = self._msg_store.keys()
+ return [uid for m, uid in all_keys if m == mbox]
+
+ def get_soledad_known_uids(self, mbox):
+ """
+ Get all uids that soledad knows about, from the memory cache.
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: list
+ """
+ return self._known_uids.get(mbox, [])
+
+ # last_uid
+
+ def get_last_uid(self, mbox):
+ """
+ Return the highest UID for a given mbox.
+ It will be the higher of the highest uid in the message store for
+ the mailbox, and the soledad integer cache.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: int
+ """
+ uids = self.get_uids(mbox)
+ last_mem_uid = uids and max(uids) or 0
+ last_soledad_uid = self.get_last_soledad_uid(mbox)
+ return max(last_mem_uid, last_soledad_uid)
+
+ def get_last_soledad_uid(self, mbox):
+ """
+ Get last uid for a given mbox from the soledad integer cache.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ """
+ return self._last_uid.get(mbox, 0)
+
+ def set_last_soledad_uid(self, mbox, value):
+ """
+ Set last uid for a given mbox in the soledad integer cache.
+ SoledadMailbox should prime this value during initialization.
+ Other methods (during message adding) SHOULD call
+ `increment_last_soledad_uid` instead.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param value: the value to set
+ :type value: int
+ """
+ leap_assert_type(value, int)
+ logger.info("setting last soledad uid for %s to %s" %
+ (mbox, value))
+ # if we already have a value here, don't do anything
+ with self._last_uid_lock:
+ if not self._last_uid.get(mbox, None):
+ self._last_uid[mbox] = value
+
+ def set_known_uids(self, mbox, value):
+ """
+ Set the value of the known-uids set for this mbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param value: a sequence of integers to be added to the set.
+ :type value: tuple
+ """
+ current = self._known_uids[mbox]
+ self._known_uids[mbox] = current.union(set(value))
+
+ def increment_last_soledad_uid(self, mbox):
+ """
+ Increment by one the soledad integer cache for the last_uid for
+ this mbox, and fire a defer-to-thread to update the soledad value.
+ The caller should lock the call to this method.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ """
+ with self._last_uid_lock:
+ self._last_uid[mbox] += 1
+ value = self._last_uid[mbox]
+ self.write_last_uid(mbox, value)
+ return value
+
+ @deferred_to_thread
+ def write_last_uid(self, mbox, value):
+ """
+ Write the highest uid value for this mailbox to the permanent store.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param value: the value to set
+ :type value: int
+ """
+ leap_assert_type(value, int)
+ if self._permanent_store:
+ self._permanent_store.write_last_uid(mbox, value)
+
+ # Counting sheeps...
+
+ def count_new_mbox(self, mbox):
+ """
+ Count the new messages for a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :return: number of new messages
+ :rtype: int
+ """
+ return len([(m, uid) for m, uid in self._new if m == mbox])
+
+ # XXX used at all?
+ def count_new(self):
+ """
+ Count all the new messages in the MemoryStore.
+
+ :rtype: int
+ """
+ return len(self._new)
+
+ def get_cdoc_from_phash(self, phash):
+ """
+ Return a content-document by its payload-hash.
+
+ :param phash: the payload hash to check against
+ :type phash: str or unicode
+ :rtype: MessagePartDoc
+ """
+ doc = self._phash_store.get(phash, None)
+
+ # XXX return None for consistency?
+
+ # XXX have to keep a mapping between phash and its linkage
+ # info, to know if this payload has already been saved or not.
+ # We will be able to get this from the linkage-docs,
+ # not yet implemented.
+ new = True
+ dirty = False
+ return MessagePartDoc(
+ new=new, dirty=dirty, store="mem",
+ part=MessagePartType.cdoc,
+ content=doc,
+ doc_id=None)
+
+ def get_fdoc_from_chash(self, chash, mbox):
+ """
+ Return a flags-document by its content-hash and a given mailbox.
+ Used during content-duplication detection while copying or adding a
+ message.
+
+ :param chash: the content hash to check against
+ :type chash: str or unicode
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+
+ :return: MessagePartDoc. It will return None if the flags document
+ has empty content or it is flagged as \\Deleted.
+ """
+ docs_dict = self._chash_fdoc_store.get(chash, None)
+ fdoc = docs_dict.get(mbox, None) if docs_dict else None
+
+ # a couple of special cases.
+ # 1. We might have a doc with empty content...
+ if empty(fdoc):
+ return None
+
+ # 2. ...Or the message could exist, but being flagged for deletion.
+ # We want to create a new one in this case.
+ # Hmmm what if the deletion is un-done?? We would end with a
+ # duplicate...
+ if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]:
+ return None
+
+ uid = fdoc[fields.UID_KEY]
+ key = mbox, uid
+ new = key in self._new
+ dirty = key in self._dirty
+ return MessagePartDoc(
+ new=new, dirty=dirty, store="mem",
+ part=MessagePartType.fdoc,
+ content=fdoc,
+ doc_id=None)
+
+ def all_msg_iter(self):
+ """
+ Return generator that iterates through all messages in the store.
+
+ :return: generator of MessageWrappers
+ :rtype: generator
+ """
+ return (self.get_message(*key)
+ for key in sorted(self._msg_store.keys()))
+
+ def all_new_dirty_msg_iter(self):
+ """
+ Return generator that iterates through all new and dirty messages.
+
+ :return: generator of MessageWrappers
+ :rtype: generator
+ """
+ return (self.get_message(*key)
+ for key in sorted(self._msg_store.keys())
+ if key in self._new or key in self._dirty)
+
+ def all_msg_dict_for_mbox(self, mbox):
+ """
+ Return all the message dicts for a given mbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :return: list of dictionaries
+ :rtype: list
+ """
+ # This *needs* to return a fixed sequence. Otherwise the dictionary len
+ # will change during iteration, when we modify it
+ return [self._msg_store[(mb, uid)]
+ for mb, uid in self._msg_store if mb == mbox]
+
+ def all_deleted_uid_iter(self, mbox):
+ """
+ Return a list with the UIDs of all messages
+ with the deleted flag set in a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :return: list of integers
+ :rtype: list
+ """
+ # This *needs* to return a fixed sequence. Otherwise the dictionary len
+ # will change during iteration, when we modify it
+ all_deleted = [
+ msg['fdoc']['uid'] for msg in self.all_msg_dict_for_mbox(mbox)
+ if msg.get('fdoc', None)
+ and fields.DELETED_FLAG in msg['fdoc']['flags']]
+ return all_deleted
+
+ # new, dirty flags
+
+ def _get_new_dirty_state(self, key):
+ """
+ Return `new` and `dirty` flags for a given message.
+
+ :param key: the key for the message, in the form mbox, uid
+ :type key: tuple
+ :return: tuple of bools
+ :rtype: tuple
+ """
+ # XXX should return *first* the news, and *then* the dirty...
+ return map(lambda _set: key in _set, (self._new, self._dirty))
+
+ def set_new(self, key):
+ """
+ Add the key value to the `new` set.
+
+ :param key: the key for the message, in the form mbox, uid
+ :type key: tuple
+ """
+ self._new.add(key)
+
+ def unset_new(self, key):
+ """
+ Remove the key value from the `new` set.
+
+ :param key: the key for the message, in the form mbox, uid
+ :type key: tuple
+ """
+ self._new.discard(key)
+ deferreds = self._new_deferreds
+ d = deferreds.get(key, None)
+ if d:
+ # XXX use a namedtuple for passing the result
+ # when we check it in the other side.
+ d.callback('%s, ok' % str(key))
+ deferreds.pop(key)
+
+ def set_dirty(self, key):
+ """
+ Add the key value to the `dirty` set.
+
+ :param key: the key for the message, in the form mbox, uid
+ :type key: tuple
+ """
+ self._dirty.add(key)
+
+ def unset_dirty(self, key):
+ """
+ Remove the key value from the `dirty` set.
+
+ :param key: the key for the message, in the form mbox, uid
+ :type key: tuple
+ """
+ self._dirty.discard(key)
+ deferreds = self._dirty_deferreds
+ d = deferreds.get(key, None)
+ if d:
+ # XXX use a namedtuple for passing the result
+ # when we check it in the other side.
+ d.callback('%s, ok' % str(key))
+ deferreds.pop(key)
+
+ # Recent Flags
+
+ # TODO --- nice but unused
+ def set_recent_flag(self, mbox, uid):
+ """
+ Set the `Recent` flag for a given mailbox and UID.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the message UID
+ :type uid: int
+ """
+ self._rflags_dirty.add(mbox)
+ self._rflags_store[mbox]['set'].add(uid)
+
+ # TODO --- nice but unused
+ def unset_recent_flag(self, mbox, uid):
+ """
+ Unset the `Recent` flag for a given mailbox and UID.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the message UID
+ :type uid: int
+ """
+ self._rflags_store[mbox]['set'].discard(uid)
+
+ def set_recent_flags(self, mbox, value):
+ """
+ Set the value for the set of the recent flags.
+ Used from the property in the MessageCollection.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param value: a sequence of flags to set
+ :type value: sequence
+ """
+ self._rflags_dirty.add(mbox)
+ self._rflags_store[mbox]['set'] = set(value)
+
+ def load_recent_flags(self, mbox, flags_doc):
+ """
+ Load the passed flags document in the recent flags store, for a given
+ mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param flags_doc: A dictionary containing the `doc_id` of the Soledad
+ flags-document for this mailbox, and the `set`
+ of uids marked with that flag.
+ """
+ self._rflags_store[mbox] = flags_doc
+
+ def get_recent_flags(self, mbox):
+ """
+ Return the set of UIDs with the `Recent` flag for this mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: set, or None
+ """
+ rflag_for_mbox = self._rflags_store.get(mbox, None)
+ if not rflag_for_mbox:
+ return None
+ return self._rflags_store[mbox]['set']
+
+ def all_rdocs_iter(self):
+ """
+ Return an iterator through all in-memory recent flag dicts, wrapped
+ under a RecentFlagsDoc namedtuple.
+ Used for saving to disk.
+
+ :return: a generator of RecentFlagDoc
+ :rtype: generator
+ """
+ # XXX use enums
+ DOC_ID = "doc_id"
+ SET = "set"
+
+ rflags_store = self._rflags_store
+
+ def get_rdoc(mbox, rdict):
+ mbox_rflag_set = rdict[SET]
+ recent_set = copy(mbox_rflag_set)
+ # zero it!
+ mbox_rflag_set.difference_update(mbox_rflag_set)
+ return RecentFlagsDoc(
+ doc_id=rflags_store[mbox][DOC_ID],
+ content={
+ fields.TYPE_KEY: fields.TYPE_RECENT_VAL,
+ fields.MBOX_KEY: mbox,
+ fields.RECENTFLAGS_KEY: list(recent_set)
+ })
+
+ return (get_rdoc(mbox, rdict) for mbox, rdict in rflags_store.items()
+ if not empty(rdict[SET]))
+
+ # Methods that mirror the IMailbox interface
+
+ def remove_all_deleted(self, mbox):
+ """
+ Remove all messages flagged \\Deleted from this Memory Store only.
+ Called from `expunge`
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :return: a list of UIDs
+ :rtype: list
+ """
+ mem_deleted = self.all_deleted_uid_iter(mbox)
+ for uid in mem_deleted:
+ self.remove_message(mbox, uid)
+ return mem_deleted
+
+ def expunge(self, mbox, observer):
+ """
+ Remove all messages flagged \\Deleted, from the Memory Store
+ and from the permanent store also.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param observer: a deferred that will be fired when expunge is done
+ :type observer: Deferred
+ :return: a list of UIDs
+ :rtype: list
+ """
+ # TODO expunge should add itself as a callback to the ongoing
+ # writes.
+ soledad_store = self._permanent_store
+ all_deleted = []
+
+ try:
+ # 1. Stop the writing call
+ self._stop_write_loop()
+ # 2. Enqueue a last write.
+ #self.write_messages(soledad_store)
+ # 3. Should wait on the writebacks to finish ???
+ # FIXME wait for this, and add all the rest of the method
+ # as a callback!!!
+ except Exception as exc:
+ logger.exception(exc)
+
+ # Now, we...:
+
+ try:
+ # 1. Delete all messages marked as deleted in soledad.
+
+ # XXX this could be deferred for faster operation.
+ if soledad_store:
+ sol_deleted = soledad_store.remove_all_deleted(mbox)
+ else:
+ sol_deleted = []
+
+ try:
+ self._known_uids[mbox].difference_update(set(sol_deleted))
+ except Exception as exc:
+ logger.exception(exc)
+
+ # 2. Delete all messages marked as deleted in memory.
+ mem_deleted = self.remove_all_deleted(mbox)
+
+ all_deleted = set(mem_deleted).union(set(sol_deleted))
+ logger.debug("deleted %r" % all_deleted)
+ except Exception as exc:
+ logger.exception(exc)
+ finally:
+ self._start_write_loop()
+ observer.callback(True)
+ return all_deleted
+
+ # Dump-to-disk controls.
+
+ @property
+ def is_writing(self):
+ """
+ Property that returns whether the store is currently writing its
+ internal state to a permanent storage.
+
+ Used to evaluate whether the CHECK command can inform that the field
+ is clear to proceed, or waiting for the write operations to complete
+ is needed instead.
+
+ :rtype: bool
+ """
+ # FIXME this should return a deferred !!!
+ # XXX ----- can fire when all new + dirty deferreds
+ # are done (gatherResults)
+ return getattr(self, self.WRITING_FLAG)
+
+ # Memory management.
+
+ def get_size(self):
+ """
+ Return the size of the internal storage.
+ Use for calculating the limit beyond which we should flush the store.
+
+ :rtype: int
+ """
+ return size.get_size(self._msg_store)
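The MemoryStore pairs a LoopingCall with a MessageProducer to flush new and dirty message parts to the permanent store every write_period seconds. A minimal sketch of that write-back idea; TinyWriteBackStore is illustrative only, and the real class also tracks observer deferreds, recent flags and known uids:

    # Hedged, stripped-down sketch of the periodic write-back loop.
    from twisted.internet.task import LoopingCall

    class TinyWriteBackStore(object):
        def __init__(self, permanent_store, write_period=10):
            self._permanent_store = permanent_store
            self._pending = {}  # (mbox, uid) -> message
            self._loop = LoopingCall(self._flush)
            self._loop.start(write_period, now=False)

        def put(self, mbox, uid, message):
            # buffer in memory; it will hit the permanent store on next flush
            self._pending[(mbox, uid)] = message

        def _flush(self):
            for (mbox, uid), message in self._pending.items():
                self._permanent_store.put_message(mbox, uid, message)
            self._pending.clear()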
diff --git a/src/leap/mail/imap/messageparts.py b/src/leap/mail/imap/messageparts.py
new file mode 100644
index 0000000..b07681b
--- /dev/null
+++ b/src/leap/mail/imap/messageparts.py
@@ -0,0 +1,565 @@
+# messageparts.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+MessagePart implementation. Used from LeapMessage.
+"""
+import logging
+import StringIO
+import weakref
+
+from collections import namedtuple
+
+from enum import Enum
+from zope.interface import implements
+from twisted.mail import imap4
+
+from leap.common.decorators import memoized_method
+from leap.common.mail import get_email_charset
+from leap.mail.imap import interfaces
+from leap.mail.imap.fields import fields
+from leap.mail.utils import empty, first, find_charset
+
+MessagePartType = Enum("hdoc", "fdoc", "cdoc", "cdocs", "docs_id")
+
+
+logger = logging.getLogger(__name__)
+
+
+"""
+A MessagePartDoc is a light wrapper around the dictionary-like
+data that we pass along for message parts. It can be used almost everywhere
+that you would expect a SoledadDocument, since it has a dict under the
+`content` attribute.
+
+We also keep some metadata on it, relative in part to the message as a whole,
+and sometimes to a part in particular only.
+
+* `new` indicates that the document has just been created. SoledadStore
+ should just create a new doc for all the related message parts.
+* `store` indicates the type of store a given MessagePartDoc lives in.
+ We currently use this to indicate that the document comes from memory,
+ but we should probably get rid of it as soon as we extend the use of the
+ SoledadStore interface along LeapMessage, MessageCollection and Mailbox.
+* `part` is one of the MessagePartType enums.
+
+* `dirty` indicates that, while we already have the document in Soledad,
+ we have modified its state in memory, so we need to put_doc instead while
+ dumping the MemoryStore contents.
+ `dirty` attribute would only apply to flags-docs and linkage-docs.
+* `doc_id` is the identifier for the document in the u1db database, if any.
+
+"""
+
+MessagePartDoc = namedtuple(
+ 'MessagePartDoc',
+ ['new', 'dirty', 'part', 'store', 'content', 'doc_id'])
+
+"""
+A RecentFlagsDoc is used to send the recent-flags document payload to the
+SoledadWriter during dumps.
+"""
+RecentFlagsDoc = namedtuple(
+ 'RecentFlagsDoc',
+ ['content', 'doc_id'])
+
+
+class ReferenciableDict(dict):
+ """
+ A dict that can be weak-referenced.
+
+ Some builtin objects are not weak-referenceable unless
+ subclassed. So we do.
+
+ Used to return pointers to the items in the MemoryStore.
+ """
+
+
+class MessageWrapper(object):
+ """
+ A simple nested dictionary container around the different message subparts.
+ """
+ implements(interfaces.IMessageContainer)
+
+ FDOC = "fdoc"
+ HDOC = "hdoc"
+ CDOCS = "cdocs"
+ DOCS_ID = "docs_id"
+
+ # Using slots to limit the memory footprint.
+ # Add your attribute here.
+
+ __slots__ = ["_dict", "_new", "_dirty", "_storetype", "memstore"]
+
+ def __init__(self, fdoc=None, hdoc=None, cdocs=None,
+ from_dict=None, memstore=None,
+ new=True, dirty=False, docs_id={}):
+ """
+ Initialize a MessageWrapper.
+ """
+ # TODO add optional reference to original message in the incoming
+ self._dict = {}
+ self.memstore = memstore
+
+ self._new = new
+ self._dirty = dirty
+
+ self._storetype = "mem"
+
+ if from_dict is not None:
+ self.from_dict(from_dict)
+ else:
+ if fdoc is not None:
+ self._dict[self.FDOC] = ReferenciableDict(fdoc)
+ if hdoc is not None:
+ self._dict[self.HDOC] = ReferenciableDict(hdoc)
+ if cdocs is not None:
+ self._dict[self.CDOCS] = ReferenciableDict(cdocs)
+
+ # This will keep references to the doc_ids to be able to put
+ # messages to soledad. It will be populated during the walk() to avoid
+ # the overhead of reading from the db.
+
+ # XXX it really *only* makes sense for the FDOC, the other parts
+ # should not be "dirty", just new...!!!
+ self._dict[self.DOCS_ID] = docs_id
+
+ # properties
+
+ # TODO Could refactor new and dirty properties together.
+
+ def _get_new(self):
+ """
+ Get the value for the `new` flag.
+
+ :rtype: bool
+ """
+ return self._new
+
+ def _set_new(self, value=True):
+ """
+ Set the value for the `new` flag, and propagate it
+ to the memory store if any.
+
+ :param value: the value to set
+ :type value: bool
+ """
+ self._new = value
+ if self.memstore:
+ mbox = self.fdoc.content['mbox']
+ uid = self.fdoc.content['uid']
+ key = mbox, uid
+ fun = [self.memstore.unset_new,
+ self.memstore.set_new][int(value)]
+ fun(key)
+ else:
+ logger.warning("Could not find a memstore referenced from this "
+ "MessageWrapper. The value for new will not be "
+ "propagated")
+
+ new = property(_get_new, _set_new,
+ doc="The `new` flag for this MessageWrapper")
+
+ def _get_dirty(self):
+ """
+ Get the value for the `dirty` flag.
+
+ :rtype: bool
+ """
+ return self._dirty
+
+ def _set_dirty(self, value=True):
+ """
+ Set the value for the `dirty` flag, and propagate it
+ to the memory store if any.
+
+ :param value: the value to set
+ :type value: bool
+ """
+ self._dirty = value
+ if self.memstore:
+ mbox = self.fdoc.content['mbox']
+ uid = self.fdoc.content['uid']
+ key = mbox, uid
+ fun = [self.memstore.unset_dirty,
+ self.memstore.set_dirty][int(value)]
+ fun(key)
+ else:
+ logger.warning("Could not find a memstore referenced from this "
+ "MessageWrapper. The value for new will not be "
+ "propagated")
+
+ dirty = property(_get_dirty, _set_dirty)
+
+ # IMessageContainer
+
+ @property
+ def fdoc(self):
+ """
+ Return a MessagePartDoc wrapping around a weak reference to
+ the flags-document in this MemoryStore, if any.
+
+ :rtype: MessagePartDoc
+ """
+ _fdoc = self._dict.get(self.FDOC, None)
+ if _fdoc:
+ content_ref = weakref.proxy(_fdoc)
+ else:
+ logger.warning("NO FDOC!!!")
+ content_ref = {}
+
+ return MessagePartDoc(new=self.new, dirty=self.dirty,
+ store=self._storetype,
+ part=MessagePartType.fdoc,
+ content=content_ref,
+ doc_id=self._dict[self.DOCS_ID].get(
+ self.FDOC, None))
+
+ @property
+ def hdoc(self):
+ """
+ Return a MessagePartDoc wrapping around a weak reference to
+ the headers-document in this MemoryStore, if any.
+
+ :rtype: MessagePartDoc
+ """
+ _hdoc = self._dict.get(self.HDOC, None)
+ if _hdoc:
+ content_ref = weakref.proxy(_hdoc)
+ else:
+ content_ref = {}
+ return MessagePartDoc(new=self.new, dirty=self.dirty,
+ store=self._storetype,
+ part=MessagePartType.hdoc,
+ content=content_ref,
+ doc_id=self._dict[self.DOCS_ID].get(
+ self.HDOC, None))
+
+ @property
+ def cdocs(self):
+ """
+ Return a weak reference to a zero-indexed dict containing
+ the content-documents, or an empty dict if none found.
+ If you want access to the MessagePartDoc for the individual
+ parts, use the generator returned by `walk` instead.
+
+ :rtype: dict
+ """
+ _cdocs = self._dict.get(self.CDOCS, None)
+ if _cdocs:
+ return weakref.proxy(_cdocs)
+ else:
+ return {}
+
+ def walk(self):
+ """
+ Generator that iterates through all the parts, returning
+ MessagePartDoc. Used for writing to SoledadStore.
+
+ :rtype: generator
+ """
+ if self._dirty:
+ mbox = self.fdoc.content[fields.MBOX_KEY]
+ uid = self.fdoc.content[fields.UID_KEY]
+ docid_dict = self._dict[self.DOCS_ID]
+ docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc(
+ mbox, uid)
+
+ if not empty(self.fdoc.content):
+ yield self.fdoc
+ if not empty(self.hdoc.content):
+ yield self.hdoc
+ for cdoc in self.cdocs.values():
+ if not empty(cdoc):
+ content_ref = weakref.proxy(cdoc)
+ yield MessagePartDoc(new=self.new, dirty=self.dirty,
+ store=self._storetype,
+ part=MessagePartType.cdoc,
+ content=content_ref,
+ doc_id=None)
+
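The consumer of `walk` lives in the SoledadStore; a minimal sketch of what such a consumer might look like (create_doc/get_doc/put_doc are standard Soledad calls, while dispatching on doc_id is an assumption made for illustration):

    # hypothetical consumer sketch
    for part in wrapper.walk():
        if part.doc_id is None:
            soledad.create_doc(dict(part.content))    # new part: create a fresh doc
        else:
            doc = soledad.get_doc(part.doc_id)        # dirty part: update in place
            doc.content = dict(part.content)
            soledad.put_doc(doc)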
+ # i/o
+
+ def as_dict(self):
+ """
+ Return a dict representation of the parts contained.
+
+ :rtype: dict
+ """
+ return self._dict
+
+ def from_dict(self, msg_dict):
+ """
+ Populate MessageWrapper parts from a dictionary.
+ It expects the same format that we use in a
+ MessageWrapper.
+
+ :param msg_dict: a dictionary containing the parts to populate
+ the MessageWrapper from
+ :type msg_dict: dict
+ """
+ fdoc, hdoc, cdocs = map(
+ lambda part: msg_dict.get(part, None),
+ [self.FDOC, self.HDOC, self.CDOCS])
+
+ for t, doc in ((self.FDOC, fdoc), (self.HDOC, hdoc),
+ (self.CDOCS, cdocs)):
+ self._dict[t] = ReferenciableDict(doc) if doc else None
+
+
+class MessagePart(object):
+ """
+ IMessagePart implementor, to be passed to several methods
+ of the IMAP4Server.
+ It takes a subpart message and is able to find
+ the inner parts.
+
+ See the interface documentation.
+ """
+
+ implements(imap4.IMessagePart)
+
+ def __init__(self, soledad, part_map):
+ """
+ Initializes the MessagePart.
+
+ :param soledad: Soledad instance.
+ :type soledad: Soledad
+ :param part_map: a dictionary containing the parts map for this
+ message
+ :type part_map: dict
+ """
+ # TODO
+ # It would be good to pass the uid/mailbox also
+ # for references while debugging.
+
+ # We have a problem on bulk moves: when the fetch on the
+ # new mailbox is done, the parts may not be complete yet.
+ # So we should be able to fail gracefully with empty
+ # docs until we solve that. Ideally we would gather the
+ # results of the deferred operations to signal that the
+ # operation is complete.
+ #leap_assert(part_map, "part map dict cannot be null")
+
+ self._soledad = soledad
+ self._pmap = part_map
+
+ def getSize(self):
+ """
+ Return the total size, in octets, of this message part.
+
+ :return: size of the message, in octets
+ :rtype: int
+ """
+ if empty(self._pmap):
+ return 0
+ size = self._pmap.get('size', None)
+ if size is None:
+ logger.error("Message part cannot find size in the partmap")
+ size = 0
+ return size
+
+ def getBodyFile(self):
+ """
+ Retrieve a file object containing only the body of this message.
+
+ :return: file-like object opened for reading
+ :rtype: StringIO
+ """
+ fd = StringIO.StringIO()
+ if not empty(self._pmap):
+ multi = self._pmap.get('multi')
+ if not multi:
+ phash = self._pmap.get("phash", None)
+ else:
+ pmap = self._pmap.get('part_map')
+ first_part = pmap.get('1', None)
+ if not empty(first_part):
+ phash = first_part['phash']
+ else:
+ phash = None
+
+ if phash is None:
+ logger.warning("Could not find phash for this subpart!")
+ payload = ""
+ else:
+ payload = self._get_payload_from_document(phash)
+
+ else:
+ logger.warning("Message with no part_map!")
+ payload = ""
+
+ if payload:
+ content_type = self._get_ctype_from_document(phash)
+ charset = find_charset(content_type)
+ logger.debug("Got charset from header: %s" % (charset,))
+ if charset is None:
+ charset = self._get_charset(payload)
+ logger.debug("Got charset: %s" % (charset,))
+ try:
+ if isinstance(payload, unicode):
+ payload = payload.encode(charset)
+ except UnicodeError as exc:
+ logger.error(
+ "Unicode error, using 'replace'. {0!r}".format(exc))
+ payload = payload.encode(charset, 'replace')
+
+ fd.write(payload)
+ fd.seek(0)
+ return fd
+
+ # TODO should memory-bound this memoize!!!
+ @memoized_method
+ def _get_payload_from_document(self, phash):
+ """
+ Return the message payload from the content document.
+
+ :param phash: the payload hash to retrieve by.
+ :type phash: str or unicode
+ :rtype: str or unicode
+ """
+ cdocs = self._soledad.get_from_index(
+ fields.TYPE_P_HASH_IDX,
+ fields.TYPE_CONTENT_VAL, str(phash))
+
+ cdoc = first(cdocs)
+ if cdoc is None:
+ logger.warning(
+ "Could not find the content doc "
+ "for phash %s" % (phash,))
+ payload = ""
+ else:
+ payload = cdoc.content.get(fields.RAW_KEY, "")
+ return payload
+
+ # TODO should memory-bound this memoize!!!
+ @memoized_method
+ def _get_ctype_from_document(self, phash):
+ """
+ Return the content-type from the content document.
+
+ :param phash: the payload hash to retrieve by.
+ :type phash: str or unicode
+ :rtype: str or unicode
+ """
+ cdocs = self._soledad.get_from_index(
+ fields.TYPE_P_HASH_IDX,
+ fields.TYPE_CONTENT_VAL, str(phash))
+
+ cdoc = first(cdocs)
+ if not cdoc:
+ logger.warning(
+ "Could not find the content doc "
+ "for phash %s" % (phash,))
+ ctype = ""
+ else:
+ ctype = cdoc.content.get('ctype', "")
+ return ctype
+
+ @memoized_method
+ def _get_charset(self, stuff):
+ # TODO put in a common class with LeapMessage
+ """
+ Gets (guesses?) the charset of a payload.
+
+ :param stuff: the stuff to guess about.
+ :type stuff: str or unicode
+ :return: charset
+ :rtype: unicode
+ """
+ # XXX existential doubt 2. shouldn't we make the scope
+ # of the decorator somewhat more persistent?
+ # ah! yes! and put memory bounds.
+ return get_email_charset(stuff)
+
+ def getHeaders(self, negate, *names):
+ """
+ Retrieve a group of message headers.
+
+ :param names: The names of the headers to retrieve or omit.
+ :type names: tuple of str
+
+ :param negate: If True, indicates that the headers listed in names
+ should be omitted from the return value, rather
+ than included.
+ :type negate: bool
+
+ :return: A mapping of header field names to header field values
+ :rtype: dict
+ """
+ # XXX refactor together with MessagePart method
+ if not self._pmap:
+ logger.warning("No pmap in Subpart!")
+ return {}
+ headers = dict(self._pmap.get("headers", []))
+
+ names = map(lambda s: s.upper(), names)
+ if negate:
+ cond = lambda key: key.upper() not in names
+ else:
+ cond = lambda key: key.upper() in names
+
+ # default to most likely standard
+ charset = find_charset(headers, "utf-8")
+ headers2 = dict()
+ for key, value in headers.items():
+ # twisted imap server expects *some* headers to be lowercase
+ # We could use a CaseInsensitiveDict here...
+ if key.lower() == "content-type":
+ key = key.lower()
+
+ if not isinstance(key, str):
+ key = key.encode(charset, 'replace')
+ if not isinstance(value, str):
+ value = value.encode(charset, 'replace')
+
+ # filter original dict by negate-condition
+ if cond(key):
+ headers2[key] = value
+ return headers2
+
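For example, under the semantics above a caller could select or exclude headers like this (hypothetical usage):

    # hypothetical usage of the negate flag
    part.getHeaders(False, 'from', 'subject')   # only From and Subject
    part.getHeaders(True, 'received')           # everything except Received
    part.getHeaders(True)                       # nothing to omit: all headers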
+ def isMultipart(self):
+ """
+ Return True if this message is multipart.
+ """
+ if empty(self._pmap):
+ logger.warning("Could not get part map!")
+ return False
+ multi = self._pmap.get("multi", False)
+ return multi
+
+ def getSubPart(self, part):
+ """
+ Retrieve a MIME submessage
+
+ :type part: C{int}
+ :param part: The number of the part to retrieve, indexed from 0.
+ :raise IndexError: Raised if the specified part does not exist.
+ :raise TypeError: Raised if this message is not multipart.
+ :rtype: Any object implementing C{IMessagePart}.
+ :return: The specified sub-part.
+ """
+ if not self.isMultipart():
+ raise TypeError
+
+ sub_pmap = self._pmap.get("part_map", {})
+ try:
+ part_map = sub_pmap[str(part + 1)]
+ except KeyError:
+ logger.debug("getSubpart for %s: KeyError" % (part,))
+ raise IndexError
+
+ # XXX check for validity
+ return MessagePart(self._soledad, part_map)
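Putting the class together, a hypothetical drill-down into a nested part could look like this (names assumed; note that getSubPart takes a 0-indexed part number while the part map is stored with 1-indexed string keys):

    # hypothetical drill-down into the first sub-part
    if msg_part.isMultipart():
        inner = msg_part.getSubPart(0)
        print inner.getSize()
        body = inner.getBodyFile().read()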
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index 34304ea..25fc55f 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -20,17 +20,15 @@ LeapMessage and MessageCollection.
import copy
import logging
import re
-import time
import threading
import StringIO
-from collections import defaultdict, namedtuple
+from collections import defaultdict
from functools import partial
from twisted.mail import imap4
from twisted.internet import defer
from twisted.python import log
-from u1db import errors as u1db_errors
from zope.interface import implements
from zope.proxy import sameProxiedObjects
@@ -38,33 +36,30 @@ from leap.common.check import leap_assert, leap_assert_type
from leap.common.decorators import memoized_method
from leap.common.mail import get_email_charset
from leap.mail import walk
-from leap.mail.utils import first, find_charset
-from leap.mail.decorators import deferred
+from leap.mail.utils import first, find_charset, lowerdict, empty
+from leap.mail.utils import stringify_parts_map
+from leap.mail.decorators import deferred_to_thread
from leap.mail.imap.index import IndexedDB
from leap.mail.imap.fields import fields, WithMsgFields
+from leap.mail.imap.memorystore import MessageWrapper
+from leap.mail.imap.messageparts import MessagePart
from leap.mail.imap.parser import MailParser, MBoxParser
-from leap.mail.messageflow import IMessageConsumer
logger = logging.getLogger(__name__)
# TODO ------------------------------------------------------------
+# [ ] Add ref to incoming message during add_msg
# [ ] Add linked-from info.
+# * Need a new type of documents: linkage info.
+# * HDOCS are linked from FDOCs (ref to chash)
+# * CDOCS are linked from HDOCS (ref to chash)
+
# [ ] Delete incoming mail only after successful write!
# [ ] Remove UID from syncable db. Store only those indexes locally.
-
-def lowerdict(_dict):
- """
- Return a dict with the keys in lowercase.
-
- :param _dict: the dict to convert
- :rtype: dict
- """
- # TODO should properly implement a CaseInsensitive dict.
- # Look into requests code.
- return dict((key.lower(), value)
- for key, value in _dict.items())
+MSGID_PATTERN = r"""<([\w@.]+)>"""
+MSGID_RE = re.compile(MSGID_PATTERN)
def try_unique_query(curried):
@@ -92,232 +87,6 @@ def try_unique_query(curried):
except Exception as exc:
logger.exception("Unhandled error %r" % exc)
-MSGID_PATTERN = r"""<([\w@.]+)>"""
-MSGID_RE = re.compile(MSGID_PATTERN)
-
-
-class MessagePart(object):
- """
- IMessagePart implementor.
- It takes a subpart message and is able to find
- the inner parts.
-
- Excusatio non petita: see the interface documentation.
- """
-
- implements(imap4.IMessagePart)
-
- def __init__(self, soledad, part_map):
- """
- Initializes the MessagePart.
-
- :param part_map: a dictionary containing the parts map for this
- message
- :type part_map: dict
- """
- # TODO
- # It would be good to pass the uid/mailbox also
- # for references while debugging.
-
- # We have a problem on bulk moves, and is
- # that when the fetch on the new mailbox is done
- # the parts maybe are not complete.
- # So we should be able to fail with empty
- # docs until we solve that. The ideal would be
- # to gather the results of the deferred operations
- # to signal the operation is complete.
- #leap_assert(part_map, "part map dict cannot be null")
- self._soledad = soledad
- self._pmap = part_map
-
- def getSize(self):
- """
- Return the total size, in octets, of this message part.
-
- :return: size of the message, in octets
- :rtype: int
- """
- if not self._pmap:
- return 0
- size = self._pmap.get('size', None)
- if not size:
- logger.error("Message part cannot find size in the partmap")
- return size
-
- def getBodyFile(self):
- """
- Retrieve a file object containing only the body of this message.
-
- :return: file-like object opened for reading
- :rtype: StringIO
- """
- fd = StringIO.StringIO()
- if self._pmap:
- multi = self._pmap.get('multi')
- if not multi:
- phash = self._pmap.get("phash", None)
- else:
- pmap = self._pmap.get('part_map')
- first_part = pmap.get('1', None)
- if first_part:
- phash = first_part['phash']
-
- if not phash:
- logger.warning("Could not find phash for this subpart!")
- payload = str("")
- else:
- payload = self._get_payload_from_document(phash)
-
- else:
- logger.warning("Message with no part_map!")
- payload = str("")
-
- if payload:
- content_type = self._get_ctype_from_document(phash)
- charset = find_charset(content_type)
- logger.debug("Got charset from header: %s" % (charset,))
- if charset is None:
- charset = self._get_charset(payload)
- logger.debug("Got charset: %s" % (charset,))
- try:
- payload = payload.encode(charset)
- except (UnicodeEncodeError, UnicodeDecodeError) as e:
- logger.error("Unicode error, using 'replace'. {0!r}".format(e))
- payload = payload.encode(charset, 'replace')
-
- fd.write(payload)
- fd.seek(0)
- return fd
-
- # TODO cache the phash retrieval
- def _get_payload_from_document(self, phash):
- """
- Gets the message payload from the content document.
-
- :param phash: the payload hash to retrieve by.
- :type phash: basestring
- """
- cdocs = self._soledad.get_from_index(
- fields.TYPE_P_HASH_IDX,
- fields.TYPE_CONTENT_VAL, str(phash))
-
- cdoc = first(cdocs)
- if not cdoc:
- logger.warning(
- "Could not find the content doc "
- "for phash %s" % (phash,))
- payload = cdoc.content.get(fields.RAW_KEY, "")
- return payload
-
- # TODO cache the pahash retrieval
- def _get_ctype_from_document(self, phash):
- """
- Gets the content-type from the content document.
-
- :param phash: the payload hash to retrieve by.
- :type phash: basestring
- """
- cdocs = self._soledad.get_from_index(
- fields.TYPE_P_HASH_IDX,
- fields.TYPE_CONTENT_VAL, str(phash))
-
- cdoc = first(cdocs)
- if not cdoc:
- logger.warning(
- "Could not find the content doc "
- "for phash %s" % (phash,))
- ctype = cdoc.content.get('ctype', "")
- return ctype
-
- @memoized_method
- def _get_charset(self, stuff):
- # TODO put in a common class with LeapMessage
- """
- Gets (guesses?) the charset of a payload.
-
- :param stuff: the stuff to guess about.
- :type stuff: basestring
- :returns: charset
- """
- # XXX existential doubt 2. shouldn't we make the scope
- # of the decorator somewhat more persistent?
- # ah! yes! and put memory bounds.
- return get_email_charset(unicode(stuff))
-
- def getHeaders(self, negate, *names):
- """
- Retrieve a group of message headers.
-
- :param names: The names of the headers to retrieve or omit.
- :type names: tuple of str
-
- :param negate: If True, indicates that the headers listed in names
- should be omitted from the return value, rather
- than included.
- :type negate: bool
-
- :return: A mapping of header field names to header field values
- :rtype: dict
- """
- if not self._pmap:
- logger.warning("No pmap in Subpart!")
- return {}
- headers = dict(self._pmap.get("headers", []))
-
- # twisted imap server expects *some* headers to be lowercase
- # We could use a CaseInsensitiveDict here...
- headers = dict(
- (str(key), str(value)) if key.lower() != "content-type"
- else (str(key.lower()), str(value))
- for (key, value) in headers.items())
-
- names = map(lambda s: s.upper(), names)
- if negate:
- cond = lambda key: key.upper() not in names
- else:
- cond = lambda key: key.upper() in names
-
- # unpack and filter original dict by negate-condition
- filter_by_cond = [
- map(str, (key, val)) for
- key, val in headers.items()
- if cond(key)]
- filtered = dict(filter_by_cond)
- return filtered
-
- def isMultipart(self):
- """
- Return True if this message is multipart.
- """
- if not self._pmap:
- logger.warning("Could not get part map!")
- return False
- multi = self._pmap.get("multi", False)
- return multi
-
- def getSubPart(self, part):
- """
- Retrieve a MIME submessage
-
- :type part: C{int}
- :param part: The number of the part to retrieve, indexed from 0.
- :raise IndexError: Raised if the specified part does not exist.
- :raise TypeError: Raised if this message is not multipart.
- :rtype: Any object implementing C{IMessagePart}.
- :return: The specified sub-part.
- """
- if not self.isMultipart():
- raise TypeError
- sub_pmap = self._pmap.get("part_map", {})
- try:
- part_map = sub_pmap[str(part + 1)]
- except KeyError:
- logger.debug("getSubpart for %s: KeyError" % (part,))
- raise IndexError
-
- # XXX check for validity
- return MessagePart(self._soledad, part_map)
-
class LeapMessage(fields, MailParser, MBoxParser):
"""
@@ -328,12 +97,14 @@ class LeapMessage(fields, MailParser, MBoxParser):
"""
# TODO this has to change.
- # Should index primarily by chash, and keep a local-lonly
+ # Should index primarily by chash, and keep a local-only
# UID table.
implements(imap4.IMessage)
- def __init__(self, soledad, uid, mbox, collection=None):
+ flags_lock = threading.Lock()
+
+ def __init__(self, soledad, uid, mbox, collection=None, container=None):
"""
Initializes a LeapMessage.
@@ -342,32 +113,54 @@ class LeapMessage(fields, MailParser, MBoxParser):
:param uid: the UID for the message.
:type uid: int or basestring
:param mbox: the mbox this message belongs to
- :type mbox: basestring
+ :type mbox: str or unicode
:param collection: a reference to the parent collection object
:type collection: MessageCollection
+ :param container: a IMessageContainer implementor instance
+ :type container: IMessageContainer
"""
MailParser.__init__(self)
self._soledad = soledad
self._uid = int(uid)
self._mbox = self._parse_mailbox_name(mbox)
self._collection = collection
+ self._container = container
self.__chash = None
self.__bdoc = None
+ # XXX make these properties public
+
@property
def _fdoc(self):
"""
An accessor to the flags document.
"""
if all(map(bool, (self._uid, self._mbox))):
- fdoc = self._get_flags_doc()
+ fdoc = None
+ if self._container is not None:
+ fdoc = self._container.fdoc
+ if not fdoc:
+ fdoc = self._get_flags_doc()
if fdoc:
- self.__chash = fdoc.content.get(
+ fdoc_content = fdoc.content
+ self.__chash = fdoc_content.get(
fields.CONTENT_HASH_KEY, None)
return fdoc
@property
+ def _hdoc(self):
+ """
+ An accessor to the headers document.
+ """
+ if self._container is not None:
+ hdoc = self._container.hdoc
+ if hdoc and not empty(hdoc.content):
+ return hdoc
+ # XXX cache this into the memory store !!!
+ return self._get_headers_doc()
+
+ @property
def _chash(self):
"""
An accessor to the content hash for this message.
@@ -380,13 +173,6 @@ class LeapMessage(fields, MailParser, MBoxParser):
return self.__chash
@property
- def _hdoc(self):
- """
- An accessor to the headers document.
- """
- return self._get_headers_doc()
-
- @property
def _bdoc(self):
"""
An accessor to the body document.
@@ -415,39 +201,33 @@ class LeapMessage(fields, MailParser, MBoxParser):
:return: The flags, represented as strings
:rtype: tuple
"""
- if self._uid is None:
- return []
uid = self._uid
- flags = []
+ flags = set([])
fdoc = self._fdoc
if fdoc:
- flags = fdoc.content.get(self.FLAGS_KEY, None)
+ flags = set(fdoc.content.get(self.FLAGS_KEY, None))
msgcol = self._collection
# We treat the recent flag specially: gotten from
# a mailbox-level document.
if msgcol and uid in msgcol.recent_flags:
- flags.append(fields.RECENT_FLAG)
+ flags.add(fields.RECENT_FLAG)
if flags:
flags = map(str, flags)
return tuple(flags)
- # setFlags, addFlags, removeFlags are not in the interface spec
- # but we use them with store command.
+ # setFlags not in the interface spec but we use it with store command.
- def setFlags(self, flags):
+ def setFlags(self, flags, mode):
"""
Sets the flags for this message
- Returns a SoledadDocument that needs to be updated by the caller.
-
:param flags: the flags to update in the message.
:type flags: tuple of str
-
- :return: a SoledadDocument instance
- :rtype: SoledadDocument
+ :param mode: the mode for setting. 1 is append, -1 is remove, 0 is set (replace).
+ :type mode: int
"""
leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
log.msg('setting flags: %s (%s)' % (self._uid, flags))
@@ -458,42 +238,36 @@ class LeapMessage(fields, MailParser, MBoxParser):
"Could not find FDOC for %s:%s while setting flags!" %
(self._mbox, self._uid))
return
- doc.content[self.FLAGS_KEY] = flags
- doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
- doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags
- self._soledad.put_doc(doc)
-
- def addFlags(self, flags):
- """
- Adds flags to this message.
-
- Returns a SoledadDocument that needs to be updated by the caller.
- :param flags: the flags to add to the message.
- :type flags: tuple of str
-
- :return: a SoledadDocument instance
- :rtype: SoledadDocument
- """
- leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
- oldflags = self.getFlags()
- self.setFlags(tuple(set(flags + oldflags)))
-
- def removeFlags(self, flags):
- """
- Remove flags from this message.
-
- Returns a SoledadDocument that needs to be updated by the caller.
-
- :param flags: the flags to be removed from the message.
- :type flags: tuple of str
-
- :return: a SoledadDocument instance
- :rtype: SoledadDocument
- """
- leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
- oldflags = self.getFlags()
- self.setFlags(tuple(set(oldflags) - set(flags)))
+ APPEND = 1
+ REMOVE = -1
+ SET = 0
+
+ with self.flags_lock:
+ current = doc.content[self.FLAGS_KEY]
+ if mode == APPEND:
+ newflags = tuple(set(tuple(current) + flags))
+ elif mode == REMOVE:
+ newflags = tuple(set(current).difference(set(flags)))
+ elif mode == SET:
+ newflags = flags
+
+ # We could defer this, but I think it's better
+ # to put it under the lock...
+ doc.content[self.FLAGS_KEY] = newflags
+ doc.content[self.SEEN_KEY] = self.SEEN_FLAG in newflags
+ doc.content[self.DEL_KEY] = self.DELETED_FLAG in newflags
+
+ if self._collection.memstore is not None:
+ log.msg("putting message in collection")
+ self._collection.memstore.put_message(
+ self._mbox, self._uid,
+ MessageWrapper(fdoc=doc.content, new=False, dirty=True,
+ docs_id={'fdoc': doc.doc_id}))
+ else:
+ # fallback for non-memstore initializations.
+ self._soledad.put_doc(doc)
+ return map(str, newflags)
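Under this mode convention, a hypothetical caller drives the three cases as follows:

    # hypothetical usage of the mode argument
    msg.setFlags(('\\Seen',), 1)       # 1: append \Seen to the current flags
    msg.setFlags(('\\Deleted',), -1)   # -1: remove \Deleted if present
    msg.setFlags(('\\Flagged',), 0)    # 0: replace the whole flag set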
def getInternalDate(self):
"""
@@ -519,29 +293,42 @@ class LeapMessage(fields, MailParser, MBoxParser):
:return: file-like object opened for reading
:rtype: StringIO
"""
+ def write_fd(body):
+ fd.write(body)
+ fd.seek(0)
+ return fd
+
# TODO refactor with getBodyFile in MessagePart
+
fd = StringIO.StringIO()
- bdoc = self._bdoc
- if bdoc:
- body = self._bdoc.content.get(self.RAW_KEY, "")
- content_type = bdoc.content.get('content-type', "")
+
+ if self._bdoc is not None:
+ bdoc_content = self._bdoc.content
+ if empty(bdoc_content):
+ logger.warning("No BDOC content found for message!!!")
+ return write_fd("")
+
+ body = bdoc_content.get(self.RAW_KEY, "")
+ content_type = bdoc_content.get('content-type', "")
charset = find_charset(content_type)
+ logger.debug('got charset from content-type: %s' % charset)
if charset is None:
charset = self._get_charset(body)
try:
- body = body.encode(charset)
- except UnicodeError as e:
- logger.error("Unicode error, using 'replace'. {0!r}".format(e))
+ if isinstance(body, unicode):
+ body = body.encode(charset)
+ except UnicodeError as exc:
+ logger.error(
+ "Unicode error, using 'replace'. {0!r}".format(exc))
+ logger.debug("Attempted to encode with: %s" % charset)
body = body.encode(charset, 'replace')
+ finally:
+ return write_fd(body)
# We are still returning funky characters from here.
else:
logger.warning("No BDOC found for message.")
- body = str("")
-
- fd.write(body)
- fd.seek(0)
- return fd
+ return write_fd("")
@memoized_method
def _get_charset(self, stuff):
@@ -552,11 +339,10 @@ class LeapMessage(fields, MailParser, MBoxParser):
:type stuff: basestring
:returns: charset
"""
- # TODO get from subpart headers
- # XXX existential doubt 2. shouldn't we make the scope
+ # XXX shouldn't we make the scope
# of the decorator somewhat more persistent?
# ah! yes! and put memory bounds.
- return get_email_charset(unicode(stuff))
+ return get_email_charset(stuff)
def getSize(self):
"""
@@ -567,7 +353,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
"""
size = None
if self._fdoc:
- size = self._fdoc.content.get(self.SIZE_KEY, False)
+ fdoc_content = self._fdoc.content
+ size = fdoc_content.get(self.SIZE_KEY, False)
else:
logger.warning("No FLAGS doc for %s:%s" % (self._mbox,
self._uid))
@@ -592,6 +379,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
:rtype: dict
"""
# TODO split in smaller methods
+ # XXX refactor together with MessagePart method
+
headers = self._get_headers()
if not headers:
logger.warning("No headers found")
@@ -608,11 +397,10 @@ class LeapMessage(fields, MailParser, MBoxParser):
# default to most likely standard
charset = find_charset(headers, "utf-8")
-
- # twisted imap server expects *some* headers to be lowercase
- # XXX refactor together with MessagePart method
headers2 = dict()
for key, value in headers.items():
+ # twisted imap server expects *some* headers to be lowercase
+ # We could use a CaseInsensitiveDict here...
if key.lower() == "content-type":
key = key.lower()
@@ -621,10 +409,13 @@ class LeapMessage(fields, MailParser, MBoxParser):
if not isinstance(value, str):
value = value.encode(charset, 'replace')
+ if value.endswith(";"):
+ # strip the trailing semicolon some agents append to header values
+ value = value[:-1]
+
# filter original dict by negate-condition
if cond(key):
headers2[key] = value
-
return headers2
def _get_headers(self):
@@ -632,7 +423,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
Return the headers dict for this message.
"""
if self._hdoc is not None:
- headers = self._hdoc.content.get(self.HEADERS_KEY, {})
+ hdoc_content = self._hdoc.content
+ headers = hdoc_content.get(self.HEADERS_KEY, {})
return headers
else:
@@ -646,7 +438,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
Return True if this message is multipart.
"""
if self._fdoc:
- is_multipart = self._fdoc.content.get(self.MULTIPART_KEY, False)
+ fdoc_content = self._fdoc.content
+ is_multipart = fdoc_content.get(self.MULTIPART_KEY, False)
return is_multipart
else:
logger.warning(
@@ -688,9 +481,15 @@ class LeapMessage(fields, MailParser, MBoxParser):
logger.warning("Tried to get part but no HDOC found!")
return None
- pmap = self._hdoc.content.get(fields.PARTS_MAP_KEY, {})
+ hdoc_content = self._hdoc.content
+ pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {})
+
+ # remember, lads, soledad is using strings in its keys,
+ # not integers!
return pmap[str(part)]
+ # XXX moved to memory store
+ # move the rest too. ------------------------------------------
def _get_flags_doc(self):
"""
Return the document that keeps the flags for this
@@ -724,16 +523,31 @@ class LeapMessage(fields, MailParser, MBoxParser):
Return the document that keeps the body for this
message.
"""
- body_phash = self._hdoc.content.get(
+ hdoc_content = self._hdoc.content
+ body_phash = hdoc_content.get(
fields.BODY_KEY, None)
if not body_phash:
logger.warning("No body phash for this document!")
return None
- body_docs = self._soledad.get_from_index(
- fields.TYPE_P_HASH_IDX,
- fields.TYPE_CONTENT_VAL, str(body_phash))
- return first(body_docs)
+ # XXX get from memstore too...
+ # if memstore: memstore.get_phash
+ # memstore should keep a dict with weakrefs to the
+ # phash doc...
+
+ if self._container is not None:
+ bdoc = self._container.memstore.get_cdoc_from_phash(body_phash)
+ if not empty(bdoc) and not empty(bdoc.content):
+ return bdoc
+
+ # no memstore, or no body doc found there
+ if self._soledad:
+ body_docs = self._soledad.get_from_index(
+ fields.TYPE_P_HASH_IDX,
+ fields.TYPE_CONTENT_VAL, str(body_phash))
+ return first(body_docs)
+ else:
+ logger.error("No phash in container, and no soledad found!")
def __getitem__(self, key):
"""
@@ -748,216 +562,19 @@ class LeapMessage(fields, MailParser, MBoxParser):
"""
return self._fdoc.content.get(key, None)
- # setters
-
- # XXX to be used in the messagecopier interface?!
-
- def set_uid(self, uid):
- """
- Set new uid for this message.
-
- :param uid: the new uid
- :type uid: basestring
- """
- # XXX dangerous! lock?
- self._uid = uid
- d = self._fdoc
- d.content[self.UID_KEY] = uid
- self._soledad.put_doc(d)
-
- def set_mbox(self, mbox):
- """
- Set new mbox for this message.
-
- :param mbox: the new mbox
- :type mbox: basestring
- """
- # XXX dangerous! lock?
- self._mbox = mbox
- d = self._fdoc
- d.content[self.MBOX_KEY] = mbox
- self._soledad.put_doc(d)
-
- # destructor
-
- @deferred
- def remove(self):
- """
- Remove all docs associated with this message.
- """
- # XXX For the moment we are only removing the flags and headers
- # docs. The rest we leave there polluting your hard disk,
- # until we think about a good way of deorphaning.
- # Maybe a crawler of unreferenced docs.
-
- # XXX implement elijah's idea of using a PUT document as a
- # token to ensure consistency in the removal.
-
- uid = self._uid
-
- fd = self._get_flags_doc()
- #hd = self._get_headers_doc()
- #bd = self._get_body_doc()
- #docs = [fd, hd, bd]
-
- docs = [fd]
-
- for d in filter(None, docs):
- try:
- self._soledad.delete_doc(d)
- except Exception as exc:
- logger.error(exc)
- return uid
-
def does_exist(self):
"""
- Return True if there is actually a flags message for this
+ Return True if there is actually a flags document for this
UID and mbox.
"""
- return self._fdoc is not None
-
-
-class ContentDedup(object):
- """
- Message deduplication.
-
- We do a query for the content hashes before writing to our beloved
- sqlcipher backend of Soledad. This means, by now, that:
-
- 1. We will not store the same attachment twice, only the hash of it.
- 2. We will not store the same message body twice, only the hash of it.
-
- The first case is useful if you are always receiving the same old memes
- from unwary friends that still have not discovered that 4chan is the
- generator of the internet. The second will save your day if you have
- initiated session with the same account in two different machines. I also
- wonder why would you do that, but let's respect each other choices, like
- with the religious celebrations, and assume that one day we'll be able
- to run Bitmask in completely free phones. Yes, I mean that, the whole GSM
- Stack.
- """
-
- def _content_does_exist(self, doc):
- """
- Check whether we already have a content document for a payload
- with this hash in our database.
-
- :param doc: tentative body document
- :type doc: dict
- :returns: True if that happens, False otherwise.
- """
- if not doc:
- return False
- phash = doc[fields.PAYLOAD_HASH_KEY]
- attach_docs = self._soledad.get_from_index(
- fields.TYPE_P_HASH_IDX,
- fields.TYPE_CONTENT_VAL, str(phash))
- if not attach_docs:
- return False
-
- if len(attach_docs) != 1:
- logger.warning("Found more than one copy of phash %s!"
- % (phash,))
- logger.debug("Found attachment doc with that hash! Skipping save!")
- return True
-
-
-SoledadWriterPayload = namedtuple(
- 'SoledadWriterPayload', ['mode', 'payload'])
-
-# TODO we could consider using enum here:
-# https://pypi.python.org/pypi/enum
-
-SoledadWriterPayload.CREATE = 1
-SoledadWriterPayload.PUT = 2
-SoledadWriterPayload.CONTENT_CREATE = 3
-
-
-"""
-SoledadDocWriter was used to avoid writing to the db from multiple threads.
-Its use here has been deprecated in favor of a local rw_lock in the client.
-But we might want to reuse in in the near future to implement priority queues.
-"""
-
-
-class SoledadDocWriter(object):
- """
- This writer will create docs serially in the local soledad database.
- """
-
- implements(IMessageConsumer)
-
- def __init__(self, soledad):
- """
- Initialize the writer.
-
- :param soledad: the soledad instance
- :type soledad: Soledad
- """
- self._soledad = soledad
-
- def _get_call_for_item(self, item):
- """
- Return the proper call type for a given item.
-
- :param item: one of the types defined under the
- attributes of SoledadWriterPayload
- :type item: int
- """
- call = None
- payload = item.payload
-
- if item.mode == SoledadWriterPayload.CREATE:
- call = self._soledad.create_doc
- elif (item.mode == SoledadWriterPayload.CONTENT_CREATE
- and not self._content_does_exist(payload)):
- call = self._soledad.create_doc
- elif item.mode == SoledadWriterPayload.PUT:
- call = self._soledad.put_doc
- return call
-
- def _process(self, queue):
- """
- Return the item and the proper call type for the next
- item in the queue if any.
+ return not empty(self._fdoc)
- :param queue: the queue from where we'll pick item.
- :type queue: Queue
- """
- item = queue.get()
- call = self._get_call_for_item(item)
- return item, call
-
- def consume(self, queue):
- """
- Creates a new document in soledad db.
-
- :param queue: queue to get item from, with content of the document
- to be inserted.
- :type queue: Queue
- """
- empty = queue.empty()
- while not empty:
- item, call = self._process(queue)
-
- if call:
- # XXX should handle the delete case
- # should handle errors
- try:
- call(item.payload)
- except u1db_errors.RevisionConflict as exc:
- logger.error("Error: %r" % (exc,))
- raise exc
- empty = queue.empty()
-
-
-class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
- ContentDedup):
+class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
"""
A collection of messages, surprisingly.
- It is tied to a selected mailbox name that is passed to constructor.
+ It is tied to a selected mailbox name that is passed to its constructor.
Implements a filter query over the messages contained in a soledad
database.
"""
@@ -1058,7 +675,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
_hdocset_lock = threading.Lock()
_hdocset_property_lock = threading.Lock()
- def __init__(self, mbox=None, soledad=None):
+ def __init__(self, mbox=None, soledad=None, memstore=None):
"""
Constructor for MessageCollection.
@@ -1068,13 +685,18 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
MessageCollection for each mailbox, instead of treating them
as a property of each message.
+ We are passed an instance of MemoryStore, the same for the
+ SoledadBackedAccount, that we use as a read cache and a buffer
+ for writes.
+
:param mbox: the name of the mailbox. It is the name
with which we filter the query over the
- messages database
+ messages database.
:type mbox: str
-
:param soledad: Soledad database
:type soledad: Soledad instance
+ :param memstore: a MemoryStore instance
+ :type memstore: MemoryStore
"""
MailParser.__init__(self)
leap_assert(mbox, "Need a mailbox name to initialize")
@@ -1084,15 +706,22 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
leap_assert(soledad, "Need a soledad instance to initialize")
# okay, all in order, keep going...
+
self.mbox = self._parse_mailbox_name(mbox)
+
+ # XXX get a SoledadStore passed instead
self._soledad = soledad
+ self.memstore = memstore
+
self.__rflags = None
self.__hdocset = None
self.initialize_db()
# ensure that we have a recent-flags and a hdocs-sec doc
self._get_or_create_rdoc()
- self._get_or_create_hdocset()
+
+ # Not for now...
+ #self._get_or_create_hdocset()
def _get_empty_doc(self, _type=FLAGS_DOC):
"""
@@ -1210,17 +839,21 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
:type chash: basestring
:return: False, if it does not exist, or UID.
"""
- exist = self._get_fdoc_from_chash(chash)
+ exist = False
+ if self.memstore is not None:
+ exist = self.memstore.get_fdoc_from_chash(chash, self.mbox)
+
+ if not exist:
+ exist = self._get_fdoc_from_chash(chash)
if exist:
return exist.content.get(fields.UID_KEY, "unknown-uid")
else:
return False
- @deferred
- def add_msg(self, raw, subject=None, flags=None, date=None, uid=1):
+ def add_msg(self, raw, subject=None, flags=None, date=None, uid=None,
+ notify_on_disk=False):
"""
Creates a new message document.
- Here lives the magic of the leap mail. Well, in soledad, really.
:param raw: the raw message
:type raw: str
@@ -1236,27 +869,63 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
:param uid: the message uid for this mailbox
:type uid: int
- """
- # TODO signal that we can delete the original message!-----
- # when all the processing is done.
-
- # TODO add the linked-from info !
+ :return: a deferred that will be fired with the message
+ uid when the adding succeeds.
+ :rtype: deferred
+ """
logger.debug('adding message')
if flags is None:
flags = tuple()
leap_assert_type(flags, tuple)
+ d = defer.Deferred()
+ self._do_add_msg(raw, flags, subject, date, notify_on_disk, d)
+ return d
+
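Since add_msg now returns a deferred fired with the new UID, a hypothetical caller (for instance the incoming-mail fetcher) chains on it instead of expecting a blocking return value:

    # hypothetical caller sketch
    d = collection.add_msg(raw_message, flags=('\\Recent',))
    d.addCallback(lambda uid: log.msg("message stored with UID %s" % uid))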
+ # We SHOULD defer this (or the heavy load here) to the thread pool,
+ # but it causes trouble with the QSocketNotifier used by Qt...
+ def _do_add_msg(self, raw, flags, subject, date, notify_on_disk, observer):
+ """
+ Helper that creates a new message document.
+ Here lives the magic of the leap mail. Well, in soledad, really.
+
+ See `add_msg` docstring for parameter info.
+
+ :param observer: a deferred that will be fired with the message
+ uid when the adding succeeds.
+ :type observer: deferred
+ """
+ # TODO signal that we can delete the original message!-----
+ # when all the processing is done.
+
+ # TODO add the linked-from info !
+ # TODO add reference to the original message
+
# parse
msg, chash, size, multi = self._do_parse(raw)
- # check for uniqueness.
- if self._fdoc_already_exists(chash):
- logger.warning("We already have that message in this mailbox.")
- # note that this operation will leave holes in the UID sequence,
- # but we're gonna change that all the same for a local-only table.
- # so not touch it by the moment.
- return False
+ # check for uniqueness --------------------------------
+ # XXX profiler says that this test is costly.
+ # So we probably should just do an in-memory check and
+ # move the complete check to the soledad writer?
+ # Watch out! We're reserving a UID right after this!
+ existing_uid = self._fdoc_already_exists(chash)
+ if existing_uid:
+ logger.warning("We already have that message in this "
+ "mailbox, unflagging as deleted")
+ uid = existing_uid
+ msg = self.get_msg_by_uid(uid)
+ msg.setFlags((fields.DELETED_FLAG,), -1)
+
+ # XXX if this is deferred to thread again we should not use
+ # the callback in the deferred thread, but return and
+ # call the callback from the caller fun...
+ observer.callback(uid)
+ return
+
+ uid = self.memstore.increment_last_soledad_uid(self.mbox)
+ logger.info("ADDING MSG WITH UID: %s" % uid)
fd = self._populate_flags(flags, uid, chash, size, multi)
hd = self._populate_headr(msg, chash, subject, date)
@@ -1273,48 +942,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
hd[key] = parts_map[key]
del parts_map
- # Saving ----------------------------------------
- self.set_recent_flag(uid)
-
- # first, regular docs: flags and headers
- self._soledad.create_doc(fd)
- # XXX should check for content duplication on headers too
- # but with chash. !!!
- hdoc = self._soledad.create_doc(hd)
- # We add the newly created hdoc to the fast-access set of
- # headers documents associated with the mailbox.
- self.add_hdocset_docid(hdoc.doc_id)
-
- # and last, but not least, try to create
- # content docs if not already there.
- cdocs = walk.get_raw_docs(msg, parts)
- for cdoc in cdocs:
- if not self._content_does_exist(cdoc):
- self._soledad.create_doc(cdoc)
+ hd = stringify_parts_map(hd)
- def _remove_cb(self, result):
- return result
-
- def remove_all_deleted(self):
- """
- Removes all messages flagged as deleted.
- """
- delete_deferl = []
- for msg in self.get_deleted():
- delete_deferl.append(msg.remove())
- d1 = defer.gatherResults(delete_deferl, consumeErrors=True)
- d1.addCallback(self._remove_cb)
- return d1
+ # The MessageContainer expects a dict, one-indexed
+ cdocs = dict(enumerate(walk.get_raw_docs(msg, parts), 1))
- def remove(self, msg):
- """
- Remove a given msg.
- :param msg: the message to be removed
- :type msg: LeapMessage
- """
- d = msg.remove()
- d.addCallback(self._remove_cb)
- return d
+ self.set_recent_flag(uid)
+ msg_container = MessageWrapper(fd, hd, cdocs)
+ self.memstore.create_message(self.mbox, uid, msg_container,
+ observer=observer,
+ notify_on_disk=notify_on_disk)
#
# getters: specific queries
@@ -1326,32 +963,59 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
"""
An accessor for the recent-flags set for this mailbox.
"""
- if not self.__rflags:
+ # XXX check if we should remove this
+ if self.__rflags is not None:
+ return self.__rflags
+
+ if self.memstore is not None:
with self._rdoc_lock:
- rdoc = self._get_recent_doc()
- self.__rflags = set(rdoc.content.get(
- fields.RECENTFLAGS_KEY, []))
- return self.__rflags
+ rflags = self.memstore.get_recent_flags(self.mbox)
+ if not rflags:
+ # not loaded in the memory store yet.
+ # let's fetch them from soledad...
+ rdoc = self._get_recent_doc()
+ rflags = set(rdoc.content.get(
+ fields.RECENTFLAGS_KEY, []))
+ # ...and cache them now.
+ self.memstore.load_recent_flags(
+ self.mbox,
+ {'doc_id': rdoc.doc_id, 'set': rflags})
+ return rflags
+
+ #else:
+ # fallback for cases without memory store
+ #with self._rdoc_lock:
+ #rdoc = self._get_recent_doc()
+ #self.__rflags = set(rdoc.content.get(
+ #fields.RECENTFLAGS_KEY, []))
+ #return self.__rflags
def _set_recent_flags(self, value):
"""
Setter for the recent-flags set for this mailbox.
"""
- with self._rdoc_lock:
- rdoc = self._get_recent_doc()
- newv = set(value)
- self.__rflags = newv
- rdoc.content[fields.RECENTFLAGS_KEY] = list(newv)
- # XXX should deferLater 0 it?
- self._soledad.put_doc(rdoc)
+ if self.memstore is not None:
+ self.memstore.set_recent_flags(self.mbox, value)
+
+ #else:
+ # fallback for cases without memory store
+ #with self._rdoc_lock:
+ #rdoc = self._get_recent_doc()
+ #newv = set(value)
+ #self.__rflags = newv
+ #rdoc.content[fields.RECENTFLAGS_KEY] = list(newv)
+ # XXX should deferLater 0 it?
+ #self._soledad.put_doc(rdoc)
recent_flags = property(
_get_recent_flags, _set_recent_flags,
doc="Set of UIDs with the recent flag for this mailbox.")
+ # XXX change naming, indicate soledad query.
def _get_recent_doc(self):
"""
- Get recent-flags document for this mailbox.
+ Get recent-flags document from Soledad for this mailbox.
+ :rtype: SoledadDocument or None
"""
curried = partial(
self._soledad.get_from_index,
@@ -1367,100 +1031,39 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
def unset_recent_flags(self, uids):
"""
Unset Recent flag for a sequence of uids.
+
+ :param uids: the uids to unset
+ :type uids: sequence
"""
with self._rdoc_property_lock:
- self.recent_flags = self.recent_flags.difference(
+ self.recent_flags.difference_update(
set(uids))
+ # Individual flags operations
+
def unset_recent_flag(self, uid):
"""
Unset Recent flag for a given uid.
+
+ :param uid: the uid to unset
+ :type uid: int
"""
with self._rdoc_property_lock:
- self.recent_flags = self.recent_flags.difference(
+ self.recent_flags.difference_update(
set([uid]))
+ @deferred_to_thread
def set_recent_flag(self, uid):
"""
Set Recent flag for a given uid.
+
+ :param uid: the uid to set
+ :type uid: int
"""
with self._rdoc_property_lock:
self.recent_flags = self.recent_flags.union(
set([uid]))
- # headers-docs-set
-
- def _get_hdocset(self):
- """
- An accessor for the hdocs-set for this mailbox.
- """
- if not self.__hdocset:
- with self._hdocset_lock:
- hdocset_doc = self._get_hdocset_doc()
- value = set(hdocset_doc.content.get(
- fields.HDOCS_SET_KEY, []))
- self.__hdocset = value
- return self.__hdocset
-
- def _set_hdocset(self, value):
- """
- Setter for the hdocs-set for this mailbox.
- """
- with self._hdocset_lock:
- hdocset_doc = self._get_hdocset_doc()
- newv = set(value)
- self.__hdocset = newv
- hdocset_doc.content[fields.HDOCS_SET_KEY] = list(newv)
- # XXX should deferLater 0 it?
- self._soledad.put_doc(hdocset_doc)
-
- _hdocset = property(
- _get_hdocset, _set_hdocset,
- doc="Set of Document-IDs for the headers docs associated "
- "with this mailbox.")
-
- def _get_hdocset_doc(self):
- """
- Get hdocs-set document for this mailbox.
- """
- curried = partial(
- self._soledad.get_from_index,
- fields.TYPE_MBOX_IDX,
- fields.TYPE_HDOCS_SET_VAL, self.mbox)
- curried.expected = "hdocset"
- hdocset_doc = try_unique_query(curried)
- return hdocset_doc
-
- # Property-set modification (protected by a different
- # lock to give atomicity to the read/write operation)
-
- def remove_hdocset_docids(self, docids):
- """
- Remove the given document IDs from the set of
- header-documents associated with this mailbox.
- """
- with self._hdocset_property_lock:
- self._hdocset = self._hdocset.difference(
- set(docids))
-
- def remove_hdocset_docid(self, docid):
- """
- Remove the given document ID from the set of
- header-documents associated with this mailbox.
- """
- with self._hdocset_property_lock:
- self._hdocset = self._hdocset.difference(
- set([docid]))
-
- def add_hdocset_docid(self, docid):
- """
- Add the given document ID to the set of
- header-documents associated with this mailbox.
- """
- with self._hdocset_property_lock:
- self._hdocset = self._hdocset.union(
- set([docid]))
-
# individual doc getters, message layer.
def _get_fdoc_from_chash(self, chash):
@@ -1499,7 +1102,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
return None
return fdoc.content.get(fields.UID_KEY, None)
- @deferred
+ @deferred_to_thread
def _get_uid_from_msgid(self, msgid):
"""
Return a UID for a given message-id.
@@ -1515,24 +1118,83 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
# and we cannot find it otherwise. This seems to be enough.
# XXX do a deferLater instead ??
- time.sleep(0.3)
+ # XXX is this working?
return self._get_uid_from_msgidCb(msgid)
+ def set_flags(self, mbox, messages, flags, mode, observer):
+ """
+ Set flags for a sequence of messages.
+
+ :param mbox: the mbox this message belongs to
+ :type mbox: str or unicode
+ :param messages: the messages to iterate through
+ :type messages: sequence
+ :param flags: the flags to be set
+ :type flags: tuple
+ :param mode: the mode for setting. 1 is append, -1 is remove, 0 is set (replace).
+ :type mode: int
+ :param observer: a deferred that will be fired with the dictionary
+ mapping UIDs to flags once the operation has been
+ done.
+ :type observer: deferred
+ """
+ # XXX we could defer *this* to thread pool, and gather results...
+ # XXX use deferredList
+
+ deferreds = []
+ for msg_id in messages:
+ deferreds.append(
+ self._set_flag_for_uid(msg_id, flags, mode))
+
+ def notify(result):
+ observer.callback(dict(result))
+ d1 = defer.gatherResults(deferreds, consumeErrors=True)
+ d1.addCallback(notify)
+
+ @deferred_to_thread
+ def _set_flag_for_uid(self, msg_id, flags, mode):
+ """
+ Run the set_flag operation in the thread pool.
+ """
+ log.msg("MSG ID = %s" % msg_id)
+ msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True)
+ if msg is not None:
+ return msg_id, msg.setFlags(flags, mode)
+
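A hypothetical caller of set_flags passes in the observer deferred and receives the UID-to-flags mapping once every message has been updated:

    # hypothetical usage sketch
    observer = defer.Deferred()
    observer.addCallback(lambda uid2flags: log.msg("updated: %r" % (uid2flags,)))
    collection.set_flags(mbox, (1, 2, 3), ('\\Seen',), 1, observer)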
# getters: generic for a mailbox
- def get_msg_by_uid(self, uid):
+ def get_msg_by_uid(self, uid, mem_only=False, flags_only=False):
"""
Retrieves a LeapMessage by UID.
This is used primarily in the Mailbox fetch and store methods.
:param uid: the message uid to query by
:type uid: int
+ :param mem_only: a flag that indicates whether this Message should
+ pass a reference to soledad to retrieve missing pieces
+ or not.
+ :type mem_only: bool
+ :param flags_only: whether the message should carry only a reference
+ to the flags document.
+ :type flags_only: bool
:return: A LeapMessage instance matching the query,
or None if not found.
:rtype: LeapMessage
"""
- msg = LeapMessage(self._soledad, uid, self.mbox, collection=self)
+ msg_container = self.memstore.get_message(self.mbox, uid, flags_only)
+ if msg_container is not None:
+ if mem_only:
+ msg = LeapMessage(None, uid, self.mbox, collection=self,
+ container=msg_container)
+ else:
+ # We pass a reference to soledad just to be able to retrieve
+ # missing parts that cannot be found in the container, like
+ # the content docs after a copy.
+ msg = LeapMessage(self._soledad, uid, self.mbox,
+ collection=self, container=msg_container)
+ else:
+ msg = LeapMessage(self._soledad, uid, self.mbox, collection=self)
if not msg.does_exist():
return None
return msg
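The new keyword arguments let hot paths skip Soledad entirely; a sketch of the fast path used while storing flags (assuming the collection was built with a memory store):

    # hypothetical fast path: flags are resolved from memory only
    msg = collection.get_msg_by_uid(uid, mem_only=True, flags_only=True)
    if msg is not None:
        flags = msg.getFlags()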
@@ -1564,40 +1226,50 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
# FIXME ----------------------------------------------
return sorted(all_docs, key=lambda item: item.content['uid'])
- def all_uid_iter(self):
+ def all_soledad_uid_iter(self):
"""
- Return an iterator trhough the UIDs of all messages, sorted in
+ Return an iterator through the UIDs of all messages, sorted in
ascending order.
"""
- # XXX we should get this from the uid table, local-only
- all_uids = (doc.content[self.UID_KEY] for doc in
- self._soledad.get_from_index(
- fields.TYPE_MBOX_IDX,
- fields.TYPE_FLAGS_VAL, self.mbox))
- return (u for u in sorted(all_uids))
+ db_uids = set([doc.content[self.UID_KEY] for doc in
+ self._soledad.get_from_index(
+ fields.TYPE_MBOX_IDX,
+ fields.TYPE_FLAGS_VAL, self.mbox)])
+ return db_uids
- def reset_last_uid(self, param):
+ def all_uid_iter(self):
"""
- Set the last uid to the highest uid found.
- Used while expunging, passed as a callback.
+ Return the UIDs of all messages, combining those in memory with the UIDs already known to Soledad.
"""
- try:
- self.last_uid = max(self.all_uid_iter()) + 1
- except ValueError:
- # empty sequence
- pass
- return param
+ if self.memstore is not None:
+ mem_uids = self.memstore.get_uids(self.mbox)
+ soledad_known_uids = self.memstore.get_soledad_known_uids(
+ self.mbox)
+ combined = tuple(set(mem_uids).union(soledad_known_uids))
+ return combined
+ # XXX MOVE to memstore
def all_flags(self):
"""
Return a dict with all flags documents for this mailbox.
"""
+ # XXX get all from memstore and cache it there
+ # FIXME should get all uids, get them from memstore,
+ # and get only the missing ones from disk.
+
all_flags = dict(((
doc.content[self.UID_KEY],
doc.content[self.FLAGS_KEY]) for doc in
self._soledad.get_from_index(
fields.TYPE_MBOX_IDX,
fields.TYPE_FLAGS_VAL, self.mbox)))
+ if self.memstore is not None:
+ uids = self.memstore.get_uids(self.mbox)
+ docs = ((uid, self.memstore.get_message(self.mbox, uid))
+ for uid in uids)
+ for uid, doc in docs:
+ all_flags[uid] = doc.fdoc.content[self.FLAGS_KEY]
+
return all_flags
def all_flags_chash(self):
@@ -1630,9 +1302,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
:rtype: int
"""
+ # XXX We should cache this in memstore too until next write...
count = self._soledad.get_count_from_index(
fields.TYPE_MBOX_IDX,
fields.TYPE_FLAGS_VAL, self.mbox)
+ if self.memstore is not None:
+ count += self.memstore.count_new()
return count
# unseen messages
@@ -1674,6 +1349,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
# recent messages
+ # XXX take it from memstore
def count_recent(self):
"""
Count all messages with the `Recent` flag.
@@ -1686,30 +1362,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
"""
return len(self.recent_flags)
- # deleted messages
-
- def deleted_iter(self):
- """
- Get an iterator for the message UIDs with `deleted` flag.
-
- :return: iterator through deleted message docs
- :rtype: iterable
- """
- return (doc.content[self.UID_KEY] for doc in
- self._soledad.get_from_index(
- fields.TYPE_MBOX_DEL_IDX,
- fields.TYPE_FLAGS_VAL, self.mbox, '1'))
-
- def get_deleted(self):
- """
- Get all messages with the `Deleted` flag.
-
- :returns: a generator of LeapMessages
- :rtype: generator
- """
- return (LeapMessage(self._soledad, docid, self.mbox)
- for docid in self.deleted_iter())
-
def __len__(self):
"""
Returns the number of messages on this mailbox.
diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py
new file mode 100644
index 0000000..ba63846
--- /dev/null
+++ b/src/leap/mail/imap/server.py
@@ -0,0 +1,217 @@
+# -*- coding: utf-8 -*-
+# server.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Leap IMAP4 Server Implementation.
+"""
+from copy import copy
+
+from twisted import cred
+from twisted.internet import defer
+from twisted.internet.defer import maybeDeferred
+from twisted.internet.task import deferLater
+from twisted.mail import imap4
+from twisted.python import log
+
+from leap.common import events as leap_events
+from leap.common.check import leap_assert, leap_assert_type
+from leap.common.events.events_pb2 import IMAP_CLIENT_LOGIN
+from leap.soledad.client import Soledad
+
+
+class LeapIMAPServer(imap4.IMAP4Server):
+ """
+ An IMAP4 Server with mailboxes backed by soledad
+ """
+ def __init__(self, *args, **kwargs):
+ # pop extraneous arguments
+ soledad = kwargs.pop('soledad', None)
+ uuid = kwargs.pop('uuid', None)
+ userid = kwargs.pop('userid', None)
+ leap_assert(soledad, "need a soledad instance")
+ leap_assert_type(soledad, Soledad)
+ leap_assert(uuid, "need a user in the initialization")
+
+ self._userid = userid
+
+ # initialize imap server!
+ imap4.IMAP4Server.__init__(self, *args, **kwargs)
+
+ # we should initialize the account here,
+ # but we move it to the factory so we can
+ # populate the test account properly (and only once
+ # per session)
+
+ def lineReceived(self, line):
+ """
+ Attempt to parse a single line from the server.
+
+ :param line: the line from the server, without the line delimiter.
+ :type line: str
+ """
+ if self.theAccount.closed is True and self.state != "unauth":
+ log.msg("Closing the session. State: unauth")
+ self.state = "unauth"
+
+ if "login" in line.lower():
+ # avoid logging the password, even though we are using a
+ # dummy auth by now.
+ msg = line[:7] + " [...]"
+ else:
+ msg = copy(line)
+ log.msg('rcv (%s): %s' % (self.state, msg))
+ imap4.IMAP4Server.lineReceived(self, line)
+
+ def authenticateLogin(self, username, password):
+ """
+ Look up the account with the given parameters, and deny
+ improper combinations.
+
+ :param username: the username that is attempting authentication.
+ :type username: str
+ :param password: the password to authenticate with.
+ :type password: str
+ """
+ # XXX this should use portal:
+ # return portal.login(cred.credentials.UsernamePassword(user, pass))
+ if username != self._userid:
+ # bad username, reject.
+ raise cred.error.UnauthorizedLogin()
+ # any dummy password is allowed so far. use realm instead!
+ leap_events.signal(IMAP_CLIENT_LOGIN, "1")
+ return imap4.IAccount, self.theAccount, lambda: None
+
+ def do_FETCH(self, tag, messages, query, uid=0):
+ """
+ Overridden FETCH dispatcher that uses the fast fetch_flags and
+ fetch_headers methods when possible.
+ """
+ if not query:
+ self.sendPositiveResponse(tag, 'FETCH complete')
+ return
+
+ cbFetch = self._IMAP4Server__cbFetch
+ ebFetch = self._IMAP4Server__ebFetch
+
+ if len(query) == 1 and str(query[0]) == "flags":
+ self._oldTimeout = self.setTimeout(None)
+ # no need to call iter, we get a generator
+ maybeDeferred(
+ self.mbox.fetch_flags, messages, uid=uid
+ ).addCallback(
+ cbFetch, tag, query, uid
+ ).addErrback(ebFetch, tag)
+ elif len(query) == 1 and str(query[0]) == "rfc822.header":
+ self._oldTimeout = self.setTimeout(None)
+ # no need to call iter, we get a generator
+ maybeDeferred(
+ self.mbox.fetch_headers, messages, uid=uid
+ ).addCallback(
+ cbFetch, tag, query, uid
+ ).addErrback(ebFetch, tag)
+ else:
+ self._oldTimeout = self.setTimeout(None)
+ # no need to call iter, we get a generator
+ maybeDeferred(
+ self.mbox.fetch, messages, uid=uid
+ ).addCallback(
+ cbFetch, tag, query, uid
+ ).addErrback(
+ ebFetch, tag
+ ).addCallback(
+ self.on_fetch_finished, messages)
+
+ select_FETCH = (do_FETCH, imap4.IMAP4Server.arg_seqset,
+ imap4.IMAP4Server.arg_fetchatt)
+
+ def on_fetch_finished(self, _, messages):
+ from twisted.internet import reactor
+
+ print "FETCH FINISHED -- NOTIFY NEW"
+ deferLater(reactor, 0, self.notifyNew)
+ deferLater(reactor, 0, self.mbox.unset_recent_flags, messages)
+ deferLater(reactor, 0, self.mbox.signal_unread_to_ui)
+
+ def on_copy_finished(self, defers):
+ d = defer.gatherResults(filter(None, defers))
+
+ def when_finished(result):
+ log.msg("COPY FINISHED")
+ self.notifyNew()
+ self.mbox.signal_unread_to_ui()
+ d.addCallback(when_finished)
+ #d.addCallback(self.notifyNew)
+ #d.addCallback(self.mbox.signal_unread_to_ui)
+
+ def do_COPY(self, tag, messages, mailbox, uid=0):
+ from twisted.internet import reactor
+ defers = []
+ d = imap4.IMAP4Server.do_COPY(self, tag, messages, mailbox, uid)
+ defers.append(d)
+ deferLater(reactor, 0, self.on_copy_finished, defers)
+
+ select_COPY = (do_COPY, imap4.IMAP4Server.arg_seqset,
+ imap4.IMAP4Server.arg_astring)
+
+ def notifyNew(self, ignored=None):
+ """
+ Notify new messages to listeners.
+ """
+ log.msg("Notifying listeners about new messages")
+ self.mbox.notify_new()
+
+ def _cbSelectWork(self, mbox, cmdName, tag):
+ """
+ Callback for selectWork, patched to avoid conformance errors due to
+ incomplete UIDVALIDITY line.
+ """
+ if mbox is None:
+ self.sendNegativeResponse(tag, 'No such mailbox')
+ return
+ if '\\noselect' in [s.lower() for s in mbox.getFlags()]:
+ self.sendNegativeResponse(tag, 'Mailbox cannot be selected')
+ return
+
+ flags = mbox.getFlags()
+ self.sendUntaggedResponse(str(mbox.getMessageCount()) + ' EXISTS')
+ self.sendUntaggedResponse(str(mbox.getRecentCount()) + ' RECENT')
+ self.sendUntaggedResponse('FLAGS (%s)' % ' '.join(flags))
+
+ # Patched -------------------------------------------------------
+ # imaptest was complaining about the incomplete line, we're adding
+ # "UIDs valid" here.
+ self.sendPositiveResponse(
+ None, '[UIDVALIDITY %d] UIDs valid' % mbox.getUIDValidity())
+ # ----------------------------------------------------------------
+
+ s = mbox.isWriteable() and 'READ-WRITE' or 'READ-ONLY'
+ mbox.addListener(self)
+ self.sendPositiveResponse(tag, '[%s] %s successful' % (s, cmdName))
+ self.state = 'select'
+ self.mbox = mbox
+
+ def checkpoint(self):
+ """
+ Called when the client issues a CHECK command.
+
+ This should perform any checkpoint operations required by the server.
+ It may be a long running operation, but may not block. If it returns
+ a deferred, the client will only be informed of success (or failure)
+ when the deferred's callback (or errback) is invoked.
+ """
+ # TODO return the output of _memstore.is_writing
+ # XXX and that should return a deferred!
+ return None
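A minimal sketch of what the checkpoint TODO above could look like once the
memory store grows a deferred-returning helper; the all_writes_done name is an
assumption used for illustration, not part of this patch:

    def checkpoint(self):
        """
        Let the client wait until pending writes have been flushed.
        """
        memstore = getattr(self.theAccount, '_memstore', None)
        if memstore is None:
            return None
        # Hypothetical helper: a Deferred that fires when every queued
        # Soledad write has been flushed.
        return memstore.all_writes_done()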
diff --git a/src/leap/mail/imap/service/imap.py b/src/leap/mail/imap/service/imap.py
index ad22da6..5487cfc 100644
--- a/src/leap/mail/imap/service/imap.py
+++ b/src/leap/mail/imap/service/imap.py
@@ -17,17 +17,12 @@
"""
Imap service initialization
"""
-from copy import copy
-
import logging
+import os
from twisted.internet.protocol import ServerFactory
-from twisted.internet.defer import maybeDeferred
from twisted.internet.error import CannotListenError
-from twisted.internet.task import deferLater
from twisted.mail import imap4
-from twisted.python import log
-from twisted import cred
logger = logging.getLogger(__name__)
@@ -36,6 +31,9 @@ from leap.common.check import leap_assert, leap_assert_type, leap_check
from leap.keymanager import KeyManager
from leap.mail.imap.account import SoledadBackedAccount
from leap.mail.imap.fetch import LeapIncomingMail
+from leap.mail.imap.memorystore import MemoryStore
+from leap.mail.imap.server import LeapIMAPServer
+from leap.mail.imap.soledadstore import SoledadStore
from leap.soledad.client import Soledad
# The default port in which imap service will run
@@ -47,7 +45,6 @@ INCOMING_CHECK_PERIOD = 60
from leap.common.events.events_pb2 import IMAP_SERVICE_STARTED
from leap.common.events.events_pb2 import IMAP_SERVICE_FAILED_TO_START
-from leap.common.events.events_pb2 import IMAP_CLIENT_LOGIN
######################################################
# Temporary workaround for RecursionLimit when using
@@ -68,160 +65,9 @@ except Exception:
######################################################
-
-class LeapIMAPServer(imap4.IMAP4Server):
- """
- An IMAP4 Server with mailboxes backed by soledad
- """
- def __init__(self, *args, **kwargs):
- # pop extraneous arguments
- soledad = kwargs.pop('soledad', None)
- uuid = kwargs.pop('uuid', None)
- userid = kwargs.pop('userid', None)
- leap_assert(soledad, "need a soledad instance")
- leap_assert_type(soledad, Soledad)
- leap_assert(uuid, "need a user in the initialization")
-
- self._userid = userid
-
- # initialize imap server!
- imap4.IMAP4Server.__init__(self, *args, **kwargs)
-
- # we should initialize the account here,
- # but we move it to the factory so we can
- # populate the test account properly (and only once
- # per session)
-
- def lineReceived(self, line):
- """
- Attempt to parse a single line from the server.
-
- :param line: the line from the server, without the line delimiter.
- :type line: str
- """
- if self.theAccount.closed is True and self.state != "unauth":
- log.msg("Closing the session. State: unauth")
- self.state = "unauth"
-
- if "login" in line.lower():
- # avoid to log the pass, even though we are using a dummy auth
- # by now.
- msg = line[:7] + " [...]"
- else:
- msg = copy(line)
- log.msg('rcv (%s): %s' % (self.state, msg))
- imap4.IMAP4Server.lineReceived(self, line)
-
- def authenticateLogin(self, username, password):
- """
- Lookup the account with the given parameters, and deny
- the improper combinations.
-
- :param username: the username that is attempting authentication.
- :type username: str
- :param password: the password to authenticate with.
- :type password: str
- """
- # XXX this should use portal:
- # return portal.login(cred.credentials.UsernamePassword(user, pass)
- if username != self._userid:
- # bad username, reject.
- raise cred.error.UnauthorizedLogin()
- # any dummy password is allowed so far. use realm instead!
- leap_events.signal(IMAP_CLIENT_LOGIN, "1")
- return imap4.IAccount, self.theAccount, lambda: None
-
- def do_FETCH(self, tag, messages, query, uid=0):
- """
- Overwritten fetch dispatcher to use the fast fetch_flags
- method
- """
- from twisted.internet import reactor
- if not query:
- self.sendPositiveResponse(tag, 'FETCH complete')
- return # XXX ???
-
- cbFetch = self._IMAP4Server__cbFetch
- ebFetch = self._IMAP4Server__ebFetch
-
- if len(query) == 1 and str(query[0]) == "flags":
- self._oldTimeout = self.setTimeout(None)
- # no need to call iter, we get a generator
- maybeDeferred(
- self.mbox.fetch_flags, messages, uid=uid
- ).addCallback(
- cbFetch, tag, query, uid
- ).addErrback(ebFetch, tag)
- elif len(query) == 1 and str(query[0]) == "rfc822.header":
- self._oldTimeout = self.setTimeout(None)
- # no need to call iter, we get a generator
- maybeDeferred(
- self.mbox.fetch_headers, messages, uid=uid
- ).addCallback(
- cbFetch, tag, query, uid
- ).addErrback(ebFetch, tag)
- else:
- self._oldTimeout = self.setTimeout(None)
- # no need to call iter, we get a generator
- maybeDeferred(
- self.mbox.fetch, messages, uid=uid
- ).addCallback(
- cbFetch, tag, query, uid
- ).addErrback(
- ebFetch, tag)
-
- deferLater(reactor,
- 2, self.mbox.unset_recent_flags, messages)
- deferLater(reactor, 1, self.mbox.signal_unread_to_ui)
-
- select_FETCH = (do_FETCH, imap4.IMAP4Server.arg_seqset,
- imap4.IMAP4Server.arg_fetchatt)
-
- def do_COPY(self, tag, messages, mailbox, uid=0):
- from twisted.internet import reactor
- imap4.IMAP4Server.do_COPY(self, tag, messages, mailbox, uid)
- deferLater(reactor,
- 2, self.mbox.unset_recent_flags, messages)
- deferLater(reactor, 1, self.mbox.signal_unread_to_ui)
-
- select_COPY = (do_COPY, imap4.IMAP4Server.arg_seqset,
- imap4.IMAP4Server.arg_astring)
-
- def notifyNew(self, ignored):
- """
- Notify new messages to listeners.
- """
- self.mbox.notify_new()
-
- def _cbSelectWork(self, mbox, cmdName, tag):
- """
- Callback for selectWork, patched to avoid conformance errors due to
- incomplete UIDVALIDITY line.
- """
- if mbox is None:
- self.sendNegativeResponse(tag, 'No such mailbox')
- return
- if '\\noselect' in [s.lower() for s in mbox.getFlags()]:
- self.sendNegativeResponse(tag, 'Mailbox cannot be selected')
- return
-
- flags = mbox.getFlags()
- self.sendUntaggedResponse(str(mbox.getMessageCount()) + ' EXISTS')
- self.sendUntaggedResponse(str(mbox.getRecentCount()) + ' RECENT')
- self.sendUntaggedResponse('FLAGS (%s)' % ' '.join(flags))
-
- # Patched -------------------------------------------------------
- # imaptest was complaining about the incomplete line, we're adding
- # "UIDs valid" here.
- self.sendPositiveResponse(
- None, '[UIDVALIDITY %d] UIDs valid' % mbox.getUIDValidity())
- # ----------------------------------------------------------------
-
- s = mbox.isWriteable() and 'READ-WRITE' or 'READ-ONLY'
- mbox.addListener(self)
- self.sendPositiveResponse(tag, '[%s] %s successful' % (s, cmdName))
- self.state = 'select'
- self.mbox = mbox
+DO_MANHOLE = os.environ.get("LEAP_MAIL_MANHOLE", None)
+if DO_MANHOLE:
+ from leap.mail.imap.service import manhole
class IMAPAuthRealm(object):
@@ -256,11 +102,16 @@ class LeapIMAPFactory(ServerFactory):
self._uuid = uuid
self._userid = userid
self._soledad = soledad
+ self._memstore = MemoryStore(
+ permanent_store=SoledadStore(soledad))
theAccount = SoledadBackedAccount(
- uuid, soledad=soledad)
+ uuid, soledad=soledad,
+ memstore=self._memstore)
self.theAccount = theAccount
+ # XXX how to pass the store along?
+
def buildProtocol(self, addr):
"Return a protocol suitable for the job."
imapProtocol = LeapIMAPServer(
@@ -281,6 +132,8 @@ def run_service(*args, **kwargs):
the reactor when starts listening, and the factory for
the protocol.
"""
+ from twisted.internet import reactor
+
leap_assert(len(args) == 2)
soledad, keymanager = args
leap_assert_type(soledad, Soledad)
@@ -295,8 +148,6 @@ def run_service(*args, **kwargs):
uuid = soledad._get_uuid()
factory = LeapIMAPFactory(uuid, userid, soledad)
- from twisted.internet import reactor
-
try:
tport = reactor.listenTCP(port, factory,
interface="localhost")
@@ -317,6 +168,16 @@ def run_service(*args, **kwargs):
else:
# all good.
# (the caller has still to call fetcher.start_loop)
+
+ if DO_MANHOLE:
+ # TODO get pass from env var too.
+ manhole_factory = manhole.getManholeFactory(
+ {'f': factory,
+ 'a': factory.theAccount,
+ 'gm': factory.theAccount.getMailbox},
+ "boss", "leap")
+ reactor.listenTCP(manhole.MANHOLE_PORT, manhole_factory,
+ interface="127.0.0.1")
logger.debug("IMAP4 Server is RUNNING in port %s" % (port,))
leap_events.signal(IMAP_SERVICE_STARTED, str(port))
return fetcher, tport, factory
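For reference, a brief sketch of how a caller is expected to start the service
after this change; the userid keyword and its value are illustrative
assumptions based on the factory construction above:

    # assumes initialized Soledad and KeyManager instances
    fetcher, tport, factory = run_service(
        soledad, keymanager, userid="user@provider.org")
    # the caller still has to start the incoming-mail loop
    fetcher.start_loop()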
diff --git a/src/leap/mail/imap/service/manhole.py b/src/leap/mail/imap/service/manhole.py
new file mode 100644
index 0000000..c83ae89
--- /dev/null
+++ b/src/leap/mail/imap/service/manhole.py
@@ -0,0 +1,130 @@
+# -*- coding: utf-8 -*-
+# manhole.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Utilities for enabling the manhole administrative interface into the
+LEAP Mail application.
+"""
+MANHOLE_PORT = 2222
+
+
+def getManholeFactory(namespace, user, secret):
+ """
+ Get an administrative manhole into the application.
+
+ :param namespace: the namespace to show in the manhole
+ :type namespace: dict
+ :param user: the user to authenticate into the administrative shell.
+ :type user: str
+ :param secret: the password for this manhole
+ :type secret: str
+ """
+ import string
+
+ from twisted.cred.portal import Portal
+ from twisted.conch import manhole, manhole_ssh
+ from twisted.conch.insults import insults
+ from twisted.cred.checkers import (
+ InMemoryUsernamePasswordDatabaseDontUse as MemoryDB)
+
+ from rlcompleter import Completer
+
+ class EnhancedColoredManhole(manhole.ColoredManhole):
+ """
+ A Manhole with some primitive autocomplete support.
+ """
+ # TODO use introspection to make life easier
+
+ def find_common(self, l):
+ """
+ Find the common prefix of the items in the list,
+ e.g. 'ab' for ['abcd', 'abce', 'abf'].
+ Requires a sorted list.
+ """
+ if len(l) == 1:
+ return l[0]
+
+ init = l[0]
+ for item in l[1:]:
+ for i, (x, y) in enumerate(zip(init, item)):
+ if x != y:
+ init = "".join(init[:i])
+ break
+
+ if not init:
+ return None
+ return init
+
+ def handle_TAB(self):
+ """
+ Trap the TAB keystroke.
+ """
+ necessarypart = "".join(self.lineBuffer).split(' ')[-1]
+ completer = Completer(globals())
+ if completer.complete(necessarypart, 0):
+ matches = list(set(completer.matches))  # drop duplicates
+
+ if len(matches) == 1:
+ length = len(necessarypart)
+ self.lineBuffer = self.lineBuffer[:-length]
+ self.lineBuffer.extend(matches[0])
+ self.lineBufferIndex = len(self.lineBuffer)
+ else:
+ matches.sort()
+ commons = self.find_common(matches)
+ if commons:
+ length = len(necessarypart)
+ self.lineBuffer = self.lineBuffer[:-length]
+ self.lineBuffer.extend(commons)
+ self.lineBufferIndex = len(self.lineBuffer)
+
+ self.terminal.nextLine()
+ while matches:
+ matches, part = matches[4:], matches[:4]
+ for item in part:
+ self.terminal.write('%s' % item.ljust(30))
+ self.terminal.write('\n')
+ self.terminal.nextLine()
+
+ self.terminal.eraseLine()
+ self.terminal.cursorBackward(self.lineBufferIndex + 5)
+ self.terminal.write("%s %s" % (
+ self.ps[self.pn], "".join(self.lineBuffer)))
+
+ def keystrokeReceived(self, keyID, modifier):
+ """
+ Act upon any keystroke received.
+ """
+ self.keyHandlers.update({'\b': self.handle_BACKSPACE})
+ m = self.keyHandlers.get(keyID)
+ if m is not None:
+ m()
+ elif keyID in string.printable:
+ self.characterReceived(keyID, False)
+
+
+ def chainedProtocolFactory():
+ return insults.ServerProtocol(EnhancedColoredManhole, namespace)
+
+ sshRealm = manhole_ssh.TerminalRealm()
+ sshRealm.chainedProtocolFactory = chainedProtocolFactory
+
+ portal = Portal(
+ sshRealm, [MemoryDB(**{user: secret})])
+
+ f = manhole_ssh.ConchFactory(portal)
+ return f
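As wired up in imap.py above, the returned factory is meant to listen on the
loopback interface; a short usage sketch, where imap_factory stands for the
LeapIMAPFactory instance and the namespace contents are illustrative:

    from twisted.internet import reactor
    from leap.mail.imap.service import manhole

    factory = manhole.getManholeFactory({'f': imap_factory}, "boss", "leap")
    reactor.listenTCP(manhole.MANHOLE_PORT, factory, interface="127.0.0.1")

One can then SSH into that port with the configured user and password to get
an interactive shell over the exposed namespace.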
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
new file mode 100644
index 0000000..8e22f26
--- /dev/null
+++ b/src/leap/mail/imap/soledadstore.py
@@ -0,0 +1,487 @@
+# -*- coding: utf-8 -*-
+# soledadstore.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+A MessageStore that writes to Soledad.
+"""
+import logging
+import threading
+
+from itertools import chain
+
+from u1db import errors as u1db_errors
+from twisted.internet import defer
+from twisted.python import log
+from zope.interface import implements
+
+from leap.common.check import leap_assert_type
+from leap.mail.decorators import deferred_to_thread
+from leap.mail.imap.messageparts import MessagePartType
+from leap.mail.imap.messageparts import MessageWrapper
+from leap.mail.imap.messageparts import RecentFlagsDoc
+from leap.mail.imap.fields import fields
+from leap.mail.imap.interfaces import IMessageStore
+from leap.mail.messageflow import IMessageConsumer
+from leap.mail.utils import first
+
+logger = logging.getLogger(__name__)
+
+
+# TODO
+# [ ] Delete original message from the incoming queue after all successful
+# writes.
+# [ ] Implement a retry queue.
+# [ ] Consider journaling of operations.
+
+
+class ContentDedup(object):
+ """
+ Message deduplication.
+
+ We do a query for the content hashes before writing to our beloved
+ sqlcipher backend of Soledad. This means, for now, that:
+
+ 1. We will not store the same body/attachment twice, only the hash of it.
+ 2. We will not store the same message header twice, only the hash of it.
+
+ The first case is useful if you are always receiving the same old memes
+ from unwary friends that still have not discovered that 4chan is the
+ generator of the internet. The second will save your day if you have
+ initiated a session with the same account on two different machines. I
+ also wonder why you would do that, but let's respect each other's choices,
+ like with the religious celebrations, and assume that one day we'll be
+ able to run Bitmask on completely free phones. Yes, I mean that, the whole
+ GSM stack.
+ """
+ # TODO refactor using unique_query
+
+ def _header_does_exist(self, doc):
+ """
+ Check whether we already have a header document for this
+ content hash in our database.
+
+ :param doc: tentative header for document
+ :type doc: dict
+ :returns: True if it exists, False otherwise.
+ """
+ if not doc:
+ return False
+ chash = doc[fields.CONTENT_HASH_KEY]
+ header_docs = self._soledad.get_from_index(
+ fields.TYPE_C_HASH_IDX,
+ fields.TYPE_HEADERS_VAL, str(chash))
+ if not header_docs:
+ return False
+
+ if len(header_docs) != 1:
+ logger.warning("Found more than one copy of chash %s!"
+ % (chash,))
+ logger.debug("Found header doc with that hash! Skipping save!")
+ return True
+
+ def _content_does_exist(self, doc):
+ """
+ Check whether we already have a content document for a payload
+ with this hash in our database.
+
+ :param doc: tentative content for document
+ :type doc: dict
+ :returns: True if it exists, False otherwise.
+ """
+ if not doc:
+ return False
+ phash = doc[fields.PAYLOAD_HASH_KEY]
+ attach_docs = self._soledad.get_from_index(
+ fields.TYPE_P_HASH_IDX,
+ fields.TYPE_CONTENT_VAL, str(phash))
+ if not attach_docs:
+ return False
+
+ if len(attach_docs) != 1:
+ logger.warning("Found more than one copy of phash %s!"
+ % (phash,))
+ logger.debug("Found attachment doc with that hash! Skipping save!")
+ return True
+
+
+class MsgWriteError(Exception):
+ """
+ Raised if any exception is found while saving message parts.
+ """
+
+
+class SoledadStore(ContentDedup):
+ """
+ This will create docs in the local Soledad database.
+ """
+ _last_uid_lock = threading.Lock()
+
+ implements(IMessageConsumer, IMessageStore)
+
+ def __init__(self, soledad):
+ """
+ Initialize the permanent store that writes to Soledad database.
+
+ :param soledad: the soledad instance
+ :type soledad: Soledad
+ """
+ self._soledad = soledad
+
+ # IMessageStore
+
+ # -------------------------------------------------------------------
+ # We are not yet using this interface, but it would make sense
+ # to implement it.
+
+ def create_message(self, mbox, uid, message):
+ """
+ Create the passed message into this SoledadStore.
+
+ :param mbox: the mbox this message belongs.
+ :type mbox: str or unicode
+ :param uid: the UID that identifies this message in this mailbox.
+ :type uid: int
+ :param message: a IMessageContainer implementor.
+ """
+ raise NotImplementedError()
+
+ def put_message(self, mbox, uid, message):
+ """
+ Put the passed existing message into this SoledadStore.
+
+ :param mbox: the mbox this message belongs.
+ :type mbox: str or unicode
+ :param uid: the UID that identifies this message in this mailbox.
+ :type uid: int
+ :param message: a IMessageContainer implementor.
+ """
+ raise NotImplementedError()
+
+ def remove_message(self, mbox, uid):
+ """
+ Remove the given message from this SoledadStore.
+
+ :param mbox: the mbox this message belongs.
+ :type mbox: str or unicode
+ :param uid: the UID that identifies this message in this mailbox.
+ :type uid: int
+ """
+ raise NotImplementedError()
+
+ def get_message(self, mbox, uid):
+ """
+ Get a IMessageContainer for the given mbox and uid combination.
+
+ :param mbox: the mbox this message belongs.
+ :type mbox: str or unicode
+ :param uid: the UID that identifies this message in this mailbox.
+ :type uid: int
+ """
+ raise NotImplementedError()
+
+ # IMessageConsumer
+
+ # It's not thread-safe to defer this to a different thread
+
+ def consume(self, queue):
+ """
+ Consume the doc wrappers in the queue, creating or updating the
+ corresponding documents in the soledad db.
+
+ :param queue: queue to get the items from, each one holding the
+ content of a document to be written.
+ :type queue: Queue
+ """
+ # TODO should delete the original message from incoming only after
+ # the writes are done.
+ # TODO should handle the delete case
+ # TODO should handle errors
+ # TODO could generalize this method into a generic consumer
+ # and only implement `process` here
+
+ def docWriteCallBack(doc_wrapper):
+ """
+ Callback for a successful write of a document wrapper.
+ """
+ if isinstance(doc_wrapper, MessageWrapper):
+ # If everything went well, we can unset the new flag
+ # in the source store (memory store)
+ self._unset_new_dirty(doc_wrapper)
+
+ def docWriteErrorBack(failure):
+ """
+ Errback for write operations.
+ """
+ log.msg("Error while processing item.")
+ log.err(failure)
+
+ while not queue.empty():
+ doc_wrapper = queue.get()
+ d = defer.Deferred()
+ d.addCallbacks(docWriteCallBack, docWriteErrorBack)
+
+ self._consume_doc(doc_wrapper, d)
+
+ @deferred_to_thread
+ def _unset_new_dirty(self, doc_wrapper):
+ """
+ Unset the `new` and `dirty` flags for this document wrapper in the
+ memory store.
+
+ :param doc_wrapper: a MessageWrapper instance
+ :type doc_wrapper: MessageWrapper
+ """
+ # XXX debug msg id/mbox?
+ logger.info("unsetting new flag!")
+ doc_wrapper.new = False
+ doc_wrapper.dirty = False
+
+ @deferred_to_thread
+ def _consume_doc(self, doc_wrapper, deferred):
+ """
+ Consume each document wrapper in a separate thread.
+
+ :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance
+ :type doc_wrapper: MessageWrapper or RecentFlagsDoc
+ :param deferred: a deferred that will be fired when the write operation
+ has finished, either calling its callback or its
+ errback depending on whether it succeeded.
+ :type deferred: Deferred
+ """
+ items = self._process(doc_wrapper)
+
+ # we prime the generator, that should return the
+ # message or flags wrapper item in the first place.
+ doc_wrapper = items.next()
+
+ # From here on, we unpack the subpart items and
+ # the corresponding soledad call for each of them.
+ failed = False
+ for item, call in items:
+ try:
+ self._try_call(call, item)
+ except Exception as exc:
+ failed = exc
+ continue
+ if failed:
+ deferred.errback(MsgWriteError(
+ "There was an error writing the mesage"))
+ else:
+ deferred.callback(doc_wrapper)
+
+ #
+ # SoledadStore specific methods.
+ #
+
+ def _process(self, doc_wrapper):
+ """
+ Return an iterator that will yield the doc_wrapper in the first place,
+ followed by each subpart item and the proper call type for it, if any.
+
+ :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance
+ :type doc_wrapper: MessageWrapper or RecentFlagsDoc
+ """
+ if isinstance(doc_wrapper, MessageWrapper):
+ return chain((doc_wrapper,),
+ self._get_calls_for_msg_parts(doc_wrapper))
+ elif isinstance(doc_wrapper, RecentFlagsDoc):
+ return chain((doc_wrapper,),
+ self._get_calls_for_rflags_doc(doc_wrapper))
+ else:
+ logger.warning("CANNOT PROCESS ITEM!")
+ return (i for i in [])
+
+ def _try_call(self, call, item):
+ """
+ Try to invoke a given call with item as a parameter.
+
+ :param call: the function to call
+ :type call: callable
+ :param item: the payload to pass to the call as argument
+ :type item: object
+ """
+ if call is None:
+ return
+ try:
+ call(item)
+ except u1db_errors.RevisionConflict as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
+
+ def _get_calls_for_msg_parts(self, msg_wrapper):
+ """
+ Generator that returns the proper call type for a given item.
+
+ :param msg_wrapper: A MessageWrapper
+ :type msg_wrapper: IMessageContainer
+ :return: a generator of tuples with the message part payload
+ and the callable to use for it
+ :rtype: generator
+ """
+ call = None
+
+ if msg_wrapper.new:
+ call = self._soledad.create_doc
+
+ # item is expected to be a MessagePartDoc
+ for item in msg_wrapper.walk():
+ if item.part == MessagePartType.fdoc:
+ yield dict(item.content), call
+
+ elif item.part == MessagePartType.hdoc:
+ if not self._header_does_exist(item.content):
+ yield dict(item.content), call
+
+ elif item.part == MessagePartType.cdoc:
+ if not self._content_does_exist(item.content):
+ yield dict(item.content), call
+
+ # For now, the only thing that will be dirty is
+ # the flags doc.
+
+ elif msg_wrapper.dirty:
+ call = self._soledad.put_doc
+ # item is expected to be a MessagePartDoc
+ for item in msg_wrapper.walk():
+ # XXX FIXME Give error if dirty and not doc_id !!!
+ doc_id = item.doc_id # defend!
+ if not doc_id:
+ continue
+ doc = self._soledad.get_doc(doc_id)
+ doc.content = dict(item.content)
+ if item.part == MessagePartType.fdoc:
+ logger.debug("PUT dirty fdoc")
+ yield doc, call
+
+ # XXX also for linkage-doc !!!
+ else:
+ logger.error("Cannot delete documents yet from the queue...!")
+
+ def _get_calls_for_rflags_doc(self, rflags_wrapper):
+ """
+ We always put these documents.
+
+ :param rflags_wrapper: A wrapper around recent flags doc.
+ :type rflags_wrapper: RecentFlagsWrapper
+ :return: a generator of tuples with the recent-flags doc payload
+ and the callable to use for it
+ :rtype: generator
+ """
+ call = self._soledad.put_doc
+ rdoc = self._soledad.get_doc(rflags_wrapper.doc_id)
+
+ payload = rflags_wrapper.content
+ logger.debug("Saving RFLAGS to Soledad...")
+
+ if payload:
+ rdoc.content = payload
+ yield rdoc, call
+
+ def _get_mbox_document(self, mbox):
+ """
+ Return mailbox document.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :return: A SoledadDocument containing this mailbox, or None if
+ the query failed.
+ :rtype: SoledadDocument or None.
+ """
+ try:
+ query = self._soledad.get_from_index(
+ fields.TYPE_MBOX_IDX,
+ fields.TYPE_MBOX_VAL, mbox)
+ if query:
+ return query.pop()
+ except Exception as exc:
+ logger.exception("Unhandled error %r" % exc)
+
+ def get_flags_doc(self, mbox, uid):
+ """
+ Return the SoledadDocument for the given mbox and uid.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the UID for the message
+ :type uid: int
+ """
+ result = None
+ try:
+ flag_docs = self._soledad.get_from_index(
+ fields.TYPE_MBOX_UID_IDX,
+ fields.TYPE_FLAGS_VAL, mbox, str(uid))
+ result = first(flag_docs)
+ except Exception as exc:
+ # ugh! Something's broken down there!
+ logger.warning("ERROR while getting flags for UID: %s" % uid)
+ logger.exception(exc)
+ finally:
+ return result
+
+ def write_last_uid(self, mbox, value):
+ """
+ Write the `last_uid` integer to the proper mailbox document
+ in Soledad.
+ This is called from the deferred triggered by
+ memorystore.increment_last_soledad_uid, which is expected to
+ run in a separate thread.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param value: the value to set
+ :type value: int
+ """
+ leap_assert_type(value, int)
+ key = fields.LAST_UID_KEY
+
+ with self._last_uid_lock:
+ mbox_doc = self._get_mbox_document(mbox)
+ old_val = mbox_doc.content[key]
+ if value < old_val:
+ logger.error("%r:%s Tried to write a UID lesser than what's "
+ "stored!" % (mbox, value))
+ mbox_doc.content[key] = value
+ self._soledad.put_doc(mbox_doc)
+
+ # deleted messages
+
+ def deleted_iter(self, mbox):
+ """
+ Get an iterator for the SoledadDocuments for messages
+ with \\Deleted flag for a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :return: iterator through deleted message docs
+ :rtype: iterable
+ """
+ return (doc for doc in self._soledad.get_from_index(
+ fields.TYPE_MBOX_DEL_IDX,
+ fields.TYPE_FLAGS_VAL, mbox, '1'))
+
+ # TODO can deferToThread this?
+ def remove_all_deleted(self, mbox):
+ """
+ Remove from Soledad all messages flagged as deleted for a given
+ mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ """
+ deleted = []
+ for doc in self.deleted_iter(mbox):
+ deleted.append(doc.content[fields.UID_KEY])
+ self._soledad.delete_doc(doc)
+ return deleted
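A minimal sketch of how the new store is meant to be driven through the
IMessageConsumer interface; soledad and message_wrapper stand for an
initialized Soledad instance and a MessageWrapper coming from the memory
store, and are placeholders here:

    from Queue import Queue

    from leap.mail.imap.soledadstore import SoledadStore

    store = SoledadStore(soledad)
    q = Queue()
    q.put(message_wrapper)   # a MessageWrapper marked as new or dirty
    # consume() walks each wrapper, skips headers/contents whose hash is
    # already stored, and writes the resulting documents in a thread.
    store.consume(q)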
diff --git a/src/leap/mail/imap/tests/leap_tests_imap.zsh b/src/leap/mail/imap/tests/leap_tests_imap.zsh
index 676d1a8..544faca 100755
--- a/src/leap/mail/imap/tests/leap_tests_imap.zsh
+++ b/src/leap/mail/imap/tests/leap_tests_imap.zsh
@@ -76,7 +76,7 @@ imaptest_cmd() {
}
stress_imap() {
- mknod imap_pipe p
+ mkfifo imap_pipe
cat imap_pipe | tee output &
imaptest_cmd >> imap_pipe
}
@@ -99,7 +99,7 @@ print_results() {
echo "----------------------"
echo "\tavg\tstdev"
$GREP "avg" ./output | sed -e 's/^ *//g' -e 's/ *$//g' | \
- awk '
+ gawk '
function avg(data, count) {
sum=0;
for( x=0; x <= count-1; x++) {
diff --git a/src/leap/mail/imap/tests/walktree.py b/src/leap/mail/imap/tests/walktree.py
index 1626f65..f3cbcb0 100644
--- a/src/leap/mail/imap/tests/walktree.py
+++ b/src/leap/mail/imap/tests/walktree.py
@@ -18,12 +18,14 @@
Tests for the walktree module.
"""
import os
+import sys
from email import parser
from leap.mail import walk as W
DEBUG = os.environ.get("BITMASK_MAIL_DEBUG")
+
p = parser.Parser()
# TODO pass an argument of the type of message
@@ -31,9 +33,17 @@ p = parser.Parser()
##################################################
# Input from hell
-#msg = p.parse(open('rfc822.multi-signed.message'))
-#msg = p.parse(open('rfc822.plain.message'))
-msg = p.parse(open('rfc822.multi-minimal.message'))
+if len(sys.argv) > 1:
+ FILENAME = sys.argv[1]
+else:
+ FILENAME = "rfc822.multi-minimal.message"
+
+"""
+FILENAME = "rfc822.multi-signed.message"
+FILENAME = "rfc822.plain.message"
+"""
+
+msg = p.parse(open(FILENAME))
DO_CHECK = False
#################################################