diff options
| author | Kali Kaneko <kali@leap.se> | 2013-11-28 12:49:03 -0200 | 
|---|---|---|
| committer | Kali Kaneko <kali@leap.se> | 2013-11-28 19:15:18 -0200 | 
| commit | af45d5d1e4d9dababd77a60f2f8aabb6f9871dc2 (patch) | |
| tree | 3d97530911e1e1f23570275e61da956e31a59eac | |
| parent | f78bf4c8880de648ad84aa4b4946d922c8298388 (diff) | |
use messageproducer to write messages to soledad
| -rw-r--r-- | mail/src/leap/mail/imap/fetch.py | 37 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/server.py | 48 | 
2 files changed, 73 insertions, 12 deletions
| diff --git a/mail/src/leap/mail/imap/fetch.py b/mail/src/leap/mail/imap/fetch.py index 831ff22..14f7a9b 100644 --- a/mail/src/leap/mail/imap/fetch.py +++ b/mail/src/leap/mail/imap/fetch.py @@ -61,7 +61,15 @@ class MalformedMessage(Exception):  class LeapIncomingMail(object):      """ -    Fetches mail from the incoming queue. +    Fetches and process mail from the incoming pool. + +    This object has public methods start_loop and stop that will +    actually initiate a LoopingCall with check_period recurrency. +    The LoopingCall itself will invoke the fetch method each time +    that the check_period expires. + +    This loop will sync the soledad db with the remote server and +    process all the documents found tagged as incoming mail.      """      RECENT_FLAG = "\\Recent" @@ -85,7 +93,7 @@ class LeapIncomingMail(object):                   check_period, userid):          """ -        Initialize LeapIMAP. +        Initialize LeapIncomingMail..          :param keymanager: a keymanager instance          :type keymanager: keymanager.KeyManager @@ -162,6 +170,7 @@ class LeapIncomingMail(object):              logger.warning("Tried to start an already running fetching loop.")      def stop(self): +        # XXX change the name to stop_loop, for consistency.          """          Stops the loop that fetches mail.          """ @@ -185,7 +194,9 @@ class LeapIncomingMail(object):          with self.fetching_lock:              log.msg('syncing soledad...')              self._soledad.sync() +            log.msg('soledad synced.')              doclist = self._soledad.get_from_index("just-mail", "*") +          return doclist      def _signal_unread_to_ui(self): @@ -249,6 +260,8 @@ class LeapIncomingMail(object):          err = failure.value          logger.error("error saving msg locally: %s" % (err,)) +    # process incoming mail. +      def _process_doclist(self, doclist):          """          Iterates through the doclist, checks if each doc @@ -267,7 +280,7 @@ class LeapIncomingMail(object):          docs_cb = []          for index, doc in enumerate(doclist): -            logger.debug("processing doc %d of %d" % (index, num_mails)) +            logger.debug("processing doc %d of %d" % (index + 1, num_mails))              leap_events.signal(                  IMAP_MSG_PROCESSING, str(index), str(num_mails))              keys = doc.content.keys() @@ -275,15 +288,13 @@ class LeapIncomingMail(object):                  # Ok, this looks like a legit msg.                  # Let's process it!                  # Deferred chain for individual messages + +                # XXX use an IConsumer instead... ?                  d = deferToThread(self._decrypt_doc, doc)                  d.addCallback(self._process_decrypted_doc)                  d.addErrback(self._log_err)                  d.addCallback(self._add_message_locally)                  d.addErrback(self._log_err) -                # XXX check this, add_locally should not get called if we -                # get an error in process -                #d.addCallbacks(self._process_decrypted, self._decryption_error) -                #d.addCallbacks(self._add_message_locally, self._saving_error)                  docs_cb.append(d)              else:                  # Ooops, this does not. @@ -317,6 +328,7 @@ class LeapIncomingMail(object):          """          log.msg('decrypting msg')          success = False +          try:              decrdata = self._keymanager.decrypt(                  doc.content[ENC_JSON_KEY], @@ -341,6 +353,7 @@ class LeapIncomingMail(object):          :return: a SoledadDocument and the processed data.          :rtype: (doc, data)          """ +        log.msg('processing decrypted doc')          doc, data = msgtuple          msg = json.loads(data)          if not isinstance(msg, dict): @@ -364,6 +377,7 @@ class LeapIncomingMail(object):          :return: data, possibly descrypted.          :rtype: str          """ +        log.msg('maybe decrypting doc')          leap_assert_type(data, unicode)          # parse the original message @@ -384,7 +398,6 @@ class LeapIncomingMail(object):                  pass          valid_sig = False  # we will add a header saying if sig is valid -        decrdata = ''          if msg.get_content_type() == 'multipart/encrypted':              decrmsg, valid_sig = self._decrypt_multipart_encrypted_msg(                  msg, encoding, senderPubkey) @@ -400,7 +413,7 @@ class LeapIncomingMail(object):          else:              decrmsg.add_header(                  self.LEAP_SIGNATURE_HEADER, -                self.LEAP_SIGNATURE_VALID if valid_sig else \ +                self.LEAP_SIGNATURE_VALID if valid_sig else                  self.LEAP_SIGNATURE_INVALID,                  pubkey=senderPubkey.key_id) @@ -420,6 +433,7 @@ class LeapIncomingMail(object):          :return: A unitary tuple containing a decrypted message.          :rtype: (Message)          """ +        log.msg('decrypting multipart encrypted msg')          msg = copy.deepcopy(msg)          # sanity check          payload = msg.get_payload() @@ -470,7 +484,7 @@ class LeapIncomingMail(object):          return msg, valid_sig      def _maybe_decrypt_inline_encrypted_msg(self, origmsg, encoding, -                                                senderPubkey): +                                            senderPubkey):          """          Possibly decrypt an inline OpenPGP encrypted message. @@ -484,6 +498,7 @@ class LeapIncomingMail(object):          :return: A unitary tuple containing a decrypted message.          :rtype: (Message)          """ +        log.msg('maybe decrypting inline encrypted msg')          # serialize the original message          buf = StringIO()          g = Generator(buf) @@ -527,6 +542,7 @@ class LeapIncomingMail(object):          :raise DecryptError: Raised if failed to decrypt.          """ +        log.msg('decrypting and verifying data')          valid_sig = False          try:              decrdata = self._keymanager.decrypt( @@ -550,6 +566,7 @@ class LeapIncomingMail(object):                           incoming message          :type msgtuple: (SoledadDocument, str)          """ +        log.msg('adding message to local db')          doc, data = msgtuple          self._inbox.addMessage(data, (self.RECENT_FLAG,))          leap_events.signal(IMAP_MSG_SAVED_LOCALLY) diff --git a/mail/src/leap/mail/imap/server.py b/mail/src/leap/mail/imap/server.py index 733944c..6320a51 100644 --- a/mail/src/leap/mail/imap/server.py +++ b/mail/src/leap/mail/imap/server.py @@ -27,6 +27,7 @@ from collections import defaultdict  from email.parser import Parser  from zope.interface import implements +from zope.proxy import sameProxiedObjects  from twisted.mail import imap4  from twisted.internet import defer @@ -36,6 +37,7 @@ from leap.common import events as leap_events  from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL  from leap.common.check import leap_assert, leap_assert_type  from leap.common.mail import get_email_charset +from leap.mail.messageflow import IMessageConsumer, MessageProducer  from leap.soledad.client import Soledad  logger = logging.getLogger(__name__) @@ -816,6 +818,32 @@ class LeapMessage(WithMsgFields):          return self._doc.content.get(key, None) +class SoledadDocWriter(object): +    """ +    This writer will create docs serially in the local soledad database. +    """ + +    implements(IMessageConsumer) + +    def __init__(self, soledad): +        """ +        Initialize the writer. + +        :param soledad: the soledad instance +        :type soledad: Soledad +        """ +        self._soledad = soledad + +    def consume(self, item): +        """ +        Creates a new document in soledad db. + +        :param item: object to update. content of the document to be inserted. +        :type item: dict +        """ +        self._soledad.create_doc(item) + +  class MessageCollection(WithMsgFields, IndexedDB):      """      A collection of messages, surprisingly. @@ -875,6 +903,16 @@ class MessageCollection(WithMsgFields, IndexedDB):          self.initialize_db()          self._parser = Parser() +        # I think of someone like nietzsche when reading this + +        # this will be the producer that will enqueue the content +        # to be processed serially by the consumer (the writer). We just +        # need to `put` the new material on its plate. + +        self._soledad_writer = MessageProducer( +            SoledadDocWriter(soledad), +            period=0.2) +      def _get_empty_msg(self):          """          Returns an empty message. @@ -947,7 +985,9 @@ class MessageCollection(WithMsgFields, IndexedDB):          # ...should get a sanity check here.          content[self.UID_KEY] = uid -        return self._soledad.create_doc(content) +        self._soledad_writer.put(content) +        # XXX have to decide what shall we do with errors with this change... +        #return self._soledad.create_doc(content)      def remove(self, msg):          """ @@ -1041,7 +1081,11 @@ class MessageCollection(WithMsgFields, IndexedDB):          :return: a list of u1db documents          :rtype: list of SoledadDocument          """ -        # XXX this should return LeapMessage instances +        if sameProxiedObjects(self._soledad, None): +            logger.warning('Tried to get messages but soledad is None!') +            return [] + +        #f XXX this should return LeapMessage instances          all_docs = [doc for doc in self._soledad.get_from_index(              SoledadBackedAccount.TYPE_MBOX_IDX,              self.TYPE_MESSAGE_VAL, self.mbox)] | 
