summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/fetch.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/imap/fetch.py')
-rw-r--r--src/leap/mail/imap/fetch.py400
1 files changed, 239 insertions, 161 deletions
diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py
index 14f7a9b..0a97752 100644
--- a/src/leap/mail/imap/fetch.py
+++ b/src/leap/mail/imap/fetch.py
@@ -17,21 +17,24 @@
"""
Incoming mail fetcher.
"""
+import copy
import logging
-import json
-import ssl
import threading
import time
-import copy
-from StringIO import StringIO
+import sys
+import traceback
+import warnings
from email.parser import Parser
from email.generator import Generator
from email.utils import parseaddr
+from StringIO import StringIO
from twisted.python import log
+from twisted.internet import defer
from twisted.internet.task import LoopingCall
-from twisted.internet.threads import deferToThread
+from twisted.internet.task import deferLater
+from u1db import errors as u1db_errors
from zope.proxy import sameProxiedObjects
from leap.common import events as leap_events
@@ -42,15 +45,25 @@ from leap.common.events.events_pb2 import IMAP_MSG_DECRYPTED
from leap.common.events.events_pb2 import IMAP_MSG_SAVED_LOCALLY
from leap.common.events.events_pb2 import IMAP_MSG_DELETED_INCOMING
from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL
+from leap.common.events.events_pb2 import SOLEDAD_INVALID_AUTH_TOKEN
from leap.common.mail import get_email_charset
from leap.keymanager import errors as keymanager_errors
from leap.keymanager.openpgp import OpenPGPKey
+from leap.mail.decorators import deferred_to_thread
+from leap.mail.imap.fields import fields
+from leap.mail.utils import json_loads, empty, first
from leap.soledad.client import Soledad
from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY
+from leap.soledad.common.errors import InvalidAuthTokenError
logger = logging.getLogger(__name__)
+MULTIPART_ENCRYPTED = "multipart/encrypted"
+MULTIPART_SIGNED = "multipart/signed"
+PGP_BEGIN = "-----BEGIN PGP MESSAGE-----"
+PGP_END = "-----END PGP MESSAGE-----"
+
class MalformedMessage(Exception):
"""
@@ -73,8 +86,6 @@ class LeapIncomingMail(object):
"""
RECENT_FLAG = "\\Recent"
-
- INCOMING_KEY = "incoming"
CONTENT_KEY = "content"
LEAP_SIGNATURE_HEADER = 'X-Leap-Signature'
@@ -123,13 +134,8 @@ class LeapIncomingMail(object):
self._loop = None
self._check_period = check_period
- self._create_soledad_indexes()
-
- def _create_soledad_indexes(self):
- """
- Create needed indexes on soledad.
- """
- self._soledad.create_index("just-mail", "incoming")
+ # initialize a mail parser only once
+ self._parser = Parser()
@property
def _pkey(self):
@@ -149,12 +155,29 @@ class LeapIncomingMail(object):
Calls a deferred that will execute the fetch callback
in a separate thread
"""
+ def syncSoledadCallback(result):
+ # FIXME this needs a matching change in mx!!!
+ # --> need to add ERROR_DECRYPTING_KEY = False
+ # as default.
+ try:
+ doclist = self._soledad.get_from_index(
+ fields.JUST_MAIL_IDX, "*", "0")
+ except u1db_errors.InvalidGlobbing:
+ # It looks like we are a dealing with an outdated
+ # mx. Fallback to the version of the index
+ warnings.warn("JUST_MAIL_COMPAT_IDX will be deprecated!",
+ DeprecationWarning)
+ doclist = self._soledad.get_from_index(
+ fields.JUST_MAIL_COMPAT_IDX, "*")
+ self._process_doclist(doclist)
+
logger.debug("fetching mail for: %s %s" % (
self._soledad.uuid, self._userid))
if not self.fetching_lock.locked():
- d = deferToThread(self._sync_soledad)
- d.addCallbacks(self._signal_fetch_to_ui, self._sync_soledad_error)
- d.addCallbacks(self._process_doclist, self._sync_soledad_error)
+ d1 = self._sync_soledad()
+ d = defer.gatherResults([d1], consumeErrors=True)
+ d.addCallbacks(syncSoledadCallback, self._errback)
+ d.addCallbacks(self._signal_fetch_to_ui, self._errback)
return d
else:
logger.debug("Already fetching mail.")
@@ -184,81 +207,53 @@ class LeapIncomingMail(object):
# synchronize incoming mail
+ def _errback(self, failure):
+ logger.exception(failure.value)
+ traceback.print_tb(*sys.exc_info())
+
+ @deferred_to_thread
def _sync_soledad(self):
"""
- Synchronizes with remote soledad.
+ Synchronize with remote soledad.
:returns: a list of LeapDocuments, or None.
:rtype: iterable or None
"""
with self.fetching_lock:
- log.msg('syncing soledad...')
- self._soledad.sync()
- log.msg('soledad synced.')
- doclist = self._soledad.get_from_index("just-mail", "*")
-
- return doclist
-
- def _signal_unread_to_ui(self):
- """
- Sends unread event to ui.
- """
- leap_events.signal(
- IMAP_UNREAD_MAIL, str(self._inbox.getUnseenCount()))
+ try:
+ log.msg('FETCH: syncing soledad...')
+ self._soledad.sync()
+ log.msg('FETCH soledad SYNCED.')
+ except InvalidAuthTokenError:
+ # if the token is invalid, send an event so the GUI can
+ # disable mail and show an error message.
+ leap_events.signal(SOLEDAD_INVALID_AUTH_TOKEN)
def _signal_fetch_to_ui(self, doclist):
"""
- Sends leap events to ui.
+ Send leap events to ui.
:param doclist: iterable with msg documents.
:type doclist: iterable.
:returns: doclist
:rtype: iterable
"""
- fetched_ts = time.mktime(time.gmtime())
- num_mails = len(doclist)
- log.msg("there are %s mails" % (num_mails,))
- leap_events.signal(
- IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts))
- self._signal_unread_to_ui()
- return doclist
-
- def _sync_soledad_error(self, failure):
- """
- Errback for sync errors.
- """
- # XXX should signal unrecoverable maybe.
- err = failure.value
- logger.error("error syncing soledad: %s" % (err,))
- if failure.check(ssl.SSLError):
- logger.warning('SSL Error while '
- 'syncing soledad: %r' % (err,))
- elif failure.check(Exception):
- logger.warning('Unknown error while '
- 'syncing soledad: %r' % (err,))
-
- def _log_err(self, failure):
- """
- Generic errback
- """
- err = failure.value
- logger.exception("error!: %r" % (err,))
-
- def _decryption_error(self, failure):
- """
- Errback for decryption errors.
- """
- # XXX should signal unrecoverable maybe.
- err = failure.value
- logger.error("error decrypting msg: %s" % (err,))
+ doclist = first(doclist) # gatherResults pass us a list
+ if doclist:
+ fetched_ts = time.mktime(time.gmtime())
+ num_mails = len(doclist) if doclist is not None else 0
+ if num_mails != 0:
+ log.msg("there are %s mails" % (num_mails,))
+ leap_events.signal(
+ IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts))
+ return doclist
- def _saving_error(self, failure):
+ def _signal_unread_to_ui(self, *args):
"""
- Errback for local save errors.
+ Sends unread event to ui.
"""
- # XXX should signal unrecoverable maybe.
- err = failure.value
- logger.error("error saving msg locally: %s" % (err,))
+ leap_events.signal(
+ IMAP_UNREAD_MAIL, str(self._inbox.getUnseenCount()))
# process incoming mail.
@@ -278,44 +273,40 @@ class LeapIncomingMail(object):
return
num_mails = len(doclist)
- docs_cb = []
for index, doc in enumerate(doclist):
logger.debug("processing doc %d of %d" % (index + 1, num_mails))
leap_events.signal(
IMAP_MSG_PROCESSING, str(index), str(num_mails))
+
keys = doc.content.keys()
- if self._is_msg(keys):
- # Ok, this looks like a legit msg.
+
+ # TODO Compatibility check with the index in pre-0.6 mx
+ # that does not write the ERROR_DECRYPTING_KEY
+ # This should be removed in 0.7
+
+ has_errors = doc.content.get(fields.ERROR_DECRYPTING_KEY, None)
+ if has_errors is None:
+ warnings.warn("JUST_MAIL_COMPAT_IDX will be deprecated!",
+ DeprecationWarning)
+ if has_errors:
+ logger.debug("skipping msg with decrypting errors...")
+
+ if self._is_msg(keys) and not has_errors:
+ # Evaluating to bool of has_errors is intentional here.
+ # We don't mind at this point if it's None or False.
+
+ # Ok, this looks like a legit msg, and with no errors.
# Let's process it!
- # Deferred chain for individual messages
-
- # XXX use an IConsumer instead... ?
- d = deferToThread(self._decrypt_doc, doc)
- d.addCallback(self._process_decrypted_doc)
- d.addErrback(self._log_err)
- d.addCallback(self._add_message_locally)
- d.addErrback(self._log_err)
- docs_cb.append(d)
- else:
- # Ooops, this does not.
- logger.debug('This does not look like a proper msg.')
- return docs_cb
+
+ d1 = self._decrypt_doc(doc)
+ d = defer.gatherResults([d1], consumeErrors=True)
+ d.addCallbacks(self._add_message_locally, self._errback)
#
# operations on individual messages
#
- def _is_msg(self, keys):
- """
- Checks if the keys of a dictionary match the signature
- of the document type we use for messages.
-
- :param keys: iterable containing the strings to match.
- :type keys: iterable of strings.
- :rtype: bool
- """
- return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys
-
+ @deferred_to_thread
def _decrypt_doc(self, doc):
"""
Decrypt the contents of a document.
@@ -339,7 +330,9 @@ class LeapIncomingMail(object):
logger.error("Error while decrypting msg: %r" % (exc,))
decrdata = ""
leap_events.signal(IMAP_MSG_DECRYPTED, "1" if success else "0")
- return doc, decrdata
+
+ data = self._process_decrypted_doc((doc, decrdata))
+ return (doc, data)
def _process_decrypted_doc(self, msgtuple):
"""
@@ -355,54 +348,105 @@ class LeapIncomingMail(object):
"""
log.msg('processing decrypted doc')
doc, data = msgtuple
- msg = json.loads(data)
+
+ from twisted.internet import reactor
+
+ # XXX turn this into an errBack for each one of
+ # the deferreds that would process an individual document
+ try:
+ msg = json_loads(data)
+ except UnicodeError as exc:
+ logger.error("Error while decrypting %s" % (doc.doc_id,))
+ logger.exception(exc)
+
+ # we flag the message as "with decrypting errors",
+ # to avoid further decryption attempts during sync
+ # cycles until we're prepared to deal with that.
+ # What is the same, when Ivan deals with it...
+ # A new decrypting attempt event could be triggered by a
+ # future a library upgrade, or a cli flag to the client,
+ # we just `defer` that for now... :)
+ doc.content[fields.ERROR_DECRYPTING_KEY] = True
+ deferLater(reactor, 0, self._update_incoming_message, doc)
+
+ # FIXME this is just a dirty hack to delay the proper
+ # deferred organization here...
+ # and remember, boys, do not do this at home.
+ return []
+
if not isinstance(msg, dict):
- return False
- if not msg.get(self.INCOMING_KEY, False):
- return False
+ defer.returnValue(False)
+ if not msg.get(fields.INCOMING_KEY, False):
+ defer.returnValue(False)
# ok, this is an incoming message
rawmsg = msg.get(self.CONTENT_KEY, None)
if not rawmsg:
return False
- data = self._maybe_decrypt_msg(rawmsg)
- return doc, data
+ return self._maybe_decrypt_msg(rawmsg)
+
+ @deferred_to_thread
+ def _update_incoming_message(self, doc):
+ """
+ Do a put for a soledad document. This probably has been called only
+ in the case that we've needed to update the ERROR_DECRYPTING_KEY
+ flag in an incoming message, to get it out of the decrypting queue.
+
+ :param doc: the SoledadDocument to update
+ :type doc: SoledadDocument
+ """
+ log.msg("Updating SoledadDoc %s" % (doc.doc_id))
+ self._soledad.put_doc(doc)
+
+ @deferred_to_thread
+ def _delete_incoming_message(self, doc):
+ """
+ Delete document.
+
+ :param doc: the SoledadDocument to delete
+ :type doc: SoledadDocument
+ """
+ log.msg("Deleting Incoming message: %s" % (doc.doc_id,))
+ self._soledad.delete_doc(doc)
def _maybe_decrypt_msg(self, data):
"""
Tries to decrypt a gpg message if data looks like one.
:param data: the text to be decrypted.
- :type data: unicode
+ :type data: str
:return: data, possibly descrypted.
:rtype: str
"""
+ leap_assert_type(data, str)
log.msg('maybe decrypting doc')
- leap_assert_type(data, unicode)
# parse the original message
- parser = Parser()
encoding = get_email_charset(data)
- data = data.encode(encoding)
- msg = parser.parsestr(data)
+ msg = self._parser.parsestr(data)
# try to obtain sender public key
senderPubkey = None
fromHeader = msg.get('from', None)
- if fromHeader is not None:
+ if (fromHeader is not None
+ and (msg.get_content_type() == MULTIPART_ENCRYPTED
+ or msg.get_content_type() == MULTIPART_SIGNED)):
_, senderAddress = parseaddr(fromHeader)
try:
- senderPubkey = self._keymanager.get_key(
+ senderPubkey = self._keymanager.get_key_from_cache(
senderAddress, OpenPGPKey)
except keymanager_errors.KeyNotFound:
pass
valid_sig = False # we will add a header saying if sig is valid
- if msg.get_content_type() == 'multipart/encrypted':
- decrmsg, valid_sig = self._decrypt_multipart_encrypted_msg(
+ decrypt_multi = self._decrypt_multipart_encrypted_msg
+ decrypt_inline = self._maybe_decrypt_inline_encrypted_msg
+
+ if msg.get_content_type() == MULTIPART_ENCRYPTED:
+ decrmsg, valid_sig = decrypt_multi(
msg, encoding, senderPubkey)
else:
- decrmsg, valid_sig = self._maybe_decrypt_inline_encrypted_msg(
+ decrmsg, valid_sig = decrypt_inline(
msg, encoding, senderPubkey)
# add x-leap-signature header
@@ -435,25 +479,12 @@ class LeapIncomingMail(object):
"""
log.msg('decrypting multipart encrypted msg')
msg = copy.deepcopy(msg)
- # sanity check
- payload = msg.get_payload()
- if len(payload) != 2:
- raise MalformedMessage(
- 'Multipart/encrypted messages should have exactly 2 body '
- 'parts (instead of %d).' % len(payload))
- if payload[0].get_content_type() != 'application/pgp-encrypted':
- raise MalformedMessage(
- "Multipart/encrypted messages' first body part should "
- "have content type equal to 'application/pgp-encrypted' "
- "(instead of %s)." % payload[0].get_content_type())
- if payload[1].get_content_type() != 'application/octet-stream':
- raise MalformedMessage(
- "Multipart/encrypted messages' second body part should "
- "have content type equal to 'octet-stream' (instead of "
- "%s)." % payload[1].get_content_type())
+ self._msg_multipart_sanity_check(msg)
+
# parse message and get encrypted content
pgpencmsg = msg.get_payload()[1]
encdata = pgpencmsg.get_payload()
+
# decrypt or fail gracefully
try:
decrdata, valid_sig = self._decrypt_and_verify_data(
@@ -461,17 +492,13 @@ class LeapIncomingMail(object):
except keymanager_errors.DecryptError as e:
logger.warning('Failed to decrypt encrypted message (%s). '
'Storing message without modifications.' % str(e))
- return msg, False # return original message
- # decrypted successully, now fix encoding and parse
- try:
- decrdata = decrdata.encode(encoding)
- except (UnicodeEncodeError, UnicodeDecodeError) as e:
- logger.error("Unicode error {0}".format(e))
- decrdata = decrdata.encode(encoding, 'replace')
- parser = Parser()
- decrmsg = parser.parsestr(decrdata)
+ # Bailing out!
+ return (msg, False)
+
+ decrmsg = self._parser.parsestr(decrdata)
# remove original message's multipart/encrypted content-type
del(msg['content-type'])
+
# replace headers back in original message
for hkey, hval in decrmsg.items():
try:
@@ -479,9 +506,10 @@ class LeapIncomingMail(object):
msg.replace_header(hkey, hval)
except KeyError:
msg[hkey] = hval
- # replace payload by unencrypted payload
+
+ # all ok, replace payload by unencrypted payload
msg.set_payload(decrmsg.get_payload())
- return msg, valid_sig
+ return (msg, valid_sig)
def _maybe_decrypt_inline_encrypted_msg(self, origmsg, encoding,
senderPubkey):
@@ -495,8 +523,9 @@ class LeapIncomingMail(object):
:param senderPubkey: The key of the sender of the message.
:type senderPubkey: OpenPGPKey
- :return: A unitary tuple containing a decrypted message.
- :rtype: (Message)
+ :return: A tuple containing a decrypted message and
+ a bool indicating whether the signature is valid.
+ :rtype: (Message, bool)
"""
log.msg('maybe decrypting inline encrypted msg')
# serialize the original message
@@ -505,13 +534,11 @@ class LeapIncomingMail(object):
g.flatten(origmsg)
data = buf.getvalue()
# handle exactly one inline PGP message
- PGP_BEGIN = "-----BEGIN PGP MESSAGE-----"
- PGP_END = "-----END PGP MESSAGE-----"
valid_sig = False
if PGP_BEGIN in data:
begin = data.find(PGP_BEGIN)
end = data.find(PGP_END)
- pgp_message = data[begin:end+len(PGP_END)]
+ pgp_message = data[begin:end + len(PGP_END)]
try:
decrdata, valid_sig = self._decrypt_and_verify_data(
pgp_message, senderPubkey)
@@ -520,11 +547,11 @@ class LeapIncomingMail(object):
except keymanager_errors.DecryptError:
logger.warning('Failed to decrypt potential inline encrypted '
'message. Storing message as is...')
+
# if message is not encrypted, return raw data
if isinstance(data, unicode):
data = data.encode(encoding, 'replace')
- parser = Parser()
- return parser.parsestr(data), valid_sig
+ return (self._parser.parsestr(data), valid_sig)
def _decrypt_and_verify_data(self, data, senderPubkey):
"""
@@ -553,25 +580,76 @@ class LeapIncomingMail(object):
except keymanager_errors.InvalidSignature:
decrdata = self._keymanager.decrypt(
data, self._pkey)
- return decrdata, valid_sig
+ return (decrdata, valid_sig)
- def _add_message_locally(self, msgtuple):
+ def _add_message_locally(self, result):
"""
Adds a message to local inbox and delete it from the incoming db
in soledad.
+ # XXX this comes from a gatherresult...
:param msgtuple: a tuple consisting of a SoledadDocument
instance containing the incoming message
and data, the json-encoded, decrypted content of the
incoming message
:type msgtuple: (SoledadDocument, str)
"""
- log.msg('adding message to local db')
+ from twisted.internet import reactor
+ msgtuple = first(result)
+
doc, data = msgtuple
- self._inbox.addMessage(data, (self.RECENT_FLAG,))
- leap_events.signal(IMAP_MSG_SAVED_LOCALLY)
- doc_id = doc.doc_id
- self._soledad.delete_doc(doc)
- log.msg("deleted doc %s from incoming" % doc_id)
- leap_events.signal(IMAP_MSG_DELETED_INCOMING)
- self._signal_unread_to_ui()
+ log.msg('adding message %s to local db' % (doc.doc_id,))
+
+ if isinstance(data, list):
+ if empty(data):
+ return False
+ data = data[0]
+
+ def msgSavedCallback(result):
+ if not empty(result):
+ leap_events.signal(IMAP_MSG_SAVED_LOCALLY)
+ deferLater(reactor, 0, self._delete_incoming_message, doc)
+ leap_events.signal(IMAP_MSG_DELETED_INCOMING)
+
+ d = self._inbox.addMessage(data, flags=(self.RECENT_FLAG,),
+ notify_on_disk=True)
+ d.addCallbacks(msgSavedCallback, self._errback)
+
+ #
+ # helpers
+ #
+
+ def _msg_multipart_sanity_check(self, msg):
+ """
+ Performs a sanity check against a multipart encrypted msg
+
+ :param msg: The original encrypted message.
+ :type msg: Message
+ """
+ # sanity check
+ payload = msg.get_payload()
+ if len(payload) != 2:
+ raise MalformedMessage(
+ 'Multipart/encrypted messages should have exactly 2 body '
+ 'parts (instead of %d).' % len(payload))
+ if payload[0].get_content_type() != 'application/pgp-encrypted':
+ raise MalformedMessage(
+ "Multipart/encrypted messages' first body part should "
+ "have content type equal to 'application/pgp-encrypted' "
+ "(instead of %s)." % payload[0].get_content_type())
+ if payload[1].get_content_type() != 'application/octet-stream':
+ raise MalformedMessage(
+ "Multipart/encrypted messages' second body part should "
+ "have content type equal to 'octet-stream' (instead of "
+ "%s)." % payload[1].get_content_type())
+
+ def _is_msg(self, keys):
+ """
+ Checks if the keys of a dictionary match the signature
+ of the document type we use for messages.
+
+ :param keys: iterable containing the strings to match.
+ :type keys: iterable of strings.
+ :rtype: bool
+ """
+ return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys