diff options
| -rw-r--r-- | changes/bug_6687_log-and-retry-when-watched-dirs-dont-exist | 1 | ||||
| -rw-r--r-- | src/leap/mx/mail_receiver.py | 68 | 
2 files changed, 53 insertions, 16 deletions
| diff --git a/changes/bug_6687_log-and-retry-when-watched-dirs-dont-exist b/changes/bug_6687_log-and-retry-when-watched-dirs-dont-exist new file mode 100644 index 0000000..bddba01 --- /dev/null +++ b/changes/bug_6687_log-and-retry-when-watched-dirs-dont-exist @@ -0,0 +1 @@ +  o MX log and retry when delivery directories don't exist. Closes #6687. diff --git a/src/leap/mx/mail_receiver.py b/src/leap/mx/mail_receiver.py index 5fa3863..630c982 100644 --- a/src/leap/mx/mail_receiver.py +++ b/src/leap/mx/mail_receiver.py @@ -143,6 +143,12 @@ class MailReceiver(Service):      ERROR_DECRYPTING_KEY = "errdecr"      PROCESS_SKIPPED_INTERVAL = 60 * 30  # every half an hour +    """ +    If there's a failure when trying to watch a directory for file creation, +    the service will schedule a retry delayed by the following amount of time. +    """ +    RETRY_DIR_WATCH_DELAY = 60 * 5  # 5 minutes +      def __init__(self, mail_couch_url, users_cdb, directories, bounce_from,                   bounce_subject):          """ @@ -165,20 +171,15 @@ class MailReceiver(Service):          :param bounce_subject: Subject line used in the bounced mail          :type bounce_subject: str          """ -        # Service doesn't define an __init__ +        # IService doesn't define an __init__          self._mail_couch_url = mail_couch_url          self._users_cdb = users_cdb          self._directories = directories -        self._domain = socket.gethostbyaddr(socket.gethostname())[0] -        self._processing_skipped = False -          self._bounce_from = bounce_from          self._bounce_subject = bounce_subject -        def _signal_handler(sig_num, stack_frame): -            self._process_skipped() - -        signal.signal(signal.SIGUSR1, _signal_handler) +        self._domain = socket.gethostbyaddr(socket.gethostname())[0] +        self._processing_skipped = False      def startService(self):          """ @@ -188,19 +189,55 @@ class MailReceiver(Service):          self.wm = inotify.INotify()          self.wm.startReading() -        # watch the mail directory for new files and process incoming mail -        mask = inotify.IN_CREATE +        # watch mail directories for new files to trigger processing of +        # incoming mail          for directory, recursive in self._directories: -            log.msg("Watching %r --- Recursive: %r" % (directory, recursive)) -            self.wm.watch(filepath.FilePath(directory), mask, -                          callbacks=[self._process_incoming_email], -                          recursive=recursive) +            self._start_watching_dir(directory, recursive) -        # schedule a periodic task to process skipped mail, but also run it +        # schedule a periodic task to process skipped mail, and also run it          # immediatelly          self._lcall = task.LoopingCall(self._process_skipped)          self._lcall.start(interval=self.PROCESS_SKIPPED_INTERVAL, now=True) +        # catch SIGUSR1 to trigger processing of skipped mail +        signal.signal( +            signal.SIGUSR1, +            lambda *_: self._process_skipped()) + +    def stopService(self): +        """ +        Stop the MailReceiver service +        """ +        self.wm.stopReading() +        self._lcall.stop() + +    def _start_watching_dir(self, dirname, recursive): +        """ +        Start watching a directory to trigger processing of newly created +        files. + +        Will also add a delayed call to retry when failed for some reason. +        """ +        directory = filepath.FilePath(dirname) +        try: +            if not directory.isdir(): +                raise OSError("Not a directory: '%s'" % directory.path) +            self.wm.watch( +                directory, +                inotify.IN_CREATE, +                callbacks=[self._process_incoming_email], +                recursive=recursive) +            log.msg("Watching %r --- Recursive: %r" % (directory, recursive)) +        except Exception as e: +            log.msg( +                "Failed adding watch to %s, will try again in %s seconds: %s" +                % (directory, self.RETRY_DIR_WATCH_DELAY, e)) +            reactor.callLater( +                self.RETRY_DIR_WATCH_DELAY, +                self._start_watching_dir, +                dirname, +                recursive) +      def _encrypt_message(self, pubkey, message):          """          Given a public key and a message, it encrypts the message to @@ -380,7 +417,6 @@ class MailReceiver(Service):          :rtype: twisted.internet.defer.Deferred          """ -        from twisted.internet import reactor          d = defer.Deferred()          reactor.callLater(secs, d.callback, None)          return d | 
