diff options
Diffstat (limited to 'src/leap')
-rw-r--r-- | src/leap/mx/mail_receiver.py | 68 |
1 files changed, 52 insertions, 16 deletions
diff --git a/src/leap/mx/mail_receiver.py b/src/leap/mx/mail_receiver.py index 5fa3863..630c982 100644 --- a/src/leap/mx/mail_receiver.py +++ b/src/leap/mx/mail_receiver.py @@ -143,6 +143,12 @@ class MailReceiver(Service): ERROR_DECRYPTING_KEY = "errdecr" PROCESS_SKIPPED_INTERVAL = 60 * 30 # every half an hour + """ + If there's a failure when trying to watch a directory for file creation, + the service will schedule a retry delayed by the following amount of time. + """ + RETRY_DIR_WATCH_DELAY = 60 * 5 # 5 minutes + def __init__(self, mail_couch_url, users_cdb, directories, bounce_from, bounce_subject): """ @@ -165,20 +171,15 @@ class MailReceiver(Service): :param bounce_subject: Subject line used in the bounced mail :type bounce_subject: str """ - # Service doesn't define an __init__ + # IService doesn't define an __init__ self._mail_couch_url = mail_couch_url self._users_cdb = users_cdb self._directories = directories - self._domain = socket.gethostbyaddr(socket.gethostname())[0] - self._processing_skipped = False - self._bounce_from = bounce_from self._bounce_subject = bounce_subject - def _signal_handler(sig_num, stack_frame): - self._process_skipped() - - signal.signal(signal.SIGUSR1, _signal_handler) + self._domain = socket.gethostbyaddr(socket.gethostname())[0] + self._processing_skipped = False def startService(self): """ @@ -188,19 +189,55 @@ class MailReceiver(Service): self.wm = inotify.INotify() self.wm.startReading() - # watch the mail directory for new files and process incoming mail - mask = inotify.IN_CREATE + # watch mail directories for new files to trigger processing of + # incoming mail for directory, recursive in self._directories: - log.msg("Watching %r --- Recursive: %r" % (directory, recursive)) - self.wm.watch(filepath.FilePath(directory), mask, - callbacks=[self._process_incoming_email], - recursive=recursive) + self._start_watching_dir(directory, recursive) - # schedule a periodic task to process skipped mail, but also run it + # schedule a periodic task to process skipped mail, and also run it # immediatelly self._lcall = task.LoopingCall(self._process_skipped) self._lcall.start(interval=self.PROCESS_SKIPPED_INTERVAL, now=True) + # catch SIGUSR1 to trigger processing of skipped mail + signal.signal( + signal.SIGUSR1, + lambda *_: self._process_skipped()) + + def stopService(self): + """ + Stop the MailReceiver service + """ + self.wm.stopReading() + self._lcall.stop() + + def _start_watching_dir(self, dirname, recursive): + """ + Start watching a directory to trigger processing of newly created + files. + + Will also add a delayed call to retry when failed for some reason. + """ + directory = filepath.FilePath(dirname) + try: + if not directory.isdir(): + raise OSError("Not a directory: '%s'" % directory.path) + self.wm.watch( + directory, + inotify.IN_CREATE, + callbacks=[self._process_incoming_email], + recursive=recursive) + log.msg("Watching %r --- Recursive: %r" % (directory, recursive)) + except Exception as e: + log.msg( + "Failed adding watch to %s, will try again in %s seconds: %s" + % (directory, self.RETRY_DIR_WATCH_DELAY, e)) + reactor.callLater( + self.RETRY_DIR_WATCH_DELAY, + self._start_watching_dir, + dirname, + recursive) + def _encrypt_message(self, pubkey, message): """ Given a public key and a message, it encrypts the message to @@ -380,7 +417,6 @@ class MailReceiver(Service): :rtype: twisted.internet.defer.Deferred """ - from twisted.internet import reactor d = defer.Deferred() reactor.callLater(secs, d.callback, None) return d |