From 8791daed383605fca6e307c68e70cd94b1b0aa28 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 28 Nov 2013 12:49:03 -0200 Subject: use messageproducer to write messages to soledad --- src/leap/mail/imap/fetch.py | 37 +++++++++++++++++++++++++--------- src/leap/mail/imap/server.py | 48 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 73 insertions(+), 12 deletions(-) (limited to 'src/leap') diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py index 831ff22..14f7a9b 100644 --- a/src/leap/mail/imap/fetch.py +++ b/src/leap/mail/imap/fetch.py @@ -61,7 +61,15 @@ class MalformedMessage(Exception): class LeapIncomingMail(object): """ - Fetches mail from the incoming queue. + Fetches and process mail from the incoming pool. + + This object has public methods start_loop and stop that will + actually initiate a LoopingCall with check_period recurrency. + The LoopingCall itself will invoke the fetch method each time + that the check_period expires. + + This loop will sync the soledad db with the remote server and + process all the documents found tagged as incoming mail. """ RECENT_FLAG = "\\Recent" @@ -85,7 +93,7 @@ class LeapIncomingMail(object): check_period, userid): """ - Initialize LeapIMAP. + Initialize LeapIncomingMail.. :param keymanager: a keymanager instance :type keymanager: keymanager.KeyManager @@ -162,6 +170,7 @@ class LeapIncomingMail(object): logger.warning("Tried to start an already running fetching loop.") def stop(self): + # XXX change the name to stop_loop, for consistency. """ Stops the loop that fetches mail. """ @@ -185,7 +194,9 @@ class LeapIncomingMail(object): with self.fetching_lock: log.msg('syncing soledad...') self._soledad.sync() + log.msg('soledad synced.') doclist = self._soledad.get_from_index("just-mail", "*") + return doclist def _signal_unread_to_ui(self): @@ -249,6 +260,8 @@ class LeapIncomingMail(object): err = failure.value logger.error("error saving msg locally: %s" % (err,)) + # process incoming mail. + def _process_doclist(self, doclist): """ Iterates through the doclist, checks if each doc @@ -267,7 +280,7 @@ class LeapIncomingMail(object): docs_cb = [] for index, doc in enumerate(doclist): - logger.debug("processing doc %d of %d" % (index, num_mails)) + logger.debug("processing doc %d of %d" % (index + 1, num_mails)) leap_events.signal( IMAP_MSG_PROCESSING, str(index), str(num_mails)) keys = doc.content.keys() @@ -275,15 +288,13 @@ class LeapIncomingMail(object): # Ok, this looks like a legit msg. # Let's process it! # Deferred chain for individual messages + + # XXX use an IConsumer instead... ? d = deferToThread(self._decrypt_doc, doc) d.addCallback(self._process_decrypted_doc) d.addErrback(self._log_err) d.addCallback(self._add_message_locally) d.addErrback(self._log_err) - # XXX check this, add_locally should not get called if we - # get an error in process - #d.addCallbacks(self._process_decrypted, self._decryption_error) - #d.addCallbacks(self._add_message_locally, self._saving_error) docs_cb.append(d) else: # Ooops, this does not. @@ -317,6 +328,7 @@ class LeapIncomingMail(object): """ log.msg('decrypting msg') success = False + try: decrdata = self._keymanager.decrypt( doc.content[ENC_JSON_KEY], @@ -341,6 +353,7 @@ class LeapIncomingMail(object): :return: a SoledadDocument and the processed data. :rtype: (doc, data) """ + log.msg('processing decrypted doc') doc, data = msgtuple msg = json.loads(data) if not isinstance(msg, dict): @@ -364,6 +377,7 @@ class LeapIncomingMail(object): :return: data, possibly descrypted. :rtype: str """ + log.msg('maybe decrypting doc') leap_assert_type(data, unicode) # parse the original message @@ -384,7 +398,6 @@ class LeapIncomingMail(object): pass valid_sig = False # we will add a header saying if sig is valid - decrdata = '' if msg.get_content_type() == 'multipart/encrypted': decrmsg, valid_sig = self._decrypt_multipart_encrypted_msg( msg, encoding, senderPubkey) @@ -400,7 +413,7 @@ class LeapIncomingMail(object): else: decrmsg.add_header( self.LEAP_SIGNATURE_HEADER, - self.LEAP_SIGNATURE_VALID if valid_sig else \ + self.LEAP_SIGNATURE_VALID if valid_sig else self.LEAP_SIGNATURE_INVALID, pubkey=senderPubkey.key_id) @@ -420,6 +433,7 @@ class LeapIncomingMail(object): :return: A unitary tuple containing a decrypted message. :rtype: (Message) """ + log.msg('decrypting multipart encrypted msg') msg = copy.deepcopy(msg) # sanity check payload = msg.get_payload() @@ -470,7 +484,7 @@ class LeapIncomingMail(object): return msg, valid_sig def _maybe_decrypt_inline_encrypted_msg(self, origmsg, encoding, - senderPubkey): + senderPubkey): """ Possibly decrypt an inline OpenPGP encrypted message. @@ -484,6 +498,7 @@ class LeapIncomingMail(object): :return: A unitary tuple containing a decrypted message. :rtype: (Message) """ + log.msg('maybe decrypting inline encrypted msg') # serialize the original message buf = StringIO() g = Generator(buf) @@ -527,6 +542,7 @@ class LeapIncomingMail(object): :raise DecryptError: Raised if failed to decrypt. """ + log.msg('decrypting and verifying data') valid_sig = False try: decrdata = self._keymanager.decrypt( @@ -550,6 +566,7 @@ class LeapIncomingMail(object): incoming message :type msgtuple: (SoledadDocument, str) """ + log.msg('adding message to local db') doc, data = msgtuple self._inbox.addMessage(data, (self.RECENT_FLAG,)) leap_events.signal(IMAP_MSG_SAVED_LOCALLY) diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py index 733944c..6320a51 100644 --- a/src/leap/mail/imap/server.py +++ b/src/leap/mail/imap/server.py @@ -27,6 +27,7 @@ from collections import defaultdict from email.parser import Parser from zope.interface import implements +from zope.proxy import sameProxiedObjects from twisted.mail import imap4 from twisted.internet import defer @@ -36,6 +37,7 @@ from leap.common import events as leap_events from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL from leap.common.check import leap_assert, leap_assert_type from leap.common.mail import get_email_charset +from leap.mail.messageflow import IMessageConsumer, MessageProducer from leap.soledad.client import Soledad logger = logging.getLogger(__name__) @@ -816,6 +818,32 @@ class LeapMessage(WithMsgFields): return self._doc.content.get(key, None) +class SoledadDocWriter(object): + """ + This writer will create docs serially in the local soledad database. + """ + + implements(IMessageConsumer) + + def __init__(self, soledad): + """ + Initialize the writer. + + :param soledad: the soledad instance + :type soledad: Soledad + """ + self._soledad = soledad + + def consume(self, item): + """ + Creates a new document in soledad db. + + :param item: object to update. content of the document to be inserted. + :type item: dict + """ + self._soledad.create_doc(item) + + class MessageCollection(WithMsgFields, IndexedDB): """ A collection of messages, surprisingly. @@ -875,6 +903,16 @@ class MessageCollection(WithMsgFields, IndexedDB): self.initialize_db() self._parser = Parser() + # I think of someone like nietzsche when reading this + + # this will be the producer that will enqueue the content + # to be processed serially by the consumer (the writer). We just + # need to `put` the new material on its plate. + + self._soledad_writer = MessageProducer( + SoledadDocWriter(soledad), + period=0.2) + def _get_empty_msg(self): """ Returns an empty message. @@ -947,7 +985,9 @@ class MessageCollection(WithMsgFields, IndexedDB): # ...should get a sanity check here. content[self.UID_KEY] = uid - return self._soledad.create_doc(content) + self._soledad_writer.put(content) + # XXX have to decide what shall we do with errors with this change... + #return self._soledad.create_doc(content) def remove(self, msg): """ @@ -1041,7 +1081,11 @@ class MessageCollection(WithMsgFields, IndexedDB): :return: a list of u1db documents :rtype: list of SoledadDocument """ - # XXX this should return LeapMessage instances + if sameProxiedObjects(self._soledad, None): + logger.warning('Tried to get messages but soledad is None!') + return [] + + #f XXX this should return LeapMessage instances all_docs = [doc for doc in self._soledad.get_from_index( SoledadBackedAccount.TYPE_MBOX_IDX, self.TYPE_MESSAGE_VAL, self.mbox)] -- cgit v1.2.3