diff options
author | Kali Kaneko <kali@leap.se> | 2014-02-07 03:24:23 -0400 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2014-02-07 03:24:23 -0400 |
commit | 287022942487c9d350660281c71f918e19f42533 (patch) | |
tree | ea059287fdd75432a3b36ad5ef35b61755992215 /src | |
parent | b887115370795f514c66bdc9c034acd3fb3c4376 (diff) | |
parent | 362aaec0897261973e58b4282f5c054985d1f113 (diff) |
Merge remote-tracking branch 'drebs/feature/in-memory-store-with-data-flushing' into develop
Diffstat (limited to 'src')
-rw-r--r-- | src/leap/mail/imap/memorystore.py | 22 | ||||
-rw-r--r-- | src/leap/mail/imap/service/imap.py | 31 | ||||
-rw-r--r-- | src/leap/mail/messageflow.py | 11 |
3 files changed, 58 insertions, 6 deletions
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 9c7973d..3eba59a 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -875,6 +875,15 @@ class MemoryStore(object): self.remove_message(mbox, uid) return mem_deleted + def stop_and_flush(self): + """ + Stop the write loop and trigger a write to the producer. + """ + self._stop_write_loop() + if self._permanent_store is not None: + self.write_messages(self._permanent_store) + self.producer.flush() + def expunge(self, mbox, observer): """ Remove all messages flagged \\Deleted, from the Memory Store @@ -890,12 +899,9 @@ class MemoryStore(object): """ soledad_store = self._permanent_store try: - # 1. Stop the writing call - self._stop_write_loop() - # 2. Enqueue a last write. - self.write_messages(soledad_store) - # 3. Wait on the writebacks to finish - + # Stop and trigger last write + self.stop_and_flush() + # Wait on the writebacks to finish pending_deferreds = (self._new_deferreds.get(mbox, []) + self._dirty_deferreds.get(mbox, [])) d1 = defer.gatherResults(pending_deferreds, consumeErrors=True) @@ -962,6 +968,10 @@ class MemoryStore(object): # are done (gatherResults) return getattr(self, self.WRITING_FLAG) + @property + def permanent_store(self): + return self._permanent_store + # Memory management. def get_size(self): diff --git a/src/leap/mail/imap/service/imap.py b/src/leap/mail/imap/service/imap.py index 5487cfc..93df51d 100644 --- a/src/leap/mail/imap/service/imap.py +++ b/src/leap/mail/imap/service/imap.py @@ -19,7 +19,9 @@ Imap service initialization """ import logging import os +import time +from twisted.internet import defer, threads from twisted.internet.protocol import ServerFactory from twisted.internet.error import CannotListenError from twisted.mail import imap4 @@ -122,6 +124,35 @@ class LeapIMAPFactory(ServerFactory): imapProtocol.factory = self return imapProtocol + def doStop(self, cv): + """ + Stops imap service (fetcher, factory and port). + + :param cv: A condition variable to which we can signal when imap + indeed stops. + :type cv: threading.Condition + :return: a Deferred that stops and flushes the in memory store data to + disk in another thread. + :rtype: Deferred + """ + ServerFactory.doStop(self) + + def _stop_imap_cb(): + logger.debug('Stopping in memory store.') + self._memstore.stop_and_flush() + while not self._memstore.producer.is_queue_empty(): + logger.debug('Waiting for queue to be empty.') + # TODO use a gatherResults over the new/dirty deferred list, + # as in memorystore's expunge() method. + time.sleep(1) + # notify that service has stopped + logger.debug('Notifying that service has stopped.') + cv.acquire() + cv.notify() + cv.release() + + return threads.deferToThread(_stop_imap_cb) + def run_service(*args, **kwargs): """ diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py index b7fc030..80121c8 100644 --- a/src/leap/mail/messageflow.py +++ b/src/leap/mail/messageflow.py @@ -64,6 +64,11 @@ class IMessageProducer(Interface): Stop producing items. """ + def flush(self): + """ + Flush queued messages to consumer. + """ + class DummyMsgConsumer(object): @@ -162,6 +167,12 @@ class MessageProducer(object): if self._loop.running: self._loop.stop() + def flush(self): + """ + Flush queued messages to consumer. + """ + self._check_for_new() + if __name__ == "__main__": from twisted.internet import reactor |