summaryrefslogtreecommitdiff
path: root/src/leap/mx
diff options
context:
space:
mode:
authorTomás Touceda <chiiph@leap.se>2013-11-07 09:46:28 -0300
committerTomás Touceda <chiiph@leap.se>2013-11-08 07:22:26 -0300
commit2db1b2a7187e7ebc543d5c01e585fdc3be7dcb1b (patch)
treeca7a67b9ad4c1de7013a566390bb1425675eb169 /src/leap/mx
parent3fc38cfa269fd7a8fa602aaf2a3e025b5403b71b (diff)
Implement processing skipped mail regularly
Diffstat (limited to 'src/leap/mx')
-rw-r--r--src/leap/mx/mail_receiver.py76
1 files changed, 71 insertions, 5 deletions
diff --git a/src/leap/mx/mail_receiver.py b/src/leap/mx/mail_receiver.py
index 785be2e..3890eb2 100644
--- a/src/leap/mx/mail_receiver.py
+++ b/src/leap/mx/mail_receiver.py
@@ -35,7 +35,7 @@ except ImportError:
from email import message_from_string
from twisted.application.service import Service
-from twisted.internet import inotify, defer
+from twisted.internet import inotify, defer, task
from twisted.python import filepath, log
from leap.common.mail import get_email_charset
@@ -74,6 +74,7 @@ class MailReceiver(Service):
self._users_cdb = users_cdb
self._directories = directories
self._domain = socket.gethostbyaddr(socket.gethostname())[0]
+ self._processing_skipped = False
def startService(self):
"""
@@ -91,6 +92,10 @@ class MailReceiver(Service):
callbacks=[self._process_incoming_email],
recursive=recursive)
+ self._lcall = task.LoopingCall(self._process_skipped)
+ # Run once every half an hour, but don't start right now
+ self._lcall.start(interval=60*30, now=False)
+
def _encrypt_message(self, pubkey, message):
"""
Given a public key and a message, it encrypts the message to
@@ -109,7 +114,7 @@ class MailReceiver(Service):
if pubkey is None or len(pubkey) == 0:
log.msg("_encrypt_message: Something went wrong, here's all "
"I know: %r" % (pubkey,))
- return None, None
+ return None
doc = SoledadDocument(doc_id=str(pyuuid.uuid4()))
@@ -211,6 +216,8 @@ class MailReceiver(Service):
uuid = None
delivereds = mail.get_all("Delivered-To")
+ if delivereds is None:
+ return None
for to in delivereds:
name, addr = email.utils.parseaddr(to)
parts = addr.split("@")
@@ -220,8 +227,60 @@ class MailReceiver(Service):
return uuid
+ def sleep(self, secs):
+ """
+ Async sleep for a defer. Use this when you want to wait for
+ another (non atomic) defer to finish.
+
+ :param secs: seconds to wait (not really accurate, it depends
+ on the event queue)
+ :type secs: int
+
+ :rtype: twisted.internet.defer.Deferred
+ """
+ from twisted.internet import reactor
+ d = defer.Deferred()
+ reactor.callLater(secs, d.callback, None)
+ return d
+
+ @defer.inlineCallbacks
+ def _process_skipped(self):
+ """
+ Recursively or not (depending on the configuration) process
+ all the watched directories for unprocessed mail and try to
+ process it.
+ """
+ if self._processing_skipped:
+ defer.returnValue(None)
+
+ self._processing_skipped = True
+ log.msg("Starting processing skipped mail...")
+ log.msg("-"*50)
+
+ for directory, recursive in self._directories:
+ for root, dirs, files in os.walk(directory):
+ for fname in files:
+ fullpath = os.path.join(root, fname)
+ fpath = filepath.FilePath(fullpath)
+ yield self._step_process_mail_backend(fpath)
+
+ if not recursive:
+ break
+
+ self._processing_skipped = False
+ log.msg("+"*50)
+ log.msg("Done processing skipped mail")
+
@defer.inlineCallbacks
def _step_process_mail_backend(self, filepath):
+ """
+ Processes the email pointed by filepath in an async
+ fashion. yield this method in another inlineCallbacks method
+ or return it for it to be run.
+
+ :param filepath: Path of the file that changed
+ :type filepath: twisted.python.filepath.FilePath
+ """
log.msg("Processing new mail at %r" % (filepath.path,))
with filepath.open("r") as f:
mail_data = f.read()
@@ -229,7 +288,7 @@ class MailReceiver(Service):
uuid = self._get_owner(mail)
if uuid is None:
log.msg("Don't know how to deliver mail %r, skipping..." %
- filepath.path)
+ (filepath.path,))
defer.returnValue(None)
log.msg("Mail owner: %s" % (uuid,))
@@ -238,6 +297,9 @@ class MailReceiver(Service):
defer.returnValue(None)
pubkey = yield self._users_cdb.getPubKey(uuid)
+ if pubkey is None or len(pubkey):
+ log.msg("No public key, stopping the processing chain")
+ defer.returnValue(None)
log.msg("Encrypting message to %s's pubkey" % (uuid,))
doc = yield self._encrypt_message(pubkey, mail_data)
@@ -260,7 +322,11 @@ class MailReceiver(Service):
:type mask: int
"""
try:
+ while self._processing_skipped:
+ log.msg("Waiting for the process of skipped mail to be done...")
+ yield self.sleep(10) # NO-OP
if os.path.split(filepath.dirname())[-1] == "new":
yield self._step_process_mail_backend(filepath)
- except Exception:
- log.err()
+ except Exception as e:
+ log.msg("Something went wrong while processing {0!r}: {1!r}"
+ .format(filepath, e))