From 54045366aa275fd43ab65e450ffa23f03db6bb72 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 6 Feb 2014 15:46:01 -0200 Subject: Flush IMAP data to disk when stopping. Closes #5095. --- .../feature_5095_flush-data-to-disk-when-stopping | 1 + mail/src/leap/mail/imap/memorystore.py | 22 ++++++++++----- mail/src/leap/mail/imap/service/imap.py | 31 ++++++++++++++++++++++ mail/src/leap/mail/messageflow.py | 11 ++++++++ 4 files changed, 59 insertions(+), 6 deletions(-) create mode 100644 mail/changes/feature_5095_flush-data-to-disk-when-stopping (limited to 'mail') diff --git a/mail/changes/feature_5095_flush-data-to-disk-when-stopping b/mail/changes/feature_5095_flush-data-to-disk-when-stopping new file mode 100644 index 00000000..d7c1ce7e --- /dev/null +++ b/mail/changes/feature_5095_flush-data-to-disk-when-stopping @@ -0,0 +1 @@ + o Flush IMAP data to disk when stopping. Closes #5095. diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index 9c7973d4..3eba59a0 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -875,6 +875,15 @@ class MemoryStore(object): self.remove_message(mbox, uid) return mem_deleted + def stop_and_flush(self): + """ + Stop the write loop and trigger a write to the producer. + """ + self._stop_write_loop() + if self._permanent_store is not None: + self.write_messages(self._permanent_store) + self.producer.flush() + def expunge(self, mbox, observer): """ Remove all messages flagged \\Deleted, from the Memory Store @@ -890,12 +899,9 @@ class MemoryStore(object): """ soledad_store = self._permanent_store try: - # 1. Stop the writing call - self._stop_write_loop() - # 2. Enqueue a last write. - self.write_messages(soledad_store) - # 3. Wait on the writebacks to finish - + # Stop and trigger last write + self.stop_and_flush() + # Wait on the writebacks to finish pending_deferreds = (self._new_deferreds.get(mbox, []) + self._dirty_deferreds.get(mbox, [])) d1 = defer.gatherResults(pending_deferreds, consumeErrors=True) @@ -962,6 +968,10 @@ class MemoryStore(object): # are done (gatherResults) return getattr(self, self.WRITING_FLAG) + @property + def permanent_store(self): + return self._permanent_store + # Memory management. def get_size(self): diff --git a/mail/src/leap/mail/imap/service/imap.py b/mail/src/leap/mail/imap/service/imap.py index 5487cfc8..93df51d1 100644 --- a/mail/src/leap/mail/imap/service/imap.py +++ b/mail/src/leap/mail/imap/service/imap.py @@ -19,7 +19,9 @@ Imap service initialization """ import logging import os +import time +from twisted.internet import defer, threads from twisted.internet.protocol import ServerFactory from twisted.internet.error import CannotListenError from twisted.mail import imap4 @@ -122,6 +124,35 @@ class LeapIMAPFactory(ServerFactory): imapProtocol.factory = self return imapProtocol + def doStop(self, cv): + """ + Stops imap service (fetcher, factory and port). + + :param cv: A condition variable to which we can signal when imap + indeed stops. + :type cv: threading.Condition + :return: a Deferred that stops and flushes the in memory store data to + disk in another thread. + :rtype: Deferred + """ + ServerFactory.doStop(self) + + def _stop_imap_cb(): + logger.debug('Stopping in memory store.') + self._memstore.stop_and_flush() + while not self._memstore.producer.is_queue_empty(): + logger.debug('Waiting for queue to be empty.') + # TODO use a gatherResults over the new/dirty deferred list, + # as in memorystore's expunge() method. + time.sleep(1) + # notify that service has stopped + logger.debug('Notifying that service has stopped.') + cv.acquire() + cv.notify() + cv.release() + + return threads.deferToThread(_stop_imap_cb) + def run_service(*args, **kwargs): """ diff --git a/mail/src/leap/mail/messageflow.py b/mail/src/leap/mail/messageflow.py index b7fc0304..80121c85 100644 --- a/mail/src/leap/mail/messageflow.py +++ b/mail/src/leap/mail/messageflow.py @@ -64,6 +64,11 @@ class IMessageProducer(Interface): Stop producing items. """ + def flush(self): + """ + Flush queued messages to consumer. + """ + class DummyMsgConsumer(object): @@ -162,6 +167,12 @@ class MessageProducer(object): if self._loop.running: self._loop.stop() + def flush(self): + """ + Flush queued messages to consumer. + """ + self._check_for_new() + if __name__ == "__main__": from twisted.internet import reactor -- cgit v1.2.3