summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/crypto.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/crypto.py')
-rw-r--r--client/src/leap/soledad/client/crypto.py69
1 files changed, 32 insertions, 37 deletions
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index 950576ec..107bf7f1 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -725,6 +725,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._insert_doc_cb = kwargs.pop("insert_doc_cb")
SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
self.source_replica_uid = None
+ self._async_results = []
def set_source_replica_uid(self, source_replica_uid):
"""
@@ -850,33 +851,20 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# not encrypted payload
return
- try:
- content = json.loads(content)
- except TypeError:
- logger.warning("Wrong type while decoding json: %s"
- % repr(content))
- return
-
+ content = json.loads(content)
key = self._crypto.doc_passphrase(doc_id)
secret = self._crypto.secret
args = doc_id, rev, content, gen, trans_id, key, secret
- try:
- if workers:
- # Ouch. This is sent to the workers asynchronously, so
- # we have no way of logging errors. We'd have to inspect
- # lingering results by querying successful / get() over them...
- # Or move the heck out of it to twisted.
- res = self._pool.apply_async(
- decrypt_doc_task, args,
- callback=self.decrypt_doc_cb)
- else:
- # decrypt inline
- res = decrypt_doc_task(*args)
- self.decrypt_doc_cb(res)
-
- except Exception as exc:
- logger.exception(exc)
+ if workers:
+ # save the async result object so we can inspect it for failures
+ self._async_results.append(self._pool.apply_async(
+ decrypt_doc_task, args,
+ callback=self.decrypt_doc_cb))
+ else:
+ # decrypt inline
+ res = decrypt_doc_task(*args)
+ self.decrypt_doc_cb(res)
def decrypt_doc_cb(self, result):
"""
@@ -1010,21 +998,16 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
insert_fun = self._insert_doc_cb[self.source_replica_uid]
logger.debug("Sync decrypter pool: inserting doc in local db: "
"%s:%s %s" % (doc_id, doc_rev, gen))
- try:
- # convert deleted documents to avoid error on document creation
- if content == 'null':
- content = None
- doc = SoledadDocument(doc_id, doc_rev, content)
- gen = int(gen)
- insert_fun(doc, gen, trans_id)
- except Exception as exc:
- logger.error("Sync decrypter pool: error while inserting "
- "decrypted doc into local db.")
- logger.exception(exc)
- else:
- # If no errors found, remove it from the received database.
- self.delete_received_doc(doc_id, doc_rev)
+ # convert deleted documents to avoid error on document creation
+ if content == 'null':
+ content = None
+ doc = SoledadDocument(doc_id, doc_rev, content)
+ gen = int(gen)
+ insert_fun(doc, gen, trans_id)
+
+ # If no errors found, remove it from the received database.
+ self.delete_received_doc(doc_id, doc_rev)
def empty(self):
"""
@@ -1038,3 +1021,15 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
c = self._sync_db.cursor()
c.execute(*args, **kwargs)
return c.fetchall()
+
+ def raise_in_case_of_failed_async_calls(self):
+ """
+ Re-raise any exception raised by an async call.
+
+ :raise Exception: Raised if an async call has raised an exception.
+ """
+ for res in self._async_results:
+ if res.ready():
+ if not res.successful():
+ # re-raise the exception raised by the remote call
+ res.get()