diff options
Diffstat (limited to 'src/leap/mail/imap/fetch.py')
-rw-r--r-- | src/leap/mail/imap/fetch.py | 400 |
1 files changed, 239 insertions, 161 deletions
diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py index 14f7a9b..0a97752 100644 --- a/src/leap/mail/imap/fetch.py +++ b/src/leap/mail/imap/fetch.py @@ -17,21 +17,24 @@ """ Incoming mail fetcher. """ +import copy import logging -import json -import ssl import threading import time -import copy -from StringIO import StringIO +import sys +import traceback +import warnings from email.parser import Parser from email.generator import Generator from email.utils import parseaddr +from StringIO import StringIO from twisted.python import log +from twisted.internet import defer from twisted.internet.task import LoopingCall -from twisted.internet.threads import deferToThread +from twisted.internet.task import deferLater +from u1db import errors as u1db_errors from zope.proxy import sameProxiedObjects from leap.common import events as leap_events @@ -42,15 +45,25 @@ from leap.common.events.events_pb2 import IMAP_MSG_DECRYPTED from leap.common.events.events_pb2 import IMAP_MSG_SAVED_LOCALLY from leap.common.events.events_pb2 import IMAP_MSG_DELETED_INCOMING from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL +from leap.common.events.events_pb2 import SOLEDAD_INVALID_AUTH_TOKEN from leap.common.mail import get_email_charset from leap.keymanager import errors as keymanager_errors from leap.keymanager.openpgp import OpenPGPKey +from leap.mail.decorators import deferred_to_thread +from leap.mail.imap.fields import fields +from leap.mail.utils import json_loads, empty, first from leap.soledad.client import Soledad from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY +from leap.soledad.common.errors import InvalidAuthTokenError logger = logging.getLogger(__name__) +MULTIPART_ENCRYPTED = "multipart/encrypted" +MULTIPART_SIGNED = "multipart/signed" +PGP_BEGIN = "-----BEGIN PGP MESSAGE-----" +PGP_END = "-----END PGP MESSAGE-----" + class MalformedMessage(Exception): """ @@ -73,8 +86,6 @@ class LeapIncomingMail(object): """ RECENT_FLAG = "\\Recent" - - INCOMING_KEY = "incoming" CONTENT_KEY = "content" LEAP_SIGNATURE_HEADER = 'X-Leap-Signature' @@ -123,13 +134,8 @@ class LeapIncomingMail(object): self._loop = None self._check_period = check_period - self._create_soledad_indexes() - - def _create_soledad_indexes(self): - """ - Create needed indexes on soledad. - """ - self._soledad.create_index("just-mail", "incoming") + # initialize a mail parser only once + self._parser = Parser() @property def _pkey(self): @@ -149,12 +155,29 @@ class LeapIncomingMail(object): Calls a deferred that will execute the fetch callback in a separate thread """ + def syncSoledadCallback(result): + # FIXME this needs a matching change in mx!!! + # --> need to add ERROR_DECRYPTING_KEY = False + # as default. + try: + doclist = self._soledad.get_from_index( + fields.JUST_MAIL_IDX, "*", "0") + except u1db_errors.InvalidGlobbing: + # It looks like we are a dealing with an outdated + # mx. Fallback to the version of the index + warnings.warn("JUST_MAIL_COMPAT_IDX will be deprecated!", + DeprecationWarning) + doclist = self._soledad.get_from_index( + fields.JUST_MAIL_COMPAT_IDX, "*") + self._process_doclist(doclist) + logger.debug("fetching mail for: %s %s" % ( self._soledad.uuid, self._userid)) if not self.fetching_lock.locked(): - d = deferToThread(self._sync_soledad) - d.addCallbacks(self._signal_fetch_to_ui, self._sync_soledad_error) - d.addCallbacks(self._process_doclist, self._sync_soledad_error) + d1 = self._sync_soledad() + d = defer.gatherResults([d1], consumeErrors=True) + d.addCallbacks(syncSoledadCallback, self._errback) + d.addCallbacks(self._signal_fetch_to_ui, self._errback) return d else: logger.debug("Already fetching mail.") @@ -184,81 +207,53 @@ class LeapIncomingMail(object): # synchronize incoming mail + def _errback(self, failure): + logger.exception(failure.value) + traceback.print_tb(*sys.exc_info()) + + @deferred_to_thread def _sync_soledad(self): """ - Synchronizes with remote soledad. + Synchronize with remote soledad. :returns: a list of LeapDocuments, or None. :rtype: iterable or None """ with self.fetching_lock: - log.msg('syncing soledad...') - self._soledad.sync() - log.msg('soledad synced.') - doclist = self._soledad.get_from_index("just-mail", "*") - - return doclist - - def _signal_unread_to_ui(self): - """ - Sends unread event to ui. - """ - leap_events.signal( - IMAP_UNREAD_MAIL, str(self._inbox.getUnseenCount())) + try: + log.msg('FETCH: syncing soledad...') + self._soledad.sync() + log.msg('FETCH soledad SYNCED.') + except InvalidAuthTokenError: + # if the token is invalid, send an event so the GUI can + # disable mail and show an error message. + leap_events.signal(SOLEDAD_INVALID_AUTH_TOKEN) def _signal_fetch_to_ui(self, doclist): """ - Sends leap events to ui. + Send leap events to ui. :param doclist: iterable with msg documents. :type doclist: iterable. :returns: doclist :rtype: iterable """ - fetched_ts = time.mktime(time.gmtime()) - num_mails = len(doclist) - log.msg("there are %s mails" % (num_mails,)) - leap_events.signal( - IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts)) - self._signal_unread_to_ui() - return doclist - - def _sync_soledad_error(self, failure): - """ - Errback for sync errors. - """ - # XXX should signal unrecoverable maybe. - err = failure.value - logger.error("error syncing soledad: %s" % (err,)) - if failure.check(ssl.SSLError): - logger.warning('SSL Error while ' - 'syncing soledad: %r' % (err,)) - elif failure.check(Exception): - logger.warning('Unknown error while ' - 'syncing soledad: %r' % (err,)) - - def _log_err(self, failure): - """ - Generic errback - """ - err = failure.value - logger.exception("error!: %r" % (err,)) - - def _decryption_error(self, failure): - """ - Errback for decryption errors. - """ - # XXX should signal unrecoverable maybe. - err = failure.value - logger.error("error decrypting msg: %s" % (err,)) + doclist = first(doclist) # gatherResults pass us a list + if doclist: + fetched_ts = time.mktime(time.gmtime()) + num_mails = len(doclist) if doclist is not None else 0 + if num_mails != 0: + log.msg("there are %s mails" % (num_mails,)) + leap_events.signal( + IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts)) + return doclist - def _saving_error(self, failure): + def _signal_unread_to_ui(self, *args): """ - Errback for local save errors. + Sends unread event to ui. """ - # XXX should signal unrecoverable maybe. - err = failure.value - logger.error("error saving msg locally: %s" % (err,)) + leap_events.signal( + IMAP_UNREAD_MAIL, str(self._inbox.getUnseenCount())) # process incoming mail. @@ -278,44 +273,40 @@ class LeapIncomingMail(object): return num_mails = len(doclist) - docs_cb = [] for index, doc in enumerate(doclist): logger.debug("processing doc %d of %d" % (index + 1, num_mails)) leap_events.signal( IMAP_MSG_PROCESSING, str(index), str(num_mails)) + keys = doc.content.keys() - if self._is_msg(keys): - # Ok, this looks like a legit msg. + + # TODO Compatibility check with the index in pre-0.6 mx + # that does not write the ERROR_DECRYPTING_KEY + # This should be removed in 0.7 + + has_errors = doc.content.get(fields.ERROR_DECRYPTING_KEY, None) + if has_errors is None: + warnings.warn("JUST_MAIL_COMPAT_IDX will be deprecated!", + DeprecationWarning) + if has_errors: + logger.debug("skipping msg with decrypting errors...") + + if self._is_msg(keys) and not has_errors: + # Evaluating to bool of has_errors is intentional here. + # We don't mind at this point if it's None or False. + + # Ok, this looks like a legit msg, and with no errors. # Let's process it! - # Deferred chain for individual messages - - # XXX use an IConsumer instead... ? - d = deferToThread(self._decrypt_doc, doc) - d.addCallback(self._process_decrypted_doc) - d.addErrback(self._log_err) - d.addCallback(self._add_message_locally) - d.addErrback(self._log_err) - docs_cb.append(d) - else: - # Ooops, this does not. - logger.debug('This does not look like a proper msg.') - return docs_cb + + d1 = self._decrypt_doc(doc) + d = defer.gatherResults([d1], consumeErrors=True) + d.addCallbacks(self._add_message_locally, self._errback) # # operations on individual messages # - def _is_msg(self, keys): - """ - Checks if the keys of a dictionary match the signature - of the document type we use for messages. - - :param keys: iterable containing the strings to match. - :type keys: iterable of strings. - :rtype: bool - """ - return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys - + @deferred_to_thread def _decrypt_doc(self, doc): """ Decrypt the contents of a document. @@ -339,7 +330,9 @@ class LeapIncomingMail(object): logger.error("Error while decrypting msg: %r" % (exc,)) decrdata = "" leap_events.signal(IMAP_MSG_DECRYPTED, "1" if success else "0") - return doc, decrdata + + data = self._process_decrypted_doc((doc, decrdata)) + return (doc, data) def _process_decrypted_doc(self, msgtuple): """ @@ -355,54 +348,105 @@ class LeapIncomingMail(object): """ log.msg('processing decrypted doc') doc, data = msgtuple - msg = json.loads(data) + + from twisted.internet import reactor + + # XXX turn this into an errBack for each one of + # the deferreds that would process an individual document + try: + msg = json_loads(data) + except UnicodeError as exc: + logger.error("Error while decrypting %s" % (doc.doc_id,)) + logger.exception(exc) + + # we flag the message as "with decrypting errors", + # to avoid further decryption attempts during sync + # cycles until we're prepared to deal with that. + # What is the same, when Ivan deals with it... + # A new decrypting attempt event could be triggered by a + # future a library upgrade, or a cli flag to the client, + # we just `defer` that for now... :) + doc.content[fields.ERROR_DECRYPTING_KEY] = True + deferLater(reactor, 0, self._update_incoming_message, doc) + + # FIXME this is just a dirty hack to delay the proper + # deferred organization here... + # and remember, boys, do not do this at home. + return [] + if not isinstance(msg, dict): - return False - if not msg.get(self.INCOMING_KEY, False): - return False + defer.returnValue(False) + if not msg.get(fields.INCOMING_KEY, False): + defer.returnValue(False) # ok, this is an incoming message rawmsg = msg.get(self.CONTENT_KEY, None) if not rawmsg: return False - data = self._maybe_decrypt_msg(rawmsg) - return doc, data + return self._maybe_decrypt_msg(rawmsg) + + @deferred_to_thread + def _update_incoming_message(self, doc): + """ + Do a put for a soledad document. This probably has been called only + in the case that we've needed to update the ERROR_DECRYPTING_KEY + flag in an incoming message, to get it out of the decrypting queue. + + :param doc: the SoledadDocument to update + :type doc: SoledadDocument + """ + log.msg("Updating SoledadDoc %s" % (doc.doc_id)) + self._soledad.put_doc(doc) + + @deferred_to_thread + def _delete_incoming_message(self, doc): + """ + Delete document. + + :param doc: the SoledadDocument to delete + :type doc: SoledadDocument + """ + log.msg("Deleting Incoming message: %s" % (doc.doc_id,)) + self._soledad.delete_doc(doc) def _maybe_decrypt_msg(self, data): """ Tries to decrypt a gpg message if data looks like one. :param data: the text to be decrypted. - :type data: unicode + :type data: str :return: data, possibly descrypted. :rtype: str """ + leap_assert_type(data, str) log.msg('maybe decrypting doc') - leap_assert_type(data, unicode) # parse the original message - parser = Parser() encoding = get_email_charset(data) - data = data.encode(encoding) - msg = parser.parsestr(data) + msg = self._parser.parsestr(data) # try to obtain sender public key senderPubkey = None fromHeader = msg.get('from', None) - if fromHeader is not None: + if (fromHeader is not None + and (msg.get_content_type() == MULTIPART_ENCRYPTED + or msg.get_content_type() == MULTIPART_SIGNED)): _, senderAddress = parseaddr(fromHeader) try: - senderPubkey = self._keymanager.get_key( + senderPubkey = self._keymanager.get_key_from_cache( senderAddress, OpenPGPKey) except keymanager_errors.KeyNotFound: pass valid_sig = False # we will add a header saying if sig is valid - if msg.get_content_type() == 'multipart/encrypted': - decrmsg, valid_sig = self._decrypt_multipart_encrypted_msg( + decrypt_multi = self._decrypt_multipart_encrypted_msg + decrypt_inline = self._maybe_decrypt_inline_encrypted_msg + + if msg.get_content_type() == MULTIPART_ENCRYPTED: + decrmsg, valid_sig = decrypt_multi( msg, encoding, senderPubkey) else: - decrmsg, valid_sig = self._maybe_decrypt_inline_encrypted_msg( + decrmsg, valid_sig = decrypt_inline( msg, encoding, senderPubkey) # add x-leap-signature header @@ -435,25 +479,12 @@ class LeapIncomingMail(object): """ log.msg('decrypting multipart encrypted msg') msg = copy.deepcopy(msg) - # sanity check - payload = msg.get_payload() - if len(payload) != 2: - raise MalformedMessage( - 'Multipart/encrypted messages should have exactly 2 body ' - 'parts (instead of %d).' % len(payload)) - if payload[0].get_content_type() != 'application/pgp-encrypted': - raise MalformedMessage( - "Multipart/encrypted messages' first body part should " - "have content type equal to 'application/pgp-encrypted' " - "(instead of %s)." % payload[0].get_content_type()) - if payload[1].get_content_type() != 'application/octet-stream': - raise MalformedMessage( - "Multipart/encrypted messages' second body part should " - "have content type equal to 'octet-stream' (instead of " - "%s)." % payload[1].get_content_type()) + self._msg_multipart_sanity_check(msg) + # parse message and get encrypted content pgpencmsg = msg.get_payload()[1] encdata = pgpencmsg.get_payload() + # decrypt or fail gracefully try: decrdata, valid_sig = self._decrypt_and_verify_data( @@ -461,17 +492,13 @@ class LeapIncomingMail(object): except keymanager_errors.DecryptError as e: logger.warning('Failed to decrypt encrypted message (%s). ' 'Storing message without modifications.' % str(e)) - return msg, False # return original message - # decrypted successully, now fix encoding and parse - try: - decrdata = decrdata.encode(encoding) - except (UnicodeEncodeError, UnicodeDecodeError) as e: - logger.error("Unicode error {0}".format(e)) - decrdata = decrdata.encode(encoding, 'replace') - parser = Parser() - decrmsg = parser.parsestr(decrdata) + # Bailing out! + return (msg, False) + + decrmsg = self._parser.parsestr(decrdata) # remove original message's multipart/encrypted content-type del(msg['content-type']) + # replace headers back in original message for hkey, hval in decrmsg.items(): try: @@ -479,9 +506,10 @@ class LeapIncomingMail(object): msg.replace_header(hkey, hval) except KeyError: msg[hkey] = hval - # replace payload by unencrypted payload + + # all ok, replace payload by unencrypted payload msg.set_payload(decrmsg.get_payload()) - return msg, valid_sig + return (msg, valid_sig) def _maybe_decrypt_inline_encrypted_msg(self, origmsg, encoding, senderPubkey): @@ -495,8 +523,9 @@ class LeapIncomingMail(object): :param senderPubkey: The key of the sender of the message. :type senderPubkey: OpenPGPKey - :return: A unitary tuple containing a decrypted message. - :rtype: (Message) + :return: A tuple containing a decrypted message and + a bool indicating whether the signature is valid. + :rtype: (Message, bool) """ log.msg('maybe decrypting inline encrypted msg') # serialize the original message @@ -505,13 +534,11 @@ class LeapIncomingMail(object): g.flatten(origmsg) data = buf.getvalue() # handle exactly one inline PGP message - PGP_BEGIN = "-----BEGIN PGP MESSAGE-----" - PGP_END = "-----END PGP MESSAGE-----" valid_sig = False if PGP_BEGIN in data: begin = data.find(PGP_BEGIN) end = data.find(PGP_END) - pgp_message = data[begin:end+len(PGP_END)] + pgp_message = data[begin:end + len(PGP_END)] try: decrdata, valid_sig = self._decrypt_and_verify_data( pgp_message, senderPubkey) @@ -520,11 +547,11 @@ class LeapIncomingMail(object): except keymanager_errors.DecryptError: logger.warning('Failed to decrypt potential inline encrypted ' 'message. Storing message as is...') + # if message is not encrypted, return raw data if isinstance(data, unicode): data = data.encode(encoding, 'replace') - parser = Parser() - return parser.parsestr(data), valid_sig + return (self._parser.parsestr(data), valid_sig) def _decrypt_and_verify_data(self, data, senderPubkey): """ @@ -553,25 +580,76 @@ class LeapIncomingMail(object): except keymanager_errors.InvalidSignature: decrdata = self._keymanager.decrypt( data, self._pkey) - return decrdata, valid_sig + return (decrdata, valid_sig) - def _add_message_locally(self, msgtuple): + def _add_message_locally(self, result): """ Adds a message to local inbox and delete it from the incoming db in soledad. + # XXX this comes from a gatherresult... :param msgtuple: a tuple consisting of a SoledadDocument instance containing the incoming message and data, the json-encoded, decrypted content of the incoming message :type msgtuple: (SoledadDocument, str) """ - log.msg('adding message to local db') + from twisted.internet import reactor + msgtuple = first(result) + doc, data = msgtuple - self._inbox.addMessage(data, (self.RECENT_FLAG,)) - leap_events.signal(IMAP_MSG_SAVED_LOCALLY) - doc_id = doc.doc_id - self._soledad.delete_doc(doc) - log.msg("deleted doc %s from incoming" % doc_id) - leap_events.signal(IMAP_MSG_DELETED_INCOMING) - self._signal_unread_to_ui() + log.msg('adding message %s to local db' % (doc.doc_id,)) + + if isinstance(data, list): + if empty(data): + return False + data = data[0] + + def msgSavedCallback(result): + if not empty(result): + leap_events.signal(IMAP_MSG_SAVED_LOCALLY) + deferLater(reactor, 0, self._delete_incoming_message, doc) + leap_events.signal(IMAP_MSG_DELETED_INCOMING) + + d = self._inbox.addMessage(data, flags=(self.RECENT_FLAG,), + notify_on_disk=True) + d.addCallbacks(msgSavedCallback, self._errback) + + # + # helpers + # + + def _msg_multipart_sanity_check(self, msg): + """ + Performs a sanity check against a multipart encrypted msg + + :param msg: The original encrypted message. + :type msg: Message + """ + # sanity check + payload = msg.get_payload() + if len(payload) != 2: + raise MalformedMessage( + 'Multipart/encrypted messages should have exactly 2 body ' + 'parts (instead of %d).' % len(payload)) + if payload[0].get_content_type() != 'application/pgp-encrypted': + raise MalformedMessage( + "Multipart/encrypted messages' first body part should " + "have content type equal to 'application/pgp-encrypted' " + "(instead of %s)." % payload[0].get_content_type()) + if payload[1].get_content_type() != 'application/octet-stream': + raise MalformedMessage( + "Multipart/encrypted messages' second body part should " + "have content type equal to 'octet-stream' (instead of " + "%s)." % payload[1].get_content_type()) + + def _is_msg(self, keys): + """ + Checks if the keys of a dictionary match the signature + of the document type we use for messages. + + :param keys: iterable containing the strings to match. + :type keys: iterable of strings. + :rtype: bool + """ + return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys |