From 36d3fe05282bbc586c52536962cebf636a9b499f Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 2 Jun 2015 08:40:06 -0300 Subject: [bug] do not block when getting doc for async enc Previous to this change, the actual encryption method used to run on its own thread. When the close method was called from another thread, the queue could be deleted after the encryption method loop had started, but before the queue was checked for new items. By removing that thread and moving the encryption loop to the reactor, that race condition should disappear. Closes: #7088. --- client/src/leap/soledad/client/encdecpool.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) (limited to 'client/src/leap') diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index e73792ea..94a0552f 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -29,7 +29,6 @@ import logging from twisted.internet import reactor from twisted.internet import defer -from twisted.internet.threads import deferToThread from leap.soledad.common.document import SoledadDocument from leap.soledad.common import soledad_assert @@ -159,9 +158,8 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): self._sync_queue = multiprocessing.Queue() # start the encryption loop - self._deferred_loop = deferToThread(self._encrypt_docs_loop) - self._deferred_loop.addCallback( - lambda _: logger.debug("Finished encrypter thread.")) + logger.debug("Starting the encryption loop...") + reactor.callWhenRunning(self._maybe_encrypt_and_recurse) def enqueue_doc_for_encryption(self, doc): """ @@ -176,19 +174,23 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): # do not asynchronously encrypt this file if the queue is full pass - def _encrypt_docs_loop(self): + def _maybe_encrypt_and_recurse(self): """ Process the syncing queue and send the documents there to be encrypted in the sync db. They will be read by the SoledadSyncTarget during the sync_exchange. """ - logger.debug("Starting encrypter thread.") - while not self._stopped: + if not self._stopped: try: - doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD) + doc = self._sync_queue.get(False) self._encrypt_doc(doc) except Queue.Empty: pass + reactor.callLater( + self.ENCRYPT_LOOP_PERIOD, + self._maybe_encrypt_and_recurse) + else: + logger.debug("Finished encrypter thread.") def _encrypt_doc(self, doc): """ -- cgit v1.2.3