diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/leap/mx/mail_receiver.py | 163 |
1 files changed, 116 insertions, 47 deletions
diff --git a/src/leap/mx/mail_receiver.py b/src/leap/mx/mail_receiver.py index 8fcadce..3890eb2 100644 --- a/src/leap/mx/mail_receiver.py +++ b/src/leap/mx/mail_receiver.py @@ -35,7 +35,7 @@ except ImportError: from email import message_from_string from twisted.application.service import Service -from twisted.internet import inotify +from twisted.internet import inotify, defer, task from twisted.python import filepath, log from leap.common.mail import get_email_charset @@ -74,6 +74,7 @@ class MailReceiver(Service): self._users_cdb = users_cdb self._directories = directories self._domain = socket.gethostbyaddr(socket.gethostname())[0] + self._processing_skipped = False def startService(self): """ @@ -91,33 +92,34 @@ class MailReceiver(Service): callbacks=[self._process_incoming_email], recursive=recursive) - def _encrypt_message(self, pubkey, uuid, message): + self._lcall = task.LoopingCall(self._process_skipped) + # Run once every half an hour, but don't start right now + self._lcall.start(interval=60*30, now=False) + + def _encrypt_message(self, pubkey, message): """ - Given a UUID, a public key and a message, it encrypts the - message to that public key. + Given a public key and a message, it encrypts the message to + that public key. The address is needed in order to build the OpenPGPKey object. - :param uuid_pubkey: tuple that holds the uuid and the public - key as it is returned by the previous call in the - chain - :type uuid_pubkey: tuple (str, str) + :param pubkey: public key for the owner of the message + :type pubkey: str :param message: message contents :type message: str - :return: uuid, doc to sync with Soledad or None, None if - something went wrong. - :rtype: tuple(str, SoledadDocument) + :return: doc to sync with Soledad or None, None if something + went wrong. + :rtype: SoledadDocument """ - if uuid is None or pubkey is None or len(pubkey) == 0: + if pubkey is None or len(pubkey) == 0: log.msg("_encrypt_message: Something went wrong, here's all " - "I know: %r | %r" % (uuid, pubkey)) - return None, None - - log.msg("Encrypting message to %s's pubkey" % (uuid,)) + "I know: %r" % (pubkey,)) + return None doc = SoledadDocument(doc_id=str(pyuuid.uuid4())) - encoding = get_email_charset(message, default=None) + encoding = get_email_charset(message.decode("utf8", "replace"), + default=None) if encoding is None: result = chardet.detect(message) encoding = result["encoding"] @@ -130,7 +132,7 @@ class MailReceiver(Service): ENC_SCHEME_KEY: EncryptionSchemes.NONE, ENC_JSON_KEY: json.dumps(data, encoding=encoding) } - return uuid, doc + return doc openpgp_key = None with openpgp.TempGPGWrapper(gpgbinary='/usr/bin/gpg') as gpg: @@ -149,22 +151,23 @@ class MailReceiver(Service): symmetric=False)) } - return uuid, doc + return doc - def _export_message(self, uuid_doc): + def _export_message(self, uuid, doc): """ Given a UUID and a SoledadDocument, it saves it directly in the couchdb that serves as a backend for Soledad, in a db accessible to the recipient of the mail. - :param uuid_doc: tuple that holds the UUID and SoledadDocument - :type uuid_doc: tuple(str, SoledadDocument) + :param uuid: the mail owner's uuid + :type uuid: str + :param doc: SoledadDocument that represents the email + :type doc: SoledadDocument :return: True if it's ok to remove the message, False otherwise :rtype: bool """ - uuid, doc = uuid_doc if uuid is None or doc is None: log.msg("_export_message: Something went wrong, here's all " "I know: %r | %r" % (uuid, doc)) @@ -213,6 +216,8 @@ class MailReceiver(Service): uuid = None delivereds = mail.get_all("Delivered-To") + if delivereds is None: + return None for to in delivereds: name, addr = email.utils.parseaddr(to) parts = addr.split("@") @@ -222,6 +227,87 @@ class MailReceiver(Service): return uuid + def sleep(self, secs): + """ + Async sleep for a defer. Use this when you want to wait for + another (non atomic) defer to finish. + + :param secs: seconds to wait (not really accurate, it depends + on the event queue) + :type secs: int + + :rtype: twisted.internet.defer.Deferred + """ + from twisted.internet import reactor + d = defer.Deferred() + reactor.callLater(secs, d.callback, None) + return d + + @defer.inlineCallbacks + def _process_skipped(self): + """ + Recursively or not (depending on the configuration) process + all the watched directories for unprocessed mail and try to + process it. + """ + if self._processing_skipped: + defer.returnValue(None) + + self._processing_skipped = True + log.msg("Starting processing skipped mail...") + log.msg("-"*50) + + for directory, recursive in self._directories: + for root, dirs, files in os.walk(directory): + for fname in files: + fullpath = os.path.join(root, fname) + fpath = filepath.FilePath(fullpath) + yield self._step_process_mail_backend(fpath) + + if not recursive: + break + + self._processing_skipped = False + log.msg("+"*50) + log.msg("Done processing skipped mail") + + @defer.inlineCallbacks + def _step_process_mail_backend(self, filepath): + """ + Processes the email pointed by filepath in an async + fashion. yield this method in another inlineCallbacks method + or return it for it to be run. + + :param filepath: Path of the file that changed + :type filepath: twisted.python.filepath.FilePath + """ + log.msg("Processing new mail at %r" % (filepath.path,)) + with filepath.open("r") as f: + mail_data = f.read() + mail = message_from_string(mail_data) + uuid = self._get_owner(mail) + if uuid is None: + log.msg("Don't know how to deliver mail %r, skipping..." % + (filepath.path,)) + defer.returnValue(None) + log.msg("Mail owner: %s" % (uuid,)) + + if uuid is None: + log.msg("BUG: There was no uuid!") + defer.returnValue(None) + + pubkey = yield self._users_cdb.getPubKey(uuid) + if pubkey is None or len(pubkey): + log.msg("No public key, stopping the processing chain") + defer.returnValue(None) + + log.msg("Encrypting message to %s's pubkey" % (uuid,)) + doc = yield self._encrypt_message(pubkey, mail_data) + + do_remove = yield self._export_message(uuid, doc) + yield self._conditional_remove(do_remove, filepath) + + @defer.inlineCallbacks def _process_incoming_email(self, otherself, filepath, mask): """ Callback that processes incoming email. @@ -236,28 +322,11 @@ class MailReceiver(Service): :type mask: int """ try: + while self._processing_skipped: + log.msg("Waiting for the process of skipped mail to be done...") + yield self.sleep(10) # NO-OP if os.path.split(filepath.dirname())[-1] == "new": - log.msg("Processing new mail at %r" % (filepath.path,)) - with filepath.open("r") as f: - mail_data = f.read() - mail = message_from_string(mail_data) - uuid = self._get_owner(mail) - if uuid is None: - log.msg("Don't know how to deliver mail %r, skipping..." % - filepath.path) - return - log.msg("Mail owner: %s" % (uuid,)) - - if uuid is None: - log.msg("BUG: There was no uuid!") - return - - d = self._users_cdb.getPubKey(uuid) - d.addCallbacks(self._encrypt_message, log.err, - (uuid, mail_data)) - d.addCallbacks(self._export_message, log.err) - d.addCallbacks(self._conditional_remove, log.err, - (filepath,)) - d.addErrback(log.err) - except Exception: - log.err() + yield self._step_process_mail_backend(filepath) + except Exception as e: + log.msg("Something went wrong while processing {0!r}: {1!r}" + .format(filepath, e)) |