-rw-r--r--  client/src/leap/soledad/client/encdecpool.py  | 89
-rw-r--r--  client/src/leap/soledad/client/http_target.py | 51
2 files changed, 97 insertions(+), 43 deletions(-)
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 02eeb590..d9f3d28c 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -32,6 +32,7 @@ from zope.proxy import sameProxiedObjects
from twisted.internet import defer
from twisted.internet.threads import deferToThread
+from twisted.python.failure import Failure
from leap.soledad.common.document import SoledadDocument
from leap.soledad.common import soledad_assert
@@ -390,7 +391,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._processed_docs = 0
self._async_results = []
- self._exception = None
+ self._failure = None
self._finished = threading.Event()
# clear the database before starting the sync
@@ -399,10 +400,26 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
d.addCallback(lambda _: self._empty_db.set())
# start the decryption loop
+ def _maybe_store_failure_and_finish(result):
+ if isinstance(result, Failure):
+ self._set_failure(result)
+ self._finished.set()
+ logger.debug("Finished decrypter thread.")
+
self._deferred_loop = deferToThread(
self._decrypt_and_process_docs_loop)
- self._deferred_loop.addCallback(
- lambda _: logger.debug("Finished decrypter thread."))
+ self._deferred_loop.addBoth(
+ _maybe_store_failure_and_finish)
+
+ @property
+ def failure(self):
+ return self._failure
+
+ def _set_failure(self, failure):
+ self._failure = failure
+
+ def succeeded(self):
+ return self._failure is None
def set_docs_to_process(self, docs_to_process):
"""
@@ -760,35 +777,46 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
This method runs in its own thread, so sleeping will not interfere
with the main thread.
"""
- try:
- # wait for database to be emptied
- self._empty_db.wait()
- # wait until we know how many documents we need to process
- while self._docs_to_process is None:
- time.sleep(self.DECRYPT_LOOP_PERIOD)
- # because all database operations are asynchronous, we use an
- # event to make sure we don't start the next loop before the
- # current one has finished.
- event = threading.Event()
- # loop until we have processes as many docs as the number of
- # changes
- while self._processed_docs < self._docs_to_process:
- if sameProxiedObjects(
- self._insert_doc_cb.get(self.source_replica_uid),
- None):
- continue
- event.clear()
- d = self._decrypt_received_docs()
- d.addCallback(lambda _: self._raise_if_async_fails())
- d.addCallback(lambda _: self._process_decrypted())
- d.addCallback(self._delete_processed_docs)
- d.addCallback(lambda _: event.set())
- event.wait()
- # sleep a bit to give time for some decryption work
- time.sleep(self.DECRYPT_LOOP_PERIOD)
- except Exception as e:
- self._exception = e
- self._finished.set()
+ # wait for database to be emptied
+ self._empty_db.wait()
+
+ # wait until we know how many documents we need to process
+ while self._docs_to_process is None:
+ time.sleep(self.DECRYPT_LOOP_PERIOD)
+
+ # because all database operations are asynchronous, we use an
+ # event to make sure we don't start the next loop before the
+ # current one has finished.
+ event = threading.Event()
+
+ # loop until we have processed as many docs as the number of
+ # changes
+ while self._processed_docs < self._docs_to_process:
+
+ if sameProxiedObjects(
+ self._insert_doc_cb.get(self.source_replica_uid),
+ None):
+ # the insert callback is not yet available; sleep to avoid
+ # busy-waiting before checking again
+ time.sleep(self.DECRYPT_LOOP_PERIOD)
+ continue
+
+ event.clear()
+
+ d = self._decrypt_received_docs()
+ d.addCallback(lambda _: self._raise_if_async_fails())
+ d.addCallback(lambda _: self._process_decrypted())
+ d.addCallback(self._delete_processed_docs)
+ d.addErrback(self._set_failure) # grab failure and continue
+ d.addCallback(lambda _: event.set())
+
+ event.wait()
+
+ if not self.succeeded():
+ break
+
+ # sleep a bit to give time for some decryption work
+ time.sleep(self.DECRYPT_LOOP_PERIOD)
def has_finished(self):
return self._finished.is_set()
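Note: the rewritten loop coordinates a worker thread with Deferred chains running in the reactor: the thread blocks on a threading.Event for each batch, an errback stores any failure, and the loop breaks as soon as a failure has been recorded. A minimal sketch of that shape, where process_batch() and the batch count are hypothetical stand-ins for the real decryption steps:

import threading

from twisted.internet import defer, reactor
from twisted.internet.threads import deferToThread

failure_box = []


def process_batch(n):
    # pretend the third batch fails, to exercise the errback path
    if n == 2:
        return defer.fail(RuntimeError("batch %d failed" % n))
    return defer.succeed(n)


def loop():
    for n in range(5):
        event = threading.Event()

        def run(n=n, event=event):
            d = process_batch(n)
            d.addErrback(failure_box.append)  # grab failure and continue
            d.addCallback(lambda _: event.set())

        # schedule the batch in the reactor thread and block until done
        reactor.callFromThread(run)
        event.wait()
        if failure_box:
            break  # stop looping once a failure has been recorded


d = deferToThread(loop)
d.addBoth(lambda _: reactor.stop())
reactor.run()
print(failure_box)

Because the errback returns None, the Deferred chain continues and the event is still set on failure, so the waiting thread is never left blocked.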
diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py
index 5f18e4a9..bf397cfe 100644
--- a/client/src/leap/soledad/client/http_target.py
+++ b/client/src/leap/soledad/client/http_target.py
@@ -396,7 +396,14 @@ class SoledadHTTPSyncTarget(SyncTarget):
headers = self._auth_header.copy()
headers.update({'content-type': 'application/x-soledad-sync-get'})
- # maybe get one doc
+ #---------------------------------------------------------------------
+ # maybe receive the first document
+ #---------------------------------------------------------------------
+
+ # we fetch the first document before fetching the rest because we need
+ # to know the total number of documents to be received, and this
+ # information comes as metadata in each response.
+
d = self._receive_one_doc(
headers, last_known_generation, last_known_trans_id,
sync_id, 0)
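Note: the control flow here is one request to learn the total number of changes from the response metadata, then concurrent requests for the rest, gathered with defer.gatherResults(). A sketch under assumptions, where fetch_doc() and its (doc, number_of_changes) result shape stand in for the real Soledad wire format:

from twisted.internet import defer


def fetch_doc(idx):
    # stand-in for an HTTP request; the response metadata carries the
    # total number of changes alongside the document itself
    return defer.succeed(("doc-%d" % idx, 4))


@defer.inlineCallbacks
def fetch_all():
    # fetch the first document to learn the total number of changes
    first_doc, number_of_changes = yield fetch_doc(0)
    docs = [first_doc]
    # launch the remaining fetches concurrently and gather the results
    deferreds = [fetch_doc(idx) for idx in range(1, number_of_changes)]
    results = yield defer.gatherResults(deferreds)
    docs.extend(doc for doc, _ in results)
    defer.returnValue(docs)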
@@ -406,28 +413,48 @@ class SoledadHTTPSyncTarget(SyncTarget):
if defer_decryption:
self._sync_decr_pool.set_docs_to_process(
number_of_changes)
- idx = 1
- # maybe get more documents
+ #---------------------------------------------------------------------
+ # maybe receive the rest of the documents
+ #---------------------------------------------------------------------
+
+ # launch many asynchronous fetches, inserting each received document
+ # into the temporary sync db, and wait for all results before
+ # continuing.
+
+ received = 1
deferreds = []
- while idx < number_of_changes:
+ while received < number_of_changes:
d = self._receive_one_doc(
headers, last_known_generation,
- last_known_trans_id, sync_id, idx)
+ last_known_trans_id, sync_id, received)
d.addCallback(
partial(
self._insert_received_doc,
- idx + 1,
+ received + 1, # the index of the current received doc
number_of_changes))
deferreds.append(d)
- idx += 1
+ received += 1
results = yield defer.gatherResults(deferreds)
- # get genration and transaction id of target after insertions
+ # get generation and transaction id of target after insertions
if deferreds:
_, new_generation, new_transaction_id = results.pop()
- # get current target gen and trans id in case no documents were
+ #---------------------------------------------------------------------
+ # wait for async decryption to finish
+ #---------------------------------------------------------------------
+
+ # below we use a trick to wait for the SyncDecrypterPool to finish
+ # its work before finally returning the new generation and
+ # transaction id of the remote replica: we create a Deferred that
+ # will return the results of the sync and, if we are decrypting
+ # asynchronously, use reactor.callLater() to periodically poll the
+ # decrypter pool until it has finished its work. Depending on whether
+ # the pool succeeded or failed, we then fire either the callback or
+ # the errback of that Deferred. If we are not decrypting
+ # asynchronously, we just fire the Deferred immediately.
+
def _shutdown_and_finish(res):
self._sync_decr_pool.close()
return new_generation, new_transaction_id
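Note: the polling trick described in the comment above reduces to the sketch below. has_finished(), succeeded() and failure mirror the methods added to the pool in encdecpool.py; the pool argument and its result attribute are hypothetical:

from twisted.internet import defer, reactor

POLL_PERIOD = 0.25  # seconds, analogous to DECRYPT_LOOP_PERIOD


def wait_for_pool(pool):
    d = defer.Deferred()

    def _wait_or_finish():
        if not pool.has_finished():
            # not done yet, schedule another check
            reactor.callLater(POLL_PERIOD, _wait_or_finish)
        elif pool.succeeded():
            d.callback(pool.result)
        else:
            d.errback(pool.failure)

    _wait_or_finish()
    return d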
@@ -441,9 +468,11 @@ class SoledadHTTPSyncTarget(SyncTarget):
SyncDecrypterPool.DECRYPT_LOOP_PERIOD,
_wait_or_finish)
else:
- d.callback(None)
+ if self._sync_decr_pool.succeeded():
+ d.callback(None)
+ else:
+ d.errback(self._sync_decr_pool.failure)
- # decrypt docs in case of deferred decryption
if defer_decryption:
_wait_or_finish()
else: