diff options
-rw-r--r-- | client/changes/bug_fix-sync-enc-close-queue-error | 1 | ||||
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 18 |
2 files changed, 11 insertions, 8 deletions
diff --git a/client/changes/bug_fix-sync-enc-close-queue-error b/client/changes/bug_fix-sync-enc-close-queue-error new file mode 100644 index 00000000..71af7c67 --- /dev/null +++ b/client/changes/bug_fix-sync-enc-close-queue-error @@ -0,0 +1 @@ + o Fix sync encrypter pool close queue error. Closes #7088. diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index e73792ea..94a0552f 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -29,7 +29,6 @@ import logging from twisted.internet import reactor from twisted.internet import defer -from twisted.internet.threads import deferToThread from leap.soledad.common.document import SoledadDocument from leap.soledad.common import soledad_assert @@ -159,9 +158,8 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): self._sync_queue = multiprocessing.Queue() # start the encryption loop - self._deferred_loop = deferToThread(self._encrypt_docs_loop) - self._deferred_loop.addCallback( - lambda _: logger.debug("Finished encrypter thread.")) + logger.debug("Starting the encryption loop...") + reactor.callWhenRunning(self._maybe_encrypt_and_recurse) def enqueue_doc_for_encryption(self, doc): """ @@ -176,19 +174,23 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): # do not asynchronously encrypt this file if the queue is full pass - def _encrypt_docs_loop(self): + def _maybe_encrypt_and_recurse(self): """ Process the syncing queue and send the documents there to be encrypted in the sync db. They will be read by the SoledadSyncTarget during the sync_exchange. """ - logger.debug("Starting encrypter thread.") - while not self._stopped: + if not self._stopped: try: - doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD) + doc = self._sync_queue.get(False) self._encrypt_doc(doc) except Queue.Empty: pass + reactor.callLater( + self.ENCRYPT_LOOP_PERIOD, + self._maybe_encrypt_and_recurse) + else: + logger.debug("Finished encrypter thread.") def _encrypt_doc(self, doc): """ |