summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/encdecpool.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/encdecpool.py')
-rw-r--r--client/src/leap/soledad/client/encdecpool.py89
1 files changed, 57 insertions, 32 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 02eeb590..d9f3d28c 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -32,6 +32,7 @@ from zope.proxy import sameProxiedObjects
from twisted.internet import defer
from twisted.internet.threads import deferToThread
+from twisted.python.failure import Failure
from leap.soledad.common.document import SoledadDocument
from leap.soledad.common import soledad_assert
@@ -390,7 +391,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._processed_docs = 0
self._async_results = []
- self._exception = None
+ self._failure = None
self._finished = threading.Event()
# clear the database before starting the sync
@@ -399,10 +400,26 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
d.addCallback(lambda _: self._empty_db.set())
# start the decryption loop
+ def _maybe_store_failure_and_finish(result):
+ if isinstance(result, Failure):
+ self._set_failure(result)
+ self._finished.set()
+ logger.debug("Finished decrypter thread.")
+
self._deferred_loop = deferToThread(
self._decrypt_and_process_docs_loop)
- self._deferred_loop.addCallback(
- lambda _: logger.debug("Finished decrypter thread."))
+ self._deferred_loop.addBoth(
+ _maybe_store_failure_and_finish)
+
+ @property
+ def failure(self):
+ return self._failure
+
+ def _set_failure(self, failure):
+ self._failure = failure
+
+ def succeeded(self):
+ return self._failure is None
def set_docs_to_process(self, docs_to_process):
"""
@@ -760,35 +777,43 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
This method runs in its own thread, so sleeping will not interfere
with the main thread.
"""
- try:
- # wait for database to be emptied
- self._empty_db.wait()
- # wait until we know how many documents we need to process
- while self._docs_to_process is None:
- time.sleep(self.DECRYPT_LOOP_PERIOD)
- # because all database operations are asynchronous, we use an
- # event to make sure we don't start the next loop before the
- # current one has finished.
- event = threading.Event()
- # loop until we have processes as many docs as the number of
- # changes
- while self._processed_docs < self._docs_to_process:
- if sameProxiedObjects(
- self._insert_doc_cb.get(self.source_replica_uid),
- None):
- continue
- event.clear()
- d = self._decrypt_received_docs()
- d.addCallback(lambda _: self._raise_if_async_fails())
- d.addCallback(lambda _: self._process_decrypted())
- d.addCallback(self._delete_processed_docs)
- d.addCallback(lambda _: event.set())
- event.wait()
- # sleep a bit to give time for some decryption work
- time.sleep(self.DECRYPT_LOOP_PERIOD)
- except Exception as e:
- self._exception = e
- self._finished.set()
+ # wait for database to be emptied
+ self._empty_db.wait()
+
+ # wait until we know how many documents we need to process
+ while self._docs_to_process is None:
+ time.sleep(self.DECRYPT_LOOP_PERIOD)
+
+ # because all database operations are asynchronous, we use an
+ # event to make sure we don't start the next loop before the
+ # current one has finished.
+ event = threading.Event()
+
+ # loop until we have processes as many docs as the number of
+ # changes
+ while self._processed_docs < self._docs_to_process:
+
+ if sameProxiedObjects(
+ self._insert_doc_cb.get(self.source_replica_uid),
+ None):
+ continue
+
+ event.clear()
+
+ d = self._decrypt_received_docs()
+ d.addCallback(lambda _: self._raise_if_async_fails())
+ d.addCallback(lambda _: self._process_decrypted())
+ d.addCallback(lambda r: self._delete_processed_docs(r))
+ d.addErrback(self._set_failure) # grab failure and continue
+ d.addCallback(lambda _: event.set())
+
+ event.wait()
+
+ if not self.succeeded():
+ break
+
+ # sleep a bit to give time for some decryption work
+ time.sleep(self.DECRYPT_LOOP_PERIOD)
def has_finished(self):
return self._finished.is_set()