diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/leap/mail/decorators.py | 93 | ||||
| -rw-r--r-- | src/leap/mail/imap/fetch.py | 235 | ||||
| -rw-r--r-- | src/leap/mail/imap/server.py | 78 | 
3 files changed, 222 insertions, 184 deletions
| diff --git a/src/leap/mail/decorators.py b/src/leap/mail/decorators.py new file mode 100644 index 0000000..9e49605 --- /dev/null +++ b/src/leap/mail/decorators.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +# decorators.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +Useful decorators for mail package. +""" +import logging +import os +import sys +import traceback + +from functools import wraps + +from twisted.internet.threads import deferToThread +from twisted.python import log + +logger = logging.getLogger(__name__) + + +def deferred(f): +    """ +    Decorator, for deferring methods to Threads. + +    It will do a deferToThread of the decorated method +    unless the environment variable LEAPMAIL_DEBUG is set. + +    It uses a descriptor to delay the definition of the +    method wrapper. +    """ +    class descript(object): +        def __init__(self, f): +            self.f = f + +        def __get__(self, instance, klass): +            if instance is None: +                # Class method was requested +                return self.make_unbound(klass) +            return self.make_bound(instance) + +        def _errback(self, failure): +            err = failure.value +            logger.warning('error in method: %s' % (self.f.__name__)) +            logger.exception(err) +            log.err(err) + +        def make_unbound(self, klass): + +            @wraps(self.f) +            def wrapper(*args, **kwargs): +                """ +                this doc will vanish +                """ +                raise TypeError( +                    'unbound method {}() must be called with {} instance ' +                    'as first argument (got nothing instead)'.format( +                        self.f.__name__, +                        klass.__name__) +                ) +            return wrapper + +        def make_bound(self, instance): + +            @wraps(self.f) +            def wrapper(*args, **kwargs): +                """ +                This documentation will disapear +                """ +                if not os.environ.get('LEAPMAIL_DEBUG'): +                    d = deferToThread(self.f, instance, *args, **kwargs) +                    d.addErrback(self._errback) +                    return d +                else: +                    return self.f(instance, *args, **kwargs) + +            # This instance does not need the descriptor anymore, +            # let it find the wrapper directly next time: +            setattr(instance, self.f.__name__, wrapper) +            return wrapper + +    return descript(f) diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py index b1c34ba..0b31c3b 100644 --- a/src/leap/mail/imap/fetch.py +++ b/src/leap/mail/imap/fetch.py @@ -17,21 +17,24 @@  """  Incoming mail fetcher.  """ -import logging +import copy  import json -import ssl +import logging +#import ssl  import threading  import time -import copy -from StringIO import StringIO +import sys +import traceback  from email.parser import Parser  from email.generator import Generator  from email.utils import parseaddr +from StringIO import StringIO  from twisted.python import log +from twisted.internet import defer  from twisted.internet.task import LoopingCall -from twisted.internet.threads import deferToThread +#from twisted.internet.threads import deferToThread  from zope.proxy import sameProxiedObjects  from leap.common import events as leap_events @@ -45,12 +48,18 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL  from leap.common.mail import get_email_charset  from leap.keymanager import errors as keymanager_errors  from leap.keymanager.openpgp import OpenPGPKey +from leap.mail.decorators import deferred  from leap.soledad.client import Soledad  from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY  logger = logging.getLogger(__name__) +MULTIPART_ENCRYPTED = "multipart/encrypted" +MULTIPART_SIGNED = "multipart/signed" +PGP_BEGIN = "-----BEGIN PGP MESSAGE-----" +PGP_END = "-----END PGP MESSAGE-----" +  class MalformedMessage(Exception):      """ @@ -125,6 +134,9 @@ class LeapIncomingMail(object):          self._create_soledad_indexes() +        # initialize a mail parser only once +        self._parser = Parser() +      def _create_soledad_indexes(self):          """          Create needed indexes on soledad. @@ -152,9 +164,10 @@ class LeapIncomingMail(object):          logger.debug("fetching mail for: %s %s" % (              self._soledad.uuid, self._userid))          if not self.fetching_lock.locked(): -            d = deferToThread(self._sync_soledad) -            d.addCallbacks(self._signal_fetch_to_ui, self._sync_soledad_error) -            d.addCallbacks(self._process_doclist, self._sync_soledad_error) +            d1 = self._sync_soledad() +            d = defer.gatherResults([d1], consumeErrors=True) +            d.addCallbacks(self._signal_fetch_to_ui, self._errback) +            d.addCallbacks(self._signal_unread_to_ui, self._errback)              return d          else:              logger.debug("Already fetching mail.") @@ -184,6 +197,11 @@ class LeapIncomingMail(object):      # synchronize incoming mail +    def _errback(self, failure): +        logger.exception(failure.value) +        traceback.print_tb(*sys.exc_info()) + +    @deferred      def _sync_soledad(self):          """          Synchronizes with remote soledad. @@ -196,10 +214,9 @@ class LeapIncomingMail(object):              self._soledad.sync()              log.msg('soledad synced.')              doclist = self._soledad.get_from_index("just-mail", "*") +        self._process_doclist(doclist) -        return doclist - -    def _signal_unread_to_ui(self): +    def _signal_unread_to_ui(self, *args):          """          Sends unread event to ui.          """ @@ -215,53 +232,18 @@ class LeapIncomingMail(object):          :returns: doclist          :rtype: iterable          """ +        doclist = doclist[0]  # gatherResults pass us a list          fetched_ts = time.mktime(time.gmtime()) -        num_mails = len(doclist) -        log.msg("there are %s mails" % (num_mails,)) +        num_mails = len(doclist) if doclist is not None else 0 +        if num_mails != 0: +            log.msg("there are %s mails" % (num_mails,))          leap_events.signal(              IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts)) -        self._signal_unread_to_ui()          return doclist -    def _sync_soledad_error(self, failure): -        """ -        Errback for sync errors. -        """ -        # XXX should signal unrecoverable maybe. -        err = failure.value -        logger.error("error syncing soledad: %s" % (err,)) -        if failure.check(ssl.SSLError): -            logger.warning('SSL Error while ' -                           'syncing soledad: %r' % (err,)) -        elif failure.check(Exception): -            logger.warning('Unknown error while ' -                           'syncing soledad: %r' % (err,)) - -    def _log_err(self, failure): -        """ -        Generic errback -        """ -        err = failure.value -        logger.exception("error!: %r" % (err,)) - -    def _decryption_error(self, failure): -        """ -        Errback for decryption errors. -        """ -        # XXX should signal unrecoverable maybe. -        err = failure.value -        logger.error("error decrypting msg: %s" % (err,)) - -    def _saving_error(self, failure): -        """ -        Errback for local save errors. -        """ -        # XXX should signal unrecoverable maybe. -        err = failure.value -        logger.error("error saving msg locally: %s" % (err,)) -      # process incoming mail. +    @defer.inlineCallbacks      def _process_doclist(self, doclist):          """          Iterates through the doclist, checks if each doc @@ -278,7 +260,6 @@ class LeapIncomingMail(object):              return          num_mails = len(doclist) -        docs_cb = []          for index, doc in enumerate(doclist):              logger.debug("processing doc %d of %d" % (index + 1, num_mails))              leap_events.signal( @@ -287,35 +268,18 @@ class LeapIncomingMail(object):              if self._is_msg(keys):                  # Ok, this looks like a legit msg.                  # Let's process it! -                # Deferred chain for individual messages - -                # XXX use an IConsumer instead... ? -                d = deferToThread(self._decrypt_doc, doc) -                d.addCallback(self._process_decrypted_doc) -                d.addErrback(self._log_err) -                d.addCallback(self._add_message_locally) -                d.addErrback(self._log_err) -                docs_cb.append(d) +                decrypted = list(self._decrypt_doc(doc))[0] +                res = self._add_message_locally(decrypted) +                yield res +              else:                  # Ooops, this does not.                  logger.debug('This does not look like a proper msg.') -        return docs_cb      #      # operations on individual messages      # -    def _is_msg(self, keys): -        """ -        Checks if the keys of a dictionary match the signature -        of the document type we use for messages. - -        :param keys: iterable containing the strings to match. -        :type keys: iterable of strings. -        :rtype: bool -        """ -        return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys -      def _decrypt_doc(self, doc):          """          Decrypt the contents of a document. @@ -339,7 +303,9 @@ class LeapIncomingMail(object):              logger.error("Error while decrypting msg: %r" % (exc,))              decrdata = ""          leap_events.signal(IMAP_MSG_DECRYPTED, "1" if success else "0") -        return doc, decrdata + +        data = list(self._process_decrypted_doc((doc, decrdata))) +        yield (doc, data)      def _process_decrypted_doc(self, msgtuple):          """ @@ -357,16 +323,15 @@ class LeapIncomingMail(object):          doc, data = msgtuple          msg = json.loads(data)          if not isinstance(msg, dict): -            return False +            defer.returnValue(False)          if not msg.get(self.INCOMING_KEY, False): -            return False +            defer.returnValue(False)          # ok, this is an incoming message          rawmsg = msg.get(self.CONTENT_KEY, None)          if not rawmsg:              return False -        data = self._maybe_decrypt_msg(rawmsg) -        return doc, data +        return self._maybe_decrypt_msg(rawmsg)      def _maybe_decrypt_msg(self, data):          """ @@ -381,17 +346,16 @@ class LeapIncomingMail(object):          leap_assert_type(data, unicode)          # parse the original message -        parser = Parser()          encoding = get_email_charset(data)          data = data.encode(encoding) -        msg = parser.parsestr(data) +        msg = self._parser.parsestr(data)          # try to obtain sender public key          senderPubkey = None          fromHeader = msg.get('from', None) -        if fromHeader is not None \ -                and (msg.get_content_type() == 'multipart/encrypted' \ -                     or msg.get_content_type() == 'multipart/signed'): +        if (fromHeader is not None +            and (msg.get_content_type() == MULTIPART_ENCRYPTED +                 or msg.get_content_type() == MULTIPART_SIGNED)):              _, senderAddress = parseaddr(fromHeader)              try:                  senderPubkey = self._keymanager.get_key_from_cache( @@ -400,11 +364,14 @@ class LeapIncomingMail(object):                  pass          valid_sig = False  # we will add a header saying if sig is valid -        if msg.get_content_type() == 'multipart/encrypted': -            decrmsg, valid_sig = self._decrypt_multipart_encrypted_msg( +        decrypt_multi = self._decrypt_multipart_encrypted_msg +        decrypt_inline = self._maybe_decrypt_inline_encrypted_msg + +        if msg.get_content_type() == MULTIPART_ENCRYPTED: +            decrmsg, valid_sig = decrypt_multi(                  msg, encoding, senderPubkey)          else: -            decrmsg, valid_sig = self._maybe_decrypt_inline_encrypted_msg( +            decrmsg, valid_sig = decrypt_inline(                  msg, encoding, senderPubkey)          # add x-leap-signature header @@ -419,7 +386,7 @@ class LeapIncomingMail(object):                  self.LEAP_SIGNATURE_INVALID,                  pubkey=senderPubkey.key_id) -        return decrmsg.as_string() +        yield decrmsg.as_string()      def _decrypt_multipart_encrypted_msg(self, msg, encoding, senderPubkey):          """ @@ -437,43 +404,33 @@ class LeapIncomingMail(object):          """          log.msg('decrypting multipart encrypted msg')          msg = copy.deepcopy(msg) -        # sanity check -        payload = msg.get_payload() -        if len(payload) != 2: -            raise MalformedMessage( -                'Multipart/encrypted messages should have exactly 2 body ' -                'parts (instead of %d).' % len(payload)) -        if payload[0].get_content_type() != 'application/pgp-encrypted': -            raise MalformedMessage( -                "Multipart/encrypted messages' first body part should " -                "have content type equal to 'application/pgp-encrypted' " -                "(instead of %s)." % payload[0].get_content_type()) -        if payload[1].get_content_type() != 'application/octet-stream': -            raise MalformedMessage( -                "Multipart/encrypted messages' second body part should " -                "have content type equal to 'octet-stream' (instead of " -                "%s)." % payload[1].get_content_type()) +        self._multipart_sanity_check(msg) +          # parse message and get encrypted content          pgpencmsg = msg.get_payload()[1]          encdata = pgpencmsg.get_payload() +          # decrypt or fail gracefully          try: -            decrdata, valid_sig = self._decrypt_and_verify_data( +            decrdata, valid_sig = yield self._decrypt_and_verify_data(                  encdata, senderPubkey)          except keymanager_errors.DecryptError as e:              logger.warning('Failed to decrypt encrypted message (%s). '                             'Storing message without modifications.' % str(e)) -            return msg, False  # return original message +            # Bailing out! +            yield (msg, False) +          # decrypted successully, now fix encoding and parse          try:              decrdata = decrdata.encode(encoding)          except (UnicodeEncodeError, UnicodeDecodeError) as e:              logger.error("Unicode error {0}".format(e))              decrdata = decrdata.encode(encoding, 'replace') -        parser = Parser() -        decrmsg = parser.parsestr(decrdata) + +        decrmsg = self._parser.parsestr(decrdata)          # remove original message's multipart/encrypted content-type          del(msg['content-type']) +          # replace headers back in original message          for hkey, hval in decrmsg.items():              try: @@ -481,9 +438,10 @@ class LeapIncomingMail(object):                  msg.replace_header(hkey, hval)              except KeyError:                  msg[hkey] = hval -        # replace payload by unencrypted payload + +        # all ok, replace payload by unencrypted payload          msg.set_payload(decrmsg.get_payload()) -        return msg, valid_sig +        yield (msg, valid_sig)      def _maybe_decrypt_inline_encrypted_msg(self, origmsg, encoding,                                              senderPubkey): @@ -497,8 +455,9 @@ class LeapIncomingMail(object):          :param senderPubkey: The key of the sender of the message.          :type senderPubkey: OpenPGPKey -        :return: A unitary tuple containing a decrypted message. -        :rtype: (Message) +        :return: A unitary tuple containing a decrypted message and +                 a bool indicating wether the signature is valid. +        :rtype: (Message, bool)          """          log.msg('maybe decrypting inline encrypted msg')          # serialize the original message @@ -507,8 +466,6 @@ class LeapIncomingMail(object):          g.flatten(origmsg)          data = buf.getvalue()          # handle exactly one inline PGP message -        PGP_BEGIN = "-----BEGIN PGP MESSAGE-----" -        PGP_END = "-----END PGP MESSAGE-----"          valid_sig = False          if PGP_BEGIN in data:              begin = data.find(PGP_BEGIN) @@ -522,11 +479,11 @@ class LeapIncomingMail(object):              except keymanager_errors.DecryptError:                  logger.warning('Failed to decrypt potential inline encrypted '                                 'message. Storing message as is...') +          # if message is not encrypted, return raw data          if isinstance(data, unicode):              data = data.encode(encoding, 'replace') -        parser = Parser() -        return parser.parsestr(data), valid_sig +        return (self._parser.parsestr(data), valid_sig)      def _decrypt_and_verify_data(self, data, senderPubkey):          """ @@ -555,7 +512,7 @@ class LeapIncomingMail(object):          except keymanager_errors.InvalidSignature:              decrdata = self._keymanager.decrypt(                  data, self._pkey) -        return decrdata, valid_sig +        return (decrdata, valid_sig)      def _add_message_locally(self, msgtuple):          """ @@ -570,10 +527,54 @@ class LeapIncomingMail(object):          """          log.msg('adding message to local db')          doc, data = msgtuple -        self._inbox.addMessage(data, (self.RECENT_FLAG,)) +        if isinstance(data, list): +            data = data[0] + +        self._inbox.addMessage(data, flags=(self.RECENT_FLAG,)) +          leap_events.signal(IMAP_MSG_SAVED_LOCALLY)          doc_id = doc.doc_id          self._soledad.delete_doc(doc)          log.msg("deleted doc %s from incoming" % doc_id)          leap_events.signal(IMAP_MSG_DELETED_INCOMING)          self._signal_unread_to_ui() +        return True + +    # +    # helpers +    # + +    def _msg_multipart_sanity_check(self, msg): +        """ +        Performs a sanity check against a multipart encrypted msg + +        :param msg: The original encrypted message. +        :type msg: Message +        """ +        # sanity check +        payload = msg.get_payload() +        if len(payload) != 2: +            raise MalformedMessage( +                'Multipart/encrypted messages should have exactly 2 body ' +                'parts (instead of %d).' % len(payload)) +        if payload[0].get_content_type() != 'application/pgp-encrypted': +            raise MalformedMessage( +                "Multipart/encrypted messages' first body part should " +                "have content type equal to 'application/pgp-encrypted' " +                "(instead of %s)." % payload[0].get_content_type()) +        if payload[1].get_content_type() != 'application/octet-stream': +            raise MalformedMessage( +                "Multipart/encrypted messages' second body part should " +                "have content type equal to 'octet-stream' (instead of " +                "%s)." % payload[1].get_content_type()) + +    def _is_msg(self, keys): +        """ +        Checks if the keys of a dictionary match the signature +        of the document type we use for messages. + +        :param keys: iterable containing the strings to match. +        :type keys: iterable of strings. +        :rtype: bool +        """ +        return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py index 8758dcb..57587a5 100644 --- a/src/leap/mail/imap/server.py +++ b/src/leap/mail/imap/server.py @@ -21,20 +21,17 @@ import copy  import logging  import StringIO  import cStringIO -import os  import time  import re  from collections import defaultdict, namedtuple  from email.parser import Parser -from functools import wraps  from zope.interface import implements  from zope.proxy import sameProxiedObjects  from twisted.mail import imap4  from twisted.internet import defer -from twisted.internet.threads import deferToThread  from twisted.python import log  from u1db import errors as u1db_errors @@ -44,70 +41,12 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL  from leap.common.check import leap_assert, leap_assert_type  from leap.common.mail import get_email_charset  from leap.mail.messageflow import IMessageConsumer, MessageProducer +from leap.mail.decorators import deferred  from leap.soledad.client import Soledad  logger = logging.getLogger(__name__) -def deferred(f): -    ''' -    Decorator, for deferring methods to Threads. - -    It will do a deferToThread of the decorated method -    unless the environment variable LEAPMAIL_DEBUG is set. - -    It uses a descriptor to delay the definition of the -    method wrapper. -    ''' -    class descript(object): -        def __init__(self, f): -            self.f = f - -        def __get__(self, instance, klass): -            if instance is None: -                # Class method was requested -                return self.make_unbound(klass) -            return self.make_bound(instance) - -        def _errback(self, failure): -            err = failure.value -            logger.warning('error in method: %s' % (self.f.__name__)) -            log.err(err) - -        def make_unbound(self, klass): - -            @wraps(self.f) -            def wrapper(*args, **kwargs): -                '''This documentation will vanish :)''' -                raise TypeError( -                    'unbound method {}() must be called with {} instance ' -                    'as first argument (got nothing instead)'.format( -                        self.f.__name__, -                        klass.__name__) -                ) -            return wrapper - -        def make_bound(self, instance): - -            @wraps(self.f) -            def wrapper(*args, **kwargs): -                '''This documentation will disapear :)''' - -                if not os.environ.get('LEAPMAIL_DEBUG'): -                    d = deferToThread(self.f, instance, *args, **kwargs) -                    d.addErrback(self._errback) -                    return d -                else: -                    return self.f(instance, *args, **kwargs) - -            # This instance does not need the descriptor anymore, -            # let it find the wrapper directly next time: -            setattr(instance, self.f.__name__, wrapper) -            return wrapper - -    return descript(f) - -  class MissingIndexError(Exception):      """      Raises when tried to access a non existent index document. @@ -248,6 +187,8 @@ class MailParser(object):              return self._parser.parse          if isinstance(o, basestring):              return self._parser.parsestr +        # fallback +        return self._parser.parsestr      def _stringify(self, o):          """ @@ -942,8 +883,8 @@ class LeapMessage(fields, MailParser, MBoxParser):          Return True if this message is multipart.          """          if self._cdoc: -            retval = self._cdoc.content.get(self.MULTIPART_KEY, False) -            print "MULTIPART? ", retval +            retval = self._fdoc.content.get(self.MULTIPART_KEY, False) +            return retval      def getSubPart(self, part):          """ @@ -1197,6 +1138,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          msg = self._get_parsed_msg(raw)          headers = dict(msg) +        logger.debug("adding. is multipart:%s" % msg.is_multipart())          flags_doc[self.MULTIPART_KEY] = msg.is_multipart()          # XXX get lower case for keys?          # XXX get headers doc @@ -1464,7 +1406,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):      def addListener(self, listener):          """ -        Rdds a listener to the listeners queue. +        Adds a listener to the listeners queue. +        The server adds itself as a listener when there is a SELECT, +        so it can send EXIST commands.          :param listener: listener to add          :type listener: an object that implements IMailboxListener @@ -1716,6 +1660,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          :return: a deferred that evals to None          """          # XXX we should treat the message as an IMessage from here +        leap_assert_type(message, basestring)          uid_next = self.getUIDNext()          logger.debug('Adding msg with UID :%s' % uid_next)          if flags is None: @@ -1823,12 +1768,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          else:              for msg_id in messages: -                print "getting msg by uid", msg_id                  msg = self.messages.get_msg_by_uid(msg_id)                  if msg:                      result.append((msg_id, msg))                  else: -                    print "fetch %s, no msg found!!!" % msg_id +                    logger.debug("fetch %s, no msg found!!!" % msg_id)          if self.isWriteable():              self._unset_recent_flag() | 
