summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/src/leap/soledad/client/encdecpool.py22
1 files changed, 12 insertions, 10 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 4cdb3cd4..df32d74f 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -175,7 +175,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
Initialize the sync encrypter pool.
"""
SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
- self._encr_queue = multiprocessing.Queue()
+ self._encr_queue = defer.DeferredQueue()
# TODO delete already synced files from database
def start(self):
@@ -192,8 +192,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
"""
# close the sync queue
if self._encr_queue:
- self._encr_queue.close()
q = self._encr_queue
+ for d in q.pending:
+ d.cancel()
del q
self._encr_queue = None
@@ -207,11 +208,12 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
:type doc: SoledadDocument
"""
try:
- self._encr_queue.put_nowait(doc)
+ self._encr_queue.put(doc)
except Queue.Full:
# do not asynchronously encrypt this file if the queue is full
pass
+ @defer.inlineCallbacks
def _maybe_encrypt_and_recurse(self):
"""
Process one document from the encryption queue.
@@ -221,13 +223,13 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
the sync_exchange.
"""
try:
- doc = self._encr_queue.get(False)
- self._encrypt_doc(doc)
- except Queue.Empty:
- pass
- self._delayed_call = reactor.callLater(
- self.ENCRYPT_LOOP_PERIOD,
- self._maybe_encrypt_and_recurse)
+ while self.running:
+ doc = yield self._encr_queue.get()
+ self._encrypt_doc(doc)
+ except defer.QueueUnderflow:
+ self._delayed_call = reactor.callLater(
+ self.ENCRYPT_LOOP_PERIOD,
+ self._maybe_encrypt_and_recurse)
def _encrypt_doc(self, doc):
"""