diff options
author | drebs <drebs@riseup.net> | 2015-05-20 17:28:27 -0300 |
---|---|---|
committer | drebs <drebs@riseup.net> | 2015-05-20 17:28:27 -0300 |
commit | ce161f9623a1dea6eda9fc2350c60073dbcdce06 (patch) | |
tree | da4a7da65a3d33156eba9c7afc71e946d0eb97f5 | |
parent | d59ac3b5ce713787cd7a46e181f2381de3a8fde2 (diff) |
[bug] ensure async decryption failures are logged
We have to make sure any failures in asynchronous decryption code is grabbed
and properly transmitted up the deferred chain so it can be logged. This
commit adds errbacks in the decryption pool that grab any failure and a
check on the http target the failure if that is the case.
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 89 | ||||
-rw-r--r-- | client/src/leap/soledad/client/http_target.py | 51 |
2 files changed, 97 insertions, 43 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 02eeb590..d9f3d28c 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -32,6 +32,7 @@ from zope.proxy import sameProxiedObjects from twisted.internet import defer from twisted.internet.threads import deferToThread +from twisted.python.failure import Failure from leap.soledad.common.document import SoledadDocument from leap.soledad.common import soledad_assert @@ -390,7 +391,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._processed_docs = 0 self._async_results = [] - self._exception = None + self._failure = None self._finished = threading.Event() # clear the database before starting the sync @@ -399,10 +400,26 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): d.addCallback(lambda _: self._empty_db.set()) # start the decryption loop + def _maybe_store_failure_and_finish(result): + if isinstance(result, Failure): + self._set_failure(result) + self._finished.set() + logger.debug("Finished decrypter thread.") + self._deferred_loop = deferToThread( self._decrypt_and_process_docs_loop) - self._deferred_loop.addCallback( - lambda _: logger.debug("Finished decrypter thread.")) + self._deferred_loop.addBoth( + _maybe_store_failure_and_finish) + + @property + def failure(self): + return self._failure + + def _set_failure(self, failure): + self._failure = failure + + def succeeded(self): + return self._failure is None def set_docs_to_process(self, docs_to_process): """ @@ -760,35 +777,43 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): This method runs in its own thread, so sleeping will not interfere with the main thread. """ - try: - # wait for database to be emptied - self._empty_db.wait() - # wait until we know how many documents we need to process - while self._docs_to_process is None: - time.sleep(self.DECRYPT_LOOP_PERIOD) - # because all database operations are asynchronous, we use an - # event to make sure we don't start the next loop before the - # current one has finished. - event = threading.Event() - # loop until we have processes as many docs as the number of - # changes - while self._processed_docs < self._docs_to_process: - if sameProxiedObjects( - self._insert_doc_cb.get(self.source_replica_uid), - None): - continue - event.clear() - d = self._decrypt_received_docs() - d.addCallback(lambda _: self._raise_if_async_fails()) - d.addCallback(lambda _: self._process_decrypted()) - d.addCallback(self._delete_processed_docs) - d.addCallback(lambda _: event.set()) - event.wait() - # sleep a bit to give time for some decryption work - time.sleep(self.DECRYPT_LOOP_PERIOD) - except Exception as e: - self._exception = e - self._finished.set() + # wait for database to be emptied + self._empty_db.wait() + + # wait until we know how many documents we need to process + while self._docs_to_process is None: + time.sleep(self.DECRYPT_LOOP_PERIOD) + + # because all database operations are asynchronous, we use an + # event to make sure we don't start the next loop before the + # current one has finished. + event = threading.Event() + + # loop until we have processes as many docs as the number of + # changes + while self._processed_docs < self._docs_to_process: + + if sameProxiedObjects( + self._insert_doc_cb.get(self.source_replica_uid), + None): + continue + + event.clear() + + d = self._decrypt_received_docs() + d.addCallback(lambda _: self._raise_if_async_fails()) + d.addCallback(lambda _: self._process_decrypted()) + d.addCallback(lambda r: self._delete_processed_docs(r)) + d.addErrback(self._set_failure) # grab failure and continue + d.addCallback(lambda _: event.set()) + + event.wait() + + if not self.succeeded(): + break + + # sleep a bit to give time for some decryption work + time.sleep(self.DECRYPT_LOOP_PERIOD) def has_finished(self): return self._finished.is_set() diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py index 5f18e4a9..bf397cfe 100644 --- a/client/src/leap/soledad/client/http_target.py +++ b/client/src/leap/soledad/client/http_target.py @@ -396,7 +396,14 @@ class SoledadHTTPSyncTarget(SyncTarget): headers = self._auth_header.copy() headers.update({'content-type': 'application/x-soledad-sync-get'}) - # maybe get one doc + #--------------------------------------------------------------------- + # maybe receive the first document + #--------------------------------------------------------------------- + + # we fetch the first document before fetching the rest because we need + # to know the total number of documents to be received, and this + # information comes as metadata to each request. + d = self._receive_one_doc( headers, last_known_generation, last_known_trans_id, sync_id, 0) @@ -406,28 +413,48 @@ class SoledadHTTPSyncTarget(SyncTarget): if defer_decryption: self._sync_decr_pool.set_docs_to_process( number_of_changes) - idx = 1 - # maybe get more documents + #--------------------------------------------------------------------- + # maybe receive the rest of the documents + #--------------------------------------------------------------------- + + # launch many asynchronous fetches and inserts of received documents + # in the temporary sync db. Will wait for all results before + # continuing. + + received = 1 deferreds = [] - while idx < number_of_changes: + while received < number_of_changes: d = self._receive_one_doc( headers, last_known_generation, - last_known_trans_id, sync_id, idx) + last_known_trans_id, sync_id, received) d.addCallback( partial( self._insert_received_doc, - idx + 1, + received + 1, # the index of the current received doc number_of_changes)) deferreds.append(d) - idx += 1 + received += 1 results = yield defer.gatherResults(deferreds) - # get genration and transaction id of target after insertions + # get generation and transaction id of target after insertions if deferreds: _, new_generation, new_transaction_id = results.pop() - # get current target gen and trans id in case no documents were + #--------------------------------------------------------------------- + # wait for async decryption to finish + #--------------------------------------------------------------------- + + # below we do a trick so we can wait for the SyncDecrypterPool to + # finish its work before finally returning the new generation and + # transaction id of the remote replica. To achieve that, we create a + # Deferred that will return the results of the sync and, if we are + # decrypting asynchronously, we use reactor.callLater() to + # periodically poll the decrypter and check if it has finished its + # work. When it has finished, we either call the callback or errback + # of that deferred. In case we are not asynchronously decrypting, we + # just fire the deferred. + def _shutdown_and_finish(res): self._sync_decr_pool.close() return new_generation, new_transaction_id @@ -441,9 +468,11 @@ class SoledadHTTPSyncTarget(SyncTarget): SyncDecrypterPool.DECRYPT_LOOP_PERIOD, _wait_or_finish) else: - d.callback(None) + if self._sync_decr_pool.succeeded(): + d.callback(None) + else: + d.errback(self._sync_decr_pool.failure) - # decrypt docs in case of deferred decryption if defer_decryption: _wait_or_finish() else: |