summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/encdecpool.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/encdecpool.py')
-rw-r--r--client/src/leap/soledad/client/encdecpool.py67
1 files changed, 28 insertions, 39 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index f81cd2d1..7923bf70 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -382,8 +382,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._last_inserted_idx = 0
self._async_results = []
- self._failure = None
- self._finished = False
# XXX we want to empty the database before starting, but this is an
# asynchronous call, so we have to somehow make sure that it is
@@ -394,24 +392,26 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
def _launch_decrypt_and_process(self):
d = self._decrypt_and_process_docs()
- d.addErrback(lambda f: self._set_failure(f))
+ d.addErrback(self._errback)
def _schedule_decrypt_and_process(self):
reactor.callLater(
self.DECRYPT_LOOP_PERIOD,
self._launch_decrypt_and_process)
- @property
- def failure(self):
- return self._failure
-
- def _set_failure(self, failure):
+ def _errback(self, failure):
log.err(failure)
- self._failure = failure
- self._finished = True
+ self._deferred.errback(failure)
+ self._processed_docs = 0
+ self._last_inserted_idx = 0
- def failed(self):
- return bool(self._failure)
+ @property
+ def deferred(self):
+ """
+ Deferred that will be fired when the decryption loop has finished
+ processing all the documents.
+ """
+ return self._deferred
def start(self, docs_to_process):
"""
@@ -424,7 +424,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type docs_to_process: int
"""
self._docs_to_process = docs_to_process
- self._finished = False
+ self._deferred = defer.Deferred()
self._schedule_decrypt_and_process()
def insert_encrypted_received_doc(
@@ -521,10 +521,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type trans_id: str
:param idx: The index of this document in the current sync process.
:type idx: int
-
- :return: A deferred that will fire after the document hasa been
- decrypted and inserted in the sync db.
- :rtype: twisted.internet.defer.Deferred
"""
soledad_assert(self._crypto is not None, "need a crypto object")
@@ -734,27 +730,20 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
delete operations have been executed.
:rtype: twisted.internet.defer.Deferred
"""
- if not self.failed():
- processed = self._processed_docs
- pending = self._docs_to_process
-
- if not self.has_finished() and processed < pending:
- yield self._async_decrypt_received_docs()
- yield self._collect_async_decryption_results()
- docs = yield self._process_decrypted_docs()
- yield self._delete_processed_docs(docs)
- # recurse
- self._schedule_decrypt_and_process()
- else:
- self._mark_finished()
-
- def _mark_finished(self):
- self._finished = True
+ processed = self._processed_docs
+ pending = self._docs_to_process
+
+ if processed < pending:
+ yield self._async_decrypt_received_docs()
+ self._collect_async_decryption_results()
+ docs = yield self._process_decrypted_docs()
+ yield self._delete_processed_docs(docs)
+ # recurse
+ self._schedule_decrypt_and_process()
+ else:
+ self._finish()
+
+ def _finish(self):
+ self._deferred.callback(None)
self._processed_docs = 0
self._last_inserted_idx = 0
-
- def has_finished(self):
- """
- Return whether the decrypter has finished its work.
- """
- return self._finished