diff options
Diffstat (limited to 'client/src/leap/soledad/client/encdecpool.py')
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 89 |
1 files changed, 57 insertions, 32 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 02eeb590..d9f3d28c 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -32,6 +32,7 @@ from zope.proxy import sameProxiedObjects from twisted.internet import defer from twisted.internet.threads import deferToThread +from twisted.python.failure import Failure from leap.soledad.common.document import SoledadDocument from leap.soledad.common import soledad_assert @@ -390,7 +391,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._processed_docs = 0 self._async_results = [] - self._exception = None + self._failure = None self._finished = threading.Event() # clear the database before starting the sync @@ -399,10 +400,26 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): d.addCallback(lambda _: self._empty_db.set()) # start the decryption loop + def _maybe_store_failure_and_finish(result): + if isinstance(result, Failure): + self._set_failure(result) + self._finished.set() + logger.debug("Finished decrypter thread.") + self._deferred_loop = deferToThread( self._decrypt_and_process_docs_loop) - self._deferred_loop.addCallback( - lambda _: logger.debug("Finished decrypter thread.")) + self._deferred_loop.addBoth( + _maybe_store_failure_and_finish) + + @property + def failure(self): + return self._failure + + def _set_failure(self, failure): + self._failure = failure + + def succeeded(self): + return self._failure is None def set_docs_to_process(self, docs_to_process): """ @@ -760,35 +777,43 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): This method runs in its own thread, so sleeping will not interfere with the main thread. """ - try: - # wait for database to be emptied - self._empty_db.wait() - # wait until we know how many documents we need to process - while self._docs_to_process is None: - time.sleep(self.DECRYPT_LOOP_PERIOD) - # because all database operations are asynchronous, we use an - # event to make sure we don't start the next loop before the - # current one has finished. - event = threading.Event() - # loop until we have processes as many docs as the number of - # changes - while self._processed_docs < self._docs_to_process: - if sameProxiedObjects( - self._insert_doc_cb.get(self.source_replica_uid), - None): - continue - event.clear() - d = self._decrypt_received_docs() - d.addCallback(lambda _: self._raise_if_async_fails()) - d.addCallback(lambda _: self._process_decrypted()) - d.addCallback(self._delete_processed_docs) - d.addCallback(lambda _: event.set()) - event.wait() - # sleep a bit to give time for some decryption work - time.sleep(self.DECRYPT_LOOP_PERIOD) - except Exception as e: - self._exception = e - self._finished.set() + # wait for database to be emptied + self._empty_db.wait() + + # wait until we know how many documents we need to process + while self._docs_to_process is None: + time.sleep(self.DECRYPT_LOOP_PERIOD) + + # because all database operations are asynchronous, we use an + # event to make sure we don't start the next loop before the + # current one has finished. + event = threading.Event() + + # loop until we have processes as many docs as the number of + # changes + while self._processed_docs < self._docs_to_process: + + if sameProxiedObjects( + self._insert_doc_cb.get(self.source_replica_uid), + None): + continue + + event.clear() + + d = self._decrypt_received_docs() + d.addCallback(lambda _: self._raise_if_async_fails()) + d.addCallback(lambda _: self._process_decrypted()) + d.addCallback(lambda r: self._delete_processed_docs(r)) + d.addErrback(self._set_failure) # grab failure and continue + d.addCallback(lambda _: event.set()) + + event.wait() + + if not self.succeeded(): + break + + # sleep a bit to give time for some decryption work + time.sleep(self.DECRYPT_LOOP_PERIOD) def has_finished(self): return self._finished.is_set() |