diff options
author | drebs <drebs@leap.se> | 2015-06-02 08:40:06 -0300 |
---|---|---|
committer | drebs <drebs@leap.se> | 2015-06-03 16:07:50 -0300 |
commit | 36d3fe05282bbc586c52536962cebf636a9b499f (patch) | |
tree | 788323087bca08f14d83678841a7286a8ec1d5f1 | |
parent | 544b0ff28c654a7c21832d47f34861c3cfbfc26d (diff) |
[bug] do not block when getting doc for async enc
Previous to this change, the actual encryption method used to run on its own
thread. When the close method was called from another thread, the queue could
be deleted after the encryption method loop had started, but before the queue
was checked for new items.
By removing that thread and moving the encryption loop to the reactor, that
race condition should disappear.
Closes: #7088.
-rw-r--r-- | client/changes/bug_fix-sync-enc-close-queue-error | 1 | ||||
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 18 |
2 files changed, 11 insertions, 8 deletions
diff --git a/client/changes/bug_fix-sync-enc-close-queue-error b/client/changes/bug_fix-sync-enc-close-queue-error new file mode 100644 index 00000000..71af7c67 --- /dev/null +++ b/client/changes/bug_fix-sync-enc-close-queue-error @@ -0,0 +1 @@ + o Fix sync encrypter pool close queue error. Closes #7088. diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index e73792ea..94a0552f 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -29,7 +29,6 @@ import logging from twisted.internet import reactor from twisted.internet import defer -from twisted.internet.threads import deferToThread from leap.soledad.common.document import SoledadDocument from leap.soledad.common import soledad_assert @@ -159,9 +158,8 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): self._sync_queue = multiprocessing.Queue() # start the encryption loop - self._deferred_loop = deferToThread(self._encrypt_docs_loop) - self._deferred_loop.addCallback( - lambda _: logger.debug("Finished encrypter thread.")) + logger.debug("Starting the encryption loop...") + reactor.callWhenRunning(self._maybe_encrypt_and_recurse) def enqueue_doc_for_encryption(self, doc): """ @@ -176,19 +174,23 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): # do not asynchronously encrypt this file if the queue is full pass - def _encrypt_docs_loop(self): + def _maybe_encrypt_and_recurse(self): """ Process the syncing queue and send the documents there to be encrypted in the sync db. They will be read by the SoledadSyncTarget during the sync_exchange. """ - logger.debug("Starting encrypter thread.") - while not self._stopped: + if not self._stopped: try: - doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD) + doc = self._sync_queue.get(False) self._encrypt_doc(doc) except Queue.Empty: pass + reactor.callLater( + self.ENCRYPT_LOOP_PERIOD, + self._maybe_encrypt_and_recurse) + else: + logger.debug("Finished encrypter thread.") def _encrypt_doc(self, doc): """ |