diff options
Diffstat (limited to 'client/src')
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 24 |
1 files changed, 13 insertions, 11 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index d9a72b25..c2e2eda8 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -29,7 +29,6 @@ import logging from twisted.internet import reactor from twisted.internet import defer -from twisted.internet.threads import deferToThread from leap.soledad.common.document import SoledadDocument from leap.soledad.common import soledad_assert @@ -147,7 +146,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): TABLE_NAME = "docs_tosync" FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" - ENCRYPT_LOOP_PERIOD = 0.5 + ENCRYPT_LOOP_PERIOD = 2 def __init__(self, *args, **kwargs): """ @@ -159,9 +158,8 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): self._sync_queue = multiprocessing.Queue() # start the encryption loop - self._deferred_loop = deferToThread(self._encrypt_docs_loop) - self._deferred_loop.addCallback( - lambda _: logger.debug("Finished encrypter thread.")) + logger.debug("Starting the encryption loop...") + reactor.callWhenRunning(self._maybe_encrypt_and_recurse) def enqueue_doc_for_encryption(self, doc): """ @@ -171,24 +169,28 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): :type doc: SoledadDocument """ try: - self.sync_queue.put_nowait(doc) - except multiprocessing.Queue.Full: + self._sync_queue.put_nowait(doc) + except Queue.Full: # do not asynchronously encrypt this file if the queue is full pass - def _encrypt_docs_loop(self): + def _maybe_encrypt_and_recurse(self): """ Process the syncing queue and send the documents there to be encrypted in the sync db. They will be read by the SoledadSyncTarget during the sync_exchange. """ - logger.debug("Starting encrypter thread.") - while not self._stopped: + if not self._stopped: try: - doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD) + doc = self._sync_queue.get(False) self._encrypt_doc(doc) except Queue.Empty: pass + reactor.callLater( + self.ENCRYPT_LOOP_PERIOD, + self._maybe_encrypt_and_recurse) + else: + logger.debug("Finished encrypter thread.") def _encrypt_doc(self, doc): """ |