From 34b5252a4fc21791dc080d1f2f9e5d49dd01bf79 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 25 Dec 2013 11:57:42 -0400 Subject: inlineCallbacks all the things! --- mail/src/leap/mail/decorators.py | 93 +++++++++++++++ mail/src/leap/mail/imap/fetch.py | 235 +++++++++++++++++++------------------- mail/src/leap/mail/imap/server.py | 78 ++----------- 3 files changed, 222 insertions(+), 184 deletions(-) create mode 100644 mail/src/leap/mail/decorators.py (limited to 'mail/src') diff --git a/mail/src/leap/mail/decorators.py b/mail/src/leap/mail/decorators.py new file mode 100644 index 0000000..9e49605 --- /dev/null +++ b/mail/src/leap/mail/decorators.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +# decorators.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Useful decorators for mail package. +""" +import logging +import os +import sys +import traceback + +from functools import wraps + +from twisted.internet.threads import deferToThread +from twisted.python import log + +logger = logging.getLogger(__name__) + + +def deferred(f): + """ + Decorator, for deferring methods to Threads. + + It will do a deferToThread of the decorated method + unless the environment variable LEAPMAIL_DEBUG is set. + + It uses a descriptor to delay the definition of the + method wrapper. + """ + class descript(object): + def __init__(self, f): + self.f = f + + def __get__(self, instance, klass): + if instance is None: + # Class method was requested + return self.make_unbound(klass) + return self.make_bound(instance) + + def _errback(self, failure): + err = failure.value + logger.warning('error in method: %s' % (self.f.__name__)) + logger.exception(err) + log.err(err) + + def make_unbound(self, klass): + + @wraps(self.f) + def wrapper(*args, **kwargs): + """ + this doc will vanish + """ + raise TypeError( + 'unbound method {}() must be called with {} instance ' + 'as first argument (got nothing instead)'.format( + self.f.__name__, + klass.__name__) + ) + return wrapper + + def make_bound(self, instance): + + @wraps(self.f) + def wrapper(*args, **kwargs): + """ + This documentation will disapear + """ + if not os.environ.get('LEAPMAIL_DEBUG'): + d = deferToThread(self.f, instance, *args, **kwargs) + d.addErrback(self._errback) + return d + else: + return self.f(instance, *args, **kwargs) + + # This instance does not need the descriptor anymore, + # let it find the wrapper directly next time: + setattr(instance, self.f.__name__, wrapper) + return wrapper + + return descript(f) diff --git a/mail/src/leap/mail/imap/fetch.py b/mail/src/leap/mail/imap/fetch.py index b1c34ba..0b31c3b 100644 --- a/mail/src/leap/mail/imap/fetch.py +++ b/mail/src/leap/mail/imap/fetch.py @@ -17,21 +17,24 @@ """ Incoming mail fetcher. """ -import logging +import copy import json -import ssl +import logging +#import ssl import threading import time -import copy -from StringIO import StringIO +import sys +import traceback from email.parser import Parser from email.generator import Generator from email.utils import parseaddr +from StringIO import StringIO from twisted.python import log +from twisted.internet import defer from twisted.internet.task import LoopingCall -from twisted.internet.threads import deferToThread +#from twisted.internet.threads import deferToThread from zope.proxy import sameProxiedObjects from leap.common import events as leap_events @@ -45,12 +48,18 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL from leap.common.mail import get_email_charset from leap.keymanager import errors as keymanager_errors from leap.keymanager.openpgp import OpenPGPKey +from leap.mail.decorators import deferred from leap.soledad.client import Soledad from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY logger = logging.getLogger(__name__) +MULTIPART_ENCRYPTED = "multipart/encrypted" +MULTIPART_SIGNED = "multipart/signed" +PGP_BEGIN = "-----BEGIN PGP MESSAGE-----" +PGP_END = "-----END PGP MESSAGE-----" + class MalformedMessage(Exception): """ @@ -125,6 +134,9 @@ class LeapIncomingMail(object): self._create_soledad_indexes() + # initialize a mail parser only once + self._parser = Parser() + def _create_soledad_indexes(self): """ Create needed indexes on soledad. @@ -152,9 +164,10 @@ class LeapIncomingMail(object): logger.debug("fetching mail for: %s %s" % ( self._soledad.uuid, self._userid)) if not self.fetching_lock.locked(): - d = deferToThread(self._sync_soledad) - d.addCallbacks(self._signal_fetch_to_ui, self._sync_soledad_error) - d.addCallbacks(self._process_doclist, self._sync_soledad_error) + d1 = self._sync_soledad() + d = defer.gatherResults([d1], consumeErrors=True) + d.addCallbacks(self._signal_fetch_to_ui, self._errback) + d.addCallbacks(self._signal_unread_to_ui, self._errback) return d else: logger.debug("Already fetching mail.") @@ -184,6 +197,11 @@ class LeapIncomingMail(object): # synchronize incoming mail + def _errback(self, failure): + logger.exception(failure.value) + traceback.print_tb(*sys.exc_info()) + + @deferred def _sync_soledad(self): """ Synchronizes with remote soledad. @@ -196,10 +214,9 @@ class LeapIncomingMail(object): self._soledad.sync() log.msg('soledad synced.') doclist = self._soledad.get_from_index("just-mail", "*") + self._process_doclist(doclist) - return doclist - - def _signal_unread_to_ui(self): + def _signal_unread_to_ui(self, *args): """ Sends unread event to ui. """ @@ -215,53 +232,18 @@ class LeapIncomingMail(object): :returns: doclist :rtype: iterable """ + doclist = doclist[0] # gatherResults pass us a list fetched_ts = time.mktime(time.gmtime()) - num_mails = len(doclist) - log.msg("there are %s mails" % (num_mails,)) + num_mails = len(doclist) if doclist is not None else 0 + if num_mails != 0: + log.msg("there are %s mails" % (num_mails,)) leap_events.signal( IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts)) - self._signal_unread_to_ui() return doclist - def _sync_soledad_error(self, failure): - """ - Errback for sync errors. - """ - # XXX should signal unrecoverable maybe. - err = failure.value - logger.error("error syncing soledad: %s" % (err,)) - if failure.check(ssl.SSLError): - logger.warning('SSL Error while ' - 'syncing soledad: %r' % (err,)) - elif failure.check(Exception): - logger.warning('Unknown error while ' - 'syncing soledad: %r' % (err,)) - - def _log_err(self, failure): - """ - Generic errback - """ - err = failure.value - logger.exception("error!: %r" % (err,)) - - def _decryption_error(self, failure): - """ - Errback for decryption errors. - """ - # XXX should signal unrecoverable maybe. - err = failure.value - logger.error("error decrypting msg: %s" % (err,)) - - def _saving_error(self, failure): - """ - Errback for local save errors. - """ - # XXX should signal unrecoverable maybe. - err = failure.value - logger.error("error saving msg locally: %s" % (err,)) - # process incoming mail. + @defer.inlineCallbacks def _process_doclist(self, doclist): """ Iterates through the doclist, checks if each doc @@ -278,7 +260,6 @@ class LeapIncomingMail(object): return num_mails = len(doclist) - docs_cb = [] for index, doc in enumerate(doclist): logger.debug("processing doc %d of %d" % (index + 1, num_mails)) leap_events.signal( @@ -287,35 +268,18 @@ class LeapIncomingMail(object): if self._is_msg(keys): # Ok, this looks like a legit msg. # Let's process it! - # Deferred chain for individual messages - - # XXX use an IConsumer instead... ? - d = deferToThread(self._decrypt_doc, doc) - d.addCallback(self._process_decrypted_doc) - d.addErrback(self._log_err) - d.addCallback(self._add_message_locally) - d.addErrback(self._log_err) - docs_cb.append(d) + decrypted = list(self._decrypt_doc(doc))[0] + res = self._add_message_locally(decrypted) + yield res + else: # Ooops, this does not. logger.debug('This does not look like a proper msg.') - return docs_cb # # operations on individual messages # - def _is_msg(self, keys): - """ - Checks if the keys of a dictionary match the signature - of the document type we use for messages. - - :param keys: iterable containing the strings to match. - :type keys: iterable of strings. - :rtype: bool - """ - return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys - def _decrypt_doc(self, doc): """ Decrypt the contents of a document. @@ -339,7 +303,9 @@ class LeapIncomingMail(object): logger.error("Error while decrypting msg: %r" % (exc,)) decrdata = "" leap_events.signal(IMAP_MSG_DECRYPTED, "1" if success else "0") - return doc, decrdata + + data = list(self._process_decrypted_doc((doc, decrdata))) + yield (doc, data) def _process_decrypted_doc(self, msgtuple): """ @@ -357,16 +323,15 @@ class LeapIncomingMail(object): doc, data = msgtuple msg = json.loads(data) if not isinstance(msg, dict): - return False + defer.returnValue(False) if not msg.get(self.INCOMING_KEY, False): - return False + defer.returnValue(False) # ok, this is an incoming message rawmsg = msg.get(self.CONTENT_KEY, None) if not rawmsg: return False - data = self._maybe_decrypt_msg(rawmsg) - return doc, data + return self._maybe_decrypt_msg(rawmsg) def _maybe_decrypt_msg(self, data): """ @@ -381,17 +346,16 @@ class LeapIncomingMail(object): leap_assert_type(data, unicode) # parse the original message - parser = Parser() encoding = get_email_charset(data) data = data.encode(encoding) - msg = parser.parsestr(data) + msg = self._parser.parsestr(data) # try to obtain sender public key senderPubkey = None fromHeader = msg.get('from', None) - if fromHeader is not None \ - and (msg.get_content_type() == 'multipart/encrypted' \ - or msg.get_content_type() == 'multipart/signed'): + if (fromHeader is not None + and (msg.get_content_type() == MULTIPART_ENCRYPTED + or msg.get_content_type() == MULTIPART_SIGNED)): _, senderAddress = parseaddr(fromHeader) try: senderPubkey = self._keymanager.get_key_from_cache( @@ -400,11 +364,14 @@ class LeapIncomingMail(object): pass valid_sig = False # we will add a header saying if sig is valid - if msg.get_content_type() == 'multipart/encrypted': - decrmsg, valid_sig = self._decrypt_multipart_encrypted_msg( + decrypt_multi = self._decrypt_multipart_encrypted_msg + decrypt_inline = self._maybe_decrypt_inline_encrypted_msg + + if msg.get_content_type() == MULTIPART_ENCRYPTED: + decrmsg, valid_sig = decrypt_multi( msg, encoding, senderPubkey) else: - decrmsg, valid_sig = self._maybe_decrypt_inline_encrypted_msg( + decrmsg, valid_sig = decrypt_inline( msg, encoding, senderPubkey) # add x-leap-signature header @@ -419,7 +386,7 @@ class LeapIncomingMail(object): self.LEAP_SIGNATURE_INVALID, pubkey=senderPubkey.key_id) - return decrmsg.as_string() + yield decrmsg.as_string() def _decrypt_multipart_encrypted_msg(self, msg, encoding, senderPubkey): """ @@ -437,43 +404,33 @@ class LeapIncomingMail(object): """ log.msg('decrypting multipart encrypted msg') msg = copy.deepcopy(msg) - # sanity check - payload = msg.get_payload() - if len(payload) != 2: - raise MalformedMessage( - 'Multipart/encrypted messages should have exactly 2 body ' - 'parts (instead of %d).' % len(payload)) - if payload[0].get_content_type() != 'application/pgp-encrypted': - raise MalformedMessage( - "Multipart/encrypted messages' first body part should " - "have content type equal to 'application/pgp-encrypted' " - "(instead of %s)." % payload[0].get_content_type()) - if payload[1].get_content_type() != 'application/octet-stream': - raise MalformedMessage( - "Multipart/encrypted messages' second body part should " - "have content type equal to 'octet-stream' (instead of " - "%s)." % payload[1].get_content_type()) + self._multipart_sanity_check(msg) + # parse message and get encrypted content pgpencmsg = msg.get_payload()[1] encdata = pgpencmsg.get_payload() + # decrypt or fail gracefully try: - decrdata, valid_sig = self._decrypt_and_verify_data( + decrdata, valid_sig = yield self._decrypt_and_verify_data( encdata, senderPubkey) except keymanager_errors.DecryptError as e: logger.warning('Failed to decrypt encrypted message (%s). ' 'Storing message without modifications.' % str(e)) - return msg, False # return original message + # Bailing out! + yield (msg, False) + # decrypted successully, now fix encoding and parse try: decrdata = decrdata.encode(encoding) except (UnicodeEncodeError, UnicodeDecodeError) as e: logger.error("Unicode error {0}".format(e)) decrdata = decrdata.encode(encoding, 'replace') - parser = Parser() - decrmsg = parser.parsestr(decrdata) + + decrmsg = self._parser.parsestr(decrdata) # remove original message's multipart/encrypted content-type del(msg['content-type']) + # replace headers back in original message for hkey, hval in decrmsg.items(): try: @@ -481,9 +438,10 @@ class LeapIncomingMail(object): msg.replace_header(hkey, hval) except KeyError: msg[hkey] = hval - # replace payload by unencrypted payload + + # all ok, replace payload by unencrypted payload msg.set_payload(decrmsg.get_payload()) - return msg, valid_sig + yield (msg, valid_sig) def _maybe_decrypt_inline_encrypted_msg(self, origmsg, encoding, senderPubkey): @@ -497,8 +455,9 @@ class LeapIncomingMail(object): :param senderPubkey: The key of the sender of the message. :type senderPubkey: OpenPGPKey - :return: A unitary tuple containing a decrypted message. - :rtype: (Message) + :return: A unitary tuple containing a decrypted message and + a bool indicating wether the signature is valid. + :rtype: (Message, bool) """ log.msg('maybe decrypting inline encrypted msg') # serialize the original message @@ -507,8 +466,6 @@ class LeapIncomingMail(object): g.flatten(origmsg) data = buf.getvalue() # handle exactly one inline PGP message - PGP_BEGIN = "-----BEGIN PGP MESSAGE-----" - PGP_END = "-----END PGP MESSAGE-----" valid_sig = False if PGP_BEGIN in data: begin = data.find(PGP_BEGIN) @@ -522,11 +479,11 @@ class LeapIncomingMail(object): except keymanager_errors.DecryptError: logger.warning('Failed to decrypt potential inline encrypted ' 'message. Storing message as is...') + # if message is not encrypted, return raw data if isinstance(data, unicode): data = data.encode(encoding, 'replace') - parser = Parser() - return parser.parsestr(data), valid_sig + return (self._parser.parsestr(data), valid_sig) def _decrypt_and_verify_data(self, data, senderPubkey): """ @@ -555,7 +512,7 @@ class LeapIncomingMail(object): except keymanager_errors.InvalidSignature: decrdata = self._keymanager.decrypt( data, self._pkey) - return decrdata, valid_sig + return (decrdata, valid_sig) def _add_message_locally(self, msgtuple): """ @@ -570,10 +527,54 @@ class LeapIncomingMail(object): """ log.msg('adding message to local db') doc, data = msgtuple - self._inbox.addMessage(data, (self.RECENT_FLAG,)) + if isinstance(data, list): + data = data[0] + + self._inbox.addMessage(data, flags=(self.RECENT_FLAG,)) + leap_events.signal(IMAP_MSG_SAVED_LOCALLY) doc_id = doc.doc_id self._soledad.delete_doc(doc) log.msg("deleted doc %s from incoming" % doc_id) leap_events.signal(IMAP_MSG_DELETED_INCOMING) self._signal_unread_to_ui() + return True + + # + # helpers + # + + def _msg_multipart_sanity_check(self, msg): + """ + Performs a sanity check against a multipart encrypted msg + + :param msg: The original encrypted message. + :type msg: Message + """ + # sanity check + payload = msg.get_payload() + if len(payload) != 2: + raise MalformedMessage( + 'Multipart/encrypted messages should have exactly 2 body ' + 'parts (instead of %d).' % len(payload)) + if payload[0].get_content_type() != 'application/pgp-encrypted': + raise MalformedMessage( + "Multipart/encrypted messages' first body part should " + "have content type equal to 'application/pgp-encrypted' " + "(instead of %s)." % payload[0].get_content_type()) + if payload[1].get_content_type() != 'application/octet-stream': + raise MalformedMessage( + "Multipart/encrypted messages' second body part should " + "have content type equal to 'octet-stream' (instead of " + "%s)." % payload[1].get_content_type()) + + def _is_msg(self, keys): + """ + Checks if the keys of a dictionary match the signature + of the document type we use for messages. + + :param keys: iterable containing the strings to match. + :type keys: iterable of strings. + :rtype: bool + """ + return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys diff --git a/mail/src/leap/mail/imap/server.py b/mail/src/leap/mail/imap/server.py index 8758dcb..57587a5 100644 --- a/mail/src/leap/mail/imap/server.py +++ b/mail/src/leap/mail/imap/server.py @@ -21,20 +21,17 @@ import copy import logging import StringIO import cStringIO -import os import time import re from collections import defaultdict, namedtuple from email.parser import Parser -from functools import wraps from zope.interface import implements from zope.proxy import sameProxiedObjects from twisted.mail import imap4 from twisted.internet import defer -from twisted.internet.threads import deferToThread from twisted.python import log from u1db import errors as u1db_errors @@ -44,70 +41,12 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL from leap.common.check import leap_assert, leap_assert_type from leap.common.mail import get_email_charset from leap.mail.messageflow import IMessageConsumer, MessageProducer +from leap.mail.decorators import deferred from leap.soledad.client import Soledad logger = logging.getLogger(__name__) -def deferred(f): - ''' - Decorator, for deferring methods to Threads. - - It will do a deferToThread of the decorated method - unless the environment variable LEAPMAIL_DEBUG is set. - - It uses a descriptor to delay the definition of the - method wrapper. - ''' - class descript(object): - def __init__(self, f): - self.f = f - - def __get__(self, instance, klass): - if instance is None: - # Class method was requested - return self.make_unbound(klass) - return self.make_bound(instance) - - def _errback(self, failure): - err = failure.value - logger.warning('error in method: %s' % (self.f.__name__)) - log.err(err) - - def make_unbound(self, klass): - - @wraps(self.f) - def wrapper(*args, **kwargs): - '''This documentation will vanish :)''' - raise TypeError( - 'unbound method {}() must be called with {} instance ' - 'as first argument (got nothing instead)'.format( - self.f.__name__, - klass.__name__) - ) - return wrapper - - def make_bound(self, instance): - - @wraps(self.f) - def wrapper(*args, **kwargs): - '''This documentation will disapear :)''' - - if not os.environ.get('LEAPMAIL_DEBUG'): - d = deferToThread(self.f, instance, *args, **kwargs) - d.addErrback(self._errback) - return d - else: - return self.f(instance, *args, **kwargs) - - # This instance does not need the descriptor anymore, - # let it find the wrapper directly next time: - setattr(instance, self.f.__name__, wrapper) - return wrapper - - return descript(f) - - class MissingIndexError(Exception): """ Raises when tried to access a non existent index document. @@ -248,6 +187,8 @@ class MailParser(object): return self._parser.parse if isinstance(o, basestring): return self._parser.parsestr + # fallback + return self._parser.parsestr def _stringify(self, o): """ @@ -942,8 +883,8 @@ class LeapMessage(fields, MailParser, MBoxParser): Return True if this message is multipart. """ if self._cdoc: - retval = self._cdoc.content.get(self.MULTIPART_KEY, False) - print "MULTIPART? ", retval + retval = self._fdoc.content.get(self.MULTIPART_KEY, False) + return retval def getSubPart(self, part): """ @@ -1197,6 +1138,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): msg = self._get_parsed_msg(raw) headers = dict(msg) + logger.debug("adding. is multipart:%s" % msg.is_multipart()) flags_doc[self.MULTIPART_KEY] = msg.is_multipart() # XXX get lower case for keys? # XXX get headers doc @@ -1464,7 +1406,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser): def addListener(self, listener): """ - Rdds a listener to the listeners queue. + Adds a listener to the listeners queue. + The server adds itself as a listener when there is a SELECT, + so it can send EXIST commands. :param listener: listener to add :type listener: an object that implements IMailboxListener @@ -1716,6 +1660,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :return: a deferred that evals to None """ # XXX we should treat the message as an IMessage from here + leap_assert_type(message, basestring) uid_next = self.getUIDNext() logger.debug('Adding msg with UID :%s' % uid_next) if flags is None: @@ -1823,12 +1768,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser): else: for msg_id in messages: - print "getting msg by uid", msg_id msg = self.messages.get_msg_by_uid(msg_id) if msg: result.append((msg_id, msg)) else: - print "fetch %s, no msg found!!!" % msg_id + logger.debug("fetch %s, no msg found!!!" % msg_id) if self.isWriteable(): self._unset_recent_flag() -- cgit v1.2.3