summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-01-31 14:50:16 -0400
committerKali Kaneko <kali@leap.se>2014-03-10 03:37:13 -0400
commit40197f87b86c20ecc3f9dfd38687f25a4158d6e7 (patch)
treeeb3d3692d5738f126f91afdc9d36bf71df264d7d
parent1348abaef5ac93692c308b910b41524e39cd627e (diff)
keep processing after decoding errors during fetch
-rw-r--r--src/leap/mail/imap/fetch.py186
-rw-r--r--src/leap/mail/imap/fields.py15
2 files changed, 145 insertions, 56 deletions
diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py
index 40dadb3..6e12b3f 100644
--- a/src/leap/mail/imap/fetch.py
+++ b/src/leap/mail/imap/fetch.py
@@ -23,6 +23,7 @@ import threading
import time
import sys
import traceback
+import warnings
from email.parser import Parser
from email.generator import Generator
@@ -32,6 +33,8 @@ from StringIO import StringIO
from twisted.python import log
from twisted.internet import defer
from twisted.internet.task import LoopingCall
+from twisted.internet.task import deferLater
+from u1db import errors as u1db_errors
from zope.proxy import sameProxiedObjects
from leap.common import events as leap_events
@@ -46,7 +49,8 @@ from leap.common.mail import get_email_charset
from leap.keymanager import errors as keymanager_errors
from leap.keymanager.openpgp import OpenPGPKey
from leap.mail.decorators import deferred_to_thread
-from leap.mail.utils import json_loads
+from leap.mail.imap.fields import fields
+from leap.mail.utils import json_loads, empty, first
from leap.soledad.client import Soledad
from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY
@@ -80,8 +84,6 @@ class LeapIncomingMail(object):
"""
RECENT_FLAG = "\\Recent"
-
- INCOMING_KEY = "incoming"
CONTENT_KEY = "content"
LEAP_SIGNATURE_HEADER = 'X-Leap-Signature'
@@ -130,17 +132,9 @@ class LeapIncomingMail(object):
self._loop = None
self._check_period = check_period
- self._create_soledad_indexes()
-
# initialize a mail parser only once
self._parser = Parser()
- def _create_soledad_indexes(self):
- """
- Create needed indexes on soledad.
- """
- self._soledad.create_index("just-mail", "incoming")
-
@property
def _pkey(self):
if sameProxiedObjects(self._keymanager, None):
@@ -159,13 +153,29 @@ class LeapIncomingMail(object):
Calls a deferred that will execute the fetch callback
in a separate thread
"""
+ def syncSoledadCallback(result):
+ # FIXME this needs a matching change in mx!!!
+ # --> need to add ERROR_DECRYPTING_KEY = False
+ # as default.
+ try:
+ doclist = self._soledad.get_from_index(
+ fields.JUST_MAIL_IDX, "*", "0")
+ except u1db_errors.InvalidGlobbing:
+ # It looks like we are a dealing with an outdated
+ # mx. Fallback to the version of the index
+ warnings.warn("JUST_MAIL_COMPAT_IDX will be deprecated!",
+ DeprecationWarning)
+ doclist = self._soledad.get_from_index(
+ fields.JUST_MAIL_COMPAT_IDX, "*")
+ self._process_doclist(doclist)
+
logger.debug("fetching mail for: %s %s" % (
self._soledad.uuid, self._userid))
if not self.fetching_lock.locked():
d1 = self._sync_soledad()
d = defer.gatherResults([d1], consumeErrors=True)
+ d.addCallbacks(syncSoledadCallback, self._errback)
d.addCallbacks(self._signal_fetch_to_ui, self._errback)
- d.addCallbacks(self._signal_unread_to_ui, self._errback)
return d
else:
logger.debug("Already fetching mail.")
@@ -202,46 +212,44 @@ class LeapIncomingMail(object):
@deferred_to_thread
def _sync_soledad(self):
"""
- Synchronizes with remote soledad.
+ Synchronize with remote soledad.
:returns: a list of LeapDocuments, or None.
:rtype: iterable or None
"""
with self.fetching_lock:
- log.msg('syncing soledad...')
+ log.msg('FETCH: syncing soledad...')
self._soledad.sync()
- log.msg('soledad synced.')
- doclist = self._soledad.get_from_index("just-mail", "*")
- self._process_doclist(doclist)
-
- def _signal_unread_to_ui(self, *args):
- """
- Sends unread event to ui.
- """
- leap_events.signal(
- IMAP_UNREAD_MAIL, str(self._inbox.getUnseenCount()))
+ log.msg('FETCH soledad SYNCED.')
def _signal_fetch_to_ui(self, doclist):
"""
- Sends leap events to ui.
+ Send leap events to ui.
:param doclist: iterable with msg documents.
:type doclist: iterable.
:returns: doclist
:rtype: iterable
"""
- doclist = doclist[0] # gatherResults pass us a list
- fetched_ts = time.mktime(time.gmtime())
- num_mails = len(doclist) if doclist is not None else 0
- if num_mails != 0:
- log.msg("there are %s mails" % (num_mails,))
+ doclist = first(doclist) # gatherResults pass us a list
+ if doclist:
+ fetched_ts = time.mktime(time.gmtime())
+ num_mails = len(doclist) if doclist is not None else 0
+ if num_mails != 0:
+ log.msg("there are %s mails" % (num_mails,))
+ leap_events.signal(
+ IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts))
+ return doclist
+
+ def _signal_unread_to_ui(self, *args):
+ """
+ Sends unread event to ui.
+ """
leap_events.signal(
- IMAP_FETCHED_INCOMING, str(num_mails), str(fetched_ts))
- return doclist
+ IMAP_UNREAD_MAIL, str(self._inbox.getUnseenCount()))
# process incoming mail.
- @defer.inlineCallbacks
def _process_doclist(self, doclist):
"""
Iterates through the doclist, checks if each doc
@@ -262,22 +270,36 @@ class LeapIncomingMail(object):
logger.debug("processing doc %d of %d" % (index + 1, num_mails))
leap_events.signal(
IMAP_MSG_PROCESSING, str(index), str(num_mails))
+
keys = doc.content.keys()
- if self._is_msg(keys):
- # Ok, this looks like a legit msg.
+
+ # TODO Compatibility check with the index in pre-0.6 mx
+ # that does not write the ERROR_DECRYPTING_KEY
+ # This should be removed in 0.7
+
+ has_errors = doc.content.get(fields.ERROR_DECRYPTING_KEY, None)
+ if has_errors is None:
+ warnings.warn("JUST_MAIL_COMPAT_IDX will be deprecated!",
+ DeprecationWarning)
+ if has_errors:
+ logger.debug("skipping msg with decrypting errors...")
+
+ if self._is_msg(keys) and not has_errors:
+ # Evaluating to bool of has_errors is intentional here.
+ # We don't mind at this point if it's None or False.
+
+ # Ok, this looks like a legit msg, and with no errors.
# Let's process it!
- decrypted = list(self._decrypt_doc(doc))[0]
- res = self._add_message_locally(decrypted)
- yield res
- else:
- # Ooops, this does not.
- logger.debug('This does not look like a proper msg.')
+ d1 = self._decrypt_doc(doc)
+ d = defer.gatherResults([d1], consumeErrors=True)
+ d.addCallbacks(self._add_message_locally, self._errback)
#
# operations on individual messages
#
+ @deferred_to_thread
def _decrypt_doc(self, doc):
"""
Decrypt the contents of a document.
@@ -302,8 +324,8 @@ class LeapIncomingMail(object):
decrdata = ""
leap_events.signal(IMAP_MSG_DECRYPTED, "1" if success else "0")
- data = list(self._process_decrypted_doc((doc, decrdata)))
- yield (doc, data)
+ data = self._process_decrypted_doc((doc, decrdata))
+ return (doc, data)
def _process_decrypted_doc(self, msgtuple):
"""
@@ -319,11 +341,35 @@ class LeapIncomingMail(object):
"""
log.msg('processing decrypted doc')
doc, data = msgtuple
- msg = json_loads(data)
+
+ from twisted.internet import reactor
+
+ # XXX turn this into an errBack for each one of
+ # the deferreds that would process an individual document
+ try:
+ msg = json_loads(data)
+ except UnicodeError as exc:
+ logger.error("Error while decrypting %s" % (doc.doc_id,))
+ logger.exception(exc)
+
+ # we flag the message as "with decrypting errors",
+ # to avoid further decryption attempts during sync
+ # cycles until we're prepared to deal with that.
+ # What is the same, when Ivan deals with it...
+ # A new decrypting attempt event could be triggered by a
+ # future a library upgrade, or a cli flag to the client,
+ # we just `defer` that for now... :)
+ doc.content[fields.ERROR_DECRYPTING_KEY] = True
+ deferLater(reactor, 0, self._update_incoming_message, doc)
+
+ # FIXME this is just a dirty hack to delay the proper
+ # deferred organization here...
+ # and remember, boys, do not do this at home.
+ return []
if not isinstance(msg, dict):
defer.returnValue(False)
- if not msg.get(self.INCOMING_KEY, False):
+ if not msg.get(fields.INCOMING_KEY, False):
defer.returnValue(False)
# ok, this is an incoming message
@@ -332,6 +378,27 @@ class LeapIncomingMail(object):
return False
return self._maybe_decrypt_msg(rawmsg)
+ @deferred_to_thread
+ def _update_incoming_message(self, doc):
+ """
+ Do a put for a soledad document. This probably has been called only
+ in the case that we've needed to update the ERROR_DECRYPTING_KEY
+ flag in an incoming message, to get it out of the decrypting queue.
+
+ :param doc: the SoledadDocument to update
+ :type doc: SoledadDocument
+ """
+ log.msg("Updating SoledadDoc %s" % (doc.doc_id))
+ self._soledad.put_doc(doc)
+
+ @deferred_to_thread
+ def _delete_incoming_message(self, doc):
+ """
+ Delete document.
+ """
+ log.msg("Deleting SoledadDoc %s" % (doc.doc_id))
+ self._soledad.delete_doc(doc)
+
def _maybe_decrypt_msg(self, data):
"""
Tries to decrypt a gpg message if data looks like one.
@@ -384,7 +451,7 @@ class LeapIncomingMail(object):
self.LEAP_SIGNATURE_INVALID,
pubkey=senderPubkey.key_id)
- yield decrmsg.as_string()
+ return decrmsg.as_string()
def _decrypt_multipart_encrypted_msg(self, msg, encoding, senderPubkey):
"""
@@ -505,32 +572,39 @@ class LeapIncomingMail(object):
data, self._pkey)
return (decrdata, valid_sig)
- def _add_message_locally(self, msgtuple):
+ def _add_message_locally(self, result):
"""
Adds a message to local inbox and delete it from the incoming db
in soledad.
+ # XXX this comes from a gatherresult...
:param msgtuple: a tuple consisting of a SoledadDocument
instance containing the incoming message
and data, the json-encoded, decrypted content of the
incoming message
:type msgtuple: (SoledadDocument, str)
"""
- log.msg('adding message to local db')
+ from twisted.internet import reactor
+ msgtuple = first(result)
+
doc, data = msgtuple
+ log.msg('adding message %s to local db' % (doc.doc_id,))
if isinstance(data, list):
+ if empty(data):
+ return False
data = data[0]
- self._inbox.addMessage(data, flags=(self.RECENT_FLAG,))
+ def msgSavedCallback(result):
+ if not empty(result):
+ leap_events.signal(IMAP_MSG_SAVED_LOCALLY)
+ deferLater(reactor, 0, self._delete_incoming_message, result)
+ leap_events.signal(IMAP_MSG_DELETED_INCOMING)
+ deferLater(reactor, 1, self._signal_unread_to_ui)
- leap_events.signal(IMAP_MSG_SAVED_LOCALLY)
- doc_id = doc.doc_id
- self._soledad.delete_doc(doc)
- log.msg("deleted doc %s from incoming" % doc_id)
- leap_events.signal(IMAP_MSG_DELETED_INCOMING)
- self._signal_unread_to_ui()
- return True
+ # XXX should pass a notify_on_disk=True along...
+ d = self._inbox.addMessage(data, flags=(self.RECENT_FLAG,))
+ d.addCallbacks(msgSavedCallback, self._errback)
#
# helpers
diff --git a/src/leap/mail/imap/fields.py b/src/leap/mail/imap/fields.py
index 886ee63..4576939 100644
--- a/src/leap/mail/imap/fields.py
+++ b/src/leap/mail/imap/fields.py
@@ -108,6 +108,14 @@ class WithMsgFields(object):
# correct since the recent flag is volatile.
TYPE_MBOX_RECT_SEEN_IDX = 'by-type-and-mbox-and-recent-and-seen'
+ # Soledad index for incoming mail, without decrypting errors.
+ JUST_MAIL_IDX = "just-mail"
+ # XXX the backward-compatible index, will be deprecated at 0.7
+ JUST_MAIL_COMPAT_IDX = "just-mail-compat"
+
+ INCOMING_KEY = "incoming"
+ ERROR_DECRYPTING_KEY = "errdecr"
+
KTYPE = TYPE_KEY
MBOX_VAL = TYPE_MBOX_VAL
CHASH_VAL = CONTENT_HASH_KEY
@@ -140,6 +148,13 @@ class WithMsgFields(object):
TYPE_MBOX_DEL_IDX: [KTYPE, MBOX_VAL, 'bool(deleted)'],
TYPE_MBOX_RECT_SEEN_IDX: [KTYPE, MBOX_VAL,
'bool(recent)', 'bool(seen)'],
+
+ # incoming queue
+ JUST_MAIL_IDX: [INCOMING_KEY,
+ "bool(%s)" % (ERROR_DECRYPTING_KEY,)],
+
+ # the backward-compatible index, will be deprecated at 0.7
+ JUST_MAIL_COMPAT_IDX: [INCOMING_KEY],
}
MBOX_KEY = MBOX_VAL