diff options
| author | Tomás Touceda <chiiph@leap.se> | 2013-12-06 15:48:54 -0300 | 
|---|---|---|
| committer | Tomás Touceda <chiiph@leap.se> | 2013-12-06 15:48:54 -0300 | 
| commit | 1e6629297273e3906df51a9a1c0db52e5d8cc85f (patch) | |
| tree | d3e20036d6e986f2a0509003fdcd426ea1e54857 /mail | |
| parent | e65620c9de05fdd051a8ad045a0ff81bcf67e39a (diff) | |
| parent | d8aa0552cef82516cafc2ccca9f6dc1da97371e3 (diff) | |
Merge branch 'release-0.3.8'
Diffstat (limited to 'mail')
| -rw-r--r-- | mail/CHANGELOG | 16 | ||||
| -rw-r--r-- | mail/changes/VERSION_COMPAT | 1 | ||||
| -rw-r--r-- | mail/pkg/requirements.pip | 2 | ||||
| -rwxr-xr-x | mail/pkg/tools/with_venvwrapper.sh | 16 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/fetch.py | 315 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/server.py | 59 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/service/imap.py | 2 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/tests/test_imap.py | 18 | ||||
| -rw-r--r-- | mail/src/leap/mail/messageflow.py | 149 | ||||
| -rw-r--r-- | mail/src/leap/mail/smtp/gateway.py | 14 | ||||
| -rw-r--r-- | mail/src/leap/mail/smtp/tests/__init__.py | 4 | ||||
| -rw-r--r-- | mail/src/leap/mail/smtp/tests/test_gateway.py | 21 | 
12 files changed, 501 insertions, 116 deletions
| diff --git a/mail/CHANGELOG b/mail/CHANGELOG index f15482c4..fea58c85 100644 --- a/mail/CHANGELOG +++ b/mail/CHANGELOG @@ -1,3 +1,19 @@ +0.3.8 Dec 6: +  o Fail gracefully when failing to decrypt incoming messages. Closes +    #4589. +  o Fix a bug when adding a message with empty flags. Closes #4496 +  o Allow to iterate in an empty mailbox during fetch. Closes #4603 +  o Add 'signencrypt' preference to OpenPGP header on outgoing +    email. Closes #3878. +  o Add a header to incoming emails that reflects if a valid signature +    was found when decrypting. Closes #4354. +  o Add a footer to outgoing email pointing to the address where +    sender keys can be fetched. Closes #4526. +  o Serialize Soledad Writes for new messages. Fixes segmentation +    fault when sqlcipher was been concurrently accessed from many +    threads. Closes #4606 +  o Set remote mail polling time to 60 seconds. Closes #4499 +  0.3.7 Nov 15:    o Uses deferToThread for sendMail. Closes #3937    o Update pkey to allow multiple accounts. Solves: #4394 diff --git a/mail/changes/VERSION_COMPAT b/mail/changes/VERSION_COMPAT index cc00ecf7..032b26ac 100644 --- a/mail/changes/VERSION_COMPAT +++ b/mail/changes/VERSION_COMPAT @@ -8,3 +8,4 @@  #  # BEGIN DEPENDENCY LIST -------------------------  # leap.foo.bar>=x.y.z + diff --git a/mail/pkg/requirements.pip b/mail/pkg/requirements.pip index 7ed50878..dc0635c3 100644 --- a/mail/pkg/requirements.pip +++ b/mail/pkg/requirements.pip @@ -1,6 +1,6 @@  zope.interface  leap.soledad.client>=0.3.0  leap.common>=0.3.5 -leap.keymanager>=0.3.4 +leap.keymanager>=0.3.7  twisted  # >= 12.0.3 ??  zope.proxy diff --git a/mail/pkg/tools/with_venvwrapper.sh b/mail/pkg/tools/with_venvwrapper.sh new file mode 100755 index 00000000..693c0ac9 --- /dev/null +++ b/mail/pkg/tools/with_venvwrapper.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +#Wraps a command in a virtualenwrapper passed as first argument. +#Example: +#with_virtualenvwrapper.sh leap-bitmask ./run_tests.sh + +wd=`pwd` +alias pyver='python -c "import $1;print $1.__path__[0]; print $1.__version__;"' + +source `which virtualenvwrapper.sh` +echo "Activating virtualenv " $1 +echo "------------------------------------" +workon $1 +cd $wd +echo "running version: " `pyver leap.bitmask` +$2 $3 $4 $5 diff --git a/mail/src/leap/mail/imap/fetch.py b/mail/src/leap/mail/imap/fetch.py index 3422ed50..14f7a9be 100644 --- a/mail/src/leap/mail/imap/fetch.py +++ b/mail/src/leap/mail/imap/fetch.py @@ -22,8 +22,12 @@ import json  import ssl  import threading  import time +import copy +from StringIO import StringIO  from email.parser import Parser +from email.generator import Generator +from email.utils import parseaddr  from twisted.python import log  from twisted.internet.task import LoopingCall @@ -57,7 +61,15 @@ class MalformedMessage(Exception):  class LeapIncomingMail(object):      """ -    Fetches mail from the incoming queue. +    Fetches and process mail from the incoming pool. + +    This object has public methods start_loop and stop that will +    actually initiate a LoopingCall with check_period recurrency. +    The LoopingCall itself will invoke the fetch method each time +    that the check_period expires. + +    This loop will sync the soledad db with the remote server and +    process all the documents found tagged as incoming mail.      """      RECENT_FLAG = "\\Recent" @@ -65,13 +77,23 @@ class LeapIncomingMail(object):      INCOMING_KEY = "incoming"      CONTENT_KEY = "content" +    LEAP_SIGNATURE_HEADER = 'X-Leap-Signature' +    """ +    Header added to messages when they are decrypted by the IMAP fetcher, +    which states the validity of an eventual signature that might be included +    in the encrypted blob. +    """ +    LEAP_SIGNATURE_VALID = 'valid' +    LEAP_SIGNATURE_INVALID = 'invalid' +    LEAP_SIGNATURE_COULD_NOT_VERIFY = 'could not verify' +      fetching_lock = threading.Lock()      def __init__(self, keymanager, soledad, imap_account,                   check_period, userid):          """ -        Initialize LeapIMAP. +        Initialize LeapIncomingMail..          :param keymanager: a keymanager instance          :type keymanager: keymanager.KeyManager @@ -148,6 +170,7 @@ class LeapIncomingMail(object):              logger.warning("Tried to start an already running fetching loop.")      def stop(self): +        # XXX change the name to stop_loop, for consistency.          """          Stops the loop that fetches mail.          """ @@ -171,7 +194,9 @@ class LeapIncomingMail(object):          with self.fetching_lock:              log.msg('syncing soledad...')              self._soledad.sync() +            log.msg('soledad synced.')              doclist = self._soledad.get_from_index("just-mail", "*") +          return doclist      def _signal_unread_to_ui(self): @@ -235,6 +260,8 @@ class LeapIncomingMail(object):          err = failure.value          logger.error("error saving msg locally: %s" % (err,)) +    # process incoming mail. +      def _process_doclist(self, doclist):          """          Iterates through the doclist, checks if each doc @@ -253,25 +280,21 @@ class LeapIncomingMail(object):          docs_cb = []          for index, doc in enumerate(doclist): -            logger.debug("processing doc %d of %d" % (index, num_mails)) +            logger.debug("processing doc %d of %d" % (index + 1, num_mails))              leap_events.signal(                  IMAP_MSG_PROCESSING, str(index), str(num_mails))              keys = doc.content.keys()              if self._is_msg(keys):                  # Ok, this looks like a legit msg.                  # Let's process it! -                encdata = doc.content[ENC_JSON_KEY] -                  # Deferred chain for individual messages -                d = deferToThread(self._decrypt_msg, doc, encdata) -                d.addCallback(self._process_decrypted) + +                # XXX use an IConsumer instead... ? +                d = deferToThread(self._decrypt_doc, doc) +                d.addCallback(self._process_decrypted_doc)                  d.addErrback(self._log_err)                  d.addCallback(self._add_message_locally)                  d.addErrback(self._log_err) -                # XXX check this, add_locally should not get called if we -                # get an error in process -                #d.addCallbacks(self._process_decrypted, self._decryption_error) -                #d.addCallbacks(self._add_message_locally, self._saving_error)                  docs_cb.append(d)              else:                  # Ooops, this does not. @@ -293,34 +316,44 @@ class LeapIncomingMail(object):          """          return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys -    def _decrypt_msg(self, doc, encdata): +    def _decrypt_doc(self, doc): +        """ +        Decrypt the contents of a document. + +        :param doc: A document containing an encrypted message. +        :type doc: SoledadDocument + +        :return: A tuple containing the document and the decrypted message. +        :rtype: (SoledadDocument, str) +        """          log.msg('decrypting msg') -        key = self._pkey +        success = False +          try: -            decrdata = (self._keymanager.decrypt( -                encdata, key, -                passphrase=self._soledad.passphrase)) -            ok = True +            decrdata = self._keymanager.decrypt( +                doc.content[ENC_JSON_KEY], +                self._pkey) +            success = True          except Exception as exc:              # XXX move this to errback !!! -            logger.warning("Error while decrypting msg: %r" % (exc,)) +            logger.error("Error while decrypting msg: %r" % (exc,))              decrdata = "" -            ok = False -        leap_events.signal(IMAP_MSG_DECRYPTED, "1" if ok else "0") +        leap_events.signal(IMAP_MSG_DECRYPTED, "1" if success else "0")          return doc, decrdata -    def _process_decrypted(self, msgtuple): +    def _process_decrypted_doc(self, msgtuple):          """ -        Process a successfully decrypted message. +        Process a document containing a succesfully decrypted message.          :param msgtuple: a tuple consisting of a SoledadDocument                           instance containing the incoming message                           and data, the json-encoded, decrypted content of the                           incoming message          :type msgtuple: (SoledadDocument, str) -        :returns: a SoledadDocument and the processed data. +        :return: a SoledadDocument and the processed data.          :rtype: (doc, data)          """ +        log.msg('processing decrypted doc')          doc, data = msgtuple          msg = json.loads(data)          if not isinstance(msg, dict): @@ -332,14 +365,10 @@ class LeapIncomingMail(object):          rawmsg = msg.get(self.CONTENT_KEY, None)          if not rawmsg:              return False -        try: -            data = self._maybe_decrypt_gpg_msg(rawmsg) -            return doc, data -        except keymanager_errors.EncryptionDecryptionFailed as exc: -            logger.error(exc) -            raise +        data = self._maybe_decrypt_msg(rawmsg) +        return doc, data -    def _maybe_decrypt_gpg_msg(self, data): +    def _maybe_decrypt_msg(self, data):          """          Tries to decrypt a gpg message if data looks like one. @@ -348,80 +377,183 @@ class LeapIncomingMail(object):          :return: data, possibly descrypted.          :rtype: str          """ -        # TODO split this method +        log.msg('maybe decrypting doc')          leap_assert_type(data, unicode) +        # parse the original message          parser = Parser()          encoding = get_email_charset(data)          data = data.encode(encoding) -        origmsg = parser.parsestr(data) - -        # handle multipart/encrypted messages -        if origmsg.get_content_type() == 'multipart/encrypted': -            # sanity check -            payload = origmsg.get_payload() -            if len(payload) != 2: -                raise MalformedMessage( -                    'Multipart/encrypted messages should have exactly 2 body ' -                    'parts (instead of %d).' % len(payload)) -            if payload[0].get_content_type() != 'application/pgp-encrypted': -                raise MalformedMessage( -                    "Multipart/encrypted messages' first body part should " -                    "have content type equal to 'application/pgp-encrypted' " -                    "(instead of %s)." % payload[0].get_content_type()) -            if payload[1].get_content_type() != 'application/octet-stream': -                raise MalformedMessage( -                    "Multipart/encrypted messages' second body part should " -                    "have content type equal to 'octet-stream' (instead of " -                    "%s)." % payload[1].get_content_type()) - -            # parse message and get encrypted content -            pgpencmsg = origmsg.get_payload()[1] -            encdata = pgpencmsg.get_payload() - -            # decrypt and parse decrypted message -            decrdata = self._keymanager.decrypt( -                encdata, self._pkey, -                passphrase=self._soledad.passphrase) +        msg = parser.parsestr(data) + +        # try to obtain sender public key +        senderPubkey = None +        fromHeader = msg.get('from', None) +        if fromHeader is not None: +            _, senderAddress = parseaddr(fromHeader)              try: -                decrdata = decrdata.encode(encoding) -            except (UnicodeEncodeError, UnicodeDecodeError) as e: -                logger.error("Unicode error {0}".format(e)) -                decrdata = decrdata.encode(encoding, 'replace') - -            decrmsg = parser.parsestr(decrdata) -            # remove original message's multipart/encrypted content-type -            del(origmsg['content-type']) -            # replace headers back in original message -            for hkey, hval in decrmsg.items(): -                try: -                    # this will raise KeyError if header is not present -                    origmsg.replace_header(hkey, hval) -                except KeyError: -                    origmsg[hkey] = hval - -            # replace payload by unencrypted payload -            origmsg.set_payload(decrmsg.get_payload()) -            return origmsg.as_string(unixfrom=False) +                senderPubkey = self._keymanager.get_key( +                    senderAddress, OpenPGPKey) +            except keymanager_errors.KeyNotFound: +                pass + +        valid_sig = False  # we will add a header saying if sig is valid +        if msg.get_content_type() == 'multipart/encrypted': +            decrmsg, valid_sig = self._decrypt_multipart_encrypted_msg( +                msg, encoding, senderPubkey)          else: -            PGP_BEGIN = "-----BEGIN PGP MESSAGE-----" -            PGP_END = "-----END PGP MESSAGE-----" -            # handle inline PGP messages -            if PGP_BEGIN in data: -                begin = data.find(PGP_BEGIN) -                end = data.rfind(PGP_END) -                pgp_message = data[begin:begin+end] -                decrdata = (self._keymanager.decrypt( -                    pgp_message, self._pkey, -                    passphrase=self._soledad.passphrase)) +            decrmsg, valid_sig = self._maybe_decrypt_inline_encrypted_msg( +                msg, encoding, senderPubkey) + +        # add x-leap-signature header +        if senderPubkey is None: +            decrmsg.add_header( +                self.LEAP_SIGNATURE_HEADER, +                self.LEAP_SIGNATURE_COULD_NOT_VERIFY) +        else: +            decrmsg.add_header( +                self.LEAP_SIGNATURE_HEADER, +                self.LEAP_SIGNATURE_VALID if valid_sig else +                self.LEAP_SIGNATURE_INVALID, +                pubkey=senderPubkey.key_id) + +        return decrmsg.as_string() + +    def _decrypt_multipart_encrypted_msg(self, msg, encoding, senderPubkey): +        """ +        Decrypt a message with content-type 'multipart/encrypted'. + +        :param msg: The original encrypted message. +        :type msg: Message +        :param encoding: The encoding of the email message. +        :type encoding: str +        :param senderPubkey: The key of the sender of the message. +        :type senderPubkey: OpenPGPKey + +        :return: A unitary tuple containing a decrypted message. +        :rtype: (Message) +        """ +        log.msg('decrypting multipart encrypted msg') +        msg = copy.deepcopy(msg) +        # sanity check +        payload = msg.get_payload() +        if len(payload) != 2: +            raise MalformedMessage( +                'Multipart/encrypted messages should have exactly 2 body ' +                'parts (instead of %d).' % len(payload)) +        if payload[0].get_content_type() != 'application/pgp-encrypted': +            raise MalformedMessage( +                "Multipart/encrypted messages' first body part should " +                "have content type equal to 'application/pgp-encrypted' " +                "(instead of %s)." % payload[0].get_content_type()) +        if payload[1].get_content_type() != 'application/octet-stream': +            raise MalformedMessage( +                "Multipart/encrypted messages' second body part should " +                "have content type equal to 'octet-stream' (instead of " +                "%s)." % payload[1].get_content_type()) +        # parse message and get encrypted content +        pgpencmsg = msg.get_payload()[1] +        encdata = pgpencmsg.get_payload() +        # decrypt or fail gracefully +        try: +            decrdata, valid_sig = self._decrypt_and_verify_data( +                encdata, senderPubkey) +        except keymanager_errors.DecryptError as e: +            logger.warning('Failed to decrypt encrypted message (%s). ' +                           'Storing message without modifications.' % str(e)) +            return msg, False  # return original message +        # decrypted successully, now fix encoding and parse +        try: +            decrdata = decrdata.encode(encoding) +        except (UnicodeEncodeError, UnicodeDecodeError) as e: +            logger.error("Unicode error {0}".format(e)) +            decrdata = decrdata.encode(encoding, 'replace') +        parser = Parser() +        decrmsg = parser.parsestr(decrdata) +        # remove original message's multipart/encrypted content-type +        del(msg['content-type']) +        # replace headers back in original message +        for hkey, hval in decrmsg.items(): +            try: +                # this will raise KeyError if header is not present +                msg.replace_header(hkey, hval) +            except KeyError: +                msg[hkey] = hval +        # replace payload by unencrypted payload +        msg.set_payload(decrmsg.get_payload()) +        return msg, valid_sig + +    def _maybe_decrypt_inline_encrypted_msg(self, origmsg, encoding, +                                            senderPubkey): +        """ +        Possibly decrypt an inline OpenPGP encrypted message. + +        :param origmsg: The original, possibly encrypted message. +        :type origmsg: Message +        :param encoding: The encoding of the email message. +        :type encoding: str +        :param senderPubkey: The key of the sender of the message. +        :type senderPubkey: OpenPGPKey + +        :return: A unitary tuple containing a decrypted message. +        :rtype: (Message) +        """ +        log.msg('maybe decrypting inline encrypted msg') +        # serialize the original message +        buf = StringIO() +        g = Generator(buf) +        g.flatten(origmsg) +        data = buf.getvalue() +        # handle exactly one inline PGP message +        PGP_BEGIN = "-----BEGIN PGP MESSAGE-----" +        PGP_END = "-----END PGP MESSAGE-----" +        valid_sig = False +        if PGP_BEGIN in data: +            begin = data.find(PGP_BEGIN) +            end = data.find(PGP_END) +            pgp_message = data[begin:end+len(PGP_END)] +            try: +                decrdata, valid_sig = self._decrypt_and_verify_data( +                    pgp_message, senderPubkey)                  # replace encrypted by decrypted content                  data = data.replace(pgp_message, decrdata) +            except keymanager_errors.DecryptError: +                logger.warning('Failed to decrypt potential inline encrypted ' +                               'message. Storing message as is...')          # if message is not encrypted, return raw data -          if isinstance(data, unicode):              data = data.encode(encoding, 'replace') +        parser = Parser() +        return parser.parsestr(data), valid_sig -        return data +    def _decrypt_and_verify_data(self, data, senderPubkey): +        """ +        Decrypt C{data} using our private key and attempt to verify a +        signature using C{senderPubkey}. + +        :param data: The text to be decrypted. +        :type data: unicode +        :param senderPubkey: The public key of the sender of the message. +        :type senderPubkey: OpenPGPKey + +        :return: The decrypted data and a boolean stating whether the +                 signature could be verified. +        :rtype: (str, bool) + +        :raise DecryptError: Raised if failed to decrypt. +        """ +        log.msg('decrypting and verifying data') +        valid_sig = False +        try: +            decrdata = self._keymanager.decrypt( +                data, self._pkey, +                verify=senderPubkey) +            if senderPubkey is not None: +                valid_sig = True +        except keymanager_errors.InvalidSignature: +            decrdata = self._keymanager.decrypt( +                data, self._pkey) +        return decrdata, valid_sig      def _add_message_locally(self, msgtuple):          """ @@ -434,6 +566,7 @@ class LeapIncomingMail(object):                           incoming message          :type msgtuple: (SoledadDocument, str)          """ +        log.msg('adding message to local db')          doc, data = msgtuple          self._inbox.addMessage(data, (self.RECENT_FLAG,))          leap_events.signal(IMAP_MSG_SAVED_LOCALLY) diff --git a/mail/src/leap/mail/imap/server.py b/mail/src/leap/mail/imap/server.py index bb2830d9..6320a515 100644 --- a/mail/src/leap/mail/imap/server.py +++ b/mail/src/leap/mail/imap/server.py @@ -27,6 +27,7 @@ from collections import defaultdict  from email.parser import Parser  from zope.interface import implements +from zope.proxy import sameProxiedObjects  from twisted.mail import imap4  from twisted.internet import defer @@ -36,6 +37,7 @@ from leap.common import events as leap_events  from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL  from leap.common.check import leap_assert, leap_assert_type  from leap.common.mail import get_email_charset +from leap.mail.messageflow import IMessageConsumer, MessageProducer  from leap.soledad.client import Soledad  logger = logging.getLogger(__name__) @@ -816,6 +818,32 @@ class LeapMessage(WithMsgFields):          return self._doc.content.get(key, None) +class SoledadDocWriter(object): +    """ +    This writer will create docs serially in the local soledad database. +    """ + +    implements(IMessageConsumer) + +    def __init__(self, soledad): +        """ +        Initialize the writer. + +        :param soledad: the soledad instance +        :type soledad: Soledad +        """ +        self._soledad = soledad + +    def consume(self, item): +        """ +        Creates a new document in soledad db. + +        :param item: object to update. content of the document to be inserted. +        :type item: dict +        """ +        self._soledad.create_doc(item) + +  class MessageCollection(WithMsgFields, IndexedDB):      """      A collection of messages, surprisingly. @@ -875,6 +903,16 @@ class MessageCollection(WithMsgFields, IndexedDB):          self.initialize_db()          self._parser = Parser() +        # I think of someone like nietzsche when reading this + +        # this will be the producer that will enqueue the content +        # to be processed serially by the consumer (the writer). We just +        # need to `put` the new material on its plate. + +        self._soledad_writer = MessageProducer( +            SoledadDocWriter(soledad), +            period=0.2) +      def _get_empty_msg(self):          """          Returns an empty message. @@ -947,7 +985,9 @@ class MessageCollection(WithMsgFields, IndexedDB):          # ...should get a sanity check here.          content[self.UID_KEY] = uid -        return self._soledad.create_doc(content) +        self._soledad_writer.put(content) +        # XXX have to decide what shall we do with errors with this change... +        #return self._soledad.create_doc(content)      def remove(self, msg):          """ @@ -1041,7 +1081,11 @@ class MessageCollection(WithMsgFields, IndexedDB):          :return: a list of u1db documents          :rtype: list of SoledadDocument          """ -        # XXX this should return LeapMessage instances +        if sameProxiedObjects(self._soledad, None): +            logger.warning('Tried to get messages but soledad is None!') +            return [] + +        #f XXX this should return LeapMessage instances          all_docs = [doc for doc in self._soledad.get_from_index(              SoledadBackedAccount.TYPE_MBOX_IDX,              self.TYPE_MESSAGE_VAL, self.mbox)] @@ -1438,12 +1482,14 @@ class SoledadMailbox(WithMsgFields):          """          # XXX we should treat the message as an IMessage from here          uid_next = self.getUIDNext() -        flags = tuple(str(flag) for flag in flags) +        if flags is None: +            flags = tuple() +        else: +            flags = tuple(str(flag) for flag in flags)          self.messages.add_msg(message, flags=flags, date=date,                                uid=uid_next) -        # XXX recent should not include deleted...??          exists = len(self.messages)          recent = len(self.messages.get_recent())          for listener in self.listeners: @@ -1512,7 +1558,10 @@ class SoledadMailbox(WithMsgFields):              except TypeError:                  # looks like we cannot iterate                  last = self.messages.get_last() -                uid_last = last.getUID() +                if last is None: +                    uid_last = 1 +                else: +                    uid_last = last.getUID()                  messages.last = uid_last          # for sequence numbers (uid = 0) diff --git a/mail/src/leap/mail/imap/service/imap.py b/mail/src/leap/mail/imap/service/imap.py index feb2593a..8756ddcd 100644 --- a/mail/src/leap/mail/imap/service/imap.py +++ b/mail/src/leap/mail/imap/service/imap.py @@ -41,7 +41,7 @@ IMAP_PORT = 1984  # The period between succesive checks of the incoming mail  # queue (in seconds) -INCOMING_CHECK_PERIOD = 300 +INCOMING_CHECK_PERIOD = 60  from leap.common.events.events_pb2 import IMAP_SERVICE_STARTED  from leap.common.events.events_pb2 import IMAP_SERVICE_FAILED_TO_START diff --git a/mail/src/leap/mail/imap/tests/test_imap.py b/mail/src/leap/mail/imap/tests/test_imap.py index ad11315d..ca73a11c 100644 --- a/mail/src/leap/mail/imap/tests/test_imap.py +++ b/mail/src/leap/mail/imap/tests/test_imap.py @@ -923,7 +923,6 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          """          self.server.theAccount.addMailbox('test-mailbox-e',                                            creation_ts=42) -        #import ipdb; ipdb.set_trace()          self.examinedArgs = None @@ -1108,16 +1107,15 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          mb = SimpleLEAPServer.theAccount.getMailbox('ROOT/SUBTHING')          self.assertEqual(1, len(mb.messages)) -        #import ipdb; ipdb.set_trace()          self.assertEqual(              ['\\SEEN', '\\DELETED'], -            mb.messages[1]['flags']) +            mb.messages[1].content['flags'])          self.assertEqual(              'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)', -            mb.messages[1]['date']) +            mb.messages[1].content['date']) -        self.assertEqual(open(infile).read(), mb.messages[1]['raw']) +        self.assertEqual(open(infile).read(), mb.messages[1].content['raw'])      def testPartialAppend(self):          """ @@ -1152,11 +1150,11 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          self.assertEqual(1, len(mb.messages))          self.assertEqual(              ['\\SEEN',], -            mb.messages[1]['flags'] +            mb.messages[1].content['flags']          )          self.assertEqual( -            'Right now', mb.messages[1]['date']) -        self.assertEqual(open(infile).read(), mb.messages[1]['raw']) +            'Right now', mb.messages[1].content['date']) +        self.assertEqual(open(infile).read(), mb.messages[1].content['raw'])      def testCheck(self):          """ @@ -1214,7 +1212,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):      def _cbTestClose(self, ignored, m):          self.assertEqual(len(m.messages), 1)          self.assertEqual( -            m.messages[1]['subject'], +            m.messages[1].content['subject'],              'Message 2')          self.failUnless(m.closed) @@ -1257,7 +1255,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):      def _cbTestExpunge(self, ignored, m):          self.assertEqual(len(m.messages), 1)          self.assertEqual( -            m.messages[1]['subject'], +            m.messages[1].content['subject'],              'Message 2')          self.assertEqual(self.results, [0, 1])          # XXX fix this thing with the indexes... diff --git a/mail/src/leap/mail/messageflow.py b/mail/src/leap/mail/messageflow.py new file mode 100644 index 00000000..21f6d62a --- /dev/null +++ b/mail/src/leap/mail/messageflow.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +# messageflow.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +Message Producers and Consumers for flow control. +""" +import Queue + +from twisted.internet.task import LoopingCall + +from zope.interface import Interface, implements + + +class IMessageConsumer(Interface): + +    def consume(self, item): +        """ +        Consumes the passed item. + +        :param item: an object to be consumed. +        :type item: object +        """ +        # TODO we could add an optional type to be passed +        # for doing type check. + +        # TODO in case of errors, we could return the object to +        # the queue, maybe wrapped in an object with a retries attribute. + + +class DummyMsgConsumer(object): + +    implements(IMessageConsumer) + +    def consume(self, item): +        """ +        Just prints the passed item. +        """ +        print "got item %s" % item + + +class MessageProducer(object): +    """ +    A Producer class that we can use to temporarily buffer the production +    of messages so that different objects can consume them. + +    This is useful for serializing the consumption of the messages stream +    in the case of an slow resource (db), or for returning early from a +    deferred chain and leave further processing detached from the calling loop, +    as in the case of smtp. +    """ +    # TODO this can be seen as a first step towards properly implementing +    # components that implement IPushProducer / IConsumer  interfaces. +    # However, I need to think more about how to pause the streaming. +    # In any case, the differential rate between message production +    # and consumption is not likely (?) to consume huge amounts of memory in +    # our current settings, so the need to pause the stream is not urgent now. + +    def __init__(self, consumer, queue=Queue.Queue, period=1): +        """ +        Initializes the MessageProducer + +        :param consumer: an instance of a IMessageConsumer that will consume +                         the new messages. +        :param queue: any queue implementation to be used as the temporary +                      buffer for new items. Default is a FIFO Queue. +        :param period: the period to check for new items, in seconds. +        """ +        # XXX should assert it implements IConsumer / IMailConsumer +        # it should implement a `consume` method +        self._consumer = consumer + +        self._queue = queue() +        self._period = period + +        self._loop = LoopingCall(self._check_for_new) + +    # private methods + +    def _check_for_new(self): +        """ +        Checks for new items in the internal queue, and calls the consume +        method in the consumer. + +        If the queue is found empty, the loop is stopped. It will be started +        again after the addition of new items. +        """ +        # XXX right now I'm assuming that the period is good enough to allow +        # a right pace of processing. but we could also pass the queue object +        # to the consumer and let it choose whether process a new item or not. + +        if self._queue.empty(): +            self.stop() +        else: +            self._consumer.consume(self._queue.get()) + +    # public methods + +    def put(self, item): +        """ +        Puts a new item in the queue. + +        If the queue was empty, we will start the loop again. +        """ +        was_empty = self._queue.empty() + +        # XXX this might raise if the queue does not accept any new +        # items. what to do then? +        self._queue.put(item) +        if was_empty: +            self.start() + +    def start(self): +        """ +        Starts polling for new items. +        """ +        if not self._loop.running: +            self._loop.start(self._period) + +    def stop(self): +        """ +        Stop polling for new items. +        """ +        if self._loop.running: +            self._loop.stop() + + +if __name__ == "__main__": +    from twisted.internet import reactor +    producer = MessageProducer(DummyMsgConsumer()) +    producer.start() + +    for delay, item in ((2, 1), (3, 2), (4, 3), +                        (6, 4), (7, 5), (8, 6), (8.2, 7), +                        (15, 'a'), (16, 'b'), (17, 'c')): +        reactor.callLater(delay, producer.put, item) +    reactor.run() diff --git a/mail/src/leap/mail/smtp/gateway.py b/mail/src/leap/mail/smtp/gateway.py index f09ee141..a78bd55b 100644 --- a/mail/src/leap/mail/smtp/gateway.py +++ b/mail/src/leap/mail/smtp/gateway.py @@ -333,6 +333,8 @@ class EncryptedMessage(object):      """      implements(smtp.IMessage) +    FOOTER_STRING = "I prefer encrypted email" +      def __init__(self, fromAddress, user, keymanager, host, port, cert, key):          """          Initialize the encrypted message. @@ -597,7 +599,16 @@ class EncryptedMessage(object):              self._msg = self._origmsg              return +        # add a nice footer to the outgoing message          from_address = validate_address(self._fromAddress.addrstr) +        username, domain = from_address.split('@') +        self.lines.append('--') +        self.lines.append('%s - https://%s/key/%s.' % +                          (self.FOOTER_STRING, domain, username)) +        self.lines.append('') +        self._origmsg = self.parseMessage() + +        # get sender and recipient data          signkey = self._km.get_key(from_address, OpenPGPKey, private=True)          log.msg("Will sign the message with %s." % signkey.fingerprint)          to_address = validate_address(self._user.dest.addrstr) @@ -672,6 +683,7 @@ class EncryptedMessage(object):          username, domain = signkey.address.split('@')          newmsg.add_header(              'OpenPGP', 'id=%s' % signkey.key_id, -            url='https://%s/openpgp/%s' % (domain, username)) +            url='https://%s/key/%s' % (domain, username), +            preference='signencrypt')          # delete user-agent from origmsg          del(origmsg['user-agent']) diff --git a/mail/src/leap/mail/smtp/tests/__init__.py b/mail/src/leap/mail/smtp/tests/__init__.py index 62b015f0..1459cea6 100644 --- a/mail/src/leap/mail/smtp/tests/__init__.py +++ b/mail/src/leap/mail/smtp/tests/__init__.py @@ -115,8 +115,8 @@ class TestCaseWithKeyManager(BaseLeapTest):              'username': address,              'password': '<password>',              'encrypted_only': True, -            'cert': 'src/leap/mail/smtp/tests/cert/server.crt', -            'key': 'src/leap/mail/smtp/tests/cert/server.key', +            'cert': u'src/leap/mail/smtp/tests/cert/server.crt', +            'key': u'src/leap/mail/smtp/tests/cert/server.key',          }          class Response(object): diff --git a/mail/src/leap/mail/smtp/tests/test_gateway.py b/mail/src/leap/mail/smtp/tests/test_gateway.py index f9ea027c..4c2f04f4 100644 --- a/mail/src/leap/mail/smtp/tests/test_gateway.py +++ b/mail/src/leap/mail/smtp/tests/test_gateway.py @@ -22,7 +22,6 @@ SMTP gateway tests.  import re -  from datetime import datetime  from gnupg._util import _make_binary_stream  from twisted.test import proto_helpers @@ -131,6 +130,9 @@ class TestSmtpGateway(TestCaseWithKeyManager):          for line in self.EMAIL_DATA[4:12]:              m.lineReceived(line)          m.eomReceived() +        # we need to call the following explicitelly because it was deferred +        # inside the previous method +        m._maybe_encrypt_and_sign()          # assert structure of encrypted message          self.assertTrue('Content-Type' in m._msg)          self.assertEqual('multipart/encrypted', m._msg.get_content_type()) @@ -146,7 +148,8 @@ class TestSmtpGateway(TestCaseWithKeyManager):          decrypted = self._km.decrypt(              m._msg.get_payload(1).get_payload(), privkey)          self.assertEqual( -            '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n', +            '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' + +            'I prefer encrypted email - https://leap.se/key/anotheruser.\r\n',              decrypted,              'Decrypted text differs from plaintext.') @@ -168,6 +171,9 @@ class TestSmtpGateway(TestCaseWithKeyManager):              m.lineReceived(line)          # trigger encryption and signing          m.eomReceived() +        # we need to call the following explicitelly because it was deferred +        # inside the previous method +        m._maybe_encrypt_and_sign()          # assert structure of encrypted message          self.assertTrue('Content-Type' in m._msg)          self.assertEqual('multipart/encrypted', m._msg.get_content_type()) @@ -185,7 +191,8 @@ class TestSmtpGateway(TestCaseWithKeyManager):          decrypted = self._km.decrypt(              m._msg.get_payload(1).get_payload(), privkey, verify=pubkey)          self.assertEqual( -            '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n', +            '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' + +            'I prefer encrypted email - https://leap.se/key/anotheruser.\r\n',              decrypted,              'Decrypted text differs from plaintext.') @@ -208,6 +215,9 @@ class TestSmtpGateway(TestCaseWithKeyManager):              m.lineReceived(line)          # trigger signing          m.eomReceived() +        # we need to call the following explicitelly because it was deferred +        # inside the previous method +        m._maybe_encrypt_and_sign()          # assert structure of signed message          self.assertTrue('Content-Type' in m._msg)          self.assertEqual('multipart/signed', m._msg.get_content_type()) @@ -216,8 +226,9 @@ class TestSmtpGateway(TestCaseWithKeyManager):          self.assertEqual('pgp-sha512', m._msg.get_param('micalg'))          # assert content of message          self.assertEqual( -            m._msg.get_payload(0).get_payload(decode=True), -            '\r\n'.join(self.EMAIL_DATA[9:13])) +            '\r\n'.join(self.EMAIL_DATA[9:13])+'\r\n--\r\n' + +            'I prefer encrypted email - https://leap.se/key/anotheruser.\r\n', +            m._msg.get_payload(0).get_payload(decode=True))          # assert content of signature          self.assertTrue(              m._msg.get_payload(1).get_payload().startswith( | 
