diff options
| -rw-r--r-- | mail/src/leap/mail/imap/fetch.py | 186 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/fields.py | 15 | 
2 files changed, 145 insertions, 56 deletions
| diff --git a/mail/src/leap/mail/imap/fetch.py b/mail/src/leap/mail/imap/fetch.py index 40dadb3a..6e12b3fa 100644 --- a/mail/src/leap/mail/imap/fetch.py +++ b/mail/src/leap/mail/imap/fetch.py @@ -23,6 +23,7 @@ import threading  import time  import sys  import traceback +import warnings  from email.parser import Parser  from email.generator import Generator @@ -32,6 +33,8 @@ from StringIO import StringIO  from twisted.python import log  from twisted.internet import defer  from twisted.internet.task import LoopingCall +from twisted.internet.task import deferLater +from u1db import errors as u1db_errors  from zope.proxy import sameProxiedObjects  from leap.common import events as leap_events @@ -46,7 +49,8 @@ from leap.common.mail import get_email_charset  from leap.keymanager import errors as keymanager_errors  from leap.keymanager.openpgp import OpenPGPKey  from leap.mail.decorators import deferred_to_thread -from leap.mail.utils import json_loads +from leap.mail.imap.fields import fields +from leap.mail.utils import json_loads, empty, first  from leap.soledad.client import Soledad  from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY @@ -80,8 +84,6 @@ class LeapIncomingMail(object):      """      RECENT_FLAG = "\\Recent" - -    INCOMING_KEY = "incoming"      CONTENT_KEY = "content"      LEAP_SIGNATURE_HEADER = 'X-Leap-Signature' @@ -130,17 +132,9 @@ class LeapIncomingMail(object):          self._loop = None          self._check_period = check_period -        self._create_soledad_indexes() -          # initialize a mail parser only once          self._parser = Parser() -    def _create_soledad_indexes(self): -        """ -        Create needed indexes on soledad. -        """ -        self._soledad.create_index("just-mail", "incoming") -      @property      def _pkey(self):          if sameProxiedObjects(self._keymanager, None): @@ -159,13 +153,29 @@ class LeapIncomingMail(object):          Calls a deferred that will execute the fetch callback          in a separate thread          """ +        def syncSoledadCallback(result): +            # FIXME this needs a matching change in mx!!! +            # --> need to add ERROR_DECRYPTING_KEY = False +            # as default. +            try: +                doclist = self._soledad.get_from_index( +                    fields.JUST_MAIL_IDX, "*", "0") +            except u1db_errors.InvalidGlobbing: +                # It looks like we are a dealing with an outdated +                # mx. Fallback to the version of the index +                warnings.warn("JUST_MAIL_COMPAT_IDX will be deprecated!", +                              DeprecationWarning) +                doclist = self._soledad.get_from_index( +                    fields.JUST_MAIL_COMPAT_IDX, "*") +            self._process_doclist(doclist) +          logger.debug("fetching mail for: %s %s" % (              self._soledad.uuid, self._userid))          if not self.fetching_lock.locked():              d1 = self._sync_soledad()              d = defer.gatherResults([d1], consumeErrors=True) +            d.addCallbacks(syncSoledadCallback, self._errback)              d.addCallbacks(self._signal_fetch_to_ui, self._errback) -            d.addCallbacks(self._signal_unread_to_ui, self._errback)              return d          else:              logger.debug("Already fetching mail.") @@ -202,46 +212,44 @@ class LeapIncomingMail(object):      @deferred_to_thread      def _sync_soledad(self):          """ -        Synchronizes with remote soledad. +        Synchronize with remote soledad.          :returns: a list of LeapDocuments, or None.          :rtype: iterable or None          """          with self.fetching_lock: -            log.msg('syncing soledad...') +            log.msg('FETCH: syncing soledad...')              self._soledad.sync() -            log.msg('soledad synced.') -            doclist = self._soledad.get_from_index("just-mail", "*") -        self._process_doclist(doclist) - -    def _signal_unread_to_ui(self, *args): -        """ -        Sends unread event to ui. -        """ -        leap_events.signal( -            IMAP_UNREAD_MAIL, str(self._inbox.getUnseenCount())) +            log.msg('FETCH soledad SYNCED.')      def _signal_fetch_to_ui(self, doclist):          """ -        Sends leap events to ui. +        Send leap events to ui.          :param doclist: iterable with msg documents.          :type doclist: iterable.          :returns: doclist          :rtype: iterable          """ -        doclist = doclist[0]  # gatherResults pass us a list -        fetched_ts = time.mktime(time.gmtime()) -        num_mails = len(doclist) if doclist is not None else 0 -        if num_mails != 0: -            log.msg("there are %s mails" % (num_mails,)) +        doclist = first(doclist)  # gatherResults pass us a list +        if doclist: +            fetched_ts = time.mktime(time.gmtime()) +            num_mails = len(doclist) if doclist is not None else 0 +            if num_mails != 0: +                log.msg("there are %s mails" % (num_mails,)) +            leap_events.signal( +                IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts)) +            return doclist + +    def _signal_unread_to_ui(self, *args): +        """ +        Sends unread event to ui. +        """          leap_events.signal( -            IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts)) -        return doclist +            IMAP_UNREAD_MAIL, str(self._inbox.getUnseenCount()))      # process incoming mail. -    @defer.inlineCallbacks      def _process_doclist(self, doclist):          """          Iterates through the doclist, checks if each doc @@ -262,22 +270,36 @@ class LeapIncomingMail(object):              logger.debug("processing doc %d of %d" % (index + 1, num_mails))              leap_events.signal(                  IMAP_MSG_PROCESSING, str(index), str(num_mails)) +              keys = doc.content.keys() -            if self._is_msg(keys): -                # Ok, this looks like a legit msg. + +            # TODO Compatibility check with the index in pre-0.6 mx +            # that does not write the ERROR_DECRYPTING_KEY +            # This should be removed in 0.7 + +            has_errors = doc.content.get(fields.ERROR_DECRYPTING_KEY, None) +            if has_errors is None: +                warnings.warn("JUST_MAIL_COMPAT_IDX will be deprecated!", +                              DeprecationWarning) +            if has_errors: +                logger.debug("skipping msg with decrypting errors...") + +            if self._is_msg(keys) and not has_errors: +                # Evaluating to bool of has_errors is intentional here. +                # We don't mind at this point if it's None or False. + +                # Ok, this looks like a legit msg, and with no errors.                  # Let's process it! -                decrypted = list(self._decrypt_doc(doc))[0] -                res = self._add_message_locally(decrypted) -                yield res -            else: -                # Ooops, this does not. -                logger.debug('This does not look like a proper msg.') +                d1 = self._decrypt_doc(doc) +                d = defer.gatherResults([d1], consumeErrors=True) +                d.addCallbacks(self._add_message_locally, self._errback)      #      # operations on individual messages      # +    @deferred_to_thread      def _decrypt_doc(self, doc):          """          Decrypt the contents of a document. @@ -302,8 +324,8 @@ class LeapIncomingMail(object):              decrdata = ""          leap_events.signal(IMAP_MSG_DECRYPTED, "1" if success else "0") -        data = list(self._process_decrypted_doc((doc, decrdata))) -        yield (doc, data) +        data = self._process_decrypted_doc((doc, decrdata)) +        return (doc, data)      def _process_decrypted_doc(self, msgtuple):          """ @@ -319,11 +341,35 @@ class LeapIncomingMail(object):          """          log.msg('processing decrypted doc')          doc, data = msgtuple -        msg = json_loads(data) + +        from twisted.internet import reactor + +        # XXX turn this into an errBack for each one of +        # the deferreds that would process an individual document +        try: +            msg = json_loads(data) +        except UnicodeError as exc: +            logger.error("Error while decrypting %s" % (doc.doc_id,)) +            logger.exception(exc) + +            # we flag the message as "with decrypting errors", +            # to avoid further decryption attempts during sync +            # cycles until we're prepared to deal with that. +            # What is the same, when Ivan deals with it... +            # A new decrypting attempt event could be triggered by a +            # future a library upgrade, or a cli flag to the client, +            # we just `defer` that for now... :) +            doc.content[fields.ERROR_DECRYPTING_KEY] = True +            deferLater(reactor, 0, self._update_incoming_message, doc) + +            # FIXME this is just a dirty hack to delay the proper +            # deferred organization here... +            # and remember, boys, do not do this at home. +            return []          if not isinstance(msg, dict):              defer.returnValue(False) -        if not msg.get(self.INCOMING_KEY, False): +        if not msg.get(fields.INCOMING_KEY, False):              defer.returnValue(False)          # ok, this is an incoming message @@ -332,6 +378,27 @@ class LeapIncomingMail(object):              return False          return self._maybe_decrypt_msg(rawmsg) +    @deferred_to_thread +    def _update_incoming_message(self, doc): +        """ +        Do a put for a soledad document. This probably has been called only +        in the case that we've needed to update the ERROR_DECRYPTING_KEY +        flag in an incoming message, to get it out of the decrypting queue. + +        :param doc: the SoledadDocument to update +        :type doc: SoledadDocument +        """ +        log.msg("Updating SoledadDoc %s" % (doc.doc_id)) +        self._soledad.put_doc(doc) + +    @deferred_to_thread +    def _delete_incoming_message(self, doc): +        """ +        Delete document. +        """ +        log.msg("Deleting SoledadDoc %s" % (doc.doc_id)) +        self._soledad.delete_doc(doc) +      def _maybe_decrypt_msg(self, data):          """          Tries to decrypt a gpg message if data looks like one. @@ -384,7 +451,7 @@ class LeapIncomingMail(object):                  self.LEAP_SIGNATURE_INVALID,                  pubkey=senderPubkey.key_id) -        yield decrmsg.as_string() +        return decrmsg.as_string()      def _decrypt_multipart_encrypted_msg(self, msg, encoding, senderPubkey):          """ @@ -505,32 +572,39 @@ class LeapIncomingMail(object):                  data, self._pkey)          return (decrdata, valid_sig) -    def _add_message_locally(self, msgtuple): +    def _add_message_locally(self, result):          """          Adds a message to local inbox and delete it from the incoming db          in soledad. +        # XXX this comes from a gatherresult...          :param msgtuple: a tuple consisting of a SoledadDocument                           instance containing the incoming message                           and data, the json-encoded, decrypted content of the                           incoming message          :type msgtuple: (SoledadDocument, str)          """ -        log.msg('adding message to local db') +        from twisted.internet import reactor +        msgtuple = first(result) +          doc, data = msgtuple +        log.msg('adding message %s to local db' % (doc.doc_id,))          if isinstance(data, list): +            if empty(data): +                return False              data = data[0] -        self._inbox.addMessage(data, flags=(self.RECENT_FLAG,)) +        def msgSavedCallback(result): +            if not empty(result): +                leap_events.signal(IMAP_MSG_SAVED_LOCALLY) +                deferLater(reactor, 0, self._delete_incoming_message, result) +                leap_events.signal(IMAP_MSG_DELETED_INCOMING) +                deferLater(reactor, 1, self._signal_unread_to_ui) -        leap_events.signal(IMAP_MSG_SAVED_LOCALLY) -        doc_id = doc.doc_id -        self._soledad.delete_doc(doc) -        log.msg("deleted doc %s from incoming" % doc_id) -        leap_events.signal(IMAP_MSG_DELETED_INCOMING) -        self._signal_unread_to_ui() -        return True +        # XXX should pass a notify_on_disk=True along... +        d = self._inbox.addMessage(data, flags=(self.RECENT_FLAG,)) +        d.addCallbacks(msgSavedCallback, self._errback)      #      # helpers diff --git a/mail/src/leap/mail/imap/fields.py b/mail/src/leap/mail/imap/fields.py index 886ee631..45769391 100644 --- a/mail/src/leap/mail/imap/fields.py +++ b/mail/src/leap/mail/imap/fields.py @@ -108,6 +108,14 @@ class WithMsgFields(object):      # correct since the recent flag is volatile.      TYPE_MBOX_RECT_SEEN_IDX = 'by-type-and-mbox-and-recent-and-seen' +    # Soledad index for incoming mail, without decrypting errors. +    JUST_MAIL_IDX = "just-mail" +    # XXX the backward-compatible index, will be deprecated at 0.7 +    JUST_MAIL_COMPAT_IDX = "just-mail-compat" + +    INCOMING_KEY = "incoming" +    ERROR_DECRYPTING_KEY = "errdecr" +      KTYPE = TYPE_KEY      MBOX_VAL = TYPE_MBOX_VAL      CHASH_VAL = CONTENT_HASH_KEY @@ -140,6 +148,13 @@ class WithMsgFields(object):          TYPE_MBOX_DEL_IDX: [KTYPE, MBOX_VAL, 'bool(deleted)'],          TYPE_MBOX_RECT_SEEN_IDX: [KTYPE, MBOX_VAL,                                    'bool(recent)', 'bool(seen)'], + +        # incoming queue +        JUST_MAIL_IDX: [INCOMING_KEY, +                        "bool(%s)" % (ERROR_DECRYPTING_KEY,)], + +        # the backward-compatible index, will be deprecated at 0.7 +        JUST_MAIL_COMPAT_IDX: [INCOMING_KEY],      }      MBOX_KEY = MBOX_VAL | 
