summaryrefslogtreecommitdiff
path: root/src/leap
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap')
-rw-r--r--src/leap/mail/imap/fetch.py315
-rw-r--r--src/leap/mail/imap/server.py59
-rw-r--r--src/leap/mail/imap/service/imap.py2
-rw-r--r--src/leap/mail/imap/tests/test_imap.py18
-rw-r--r--src/leap/mail/messageflow.py149
-rw-r--r--src/leap/mail/smtp/gateway.py14
-rw-r--r--src/leap/mail/smtp/tests/__init__.py4
-rw-r--r--src/leap/mail/smtp/tests/test_gateway.py21
8 files changed, 467 insertions, 115 deletions
diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py
index 3422ed5..14f7a9b 100644
--- a/src/leap/mail/imap/fetch.py
+++ b/src/leap/mail/imap/fetch.py
@@ -22,8 +22,12 @@ import json
import ssl
import threading
import time
+import copy
+from StringIO import StringIO
from email.parser import Parser
+from email.generator import Generator
+from email.utils import parseaddr
from twisted.python import log
from twisted.internet.task import LoopingCall
@@ -57,7 +61,15 @@ class MalformedMessage(Exception):
class LeapIncomingMail(object):
"""
- Fetches mail from the incoming queue.
+ Fetches and process mail from the incoming pool.
+
+ This object has public methods start_loop and stop that will
+ actually initiate a LoopingCall with check_period recurrency.
+ The LoopingCall itself will invoke the fetch method each time
+ that the check_period expires.
+
+ This loop will sync the soledad db with the remote server and
+ process all the documents found tagged as incoming mail.
"""
RECENT_FLAG = "\\Recent"
@@ -65,13 +77,23 @@ class LeapIncomingMail(object):
INCOMING_KEY = "incoming"
CONTENT_KEY = "content"
+ LEAP_SIGNATURE_HEADER = 'X-Leap-Signature'
+ """
+ Header added to messages when they are decrypted by the IMAP fetcher,
+ which states the validity of an eventual signature that might be included
+ in the encrypted blob.
+ """
+ LEAP_SIGNATURE_VALID = 'valid'
+ LEAP_SIGNATURE_INVALID = 'invalid'
+ LEAP_SIGNATURE_COULD_NOT_VERIFY = 'could not verify'
+
fetching_lock = threading.Lock()
def __init__(self, keymanager, soledad, imap_account,
check_period, userid):
"""
- Initialize LeapIMAP.
+ Initialize LeapIncomingMail..
:param keymanager: a keymanager instance
:type keymanager: keymanager.KeyManager
@@ -148,6 +170,7 @@ class LeapIncomingMail(object):
logger.warning("Tried to start an already running fetching loop.")
def stop(self):
+ # XXX change the name to stop_loop, for consistency.
"""
Stops the loop that fetches mail.
"""
@@ -171,7 +194,9 @@ class LeapIncomingMail(object):
with self.fetching_lock:
log.msg('syncing soledad...')
self._soledad.sync()
+ log.msg('soledad synced.')
doclist = self._soledad.get_from_index("just-mail", "*")
+
return doclist
def _signal_unread_to_ui(self):
@@ -235,6 +260,8 @@ class LeapIncomingMail(object):
err = failure.value
logger.error("error saving msg locally: %s" % (err,))
+ # process incoming mail.
+
def _process_doclist(self, doclist):
"""
Iterates through the doclist, checks if each doc
@@ -253,25 +280,21 @@ class LeapIncomingMail(object):
docs_cb = []
for index, doc in enumerate(doclist):
- logger.debug("processing doc %d of %d" % (index, num_mails))
+ logger.debug("processing doc %d of %d" % (index + 1, num_mails))
leap_events.signal(
IMAP_MSG_PROCESSING, str(index), str(num_mails))
keys = doc.content.keys()
if self._is_msg(keys):
# Ok, this looks like a legit msg.
# Let's process it!
- encdata = doc.content[ENC_JSON_KEY]
-
# Deferred chain for individual messages
- d = deferToThread(self._decrypt_msg, doc, encdata)
- d.addCallback(self._process_decrypted)
+
+ # XXX use an IConsumer instead... ?
+ d = deferToThread(self._decrypt_doc, doc)
+ d.addCallback(self._process_decrypted_doc)
d.addErrback(self._log_err)
d.addCallback(self._add_message_locally)
d.addErrback(self._log_err)
- # XXX check this, add_locally should not get called if we
- # get an error in process
- #d.addCallbacks(self._process_decrypted, self._decryption_error)
- #d.addCallbacks(self._add_message_locally, self._saving_error)
docs_cb.append(d)
else:
# Ooops, this does not.
@@ -293,34 +316,44 @@ class LeapIncomingMail(object):
"""
return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys
- def _decrypt_msg(self, doc, encdata):
+ def _decrypt_doc(self, doc):
+ """
+ Decrypt the contents of a document.
+
+ :param doc: A document containing an encrypted message.
+ :type doc: SoledadDocument
+
+ :return: A tuple containing the document and the decrypted message.
+ :rtype: (SoledadDocument, str)
+ """
log.msg('decrypting msg')
- key = self._pkey
+ success = False
+
try:
- decrdata = (self._keymanager.decrypt(
- encdata, key,
- passphrase=self._soledad.passphrase))
- ok = True
+ decrdata = self._keymanager.decrypt(
+ doc.content[ENC_JSON_KEY],
+ self._pkey)
+ success = True
except Exception as exc:
# XXX move this to errback !!!
- logger.warning("Error while decrypting msg: %r" % (exc,))
+ logger.error("Error while decrypting msg: %r" % (exc,))
decrdata = ""
- ok = False
- leap_events.signal(IMAP_MSG_DECRYPTED, "1" if ok else "0")
+ leap_events.signal(IMAP_MSG_DECRYPTED, "1" if success else "0")
return doc, decrdata
- def _process_decrypted(self, msgtuple):
+ def _process_decrypted_doc(self, msgtuple):
"""
- Process a successfully decrypted message.
+ Process a document containing a succesfully decrypted message.
:param msgtuple: a tuple consisting of a SoledadDocument
instance containing the incoming message
and data, the json-encoded, decrypted content of the
incoming message
:type msgtuple: (SoledadDocument, str)
- :returns: a SoledadDocument and the processed data.
+ :return: a SoledadDocument and the processed data.
:rtype: (doc, data)
"""
+ log.msg('processing decrypted doc')
doc, data = msgtuple
msg = json.loads(data)
if not isinstance(msg, dict):
@@ -332,14 +365,10 @@ class LeapIncomingMail(object):
rawmsg = msg.get(self.CONTENT_KEY, None)
if not rawmsg:
return False
- try:
- data = self._maybe_decrypt_gpg_msg(rawmsg)
- return doc, data
- except keymanager_errors.EncryptionDecryptionFailed as exc:
- logger.error(exc)
- raise
+ data = self._maybe_decrypt_msg(rawmsg)
+ return doc, data
- def _maybe_decrypt_gpg_msg(self, data):
+ def _maybe_decrypt_msg(self, data):
"""
Tries to decrypt a gpg message if data looks like one.
@@ -348,80 +377,183 @@ class LeapIncomingMail(object):
:return: data, possibly descrypted.
:rtype: str
"""
- # TODO split this method
+ log.msg('maybe decrypting doc')
leap_assert_type(data, unicode)
+ # parse the original message
parser = Parser()
encoding = get_email_charset(data)
data = data.encode(encoding)
- origmsg = parser.parsestr(data)
-
- # handle multipart/encrypted messages
- if origmsg.get_content_type() == 'multipart/encrypted':
- # sanity check
- payload = origmsg.get_payload()
- if len(payload) != 2:
- raise MalformedMessage(
- 'Multipart/encrypted messages should have exactly 2 body '
- 'parts (instead of %d).' % len(payload))
- if payload[0].get_content_type() != 'application/pgp-encrypted':
- raise MalformedMessage(
- "Multipart/encrypted messages' first body part should "
- "have content type equal to 'application/pgp-encrypted' "
- "(instead of %s)." % payload[0].get_content_type())
- if payload[1].get_content_type() != 'application/octet-stream':
- raise MalformedMessage(
- "Multipart/encrypted messages' second body part should "
- "have content type equal to 'octet-stream' (instead of "
- "%s)." % payload[1].get_content_type())
-
- # parse message and get encrypted content
- pgpencmsg = origmsg.get_payload()[1]
- encdata = pgpencmsg.get_payload()
-
- # decrypt and parse decrypted message
- decrdata = self._keymanager.decrypt(
- encdata, self._pkey,
- passphrase=self._soledad.passphrase)
+ msg = parser.parsestr(data)
+
+ # try to obtain sender public key
+ senderPubkey = None
+ fromHeader = msg.get('from', None)
+ if fromHeader is not None:
+ _, senderAddress = parseaddr(fromHeader)
try:
- decrdata = decrdata.encode(encoding)
- except (UnicodeEncodeError, UnicodeDecodeError) as e:
- logger.error("Unicode error {0}".format(e))
- decrdata = decrdata.encode(encoding, 'replace')
-
- decrmsg = parser.parsestr(decrdata)
- # remove original message's multipart/encrypted content-type
- del(origmsg['content-type'])
- # replace headers back in original message
- for hkey, hval in decrmsg.items():
- try:
- # this will raise KeyError if header is not present
- origmsg.replace_header(hkey, hval)
- except KeyError:
- origmsg[hkey] = hval
-
- # replace payload by unencrypted payload
- origmsg.set_payload(decrmsg.get_payload())
- return origmsg.as_string(unixfrom=False)
+ senderPubkey = self._keymanager.get_key(
+ senderAddress, OpenPGPKey)
+ except keymanager_errors.KeyNotFound:
+ pass
+
+ valid_sig = False # we will add a header saying if sig is valid
+ if msg.get_content_type() == 'multipart/encrypted':
+ decrmsg, valid_sig = self._decrypt_multipart_encrypted_msg(
+ msg, encoding, senderPubkey)
else:
- PGP_BEGIN = "-----BEGIN PGP MESSAGE-----"
- PGP_END = "-----END PGP MESSAGE-----"
- # handle inline PGP messages
- if PGP_BEGIN in data:
- begin = data.find(PGP_BEGIN)
- end = data.rfind(PGP_END)
- pgp_message = data[begin:begin+end]
- decrdata = (self._keymanager.decrypt(
- pgp_message, self._pkey,
- passphrase=self._soledad.passphrase))
+ decrmsg, valid_sig = self._maybe_decrypt_inline_encrypted_msg(
+ msg, encoding, senderPubkey)
+
+ # add x-leap-signature header
+ if senderPubkey is None:
+ decrmsg.add_header(
+ self.LEAP_SIGNATURE_HEADER,
+ self.LEAP_SIGNATURE_COULD_NOT_VERIFY)
+ else:
+ decrmsg.add_header(
+ self.LEAP_SIGNATURE_HEADER,
+ self.LEAP_SIGNATURE_VALID if valid_sig else
+ self.LEAP_SIGNATURE_INVALID,
+ pubkey=senderPubkey.key_id)
+
+ return decrmsg.as_string()
+
+ def _decrypt_multipart_encrypted_msg(self, msg, encoding, senderPubkey):
+ """
+ Decrypt a message with content-type 'multipart/encrypted'.
+
+ :param msg: The original encrypted message.
+ :type msg: Message
+ :param encoding: The encoding of the email message.
+ :type encoding: str
+ :param senderPubkey: The key of the sender of the message.
+ :type senderPubkey: OpenPGPKey
+
+ :return: A unitary tuple containing a decrypted message.
+ :rtype: (Message)
+ """
+ log.msg('decrypting multipart encrypted msg')
+ msg = copy.deepcopy(msg)
+ # sanity check
+ payload = msg.get_payload()
+ if len(payload) != 2:
+ raise MalformedMessage(
+ 'Multipart/encrypted messages should have exactly 2 body '
+ 'parts (instead of %d).' % len(payload))
+ if payload[0].get_content_type() != 'application/pgp-encrypted':
+ raise MalformedMessage(
+ "Multipart/encrypted messages' first body part should "
+ "have content type equal to 'application/pgp-encrypted' "
+ "(instead of %s)." % payload[0].get_content_type())
+ if payload[1].get_content_type() != 'application/octet-stream':
+ raise MalformedMessage(
+ "Multipart/encrypted messages' second body part should "
+ "have content type equal to 'octet-stream' (instead of "
+ "%s)." % payload[1].get_content_type())
+ # parse message and get encrypted content
+ pgpencmsg = msg.get_payload()[1]
+ encdata = pgpencmsg.get_payload()
+ # decrypt or fail gracefully
+ try:
+ decrdata, valid_sig = self._decrypt_and_verify_data(
+ encdata, senderPubkey)
+ except keymanager_errors.DecryptError as e:
+ logger.warning('Failed to decrypt encrypted message (%s). '
+ 'Storing message without modifications.' % str(e))
+ return msg, False # return original message
+ # decrypted successully, now fix encoding and parse
+ try:
+ decrdata = decrdata.encode(encoding)
+ except (UnicodeEncodeError, UnicodeDecodeError) as e:
+ logger.error("Unicode error {0}".format(e))
+ decrdata = decrdata.encode(encoding, 'replace')
+ parser = Parser()
+ decrmsg = parser.parsestr(decrdata)
+ # remove original message's multipart/encrypted content-type
+ del(msg['content-type'])
+ # replace headers back in original message
+ for hkey, hval in decrmsg.items():
+ try:
+ # this will raise KeyError if header is not present
+ msg.replace_header(hkey, hval)
+ except KeyError:
+ msg[hkey] = hval
+ # replace payload by unencrypted payload
+ msg.set_payload(decrmsg.get_payload())
+ return msg, valid_sig
+
+ def _maybe_decrypt_inline_encrypted_msg(self, origmsg, encoding,
+ senderPubkey):
+ """
+ Possibly decrypt an inline OpenPGP encrypted message.
+
+ :param origmsg: The original, possibly encrypted message.
+ :type origmsg: Message
+ :param encoding: The encoding of the email message.
+ :type encoding: str
+ :param senderPubkey: The key of the sender of the message.
+ :type senderPubkey: OpenPGPKey
+
+ :return: A unitary tuple containing a decrypted message.
+ :rtype: (Message)
+ """
+ log.msg('maybe decrypting inline encrypted msg')
+ # serialize the original message
+ buf = StringIO()
+ g = Generator(buf)
+ g.flatten(origmsg)
+ data = buf.getvalue()
+ # handle exactly one inline PGP message
+ PGP_BEGIN = "-----BEGIN PGP MESSAGE-----"
+ PGP_END = "-----END PGP MESSAGE-----"
+ valid_sig = False
+ if PGP_BEGIN in data:
+ begin = data.find(PGP_BEGIN)
+ end = data.find(PGP_END)
+ pgp_message = data[begin:end+len(PGP_END)]
+ try:
+ decrdata, valid_sig = self._decrypt_and_verify_data(
+ pgp_message, senderPubkey)
# replace encrypted by decrypted content
data = data.replace(pgp_message, decrdata)
+ except keymanager_errors.DecryptError:
+ logger.warning('Failed to decrypt potential inline encrypted '
+ 'message. Storing message as is...')
# if message is not encrypted, return raw data
-
if isinstance(data, unicode):
data = data.encode(encoding, 'replace')
+ parser = Parser()
+ return parser.parsestr(data), valid_sig
- return data
+ def _decrypt_and_verify_data(self, data, senderPubkey):
+ """
+ Decrypt C{data} using our private key and attempt to verify a
+ signature using C{senderPubkey}.
+
+ :param data: The text to be decrypted.
+ :type data: unicode
+ :param senderPubkey: The public key of the sender of the message.
+ :type senderPubkey: OpenPGPKey
+
+ :return: The decrypted data and a boolean stating whether the
+ signature could be verified.
+ :rtype: (str, bool)
+
+ :raise DecryptError: Raised if failed to decrypt.
+ """
+ log.msg('decrypting and verifying data')
+ valid_sig = False
+ try:
+ decrdata = self._keymanager.decrypt(
+ data, self._pkey,
+ verify=senderPubkey)
+ if senderPubkey is not None:
+ valid_sig = True
+ except keymanager_errors.InvalidSignature:
+ decrdata = self._keymanager.decrypt(
+ data, self._pkey)
+ return decrdata, valid_sig
def _add_message_locally(self, msgtuple):
"""
@@ -434,6 +566,7 @@ class LeapIncomingMail(object):
incoming message
:type msgtuple: (SoledadDocument, str)
"""
+ log.msg('adding message to local db')
doc, data = msgtuple
self._inbox.addMessage(data, (self.RECENT_FLAG,))
leap_events.signal(IMAP_MSG_SAVED_LOCALLY)
diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py
index bb2830d..6320a51 100644
--- a/src/leap/mail/imap/server.py
+++ b/src/leap/mail/imap/server.py
@@ -27,6 +27,7 @@ from collections import defaultdict
from email.parser import Parser
from zope.interface import implements
+from zope.proxy import sameProxiedObjects
from twisted.mail import imap4
from twisted.internet import defer
@@ -36,6 +37,7 @@ from leap.common import events as leap_events
from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL
from leap.common.check import leap_assert, leap_assert_type
from leap.common.mail import get_email_charset
+from leap.mail.messageflow import IMessageConsumer, MessageProducer
from leap.soledad.client import Soledad
logger = logging.getLogger(__name__)
@@ -816,6 +818,32 @@ class LeapMessage(WithMsgFields):
return self._doc.content.get(key, None)
+class SoledadDocWriter(object):
+ """
+ This writer will create docs serially in the local soledad database.
+ """
+
+ implements(IMessageConsumer)
+
+ def __init__(self, soledad):
+ """
+ Initialize the writer.
+
+ :param soledad: the soledad instance
+ :type soledad: Soledad
+ """
+ self._soledad = soledad
+
+ def consume(self, item):
+ """
+ Creates a new document in soledad db.
+
+ :param item: object to update. content of the document to be inserted.
+ :type item: dict
+ """
+ self._soledad.create_doc(item)
+
+
class MessageCollection(WithMsgFields, IndexedDB):
"""
A collection of messages, surprisingly.
@@ -875,6 +903,16 @@ class MessageCollection(WithMsgFields, IndexedDB):
self.initialize_db()
self._parser = Parser()
+ # I think of someone like nietzsche when reading this
+
+ # this will be the producer that will enqueue the content
+ # to be processed serially by the consumer (the writer). We just
+ # need to `put` the new material on its plate.
+
+ self._soledad_writer = MessageProducer(
+ SoledadDocWriter(soledad),
+ period=0.2)
+
def _get_empty_msg(self):
"""
Returns an empty message.
@@ -947,7 +985,9 @@ class MessageCollection(WithMsgFields, IndexedDB):
# ...should get a sanity check here.
content[self.UID_KEY] = uid
- return self._soledad.create_doc(content)
+ self._soledad_writer.put(content)
+ # XXX have to decide what shall we do with errors with this change...
+ #return self._soledad.create_doc(content)
def remove(self, msg):
"""
@@ -1041,7 +1081,11 @@ class MessageCollection(WithMsgFields, IndexedDB):
:return: a list of u1db documents
:rtype: list of SoledadDocument
"""
- # XXX this should return LeapMessage instances
+ if sameProxiedObjects(self._soledad, None):
+ logger.warning('Tried to get messages but soledad is None!')
+ return []
+
+ #f XXX this should return LeapMessage instances
all_docs = [doc for doc in self._soledad.get_from_index(
SoledadBackedAccount.TYPE_MBOX_IDX,
self.TYPE_MESSAGE_VAL, self.mbox)]
@@ -1438,12 +1482,14 @@ class SoledadMailbox(WithMsgFields):
"""
# XXX we should treat the message as an IMessage from here
uid_next = self.getUIDNext()
- flags = tuple(str(flag) for flag in flags)
+ if flags is None:
+ flags = tuple()
+ else:
+ flags = tuple(str(flag) for flag in flags)
self.messages.add_msg(message, flags=flags, date=date,
uid=uid_next)
- # XXX recent should not include deleted...??
exists = len(self.messages)
recent = len(self.messages.get_recent())
for listener in self.listeners:
@@ -1512,7 +1558,10 @@ class SoledadMailbox(WithMsgFields):
except TypeError:
# looks like we cannot iterate
last = self.messages.get_last()
- uid_last = last.getUID()
+ if last is None:
+ uid_last = 1
+ else:
+ uid_last = last.getUID()
messages.last = uid_last
# for sequence numbers (uid = 0)
diff --git a/src/leap/mail/imap/service/imap.py b/src/leap/mail/imap/service/imap.py
index feb2593..8756ddc 100644
--- a/src/leap/mail/imap/service/imap.py
+++ b/src/leap/mail/imap/service/imap.py
@@ -41,7 +41,7 @@ IMAP_PORT = 1984
# The period between succesive checks of the incoming mail
# queue (in seconds)
-INCOMING_CHECK_PERIOD = 300
+INCOMING_CHECK_PERIOD = 60
from leap.common.events.events_pb2 import IMAP_SERVICE_STARTED
from leap.common.events.events_pb2 import IMAP_SERVICE_FAILED_TO_START
diff --git a/src/leap/mail/imap/tests/test_imap.py b/src/leap/mail/imap/tests/test_imap.py
index ad11315..ca73a11 100644
--- a/src/leap/mail/imap/tests/test_imap.py
+++ b/src/leap/mail/imap/tests/test_imap.py
@@ -923,7 +923,6 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
"""
self.server.theAccount.addMailbox('test-mailbox-e',
creation_ts=42)
- #import ipdb; ipdb.set_trace()
self.examinedArgs = None
@@ -1108,16 +1107,15 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
mb = SimpleLEAPServer.theAccount.getMailbox('ROOT/SUBTHING')
self.assertEqual(1, len(mb.messages))
- #import ipdb; ipdb.set_trace()
self.assertEqual(
['\\SEEN', '\\DELETED'],
- mb.messages[1]['flags'])
+ mb.messages[1].content['flags'])
self.assertEqual(
'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)',
- mb.messages[1]['date'])
+ mb.messages[1].content['date'])
- self.assertEqual(open(infile).read(), mb.messages[1]['raw'])
+ self.assertEqual(open(infile).read(), mb.messages[1].content['raw'])
def testPartialAppend(self):
"""
@@ -1152,11 +1150,11 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
self.assertEqual(1, len(mb.messages))
self.assertEqual(
['\\SEEN',],
- mb.messages[1]['flags']
+ mb.messages[1].content['flags']
)
self.assertEqual(
- 'Right now', mb.messages[1]['date'])
- self.assertEqual(open(infile).read(), mb.messages[1]['raw'])
+ 'Right now', mb.messages[1].content['date'])
+ self.assertEqual(open(infile).read(), mb.messages[1].content['raw'])
def testCheck(self):
"""
@@ -1214,7 +1212,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
def _cbTestClose(self, ignored, m):
self.assertEqual(len(m.messages), 1)
self.assertEqual(
- m.messages[1]['subject'],
+ m.messages[1].content['subject'],
'Message 2')
self.failUnless(m.closed)
@@ -1257,7 +1255,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
def _cbTestExpunge(self, ignored, m):
self.assertEqual(len(m.messages), 1)
self.assertEqual(
- m.messages[1]['subject'],
+ m.messages[1].content['subject'],
'Message 2')
self.assertEqual(self.results, [0, 1])
# XXX fix this thing with the indexes...
diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py
new file mode 100644
index 0000000..21f6d62
--- /dev/null
+++ b/src/leap/mail/messageflow.py
@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+# messageflow.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Message Producers and Consumers for flow control.
+"""
+import Queue
+
+from twisted.internet.task import LoopingCall
+
+from zope.interface import Interface, implements
+
+
+class IMessageConsumer(Interface):
+
+ def consume(self, item):
+ """
+ Consumes the passed item.
+
+ :param item: an object to be consumed.
+ :type item: object
+ """
+ # TODO we could add an optional type to be passed
+ # for doing type check.
+
+ # TODO in case of errors, we could return the object to
+ # the queue, maybe wrapped in an object with a retries attribute.
+
+
+class DummyMsgConsumer(object):
+
+ implements(IMessageConsumer)
+
+ def consume(self, item):
+ """
+ Just prints the passed item.
+ """
+ print "got item %s" % item
+
+
+class MessageProducer(object):
+ """
+ A Producer class that we can use to temporarily buffer the production
+ of messages so that different objects can consume them.
+
+ This is useful for serializing the consumption of the messages stream
+ in the case of an slow resource (db), or for returning early from a
+ deferred chain and leave further processing detached from the calling loop,
+ as in the case of smtp.
+ """
+ # TODO this can be seen as a first step towards properly implementing
+ # components that implement IPushProducer / IConsumer interfaces.
+ # However, I need to think more about how to pause the streaming.
+ # In any case, the differential rate between message production
+ # and consumption is not likely (?) to consume huge amounts of memory in
+ # our current settings, so the need to pause the stream is not urgent now.
+
+ def __init__(self, consumer, queue=Queue.Queue, period=1):
+ """
+ Initializes the MessageProducer
+
+ :param consumer: an instance of a IMessageConsumer that will consume
+ the new messages.
+ :param queue: any queue implementation to be used as the temporary
+ buffer for new items. Default is a FIFO Queue.
+ :param period: the period to check for new items, in seconds.
+ """
+ # XXX should assert it implements IConsumer / IMailConsumer
+ # it should implement a `consume` method
+ self._consumer = consumer
+
+ self._queue = queue()
+ self._period = period
+
+ self._loop = LoopingCall(self._check_for_new)
+
+ # private methods
+
+ def _check_for_new(self):
+ """
+ Checks for new items in the internal queue, and calls the consume
+ method in the consumer.
+
+ If the queue is found empty, the loop is stopped. It will be started
+ again after the addition of new items.
+ """
+ # XXX right now I'm assuming that the period is good enough to allow
+ # a right pace of processing. but we could also pass the queue object
+ # to the consumer and let it choose whether process a new item or not.
+
+ if self._queue.empty():
+ self.stop()
+ else:
+ self._consumer.consume(self._queue.get())
+
+ # public methods
+
+ def put(self, item):
+ """
+ Puts a new item in the queue.
+
+ If the queue was empty, we will start the loop again.
+ """
+ was_empty = self._queue.empty()
+
+ # XXX this might raise if the queue does not accept any new
+ # items. what to do then?
+ self._queue.put(item)
+ if was_empty:
+ self.start()
+
+ def start(self):
+ """
+ Starts polling for new items.
+ """
+ if not self._loop.running:
+ self._loop.start(self._period)
+
+ def stop(self):
+ """
+ Stop polling for new items.
+ """
+ if self._loop.running:
+ self._loop.stop()
+
+
+if __name__ == "__main__":
+ from twisted.internet import reactor
+ producer = MessageProducer(DummyMsgConsumer())
+ producer.start()
+
+ for delay, item in ((2, 1), (3, 2), (4, 3),
+ (6, 4), (7, 5), (8, 6), (8.2, 7),
+ (15, 'a'), (16, 'b'), (17, 'c')):
+ reactor.callLater(delay, producer.put, item)
+ reactor.run()
diff --git a/src/leap/mail/smtp/gateway.py b/src/leap/mail/smtp/gateway.py
index f09ee14..a78bd55 100644
--- a/src/leap/mail/smtp/gateway.py
+++ b/src/leap/mail/smtp/gateway.py
@@ -333,6 +333,8 @@ class EncryptedMessage(object):
"""
implements(smtp.IMessage)
+ FOOTER_STRING = "I prefer encrypted email"
+
def __init__(self, fromAddress, user, keymanager, host, port, cert, key):
"""
Initialize the encrypted message.
@@ -597,7 +599,16 @@ class EncryptedMessage(object):
self._msg = self._origmsg
return
+ # add a nice footer to the outgoing message
from_address = validate_address(self._fromAddress.addrstr)
+ username, domain = from_address.split('@')
+ self.lines.append('--')
+ self.lines.append('%s - https://%s/key/%s.' %
+ (self.FOOTER_STRING, domain, username))
+ self.lines.append('')
+ self._origmsg = self.parseMessage()
+
+ # get sender and recipient data
signkey = self._km.get_key(from_address, OpenPGPKey, private=True)
log.msg("Will sign the message with %s." % signkey.fingerprint)
to_address = validate_address(self._user.dest.addrstr)
@@ -672,6 +683,7 @@ class EncryptedMessage(object):
username, domain = signkey.address.split('@')
newmsg.add_header(
'OpenPGP', 'id=%s' % signkey.key_id,
- url='https://%s/openpgp/%s' % (domain, username))
+ url='https://%s/key/%s' % (domain, username),
+ preference='signencrypt')
# delete user-agent from origmsg
del(origmsg['user-agent'])
diff --git a/src/leap/mail/smtp/tests/__init__.py b/src/leap/mail/smtp/tests/__init__.py
index 62b015f..1459cea 100644
--- a/src/leap/mail/smtp/tests/__init__.py
+++ b/src/leap/mail/smtp/tests/__init__.py
@@ -115,8 +115,8 @@ class TestCaseWithKeyManager(BaseLeapTest):
'username': address,
'password': '<password>',
'encrypted_only': True,
- 'cert': 'src/leap/mail/smtp/tests/cert/server.crt',
- 'key': 'src/leap/mail/smtp/tests/cert/server.key',
+ 'cert': u'src/leap/mail/smtp/tests/cert/server.crt',
+ 'key': u'src/leap/mail/smtp/tests/cert/server.key',
}
class Response(object):
diff --git a/src/leap/mail/smtp/tests/test_gateway.py b/src/leap/mail/smtp/tests/test_gateway.py
index f9ea027..4c2f04f 100644
--- a/src/leap/mail/smtp/tests/test_gateway.py
+++ b/src/leap/mail/smtp/tests/test_gateway.py
@@ -22,7 +22,6 @@ SMTP gateway tests.
import re
-
from datetime import datetime
from gnupg._util import _make_binary_stream
from twisted.test import proto_helpers
@@ -131,6 +130,9 @@ class TestSmtpGateway(TestCaseWithKeyManager):
for line in self.EMAIL_DATA[4:12]:
m.lineReceived(line)
m.eomReceived()
+ # we need to call the following explicitelly because it was deferred
+ # inside the previous method
+ m._maybe_encrypt_and_sign()
# assert structure of encrypted message
self.assertTrue('Content-Type' in m._msg)
self.assertEqual('multipart/encrypted', m._msg.get_content_type())
@@ -146,7 +148,8 @@ class TestSmtpGateway(TestCaseWithKeyManager):
decrypted = self._km.decrypt(
m._msg.get_payload(1).get_payload(), privkey)
self.assertEqual(
- '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n',
+ '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' +
+ 'I prefer encrypted email - https://leap.se/key/anotheruser.\r\n',
decrypted,
'Decrypted text differs from plaintext.')
@@ -168,6 +171,9 @@ class TestSmtpGateway(TestCaseWithKeyManager):
m.lineReceived(line)
# trigger encryption and signing
m.eomReceived()
+ # we need to call the following explicitelly because it was deferred
+ # inside the previous method
+ m._maybe_encrypt_and_sign()
# assert structure of encrypted message
self.assertTrue('Content-Type' in m._msg)
self.assertEqual('multipart/encrypted', m._msg.get_content_type())
@@ -185,7 +191,8 @@ class TestSmtpGateway(TestCaseWithKeyManager):
decrypted = self._km.decrypt(
m._msg.get_payload(1).get_payload(), privkey, verify=pubkey)
self.assertEqual(
- '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n',
+ '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' +
+ 'I prefer encrypted email - https://leap.se/key/anotheruser.\r\n',
decrypted,
'Decrypted text differs from plaintext.')
@@ -208,6 +215,9 @@ class TestSmtpGateway(TestCaseWithKeyManager):
m.lineReceived(line)
# trigger signing
m.eomReceived()
+ # we need to call the following explicitelly because it was deferred
+ # inside the previous method
+ m._maybe_encrypt_and_sign()
# assert structure of signed message
self.assertTrue('Content-Type' in m._msg)
self.assertEqual('multipart/signed', m._msg.get_content_type())
@@ -216,8 +226,9 @@ class TestSmtpGateway(TestCaseWithKeyManager):
self.assertEqual('pgp-sha512', m._msg.get_param('micalg'))
# assert content of message
self.assertEqual(
- m._msg.get_payload(0).get_payload(decode=True),
- '\r\n'.join(self.EMAIL_DATA[9:13]))
+ '\r\n'.join(self.EMAIL_DATA[9:13])+'\r\n--\r\n' +
+ 'I prefer encrypted email - https://leap.se/key/anotheruser.\r\n',
+ m._msg.get_payload(0).get_payload(decode=True))
# assert content of signature
self.assertTrue(
m._msg.get_payload(1).get_payload().startswith(