diff options
-rw-r--r-- | CHANGELOG | 6 | ||||
-rw-r--r-- | src/leap/mx/mail_receiver.py | 122 |
2 files changed, 102 insertions, 26 deletions
@@ -1,3 +1,9 @@ +0.3.4 Nov 15: + o Some mail may be skipped at processing because of possible + problems (like connectivity issues to our couch nodes), MX now + looks for unprocessed mails every half hour and tries to process + them. Fixes #3628. + 0.3.3 Nov 1: o Fix return codes for check recipient access. Fixes #3356. o Improve logging in general and support possible unicode parameters diff --git a/src/leap/mx/mail_receiver.py b/src/leap/mx/mail_receiver.py index 741b99d..3890eb2 100644 --- a/src/leap/mx/mail_receiver.py +++ b/src/leap/mx/mail_receiver.py @@ -35,7 +35,7 @@ except ImportError: from email import message_from_string from twisted.application.service import Service -from twisted.internet import inotify, defer +from twisted.internet import inotify, defer, task from twisted.python import filepath, log from leap.common.mail import get_email_charset @@ -74,6 +74,7 @@ class MailReceiver(Service): self._users_cdb = users_cdb self._directories = directories self._domain = socket.gethostbyaddr(socket.gethostname())[0] + self._processing_skipped = False def startService(self): """ @@ -91,6 +92,10 @@ class MailReceiver(Service): callbacks=[self._process_incoming_email], recursive=recursive) + self._lcall = task.LoopingCall(self._process_skipped) + # Run once every half an hour, but don't start right now + self._lcall.start(interval=60*30, now=False) + def _encrypt_message(self, pubkey, message): """ Given a public key and a message, it encrypts the message to @@ -109,7 +114,7 @@ class MailReceiver(Service): if pubkey is None or len(pubkey) == 0: log.msg("_encrypt_message: Something went wrong, here's all " "I know: %r" % (pubkey,)) - return None, None + return None doc = SoledadDocument(doc_id=str(pyuuid.uuid4())) @@ -211,6 +216,8 @@ class MailReceiver(Service): uuid = None delivereds = mail.get_all("Delivered-To") + if delivereds is None: + return None for to in delivereds: name, addr = email.utils.parseaddr(to) parts = addr.split("@") @@ -220,6 +227,86 @@ class MailReceiver(Service): return uuid + def sleep(self, secs): + """ + Async sleep for a defer. Use this when you want to wait for + another (non atomic) defer to finish. + + :param secs: seconds to wait (not really accurate, it depends + on the event queue) + :type secs: int + + :rtype: twisted.internet.defer.Deferred + """ + from twisted.internet import reactor + d = defer.Deferred() + reactor.callLater(secs, d.callback, None) + return d + + @defer.inlineCallbacks + def _process_skipped(self): + """ + Recursively or not (depending on the configuration) process + all the watched directories for unprocessed mail and try to + process it. + """ + if self._processing_skipped: + defer.returnValue(None) + + self._processing_skipped = True + log.msg("Starting processing skipped mail...") + log.msg("-"*50) + + for directory, recursive in self._directories: + for root, dirs, files in os.walk(directory): + for fname in files: + fullpath = os.path.join(root, fname) + fpath = filepath.FilePath(fullpath) + yield self._step_process_mail_backend(fpath) + + if not recursive: + break + + self._processing_skipped = False + log.msg("+"*50) + log.msg("Done processing skipped mail") + + @defer.inlineCallbacks + def _step_process_mail_backend(self, filepath): + """ + Processes the email pointed by filepath in an async + fashion. yield this method in another inlineCallbacks method + or return it for it to be run. + + :param filepath: Path of the file that changed + :type filepath: twisted.python.filepath.FilePath + """ + log.msg("Processing new mail at %r" % (filepath.path,)) + with filepath.open("r") as f: + mail_data = f.read() + mail = message_from_string(mail_data) + uuid = self._get_owner(mail) + if uuid is None: + log.msg("Don't know how to deliver mail %r, skipping..." % + (filepath.path,)) + defer.returnValue(None) + log.msg("Mail owner: %s" % (uuid,)) + + if uuid is None: + log.msg("BUG: There was no uuid!") + defer.returnValue(None) + + pubkey = yield self._users_cdb.getPubKey(uuid) + if pubkey is None or len(pubkey): + log.msg("No public key, stopping the processing chain") + defer.returnValue(None) + + log.msg("Encrypting message to %s's pubkey" % (uuid,)) + doc = yield self._encrypt_message(pubkey, mail_data) + + do_remove = yield self._export_message(uuid, doc) + yield self._conditional_remove(do_remove, filepath) + @defer.inlineCallbacks def _process_incoming_email(self, otherself, filepath, mask): """ @@ -235,28 +322,11 @@ class MailReceiver(Service): :type mask: int """ try: + while self._processing_skipped: + log.msg("Waiting for the process of skipped mail to be done...") + yield self.sleep(10) # NO-OP if os.path.split(filepath.dirname())[-1] == "new": - log.msg("Processing new mail at %r" % (filepath.path,)) - with filepath.open("r") as f: - mail_data = f.read() - mail = message_from_string(mail_data) - uuid = self._get_owner(mail) - if uuid is None: - log.msg("Don't know how to deliver mail %r, skipping..." % - filepath.path) - return - log.msg("Mail owner: %s" % (uuid,)) - - if uuid is None: - log.msg("BUG: There was no uuid!") - return - - pubkey = yield self._users_cdb.getPubKey(uuid) - - log.msg("Encrypting message to %s's pubkey" % (uuid,)) - doc = yield self._encrypt_message(pubkey, mail_data) - - do_remove = yield self._export_message(uuid, doc) - yield self._conditional_remove(do_remove, filepath) - except Exception: - log.err() + yield self._step_process_mail_backend(filepath) + except Exception as e: + log.msg("Something went wrong while processing {0!r}: {1!r}" + .format(filepath, e)) |