summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2013-12-25 11:57:42 -0400
committerKali Kaneko <kali@leap.se>2013-12-26 13:17:18 -0400
commit59756d93f329ceee925a813bd4137c2cb34e7679 (patch)
tree5d4dba23f07202c47fd4453805c882e7f13d85ce /src/leap/mail/imap
parent70fdb37740b0737202ad08b4637bf23c12dab87e (diff)
inlineCallbacks all the things!
Diffstat (limited to 'src/leap/mail/imap')
-rw-r--r--src/leap/mail/imap/fetch.py235
-rw-r--r--src/leap/mail/imap/server.py78
2 files changed, 129 insertions, 184 deletions
diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py
index b1c34ba..0b31c3b 100644
--- a/src/leap/mail/imap/fetch.py
+++ b/src/leap/mail/imap/fetch.py
@@ -17,21 +17,24 @@
"""
Incoming mail fetcher.
"""
-import logging
+import copy
import json
-import ssl
+import logging
+#import ssl
import threading
import time
-import copy
-from StringIO import StringIO
+import sys
+import traceback
from email.parser import Parser
from email.generator import Generator
from email.utils import parseaddr
+from StringIO import StringIO
from twisted.python import log
+from twisted.internet import defer
from twisted.internet.task import LoopingCall
-from twisted.internet.threads import deferToThread
+#from twisted.internet.threads import deferToThread
from zope.proxy import sameProxiedObjects
from leap.common import events as leap_events
@@ -45,12 +48,18 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL
from leap.common.mail import get_email_charset
from leap.keymanager import errors as keymanager_errors
from leap.keymanager.openpgp import OpenPGPKey
+from leap.mail.decorators import deferred
from leap.soledad.client import Soledad
from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY
logger = logging.getLogger(__name__)
+MULTIPART_ENCRYPTED = "multipart/encrypted"
+MULTIPART_SIGNED = "multipart/signed"
+PGP_BEGIN = "-----BEGIN PGP MESSAGE-----"
+PGP_END = "-----END PGP MESSAGE-----"
+
class MalformedMessage(Exception):
"""
@@ -125,6 +134,9 @@ class LeapIncomingMail(object):
self._create_soledad_indexes()
+ # initialize a mail parser only once
+ self._parser = Parser()
+
def _create_soledad_indexes(self):
"""
Create needed indexes on soledad.
@@ -152,9 +164,10 @@ class LeapIncomingMail(object):
logger.debug("fetching mail for: %s %s" % (
self._soledad.uuid, self._userid))
if not self.fetching_lock.locked():
- d = deferToThread(self._sync_soledad)
- d.addCallbacks(self._signal_fetch_to_ui, self._sync_soledad_error)
- d.addCallbacks(self._process_doclist, self._sync_soledad_error)
+ d1 = self._sync_soledad()
+ d = defer.gatherResults([d1], consumeErrors=True)
+ d.addCallbacks(self._signal_fetch_to_ui, self._errback)
+ d.addCallbacks(self._signal_unread_to_ui, self._errback)
return d
else:
logger.debug("Already fetching mail.")
@@ -184,6 +197,11 @@ class LeapIncomingMail(object):
# synchronize incoming mail
+ def _errback(self, failure):
+ logger.exception(failure.value)
+ traceback.print_tb(*sys.exc_info())
+
+ @deferred
def _sync_soledad(self):
"""
Synchronizes with remote soledad.
@@ -196,10 +214,9 @@ class LeapIncomingMail(object):
self._soledad.sync()
log.msg('soledad synced.')
doclist = self._soledad.get_from_index("just-mail", "*")
+ self._process_doclist(doclist)
- return doclist
-
- def _signal_unread_to_ui(self):
+ def _signal_unread_to_ui(self, *args):
"""
Sends unread event to ui.
"""
@@ -215,53 +232,18 @@ class LeapIncomingMail(object):
:returns: doclist
:rtype: iterable
"""
+ doclist = doclist[0] # gatherResults pass us a list
fetched_ts = time.mktime(time.gmtime())
- num_mails = len(doclist)
- log.msg("there are %s mails" % (num_mails,))
+ num_mails = len(doclist) if doclist is not None else 0
+ if num_mails != 0:
+ log.msg("there are %s mails" % (num_mails,))
leap_events.signal(
IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts))
- self._signal_unread_to_ui()
return doclist
- def _sync_soledad_error(self, failure):
- """
- Errback for sync errors.
- """
- # XXX should signal unrecoverable maybe.
- err = failure.value
- logger.error("error syncing soledad: %s" % (err,))
- if failure.check(ssl.SSLError):
- logger.warning('SSL Error while '
- 'syncing soledad: %r' % (err,))
- elif failure.check(Exception):
- logger.warning('Unknown error while '
- 'syncing soledad: %r' % (err,))
-
- def _log_err(self, failure):
- """
- Generic errback
- """
- err = failure.value
- logger.exception("error!: %r" % (err,))
-
- def _decryption_error(self, failure):
- """
- Errback for decryption errors.
- """
- # XXX should signal unrecoverable maybe.
- err = failure.value
- logger.error("error decrypting msg: %s" % (err,))
-
- def _saving_error(self, failure):
- """
- Errback for local save errors.
- """
- # XXX should signal unrecoverable maybe.
- err = failure.value
- logger.error("error saving msg locally: %s" % (err,))
-
# process incoming mail.
+ @defer.inlineCallbacks
def _process_doclist(self, doclist):
"""
Iterates through the doclist, checks if each doc
@@ -278,7 +260,6 @@ class LeapIncomingMail(object):
return
num_mails = len(doclist)
- docs_cb = []
for index, doc in enumerate(doclist):
logger.debug("processing doc %d of %d" % (index + 1, num_mails))
leap_events.signal(
@@ -287,35 +268,18 @@ class LeapIncomingMail(object):
if self._is_msg(keys):
# Ok, this looks like a legit msg.
# Let's process it!
- # Deferred chain for individual messages
-
- # XXX use an IConsumer instead... ?
- d = deferToThread(self._decrypt_doc, doc)
- d.addCallback(self._process_decrypted_doc)
- d.addErrback(self._log_err)
- d.addCallback(self._add_message_locally)
- d.addErrback(self._log_err)
- docs_cb.append(d)
+ decrypted = list(self._decrypt_doc(doc))[0]
+ res = self._add_message_locally(decrypted)
+ yield res
+
else:
# Ooops, this does not.
logger.debug('This does not look like a proper msg.')
- return docs_cb
#
# operations on individual messages
#
- def _is_msg(self, keys):
- """
- Checks if the keys of a dictionary match the signature
- of the document type we use for messages.
-
- :param keys: iterable containing the strings to match.
- :type keys: iterable of strings.
- :rtype: bool
- """
- return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys
-
def _decrypt_doc(self, doc):
"""
Decrypt the contents of a document.
@@ -339,7 +303,9 @@ class LeapIncomingMail(object):
logger.error("Error while decrypting msg: %r" % (exc,))
decrdata = ""
leap_events.signal(IMAP_MSG_DECRYPTED, "1" if success else "0")
- return doc, decrdata
+
+ data = list(self._process_decrypted_doc((doc, decrdata)))
+ yield (doc, data)
def _process_decrypted_doc(self, msgtuple):
"""
@@ -357,16 +323,15 @@ class LeapIncomingMail(object):
doc, data = msgtuple
msg = json.loads(data)
if not isinstance(msg, dict):
- return False
+ defer.returnValue(False)
if not msg.get(self.INCOMING_KEY, False):
- return False
+ defer.returnValue(False)
# ok, this is an incoming message
rawmsg = msg.get(self.CONTENT_KEY, None)
if not rawmsg:
return False
- data = self._maybe_decrypt_msg(rawmsg)
- return doc, data
+ return self._maybe_decrypt_msg(rawmsg)
def _maybe_decrypt_msg(self, data):
"""
@@ -381,17 +346,16 @@ class LeapIncomingMail(object):
leap_assert_type(data, unicode)
# parse the original message
- parser = Parser()
encoding = get_email_charset(data)
data = data.encode(encoding)
- msg = parser.parsestr(data)
+ msg = self._parser.parsestr(data)
# try to obtain sender public key
senderPubkey = None
fromHeader = msg.get('from', None)
- if fromHeader is not None \
- and (msg.get_content_type() == 'multipart/encrypted' \
- or msg.get_content_type() == 'multipart/signed'):
+ if (fromHeader is not None
+ and (msg.get_content_type() == MULTIPART_ENCRYPTED
+ or msg.get_content_type() == MULTIPART_SIGNED)):
_, senderAddress = parseaddr(fromHeader)
try:
senderPubkey = self._keymanager.get_key_from_cache(
@@ -400,11 +364,14 @@ class LeapIncomingMail(object):
pass
valid_sig = False # we will add a header saying if sig is valid
- if msg.get_content_type() == 'multipart/encrypted':
- decrmsg, valid_sig = self._decrypt_multipart_encrypted_msg(
+ decrypt_multi = self._decrypt_multipart_encrypted_msg
+ decrypt_inline = self._maybe_decrypt_inline_encrypted_msg
+
+ if msg.get_content_type() == MULTIPART_ENCRYPTED:
+ decrmsg, valid_sig = decrypt_multi(
msg, encoding, senderPubkey)
else:
- decrmsg, valid_sig = self._maybe_decrypt_inline_encrypted_msg(
+ decrmsg, valid_sig = decrypt_inline(
msg, encoding, senderPubkey)
# add x-leap-signature header
@@ -419,7 +386,7 @@ class LeapIncomingMail(object):
self.LEAP_SIGNATURE_INVALID,
pubkey=senderPubkey.key_id)
- return decrmsg.as_string()
+ yield decrmsg.as_string()
def _decrypt_multipart_encrypted_msg(self, msg, encoding, senderPubkey):
"""
@@ -437,43 +404,33 @@ class LeapIncomingMail(object):
"""
log.msg('decrypting multipart encrypted msg')
msg = copy.deepcopy(msg)
- # sanity check
- payload = msg.get_payload()
- if len(payload) != 2:
- raise MalformedMessage(
- 'Multipart/encrypted messages should have exactly 2 body '
- 'parts (instead of %d).' % len(payload))
- if payload[0].get_content_type() != 'application/pgp-encrypted':
- raise MalformedMessage(
- "Multipart/encrypted messages' first body part should "
- "have content type equal to 'application/pgp-encrypted' "
- "(instead of %s)." % payload[0].get_content_type())
- if payload[1].get_content_type() != 'application/octet-stream':
- raise MalformedMessage(
- "Multipart/encrypted messages' second body part should "
- "have content type equal to 'octet-stream' (instead of "
- "%s)." % payload[1].get_content_type())
+ self._multipart_sanity_check(msg)
+
# parse message and get encrypted content
pgpencmsg = msg.get_payload()[1]
encdata = pgpencmsg.get_payload()
+
# decrypt or fail gracefully
try:
- decrdata, valid_sig = self._decrypt_and_verify_data(
+ decrdata, valid_sig = yield self._decrypt_and_verify_data(
encdata, senderPubkey)
except keymanager_errors.DecryptError as e:
logger.warning('Failed to decrypt encrypted message (%s). '
'Storing message without modifications.' % str(e))
- return msg, False # return original message
+ # Bailing out!
+ yield (msg, False)
+
# decrypted successully, now fix encoding and parse
try:
decrdata = decrdata.encode(encoding)
except (UnicodeEncodeError, UnicodeDecodeError) as e:
logger.error("Unicode error {0}".format(e))
decrdata = decrdata.encode(encoding, 'replace')
- parser = Parser()
- decrmsg = parser.parsestr(decrdata)
+
+ decrmsg = self._parser.parsestr(decrdata)
# remove original message's multipart/encrypted content-type
del(msg['content-type'])
+
# replace headers back in original message
for hkey, hval in decrmsg.items():
try:
@@ -481,9 +438,10 @@ class LeapIncomingMail(object):
msg.replace_header(hkey, hval)
except KeyError:
msg[hkey] = hval
- # replace payload by unencrypted payload
+
+ # all ok, replace payload by unencrypted payload
msg.set_payload(decrmsg.get_payload())
- return msg, valid_sig
+ yield (msg, valid_sig)
def _maybe_decrypt_inline_encrypted_msg(self, origmsg, encoding,
senderPubkey):
@@ -497,8 +455,9 @@ class LeapIncomingMail(object):
:param senderPubkey: The key of the sender of the message.
:type senderPubkey: OpenPGPKey
- :return: A unitary tuple containing a decrypted message.
- :rtype: (Message)
+ :return: A unitary tuple containing a decrypted message and
+ a bool indicating wether the signature is valid.
+ :rtype: (Message, bool)
"""
log.msg('maybe decrypting inline encrypted msg')
# serialize the original message
@@ -507,8 +466,6 @@ class LeapIncomingMail(object):
g.flatten(origmsg)
data = buf.getvalue()
# handle exactly one inline PGP message
- PGP_BEGIN = "-----BEGIN PGP MESSAGE-----"
- PGP_END = "-----END PGP MESSAGE-----"
valid_sig = False
if PGP_BEGIN in data:
begin = data.find(PGP_BEGIN)
@@ -522,11 +479,11 @@ class LeapIncomingMail(object):
except keymanager_errors.DecryptError:
logger.warning('Failed to decrypt potential inline encrypted '
'message. Storing message as is...')
+
# if message is not encrypted, return raw data
if isinstance(data, unicode):
data = data.encode(encoding, 'replace')
- parser = Parser()
- return parser.parsestr(data), valid_sig
+ return (self._parser.parsestr(data), valid_sig)
def _decrypt_and_verify_data(self, data, senderPubkey):
"""
@@ -555,7 +512,7 @@ class LeapIncomingMail(object):
except keymanager_errors.InvalidSignature:
decrdata = self._keymanager.decrypt(
data, self._pkey)
- return decrdata, valid_sig
+ return (decrdata, valid_sig)
def _add_message_locally(self, msgtuple):
"""
@@ -570,10 +527,54 @@ class LeapIncomingMail(object):
"""
log.msg('adding message to local db')
doc, data = msgtuple
- self._inbox.addMessage(data, (self.RECENT_FLAG,))
+ if isinstance(data, list):
+ data = data[0]
+
+ self._inbox.addMessage(data, flags=(self.RECENT_FLAG,))
+
leap_events.signal(IMAP_MSG_SAVED_LOCALLY)
doc_id = doc.doc_id
self._soledad.delete_doc(doc)
log.msg("deleted doc %s from incoming" % doc_id)
leap_events.signal(IMAP_MSG_DELETED_INCOMING)
self._signal_unread_to_ui()
+ return True
+
+ #
+ # helpers
+ #
+
+ def _msg_multipart_sanity_check(self, msg):
+ """
+ Performs a sanity check against a multipart encrypted msg
+
+ :param msg: The original encrypted message.
+ :type msg: Message
+ """
+ # sanity check
+ payload = msg.get_payload()
+ if len(payload) != 2:
+ raise MalformedMessage(
+ 'Multipart/encrypted messages should have exactly 2 body '
+ 'parts (instead of %d).' % len(payload))
+ if payload[0].get_content_type() != 'application/pgp-encrypted':
+ raise MalformedMessage(
+ "Multipart/encrypted messages' first body part should "
+ "have content type equal to 'application/pgp-encrypted' "
+ "(instead of %s)." % payload[0].get_content_type())
+ if payload[1].get_content_type() != 'application/octet-stream':
+ raise MalformedMessage(
+ "Multipart/encrypted messages' second body part should "
+ "have content type equal to 'octet-stream' (instead of "
+ "%s)." % payload[1].get_content_type())
+
+ def _is_msg(self, keys):
+ """
+ Checks if the keys of a dictionary match the signature
+ of the document type we use for messages.
+
+ :param keys: iterable containing the strings to match.
+ :type keys: iterable of strings.
+ :rtype: bool
+ """
+ return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys
diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py
index 8758dcb..57587a5 100644
--- a/src/leap/mail/imap/server.py
+++ b/src/leap/mail/imap/server.py
@@ -21,20 +21,17 @@ import copy
import logging
import StringIO
import cStringIO
-import os
import time
import re
from collections import defaultdict, namedtuple
from email.parser import Parser
-from functools import wraps
from zope.interface import implements
from zope.proxy import sameProxiedObjects
from twisted.mail import imap4
from twisted.internet import defer
-from twisted.internet.threads import deferToThread
from twisted.python import log
from u1db import errors as u1db_errors
@@ -44,70 +41,12 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL
from leap.common.check import leap_assert, leap_assert_type
from leap.common.mail import get_email_charset
from leap.mail.messageflow import IMessageConsumer, MessageProducer
+from leap.mail.decorators import deferred
from leap.soledad.client import Soledad
logger = logging.getLogger(__name__)
-def deferred(f):
- '''
- Decorator, for deferring methods to Threads.
-
- It will do a deferToThread of the decorated method
- unless the environment variable LEAPMAIL_DEBUG is set.
-
- It uses a descriptor to delay the definition of the
- method wrapper.
- '''
- class descript(object):
- def __init__(self, f):
- self.f = f
-
- def __get__(self, instance, klass):
- if instance is None:
- # Class method was requested
- return self.make_unbound(klass)
- return self.make_bound(instance)
-
- def _errback(self, failure):
- err = failure.value
- logger.warning('error in method: %s' % (self.f.__name__))
- log.err(err)
-
- def make_unbound(self, klass):
-
- @wraps(self.f)
- def wrapper(*args, **kwargs):
- '''This documentation will vanish :)'''
- raise TypeError(
- 'unbound method {}() must be called with {} instance '
- 'as first argument (got nothing instead)'.format(
- self.f.__name__,
- klass.__name__)
- )
- return wrapper
-
- def make_bound(self, instance):
-
- @wraps(self.f)
- def wrapper(*args, **kwargs):
- '''This documentation will disapear :)'''
-
- if not os.environ.get('LEAPMAIL_DEBUG'):
- d = deferToThread(self.f, instance, *args, **kwargs)
- d.addErrback(self._errback)
- return d
- else:
- return self.f(instance, *args, **kwargs)
-
- # This instance does not need the descriptor anymore,
- # let it find the wrapper directly next time:
- setattr(instance, self.f.__name__, wrapper)
- return wrapper
-
- return descript(f)
-
-
class MissingIndexError(Exception):
"""
Raises when tried to access a non existent index document.
@@ -248,6 +187,8 @@ class MailParser(object):
return self._parser.parse
if isinstance(o, basestring):
return self._parser.parsestr
+ # fallback
+ return self._parser.parsestr
def _stringify(self, o):
"""
@@ -942,8 +883,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
Return True if this message is multipart.
"""
if self._cdoc:
- retval = self._cdoc.content.get(self.MULTIPART_KEY, False)
- print "MULTIPART? ", retval
+ retval = self._fdoc.content.get(self.MULTIPART_KEY, False)
+ return retval
def getSubPart(self, part):
"""
@@ -1197,6 +1138,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
msg = self._get_parsed_msg(raw)
headers = dict(msg)
+ logger.debug("adding. is multipart:%s" % msg.is_multipart())
flags_doc[self.MULTIPART_KEY] = msg.is_multipart()
# XXX get lower case for keys?
# XXX get headers doc
@@ -1464,7 +1406,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
def addListener(self, listener):
"""
- Rdds a listener to the listeners queue.
+ Adds a listener to the listeners queue.
+ The server adds itself as a listener when there is a SELECT,
+ so it can send EXIST commands.
:param listener: listener to add
:type listener: an object that implements IMailboxListener
@@ -1716,6 +1660,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:return: a deferred that evals to None
"""
# XXX we should treat the message as an IMessage from here
+ leap_assert_type(message, basestring)
uid_next = self.getUIDNext()
logger.debug('Adding msg with UID :%s' % uid_next)
if flags is None:
@@ -1823,12 +1768,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
else:
for msg_id in messages:
- print "getting msg by uid", msg_id
msg = self.messages.get_msg_by_uid(msg_id)
if msg:
result.append((msg_id, msg))
else:
- print "fetch %s, no msg found!!!" % msg_id
+ logger.debug("fetch %s, no msg found!!!" % msg_id)
if self.isWriteable():
self._unset_recent_flag()