From 00e6971e10ce1745ad9ccb1c2b87cdf703e18aee Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 31 Jan 2014 14:50:16 -0400 Subject: keep processing after decoding errors during fetch --- mail/src/leap/mail/imap/fetch.py | 186 ++++++++++++++++++++++++++------------ mail/src/leap/mail/imap/fields.py | 15 +++ 2 files changed, 145 insertions(+), 56 deletions(-) (limited to 'mail') diff --git a/mail/src/leap/mail/imap/fetch.py b/mail/src/leap/mail/imap/fetch.py index 40dadb3a..6e12b3fa 100644 --- a/mail/src/leap/mail/imap/fetch.py +++ b/mail/src/leap/mail/imap/fetch.py @@ -23,6 +23,7 @@ import threading import time import sys import traceback +import warnings from email.parser import Parser from email.generator import Generator @@ -32,6 +33,8 @@ from StringIO import StringIO from twisted.python import log from twisted.internet import defer from twisted.internet.task import LoopingCall +from twisted.internet.task import deferLater +from u1db import errors as u1db_errors from zope.proxy import sameProxiedObjects from leap.common import events as leap_events @@ -46,7 +49,8 @@ from leap.common.mail import get_email_charset from leap.keymanager import errors as keymanager_errors from leap.keymanager.openpgp import OpenPGPKey from leap.mail.decorators import deferred_to_thread -from leap.mail.utils import json_loads +from leap.mail.imap.fields import fields +from leap.mail.utils import json_loads, empty, first from leap.soledad.client import Soledad from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY @@ -80,8 +84,6 @@ class LeapIncomingMail(object): """ RECENT_FLAG = "\\Recent" - - INCOMING_KEY = "incoming" CONTENT_KEY = "content" LEAP_SIGNATURE_HEADER = 'X-Leap-Signature' @@ -130,17 +132,9 @@ class LeapIncomingMail(object): self._loop = None self._check_period = check_period - self._create_soledad_indexes() - # initialize a mail parser only once self._parser = Parser() - def _create_soledad_indexes(self): - """ - Create needed indexes on soledad. - """ - self._soledad.create_index("just-mail", "incoming") - @property def _pkey(self): if sameProxiedObjects(self._keymanager, None): @@ -159,13 +153,29 @@ class LeapIncomingMail(object): Calls a deferred that will execute the fetch callback in a separate thread """ + def syncSoledadCallback(result): + # FIXME this needs a matching change in mx!!! + # --> need to add ERROR_DECRYPTING_KEY = False + # as default. + try: + doclist = self._soledad.get_from_index( + fields.JUST_MAIL_IDX, "*", "0") + except u1db_errors.InvalidGlobbing: + # It looks like we are a dealing with an outdated + # mx. Fallback to the version of the index + warnings.warn("JUST_MAIL_COMPAT_IDX will be deprecated!", + DeprecationWarning) + doclist = self._soledad.get_from_index( + fields.JUST_MAIL_COMPAT_IDX, "*") + self._process_doclist(doclist) + logger.debug("fetching mail for: %s %s" % ( self._soledad.uuid, self._userid)) if not self.fetching_lock.locked(): d1 = self._sync_soledad() d = defer.gatherResults([d1], consumeErrors=True) + d.addCallbacks(syncSoledadCallback, self._errback) d.addCallbacks(self._signal_fetch_to_ui, self._errback) - d.addCallbacks(self._signal_unread_to_ui, self._errback) return d else: logger.debug("Already fetching mail.") @@ -202,46 +212,44 @@ class LeapIncomingMail(object): @deferred_to_thread def _sync_soledad(self): """ - Synchronizes with remote soledad. + Synchronize with remote soledad. :returns: a list of LeapDocuments, or None. :rtype: iterable or None """ with self.fetching_lock: - log.msg('syncing soledad...') + log.msg('FETCH: syncing soledad...') self._soledad.sync() - log.msg('soledad synced.') - doclist = self._soledad.get_from_index("just-mail", "*") - self._process_doclist(doclist) - - def _signal_unread_to_ui(self, *args): - """ - Sends unread event to ui. - """ - leap_events.signal( - IMAP_UNREAD_MAIL, str(self._inbox.getUnseenCount())) + log.msg('FETCH soledad SYNCED.') def _signal_fetch_to_ui(self, doclist): """ - Sends leap events to ui. + Send leap events to ui. :param doclist: iterable with msg documents. :type doclist: iterable. :returns: doclist :rtype: iterable """ - doclist = doclist[0] # gatherResults pass us a list - fetched_ts = time.mktime(time.gmtime()) - num_mails = len(doclist) if doclist is not None else 0 - if num_mails != 0: - log.msg("there are %s mails" % (num_mails,)) + doclist = first(doclist) # gatherResults pass us a list + if doclist: + fetched_ts = time.mktime(time.gmtime()) + num_mails = len(doclist) if doclist is not None else 0 + if num_mails != 0: + log.msg("there are %s mails" % (num_mails,)) + leap_events.signal( + IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts)) + return doclist + + def _signal_unread_to_ui(self, *args): + """ + Sends unread event to ui. + """ leap_events.signal( - IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts)) - return doclist + IMAP_UNREAD_MAIL, str(self._inbox.getUnseenCount())) # process incoming mail. - @defer.inlineCallbacks def _process_doclist(self, doclist): """ Iterates through the doclist, checks if each doc @@ -262,22 +270,36 @@ class LeapIncomingMail(object): logger.debug("processing doc %d of %d" % (index + 1, num_mails)) leap_events.signal( IMAP_MSG_PROCESSING, str(index), str(num_mails)) + keys = doc.content.keys() - if self._is_msg(keys): - # Ok, this looks like a legit msg. + + # TODO Compatibility check with the index in pre-0.6 mx + # that does not write the ERROR_DECRYPTING_KEY + # This should be removed in 0.7 + + has_errors = doc.content.get(fields.ERROR_DECRYPTING_KEY, None) + if has_errors is None: + warnings.warn("JUST_MAIL_COMPAT_IDX will be deprecated!", + DeprecationWarning) + if has_errors: + logger.debug("skipping msg with decrypting errors...") + + if self._is_msg(keys) and not has_errors: + # Evaluating to bool of has_errors is intentional here. + # We don't mind at this point if it's None or False. + + # Ok, this looks like a legit msg, and with no errors. # Let's process it! - decrypted = list(self._decrypt_doc(doc))[0] - res = self._add_message_locally(decrypted) - yield res - else: - # Ooops, this does not. - logger.debug('This does not look like a proper msg.') + d1 = self._decrypt_doc(doc) + d = defer.gatherResults([d1], consumeErrors=True) + d.addCallbacks(self._add_message_locally, self._errback) # # operations on individual messages # + @deferred_to_thread def _decrypt_doc(self, doc): """ Decrypt the contents of a document. @@ -302,8 +324,8 @@ class LeapIncomingMail(object): decrdata = "" leap_events.signal(IMAP_MSG_DECRYPTED, "1" if success else "0") - data = list(self._process_decrypted_doc((doc, decrdata))) - yield (doc, data) + data = self._process_decrypted_doc((doc, decrdata)) + return (doc, data) def _process_decrypted_doc(self, msgtuple): """ @@ -319,11 +341,35 @@ class LeapIncomingMail(object): """ log.msg('processing decrypted doc') doc, data = msgtuple - msg = json_loads(data) + + from twisted.internet import reactor + + # XXX turn this into an errBack for each one of + # the deferreds that would process an individual document + try: + msg = json_loads(data) + except UnicodeError as exc: + logger.error("Error while decrypting %s" % (doc.doc_id,)) + logger.exception(exc) + + # we flag the message as "with decrypting errors", + # to avoid further decryption attempts during sync + # cycles until we're prepared to deal with that. + # What is the same, when Ivan deals with it... + # A new decrypting attempt event could be triggered by a + # future a library upgrade, or a cli flag to the client, + # we just `defer` that for now... :) + doc.content[fields.ERROR_DECRYPTING_KEY] = True + deferLater(reactor, 0, self._update_incoming_message, doc) + + # FIXME this is just a dirty hack to delay the proper + # deferred organization here... + # and remember, boys, do not do this at home. + return [] if not isinstance(msg, dict): defer.returnValue(False) - if not msg.get(self.INCOMING_KEY, False): + if not msg.get(fields.INCOMING_KEY, False): defer.returnValue(False) # ok, this is an incoming message @@ -332,6 +378,27 @@ class LeapIncomingMail(object): return False return self._maybe_decrypt_msg(rawmsg) + @deferred_to_thread + def _update_incoming_message(self, doc): + """ + Do a put for a soledad document. This probably has been called only + in the case that we've needed to update the ERROR_DECRYPTING_KEY + flag in an incoming message, to get it out of the decrypting queue. + + :param doc: the SoledadDocument to update + :type doc: SoledadDocument + """ + log.msg("Updating SoledadDoc %s" % (doc.doc_id)) + self._soledad.put_doc(doc) + + @deferred_to_thread + def _delete_incoming_message(self, doc): + """ + Delete document. + """ + log.msg("Deleting SoledadDoc %s" % (doc.doc_id)) + self._soledad.delete_doc(doc) + def _maybe_decrypt_msg(self, data): """ Tries to decrypt a gpg message if data looks like one. @@ -384,7 +451,7 @@ class LeapIncomingMail(object): self.LEAP_SIGNATURE_INVALID, pubkey=senderPubkey.key_id) - yield decrmsg.as_string() + return decrmsg.as_string() def _decrypt_multipart_encrypted_msg(self, msg, encoding, senderPubkey): """ @@ -505,32 +572,39 @@ class LeapIncomingMail(object): data, self._pkey) return (decrdata, valid_sig) - def _add_message_locally(self, msgtuple): + def _add_message_locally(self, result): """ Adds a message to local inbox and delete it from the incoming db in soledad. + # XXX this comes from a gatherresult... :param msgtuple: a tuple consisting of a SoledadDocument instance containing the incoming message and data, the json-encoded, decrypted content of the incoming message :type msgtuple: (SoledadDocument, str) """ - log.msg('adding message to local db') + from twisted.internet import reactor + msgtuple = first(result) + doc, data = msgtuple + log.msg('adding message %s to local db' % (doc.doc_id,)) if isinstance(data, list): + if empty(data): + return False data = data[0] - self._inbox.addMessage(data, flags=(self.RECENT_FLAG,)) + def msgSavedCallback(result): + if not empty(result): + leap_events.signal(IMAP_MSG_SAVED_LOCALLY) + deferLater(reactor, 0, self._delete_incoming_message, result) + leap_events.signal(IMAP_MSG_DELETED_INCOMING) + deferLater(reactor, 1, self._signal_unread_to_ui) - leap_events.signal(IMAP_MSG_SAVED_LOCALLY) - doc_id = doc.doc_id - self._soledad.delete_doc(doc) - log.msg("deleted doc %s from incoming" % doc_id) - leap_events.signal(IMAP_MSG_DELETED_INCOMING) - self._signal_unread_to_ui() - return True + # XXX should pass a notify_on_disk=True along... + d = self._inbox.addMessage(data, flags=(self.RECENT_FLAG,)) + d.addCallbacks(msgSavedCallback, self._errback) # # helpers diff --git a/mail/src/leap/mail/imap/fields.py b/mail/src/leap/mail/imap/fields.py index 886ee631..45769391 100644 --- a/mail/src/leap/mail/imap/fields.py +++ b/mail/src/leap/mail/imap/fields.py @@ -108,6 +108,14 @@ class WithMsgFields(object): # correct since the recent flag is volatile. TYPE_MBOX_RECT_SEEN_IDX = 'by-type-and-mbox-and-recent-and-seen' + # Soledad index for incoming mail, without decrypting errors. + JUST_MAIL_IDX = "just-mail" + # XXX the backward-compatible index, will be deprecated at 0.7 + JUST_MAIL_COMPAT_IDX = "just-mail-compat" + + INCOMING_KEY = "incoming" + ERROR_DECRYPTING_KEY = "errdecr" + KTYPE = TYPE_KEY MBOX_VAL = TYPE_MBOX_VAL CHASH_VAL = CONTENT_HASH_KEY @@ -140,6 +148,13 @@ class WithMsgFields(object): TYPE_MBOX_DEL_IDX: [KTYPE, MBOX_VAL, 'bool(deleted)'], TYPE_MBOX_RECT_SEEN_IDX: [KTYPE, MBOX_VAL, 'bool(recent)', 'bool(seen)'], + + # incoming queue + JUST_MAIL_IDX: [INCOMING_KEY, + "bool(%s)" % (ERROR_DECRYPTING_KEY,)], + + # the backward-compatible index, will be deprecated at 0.7 + JUST_MAIL_COMPAT_IDX: [INCOMING_KEY], } MBOX_KEY = MBOX_VAL -- cgit v1.2.3 From 94a7d28481bf2cced52126e20df2721952deb970 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 11 Mar 2014 10:38:59 -0400 Subject: changes file --- mail/changes/bug_5307_keep-processing | 1 + 1 file changed, 1 insertion(+) create mode 100644 mail/changes/bug_5307_keep-processing (limited to 'mail') diff --git a/mail/changes/bug_5307_keep-processing b/mail/changes/bug_5307_keep-processing new file mode 100644 index 00000000..7194adf1 --- /dev/null +++ b/mail/changes/bug_5307_keep-processing @@ -0,0 +1 @@ + o Keep processing after a decryption error. Closes: #5307 -- cgit v1.2.3