summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/fetch.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/imap/fetch.py')
-rw-r--r--src/leap/mail/imap/fetch.py234
1 files changed, 118 insertions, 116 deletions
diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py
index b1c34ba..604a2ea 100644
--- a/src/leap/mail/imap/fetch.py
+++ b/src/leap/mail/imap/fetch.py
@@ -17,21 +17,24 @@
"""
Incoming mail fetcher.
"""
-import logging
+import copy
import json
-import ssl
+import logging
+#import ssl
import threading
import time
-import copy
-from StringIO import StringIO
+import sys
+import traceback
from email.parser import Parser
from email.generator import Generator
from email.utils import parseaddr
+from StringIO import StringIO
from twisted.python import log
+from twisted.internet import defer
from twisted.internet.task import LoopingCall
-from twisted.internet.threads import deferToThread
+#from twisted.internet.threads import deferToThread
from zope.proxy import sameProxiedObjects
from leap.common import events as leap_events
@@ -45,12 +48,18 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL
from leap.common.mail import get_email_charset
from leap.keymanager import errors as keymanager_errors
from leap.keymanager.openpgp import OpenPGPKey
+from leap.mail.decorators import deferred
from leap.soledad.client import Soledad
from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY
logger = logging.getLogger(__name__)
+MULTIPART_ENCRYPTED = "multipart/encrypted"
+MULTIPART_SIGNED = "multipart/signed"
+PGP_BEGIN = "-----BEGIN PGP MESSAGE-----"
+PGP_END = "-----END PGP MESSAGE-----"
+
class MalformedMessage(Exception):
"""
@@ -125,6 +134,9 @@ class LeapIncomingMail(object):
self._create_soledad_indexes()
+ # initialize a mail parser only once
+ self._parser = Parser()
+
def _create_soledad_indexes(self):
"""
Create needed indexes on soledad.
@@ -152,9 +164,10 @@ class LeapIncomingMail(object):
logger.debug("fetching mail for: %s %s" % (
self._soledad.uuid, self._userid))
if not self.fetching_lock.locked():
- d = deferToThread(self._sync_soledad)
- d.addCallbacks(self._signal_fetch_to_ui, self._sync_soledad_error)
- d.addCallbacks(self._process_doclist, self._sync_soledad_error)
+ d1 = self._sync_soledad()
+ d = defer.gatherResults([d1], consumeErrors=True)
+ d.addCallbacks(self._signal_fetch_to_ui, self._errback)
+ d.addCallbacks(self._signal_unread_to_ui, self._errback)
return d
else:
logger.debug("Already fetching mail.")
@@ -184,6 +197,11 @@ class LeapIncomingMail(object):
# synchronize incoming mail
+ def _errback(self, failure):
+ logger.exception(failure.value)
+ traceback.print_tb(*sys.exc_info())
+
+ @deferred
def _sync_soledad(self):
"""
Synchronizes with remote soledad.
@@ -196,10 +214,9 @@ class LeapIncomingMail(object):
self._soledad.sync()
log.msg('soledad synced.')
doclist = self._soledad.get_from_index("just-mail", "*")
+ self._process_doclist(doclist)
- return doclist
-
- def _signal_unread_to_ui(self):
+ def _signal_unread_to_ui(self, *args):
"""
Sends unread event to ui.
"""
@@ -215,53 +232,18 @@ class LeapIncomingMail(object):
:returns: doclist
:rtype: iterable
"""
+ doclist = doclist[0] # gatherResults pass us a list
fetched_ts = time.mktime(time.gmtime())
- num_mails = len(doclist)
- log.msg("there are %s mails" % (num_mails,))
+ num_mails = len(doclist) if doclist is not None else 0
+ if num_mails != 0:
+ log.msg("there are %s mails" % (num_mails,))
leap_events.signal(
IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts))
- self._signal_unread_to_ui()
return doclist
- def _sync_soledad_error(self, failure):
- """
- Errback for sync errors.
- """
- # XXX should signal unrecoverable maybe.
- err = failure.value
- logger.error("error syncing soledad: %s" % (err,))
- if failure.check(ssl.SSLError):
- logger.warning('SSL Error while '
- 'syncing soledad: %r' % (err,))
- elif failure.check(Exception):
- logger.warning('Unknown error while '
- 'syncing soledad: %r' % (err,))
-
- def _log_err(self, failure):
- """
- Generic errback
- """
- err = failure.value
- logger.exception("error!: %r" % (err,))
-
- def _decryption_error(self, failure):
- """
- Errback for decryption errors.
- """
- # XXX should signal unrecoverable maybe.
- err = failure.value
- logger.error("error decrypting msg: %s" % (err,))
-
- def _saving_error(self, failure):
- """
- Errback for local save errors.
- """
- # XXX should signal unrecoverable maybe.
- err = failure.value
- logger.error("error saving msg locally: %s" % (err,))
-
# process incoming mail.
+ @defer.inlineCallbacks
def _process_doclist(self, doclist):
"""
Iterates through the doclist, checks if each doc
@@ -278,7 +260,6 @@ class LeapIncomingMail(object):
return
num_mails = len(doclist)
- docs_cb = []
for index, doc in enumerate(doclist):
logger.debug("processing doc %d of %d" % (index + 1, num_mails))
leap_events.signal(
@@ -287,35 +268,18 @@ class LeapIncomingMail(object):
if self._is_msg(keys):
# Ok, this looks like a legit msg.
# Let's process it!
- # Deferred chain for individual messages
-
- # XXX use an IConsumer instead... ?
- d = deferToThread(self._decrypt_doc, doc)
- d.addCallback(self._process_decrypted_doc)
- d.addErrback(self._log_err)
- d.addCallback(self._add_message_locally)
- d.addErrback(self._log_err)
- docs_cb.append(d)
+ decrypted = list(self._decrypt_doc(doc))[0]
+ res = self._add_message_locally(decrypted)
+ yield res
+
else:
# Ooops, this does not.
logger.debug('This does not look like a proper msg.')
- return docs_cb
#
# operations on individual messages
#
- def _is_msg(self, keys):
- """
- Checks if the keys of a dictionary match the signature
- of the document type we use for messages.
-
- :param keys: iterable containing the strings to match.
- :type keys: iterable of strings.
- :rtype: bool
- """
- return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys
-
def _decrypt_doc(self, doc):
"""
Decrypt the contents of a document.
@@ -339,7 +303,9 @@ class LeapIncomingMail(object):
logger.error("Error while decrypting msg: %r" % (exc,))
decrdata = ""
leap_events.signal(IMAP_MSG_DECRYPTED, "1" if success else "0")
- return doc, decrdata
+
+ data = list(self._process_decrypted_doc((doc, decrdata)))
+ yield (doc, data)
def _process_decrypted_doc(self, msgtuple):
"""
@@ -357,16 +323,15 @@ class LeapIncomingMail(object):
doc, data = msgtuple
msg = json.loads(data)
if not isinstance(msg, dict):
- return False
+ defer.returnValue(False)
if not msg.get(self.INCOMING_KEY, False):
- return False
+ defer.returnValue(False)
# ok, this is an incoming message
rawmsg = msg.get(self.CONTENT_KEY, None)
if not rawmsg:
return False
- data = self._maybe_decrypt_msg(rawmsg)
- return doc, data
+ return self._maybe_decrypt_msg(rawmsg)
def _maybe_decrypt_msg(self, data):
"""
@@ -381,17 +346,16 @@ class LeapIncomingMail(object):
leap_assert_type(data, unicode)
# parse the original message
- parser = Parser()
encoding = get_email_charset(data)
data = data.encode(encoding)
- msg = parser.parsestr(data)
+ msg = self._parser.parsestr(data)
# try to obtain sender public key
senderPubkey = None
fromHeader = msg.get('from', None)
- if fromHeader is not None \
- and (msg.get_content_type() == 'multipart/encrypted' \
- or msg.get_content_type() == 'multipart/signed'):
+ if (fromHeader is not None
+ and (msg.get_content_type() == MULTIPART_ENCRYPTED
+ or msg.get_content_type() == MULTIPART_SIGNED)):
_, senderAddress = parseaddr(fromHeader)
try:
senderPubkey = self._keymanager.get_key_from_cache(
@@ -400,11 +364,14 @@ class LeapIncomingMail(object):
pass
valid_sig = False # we will add a header saying if sig is valid
- if msg.get_content_type() == 'multipart/encrypted':
- decrmsg, valid_sig = self._decrypt_multipart_encrypted_msg(
+ decrypt_multi = self._decrypt_multipart_encrypted_msg
+ decrypt_inline = self._maybe_decrypt_inline_encrypted_msg
+
+ if msg.get_content_type() == MULTIPART_ENCRYPTED:
+ decrmsg, valid_sig = decrypt_multi(
msg, encoding, senderPubkey)
else:
- decrmsg, valid_sig = self._maybe_decrypt_inline_encrypted_msg(
+ decrmsg, valid_sig = decrypt_inline(
msg, encoding, senderPubkey)
# add x-leap-signature header
@@ -419,7 +386,7 @@ class LeapIncomingMail(object):
self.LEAP_SIGNATURE_INVALID,
pubkey=senderPubkey.key_id)
- return decrmsg.as_string()
+ yield decrmsg.as_string()
def _decrypt_multipart_encrypted_msg(self, msg, encoding, senderPubkey):
"""
@@ -437,25 +404,12 @@ class LeapIncomingMail(object):
"""
log.msg('decrypting multipart encrypted msg')
msg = copy.deepcopy(msg)
- # sanity check
- payload = msg.get_payload()
- if len(payload) != 2:
- raise MalformedMessage(
- 'Multipart/encrypted messages should have exactly 2 body '
- 'parts (instead of %d).' % len(payload))
- if payload[0].get_content_type() != 'application/pgp-encrypted':
- raise MalformedMessage(
- "Multipart/encrypted messages' first body part should "
- "have content type equal to 'application/pgp-encrypted' "
- "(instead of %s)." % payload[0].get_content_type())
- if payload[1].get_content_type() != 'application/octet-stream':
- raise MalformedMessage(
- "Multipart/encrypted messages' second body part should "
- "have content type equal to 'octet-stream' (instead of "
- "%s)." % payload[1].get_content_type())
+ self._msg_multipart_sanity_check(msg)
+
# parse message and get encrypted content
pgpencmsg = msg.get_payload()[1]
encdata = pgpencmsg.get_payload()
+
# decrypt or fail gracefully
try:
decrdata, valid_sig = self._decrypt_and_verify_data(
@@ -463,17 +417,20 @@ class LeapIncomingMail(object):
except keymanager_errors.DecryptError as e:
logger.warning('Failed to decrypt encrypted message (%s). '
'Storing message without modifications.' % str(e))
- return msg, False # return original message
+ # Bailing out!
+ return (msg, False)
+
# decrypted successully, now fix encoding and parse
try:
decrdata = decrdata.encode(encoding)
except (UnicodeEncodeError, UnicodeDecodeError) as e:
logger.error("Unicode error {0}".format(e))
decrdata = decrdata.encode(encoding, 'replace')
- parser = Parser()
- decrmsg = parser.parsestr(decrdata)
+
+ decrmsg = self._parser.parsestr(decrdata)
# remove original message's multipart/encrypted content-type
del(msg['content-type'])
+
# replace headers back in original message
for hkey, hval in decrmsg.items():
try:
@@ -481,9 +438,10 @@ class LeapIncomingMail(object):
msg.replace_header(hkey, hval)
except KeyError:
msg[hkey] = hval
- # replace payload by unencrypted payload
+
+ # all ok, replace payload by unencrypted payload
msg.set_payload(decrmsg.get_payload())
- return msg, valid_sig
+ return (msg, valid_sig)
def _maybe_decrypt_inline_encrypted_msg(self, origmsg, encoding,
senderPubkey):
@@ -497,8 +455,9 @@ class LeapIncomingMail(object):
:param senderPubkey: The key of the sender of the message.
:type senderPubkey: OpenPGPKey
- :return: A unitary tuple containing a decrypted message.
- :rtype: (Message)
+ :return: A tuple containing a decrypted message and
+ a bool indicating whether the signature is valid.
+ :rtype: (Message, bool)
"""
log.msg('maybe decrypting inline encrypted msg')
# serialize the original message
@@ -507,8 +466,6 @@ class LeapIncomingMail(object):
g.flatten(origmsg)
data = buf.getvalue()
# handle exactly one inline PGP message
- PGP_BEGIN = "-----BEGIN PGP MESSAGE-----"
- PGP_END = "-----END PGP MESSAGE-----"
valid_sig = False
if PGP_BEGIN in data:
begin = data.find(PGP_BEGIN)
@@ -522,11 +479,11 @@ class LeapIncomingMail(object):
except keymanager_errors.DecryptError:
logger.warning('Failed to decrypt potential inline encrypted '
'message. Storing message as is...')
+
# if message is not encrypted, return raw data
if isinstance(data, unicode):
data = data.encode(encoding, 'replace')
- parser = Parser()
- return parser.parsestr(data), valid_sig
+ return (self._parser.parsestr(data), valid_sig)
def _decrypt_and_verify_data(self, data, senderPubkey):
"""
@@ -555,7 +512,7 @@ class LeapIncomingMail(object):
except keymanager_errors.InvalidSignature:
decrdata = self._keymanager.decrypt(
data, self._pkey)
- return decrdata, valid_sig
+ return (decrdata, valid_sig)
def _add_message_locally(self, msgtuple):
"""
@@ -570,10 +527,55 @@ class LeapIncomingMail(object):
"""
log.msg('adding message to local db')
doc, data = msgtuple
- self._inbox.addMessage(data, (self.RECENT_FLAG,))
+
+ if isinstance(data, list):
+ data = data[0]
+
+ self._inbox.addMessage(data, flags=(self.RECENT_FLAG,))
+
leap_events.signal(IMAP_MSG_SAVED_LOCALLY)
doc_id = doc.doc_id
self._soledad.delete_doc(doc)
log.msg("deleted doc %s from incoming" % doc_id)
leap_events.signal(IMAP_MSG_DELETED_INCOMING)
self._signal_unread_to_ui()
+ return True
+
+ #
+ # helpers
+ #
+
+ def _msg_multipart_sanity_check(self, msg):
+ """
+ Performs a sanity check against a multipart encrypted msg
+
+ :param msg: The original encrypted message.
+ :type msg: Message
+ """
+ # sanity check
+ payload = msg.get_payload()
+ if len(payload) != 2:
+ raise MalformedMessage(
+ 'Multipart/encrypted messages should have exactly 2 body '
+ 'parts (instead of %d).' % len(payload))
+ if payload[0].get_content_type() != 'application/pgp-encrypted':
+ raise MalformedMessage(
+ "Multipart/encrypted messages' first body part should "
+ "have content type equal to 'application/pgp-encrypted' "
+ "(instead of %s)." % payload[0].get_content_type())
+ if payload[1].get_content_type() != 'application/octet-stream':
+ raise MalformedMessage(
+ "Multipart/encrypted messages' second body part should "
+ "have content type equal to 'octet-stream' (instead of "
+ "%s)." % payload[1].get_content_type())
+
+ def _is_msg(self, keys):
+ """
+ Checks if the keys of a dictionary match the signature
+ of the document type we use for messages.
+
+ :param keys: iterable containing the strings to match.
+ :type keys: iterable of strings.
+ :rtype: bool
+ """
+ return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys