From cc7059da23934d6ee8030c5c8b71c8c9618b4272 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 22 Aug 2013 22:38:53 +0200 Subject: refactor imap fetch --- mail/changes/feature_3423_refactor_imap_fetch | 1 + mail/src/leap/mail/imap/fetch.py | 217 ++++++++++++++++++-------- 2 files changed, 153 insertions(+), 65 deletions(-) create mode 100644 mail/changes/feature_3423_refactor_imap_fetch diff --git a/mail/changes/feature_3423_refactor_imap_fetch b/mail/changes/feature_3423_refactor_imap_fetch new file mode 100644 index 00000000..cacceef3 --- /dev/null +++ b/mail/changes/feature_3423_refactor_imap_fetch @@ -0,0 +1 @@ + o Refactor imap fetch code for better defer handling. Closes: #3423 diff --git a/mail/src/leap/mail/imap/fetch.py b/mail/src/leap/mail/imap/fetch.py index 2b25d820..8b29c5e9 100644 --- a/mail/src/leap/mail/imap/fetch.py +++ b/mail/src/leap/mail/imap/fetch.py @@ -20,10 +20,10 @@ Incoming mail fetcher. import logging import json import ssl +import threading import time from twisted.python import log -from twisted.internet import defer from twisted.internet.task import LoopingCall from twisted.internet.threads import deferToThread @@ -53,6 +53,8 @@ class LeapIncomingMail(object): INCOMING_KEY = "incoming" CONTENT_KEY = "content" + fetching_lock = threading.Lock() + def __init__(self, keymanager, soledad, imap_account, check_period): @@ -95,6 +97,10 @@ class LeapIncomingMail(object): """ self._soledad.create_index("just-mail", "incoming") + # + # Public API: fetch, start_loop, stop. + # + def fetch(self): """ Fetch incoming mail, to be called periodically. @@ -102,9 +108,13 @@ class LeapIncomingMail(object): Calls a deferred that will execute the fetch callback in a separate thread """ - d = deferToThread(self._sync_soledad) - d.addCallbacks(self._process_doclist, self._sync_soledad_err) - return d + if not self.fetching_lock.locked(): + d = deferToThread(self._sync_soledad) + d.addCallbacks(self._signal_fetch_to_ui, self._sync_soledad_error) + d.addCallbacks(self._process_doclist, self._sync_soledad_error) + return d + else: + logger.debug("Already fetching mail.") def start_loop(self): """ @@ -117,52 +127,113 @@ class LeapIncomingMail(object): """ Stops the loop that fetches mail. """ + # XXX should cancel ongoing fetches too. if self._loop and self._loop.running is True: self._loop.stop() + # + # Private methods. + # + + # synchronize incoming mail + def _sync_soledad(self): - log.msg('syncing soledad...') + """ + Synchronizes with remote soledad. - try: + :returns: a list of LeapDocuments, or None. + :rtype: iterable or None + """ + with self.fetching_lock: + log.msg('syncing soledad...') self._soledad.sync() - fetched_ts = time.mktime(time.gmtime()) doclist = self._soledad.get_from_index("just-mail", "*") - num_mails = len(doclist) - log.msg("there are %s mails" % (num_mails,)) - leap_events.signal( - IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts)) - leap_events.signal( - IMAP_UNREAD_MAIL, str(self._inbox.getUnseenCount())) - return doclist - except ssl.SSLError as exc: - logger.warning('SSL Error while syncing soledad: %r' % (exc,)) - except Exception as exc: - logger.warning('Error while syncing soledad: %r' % (exc,)) + return doclist - def _sync_soledad_err(self, f): - log.err("error syncing soledad: %s" % (f.value,)) - return f + def _signal_fetch_to_ui(self, doclist): + """ + Sends leap events to ui. + + :param doclist: iterable with msg documents. + :type doclist: iterable. + :returns: doclist + :rtype: iterable + """ + fetched_ts = time.mktime(time.gmtime()) + num_mails = len(doclist) + log.msg("there are %s mails" % (num_mails,)) + leap_events.signal( + IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts)) + leap_events.signal( + IMAP_UNREAD_MAIL, str(self._inbox.getUnseenCount())) + return doclist + + def _sync_soledad_error(self, failure): + """ + Errback for sync errors. + """ + # XXX should signal unrecoverable maybe. + err = failure.value + logger.error("error syncing soledad: %s" % (err,)) + if failure.check(ssl.SSLError): + logger.warning('SSL Error while ' + 'syncing soledad: %r' % (err,)) + elif failure.check(Exception): + logger.warning('Unknown error while ' + 'syncing soledad: %r' % (err,)) def _process_doclist(self, doclist): + """ + Iterates through the doclist, checks if each doc + looks like a message, and yields a deferred that will decrypt and + process the message. + + :param doclist: iterable with msg documents. + :type doclist: iterable. + :returns: a list of deferreds for individual messages. + """ log.msg('processing doclist') if not doclist: logger.debug("no docs found") return num_mails = len(doclist) + + docs_cb = [] for index, doc in enumerate(doclist): logger.debug("processing doc %d of %d: %s" % ( index, num_mails, doc)) leap_events.signal( IMAP_MSG_PROCESSING, str(index), str(num_mails)) keys = doc.content.keys() - if ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys: - - # XXX should check for _enc_scheme == "pubkey" || "none" - # that is what incoming mail uses. + if self._is_msg(keys): + # Ok, this looks like a legit msg. + # Let's process it! encdata = doc.content[ENC_JSON_KEY] - defer.Deferred(self._decrypt_msg(doc, encdata)) + + # Deferred chain for individual messages + d = deferToThread(self._decrypt_msg, doc, encdata) + d.addCallback(self._process_decrypted) + d.addCallback(self._add_message_locally) + docs_cb.append(d) else: + # Ooops, this does not. logger.debug('This does not look like a proper msg.') + return docs_cb + + # + # operations on individual messages + # + + def _is_msg(self, keys): + """ + Checks if the keys of a dictionary match the signature + of the document type we use for messages. + + :param keys: iterable containing the strings to match. + :type keys: iterable of strings. + :rtype: bool + """ + return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys def _decrypt_msg(self, doc, encdata): log.msg('decrypting msg') @@ -170,64 +241,80 @@ class LeapIncomingMail(object): try: decrdata = (self._keymanager.decrypt( encdata, key, - # XXX get from public method instead - passphrase=self._soledad._passphrase)) + passphrase=self._soledad.passphrase)) ok = True except Exception as exc: + # XXX move this to errback !!! logger.warning("Error while decrypting msg: %r" % (exc,)) decrdata = "" ok = False leap_events.signal(IMAP_MSG_DECRYPTED, "1" if ok else "0") - # XXX TODO: defer this properly - return self._process_decrypted(doc, decrdata) + return doc, decrdata - def _process_decrypted(self, doc, data): + def _process_decrypted(self, msgtuple): """ Process a successfully decrypted message. - :param doc: a SoledadDocument instance containing the incoming message - :type doc: SoledadDocument - - :param data: the json-encoded, decrypted content of the incoming - message - :type data: str - - :param inbox: a open SoledadMailbox instance where this message is - to be saved - :type inbox: SoledadMailbox + :param msgtuple: a tuple consisting of a SoledadDocument + instance containing the incoming message + and data, the json-encoded, decrypted content of the + incoming message + :type msgtuple: (SoledadDocument, str) + :returns: a SoledadDocument and the processed data. + :rtype: (doc, data) """ - log.msg("processing incoming message!") + doc, data = msgtuple msg = json.loads(data) if not isinstance(msg, dict): return False if not msg.get(self.INCOMING_KEY, False): return False + # ok, this is an incoming message rawmsg = msg.get(self.CONTENT_KEY, None) if not rawmsg: return False logger.debug('got incoming message: %s' % (rawmsg,)) + data = self._maybe_decrypt_gpg_msg(rawmsg) + return doc, data - # XXX factor out gpg bits. - try: - pgp_beg = "-----BEGIN PGP MESSAGE-----" - pgp_end = "-----END PGP MESSAGE-----" - if pgp_beg in rawmsg: - first = rawmsg.find(pgp_beg) - last = rawmsg.rfind(pgp_end) - pgp_message = rawmsg[first:first+last] - - decrdata = (self._keymanager.decrypt( - pgp_message, self._pkey, - # XXX get from public method instead - passphrase=self._soledad._passphrase)) - rawmsg = rawmsg.replace(pgp_message, decrdata) - # add to inbox and delete from soledad - self._inbox.addMessage(rawmsg, (self.RECENT_FLAG,)) - leap_events.signal(IMAP_MSG_SAVED_LOCALLY) - doc_id = doc.doc_id - self._soledad.delete_doc(doc) - log.msg("deleted doc %s from incoming" % doc_id) - leap_events.signal(IMAP_MSG_DELETED_INCOMING) - except Exception as e: - logger.error("Problem processing incoming mail: %r" % (e,)) + def _maybe_decrypt_gpg_msg(self, data): + """ + Tries to decrypt a gpg message if data looks like one. + + :param data: the text to be decrypted. + :type data: str + :return: data, possibly descrypted. + :rtype: str + """ + PGP_BEGIN = "-----BEGIN PGP MESSAGE-----" + PGP_END = "-----END PGP MESSAGE-----" + if PGP_BEGIN in data: + begin = data.find(PGP_BEGIN) + end = data.rfind(PGP_END) + pgp_message = data[begin:begin+end] + + decrdata = (self._keymanager.decrypt( + pgp_message, self._pkey, + passphrase=self._soledad.passphrase)) + data = data.replace(pgp_message, decrdata) + return data + + def _add_message_locally(self, msgtuple): + """ + Adds a message to local inbox and delete it from the incoming db + in soledad. + + :param msgtuple: a tuple consisting of a SoledadDocument + instance containing the incoming message + and data, the json-encoded, decrypted content of the + incoming message + :type msgtuple: (SoledadDocument, str) + """ + doc, data = msgtuple + self._inbox.addMessage(data, (self.RECENT_FLAG,)) + leap_events.signal(IMAP_MSG_SAVED_LOCALLY) + doc_id = doc.doc_id + self._soledad.delete_doc(doc) + log.msg("deleted doc %s from incoming" % doc_id) + leap_events.signal(IMAP_MSG_DELETED_INCOMING) -- cgit v1.2.3