diff options
Diffstat (limited to 'src/leap/mail/imap/fetch.py')
-rw-r--r-- | src/leap/mail/imap/fetch.py | 655 |
1 files changed, 0 insertions, 655 deletions
diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py deleted file mode 100644 index 0a97752..0000000 --- a/src/leap/mail/imap/fetch.py +++ /dev/null @@ -1,655 +0,0 @@ -# -*- coding: utf-8 -*- -# fetch.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Incoming mail fetcher. -""" -import copy -import logging -import threading -import time -import sys -import traceback -import warnings - -from email.parser import Parser -from email.generator import Generator -from email.utils import parseaddr -from StringIO import StringIO - -from twisted.python import log -from twisted.internet import defer -from twisted.internet.task import LoopingCall -from twisted.internet.task import deferLater -from u1db import errors as u1db_errors -from zope.proxy import sameProxiedObjects - -from leap.common import events as leap_events -from leap.common.check import leap_assert, leap_assert_type -from leap.common.events.events_pb2 import IMAP_FETCHED_INCOMING -from leap.common.events.events_pb2 import IMAP_MSG_PROCESSING -from leap.common.events.events_pb2 import IMAP_MSG_DECRYPTED -from leap.common.events.events_pb2 import IMAP_MSG_SAVED_LOCALLY -from leap.common.events.events_pb2 import IMAP_MSG_DELETED_INCOMING -from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL -from leap.common.events.events_pb2 import SOLEDAD_INVALID_AUTH_TOKEN -from leap.common.mail import get_email_charset -from leap.keymanager import errors as keymanager_errors -from leap.keymanager.openpgp import OpenPGPKey -from leap.mail.decorators import deferred_to_thread -from leap.mail.imap.fields import fields -from leap.mail.utils import json_loads, empty, first -from leap.soledad.client import Soledad -from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY -from leap.soledad.common.errors import InvalidAuthTokenError - - -logger = logging.getLogger(__name__) - -MULTIPART_ENCRYPTED = "multipart/encrypted" -MULTIPART_SIGNED = "multipart/signed" -PGP_BEGIN = "-----BEGIN PGP MESSAGE-----" -PGP_END = "-----END PGP MESSAGE-----" - - -class MalformedMessage(Exception): - """ - Raised when a given message is not well formed. - """ - pass - - -class LeapIncomingMail(object): - """ - Fetches and process mail from the incoming pool. - - This object has public methods start_loop and stop that will - actually initiate a LoopingCall with check_period recurrency. - The LoopingCall itself will invoke the fetch method each time - that the check_period expires. - - This loop will sync the soledad db with the remote server and - process all the documents found tagged as incoming mail. - """ - - RECENT_FLAG = "\\Recent" - CONTENT_KEY = "content" - - LEAP_SIGNATURE_HEADER = 'X-Leap-Signature' - """ - Header added to messages when they are decrypted by the IMAP fetcher, - which states the validity of an eventual signature that might be included - in the encrypted blob. - """ - LEAP_SIGNATURE_VALID = 'valid' - LEAP_SIGNATURE_INVALID = 'invalid' - LEAP_SIGNATURE_COULD_NOT_VERIFY = 'could not verify' - - fetching_lock = threading.Lock() - - def __init__(self, keymanager, soledad, imap_account, - check_period, userid): - - """ - Initialize LeapIncomingMail.. - - :param keymanager: a keymanager instance - :type keymanager: keymanager.KeyManager - - :param soledad: a soledad instance - :type soledad: Soledad - - :param imap_account: the account to fetch periodically - :type imap_account: SoledadBackedAccount - - :param check_period: the period to fetch new mail, in seconds. - :type check_period: int - """ - - leap_assert(keymanager, "need a keymanager to initialize") - leap_assert_type(soledad, Soledad) - leap_assert(check_period, "need a period to check incoming mail") - leap_assert_type(check_period, int) - leap_assert(userid, "need a userid to initialize") - - self._keymanager = keymanager - self._soledad = soledad - self.imapAccount = imap_account - self._inbox = self.imapAccount.getMailbox('inbox') - self._userid = userid - - self._loop = None - self._check_period = check_period - - # initialize a mail parser only once - self._parser = Parser() - - @property - def _pkey(self): - if sameProxiedObjects(self._keymanager, None): - logger.warning('tried to get key, but null keymanager found') - return None - return self._keymanager.get_key(self._userid, OpenPGPKey, private=True) - - # - # Public API: fetch, start_loop, stop. - # - - def fetch(self): - """ - Fetch incoming mail, to be called periodically. - - Calls a deferred that will execute the fetch callback - in a separate thread - """ - def syncSoledadCallback(result): - # FIXME this needs a matching change in mx!!! - # --> need to add ERROR_DECRYPTING_KEY = False - # as default. - try: - doclist = self._soledad.get_from_index( - fields.JUST_MAIL_IDX, "*", "0") - except u1db_errors.InvalidGlobbing: - # It looks like we are a dealing with an outdated - # mx. Fallback to the version of the index - warnings.warn("JUST_MAIL_COMPAT_IDX will be deprecated!", - DeprecationWarning) - doclist = self._soledad.get_from_index( - fields.JUST_MAIL_COMPAT_IDX, "*") - self._process_doclist(doclist) - - logger.debug("fetching mail for: %s %s" % ( - self._soledad.uuid, self._userid)) - if not self.fetching_lock.locked(): - d1 = self._sync_soledad() - d = defer.gatherResults([d1], consumeErrors=True) - d.addCallbacks(syncSoledadCallback, self._errback) - d.addCallbacks(self._signal_fetch_to_ui, self._errback) - return d - else: - logger.debug("Already fetching mail.") - - def start_loop(self): - """ - Starts a loop to fetch mail. - """ - if self._loop is None: - self._loop = LoopingCall(self.fetch) - self._loop.start(self._check_period) - else: - logger.warning("Tried to start an already running fetching loop.") - - def stop(self): - # XXX change the name to stop_loop, for consistency. - """ - Stops the loop that fetches mail. - """ - if self._loop and self._loop.running is True: - self._loop.stop() - self._loop = None - - # - # Private methods. - # - - # synchronize incoming mail - - def _errback(self, failure): - logger.exception(failure.value) - traceback.print_tb(*sys.exc_info()) - - @deferred_to_thread - def _sync_soledad(self): - """ - Synchronize with remote soledad. - - :returns: a list of LeapDocuments, or None. - :rtype: iterable or None - """ - with self.fetching_lock: - try: - log.msg('FETCH: syncing soledad...') - self._soledad.sync() - log.msg('FETCH soledad SYNCED.') - except InvalidAuthTokenError: - # if the token is invalid, send an event so the GUI can - # disable mail and show an error message. - leap_events.signal(SOLEDAD_INVALID_AUTH_TOKEN) - - def _signal_fetch_to_ui(self, doclist): - """ - Send leap events to ui. - - :param doclist: iterable with msg documents. - :type doclist: iterable. - :returns: doclist - :rtype: iterable - """ - doclist = first(doclist) # gatherResults pass us a list - if doclist: - fetched_ts = time.mktime(time.gmtime()) - num_mails = len(doclist) if doclist is not None else 0 - if num_mails != 0: - log.msg("there are %s mails" % (num_mails,)) - leap_events.signal( - IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts)) - return doclist - - def _signal_unread_to_ui(self, *args): - """ - Sends unread event to ui. - """ - leap_events.signal( - IMAP_UNREAD_MAIL, str(self._inbox.getUnseenCount())) - - # process incoming mail. - - def _process_doclist(self, doclist): - """ - Iterates through the doclist, checks if each doc - looks like a message, and yields a deferred that will decrypt and - process the message. - - :param doclist: iterable with msg documents. - :type doclist: iterable. - :returns: a list of deferreds for individual messages. - """ - log.msg('processing doclist') - if not doclist: - logger.debug("no docs found") - return - num_mails = len(doclist) - - for index, doc in enumerate(doclist): - logger.debug("processing doc %d of %d" % (index + 1, num_mails)) - leap_events.signal( - IMAP_MSG_PROCESSING, str(index), str(num_mails)) - - keys = doc.content.keys() - - # TODO Compatibility check with the index in pre-0.6 mx - # that does not write the ERROR_DECRYPTING_KEY - # This should be removed in 0.7 - - has_errors = doc.content.get(fields.ERROR_DECRYPTING_KEY, None) - if has_errors is None: - warnings.warn("JUST_MAIL_COMPAT_IDX will be deprecated!", - DeprecationWarning) - if has_errors: - logger.debug("skipping msg with decrypting errors...") - - if self._is_msg(keys) and not has_errors: - # Evaluating to bool of has_errors is intentional here. - # We don't mind at this point if it's None or False. - - # Ok, this looks like a legit msg, and with no errors. - # Let's process it! - - d1 = self._decrypt_doc(doc) - d = defer.gatherResults([d1], consumeErrors=True) - d.addCallbacks(self._add_message_locally, self._errback) - - # - # operations on individual messages - # - - @deferred_to_thread - def _decrypt_doc(self, doc): - """ - Decrypt the contents of a document. - - :param doc: A document containing an encrypted message. - :type doc: SoledadDocument - - :return: A tuple containing the document and the decrypted message. - :rtype: (SoledadDocument, str) - """ - log.msg('decrypting msg') - success = False - - try: - decrdata = self._keymanager.decrypt( - doc.content[ENC_JSON_KEY], - self._pkey) - success = True - except Exception as exc: - # XXX move this to errback !!! - logger.error("Error while decrypting msg: %r" % (exc,)) - decrdata = "" - leap_events.signal(IMAP_MSG_DECRYPTED, "1" if success else "0") - - data = self._process_decrypted_doc((doc, decrdata)) - return (doc, data) - - def _process_decrypted_doc(self, msgtuple): - """ - Process a document containing a succesfully decrypted message. - - :param msgtuple: a tuple consisting of a SoledadDocument - instance containing the incoming message - and data, the json-encoded, decrypted content of the - incoming message - :type msgtuple: (SoledadDocument, str) - :return: a SoledadDocument and the processed data. - :rtype: (doc, data) - """ - log.msg('processing decrypted doc') - doc, data = msgtuple - - from twisted.internet import reactor - - # XXX turn this into an errBack for each one of - # the deferreds that would process an individual document - try: - msg = json_loads(data) - except UnicodeError as exc: - logger.error("Error while decrypting %s" % (doc.doc_id,)) - logger.exception(exc) - - # we flag the message as "with decrypting errors", - # to avoid further decryption attempts during sync - # cycles until we're prepared to deal with that. - # What is the same, when Ivan deals with it... - # A new decrypting attempt event could be triggered by a - # future a library upgrade, or a cli flag to the client, - # we just `defer` that for now... :) - doc.content[fields.ERROR_DECRYPTING_KEY] = True - deferLater(reactor, 0, self._update_incoming_message, doc) - - # FIXME this is just a dirty hack to delay the proper - # deferred organization here... - # and remember, boys, do not do this at home. - return [] - - if not isinstance(msg, dict): - defer.returnValue(False) - if not msg.get(fields.INCOMING_KEY, False): - defer.returnValue(False) - - # ok, this is an incoming message - rawmsg = msg.get(self.CONTENT_KEY, None) - if not rawmsg: - return False - return self._maybe_decrypt_msg(rawmsg) - - @deferred_to_thread - def _update_incoming_message(self, doc): - """ - Do a put for a soledad document. This probably has been called only - in the case that we've needed to update the ERROR_DECRYPTING_KEY - flag in an incoming message, to get it out of the decrypting queue. - - :param doc: the SoledadDocument to update - :type doc: SoledadDocument - """ - log.msg("Updating SoledadDoc %s" % (doc.doc_id)) - self._soledad.put_doc(doc) - - @deferred_to_thread - def _delete_incoming_message(self, doc): - """ - Delete document. - - :param doc: the SoledadDocument to delete - :type doc: SoledadDocument - """ - log.msg("Deleting Incoming message: %s" % (doc.doc_id,)) - self._soledad.delete_doc(doc) - - def _maybe_decrypt_msg(self, data): - """ - Tries to decrypt a gpg message if data looks like one. - - :param data: the text to be decrypted. - :type data: str - :return: data, possibly descrypted. - :rtype: str - """ - leap_assert_type(data, str) - log.msg('maybe decrypting doc') - - # parse the original message - encoding = get_email_charset(data) - msg = self._parser.parsestr(data) - - # try to obtain sender public key - senderPubkey = None - fromHeader = msg.get('from', None) - if (fromHeader is not None - and (msg.get_content_type() == MULTIPART_ENCRYPTED - or msg.get_content_type() == MULTIPART_SIGNED)): - _, senderAddress = parseaddr(fromHeader) - try: - senderPubkey = self._keymanager.get_key_from_cache( - senderAddress, OpenPGPKey) - except keymanager_errors.KeyNotFound: - pass - - valid_sig = False # we will add a header saying if sig is valid - decrypt_multi = self._decrypt_multipart_encrypted_msg - decrypt_inline = self._maybe_decrypt_inline_encrypted_msg - - if msg.get_content_type() == MULTIPART_ENCRYPTED: - decrmsg, valid_sig = decrypt_multi( - msg, encoding, senderPubkey) - else: - decrmsg, valid_sig = decrypt_inline( - msg, encoding, senderPubkey) - - # add x-leap-signature header - if senderPubkey is None: - decrmsg.add_header( - self.LEAP_SIGNATURE_HEADER, - self.LEAP_SIGNATURE_COULD_NOT_VERIFY) - else: - decrmsg.add_header( - self.LEAP_SIGNATURE_HEADER, - self.LEAP_SIGNATURE_VALID if valid_sig else - self.LEAP_SIGNATURE_INVALID, - pubkey=senderPubkey.key_id) - - return decrmsg.as_string() - - def _decrypt_multipart_encrypted_msg(self, msg, encoding, senderPubkey): - """ - Decrypt a message with content-type 'multipart/encrypted'. - - :param msg: The original encrypted message. - :type msg: Message - :param encoding: The encoding of the email message. - :type encoding: str - :param senderPubkey: The key of the sender of the message. - :type senderPubkey: OpenPGPKey - - :return: A unitary tuple containing a decrypted message. - :rtype: (Message) - """ - log.msg('decrypting multipart encrypted msg') - msg = copy.deepcopy(msg) - self._msg_multipart_sanity_check(msg) - - # parse message and get encrypted content - pgpencmsg = msg.get_payload()[1] - encdata = pgpencmsg.get_payload() - - # decrypt or fail gracefully - try: - decrdata, valid_sig = self._decrypt_and_verify_data( - encdata, senderPubkey) - except keymanager_errors.DecryptError as e: - logger.warning('Failed to decrypt encrypted message (%s). ' - 'Storing message without modifications.' % str(e)) - # Bailing out! - return (msg, False) - - decrmsg = self._parser.parsestr(decrdata) - # remove original message's multipart/encrypted content-type - del(msg['content-type']) - - # replace headers back in original message - for hkey, hval in decrmsg.items(): - try: - # this will raise KeyError if header is not present - msg.replace_header(hkey, hval) - except KeyError: - msg[hkey] = hval - - # all ok, replace payload by unencrypted payload - msg.set_payload(decrmsg.get_payload()) - return (msg, valid_sig) - - def _maybe_decrypt_inline_encrypted_msg(self, origmsg, encoding, - senderPubkey): - """ - Possibly decrypt an inline OpenPGP encrypted message. - - :param origmsg: The original, possibly encrypted message. - :type origmsg: Message - :param encoding: The encoding of the email message. - :type encoding: str - :param senderPubkey: The key of the sender of the message. - :type senderPubkey: OpenPGPKey - - :return: A tuple containing a decrypted message and - a bool indicating whether the signature is valid. - :rtype: (Message, bool) - """ - log.msg('maybe decrypting inline encrypted msg') - # serialize the original message - buf = StringIO() - g = Generator(buf) - g.flatten(origmsg) - data = buf.getvalue() - # handle exactly one inline PGP message - valid_sig = False - if PGP_BEGIN in data: - begin = data.find(PGP_BEGIN) - end = data.find(PGP_END) - pgp_message = data[begin:end + len(PGP_END)] - try: - decrdata, valid_sig = self._decrypt_and_verify_data( - pgp_message, senderPubkey) - # replace encrypted by decrypted content - data = data.replace(pgp_message, decrdata) - except keymanager_errors.DecryptError: - logger.warning('Failed to decrypt potential inline encrypted ' - 'message. Storing message as is...') - - # if message is not encrypted, return raw data - if isinstance(data, unicode): - data = data.encode(encoding, 'replace') - return (self._parser.parsestr(data), valid_sig) - - def _decrypt_and_verify_data(self, data, senderPubkey): - """ - Decrypt C{data} using our private key and attempt to verify a - signature using C{senderPubkey}. - - :param data: The text to be decrypted. - :type data: unicode - :param senderPubkey: The public key of the sender of the message. - :type senderPubkey: OpenPGPKey - - :return: The decrypted data and a boolean stating whether the - signature could be verified. - :rtype: (str, bool) - - :raise DecryptError: Raised if failed to decrypt. - """ - log.msg('decrypting and verifying data') - valid_sig = False - try: - decrdata = self._keymanager.decrypt( - data, self._pkey, - verify=senderPubkey) - if senderPubkey is not None: - valid_sig = True - except keymanager_errors.InvalidSignature: - decrdata = self._keymanager.decrypt( - data, self._pkey) - return (decrdata, valid_sig) - - def _add_message_locally(self, result): - """ - Adds a message to local inbox and delete it from the incoming db - in soledad. - - # XXX this comes from a gatherresult... - :param msgtuple: a tuple consisting of a SoledadDocument - instance containing the incoming message - and data, the json-encoded, decrypted content of the - incoming message - :type msgtuple: (SoledadDocument, str) - """ - from twisted.internet import reactor - msgtuple = first(result) - - doc, data = msgtuple - log.msg('adding message %s to local db' % (doc.doc_id,)) - - if isinstance(data, list): - if empty(data): - return False - data = data[0] - - def msgSavedCallback(result): - if not empty(result): - leap_events.signal(IMAP_MSG_SAVED_LOCALLY) - deferLater(reactor, 0, self._delete_incoming_message, doc) - leap_events.signal(IMAP_MSG_DELETED_INCOMING) - - d = self._inbox.addMessage(data, flags=(self.RECENT_FLAG,), - notify_on_disk=True) - d.addCallbacks(msgSavedCallback, self._errback) - - # - # helpers - # - - def _msg_multipart_sanity_check(self, msg): - """ - Performs a sanity check against a multipart encrypted msg - - :param msg: The original encrypted message. - :type msg: Message - """ - # sanity check - payload = msg.get_payload() - if len(payload) != 2: - raise MalformedMessage( - 'Multipart/encrypted messages should have exactly 2 body ' - 'parts (instead of %d).' % len(payload)) - if payload[0].get_content_type() != 'application/pgp-encrypted': - raise MalformedMessage( - "Multipart/encrypted messages' first body part should " - "have content type equal to 'application/pgp-encrypted' " - "(instead of %s)." % payload[0].get_content_type()) - if payload[1].get_content_type() != 'application/octet-stream': - raise MalformedMessage( - "Multipart/encrypted messages' second body part should " - "have content type equal to 'octet-stream' (instead of " - "%s)." % payload[1].get_content_type()) - - def _is_msg(self, keys): - """ - Checks if the keys of a dictionary match the signature - of the document type we use for messages. - - :param keys: iterable containing the strings to match. - :type keys: iterable of strings. - :rtype: bool - """ - return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys |