From e108a2ffec444c09b3661379a1051fda1f9952cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Touceda?= Date: Tue, 5 Nov 2013 14:47:13 -0300 Subject: Use inline defers for more readable code --- src/leap/mx/mail_receiver.py | 57 ++++++++++++++++++++++---------------------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/src/leap/mx/mail_receiver.py b/src/leap/mx/mail_receiver.py index 8fcadce..741b99d 100644 --- a/src/leap/mx/mail_receiver.py +++ b/src/leap/mx/mail_receiver.py @@ -35,7 +35,7 @@ except ImportError: from email import message_from_string from twisted.application.service import Service -from twisted.internet import inotify +from twisted.internet import inotify, defer from twisted.python import filepath, log from leap.common.mail import get_email_charset @@ -91,33 +91,30 @@ class MailReceiver(Service): callbacks=[self._process_incoming_email], recursive=recursive) - def _encrypt_message(self, pubkey, uuid, message): + def _encrypt_message(self, pubkey, message): """ - Given a UUID, a public key and a message, it encrypts the - message to that public key. + Given a public key and a message, it encrypts the message to + that public key. The address is needed in order to build the OpenPGPKey object. - :param uuid_pubkey: tuple that holds the uuid and the public - key as it is returned by the previous call in the - chain - :type uuid_pubkey: tuple (str, str) + :param pubkey: public key for the owner of the message + :type pubkey: str :param message: message contents :type message: str - :return: uuid, doc to sync with Soledad or None, None if - something went wrong. - :rtype: tuple(str, SoledadDocument) + :return: doc to sync with Soledad or None, None if something + went wrong. + :rtype: SoledadDocument """ - if uuid is None or pubkey is None or len(pubkey) == 0: + if pubkey is None or len(pubkey) == 0: log.msg("_encrypt_message: Something went wrong, here's all " - "I know: %r | %r" % (uuid, pubkey)) + "I know: %r" % (pubkey,)) return None, None - log.msg("Encrypting message to %s's pubkey" % (uuid,)) - doc = SoledadDocument(doc_id=str(pyuuid.uuid4())) - encoding = get_email_charset(message, default=None) + encoding = get_email_charset(message.decode("utf8", "replace"), + default=None) if encoding is None: result = chardet.detect(message) encoding = result["encoding"] @@ -130,7 +127,7 @@ class MailReceiver(Service): ENC_SCHEME_KEY: EncryptionSchemes.NONE, ENC_JSON_KEY: json.dumps(data, encoding=encoding) } - return uuid, doc + return doc openpgp_key = None with openpgp.TempGPGWrapper(gpgbinary='/usr/bin/gpg') as gpg: @@ -149,22 +146,23 @@ class MailReceiver(Service): symmetric=False)) } - return uuid, doc + return doc - def _export_message(self, uuid_doc): + def _export_message(self, uuid, doc): """ Given a UUID and a SoledadDocument, it saves it directly in the couchdb that serves as a backend for Soledad, in a db accessible to the recipient of the mail. - :param uuid_doc: tuple that holds the UUID and SoledadDocument - :type uuid_doc: tuple(str, SoledadDocument) + :param uuid: the mail owner's uuid + :type uuid: str + :param doc: SoledadDocument that represents the email + :type doc: SoledadDocument :return: True if it's ok to remove the message, False otherwise :rtype: bool """ - uuid, doc = uuid_doc if uuid is None or doc is None: log.msg("_export_message: Something went wrong, here's all " "I know: %r | %r" % (uuid, doc)) @@ -222,6 +220,7 @@ class MailReceiver(Service): return uuid + @defer.inlineCallbacks def _process_incoming_email(self, otherself, filepath, mask): """ Callback that processes incoming email. @@ -252,12 +251,12 @@ class MailReceiver(Service): log.msg("BUG: There was no uuid!") return - d = self._users_cdb.getPubKey(uuid) - d.addCallbacks(self._encrypt_message, log.err, - (uuid, mail_data)) - d.addCallbacks(self._export_message, log.err) - d.addCallbacks(self._conditional_remove, log.err, - (filepath,)) - d.addErrback(log.err) + pubkey = yield self._users_cdb.getPubKey(uuid) + + log.msg("Encrypting message to %s's pubkey" % (uuid,)) + doc = yield self._encrypt_message(pubkey, mail_data) + + do_remove = yield self._export_message(uuid, doc) + yield self._conditional_remove(do_remove, filepath) except Exception: log.err() -- cgit v1.2.3 From 3fc38cfa269fd7a8fa602aaf2a3e025b5403b71b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Touceda?= Date: Wed, 6 Nov 2013 14:27:24 -0300 Subject: Refactor mail processing routine --- src/leap/mx/mail_receiver.py | 48 ++++++++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/src/leap/mx/mail_receiver.py b/src/leap/mx/mail_receiver.py index 741b99d..785be2e 100644 --- a/src/leap/mx/mail_receiver.py +++ b/src/leap/mx/mail_receiver.py @@ -220,6 +220,31 @@ class MailReceiver(Service): return uuid + @defer.inlineCallbacks + def _step_process_mail_backend(self, filepath): + log.msg("Processing new mail at %r" % (filepath.path,)) + with filepath.open("r") as f: + mail_data = f.read() + mail = message_from_string(mail_data) + uuid = self._get_owner(mail) + if uuid is None: + log.msg("Don't know how to deliver mail %r, skipping..." % + filepath.path) + defer.returnValue(None) + log.msg("Mail owner: %s" % (uuid,)) + + if uuid is None: + log.msg("BUG: There was no uuid!") + defer.returnValue(None) + + pubkey = yield self._users_cdb.getPubKey(uuid) + + log.msg("Encrypting message to %s's pubkey" % (uuid,)) + doc = yield self._encrypt_message(pubkey, mail_data) + + do_remove = yield self._export_message(uuid, doc) + yield self._conditional_remove(do_remove, filepath) + @defer.inlineCallbacks def _process_incoming_email(self, otherself, filepath, mask): """ @@ -236,27 +261,6 @@ class MailReceiver(Service): """ try: if os.path.split(filepath.dirname())[-1] == "new": - log.msg("Processing new mail at %r" % (filepath.path,)) - with filepath.open("r") as f: - mail_data = f.read() - mail = message_from_string(mail_data) - uuid = self._get_owner(mail) - if uuid is None: - log.msg("Don't know how to deliver mail %r, skipping..." % - filepath.path) - return - log.msg("Mail owner: %s" % (uuid,)) - - if uuid is None: - log.msg("BUG: There was no uuid!") - return - - pubkey = yield self._users_cdb.getPubKey(uuid) - - log.msg("Encrypting message to %s's pubkey" % (uuid,)) - doc = yield self._encrypt_message(pubkey, mail_data) - - do_remove = yield self._export_message(uuid, doc) - yield self._conditional_remove(do_remove, filepath) + yield self._step_process_mail_backend(filepath) except Exception: log.err() -- cgit v1.2.3 From 2db1b2a7187e7ebc543d5c01e585fdc3be7dcb1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Touceda?= Date: Thu, 7 Nov 2013 09:46:28 -0300 Subject: Implement processing skipped mail regularly --- src/leap/mx/mail_receiver.py | 76 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 71 insertions(+), 5 deletions(-) diff --git a/src/leap/mx/mail_receiver.py b/src/leap/mx/mail_receiver.py index 785be2e..3890eb2 100644 --- a/src/leap/mx/mail_receiver.py +++ b/src/leap/mx/mail_receiver.py @@ -35,7 +35,7 @@ except ImportError: from email import message_from_string from twisted.application.service import Service -from twisted.internet import inotify, defer +from twisted.internet import inotify, defer, task from twisted.python import filepath, log from leap.common.mail import get_email_charset @@ -74,6 +74,7 @@ class MailReceiver(Service): self._users_cdb = users_cdb self._directories = directories self._domain = socket.gethostbyaddr(socket.gethostname())[0] + self._processing_skipped = False def startService(self): """ @@ -91,6 +92,10 @@ class MailReceiver(Service): callbacks=[self._process_incoming_email], recursive=recursive) + self._lcall = task.LoopingCall(self._process_skipped) + # Run once every half an hour, but don't start right now + self._lcall.start(interval=60*30, now=False) + def _encrypt_message(self, pubkey, message): """ Given a public key and a message, it encrypts the message to @@ -109,7 +114,7 @@ class MailReceiver(Service): if pubkey is None or len(pubkey) == 0: log.msg("_encrypt_message: Something went wrong, here's all " "I know: %r" % (pubkey,)) - return None, None + return None doc = SoledadDocument(doc_id=str(pyuuid.uuid4())) @@ -211,6 +216,8 @@ class MailReceiver(Service): uuid = None delivereds = mail.get_all("Delivered-To") + if delivereds is None: + return None for to in delivereds: name, addr = email.utils.parseaddr(to) parts = addr.split("@") @@ -220,8 +227,60 @@ class MailReceiver(Service): return uuid + def sleep(self, secs): + """ + Async sleep for a defer. Use this when you want to wait for + another (non atomic) defer to finish. + + :param secs: seconds to wait (not really accurate, it depends + on the event queue) + :type secs: int + + :rtype: twisted.internet.defer.Deferred + """ + from twisted.internet import reactor + d = defer.Deferred() + reactor.callLater(secs, d.callback, None) + return d + + @defer.inlineCallbacks + def _process_skipped(self): + """ + Recursively or not (depending on the configuration) process + all the watched directories for unprocessed mail and try to + process it. + """ + if self._processing_skipped: + defer.returnValue(None) + + self._processing_skipped = True + log.msg("Starting processing skipped mail...") + log.msg("-"*50) + + for directory, recursive in self._directories: + for root, dirs, files in os.walk(directory): + for fname in files: + fullpath = os.path.join(root, fname) + fpath = filepath.FilePath(fullpath) + yield self._step_process_mail_backend(fpath) + + if not recursive: + break + + self._processing_skipped = False + log.msg("+"*50) + log.msg("Done processing skipped mail") + @defer.inlineCallbacks def _step_process_mail_backend(self, filepath): + """ + Processes the email pointed by filepath in an async + fashion. yield this method in another inlineCallbacks method + or return it for it to be run. + + :param filepath: Path of the file that changed + :type filepath: twisted.python.filepath.FilePath + """ log.msg("Processing new mail at %r" % (filepath.path,)) with filepath.open("r") as f: mail_data = f.read() @@ -229,7 +288,7 @@ class MailReceiver(Service): uuid = self._get_owner(mail) if uuid is None: log.msg("Don't know how to deliver mail %r, skipping..." % - filepath.path) + (filepath.path,)) defer.returnValue(None) log.msg("Mail owner: %s" % (uuid,)) @@ -238,6 +297,9 @@ class MailReceiver(Service): defer.returnValue(None) pubkey = yield self._users_cdb.getPubKey(uuid) + if pubkey is None or len(pubkey): + log.msg("No public key, stopping the processing chain") + defer.returnValue(None) log.msg("Encrypting message to %s's pubkey" % (uuid,)) doc = yield self._encrypt_message(pubkey, mail_data) @@ -260,7 +322,11 @@ class MailReceiver(Service): :type mask: int """ try: + while self._processing_skipped: + log.msg("Waiting for the process of skipped mail to be done...") + yield self.sleep(10) # NO-OP if os.path.split(filepath.dirname())[-1] == "new": yield self._step_process_mail_backend(filepath) - except Exception: - log.err() + except Exception as e: + log.msg("Something went wrong while processing {0!r}: {1!r}" + .format(filepath, e)) -- cgit v1.2.3 From 66768604da247575a84618ffb2bfd88f70b02974 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Touceda?= Date: Thu, 7 Nov 2013 10:40:12 -0300 Subject: Add changes file --- changes/bug_process_skipped_mail | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 changes/bug_process_skipped_mail diff --git a/changes/bug_process_skipped_mail b/changes/bug_process_skipped_mail new file mode 100644 index 0000000..3ec6da2 --- /dev/null +++ b/changes/bug_process_skipped_mail @@ -0,0 +1,4 @@ + o Some mail may be skipped at processing because of possible + problems (like connectivity issues to our couch nodes), MX now + looks for unprocessed mails every half hour and tries to process + them. Fixes #3628. \ No newline at end of file -- cgit v1.2.3 From f2c46e548138833f160cd081b0a8442bc3ac7437 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Touceda?= Date: Fri, 15 Nov 2013 10:41:29 -0300 Subject: Fold in changes --- CHANGELOG | 6 ++++++ changes/bug_process_skipped_mail | 4 ---- 2 files changed, 6 insertions(+), 4 deletions(-) delete mode 100644 changes/bug_process_skipped_mail diff --git a/CHANGELOG b/CHANGELOG index 67ff1e5..cbd9c74 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,9 @@ +0.3.4 Nov 15: + o Some mail may be skipped at processing because of possible + problems (like connectivity issues to our couch nodes), MX now + looks for unprocessed mails every half hour and tries to process + them. Fixes #3628. + 0.3.3 Nov 1: o Fix return codes for check recipient access. Fixes #3356. o Improve logging in general and support possible unicode parameters diff --git a/changes/bug_process_skipped_mail b/changes/bug_process_skipped_mail deleted file mode 100644 index 3ec6da2..0000000 --- a/changes/bug_process_skipped_mail +++ /dev/null @@ -1,4 +0,0 @@ - o Some mail may be skipped at processing because of possible - problems (like connectivity issues to our couch nodes), MX now - looks for unprocessed mails every half hour and tries to process - them. Fixes #3628. \ No newline at end of file -- cgit v1.2.3