diff options
| -rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 89 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/http_target.py | 51 | 
2 files changed, 97 insertions, 43 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 02eeb590..d9f3d28c 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -32,6 +32,7 @@ from zope.proxy import sameProxiedObjects  from twisted.internet import defer  from twisted.internet.threads import deferToThread +from twisted.python.failure import Failure  from leap.soledad.common.document import SoledadDocument  from leap.soledad.common import soledad_assert @@ -390,7 +391,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          self._processed_docs = 0          self._async_results = [] -        self._exception = None +        self._failure = None          self._finished = threading.Event()          # clear the database before starting the sync @@ -399,10 +400,26 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          d.addCallback(lambda _: self._empty_db.set())          # start the decryption loop +        def _maybe_store_failure_and_finish(result): +            if isinstance(result, Failure): +                self._set_failure(result) +            self._finished.set() +            logger.debug("Finished decrypter thread.") +          self._deferred_loop = deferToThread(              self._decrypt_and_process_docs_loop) -        self._deferred_loop.addCallback( -            lambda _: logger.debug("Finished decrypter thread.")) +        self._deferred_loop.addBoth( +            _maybe_store_failure_and_finish) + +    @property +    def failure(self): +        return self._failure + +    def _set_failure(self, failure): +        self._failure = failure + +    def succeeded(self): +        return self._failure is None      def set_docs_to_process(self, docs_to_process):          """ @@ -760,35 +777,43 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          This method runs in its own thread, so sleeping will not interfere          with the main thread.          """ -        try: -            # wait for database to be emptied -            self._empty_db.wait() -            # wait until we know how many documents we need to process -            while self._docs_to_process is None: -                time.sleep(self.DECRYPT_LOOP_PERIOD) -            # because all database operations are asynchronous, we use an -            # event to make sure we don't start the next loop before the -            # current one has finished. -            event = threading.Event() -            # loop until we have processes as many docs as the number of -            # changes -            while self._processed_docs < self._docs_to_process: -                if sameProxiedObjects( -                        self._insert_doc_cb.get(self.source_replica_uid), -                        None): -                    continue -                event.clear() -                d = self._decrypt_received_docs() -                d.addCallback(lambda _: self._raise_if_async_fails()) -                d.addCallback(lambda _: self._process_decrypted()) -                d.addCallback(self._delete_processed_docs) -                d.addCallback(lambda _: event.set()) -                event.wait() -                # sleep a bit to give time for some decryption work -                time.sleep(self.DECRYPT_LOOP_PERIOD) -        except Exception as e: -            self._exception = e -        self._finished.set() +        # wait for database to be emptied +        self._empty_db.wait() + +        # wait until we know how many documents we need to process +        while self._docs_to_process is None: +            time.sleep(self.DECRYPT_LOOP_PERIOD) + +        # because all database operations are asynchronous, we use an +        # event to make sure we don't start the next loop before the +        # current one has finished. +        event = threading.Event() + +        # loop until we have processes as many docs as the number of +        # changes +        while self._processed_docs < self._docs_to_process: + +            if sameProxiedObjects( +                    self._insert_doc_cb.get(self.source_replica_uid), +                    None): +                continue + +            event.clear() + +            d = self._decrypt_received_docs() +            d.addCallback(lambda _: self._raise_if_async_fails()) +            d.addCallback(lambda _: self._process_decrypted()) +            d.addCallback(lambda r: self._delete_processed_docs(r)) +            d.addErrback(self._set_failure)  # grab failure and continue +            d.addCallback(lambda _: event.set()) + +            event.wait() + +            if not self.succeeded(): +                break + +            # sleep a bit to give time for some decryption work +            time.sleep(self.DECRYPT_LOOP_PERIOD)      def has_finished(self):          return self._finished.is_set() diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py index 5f18e4a9..bf397cfe 100644 --- a/client/src/leap/soledad/client/http_target.py +++ b/client/src/leap/soledad/client/http_target.py @@ -396,7 +396,14 @@ class SoledadHTTPSyncTarget(SyncTarget):          headers = self._auth_header.copy()          headers.update({'content-type': 'application/x-soledad-sync-get'}) -        # maybe get one doc +        #--------------------------------------------------------------------- +        # maybe receive the first document +        #--------------------------------------------------------------------- + +        # we fetch the first document before fetching the rest because we need +        # to know the total number of documents to be received, and this +        # information comes as metadata to each request. +          d = self._receive_one_doc(              headers, last_known_generation, last_known_trans_id,              sync_id, 0) @@ -406,28 +413,48 @@ class SoledadHTTPSyncTarget(SyncTarget):          if defer_decryption:              self._sync_decr_pool.set_docs_to_process(                  number_of_changes) -        idx = 1 -        # maybe get more documents +        #--------------------------------------------------------------------- +        # maybe receive the rest of the documents +        #--------------------------------------------------------------------- + +        # launch many asynchronous fetches and inserts of received documents +        # in the temporary sync db. Will wait for all results before +        # continuing. + +        received = 1          deferreds = [] -        while idx < number_of_changes: +        while received < number_of_changes:              d = self._receive_one_doc(                  headers, last_known_generation, -                last_known_trans_id, sync_id, idx) +                last_known_trans_id, sync_id, received)              d.addCallback(                  partial(                      self._insert_received_doc, -                    idx + 1, +                    received + 1,  # the index of the current received doc                      number_of_changes))              deferreds.append(d) -            idx += 1 +            received += 1          results = yield defer.gatherResults(deferreds) -        # get genration and transaction id of target after insertions +        # get generation and transaction id of target after insertions          if deferreds:              _, new_generation, new_transaction_id = results.pop() -        # get current target gen and trans id in case no documents were +        #--------------------------------------------------------------------- +        # wait for async decryption to finish +        #--------------------------------------------------------------------- + +        # below we do a trick so we can wait for the SyncDecrypterPool to +        # finish its work before finally returning the new generation and +        # transaction id of the remote replica. To achieve that, we create a +        # Deferred that will return the results of the sync and, if we are +        # decrypting asynchronously, we use reactor.callLater() to +        # periodically poll the decrypter and check if it has finished its +        # work. When it has finished, we either call the callback or errback +        # of that deferred. In case we are not asynchronously decrypting, we +        # just fire the deferred. +          def _shutdown_and_finish(res):              self._sync_decr_pool.close()              return new_generation, new_transaction_id @@ -441,9 +468,11 @@ class SoledadHTTPSyncTarget(SyncTarget):                      SyncDecrypterPool.DECRYPT_LOOP_PERIOD,                      _wait_or_finish)              else: -                d.callback(None) +                if self._sync_decr_pool.succeeded(): +                    d.callback(None) +                else: +                    d.errback(self._sync_decr_pool.failure) -        # decrypt docs in case of deferred decryption          if defer_decryption:              _wait_or_finish()          else:  | 
