summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--changes/bug_process_skipped_mail4
-rw-r--r--src/leap/mx/mail_receiver.py163
2 files changed, 120 insertions, 47 deletions
diff --git a/changes/bug_process_skipped_mail b/changes/bug_process_skipped_mail
new file mode 100644
index 0000000..3ec6da2
--- /dev/null
+++ b/changes/bug_process_skipped_mail
@@ -0,0 +1,4 @@
+ o Some mail may be skipped at processing because of possible
+ problems (like connectivity issues to our couch nodes), MX now
+ looks for unprocessed mails every half hour and tries to process
+ them. Fixes #3628. \ No newline at end of file
diff --git a/src/leap/mx/mail_receiver.py b/src/leap/mx/mail_receiver.py
index 8fcadce..3890eb2 100644
--- a/src/leap/mx/mail_receiver.py
+++ b/src/leap/mx/mail_receiver.py
@@ -35,7 +35,7 @@ except ImportError:
from email import message_from_string
from twisted.application.service import Service
-from twisted.internet import inotify
+from twisted.internet import inotify, defer, task
from twisted.python import filepath, log
from leap.common.mail import get_email_charset
@@ -74,6 +74,7 @@ class MailReceiver(Service):
self._users_cdb = users_cdb
self._directories = directories
self._domain = socket.gethostbyaddr(socket.gethostname())[0]
+ self._processing_skipped = False
def startService(self):
"""
@@ -91,33 +92,34 @@ class MailReceiver(Service):
callbacks=[self._process_incoming_email],
recursive=recursive)
- def _encrypt_message(self, pubkey, uuid, message):
+ self._lcall = task.LoopingCall(self._process_skipped)
+ # Run once every half an hour, but don't start right now
+ self._lcall.start(interval=60*30, now=False)
+
+ def _encrypt_message(self, pubkey, message):
"""
- Given a UUID, a public key and a message, it encrypts the
- message to that public key.
+ Given a public key and a message, it encrypts the message to
+ that public key.
The address is needed in order to build the OpenPGPKey object.
- :param uuid_pubkey: tuple that holds the uuid and the public
- key as it is returned by the previous call in the
- chain
- :type uuid_pubkey: tuple (str, str)
+ :param pubkey: public key for the owner of the message
+ :type pubkey: str
:param message: message contents
:type message: str
- :return: uuid, doc to sync with Soledad or None, None if
- something went wrong.
- :rtype: tuple(str, SoledadDocument)
+ :return: doc to sync with Soledad or None, None if something
+ went wrong.
+ :rtype: SoledadDocument
"""
- if uuid is None or pubkey is None or len(pubkey) == 0:
+ if pubkey is None or len(pubkey) == 0:
log.msg("_encrypt_message: Something went wrong, here's all "
- "I know: %r | %r" % (uuid, pubkey))
- return None, None
-
- log.msg("Encrypting message to %s's pubkey" % (uuid,))
+ "I know: %r" % (pubkey,))
+ return None
doc = SoledadDocument(doc_id=str(pyuuid.uuid4()))
- encoding = get_email_charset(message, default=None)
+ encoding = get_email_charset(message.decode("utf8", "replace"),
+ default=None)
if encoding is None:
result = chardet.detect(message)
encoding = result["encoding"]
@@ -130,7 +132,7 @@ class MailReceiver(Service):
ENC_SCHEME_KEY: EncryptionSchemes.NONE,
ENC_JSON_KEY: json.dumps(data, encoding=encoding)
}
- return uuid, doc
+ return doc
openpgp_key = None
with openpgp.TempGPGWrapper(gpgbinary='/usr/bin/gpg') as gpg:
@@ -149,22 +151,23 @@ class MailReceiver(Service):
symmetric=False))
}
- return uuid, doc
+ return doc
- def _export_message(self, uuid_doc):
+ def _export_message(self, uuid, doc):
"""
Given a UUID and a SoledadDocument, it saves it directly in the
couchdb that serves as a backend for Soledad, in a db
accessible to the recipient of the mail.
- :param uuid_doc: tuple that holds the UUID and SoledadDocument
- :type uuid_doc: tuple(str, SoledadDocument)
+ :param uuid: the mail owner's uuid
+ :type uuid: str
+ :param doc: SoledadDocument that represents the email
+ :type doc: SoledadDocument
:return: True if it's ok to remove the message, False
otherwise
:rtype: bool
"""
- uuid, doc = uuid_doc
if uuid is None or doc is None:
log.msg("_export_message: Something went wrong, here's all "
"I know: %r | %r" % (uuid, doc))
@@ -213,6 +216,8 @@ class MailReceiver(Service):
uuid = None
delivereds = mail.get_all("Delivered-To")
+ if delivereds is None:
+ return None
for to in delivereds:
name, addr = email.utils.parseaddr(to)
parts = addr.split("@")
@@ -222,6 +227,87 @@ class MailReceiver(Service):
return uuid
+ def sleep(self, secs):
+ """
+ Async sleep for a defer. Use this when you want to wait for
+ another (non atomic) defer to finish.
+
+ :param secs: seconds to wait (not really accurate, it depends
+ on the event queue)
+ :type secs: int
+
+ :rtype: twisted.internet.defer.Deferred
+ """
+ from twisted.internet import reactor
+ d = defer.Deferred()
+ reactor.callLater(secs, d.callback, None)
+ return d
+
+ @defer.inlineCallbacks
+ def _process_skipped(self):
+ """
+ Recursively or not (depending on the configuration) process
+ all the watched directories for unprocessed mail and try to
+ process it.
+ """
+ if self._processing_skipped:
+ defer.returnValue(None)
+
+ self._processing_skipped = True
+ log.msg("Starting processing skipped mail...")
+ log.msg("-"*50)
+
+ for directory, recursive in self._directories:
+ for root, dirs, files in os.walk(directory):
+ for fname in files:
+ fullpath = os.path.join(root, fname)
+ fpath = filepath.FilePath(fullpath)
+ yield self._step_process_mail_backend(fpath)
+
+ if not recursive:
+ break
+
+ self._processing_skipped = False
+ log.msg("+"*50)
+ log.msg("Done processing skipped mail")
+
+ @defer.inlineCallbacks
+ def _step_process_mail_backend(self, filepath):
+ """
+ Processes the email pointed by filepath in an async
+ fashion. yield this method in another inlineCallbacks method
+ or return it for it to be run.
+
+ :param filepath: Path of the file that changed
+ :type filepath: twisted.python.filepath.FilePath
+ """
+ log.msg("Processing new mail at %r" % (filepath.path,))
+ with filepath.open("r") as f:
+ mail_data = f.read()
+ mail = message_from_string(mail_data)
+ uuid = self._get_owner(mail)
+ if uuid is None:
+ log.msg("Don't know how to deliver mail %r, skipping..." %
+ (filepath.path,))
+ defer.returnValue(None)
+ log.msg("Mail owner: %s" % (uuid,))
+
+ if uuid is None:
+ log.msg("BUG: There was no uuid!")
+ defer.returnValue(None)
+
+ pubkey = yield self._users_cdb.getPubKey(uuid)
+ if pubkey is None or len(pubkey):
+ log.msg("No public key, stopping the processing chain")
+ defer.returnValue(None)
+
+ log.msg("Encrypting message to %s's pubkey" % (uuid,))
+ doc = yield self._encrypt_message(pubkey, mail_data)
+
+ do_remove = yield self._export_message(uuid, doc)
+ yield self._conditional_remove(do_remove, filepath)
+
+ @defer.inlineCallbacks
def _process_incoming_email(self, otherself, filepath, mask):
"""
Callback that processes incoming email.
@@ -236,28 +322,11 @@ class MailReceiver(Service):
:type mask: int
"""
try:
+ while self._processing_skipped:
+ log.msg("Waiting for the process of skipped mail to be done...")
+ yield self.sleep(10) # NO-OP
if os.path.split(filepath.dirname())[-1] == "new":
- log.msg("Processing new mail at %r" % (filepath.path,))
- with filepath.open("r") as f:
- mail_data = f.read()
- mail = message_from_string(mail_data)
- uuid = self._get_owner(mail)
- if uuid is None:
- log.msg("Don't know how to deliver mail %r, skipping..." %
- filepath.path)
- return
- log.msg("Mail owner: %s" % (uuid,))
-
- if uuid is None:
- log.msg("BUG: There was no uuid!")
- return
-
- d = self._users_cdb.getPubKey(uuid)
- d.addCallbacks(self._encrypt_message, log.err,
- (uuid, mail_data))
- d.addCallbacks(self._export_message, log.err)
- d.addCallbacks(self._conditional_remove, log.err,
- (filepath,))
- d.addErrback(log.err)
- except Exception:
- log.err()
+ yield self._step_process_mail_backend(filepath)
+ except Exception as e:
+ log.msg("Something went wrong while processing {0!r}: {1!r}"
+ .format(filepath, e))