summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2015-06-02 08:40:06 -0300
committerdrebs <drebs@leap.se>2015-06-03 16:07:50 -0300
commit36d3fe05282bbc586c52536962cebf636a9b499f (patch)
tree788323087bca08f14d83678841a7286a8ec1d5f1
parent544b0ff28c654a7c21832d47f34861c3cfbfc26d (diff)
[bug] do not block when getting doc for async enc
Previous to this change, the actual encryption method used to run on its own thread. When the close method was called from another thread, the queue could be deleted after the encryption method loop had started, but before the queue was checked for new items. By removing that thread and moving the encryption loop to the reactor, that race condition should disappear. Closes: #7088.
-rw-r--r--client/changes/bug_fix-sync-enc-close-queue-error1
-rw-r--r--client/src/leap/soledad/client/encdecpool.py18
2 files changed, 11 insertions, 8 deletions
diff --git a/client/changes/bug_fix-sync-enc-close-queue-error b/client/changes/bug_fix-sync-enc-close-queue-error
new file mode 100644
index 00000000..71af7c67
--- /dev/null
+++ b/client/changes/bug_fix-sync-enc-close-queue-error
@@ -0,0 +1 @@
+ o Fix sync encrypter pool close queue error. Closes #7088.
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index e73792ea..94a0552f 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -29,7 +29,6 @@ import logging
from twisted.internet import reactor
from twisted.internet import defer
-from twisted.internet.threads import deferToThread
from leap.soledad.common.document import SoledadDocument
from leap.soledad.common import soledad_assert
@@ -159,9 +158,8 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
self._sync_queue = multiprocessing.Queue()
# start the encryption loop
- self._deferred_loop = deferToThread(self._encrypt_docs_loop)
- self._deferred_loop.addCallback(
- lambda _: logger.debug("Finished encrypter thread."))
+ logger.debug("Starting the encryption loop...")
+ reactor.callWhenRunning(self._maybe_encrypt_and_recurse)
def enqueue_doc_for_encryption(self, doc):
"""
@@ -176,19 +174,23 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
# do not asynchronously encrypt this file if the queue is full
pass
- def _encrypt_docs_loop(self):
+ def _maybe_encrypt_and_recurse(self):
"""
Process the syncing queue and send the documents there to be encrypted
in the sync db. They will be read by the SoledadSyncTarget during the
sync_exchange.
"""
- logger.debug("Starting encrypter thread.")
- while not self._stopped:
+ if not self._stopped:
try:
- doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD)
+ doc = self._sync_queue.get(False)
self._encrypt_doc(doc)
except Queue.Empty:
pass
+ reactor.callLater(
+ self.ENCRYPT_LOOP_PERIOD,
+ self._maybe_encrypt_and_recurse)
+ else:
+ logger.debug("Finished encrypter thread.")
def _encrypt_doc(self, doc):
"""