summaryrefslogtreecommitdiff
path: root/mail/src/leap
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-02-07 03:24:23 -0400
committerKali Kaneko <kali@leap.se>2014-02-07 03:24:23 -0400
commit5c29bf0cfe91032764052bc925754d728276d149 (patch)
tree9f60602268b9a11c80ad46986cb15f1380542262 /mail/src/leap
parentad904c9a670b3d58689a811e2a200b2e7c4233eb (diff)
parent54045366aa275fd43ab65e450ffa23f03db6bb72 (diff)
Merge remote-tracking branch 'drebs/feature/in-memory-store-with-data-flushing' into develop
Diffstat (limited to 'mail/src/leap')
-rw-r--r--mail/src/leap/mail/imap/memorystore.py22
-rw-r--r--mail/src/leap/mail/imap/service/imap.py31
-rw-r--r--mail/src/leap/mail/messageflow.py11
3 files changed, 58 insertions, 6 deletions
diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py
index 9c7973d..3eba59a 100644
--- a/mail/src/leap/mail/imap/memorystore.py
+++ b/mail/src/leap/mail/imap/memorystore.py
@@ -875,6 +875,15 @@ class MemoryStore(object):
self.remove_message(mbox, uid)
return mem_deleted
+ def stop_and_flush(self):
+ """
+ Stop the write loop and trigger a write to the producer.
+ """
+ self._stop_write_loop()
+ if self._permanent_store is not None:
+ self.write_messages(self._permanent_store)
+ self.producer.flush()
+
def expunge(self, mbox, observer):
"""
Remove all messages flagged \\Deleted, from the Memory Store
@@ -890,12 +899,9 @@ class MemoryStore(object):
"""
soledad_store = self._permanent_store
try:
- # 1. Stop the writing call
- self._stop_write_loop()
- # 2. Enqueue a last write.
- self.write_messages(soledad_store)
- # 3. Wait on the writebacks to finish
-
+ # Stop and trigger last write
+ self.stop_and_flush()
+ # Wait on the writebacks to finish
pending_deferreds = (self._new_deferreds.get(mbox, []) +
self._dirty_deferreds.get(mbox, []))
d1 = defer.gatherResults(pending_deferreds, consumeErrors=True)
@@ -962,6 +968,10 @@ class MemoryStore(object):
# are done (gatherResults)
return getattr(self, self.WRITING_FLAG)
+ @property
+ def permanent_store(self):
+ return self._permanent_store
+
# Memory management.
def get_size(self):
diff --git a/mail/src/leap/mail/imap/service/imap.py b/mail/src/leap/mail/imap/service/imap.py
index 5487cfc..93df51d 100644
--- a/mail/src/leap/mail/imap/service/imap.py
+++ b/mail/src/leap/mail/imap/service/imap.py
@@ -19,7 +19,9 @@ Imap service initialization
"""
import logging
import os
+import time
+from twisted.internet import defer, threads
from twisted.internet.protocol import ServerFactory
from twisted.internet.error import CannotListenError
from twisted.mail import imap4
@@ -122,6 +124,35 @@ class LeapIMAPFactory(ServerFactory):
imapProtocol.factory = self
return imapProtocol
+ def doStop(self, cv):
+ """
+ Stops imap service (fetcher, factory and port).
+
+ :param cv: A condition variable to which we can signal when imap
+ indeed stops.
+ :type cv: threading.Condition
+ :return: a Deferred that stops and flushes the in memory store data to
+ disk in another thread.
+ :rtype: Deferred
+ """
+ ServerFactory.doStop(self)
+
+ def _stop_imap_cb():
+ logger.debug('Stopping in memory store.')
+ self._memstore.stop_and_flush()
+ while not self._memstore.producer.is_queue_empty():
+ logger.debug('Waiting for queue to be empty.')
+ # TODO use a gatherResults over the new/dirty deferred list,
+ # as in memorystore's expunge() method.
+ time.sleep(1)
+ # notify that service has stopped
+ logger.debug('Notifying that service has stopped.')
+ cv.acquire()
+ cv.notify()
+ cv.release()
+
+ return threads.deferToThread(_stop_imap_cb)
+
def run_service(*args, **kwargs):
"""
diff --git a/mail/src/leap/mail/messageflow.py b/mail/src/leap/mail/messageflow.py
index b7fc030..80121c8 100644
--- a/mail/src/leap/mail/messageflow.py
+++ b/mail/src/leap/mail/messageflow.py
@@ -64,6 +64,11 @@ class IMessageProducer(Interface):
Stop producing items.
"""
+ def flush(self):
+ """
+ Flush queued messages to consumer.
+ """
+
class DummyMsgConsumer(object):
@@ -162,6 +167,12 @@ class MessageProducer(object):
if self._loop.running:
self._loop.stop()
+ def flush(self):
+ """
+ Flush queued messages to consumer.
+ """
+ self._check_for_new()
+
if __name__ == "__main__":
from twisted.internet import reactor