diff options
| -rw-r--r-- | client/changes/feat-refactor_decr_pool | 1 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 67 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/http_target.py | 54 | 
3 files changed, 39 insertions, 83 deletions
| diff --git a/client/changes/feat-refactor_decr_pool b/client/changes/feat-refactor_decr_pool new file mode 100644 index 00000000..7a567bcc --- /dev/null +++ b/client/changes/feat-refactor_decr_pool @@ -0,0 +1 @@ +- Refactor decription pool and http target to use a deferred instead of a waiting loop. diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index f81cd2d1..7923bf70 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -382,8 +382,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          self._last_inserted_idx = 0          self._async_results = [] -        self._failure = None -        self._finished = False          # XXX we want to empty the database before starting, but this is an          #     asynchronous call, so we have to somehow make sure that it is @@ -394,24 +392,26 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):      def _launch_decrypt_and_process(self):          d = self._decrypt_and_process_docs() -        d.addErrback(lambda f: self._set_failure(f)) +        d.addErrback(self._errback)      def _schedule_decrypt_and_process(self):          reactor.callLater(              self.DECRYPT_LOOP_PERIOD,              self._launch_decrypt_and_process) -    @property -    def failure(self): -        return self._failure - -    def _set_failure(self, failure): +    def _errback(self, failure):          log.err(failure) -        self._failure = failure -        self._finished = True +        self._deferred.errback(failure) +        self._processed_docs = 0 +        self._last_inserted_idx = 0 -    def failed(self): -        return bool(self._failure) +    @property +    def deferred(self): +        """ +        Deferred that will be fired when the decryption loop has finished +        processing all the documents. +        """ +        return self._deferred      def start(self, docs_to_process):          """ @@ -424,7 +424,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          :type docs_to_process: int          """          self._docs_to_process = docs_to_process -        self._finished = False +        self._deferred = defer.Deferred()          self._schedule_decrypt_and_process()      def insert_encrypted_received_doc( @@ -521,10 +521,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          :type trans_id: str          :param idx: The index of this document in the current sync process.          :type idx: int - -        :return: A deferred that will fire after the document hasa been -                 decrypted and inserted in the sync db. -        :rtype: twisted.internet.defer.Deferred          """          soledad_assert(self._crypto is not None, "need a crypto object") @@ -734,27 +730,20 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):                   delete operations have been executed.          :rtype: twisted.internet.defer.Deferred          """ -        if not self.failed(): -            processed = self._processed_docs -            pending = self._docs_to_process - -            if not self.has_finished() and processed < pending: -                yield self._async_decrypt_received_docs() -                yield self._collect_async_decryption_results() -                docs = yield self._process_decrypted_docs() -                yield self._delete_processed_docs(docs) -                # recurse -                self._schedule_decrypt_and_process() -            else: -                self._mark_finished() - -    def _mark_finished(self): -        self._finished = True +        processed = self._processed_docs +        pending = self._docs_to_process + +        if processed < pending: +            yield self._async_decrypt_received_docs() +            self._collect_async_decryption_results() +            docs = yield self._process_decrypted_docs() +            yield self._delete_processed_docs(docs) +            # recurse +            self._schedule_decrypt_and_process() +        else: +            self._finish() + +    def _finish(self): +        self._deferred.callback(None)          self._processed_docs = 0          self._last_inserted_idx = 0 - -    def has_finished(self): -        """ -        Return whether the decrypter has finished its work. -        """ -        return self._finished diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py index ac078f39..f1e83130 100644 --- a/client/src/leap/soledad/client/http_target.py +++ b/client/src/leap/soledad/client/http_target.py @@ -27,10 +27,8 @@ import base64  import logging  from uuid import uuid4 -from functools import partial  from twisted.internet import defer -from twisted.internet import reactor  from twisted.web.error import Error  from u1db import errors @@ -393,11 +391,10 @@ class SoledadHTTPSyncTarget(SyncTarget):          # to know the total number of documents to be received, and this          # information comes as metadata to each request. -        d = self._receive_one_doc( +        doc = yield self._receive_one_doc(              headers, last_known_generation, last_known_trans_id,              sync_id, 0) -        d.addCallback(partial(self._insert_received_doc, 1, 1)) -        number_of_changes, ngen, ntrans = yield d +        number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1)          if defer_decryption:              self._sync_decr_pool.start(number_of_changes) @@ -417,10 +414,9 @@ class SoledadHTTPSyncTarget(SyncTarget):                  headers, last_known_generation,                  last_known_trans_id, sync_id, received)              d.addCallback( -                partial( -                    self._insert_received_doc, -                    received + 1,  # the index of the current received doc -                    number_of_changes)) +                self._insert_received_doc, +                received + 1,  # the index of the current received doc +                number_of_changes)              deferreds.append(d)              received += 1          results = yield defer.gatherResults(deferreds) @@ -433,40 +429,10 @@ class SoledadHTTPSyncTarget(SyncTarget):          # wait for async decryption to finish          #--------------------------------------------------------------------- -        # below we do a trick so we can wait for the SyncDecrypterPool to -        # finish its work before finally returning the new generation and -        # transaction id of the remote replica. To achieve that, we create a -        # Deferred that will return the results of the sync and, if we are -        # decrypting asynchronously, we use reactor.callLater() to -        # periodically poll the decrypter and check if it has finished its -        # work. When it has finished, we either call the callback or errback -        # of that deferred. In case we are not asynchronously decrypting, we -        # just fire the deferred. - -        def _shutdown_and_finish(res): -            self._sync_decr_pool.close() -            return new_generation, new_transaction_id - -        d = defer.Deferred() -        d.addCallback(_shutdown_and_finish) - -        def _wait_or_finish(): -            if not self._sync_decr_pool.has_finished(): -                reactor.callLater( -                    SyncDecrypterPool.DECRYPT_LOOP_PERIOD, -                    _wait_or_finish) -            else: -                if not self._sync_decr_pool.failed(): -                    d.callback(None) -                else: -                    d.errback(self._sync_decr_pool.failure) -          if defer_decryption: -            _wait_or_finish() -        else: -            d.callback(None) +            yield self._sync_decr_pool.deferred +            self._sync_decr_pool.close() -        new_generation, new_transaction_id = yield d          defer.returnValue([new_generation, new_transaction_id])      def _receive_one_doc(self, headers, last_known_generation, @@ -490,16 +456,16 @@ class SoledadHTTPSyncTarget(SyncTarget):              headers=headers,              body=''.join(entries)) -    def _insert_received_doc(self, idx, total, response): +    def _insert_received_doc(self, response, idx, total):          """          Insert a received document into the local replica. +        :param response: The body and headers of the response. +        :type response: tuple(str, dict)          :param idx: The index count of the current operation.          :type idx: int          :param total: The total number of operations.          :type total: int -        :param response: The body and headers of the response. -        :type response: tuple(str, dict)          """          new_generation, new_transaction_id, number_of_changes, doc_id, \              rev, content, gen, trans_id = \ | 
