summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/leap/mx/mail_receiver.py68
1 files changed, 52 insertions, 16 deletions
diff --git a/src/leap/mx/mail_receiver.py b/src/leap/mx/mail_receiver.py
index 5fa3863..630c982 100644
--- a/src/leap/mx/mail_receiver.py
+++ b/src/leap/mx/mail_receiver.py
@@ -143,6 +143,12 @@ class MailReceiver(Service):
ERROR_DECRYPTING_KEY = "errdecr"
PROCESS_SKIPPED_INTERVAL = 60 * 30 # every half an hour
+ """
+ If there's a failure when trying to watch a directory for file creation,
+ the service will schedule a retry delayed by the following amount of time.
+ """
+ RETRY_DIR_WATCH_DELAY = 60 * 5 # 5 minutes
+
def __init__(self, mail_couch_url, users_cdb, directories, bounce_from,
bounce_subject):
"""
@@ -165,20 +171,15 @@ class MailReceiver(Service):
:param bounce_subject: Subject line used in the bounced mail
:type bounce_subject: str
"""
- # Service doesn't define an __init__
+ # IService doesn't define an __init__
self._mail_couch_url = mail_couch_url
self._users_cdb = users_cdb
self._directories = directories
- self._domain = socket.gethostbyaddr(socket.gethostname())[0]
- self._processing_skipped = False
-
self._bounce_from = bounce_from
self._bounce_subject = bounce_subject
- def _signal_handler(sig_num, stack_frame):
- self._process_skipped()
-
- signal.signal(signal.SIGUSR1, _signal_handler)
+ self._domain = socket.gethostbyaddr(socket.gethostname())[0]
+ self._processing_skipped = False
def startService(self):
"""
@@ -188,19 +189,55 @@ class MailReceiver(Service):
self.wm = inotify.INotify()
self.wm.startReading()
- # watch the mail directory for new files and process incoming mail
- mask = inotify.IN_CREATE
+ # watch mail directories for new files to trigger processing of
+ # incoming mail
for directory, recursive in self._directories:
- log.msg("Watching %r --- Recursive: %r" % (directory, recursive))
- self.wm.watch(filepath.FilePath(directory), mask,
- callbacks=[self._process_incoming_email],
- recursive=recursive)
+ self._start_watching_dir(directory, recursive)
- # schedule a periodic task to process skipped mail, but also run it
+ # schedule a periodic task to process skipped mail, and also run it
# immediatelly
self._lcall = task.LoopingCall(self._process_skipped)
self._lcall.start(interval=self.PROCESS_SKIPPED_INTERVAL, now=True)
+ # catch SIGUSR1 to trigger processing of skipped mail
+ signal.signal(
+ signal.SIGUSR1,
+ lambda *_: self._process_skipped())
+
+ def stopService(self):
+ """
+ Stop the MailReceiver service
+ """
+ self.wm.stopReading()
+ self._lcall.stop()
+
+ def _start_watching_dir(self, dirname, recursive):
+ """
+ Start watching a directory to trigger processing of newly created
+ files.
+
+ Will also add a delayed call to retry when failed for some reason.
+ """
+ directory = filepath.FilePath(dirname)
+ try:
+ if not directory.isdir():
+ raise OSError("Not a directory: '%s'" % directory.path)
+ self.wm.watch(
+ directory,
+ inotify.IN_CREATE,
+ callbacks=[self._process_incoming_email],
+ recursive=recursive)
+ log.msg("Watching %r --- Recursive: %r" % (directory, recursive))
+ except Exception as e:
+ log.msg(
+ "Failed adding watch to %s, will try again in %s seconds: %s"
+ % (directory, self.RETRY_DIR_WATCH_DELAY, e))
+ reactor.callLater(
+ self.RETRY_DIR_WATCH_DELAY,
+ self._start_watching_dir,
+ dirname,
+ recursive)
+
def _encrypt_message(self, pubkey, message):
"""
Given a public key and a message, it encrypts the message to
@@ -380,7 +417,6 @@ class MailReceiver(Service):
:rtype: twisted.internet.defer.Deferred
"""
- from twisted.internet import reactor
d = defer.Deferred()
reactor.callLater(secs, d.callback, None)
return d