diff options
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 89 | ||||
-rw-r--r-- | client/src/leap/soledad/client/http_target.py | 51 |
2 files changed, 97 insertions, 43 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 02eeb590..d9f3d28c 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -32,6 +32,7 @@ from zope.proxy import sameProxiedObjects from twisted.internet import defer from twisted.internet.threads import deferToThread +from twisted.python.failure import Failure from leap.soledad.common.document import SoledadDocument from leap.soledad.common import soledad_assert @@ -390,7 +391,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._processed_docs = 0 self._async_results = [] - self._exception = None + self._failure = None self._finished = threading.Event() # clear the database before starting the sync @@ -399,10 +400,26 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): d.addCallback(lambda _: self._empty_db.set()) # start the decryption loop + def _maybe_store_failure_and_finish(result): + if isinstance(result, Failure): + self._set_failure(result) + self._finished.set() + logger.debug("Finished decrypter thread.") + self._deferred_loop = deferToThread( self._decrypt_and_process_docs_loop) - self._deferred_loop.addCallback( - lambda _: logger.debug("Finished decrypter thread.")) + self._deferred_loop.addBoth( + _maybe_store_failure_and_finish) + + @property + def failure(self): + return self._failure + + def _set_failure(self, failure): + self._failure = failure + + def succeeded(self): + return self._failure is None def set_docs_to_process(self, docs_to_process): """ @@ -760,35 +777,43 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): This method runs in its own thread, so sleeping will not interfere with the main thread. """ - try: - # wait for database to be emptied - self._empty_db.wait() - # wait until we know how many documents we need to process - while self._docs_to_process is None: - time.sleep(self.DECRYPT_LOOP_PERIOD) - # because all database operations are asynchronous, we use an - # event to make sure we don't start the next loop before the - # current one has finished. - event = threading.Event() - # loop until we have processes as many docs as the number of - # changes - while self._processed_docs < self._docs_to_process: - if sameProxiedObjects( - self._insert_doc_cb.get(self.source_replica_uid), - None): - continue - event.clear() - d = self._decrypt_received_docs() - d.addCallback(lambda _: self._raise_if_async_fails()) - d.addCallback(lambda _: self._process_decrypted()) - d.addCallback(self._delete_processed_docs) - d.addCallback(lambda _: event.set()) - event.wait() - # sleep a bit to give time for some decryption work - time.sleep(self.DECRYPT_LOOP_PERIOD) - except Exception as e: - self._exception = e - self._finished.set() + # wait for database to be emptied + self._empty_db.wait() + + # wait until we know how many documents we need to process + while self._docs_to_process is None: + time.sleep(self.DECRYPT_LOOP_PERIOD) + + # because all database operations are asynchronous, we use an + # event to make sure we don't start the next loop before the + # current one has finished. + event = threading.Event() + + # loop until we have processes as many docs as the number of + # changes + while self._processed_docs < self._docs_to_process: + + if sameProxiedObjects( + self._insert_doc_cb.get(self.source_replica_uid), + None): + continue + + event.clear() + + d = self._decrypt_received_docs() + d.addCallback(lambda _: self._raise_if_async_fails()) + d.addCallback(lambda _: self._process_decrypted()) + d.addCallback(lambda r: self._delete_processed_docs(r)) + d.addErrback(self._set_failure) # grab failure and continue + d.addCallback(lambda _: event.set()) + + event.wait() + + if not self.succeeded(): + break + + # sleep a bit to give time for some decryption work + time.sleep(self.DECRYPT_LOOP_PERIOD) def has_finished(self): return self._finished.is_set() diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py index 5f18e4a9..bf397cfe 100644 --- a/client/src/leap/soledad/client/http_target.py +++ b/client/src/leap/soledad/client/http_target.py @@ -396,7 +396,14 @@ class SoledadHTTPSyncTarget(SyncTarget): headers = self._auth_header.copy() headers.update({'content-type': 'application/x-soledad-sync-get'}) - # maybe get one doc + #--------------------------------------------------------------------- + # maybe receive the first document + #--------------------------------------------------------------------- + + # we fetch the first document before fetching the rest because we need + # to know the total number of documents to be received, and this + # information comes as metadata to each request. + d = self._receive_one_doc( headers, last_known_generation, last_known_trans_id, sync_id, 0) @@ -406,28 +413,48 @@ class SoledadHTTPSyncTarget(SyncTarget): if defer_decryption: self._sync_decr_pool.set_docs_to_process( number_of_changes) - idx = 1 - # maybe get more documents + #--------------------------------------------------------------------- + # maybe receive the rest of the documents + #--------------------------------------------------------------------- + + # launch many asynchronous fetches and inserts of received documents + # in the temporary sync db. Will wait for all results before + # continuing. + + received = 1 deferreds = [] - while idx < number_of_changes: + while received < number_of_changes: d = self._receive_one_doc( headers, last_known_generation, - last_known_trans_id, sync_id, idx) + last_known_trans_id, sync_id, received) d.addCallback( partial( self._insert_received_doc, - idx + 1, + received + 1, # the index of the current received doc number_of_changes)) deferreds.append(d) - idx += 1 + received += 1 results = yield defer.gatherResults(deferreds) - # get genration and transaction id of target after insertions + # get generation and transaction id of target after insertions if deferreds: _, new_generation, new_transaction_id = results.pop() - # get current target gen and trans id in case no documents were + #--------------------------------------------------------------------- + # wait for async decryption to finish + #--------------------------------------------------------------------- + + # below we do a trick so we can wait for the SyncDecrypterPool to + # finish its work before finally returning the new generation and + # transaction id of the remote replica. To achieve that, we create a + # Deferred that will return the results of the sync and, if we are + # decrypting asynchronously, we use reactor.callLater() to + # periodically poll the decrypter and check if it has finished its + # work. When it has finished, we either call the callback or errback + # of that deferred. In case we are not asynchronously decrypting, we + # just fire the deferred. + def _shutdown_and_finish(res): self._sync_decr_pool.close() return new_generation, new_transaction_id @@ -441,9 +468,11 @@ class SoledadHTTPSyncTarget(SyncTarget): SyncDecrypterPool.DECRYPT_LOOP_PERIOD, _wait_or_finish) else: - d.callback(None) + if self._sync_decr_pool.succeeded(): + d.callback(None) + else: + d.errback(self._sync_decr_pool.failure) - # decrypt docs in case of deferred decryption if defer_decryption: _wait_or_finish() else: |