summaryrefslogtreecommitdiff
path: root/client/src
diff options
context:
space:
mode:
authorVictor Shyba <victor.shyba@gmail.com>2015-08-12 15:51:03 -0300
committerBruno Wagner <bwgpro@gmail.com>2015-08-12 17:17:20 -0300
commitf9297cd90ac2623b36d31e3ebf4b9b719a0efc1f (patch)
tree72316d86a8b7493a73ae872f434fa57e0c67a313 /client/src
parent9423a3fe4191eb54c5ddf17ceee3cc05b5c074c9 (diff)
[bug] changes multiprocessing.Queue to Twisted's
multiprocessing.Queue is suitable for process communication, but its not the ideal for a reactor model. This commit changes it to DeferredQueue, where consumers and producers doesnt block and Twisted can handle them better.
Diffstat (limited to 'client/src')
-rw-r--r--client/src/leap/soledad/client/encdecpool.py22
1 files changed, 12 insertions, 10 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 4cdb3cd4..df32d74f 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -175,7 +175,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
Initialize the sync encrypter pool.
"""
SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
- self._encr_queue = multiprocessing.Queue()
+ self._encr_queue = defer.DeferredQueue()
# TODO delete already synced files from database
def start(self):
@@ -192,8 +192,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
"""
# close the sync queue
if self._encr_queue:
- self._encr_queue.close()
q = self._encr_queue
+ for d in q.pending:
+ d.cancel()
del q
self._encr_queue = None
@@ -207,11 +208,12 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
:type doc: SoledadDocument
"""
try:
- self._encr_queue.put_nowait(doc)
+ self._encr_queue.put(doc)
except Queue.Full:
# do not asynchronously encrypt this file if the queue is full
pass
+ @defer.inlineCallbacks
def _maybe_encrypt_and_recurse(self):
"""
Process one document from the encryption queue.
@@ -221,13 +223,13 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
the sync_exchange.
"""
try:
- doc = self._encr_queue.get(False)
- self._encrypt_doc(doc)
- except Queue.Empty:
- pass
- self._delayed_call = reactor.callLater(
- self.ENCRYPT_LOOP_PERIOD,
- self._maybe_encrypt_and_recurse)
+ while self.running:
+ doc = yield self._encr_queue.get()
+ self._encrypt_doc(doc)
+ except defer.QueueUnderflow:
+ self._delayed_call = reactor.callLater(
+ self.ENCRYPT_LOOP_PERIOD,
+ self._maybe_encrypt_and_recurse)
def _encrypt_doc(self, doc):
"""