diff options
| author | Kali Kaneko <kali@leap.se> | 2014-02-07 03:24:23 -0400 | 
|---|---|---|
| committer | Kali Kaneko <kali@leap.se> | 2014-02-07 03:24:23 -0400 | 
| commit | 5c29bf0cfe91032764052bc925754d728276d149 (patch) | |
| tree | 9f60602268b9a11c80ad46986cb15f1380542262 | |
| parent | ad904c9a670b3d58689a811e2a200b2e7c4233eb (diff) | |
| parent | 54045366aa275fd43ab65e450ffa23f03db6bb72 (diff) | |
Merge remote-tracking branch 'drebs/feature/in-memory-store-with-data-flushing' into develop
| -rw-r--r-- | mail/changes/feature_5095_flush-data-to-disk-when-stopping | 1 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/memorystore.py | 22 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/service/imap.py | 31 | ||||
| -rw-r--r-- | mail/src/leap/mail/messageflow.py | 11 | 
4 files changed, 59 insertions, 6 deletions
| diff --git a/mail/changes/feature_5095_flush-data-to-disk-when-stopping b/mail/changes/feature_5095_flush-data-to-disk-when-stopping new file mode 100644 index 0000000..d7c1ce7 --- /dev/null +++ b/mail/changes/feature_5095_flush-data-to-disk-when-stopping @@ -0,0 +1 @@ +  o Flush IMAP data to disk when stopping. Closes #5095. diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index 9c7973d..3eba59a 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -875,6 +875,15 @@ class MemoryStore(object):              self.remove_message(mbox, uid)          return mem_deleted +    def stop_and_flush(self): +        """ +        Stop the write loop and trigger a write to the producer. +        """ +        self._stop_write_loop() +        if self._permanent_store is not None: +            self.write_messages(self._permanent_store) +            self.producer.flush() +      def expunge(self, mbox, observer):          """          Remove all messages flagged \\Deleted, from the Memory Store @@ -890,12 +899,9 @@ class MemoryStore(object):          """          soledad_store = self._permanent_store          try: -            # 1. Stop the writing call -            self._stop_write_loop() -            # 2. Enqueue a last write. -            self.write_messages(soledad_store) -            # 3. Wait on the writebacks to finish - +            # Stop and trigger last write +            self.stop_and_flush() +            # Wait on the writebacks to finish              pending_deferreds = (self._new_deferreds.get(mbox, []) +                                   self._dirty_deferreds.get(mbox, []))              d1 = defer.gatherResults(pending_deferreds, consumeErrors=True) @@ -962,6 +968,10 @@ class MemoryStore(object):          # are done (gatherResults)          return getattr(self, self.WRITING_FLAG) +    @property +    def permanent_store(self): +        return self._permanent_store +      # Memory management.      def get_size(self): diff --git a/mail/src/leap/mail/imap/service/imap.py b/mail/src/leap/mail/imap/service/imap.py index 5487cfc..93df51d 100644 --- a/mail/src/leap/mail/imap/service/imap.py +++ b/mail/src/leap/mail/imap/service/imap.py @@ -19,7 +19,9 @@ Imap service initialization  """  import logging  import os +import time +from twisted.internet import defer, threads  from twisted.internet.protocol import ServerFactory  from twisted.internet.error import CannotListenError  from twisted.mail import imap4 @@ -122,6 +124,35 @@ class LeapIMAPFactory(ServerFactory):          imapProtocol.factory = self          return imapProtocol +    def doStop(self, cv): +        """ +        Stops imap service (fetcher, factory and port). + +        :param cv: A condition variable to which we can signal when imap +                   indeed stops. +        :type cv: threading.Condition +        :return: a Deferred that stops and flushes the in memory store data to +                 disk in another thread. +        :rtype: Deferred +        """ +        ServerFactory.doStop(self) + +        def _stop_imap_cb(): +            logger.debug('Stopping in memory store.') +            self._memstore.stop_and_flush() +            while not self._memstore.producer.is_queue_empty(): +                logger.debug('Waiting for queue to be empty.') +                # TODO use a gatherResults over the new/dirty deferred list, +                # as in memorystore's expunge() method. +                time.sleep(1) +            # notify that service has stopped +            logger.debug('Notifying that service has stopped.') +            cv.acquire() +            cv.notify() +            cv.release() + +        return threads.deferToThread(_stop_imap_cb) +  def run_service(*args, **kwargs):      """ diff --git a/mail/src/leap/mail/messageflow.py b/mail/src/leap/mail/messageflow.py index b7fc030..80121c8 100644 --- a/mail/src/leap/mail/messageflow.py +++ b/mail/src/leap/mail/messageflow.py @@ -64,6 +64,11 @@ class IMessageProducer(Interface):          Stop producing items.          """ +    def flush(self): +        """ +        Flush queued messages to consumer. +        """ +  class DummyMsgConsumer(object): @@ -162,6 +167,12 @@ class MessageProducer(object):          if self._loop.running:              self._loop.stop() +    def flush(self): +        """ +        Flush queued messages to consumer. +        """ +        self._check_for_new() +  if __name__ == "__main__":      from twisted.internet import reactor | 
