summaryrefslogtreecommitdiff
path: root/src/leap/mail
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail')
-rw-r--r--src/leap/mail/imap/fetch.py37
-rw-r--r--src/leap/mail/imap/server.py48
-rw-r--r--src/leap/mail/messageflow.py149
3 files changed, 222 insertions, 12 deletions
diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py
index 831ff22..14f7a9b 100644
--- a/src/leap/mail/imap/fetch.py
+++ b/src/leap/mail/imap/fetch.py
@@ -61,7 +61,15 @@ class MalformedMessage(Exception):
class LeapIncomingMail(object):
"""
- Fetches mail from the incoming queue.
+ Fetches and process mail from the incoming pool.
+
+ This object has public methods start_loop and stop that will
+ actually initiate a LoopingCall with check_period recurrency.
+ The LoopingCall itself will invoke the fetch method each time
+ that the check_period expires.
+
+ This loop will sync the soledad db with the remote server and
+ process all the documents found tagged as incoming mail.
"""
RECENT_FLAG = "\\Recent"
@@ -85,7 +93,7 @@ class LeapIncomingMail(object):
check_period, userid):
"""
- Initialize LeapIMAP.
+ Initialize LeapIncomingMail..
:param keymanager: a keymanager instance
:type keymanager: keymanager.KeyManager
@@ -162,6 +170,7 @@ class LeapIncomingMail(object):
logger.warning("Tried to start an already running fetching loop.")
def stop(self):
+ # XXX change the name to stop_loop, for consistency.
"""
Stops the loop that fetches mail.
"""
@@ -185,7 +194,9 @@ class LeapIncomingMail(object):
with self.fetching_lock:
log.msg('syncing soledad...')
self._soledad.sync()
+ log.msg('soledad synced.')
doclist = self._soledad.get_from_index("just-mail", "*")
+
return doclist
def _signal_unread_to_ui(self):
@@ -249,6 +260,8 @@ class LeapIncomingMail(object):
err = failure.value
logger.error("error saving msg locally: %s" % (err,))
+ # process incoming mail.
+
def _process_doclist(self, doclist):
"""
Iterates through the doclist, checks if each doc
@@ -267,7 +280,7 @@ class LeapIncomingMail(object):
docs_cb = []
for index, doc in enumerate(doclist):
- logger.debug("processing doc %d of %d" % (index, num_mails))
+ logger.debug("processing doc %d of %d" % (index + 1, num_mails))
leap_events.signal(
IMAP_MSG_PROCESSING, str(index), str(num_mails))
keys = doc.content.keys()
@@ -275,15 +288,13 @@ class LeapIncomingMail(object):
# Ok, this looks like a legit msg.
# Let's process it!
# Deferred chain for individual messages
+
+ # XXX use an IConsumer instead... ?
d = deferToThread(self._decrypt_doc, doc)
d.addCallback(self._process_decrypted_doc)
d.addErrback(self._log_err)
d.addCallback(self._add_message_locally)
d.addErrback(self._log_err)
- # XXX check this, add_locally should not get called if we
- # get an error in process
- #d.addCallbacks(self._process_decrypted, self._decryption_error)
- #d.addCallbacks(self._add_message_locally, self._saving_error)
docs_cb.append(d)
else:
# Ooops, this does not.
@@ -317,6 +328,7 @@ class LeapIncomingMail(object):
"""
log.msg('decrypting msg')
success = False
+
try:
decrdata = self._keymanager.decrypt(
doc.content[ENC_JSON_KEY],
@@ -341,6 +353,7 @@ class LeapIncomingMail(object):
:return: a SoledadDocument and the processed data.
:rtype: (doc, data)
"""
+ log.msg('processing decrypted doc')
doc, data = msgtuple
msg = json.loads(data)
if not isinstance(msg, dict):
@@ -364,6 +377,7 @@ class LeapIncomingMail(object):
:return: data, possibly descrypted.
:rtype: str
"""
+ log.msg('maybe decrypting doc')
leap_assert_type(data, unicode)
# parse the original message
@@ -384,7 +398,6 @@ class LeapIncomingMail(object):
pass
valid_sig = False # we will add a header saying if sig is valid
- decrdata = ''
if msg.get_content_type() == 'multipart/encrypted':
decrmsg, valid_sig = self._decrypt_multipart_encrypted_msg(
msg, encoding, senderPubkey)
@@ -400,7 +413,7 @@ class LeapIncomingMail(object):
else:
decrmsg.add_header(
self.LEAP_SIGNATURE_HEADER,
- self.LEAP_SIGNATURE_VALID if valid_sig else \
+ self.LEAP_SIGNATURE_VALID if valid_sig else
self.LEAP_SIGNATURE_INVALID,
pubkey=senderPubkey.key_id)
@@ -420,6 +433,7 @@ class LeapIncomingMail(object):
:return: A unitary tuple containing a decrypted message.
:rtype: (Message)
"""
+ log.msg('decrypting multipart encrypted msg')
msg = copy.deepcopy(msg)
# sanity check
payload = msg.get_payload()
@@ -470,7 +484,7 @@ class LeapIncomingMail(object):
return msg, valid_sig
def _maybe_decrypt_inline_encrypted_msg(self, origmsg, encoding,
- senderPubkey):
+ senderPubkey):
"""
Possibly decrypt an inline OpenPGP encrypted message.
@@ -484,6 +498,7 @@ class LeapIncomingMail(object):
:return: A unitary tuple containing a decrypted message.
:rtype: (Message)
"""
+ log.msg('maybe decrypting inline encrypted msg')
# serialize the original message
buf = StringIO()
g = Generator(buf)
@@ -527,6 +542,7 @@ class LeapIncomingMail(object):
:raise DecryptError: Raised if failed to decrypt.
"""
+ log.msg('decrypting and verifying data')
valid_sig = False
try:
decrdata = self._keymanager.decrypt(
@@ -550,6 +566,7 @@ class LeapIncomingMail(object):
incoming message
:type msgtuple: (SoledadDocument, str)
"""
+ log.msg('adding message to local db')
doc, data = msgtuple
self._inbox.addMessage(data, (self.RECENT_FLAG,))
leap_events.signal(IMAP_MSG_SAVED_LOCALLY)
diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py
index 733944c..6320a51 100644
--- a/src/leap/mail/imap/server.py
+++ b/src/leap/mail/imap/server.py
@@ -27,6 +27,7 @@ from collections import defaultdict
from email.parser import Parser
from zope.interface import implements
+from zope.proxy import sameProxiedObjects
from twisted.mail import imap4
from twisted.internet import defer
@@ -36,6 +37,7 @@ from leap.common import events as leap_events
from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL
from leap.common.check import leap_assert, leap_assert_type
from leap.common.mail import get_email_charset
+from leap.mail.messageflow import IMessageConsumer, MessageProducer
from leap.soledad.client import Soledad
logger = logging.getLogger(__name__)
@@ -816,6 +818,32 @@ class LeapMessage(WithMsgFields):
return self._doc.content.get(key, None)
+class SoledadDocWriter(object):
+ """
+ This writer will create docs serially in the local soledad database.
+ """
+
+ implements(IMessageConsumer)
+
+ def __init__(self, soledad):
+ """
+ Initialize the writer.
+
+ :param soledad: the soledad instance
+ :type soledad: Soledad
+ """
+ self._soledad = soledad
+
+ def consume(self, item):
+ """
+ Creates a new document in soledad db.
+
+ :param item: object to update. content of the document to be inserted.
+ :type item: dict
+ """
+ self._soledad.create_doc(item)
+
+
class MessageCollection(WithMsgFields, IndexedDB):
"""
A collection of messages, surprisingly.
@@ -875,6 +903,16 @@ class MessageCollection(WithMsgFields, IndexedDB):
self.initialize_db()
self._parser = Parser()
+ # I think of someone like nietzsche when reading this
+
+ # this will be the producer that will enqueue the content
+ # to be processed serially by the consumer (the writer). We just
+ # need to `put` the new material on its plate.
+
+ self._soledad_writer = MessageProducer(
+ SoledadDocWriter(soledad),
+ period=0.2)
+
def _get_empty_msg(self):
"""
Returns an empty message.
@@ -947,7 +985,9 @@ class MessageCollection(WithMsgFields, IndexedDB):
# ...should get a sanity check here.
content[self.UID_KEY] = uid
- return self._soledad.create_doc(content)
+ self._soledad_writer.put(content)
+ # XXX have to decide what shall we do with errors with this change...
+ #return self._soledad.create_doc(content)
def remove(self, msg):
"""
@@ -1041,7 +1081,11 @@ class MessageCollection(WithMsgFields, IndexedDB):
:return: a list of u1db documents
:rtype: list of SoledadDocument
"""
- # XXX this should return LeapMessage instances
+ if sameProxiedObjects(self._soledad, None):
+ logger.warning('Tried to get messages but soledad is None!')
+ return []
+
+ #f XXX this should return LeapMessage instances
all_docs = [doc for doc in self._soledad.get_from_index(
SoledadBackedAccount.TYPE_MBOX_IDX,
self.TYPE_MESSAGE_VAL, self.mbox)]
diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py
new file mode 100644
index 0000000..21f6d62
--- /dev/null
+++ b/src/leap/mail/messageflow.py
@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+# messageflow.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Message Producers and Consumers for flow control.
+"""
+import Queue
+
+from twisted.internet.task import LoopingCall
+
+from zope.interface import Interface, implements
+
+
+class IMessageConsumer(Interface):
+
+ def consume(self, item):
+ """
+ Consumes the passed item.
+
+ :param item: an object to be consumed.
+ :type item: object
+ """
+ # TODO we could add an optional type to be passed
+ # for doing type check.
+
+ # TODO in case of errors, we could return the object to
+ # the queue, maybe wrapped in an object with a retries attribute.
+
+
+class DummyMsgConsumer(object):
+
+ implements(IMessageConsumer)
+
+ def consume(self, item):
+ """
+ Just prints the passed item.
+ """
+ print "got item %s" % item
+
+
+class MessageProducer(object):
+ """
+ A Producer class that we can use to temporarily buffer the production
+ of messages so that different objects can consume them.
+
+ This is useful for serializing the consumption of the messages stream
+ in the case of an slow resource (db), or for returning early from a
+ deferred chain and leave further processing detached from the calling loop,
+ as in the case of smtp.
+ """
+ # TODO this can be seen as a first step towards properly implementing
+ # components that implement IPushProducer / IConsumer interfaces.
+ # However, I need to think more about how to pause the streaming.
+ # In any case, the differential rate between message production
+ # and consumption is not likely (?) to consume huge amounts of memory in
+ # our current settings, so the need to pause the stream is not urgent now.
+
+ def __init__(self, consumer, queue=Queue.Queue, period=1):
+ """
+ Initializes the MessageProducer
+
+ :param consumer: an instance of a IMessageConsumer that will consume
+ the new messages.
+ :param queue: any queue implementation to be used as the temporary
+ buffer for new items. Default is a FIFO Queue.
+ :param period: the period to check for new items, in seconds.
+ """
+ # XXX should assert it implements IConsumer / IMailConsumer
+ # it should implement a `consume` method
+ self._consumer = consumer
+
+ self._queue = queue()
+ self._period = period
+
+ self._loop = LoopingCall(self._check_for_new)
+
+ # private methods
+
+ def _check_for_new(self):
+ """
+ Checks for new items in the internal queue, and calls the consume
+ method in the consumer.
+
+ If the queue is found empty, the loop is stopped. It will be started
+ again after the addition of new items.
+ """
+ # XXX right now I'm assuming that the period is good enough to allow
+ # a right pace of processing. but we could also pass the queue object
+ # to the consumer and let it choose whether process a new item or not.
+
+ if self._queue.empty():
+ self.stop()
+ else:
+ self._consumer.consume(self._queue.get())
+
+ # public methods
+
+ def put(self, item):
+ """
+ Puts a new item in the queue.
+
+ If the queue was empty, we will start the loop again.
+ """
+ was_empty = self._queue.empty()
+
+ # XXX this might raise if the queue does not accept any new
+ # items. what to do then?
+ self._queue.put(item)
+ if was_empty:
+ self.start()
+
+ def start(self):
+ """
+ Starts polling for new items.
+ """
+ if not self._loop.running:
+ self._loop.start(self._period)
+
+ def stop(self):
+ """
+ Stop polling for new items.
+ """
+ if self._loop.running:
+ self._loop.stop()
+
+
+if __name__ == "__main__":
+ from twisted.internet import reactor
+ producer = MessageProducer(DummyMsgConsumer())
+ producer.start()
+
+ for delay, item in ((2, 1), (3, 2), (4, 3),
+ (6, 4), (7, 5), (8, 6), (8.2, 7),
+ (15, 'a'), (16, 'b'), (17, 'c')):
+ reactor.callLater(delay, producer.put, item)
+ reactor.run()