diff options
author | Kali Kaneko <kali@leap.se> | 2014-02-07 03:24:23 -0400 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2014-02-07 03:24:23 -0400 |
commit | 287022942487c9d350660281c71f918e19f42533 (patch) | |
tree | ea059287fdd75432a3b36ad5ef35b61755992215 /src/leap/mail/imap/service/imap.py | |
parent | b887115370795f514c66bdc9c034acd3fb3c4376 (diff) | |
parent | 362aaec0897261973e58b4282f5c054985d1f113 (diff) |
Merge remote-tracking branch 'drebs/feature/in-memory-store-with-data-flushing' into develop
Diffstat (limited to 'src/leap/mail/imap/service/imap.py')
-rw-r--r-- | src/leap/mail/imap/service/imap.py | 31 |
1 files changed, 31 insertions, 0 deletions
diff --git a/src/leap/mail/imap/service/imap.py b/src/leap/mail/imap/service/imap.py index 5487cfc..93df51d 100644 --- a/src/leap/mail/imap/service/imap.py +++ b/src/leap/mail/imap/service/imap.py @@ -19,7 +19,9 @@ Imap service initialization """ import logging import os +import time +from twisted.internet import defer, threads from twisted.internet.protocol import ServerFactory from twisted.internet.error import CannotListenError from twisted.mail import imap4 @@ -122,6 +124,35 @@ class LeapIMAPFactory(ServerFactory): imapProtocol.factory = self return imapProtocol + def doStop(self, cv): + """ + Stops imap service (fetcher, factory and port). + + :param cv: A condition variable to which we can signal when imap + indeed stops. + :type cv: threading.Condition + :return: a Deferred that stops and flushes the in memory store data to + disk in another thread. + :rtype: Deferred + """ + ServerFactory.doStop(self) + + def _stop_imap_cb(): + logger.debug('Stopping in memory store.') + self._memstore.stop_and_flush() + while not self._memstore.producer.is_queue_empty(): + logger.debug('Waiting for queue to be empty.') + # TODO use a gatherResults over the new/dirty deferred list, + # as in memorystore's expunge() method. + time.sleep(1) + # notify that service has stopped + logger.debug('Notifying that service has stopped.') + cv.acquire() + cv.notify() + cv.release() + + return threads.deferToThread(_stop_imap_cb) + def run_service(*args, **kwargs): """ |