diff options
author | Tomás Touceda <chiiph@leap.se> | 2013-11-07 09:46:28 -0300 |
---|---|---|
committer | Tomás Touceda <chiiph@leap.se> | 2013-11-08 07:22:26 -0300 |
commit | 2db1b2a7187e7ebc543d5c01e585fdc3be7dcb1b (patch) | |
tree | ca7a67b9ad4c1de7013a566390bb1425675eb169 /src/leap/mx | |
parent | 3fc38cfa269fd7a8fa602aaf2a3e025b5403b71b (diff) |
Implement processing skipped mail regularly
Diffstat (limited to 'src/leap/mx')
-rw-r--r-- | src/leap/mx/mail_receiver.py | 76 |
1 files changed, 71 insertions, 5 deletions
diff --git a/src/leap/mx/mail_receiver.py b/src/leap/mx/mail_receiver.py index 785be2e..3890eb2 100644 --- a/src/leap/mx/mail_receiver.py +++ b/src/leap/mx/mail_receiver.py @@ -35,7 +35,7 @@ except ImportError: from email import message_from_string from twisted.application.service import Service -from twisted.internet import inotify, defer +from twisted.internet import inotify, defer, task from twisted.python import filepath, log from leap.common.mail import get_email_charset @@ -74,6 +74,7 @@ class MailReceiver(Service): self._users_cdb = users_cdb self._directories = directories self._domain = socket.gethostbyaddr(socket.gethostname())[0] + self._processing_skipped = False def startService(self): """ @@ -91,6 +92,10 @@ class MailReceiver(Service): callbacks=[self._process_incoming_email], recursive=recursive) + self._lcall = task.LoopingCall(self._process_skipped) + # Run once every half an hour, but don't start right now + self._lcall.start(interval=60*30, now=False) + def _encrypt_message(self, pubkey, message): """ Given a public key and a message, it encrypts the message to @@ -109,7 +114,7 @@ class MailReceiver(Service): if pubkey is None or len(pubkey) == 0: log.msg("_encrypt_message: Something went wrong, here's all " "I know: %r" % (pubkey,)) - return None, None + return None doc = SoledadDocument(doc_id=str(pyuuid.uuid4())) @@ -211,6 +216,8 @@ class MailReceiver(Service): uuid = None delivereds = mail.get_all("Delivered-To") + if delivereds is None: + return None for to in delivereds: name, addr = email.utils.parseaddr(to) parts = addr.split("@") @@ -220,8 +227,60 @@ class MailReceiver(Service): return uuid + def sleep(self, secs): + """ + Async sleep for a defer. Use this when you want to wait for + another (non atomic) defer to finish. + + :param secs: seconds to wait (not really accurate, it depends + on the event queue) + :type secs: int + + :rtype: twisted.internet.defer.Deferred + """ + from twisted.internet import reactor + d = defer.Deferred() + reactor.callLater(secs, d.callback, None) + return d + + @defer.inlineCallbacks + def _process_skipped(self): + """ + Recursively or not (depending on the configuration) process + all the watched directories for unprocessed mail and try to + process it. + """ + if self._processing_skipped: + defer.returnValue(None) + + self._processing_skipped = True + log.msg("Starting processing skipped mail...") + log.msg("-"*50) + + for directory, recursive in self._directories: + for root, dirs, files in os.walk(directory): + for fname in files: + fullpath = os.path.join(root, fname) + fpath = filepath.FilePath(fullpath) + yield self._step_process_mail_backend(fpath) + + if not recursive: + break + + self._processing_skipped = False + log.msg("+"*50) + log.msg("Done processing skipped mail") + @defer.inlineCallbacks def _step_process_mail_backend(self, filepath): + """ + Processes the email pointed by filepath in an async + fashion. yield this method in another inlineCallbacks method + or return it for it to be run. + + :param filepath: Path of the file that changed + :type filepath: twisted.python.filepath.FilePath + """ log.msg("Processing new mail at %r" % (filepath.path,)) with filepath.open("r") as f: mail_data = f.read() @@ -229,7 +288,7 @@ class MailReceiver(Service): uuid = self._get_owner(mail) if uuid is None: log.msg("Don't know how to deliver mail %r, skipping..." % - filepath.path) + (filepath.path,)) defer.returnValue(None) log.msg("Mail owner: %s" % (uuid,)) @@ -238,6 +297,9 @@ class MailReceiver(Service): defer.returnValue(None) pubkey = yield self._users_cdb.getPubKey(uuid) + if pubkey is None or len(pubkey): + log.msg("No public key, stopping the processing chain") + defer.returnValue(None) log.msg("Encrypting message to %s's pubkey" % (uuid,)) doc = yield self._encrypt_message(pubkey, mail_data) @@ -260,7 +322,11 @@ class MailReceiver(Service): :type mask: int """ try: + while self._processing_skipped: + log.msg("Waiting for the process of skipped mail to be done...") + yield self.sleep(10) # NO-OP if os.path.split(filepath.dirname())[-1] == "new": yield self._step_process_mail_backend(filepath) - except Exception: - log.err() + except Exception as e: + log.msg("Something went wrong while processing {0!r}: {1!r}" + .format(filepath, e)) |