summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2013-08-22 22:38:53 +0200
committerKali Kaneko <kali@leap.se>2013-08-23 11:23:48 +0200
commite9381e8b561e0f327e51e732f523352af6a9c8b0 (patch)
tree63d806fed7a023a19152e2bbc540216e3acd7bdc
parent79971e7c2b26e0dd34462aae8c732029c2701cd9 (diff)
refactor imap fetch
-rw-r--r--changes/feature_3423_refactor_imap_fetch1
-rw-r--r--src/leap/mail/imap/fetch.py217
2 files changed, 153 insertions, 65 deletions
diff --git a/changes/feature_3423_refactor_imap_fetch b/changes/feature_3423_refactor_imap_fetch
new file mode 100644
index 0000000..cacceef
--- /dev/null
+++ b/changes/feature_3423_refactor_imap_fetch
@@ -0,0 +1 @@
+ o Refactor imap fetch code for better defer handling. Closes: #3423
diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py
index 2b25d82..8b29c5e 100644
--- a/src/leap/mail/imap/fetch.py
+++ b/src/leap/mail/imap/fetch.py
@@ -20,10 +20,10 @@ Incoming mail fetcher.
import logging
import json
import ssl
+import threading
import time
from twisted.python import log
-from twisted.internet import defer
from twisted.internet.task import LoopingCall
from twisted.internet.threads import deferToThread
@@ -53,6 +53,8 @@ class LeapIncomingMail(object):
INCOMING_KEY = "incoming"
CONTENT_KEY = "content"
+ fetching_lock = threading.Lock()
+
def __init__(self, keymanager, soledad, imap_account,
check_period):
@@ -95,6 +97,10 @@ class LeapIncomingMail(object):
"""
self._soledad.create_index("just-mail", "incoming")
+ #
+ # Public API: fetch, start_loop, stop.
+ #
+
def fetch(self):
"""
Fetch incoming mail, to be called periodically.
@@ -102,9 +108,13 @@ class LeapIncomingMail(object):
Calls a deferred that will execute the fetch callback
in a separate thread
"""
- d = deferToThread(self._sync_soledad)
- d.addCallbacks(self._process_doclist, self._sync_soledad_err)
- return d
+ if not self.fetching_lock.locked():
+ d = deferToThread(self._sync_soledad)
+ d.addCallbacks(self._signal_fetch_to_ui, self._sync_soledad_error)
+ d.addCallbacks(self._process_doclist, self._sync_soledad_error)
+ return d
+ else:
+ logger.debug("Already fetching mail.")
def start_loop(self):
"""
@@ -117,52 +127,113 @@ class LeapIncomingMail(object):
"""
Stops the loop that fetches mail.
"""
+ # XXX should cancel ongoing fetches too.
if self._loop and self._loop.running is True:
self._loop.stop()
+ #
+ # Private methods.
+ #
+
+ # synchronize incoming mail
+
def _sync_soledad(self):
- log.msg('syncing soledad...')
+ """
+ Synchronizes with remote soledad.
- try:
+ :returns: a list of LeapDocuments, or None.
+ :rtype: iterable or None
+ """
+ with self.fetching_lock:
+ log.msg('syncing soledad...')
self._soledad.sync()
- fetched_ts = time.mktime(time.gmtime())
doclist = self._soledad.get_from_index("just-mail", "*")
- num_mails = len(doclist)
- log.msg("there are %s mails" % (num_mails,))
- leap_events.signal(
- IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts))
- leap_events.signal(
- IMAP_UNREAD_MAIL, str(self._inbox.getUnseenCount()))
- return doclist
- except ssl.SSLError as exc:
- logger.warning('SSL Error while syncing soledad: %r' % (exc,))
- except Exception as exc:
- logger.warning('Error while syncing soledad: %r' % (exc,))
+ return doclist
- def _sync_soledad_err(self, f):
- log.err("error syncing soledad: %s" % (f.value,))
- return f
+ def _signal_fetch_to_ui(self, doclist):
+ """
+ Sends leap events to ui.
+
+ :param doclist: iterable with msg documents.
+ :type doclist: iterable.
+ :returns: doclist
+ :rtype: iterable
+ """
+ fetched_ts = time.mktime(time.gmtime())
+ num_mails = len(doclist)
+ log.msg("there are %s mails" % (num_mails,))
+ leap_events.signal(
+ IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts))
+ leap_events.signal(
+ IMAP_UNREAD_MAIL, str(self._inbox.getUnseenCount()))
+ return doclist
+
+ def _sync_soledad_error(self, failure):
+ """
+ Errback for sync errors.
+ """
+ # XXX should signal unrecoverable maybe.
+ err = failure.value
+ logger.error("error syncing soledad: %s" % (err,))
+ if failure.check(ssl.SSLError):
+ logger.warning('SSL Error while '
+ 'syncing soledad: %r' % (err,))
+ elif failure.check(Exception):
+ logger.warning('Unknown error while '
+ 'syncing soledad: %r' % (err,))
def _process_doclist(self, doclist):
+ """
+ Iterates through the doclist, checks if each doc
+ looks like a message, and yields a deferred that will decrypt and
+ process the message.
+
+ :param doclist: iterable with msg documents.
+ :type doclist: iterable.
+ :returns: a list of deferreds for individual messages.
+ """
log.msg('processing doclist')
if not doclist:
logger.debug("no docs found")
return
num_mails = len(doclist)
+
+ docs_cb = []
for index, doc in enumerate(doclist):
logger.debug("processing doc %d of %d: %s" % (
index, num_mails, doc))
leap_events.signal(
IMAP_MSG_PROCESSING, str(index), str(num_mails))
keys = doc.content.keys()
- if ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys:
-
- # XXX should check for _enc_scheme == "pubkey" || "none"
- # that is what incoming mail uses.
+ if self._is_msg(keys):
+ # Ok, this looks like a legit msg.
+ # Let's process it!
encdata = doc.content[ENC_JSON_KEY]
- defer.Deferred(self._decrypt_msg(doc, encdata))
+
+ # Deferred chain for individual messages
+ d = deferToThread(self._decrypt_msg, doc, encdata)
+ d.addCallback(self._process_decrypted)
+ d.addCallback(self._add_message_locally)
+ docs_cb.append(d)
else:
+ # Ooops, this does not.
logger.debug('This does not look like a proper msg.')
+ return docs_cb
+
+ #
+ # operations on individual messages
+ #
+
+ def _is_msg(self, keys):
+ """
+ Checks if the keys of a dictionary match the signature
+ of the document type we use for messages.
+
+ :param keys: iterable containing the strings to match.
+ :type keys: iterable of strings.
+ :rtype: bool
+ """
+ return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys
def _decrypt_msg(self, doc, encdata):
log.msg('decrypting msg')
@@ -170,64 +241,80 @@ class LeapIncomingMail(object):
try:
decrdata = (self._keymanager.decrypt(
encdata, key,
- # XXX get from public method instead
- passphrase=self._soledad._passphrase))
+ passphrase=self._soledad.passphrase))
ok = True
except Exception as exc:
+ # XXX move this to errback !!!
logger.warning("Error while decrypting msg: %r" % (exc,))
decrdata = ""
ok = False
leap_events.signal(IMAP_MSG_DECRYPTED, "1" if ok else "0")
- # XXX TODO: defer this properly
- return self._process_decrypted(doc, decrdata)
+ return doc, decrdata
- def _process_decrypted(self, doc, data):
+ def _process_decrypted(self, msgtuple):
"""
Process a successfully decrypted message.
- :param doc: a SoledadDocument instance containing the incoming message
- :type doc: SoledadDocument
-
- :param data: the json-encoded, decrypted content of the incoming
- message
- :type data: str
-
- :param inbox: a open SoledadMailbox instance where this message is
- to be saved
- :type inbox: SoledadMailbox
+ :param msgtuple: a tuple consisting of a SoledadDocument
+ instance containing the incoming message
+ and data, the json-encoded, decrypted content of the
+ incoming message
+ :type msgtuple: (SoledadDocument, str)
+ :returns: a SoledadDocument and the processed data.
+ :rtype: (doc, data)
"""
- log.msg("processing incoming message!")
+ doc, data = msgtuple
msg = json.loads(data)
if not isinstance(msg, dict):
return False
if not msg.get(self.INCOMING_KEY, False):
return False
+
# ok, this is an incoming message
rawmsg = msg.get(self.CONTENT_KEY, None)
if not rawmsg:
return False
logger.debug('got incoming message: %s' % (rawmsg,))
+ data = self._maybe_decrypt_gpg_msg(rawmsg)
+ return doc, data
- # XXX factor out gpg bits.
- try:
- pgp_beg = "-----BEGIN PGP MESSAGE-----"
- pgp_end = "-----END PGP MESSAGE-----"
- if pgp_beg in rawmsg:
- first = rawmsg.find(pgp_beg)
- last = rawmsg.rfind(pgp_end)
- pgp_message = rawmsg[first:first+last]
-
- decrdata = (self._keymanager.decrypt(
- pgp_message, self._pkey,
- # XXX get from public method instead
- passphrase=self._soledad._passphrase))
- rawmsg = rawmsg.replace(pgp_message, decrdata)
- # add to inbox and delete from soledad
- self._inbox.addMessage(rawmsg, (self.RECENT_FLAG,))
- leap_events.signal(IMAP_MSG_SAVED_LOCALLY)
- doc_id = doc.doc_id
- self._soledad.delete_doc(doc)
- log.msg("deleted doc %s from incoming" % doc_id)
- leap_events.signal(IMAP_MSG_DELETED_INCOMING)
- except Exception as e:
- logger.error("Problem processing incoming mail: %r" % (e,))
+ def _maybe_decrypt_gpg_msg(self, data):
+ """
+ Tries to decrypt a gpg message if data looks like one.
+
+ :param data: the text to be decrypted.
+ :type data: str
+ :return: data, possibly descrypted.
+ :rtype: str
+ """
+ PGP_BEGIN = "-----BEGIN PGP MESSAGE-----"
+ PGP_END = "-----END PGP MESSAGE-----"
+ if PGP_BEGIN in data:
+ begin = data.find(PGP_BEGIN)
+ end = data.rfind(PGP_END)
+ pgp_message = data[begin:begin+end]
+
+ decrdata = (self._keymanager.decrypt(
+ pgp_message, self._pkey,
+ passphrase=self._soledad.passphrase))
+ data = data.replace(pgp_message, decrdata)
+ return data
+
+ def _add_message_locally(self, msgtuple):
+ """
+ Adds a message to local inbox and delete it from the incoming db
+ in soledad.
+
+ :param msgtuple: a tuple consisting of a SoledadDocument
+ instance containing the incoming message
+ and data, the json-encoded, decrypted content of the
+ incoming message
+ :type msgtuple: (SoledadDocument, str)
+ """
+ doc, data = msgtuple
+ self._inbox.addMessage(data, (self.RECENT_FLAG,))
+ leap_events.signal(IMAP_MSG_SAVED_LOCALLY)
+ doc_id = doc.doc_id
+ self._soledad.delete_doc(doc)
+ log.msg("deleted doc %s from incoming" % doc_id)
+ leap_events.signal(IMAP_MSG_DELETED_INCOMING)