From 8074dcfff4bf2304d581efe8a01174a2dd1288eb Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Wed, 15 Jul 2015 23:24:46 -0400 Subject: [refactor] use a deferred to signal when SyncDecriptionPool has finished It makes the code simpler and clearer to use a deferred instead of having to pull on 'has_finished'. - Related: #7234 --- client/src/leap/soledad/client/encdecpool.py | 67 +++++++++++---------------- client/src/leap/soledad/client/http_target.py | 54 ++++----------------- 2 files changed, 38 insertions(+), 83 deletions(-) (limited to 'client/src') diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index f81cd2d1..7923bf70 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -382,8 +382,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._last_inserted_idx = 0 self._async_results = [] - self._failure = None - self._finished = False # XXX we want to empty the database before starting, but this is an # asynchronous call, so we have to somehow make sure that it is @@ -394,24 +392,26 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): def _launch_decrypt_and_process(self): d = self._decrypt_and_process_docs() - d.addErrback(lambda f: self._set_failure(f)) + d.addErrback(self._errback) def _schedule_decrypt_and_process(self): reactor.callLater( self.DECRYPT_LOOP_PERIOD, self._launch_decrypt_and_process) - @property - def failure(self): - return self._failure - - def _set_failure(self, failure): + def _errback(self, failure): log.err(failure) - self._failure = failure - self._finished = True + self._deferred.errback(failure) + self._processed_docs = 0 + self._last_inserted_idx = 0 - def failed(self): - return bool(self._failure) + @property + def deferred(self): + """ + Deferred that will be fired when the decryption loop has finished + processing all the documents. + """ + return self._deferred def start(self, docs_to_process): """ @@ -424,7 +424,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type docs_to_process: int """ self._docs_to_process = docs_to_process - self._finished = False + self._deferred = defer.Deferred() self._schedule_decrypt_and_process() def insert_encrypted_received_doc( @@ -521,10 +521,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type trans_id: str :param idx: The index of this document in the current sync process. :type idx: int - - :return: A deferred that will fire after the document hasa been - decrypted and inserted in the sync db. - :rtype: twisted.internet.defer.Deferred """ soledad_assert(self._crypto is not None, "need a crypto object") @@ -734,27 +730,20 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): delete operations have been executed. :rtype: twisted.internet.defer.Deferred """ - if not self.failed(): - processed = self._processed_docs - pending = self._docs_to_process - - if not self.has_finished() and processed < pending: - yield self._async_decrypt_received_docs() - yield self._collect_async_decryption_results() - docs = yield self._process_decrypted_docs() - yield self._delete_processed_docs(docs) - # recurse - self._schedule_decrypt_and_process() - else: - self._mark_finished() - - def _mark_finished(self): - self._finished = True + processed = self._processed_docs + pending = self._docs_to_process + + if processed < pending: + yield self._async_decrypt_received_docs() + self._collect_async_decryption_results() + docs = yield self._process_decrypted_docs() + yield self._delete_processed_docs(docs) + # recurse + self._schedule_decrypt_and_process() + else: + self._finish() + + def _finish(self): + self._deferred.callback(None) self._processed_docs = 0 self._last_inserted_idx = 0 - - def has_finished(self): - """ - Return whether the decrypter has finished its work. - """ - return self._finished diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py index ac078f39..f1e83130 100644 --- a/client/src/leap/soledad/client/http_target.py +++ b/client/src/leap/soledad/client/http_target.py @@ -27,10 +27,8 @@ import base64 import logging from uuid import uuid4 -from functools import partial from twisted.internet import defer -from twisted.internet import reactor from twisted.web.error import Error from u1db import errors @@ -393,11 +391,10 @@ class SoledadHTTPSyncTarget(SyncTarget): # to know the total number of documents to be received, and this # information comes as metadata to each request. - d = self._receive_one_doc( + doc = yield self._receive_one_doc( headers, last_known_generation, last_known_trans_id, sync_id, 0) - d.addCallback(partial(self._insert_received_doc, 1, 1)) - number_of_changes, ngen, ntrans = yield d + number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) if defer_decryption: self._sync_decr_pool.start(number_of_changes) @@ -417,10 +414,9 @@ class SoledadHTTPSyncTarget(SyncTarget): headers, last_known_generation, last_known_trans_id, sync_id, received) d.addCallback( - partial( - self._insert_received_doc, - received + 1, # the index of the current received doc - number_of_changes)) + self._insert_received_doc, + received + 1, # the index of the current received doc + number_of_changes) deferreds.append(d) received += 1 results = yield defer.gatherResults(deferreds) @@ -433,40 +429,10 @@ class SoledadHTTPSyncTarget(SyncTarget): # wait for async decryption to finish #--------------------------------------------------------------------- - # below we do a trick so we can wait for the SyncDecrypterPool to - # finish its work before finally returning the new generation and - # transaction id of the remote replica. To achieve that, we create a - # Deferred that will return the results of the sync and, if we are - # decrypting asynchronously, we use reactor.callLater() to - # periodically poll the decrypter and check if it has finished its - # work. When it has finished, we either call the callback or errback - # of that deferred. In case we are not asynchronously decrypting, we - # just fire the deferred. - - def _shutdown_and_finish(res): - self._sync_decr_pool.close() - return new_generation, new_transaction_id - - d = defer.Deferred() - d.addCallback(_shutdown_and_finish) - - def _wait_or_finish(): - if not self._sync_decr_pool.has_finished(): - reactor.callLater( - SyncDecrypterPool.DECRYPT_LOOP_PERIOD, - _wait_or_finish) - else: - if not self._sync_decr_pool.failed(): - d.callback(None) - else: - d.errback(self._sync_decr_pool.failure) - if defer_decryption: - _wait_or_finish() - else: - d.callback(None) + yield self._sync_decr_pool.deferred + self._sync_decr_pool.close() - new_generation, new_transaction_id = yield d defer.returnValue([new_generation, new_transaction_id]) def _receive_one_doc(self, headers, last_known_generation, @@ -490,16 +456,16 @@ class SoledadHTTPSyncTarget(SyncTarget): headers=headers, body=''.join(entries)) - def _insert_received_doc(self, idx, total, response): + def _insert_received_doc(self, response, idx, total): """ Insert a received document into the local replica. + :param response: The body and headers of the response. + :type response: tuple(str, dict) :param idx: The index count of the current operation. :type idx: int :param total: The total number of operations. :type total: int - :param response: The body and headers of the response. - :type response: tuple(str, dict) """ new_generation, new_transaction_id, number_of_changes, doc_id, \ rev, content, gen, trans_id = \ -- cgit v1.2.3