diff options
| -rw-r--r-- | client/src/leap/soledad/client/api.py | 1 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/crypto.py | 69 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/target.py | 12 | 
3 files changed, 39 insertions, 43 deletions
diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 35b44ac8..b2cabe08 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -44,7 +44,6 @@ from u1db.remote.ssl_match_hostname import match_hostname  from zope.interface import implements  from twisted.python import log -from twisted.internet import defer  from leap.common.config import get_path_prefix diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index 950576ec..107bf7f1 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -725,6 +725,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          self._insert_doc_cb = kwargs.pop("insert_doc_cb")          SyncEncryptDecryptPool.__init__(self, *args, **kwargs)          self.source_replica_uid = None +        self._async_results = []      def set_source_replica_uid(self, source_replica_uid):          """ @@ -850,33 +851,20 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):              # not encrypted payload              return -        try: -            content = json.loads(content) -        except TypeError: -            logger.warning("Wrong type while decoding json: %s" -                           % repr(content)) -            return - +        content = json.loads(content)          key = self._crypto.doc_passphrase(doc_id)          secret = self._crypto.secret          args = doc_id, rev, content, gen, trans_id, key, secret -        try: -            if workers: -                # Ouch. This is sent to the workers asynchronously, so -                # we have no way of logging errors. We'd have to inspect -                # lingering results by querying successful / get() over them... -                # Or move the heck out of it to twisted. -                res = self._pool.apply_async( -                    decrypt_doc_task, args, -                    callback=self.decrypt_doc_cb) -            else: -                # decrypt inline -                res = decrypt_doc_task(*args) -                self.decrypt_doc_cb(res) - -        except Exception as exc: -            logger.exception(exc) +        if workers: +            # save the async result object so we can inspect it for failures +            self._async_results.append(self._pool.apply_async( +                decrypt_doc_task, args, +                callback=self.decrypt_doc_cb)) +        else: +            # decrypt inline +            res = decrypt_doc_task(*args) +            self.decrypt_doc_cb(res)      def decrypt_doc_cb(self, result):          """ @@ -1010,21 +998,16 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          insert_fun = self._insert_doc_cb[self.source_replica_uid]          logger.debug("Sync decrypter pool: inserting doc in local db: "                       "%s:%s %s" % (doc_id, doc_rev, gen)) -        try: -            # convert deleted documents to avoid error on document creation -            if content == 'null': -                content = None -            doc = SoledadDocument(doc_id, doc_rev, content) -            gen = int(gen) -            insert_fun(doc, gen, trans_id) -        except Exception as exc: -            logger.error("Sync decrypter pool: error while inserting " -                         "decrypted doc into local db.") -            logger.exception(exc) -        else: -            # If no errors found, remove it from the received database. -            self.delete_received_doc(doc_id, doc_rev) +        # convert deleted documents to avoid error on document creation +        if content == 'null': +            content = None +        doc = SoledadDocument(doc_id, doc_rev, content) +        gen = int(gen) +        insert_fun(doc, gen, trans_id) + +        # If no errors found, remove it from the received database. +        self.delete_received_doc(doc_id, doc_rev)      def empty(self):          """ @@ -1038,3 +1021,15 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):              c = self._sync_db.cursor()              c.execute(*args, **kwargs)              return c.fetchall() + +    def raise_in_case_of_failed_async_calls(self): +        """ +        Re-raise any exception raised by an async call. + +        :raise Exception: Raised if an async call has raised an exception. +        """ +        for res in self._async_results: +            if res.ready(): +                if not res.successful(): +                    # re-raise the exception raised by the remote call +                    res.get() diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index dd61c070..986bd991 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -1296,7 +1296,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          # decrypt docs in case of deferred decryption          if defer_decryption: -            while self.clear_to_sync() is False: +            while not self.clear_to_sync():                  sleep(self.DECRYPT_LOOP_PERIOD)              self._teardown_sync_loop()              self._teardown_sync_decr_pool() @@ -1435,13 +1435,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):      def clear_to_sync(self):          """ -        Return True if sync can proceed (ie, the received db table is empty). +        Return whether sync can proceed (ie, the received db table is empty). + +        :return: Whether sync can proceed.          :rtype: bool          """ -        if self._sync_decr_pool is not None: +        if self._sync_decr_pool:              return self._sync_decr_pool.count_docs_in_sync_db() == 0 -        else: -            return True +        return True      def set_decryption_callback(self, cb):          """ @@ -1478,6 +1479,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):              return          decrypter = self._sync_decr_pool +        decrypter.raise_in_case_of_failed_async_calls()          decrypter.decrypt_received_docs()          decrypter.process_decrypted()  | 
