diff options
| author | Kali Kaneko <kali@leap.se> | 2013-08-22 22:38:53 +0200 | 
|---|---|---|
| committer | Kali Kaneko <kali@leap.se> | 2013-08-23 11:23:48 +0200 | 
| commit | cc7059da23934d6ee8030c5c8b71c8c9618b4272 (patch) | |
| tree | ae196ff17d593ad2a4667d050137600fa2ad8474 /mail/src | |
| parent | cf2afc254f7c6f8ac67b83c417b17ca4891a6059 (diff) | |
refactor imap fetch
Diffstat (limited to 'mail/src')
| -rw-r--r-- | mail/src/leap/mail/imap/fetch.py | 217 | 
1 files changed, 152 insertions, 65 deletions
| diff --git a/mail/src/leap/mail/imap/fetch.py b/mail/src/leap/mail/imap/fetch.py index 2b25d82..8b29c5e 100644 --- a/mail/src/leap/mail/imap/fetch.py +++ b/mail/src/leap/mail/imap/fetch.py @@ -20,10 +20,10 @@ Incoming mail fetcher.  import logging  import json  import ssl +import threading  import time  from twisted.python import log -from twisted.internet import defer  from twisted.internet.task import LoopingCall  from twisted.internet.threads import deferToThread @@ -53,6 +53,8 @@ class LeapIncomingMail(object):      INCOMING_KEY = "incoming"      CONTENT_KEY = "content" +    fetching_lock = threading.Lock() +      def __init__(self, keymanager, soledad, imap_account,                   check_period): @@ -95,6 +97,10 @@ class LeapIncomingMail(object):          """          self._soledad.create_index("just-mail", "incoming") +    # +    # Public API: fetch, start_loop, stop. +    # +      def fetch(self):          """          Fetch incoming mail, to be called periodically. @@ -102,9 +108,13 @@ class LeapIncomingMail(object):          Calls a deferred that will execute the fetch callback          in a separate thread          """ -        d = deferToThread(self._sync_soledad) -        d.addCallbacks(self._process_doclist, self._sync_soledad_err) -        return d +        if not self.fetching_lock.locked(): +            d = deferToThread(self._sync_soledad) +            d.addCallbacks(self._signal_fetch_to_ui, self._sync_soledad_error) +            d.addCallbacks(self._process_doclist, self._sync_soledad_error) +            return d +        else: +            logger.debug("Already fetching mail.")      def start_loop(self):          """ @@ -117,52 +127,113 @@ class LeapIncomingMail(object):          """          Stops the loop that fetches mail.          """ +        # XXX should cancel ongoing fetches too.          if self._loop and self._loop.running is True:              self._loop.stop() +    # +    # Private methods. +    # + +    # synchronize incoming mail +      def _sync_soledad(self): -        log.msg('syncing soledad...') +        """ +        Synchronizes with remote soledad. -        try: +        :returns: a list of LeapDocuments, or None. +        :rtype: iterable or None +        """ +        with self.fetching_lock: +            log.msg('syncing soledad...')              self._soledad.sync() -            fetched_ts = time.mktime(time.gmtime())              doclist = self._soledad.get_from_index("just-mail", "*") -            num_mails = len(doclist) -            log.msg("there are %s mails" % (num_mails,)) -            leap_events.signal( -                IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts)) -            leap_events.signal( -                IMAP_UNREAD_MAIL, str(self._inbox.getUnseenCount())) -            return doclist -        except ssl.SSLError as exc: -            logger.warning('SSL Error while syncing soledad: %r' % (exc,)) -        except Exception as exc: -            logger.warning('Error while syncing soledad: %r' % (exc,)) +        return doclist -    def _sync_soledad_err(self, f): -        log.err("error syncing soledad: %s" % (f.value,)) -        return f +    def _signal_fetch_to_ui(self, doclist): +        """ +        Sends leap events to ui. + +        :param doclist: iterable with msg documents. +        :type doclist: iterable. +        :returns: doclist +        :rtype: iterable +        """ +        fetched_ts = time.mktime(time.gmtime()) +        num_mails = len(doclist) +        log.msg("there are %s mails" % (num_mails,)) +        leap_events.signal( +            IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts)) +        leap_events.signal( +            IMAP_UNREAD_MAIL, str(self._inbox.getUnseenCount())) +        return doclist + +    def _sync_soledad_error(self, failure): +        """ +        Errback for sync errors. +        """ +        # XXX should signal unrecoverable maybe. +        err = failure.value +        logger.error("error syncing soledad: %s" % (err,)) +        if failure.check(ssl.SSLError): +            logger.warning('SSL Error while ' +                           'syncing soledad: %r' % (err,)) +        elif failure.check(Exception): +            logger.warning('Unknown error while ' +                           'syncing soledad: %r' % (err,))      def _process_doclist(self, doclist): +        """ +        Iterates through the doclist, checks if each doc +        looks like a message, and yields a deferred that will decrypt and +        process the message. + +        :param doclist: iterable with msg documents. +        :type doclist: iterable. +        :returns: a list of deferreds for individual messages. +        """          log.msg('processing doclist')          if not doclist:              logger.debug("no docs found")              return          num_mails = len(doclist) + +        docs_cb = []          for index, doc in enumerate(doclist):              logger.debug("processing doc %d of %d: %s" % (                  index, num_mails, doc))              leap_events.signal(                  IMAP_MSG_PROCESSING, str(index), str(num_mails))              keys = doc.content.keys() -            if ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys: - -                # XXX should check for _enc_scheme == "pubkey" || "none" -                # that is what incoming mail uses. +            if self._is_msg(keys): +                # Ok, this looks like a legit msg. +                # Let's process it!                  encdata = doc.content[ENC_JSON_KEY] -                defer.Deferred(self._decrypt_msg(doc, encdata)) + +                # Deferred chain for individual messages +                d = deferToThread(self._decrypt_msg, doc, encdata) +                d.addCallback(self._process_decrypted) +                d.addCallback(self._add_message_locally) +                docs_cb.append(d)              else: +                # Ooops, this does not.                  logger.debug('This does not look like a proper msg.') +        return docs_cb + +    # +    # operations on individual messages +    # + +    def _is_msg(self, keys): +        """ +        Checks if the keys of a dictionary match the signature +        of the document type we use for messages. + +        :param keys: iterable containing the strings to match. +        :type keys: iterable of strings. +        :rtype: bool +        """ +        return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys      def _decrypt_msg(self, doc, encdata):          log.msg('decrypting msg') @@ -170,64 +241,80 @@ class LeapIncomingMail(object):          try:              decrdata = (self._keymanager.decrypt(                  encdata, key, -                # XXX get from public method instead -                passphrase=self._soledad._passphrase)) +                passphrase=self._soledad.passphrase))              ok = True          except Exception as exc: +            # XXX move this to errback !!!              logger.warning("Error while decrypting msg: %r" % (exc,))              decrdata = ""              ok = False          leap_events.signal(IMAP_MSG_DECRYPTED, "1" if ok else "0") -        # XXX TODO: defer this properly -        return self._process_decrypted(doc, decrdata) +        return doc, decrdata -    def _process_decrypted(self, doc, data): +    def _process_decrypted(self, msgtuple):          """          Process a successfully decrypted message. -        :param doc: a SoledadDocument instance containing the incoming message -        :type doc: SoledadDocument - -        :param data: the json-encoded, decrypted content of the incoming -                     message -        :type data: str - -        :param inbox: a open SoledadMailbox instance where this message is -                      to be saved -        :type inbox: SoledadMailbox +        :param msgtuple: a tuple consisting of a SoledadDocument +                         instance containing the incoming message +                         and data, the json-encoded, decrypted content of the +                         incoming message +        :type msgtuple: (SoledadDocument, str) +        :returns: a SoledadDocument and the processed data. +        :rtype: (doc, data)          """ -        log.msg("processing incoming message!") +        doc, data = msgtuple          msg = json.loads(data)          if not isinstance(msg, dict):              return False          if not msg.get(self.INCOMING_KEY, False):              return False +          # ok, this is an incoming message          rawmsg = msg.get(self.CONTENT_KEY, None)          if not rawmsg:              return False          logger.debug('got incoming message: %s' % (rawmsg,)) +        data = self._maybe_decrypt_gpg_msg(rawmsg) +        return doc, data -        # XXX factor out gpg bits. -        try: -            pgp_beg = "-----BEGIN PGP MESSAGE-----" -            pgp_end = "-----END PGP MESSAGE-----" -            if pgp_beg in rawmsg: -                first = rawmsg.find(pgp_beg) -                last = rawmsg.rfind(pgp_end) -                pgp_message = rawmsg[first:first+last] - -                decrdata = (self._keymanager.decrypt( -                    pgp_message, self._pkey, -                    # XXX get from public method instead -                    passphrase=self._soledad._passphrase)) -                rawmsg = rawmsg.replace(pgp_message, decrdata) -            # add to inbox and delete from soledad -            self._inbox.addMessage(rawmsg, (self.RECENT_FLAG,)) -            leap_events.signal(IMAP_MSG_SAVED_LOCALLY) -            doc_id = doc.doc_id -            self._soledad.delete_doc(doc) -            log.msg("deleted doc %s from incoming" % doc_id) -            leap_events.signal(IMAP_MSG_DELETED_INCOMING) -        except Exception as e: -            logger.error("Problem processing incoming mail: %r" % (e,)) +    def _maybe_decrypt_gpg_msg(self, data): +        """ +        Tries to decrypt a gpg message if data looks like one. + +        :param data: the text to be decrypted. +        :type data: str +        :return: data, possibly descrypted. +        :rtype: str +        """ +        PGP_BEGIN = "-----BEGIN PGP MESSAGE-----" +        PGP_END = "-----END PGP MESSAGE-----" +        if PGP_BEGIN in data: +            begin = data.find(PGP_BEGIN) +            end = data.rfind(PGP_END) +            pgp_message = data[begin:begin+end] + +            decrdata = (self._keymanager.decrypt( +                pgp_message, self._pkey, +                passphrase=self._soledad.passphrase)) +            data = data.replace(pgp_message, decrdata) +        return data + +    def _add_message_locally(self, msgtuple): +        """ +        Adds a message to local inbox and delete it from the incoming db +        in soledad. + +        :param msgtuple: a tuple consisting of a SoledadDocument +                         instance containing the incoming message +                         and data, the json-encoded, decrypted content of the +                         incoming message +        :type msgtuple: (SoledadDocument, str) +        """ +        doc, data = msgtuple +        self._inbox.addMessage(data, (self.RECENT_FLAG,)) +        leap_events.signal(IMAP_MSG_SAVED_LOCALLY) +        doc_id = doc.doc_id +        self._soledad.delete_doc(doc) +        log.msg("deleted doc %s from incoming" % doc_id) +        leap_events.signal(IMAP_MSG_DELETED_INCOMING) | 
