From 47f1f744bd8bcc42287f11e19afa849fc6c10211 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 28 Nov 2013 12:47:30 -0200 Subject: add message producer and consumer --- src/leap/mail/messageflow.py | 149 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 src/leap/mail/messageflow.py (limited to 'src/leap/mail') diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py new file mode 100644 index 0000000..21f6d62 --- /dev/null +++ b/src/leap/mail/messageflow.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +# messageflow.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Message Producers and Consumers for flow control. +""" +import Queue + +from twisted.internet.task import LoopingCall + +from zope.interface import Interface, implements + + +class IMessageConsumer(Interface): + + def consume(self, item): + """ + Consumes the passed item. + + :param item: an object to be consumed. + :type item: object + """ + # TODO we could add an optional type to be passed + # for doing type check. + + # TODO in case of errors, we could return the object to + # the queue, maybe wrapped in an object with a retries attribute. + + +class DummyMsgConsumer(object): + + implements(IMessageConsumer) + + def consume(self, item): + """ + Just prints the passed item. + """ + print "got item %s" % item + + +class MessageProducer(object): + """ + A Producer class that we can use to temporarily buffer the production + of messages so that different objects can consume them. + + This is useful for serializing the consumption of the messages stream + in the case of an slow resource (db), or for returning early from a + deferred chain and leave further processing detached from the calling loop, + as in the case of smtp. + """ + # TODO this can be seen as a first step towards properly implementing + # components that implement IPushProducer / IConsumer interfaces. + # However, I need to think more about how to pause the streaming. + # In any case, the differential rate between message production + # and consumption is not likely (?) to consume huge amounts of memory in + # our current settings, so the need to pause the stream is not urgent now. + + def __init__(self, consumer, queue=Queue.Queue, period=1): + """ + Initializes the MessageProducer + + :param consumer: an instance of a IMessageConsumer that will consume + the new messages. + :param queue: any queue implementation to be used as the temporary + buffer for new items. Default is a FIFO Queue. + :param period: the period to check for new items, in seconds. + """ + # XXX should assert it implements IConsumer / IMailConsumer + # it should implement a `consume` method + self._consumer = consumer + + self._queue = queue() + self._period = period + + self._loop = LoopingCall(self._check_for_new) + + # private methods + + def _check_for_new(self): + """ + Checks for new items in the internal queue, and calls the consume + method in the consumer. + + If the queue is found empty, the loop is stopped. It will be started + again after the addition of new items. + """ + # XXX right now I'm assuming that the period is good enough to allow + # a right pace of processing. but we could also pass the queue object + # to the consumer and let it choose whether process a new item or not. + + if self._queue.empty(): + self.stop() + else: + self._consumer.consume(self._queue.get()) + + # public methods + + def put(self, item): + """ + Puts a new item in the queue. + + If the queue was empty, we will start the loop again. + """ + was_empty = self._queue.empty() + + # XXX this might raise if the queue does not accept any new + # items. what to do then? + self._queue.put(item) + if was_empty: + self.start() + + def start(self): + """ + Starts polling for new items. + """ + if not self._loop.running: + self._loop.start(self._period) + + def stop(self): + """ + Stop polling for new items. + """ + if self._loop.running: + self._loop.stop() + + +if __name__ == "__main__": + from twisted.internet import reactor + producer = MessageProducer(DummyMsgConsumer()) + producer.start() + + for delay, item in ((2, 1), (3, 2), (4, 3), + (6, 4), (7, 5), (8, 6), (8.2, 7), + (15, 'a'), (16, 'b'), (17, 'c')): + reactor.callLater(delay, producer.put, item) + reactor.run() -- cgit v1.2.3 From 8791daed383605fca6e307c68e70cd94b1b0aa28 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 28 Nov 2013 12:49:03 -0200 Subject: use messageproducer to write messages to soledad --- src/leap/mail/imap/fetch.py | 37 +++++++++++++++++++++++++--------- src/leap/mail/imap/server.py | 48 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 73 insertions(+), 12 deletions(-) (limited to 'src/leap/mail') diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py index 831ff22..14f7a9b 100644 --- a/src/leap/mail/imap/fetch.py +++ b/src/leap/mail/imap/fetch.py @@ -61,7 +61,15 @@ class MalformedMessage(Exception): class LeapIncomingMail(object): """ - Fetches mail from the incoming queue. + Fetches and process mail from the incoming pool. + + This object has public methods start_loop and stop that will + actually initiate a LoopingCall with check_period recurrency. + The LoopingCall itself will invoke the fetch method each time + that the check_period expires. + + This loop will sync the soledad db with the remote server and + process all the documents found tagged as incoming mail. """ RECENT_FLAG = "\\Recent" @@ -85,7 +93,7 @@ class LeapIncomingMail(object): check_period, userid): """ - Initialize LeapIMAP. + Initialize LeapIncomingMail.. :param keymanager: a keymanager instance :type keymanager: keymanager.KeyManager @@ -162,6 +170,7 @@ class LeapIncomingMail(object): logger.warning("Tried to start an already running fetching loop.") def stop(self): + # XXX change the name to stop_loop, for consistency. """ Stops the loop that fetches mail. """ @@ -185,7 +194,9 @@ class LeapIncomingMail(object): with self.fetching_lock: log.msg('syncing soledad...') self._soledad.sync() + log.msg('soledad synced.') doclist = self._soledad.get_from_index("just-mail", "*") + return doclist def _signal_unread_to_ui(self): @@ -249,6 +260,8 @@ class LeapIncomingMail(object): err = failure.value logger.error("error saving msg locally: %s" % (err,)) + # process incoming mail. + def _process_doclist(self, doclist): """ Iterates through the doclist, checks if each doc @@ -267,7 +280,7 @@ class LeapIncomingMail(object): docs_cb = [] for index, doc in enumerate(doclist): - logger.debug("processing doc %d of %d" % (index, num_mails)) + logger.debug("processing doc %d of %d" % (index + 1, num_mails)) leap_events.signal( IMAP_MSG_PROCESSING, str(index), str(num_mails)) keys = doc.content.keys() @@ -275,15 +288,13 @@ class LeapIncomingMail(object): # Ok, this looks like a legit msg. # Let's process it! # Deferred chain for individual messages + + # XXX use an IConsumer instead... ? d = deferToThread(self._decrypt_doc, doc) d.addCallback(self._process_decrypted_doc) d.addErrback(self._log_err) d.addCallback(self._add_message_locally) d.addErrback(self._log_err) - # XXX check this, add_locally should not get called if we - # get an error in process - #d.addCallbacks(self._process_decrypted, self._decryption_error) - #d.addCallbacks(self._add_message_locally, self._saving_error) docs_cb.append(d) else: # Ooops, this does not. @@ -317,6 +328,7 @@ class LeapIncomingMail(object): """ log.msg('decrypting msg') success = False + try: decrdata = self._keymanager.decrypt( doc.content[ENC_JSON_KEY], @@ -341,6 +353,7 @@ class LeapIncomingMail(object): :return: a SoledadDocument and the processed data. :rtype: (doc, data) """ + log.msg('processing decrypted doc') doc, data = msgtuple msg = json.loads(data) if not isinstance(msg, dict): @@ -364,6 +377,7 @@ class LeapIncomingMail(object): :return: data, possibly descrypted. :rtype: str """ + log.msg('maybe decrypting doc') leap_assert_type(data, unicode) # parse the original message @@ -384,7 +398,6 @@ class LeapIncomingMail(object): pass valid_sig = False # we will add a header saying if sig is valid - decrdata = '' if msg.get_content_type() == 'multipart/encrypted': decrmsg, valid_sig = self._decrypt_multipart_encrypted_msg( msg, encoding, senderPubkey) @@ -400,7 +413,7 @@ class LeapIncomingMail(object): else: decrmsg.add_header( self.LEAP_SIGNATURE_HEADER, - self.LEAP_SIGNATURE_VALID if valid_sig else \ + self.LEAP_SIGNATURE_VALID if valid_sig else self.LEAP_SIGNATURE_INVALID, pubkey=senderPubkey.key_id) @@ -420,6 +433,7 @@ class LeapIncomingMail(object): :return: A unitary tuple containing a decrypted message. :rtype: (Message) """ + log.msg('decrypting multipart encrypted msg') msg = copy.deepcopy(msg) # sanity check payload = msg.get_payload() @@ -470,7 +484,7 @@ class LeapIncomingMail(object): return msg, valid_sig def _maybe_decrypt_inline_encrypted_msg(self, origmsg, encoding, - senderPubkey): + senderPubkey): """ Possibly decrypt an inline OpenPGP encrypted message. @@ -484,6 +498,7 @@ class LeapIncomingMail(object): :return: A unitary tuple containing a decrypted message. :rtype: (Message) """ + log.msg('maybe decrypting inline encrypted msg') # serialize the original message buf = StringIO() g = Generator(buf) @@ -527,6 +542,7 @@ class LeapIncomingMail(object): :raise DecryptError: Raised if failed to decrypt. """ + log.msg('decrypting and verifying data') valid_sig = False try: decrdata = self._keymanager.decrypt( @@ -550,6 +566,7 @@ class LeapIncomingMail(object): incoming message :type msgtuple: (SoledadDocument, str) """ + log.msg('adding message to local db') doc, data = msgtuple self._inbox.addMessage(data, (self.RECENT_FLAG,)) leap_events.signal(IMAP_MSG_SAVED_LOCALLY) diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py index 733944c..6320a51 100644 --- a/src/leap/mail/imap/server.py +++ b/src/leap/mail/imap/server.py @@ -27,6 +27,7 @@ from collections import defaultdict from email.parser import Parser from zope.interface import implements +from zope.proxy import sameProxiedObjects from twisted.mail import imap4 from twisted.internet import defer @@ -36,6 +37,7 @@ from leap.common import events as leap_events from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL from leap.common.check import leap_assert, leap_assert_type from leap.common.mail import get_email_charset +from leap.mail.messageflow import IMessageConsumer, MessageProducer from leap.soledad.client import Soledad logger = logging.getLogger(__name__) @@ -816,6 +818,32 @@ class LeapMessage(WithMsgFields): return self._doc.content.get(key, None) +class SoledadDocWriter(object): + """ + This writer will create docs serially in the local soledad database. + """ + + implements(IMessageConsumer) + + def __init__(self, soledad): + """ + Initialize the writer. + + :param soledad: the soledad instance + :type soledad: Soledad + """ + self._soledad = soledad + + def consume(self, item): + """ + Creates a new document in soledad db. + + :param item: object to update. content of the document to be inserted. + :type item: dict + """ + self._soledad.create_doc(item) + + class MessageCollection(WithMsgFields, IndexedDB): """ A collection of messages, surprisingly. @@ -875,6 +903,16 @@ class MessageCollection(WithMsgFields, IndexedDB): self.initialize_db() self._parser = Parser() + # I think of someone like nietzsche when reading this + + # this will be the producer that will enqueue the content + # to be processed serially by the consumer (the writer). We just + # need to `put` the new material on its plate. + + self._soledad_writer = MessageProducer( + SoledadDocWriter(soledad), + period=0.2) + def _get_empty_msg(self): """ Returns an empty message. @@ -947,7 +985,9 @@ class MessageCollection(WithMsgFields, IndexedDB): # ...should get a sanity check here. content[self.UID_KEY] = uid - return self._soledad.create_doc(content) + self._soledad_writer.put(content) + # XXX have to decide what shall we do with errors with this change... + #return self._soledad.create_doc(content) def remove(self, msg): """ @@ -1041,7 +1081,11 @@ class MessageCollection(WithMsgFields, IndexedDB): :return: a list of u1db documents :rtype: list of SoledadDocument """ - # XXX this should return LeapMessage instances + if sameProxiedObjects(self._soledad, None): + logger.warning('Tried to get messages but soledad is None!') + return [] + + #f XXX this should return LeapMessage instances all_docs = [doc for doc in self._soledad.get_from_index( SoledadBackedAccount.TYPE_MBOX_IDX, self.TYPE_MESSAGE_VAL, self.mbox)] -- cgit v1.2.3