summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG6
-rw-r--r--src/leap/mx/mail_receiver.py122
2 files changed, 102 insertions, 26 deletions
diff --git a/CHANGELOG b/CHANGELOG
index 67ff1e5..cbd9c74 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,3 +1,9 @@
+0.3.4 Nov 15:
+ o Some mail may be skipped at processing because of possible
+ problems (like connectivity issues to our couch nodes), MX now
+ looks for unprocessed mails every half hour and tries to process
+ them. Fixes #3628.
+
0.3.3 Nov 1:
o Fix return codes for check recipient access. Fixes #3356.
o Improve logging in general and support possible unicode parameters
diff --git a/src/leap/mx/mail_receiver.py b/src/leap/mx/mail_receiver.py
index 741b99d..3890eb2 100644
--- a/src/leap/mx/mail_receiver.py
+++ b/src/leap/mx/mail_receiver.py
@@ -35,7 +35,7 @@ except ImportError:
from email import message_from_string
from twisted.application.service import Service
-from twisted.internet import inotify, defer
+from twisted.internet import inotify, defer, task
from twisted.python import filepath, log
from leap.common.mail import get_email_charset
@@ -74,6 +74,7 @@ class MailReceiver(Service):
self._users_cdb = users_cdb
self._directories = directories
self._domain = socket.gethostbyaddr(socket.gethostname())[0]
+ self._processing_skipped = False
def startService(self):
"""
@@ -91,6 +92,10 @@ class MailReceiver(Service):
callbacks=[self._process_incoming_email],
recursive=recursive)
+ self._lcall = task.LoopingCall(self._process_skipped)
+ # Run once every half an hour, but don't start right now
+ self._lcall.start(interval=60*30, now=False)
+
def _encrypt_message(self, pubkey, message):
"""
Given a public key and a message, it encrypts the message to
@@ -109,7 +114,7 @@ class MailReceiver(Service):
if pubkey is None or len(pubkey) == 0:
log.msg("_encrypt_message: Something went wrong, here's all "
"I know: %r" % (pubkey,))
- return None, None
+ return None
doc = SoledadDocument(doc_id=str(pyuuid.uuid4()))
@@ -211,6 +216,8 @@ class MailReceiver(Service):
uuid = None
delivereds = mail.get_all("Delivered-To")
+ if delivereds is None:
+ return None
for to in delivereds:
name, addr = email.utils.parseaddr(to)
parts = addr.split("@")
@@ -220,6 +227,86 @@ class MailReceiver(Service):
return uuid
+ def sleep(self, secs):
+ """
+ Async sleep for a defer. Use this when you want to wait for
+ another (non atomic) defer to finish.
+
+ :param secs: seconds to wait (not really accurate, it depends
+ on the event queue)
+ :type secs: int
+
+ :rtype: twisted.internet.defer.Deferred
+ """
+ from twisted.internet import reactor
+ d = defer.Deferred()
+ reactor.callLater(secs, d.callback, None)
+ return d
+
+ @defer.inlineCallbacks
+ def _process_skipped(self):
+ """
+ Recursively or not (depending on the configuration) process
+ all the watched directories for unprocessed mail and try to
+ process it.
+ """
+ if self._processing_skipped:
+ defer.returnValue(None)
+
+ self._processing_skipped = True
+ log.msg("Starting processing skipped mail...")
+ log.msg("-"*50)
+
+ for directory, recursive in self._directories:
+ for root, dirs, files in os.walk(directory):
+ for fname in files:
+ fullpath = os.path.join(root, fname)
+ fpath = filepath.FilePath(fullpath)
+ yield self._step_process_mail_backend(fpath)
+
+ if not recursive:
+ break
+
+ self._processing_skipped = False
+ log.msg("+"*50)
+ log.msg("Done processing skipped mail")
+
+ @defer.inlineCallbacks
+ def _step_process_mail_backend(self, filepath):
+ """
+ Processes the email pointed by filepath in an async
+ fashion. yield this method in another inlineCallbacks method
+ or return it for it to be run.
+
+ :param filepath: Path of the file that changed
+ :type filepath: twisted.python.filepath.FilePath
+ """
+ log.msg("Processing new mail at %r" % (filepath.path,))
+ with filepath.open("r") as f:
+ mail_data = f.read()
+ mail = message_from_string(mail_data)
+ uuid = self._get_owner(mail)
+ if uuid is None:
+ log.msg("Don't know how to deliver mail %r, skipping..." %
+ (filepath.path,))
+ defer.returnValue(None)
+ log.msg("Mail owner: %s" % (uuid,))
+
+ if uuid is None:
+ log.msg("BUG: There was no uuid!")
+ defer.returnValue(None)
+
+ pubkey = yield self._users_cdb.getPubKey(uuid)
+ if pubkey is None or len(pubkey):
+ log.msg("No public key, stopping the processing chain")
+ defer.returnValue(None)
+
+ log.msg("Encrypting message to %s's pubkey" % (uuid,))
+ doc = yield self._encrypt_message(pubkey, mail_data)
+
+ do_remove = yield self._export_message(uuid, doc)
+ yield self._conditional_remove(do_remove, filepath)
+
@defer.inlineCallbacks
def _process_incoming_email(self, otherself, filepath, mask):
"""
@@ -235,28 +322,11 @@ class MailReceiver(Service):
:type mask: int
"""
try:
+ while self._processing_skipped:
+ log.msg("Waiting for the process of skipped mail to be done...")
+ yield self.sleep(10) # NO-OP
if os.path.split(filepath.dirname())[-1] == "new":
- log.msg("Processing new mail at %r" % (filepath.path,))
- with filepath.open("r") as f:
- mail_data = f.read()
- mail = message_from_string(mail_data)
- uuid = self._get_owner(mail)
- if uuid is None:
- log.msg("Don't know how to deliver mail %r, skipping..." %
- filepath.path)
- return
- log.msg("Mail owner: %s" % (uuid,))
-
- if uuid is None:
- log.msg("BUG: There was no uuid!")
- return
-
- pubkey = yield self._users_cdb.getPubKey(uuid)
-
- log.msg("Encrypting message to %s's pubkey" % (uuid,))
- doc = yield self._encrypt_message(pubkey, mail_data)
-
- do_remove = yield self._export_message(uuid, doc)
- yield self._conditional_remove(do_remove, filepath)
- except Exception:
- log.err()
+ yield self._step_process_mail_backend(filepath)
+ except Exception as e:
+ log.msg("Something went wrong while processing {0!r}: {1!r}"
+ .format(filepath, e))