summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/encdecpool.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/encdecpool.py')
-rw-r--r--client/src/leap/soledad/client/encdecpool.py42
1 files changed, 28 insertions, 14 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index d9a72b25..f81cd2d1 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -29,7 +29,7 @@ import logging
from twisted.internet import reactor
from twisted.internet import defer
-from twisted.internet.threads import deferToThread
+from twisted.python import log
from leap.soledad.common.document import SoledadDocument
from leap.soledad.common import soledad_assert
@@ -147,7 +147,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
TABLE_NAME = "docs_tosync"
FIELD_NAMES = "doc_id PRIMARY KEY, rev, content"
- ENCRYPT_LOOP_PERIOD = 0.5
+ ENCRYPT_LOOP_PERIOD = 2
def __init__(self, *args, **kwargs):
"""
@@ -159,9 +159,8 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
self._sync_queue = multiprocessing.Queue()
# start the encryption loop
- self._deferred_loop = deferToThread(self._encrypt_docs_loop)
- self._deferred_loop.addCallback(
- lambda _: logger.debug("Finished encrypter thread."))
+ logger.debug("Starting the encryption loop...")
+ reactor.callWhenRunning(self._maybe_encrypt_and_recurse)
def enqueue_doc_for_encryption(self, doc):
"""
@@ -171,24 +170,28 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
:type doc: SoledadDocument
"""
try:
- self.sync_queue.put_nowait(doc)
- except multiprocessing.Queue.Full:
+ self._sync_queue.put_nowait(doc)
+ except Queue.Full:
# do not asynchronously encrypt this file if the queue is full
pass
- def _encrypt_docs_loop(self):
+ def _maybe_encrypt_and_recurse(self):
"""
Process the syncing queue and send the documents there to be encrypted
in the sync db. They will be read by the SoledadSyncTarget during the
sync_exchange.
"""
- logger.debug("Starting encrypter thread.")
- while not self._stopped:
+ if not self._stopped:
try:
- doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD)
+ doc = self._sync_queue.get(False)
self._encrypt_doc(doc)
except Queue.Empty:
pass
+ reactor.callLater(
+ self.ENCRYPT_LOOP_PERIOD,
+ self._maybe_encrypt_and_recurse)
+ else:
+ logger.debug("Finished encrypter thread.")
def _encrypt_doc(self, doc):
"""
@@ -374,9 +377,9 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self.source_replica_uid = kwargs.pop("source_replica_uid")
SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
- self._last_inserted_idx = 0
self._docs_to_process = None
self._processed_docs = 0
+ self._last_inserted_idx = 0
self._async_results = []
self._failure = None
@@ -386,6 +389,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# asynchronous call, so we have to somehow make sure that it is
# executed before any other call to the database, without
# blocking.
+ # XXX in mail and keymanager we have a pattern for that -- kali.
self._empty()
def _launch_decrypt_and_process(self):
@@ -402,6 +406,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
return self._failure
def _set_failure(self, failure):
+ log.err(failure)
self._failure = failure
self._finished = True
@@ -419,6 +424,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type docs_to_process: int
"""
self._docs_to_process = docs_to_process
+ self._finished = False
self._schedule_decrypt_and_process()
def insert_encrypted_received_doc(
@@ -729,7 +735,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:rtype: twisted.internet.defer.Deferred
"""
if not self.failed():
- if self._processed_docs < self._docs_to_process:
+ processed = self._processed_docs
+ pending = self._docs_to_process
+
+ if not self.has_finished() and processed < pending:
yield self._async_decrypt_received_docs()
yield self._collect_async_decryption_results()
docs = yield self._process_decrypted_docs()
@@ -737,7 +746,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# recurse
self._schedule_decrypt_and_process()
else:
- self._finished = True
+ self._mark_finished()
+
+ def _mark_finished(self):
+ self._finished = True
+ self._processed_docs = 0
+ self._last_inserted_idx = 0
def has_finished(self):
"""