diff options
| -rw-r--r-- | changes/feature_4606-serialize-soledad-writes | 2 | ||||
| -rw-r--r-- | src/leap/mail/imap/fetch.py | 37 | ||||
| -rw-r--r-- | src/leap/mail/imap/server.py | 48 | ||||
| -rw-r--r-- | src/leap/mail/messageflow.py | 149 | 
4 files changed, 224 insertions, 12 deletions
| diff --git a/changes/feature_4606-serialize-soledad-writes b/changes/feature_4606-serialize-soledad-writes new file mode 100644 index 0000000..5dfb8ee --- /dev/null +++ b/changes/feature_4606-serialize-soledad-writes @@ -0,0 +1,2 @@ +  o Serialize Soledad Writes for new messages. Fixes segmentation fault when sqlcipher was +    been concurrently accessed from many threads. Closes: #4606 diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py index 831ff22..14f7a9b 100644 --- a/src/leap/mail/imap/fetch.py +++ b/src/leap/mail/imap/fetch.py @@ -61,7 +61,15 @@ class MalformedMessage(Exception):  class LeapIncomingMail(object):      """ -    Fetches mail from the incoming queue. +    Fetches and process mail from the incoming pool. + +    This object has public methods start_loop and stop that will +    actually initiate a LoopingCall with check_period recurrency. +    The LoopingCall itself will invoke the fetch method each time +    that the check_period expires. + +    This loop will sync the soledad db with the remote server and +    process all the documents found tagged as incoming mail.      """      RECENT_FLAG = "\\Recent" @@ -85,7 +93,7 @@ class LeapIncomingMail(object):                   check_period, userid):          """ -        Initialize LeapIMAP. +        Initialize LeapIncomingMail..          :param keymanager: a keymanager instance          :type keymanager: keymanager.KeyManager @@ -162,6 +170,7 @@ class LeapIncomingMail(object):              logger.warning("Tried to start an already running fetching loop.")      def stop(self): +        # XXX change the name to stop_loop, for consistency.          """          Stops the loop that fetches mail.          """ @@ -185,7 +194,9 @@ class LeapIncomingMail(object):          with self.fetching_lock:              log.msg('syncing soledad...')              self._soledad.sync() +            log.msg('soledad synced.')              doclist = self._soledad.get_from_index("just-mail", "*") +          return doclist      def _signal_unread_to_ui(self): @@ -249,6 +260,8 @@ class LeapIncomingMail(object):          err = failure.value          logger.error("error saving msg locally: %s" % (err,)) +    # process incoming mail. +      def _process_doclist(self, doclist):          """          Iterates through the doclist, checks if each doc @@ -267,7 +280,7 @@ class LeapIncomingMail(object):          docs_cb = []          for index, doc in enumerate(doclist): -            logger.debug("processing doc %d of %d" % (index, num_mails)) +            logger.debug("processing doc %d of %d" % (index + 1, num_mails))              leap_events.signal(                  IMAP_MSG_PROCESSING, str(index), str(num_mails))              keys = doc.content.keys() @@ -275,15 +288,13 @@ class LeapIncomingMail(object):                  # Ok, this looks like a legit msg.                  # Let's process it!                  # Deferred chain for individual messages + +                # XXX use an IConsumer instead... ?                  d = deferToThread(self._decrypt_doc, doc)                  d.addCallback(self._process_decrypted_doc)                  d.addErrback(self._log_err)                  d.addCallback(self._add_message_locally)                  d.addErrback(self._log_err) -                # XXX check this, add_locally should not get called if we -                # get an error in process -                #d.addCallbacks(self._process_decrypted, self._decryption_error) -                #d.addCallbacks(self._add_message_locally, self._saving_error)                  docs_cb.append(d)              else:                  # Ooops, this does not. @@ -317,6 +328,7 @@ class LeapIncomingMail(object):          """          log.msg('decrypting msg')          success = False +          try:              decrdata = self._keymanager.decrypt(                  doc.content[ENC_JSON_KEY], @@ -341,6 +353,7 @@ class LeapIncomingMail(object):          :return: a SoledadDocument and the processed data.          :rtype: (doc, data)          """ +        log.msg('processing decrypted doc')          doc, data = msgtuple          msg = json.loads(data)          if not isinstance(msg, dict): @@ -364,6 +377,7 @@ class LeapIncomingMail(object):          :return: data, possibly descrypted.          :rtype: str          """ +        log.msg('maybe decrypting doc')          leap_assert_type(data, unicode)          # parse the original message @@ -384,7 +398,6 @@ class LeapIncomingMail(object):                  pass          valid_sig = False  # we will add a header saying if sig is valid -        decrdata = ''          if msg.get_content_type() == 'multipart/encrypted':              decrmsg, valid_sig = self._decrypt_multipart_encrypted_msg(                  msg, encoding, senderPubkey) @@ -400,7 +413,7 @@ class LeapIncomingMail(object):          else:              decrmsg.add_header(                  self.LEAP_SIGNATURE_HEADER, -                self.LEAP_SIGNATURE_VALID if valid_sig else \ +                self.LEAP_SIGNATURE_VALID if valid_sig else                  self.LEAP_SIGNATURE_INVALID,                  pubkey=senderPubkey.key_id) @@ -420,6 +433,7 @@ class LeapIncomingMail(object):          :return: A unitary tuple containing a decrypted message.          :rtype: (Message)          """ +        log.msg('decrypting multipart encrypted msg')          msg = copy.deepcopy(msg)          # sanity check          payload = msg.get_payload() @@ -470,7 +484,7 @@ class LeapIncomingMail(object):          return msg, valid_sig      def _maybe_decrypt_inline_encrypted_msg(self, origmsg, encoding, -                                                senderPubkey): +                                            senderPubkey):          """          Possibly decrypt an inline OpenPGP encrypted message. @@ -484,6 +498,7 @@ class LeapIncomingMail(object):          :return: A unitary tuple containing a decrypted message.          :rtype: (Message)          """ +        log.msg('maybe decrypting inline encrypted msg')          # serialize the original message          buf = StringIO()          g = Generator(buf) @@ -527,6 +542,7 @@ class LeapIncomingMail(object):          :raise DecryptError: Raised if failed to decrypt.          """ +        log.msg('decrypting and verifying data')          valid_sig = False          try:              decrdata = self._keymanager.decrypt( @@ -550,6 +566,7 @@ class LeapIncomingMail(object):                           incoming message          :type msgtuple: (SoledadDocument, str)          """ +        log.msg('adding message to local db')          doc, data = msgtuple          self._inbox.addMessage(data, (self.RECENT_FLAG,))          leap_events.signal(IMAP_MSG_SAVED_LOCALLY) diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py index 733944c..6320a51 100644 --- a/src/leap/mail/imap/server.py +++ b/src/leap/mail/imap/server.py @@ -27,6 +27,7 @@ from collections import defaultdict  from email.parser import Parser  from zope.interface import implements +from zope.proxy import sameProxiedObjects  from twisted.mail import imap4  from twisted.internet import defer @@ -36,6 +37,7 @@ from leap.common import events as leap_events  from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL  from leap.common.check import leap_assert, leap_assert_type  from leap.common.mail import get_email_charset +from leap.mail.messageflow import IMessageConsumer, MessageProducer  from leap.soledad.client import Soledad  logger = logging.getLogger(__name__) @@ -816,6 +818,32 @@ class LeapMessage(WithMsgFields):          return self._doc.content.get(key, None) +class SoledadDocWriter(object): +    """ +    This writer will create docs serially in the local soledad database. +    """ + +    implements(IMessageConsumer) + +    def __init__(self, soledad): +        """ +        Initialize the writer. + +        :param soledad: the soledad instance +        :type soledad: Soledad +        """ +        self._soledad = soledad + +    def consume(self, item): +        """ +        Creates a new document in soledad db. + +        :param item: object to update. content of the document to be inserted. +        :type item: dict +        """ +        self._soledad.create_doc(item) + +  class MessageCollection(WithMsgFields, IndexedDB):      """      A collection of messages, surprisingly. @@ -875,6 +903,16 @@ class MessageCollection(WithMsgFields, IndexedDB):          self.initialize_db()          self._parser = Parser() +        # I think of someone like nietzsche when reading this + +        # this will be the producer that will enqueue the content +        # to be processed serially by the consumer (the writer). We just +        # need to `put` the new material on its plate. + +        self._soledad_writer = MessageProducer( +            SoledadDocWriter(soledad), +            period=0.2) +      def _get_empty_msg(self):          """          Returns an empty message. @@ -947,7 +985,9 @@ class MessageCollection(WithMsgFields, IndexedDB):          # ...should get a sanity check here.          content[self.UID_KEY] = uid -        return self._soledad.create_doc(content) +        self._soledad_writer.put(content) +        # XXX have to decide what shall we do with errors with this change... +        #return self._soledad.create_doc(content)      def remove(self, msg):          """ @@ -1041,7 +1081,11 @@ class MessageCollection(WithMsgFields, IndexedDB):          :return: a list of u1db documents          :rtype: list of SoledadDocument          """ -        # XXX this should return LeapMessage instances +        if sameProxiedObjects(self._soledad, None): +            logger.warning('Tried to get messages but soledad is None!') +            return [] + +        #f XXX this should return LeapMessage instances          all_docs = [doc for doc in self._soledad.get_from_index(              SoledadBackedAccount.TYPE_MBOX_IDX,              self.TYPE_MESSAGE_VAL, self.mbox)] diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py new file mode 100644 index 0000000..21f6d62 --- /dev/null +++ b/src/leap/mail/messageflow.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +# messageflow.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +Message Producers and Consumers for flow control. +""" +import Queue + +from twisted.internet.task import LoopingCall + +from zope.interface import Interface, implements + + +class IMessageConsumer(Interface): + +    def consume(self, item): +        """ +        Consumes the passed item. + +        :param item: an object to be consumed. +        :type item: object +        """ +        # TODO we could add an optional type to be passed +        # for doing type check. + +        # TODO in case of errors, we could return the object to +        # the queue, maybe wrapped in an object with a retries attribute. + + +class DummyMsgConsumer(object): + +    implements(IMessageConsumer) + +    def consume(self, item): +        """ +        Just prints the passed item. +        """ +        print "got item %s" % item + + +class MessageProducer(object): +    """ +    A Producer class that we can use to temporarily buffer the production +    of messages so that different objects can consume them. + +    This is useful for serializing the consumption of the messages stream +    in the case of an slow resource (db), or for returning early from a +    deferred chain and leave further processing detached from the calling loop, +    as in the case of smtp. +    """ +    # TODO this can be seen as a first step towards properly implementing +    # components that implement IPushProducer / IConsumer  interfaces. +    # However, I need to think more about how to pause the streaming. +    # In any case, the differential rate between message production +    # and consumption is not likely (?) to consume huge amounts of memory in +    # our current settings, so the need to pause the stream is not urgent now. + +    def __init__(self, consumer, queue=Queue.Queue, period=1): +        """ +        Initializes the MessageProducer + +        :param consumer: an instance of a IMessageConsumer that will consume +                         the new messages. +        :param queue: any queue implementation to be used as the temporary +                      buffer for new items. Default is a FIFO Queue. +        :param period: the period to check for new items, in seconds. +        """ +        # XXX should assert it implements IConsumer / IMailConsumer +        # it should implement a `consume` method +        self._consumer = consumer + +        self._queue = queue() +        self._period = period + +        self._loop = LoopingCall(self._check_for_new) + +    # private methods + +    def _check_for_new(self): +        """ +        Checks for new items in the internal queue, and calls the consume +        method in the consumer. + +        If the queue is found empty, the loop is stopped. It will be started +        again after the addition of new items. +        """ +        # XXX right now I'm assuming that the period is good enough to allow +        # a right pace of processing. but we could also pass the queue object +        # to the consumer and let it choose whether process a new item or not. + +        if self._queue.empty(): +            self.stop() +        else: +            self._consumer.consume(self._queue.get()) + +    # public methods + +    def put(self, item): +        """ +        Puts a new item in the queue. + +        If the queue was empty, we will start the loop again. +        """ +        was_empty = self._queue.empty() + +        # XXX this might raise if the queue does not accept any new +        # items. what to do then? +        self._queue.put(item) +        if was_empty: +            self.start() + +    def start(self): +        """ +        Starts polling for new items. +        """ +        if not self._loop.running: +            self._loop.start(self._period) + +    def stop(self): +        """ +        Stop polling for new items. +        """ +        if self._loop.running: +            self._loop.stop() + + +if __name__ == "__main__": +    from twisted.internet import reactor +    producer = MessageProducer(DummyMsgConsumer()) +    producer.start() + +    for delay, item in ((2, 1), (3, 2), (4, 3), +                        (6, 4), (7, 5), (8, 6), (8.2, 7), +                        (15, 'a'), (16, 'b'), (17, 'c')): +        reactor.callLater(delay, producer.put, item) +    reactor.run() | 
