summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/changes/feat-refactor_decr_pool1
-rw-r--r--client/src/leap/soledad/client/encdecpool.py67
-rw-r--r--client/src/leap/soledad/client/http_target.py54
3 files changed, 39 insertions, 83 deletions
diff --git a/client/changes/feat-refactor_decr_pool b/client/changes/feat-refactor_decr_pool
new file mode 100644
index 00000000..7a567bcc
--- /dev/null
+++ b/client/changes/feat-refactor_decr_pool
@@ -0,0 +1 @@
+- Refactor decription pool and http target to use a deferred instead of a waiting loop.
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index f81cd2d1..7923bf70 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -382,8 +382,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._last_inserted_idx = 0
self._async_results = []
- self._failure = None
- self._finished = False
# XXX we want to empty the database before starting, but this is an
# asynchronous call, so we have to somehow make sure that it is
@@ -394,24 +392,26 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
def _launch_decrypt_and_process(self):
d = self._decrypt_and_process_docs()
- d.addErrback(lambda f: self._set_failure(f))
+ d.addErrback(self._errback)
def _schedule_decrypt_and_process(self):
reactor.callLater(
self.DECRYPT_LOOP_PERIOD,
self._launch_decrypt_and_process)
- @property
- def failure(self):
- return self._failure
-
- def _set_failure(self, failure):
+ def _errback(self, failure):
log.err(failure)
- self._failure = failure
- self._finished = True
+ self._deferred.errback(failure)
+ self._processed_docs = 0
+ self._last_inserted_idx = 0
- def failed(self):
- return bool(self._failure)
+ @property
+ def deferred(self):
+ """
+ Deferred that will be fired when the decryption loop has finished
+ processing all the documents.
+ """
+ return self._deferred
def start(self, docs_to_process):
"""
@@ -424,7 +424,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type docs_to_process: int
"""
self._docs_to_process = docs_to_process
- self._finished = False
+ self._deferred = defer.Deferred()
self._schedule_decrypt_and_process()
def insert_encrypted_received_doc(
@@ -521,10 +521,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type trans_id: str
:param idx: The index of this document in the current sync process.
:type idx: int
-
- :return: A deferred that will fire after the document hasa been
- decrypted and inserted in the sync db.
- :rtype: twisted.internet.defer.Deferred
"""
soledad_assert(self._crypto is not None, "need a crypto object")
@@ -734,27 +730,20 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
delete operations have been executed.
:rtype: twisted.internet.defer.Deferred
"""
- if not self.failed():
- processed = self._processed_docs
- pending = self._docs_to_process
-
- if not self.has_finished() and processed < pending:
- yield self._async_decrypt_received_docs()
- yield self._collect_async_decryption_results()
- docs = yield self._process_decrypted_docs()
- yield self._delete_processed_docs(docs)
- # recurse
- self._schedule_decrypt_and_process()
- else:
- self._mark_finished()
-
- def _mark_finished(self):
- self._finished = True
+ processed = self._processed_docs
+ pending = self._docs_to_process
+
+ if processed < pending:
+ yield self._async_decrypt_received_docs()
+ self._collect_async_decryption_results()
+ docs = yield self._process_decrypted_docs()
+ yield self._delete_processed_docs(docs)
+ # recurse
+ self._schedule_decrypt_and_process()
+ else:
+ self._finish()
+
+ def _finish(self):
+ self._deferred.callback(None)
self._processed_docs = 0
self._last_inserted_idx = 0
-
- def has_finished(self):
- """
- Return whether the decrypter has finished its work.
- """
- return self._finished
diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py
index ac078f39..f1e83130 100644
--- a/client/src/leap/soledad/client/http_target.py
+++ b/client/src/leap/soledad/client/http_target.py
@@ -27,10 +27,8 @@ import base64
import logging
from uuid import uuid4
-from functools import partial
from twisted.internet import defer
-from twisted.internet import reactor
from twisted.web.error import Error
from u1db import errors
@@ -393,11 +391,10 @@ class SoledadHTTPSyncTarget(SyncTarget):
# to know the total number of documents to be received, and this
# information comes as metadata to each request.
- d = self._receive_one_doc(
+ doc = yield self._receive_one_doc(
headers, last_known_generation, last_known_trans_id,
sync_id, 0)
- d.addCallback(partial(self._insert_received_doc, 1, 1))
- number_of_changes, ngen, ntrans = yield d
+ number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1)
if defer_decryption:
self._sync_decr_pool.start(number_of_changes)
@@ -417,10 +414,9 @@ class SoledadHTTPSyncTarget(SyncTarget):
headers, last_known_generation,
last_known_trans_id, sync_id, received)
d.addCallback(
- partial(
- self._insert_received_doc,
- received + 1, # the index of the current received doc
- number_of_changes))
+ self._insert_received_doc,
+ received + 1, # the index of the current received doc
+ number_of_changes)
deferreds.append(d)
received += 1
results = yield defer.gatherResults(deferreds)
@@ -433,40 +429,10 @@ class SoledadHTTPSyncTarget(SyncTarget):
# wait for async decryption to finish
#---------------------------------------------------------------------
- # below we do a trick so we can wait for the SyncDecrypterPool to
- # finish its work before finally returning the new generation and
- # transaction id of the remote replica. To achieve that, we create a
- # Deferred that will return the results of the sync and, if we are
- # decrypting asynchronously, we use reactor.callLater() to
- # periodically poll the decrypter and check if it has finished its
- # work. When it has finished, we either call the callback or errback
- # of that deferred. In case we are not asynchronously decrypting, we
- # just fire the deferred.
-
- def _shutdown_and_finish(res):
- self._sync_decr_pool.close()
- return new_generation, new_transaction_id
-
- d = defer.Deferred()
- d.addCallback(_shutdown_and_finish)
-
- def _wait_or_finish():
- if not self._sync_decr_pool.has_finished():
- reactor.callLater(
- SyncDecrypterPool.DECRYPT_LOOP_PERIOD,
- _wait_or_finish)
- else:
- if not self._sync_decr_pool.failed():
- d.callback(None)
- else:
- d.errback(self._sync_decr_pool.failure)
-
if defer_decryption:
- _wait_or_finish()
- else:
- d.callback(None)
+ yield self._sync_decr_pool.deferred
+ self._sync_decr_pool.close()
- new_generation, new_transaction_id = yield d
defer.returnValue([new_generation, new_transaction_id])
def _receive_one_doc(self, headers, last_known_generation,
@@ -490,16 +456,16 @@ class SoledadHTTPSyncTarget(SyncTarget):
headers=headers,
body=''.join(entries))
- def _insert_received_doc(self, idx, total, response):
+ def _insert_received_doc(self, response, idx, total):
"""
Insert a received document into the local replica.
+ :param response: The body and headers of the response.
+ :type response: tuple(str, dict)
:param idx: The index count of the current operation.
:type idx: int
:param total: The total number of operations.
:type total: int
- :param response: The body and headers of the response.
- :type response: tuple(str, dict)
"""
new_generation, new_transaction_id, number_of_changes, doc_id, \
rev, content, gen, trans_id = \