summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/target.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/target.py')
-rw-r--r--client/src/leap/soledad/client/target.py64
1 files changed, 23 insertions, 41 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 06cef1ee..17ce718f 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -43,7 +43,8 @@ from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.auth import TokenBasedAuth
from leap.soledad.client.crypto import is_symmetrically_encrypted
from leap.soledad.client.crypto import encrypt_doc, decrypt_doc
-from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool
+from leap.soledad.client.encdecpool import SyncEncrypterPool
+from leap.soledad.client.encdecpool import SyncDecrypterPool
from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
from leap.soledad.client.events import signal
@@ -787,9 +788,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._crypto = crypto
self._stopped = True
self._stop_lock = threading.Lock()
- self._sync_exchange_lock = threading.Lock()
self.source_replica_uid = source_replica_uid
- self._defer_decryption = False
self._syncer_pool = None
# deferred decryption attributes
@@ -813,9 +812,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
Tear down the SyncDecrypterPool.
"""
- if self._sync_decr_pool is not None:
- self._sync_decr_pool.close()
- self._sync_decr_pool = None
+ self._sync_decr_pool.close()
+ self._sync_decr_pool = None
def _get_replica_uid(self, url):
"""
@@ -903,7 +901,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
doc = SoledadDocument(doc_id, rev, content)
if is_symmetrically_encrypted(doc):
if self._queue_for_decrypt:
- self._save_encrypted_received_doc(
+ self._enqueue_encrypted_received_doc(
doc, gen, trans_id, idx, total)
else:
# defer_decryption is False or no-sync-db fallback
@@ -913,7 +911,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# not symmetrically encrypted doc, insert it directly
# or save it in the decrypted stage.
if self._queue_for_decrypt:
- self._save_received_doc(doc, gen, trans_id, idx, total)
+ self._enqueue_received_doc(doc, gen, trans_id, idx, total)
else:
self._return_doc_cb(doc, gen, trans_id)
# -------------------------------------------------------------
@@ -996,6 +994,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self.stop(fail=True)
break
+ if defer_decryption:
+ self._setup_sync_decr_pool()
+
t.doc_syncer.set_request_method(
'get', idx, sync_id, last_known_generation,
last_known_trans_id)
@@ -1021,6 +1022,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
t.join()
if t.success:
number_of_changes, _, _ = t.result
+ if defer_decryption and number_of_changes:
+ self._sync_decr_pool.set_docs_to_process(
+ number_of_changes)
else:
raise t.exception
first_request = False
@@ -1053,6 +1057,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
new_generation = doc_data['gen']
new_transaction_id = doc_data['trans_id']
+ # decrypt docs in case of deferred decryption
+ if defer_decryption:
+ self._sync_decr_pool.wait()
+ self._teardown_sync_decr_pool()
+
return new_generation, new_transaction_id
def sync_exchange(self, docs_by_generations,
@@ -1103,14 +1112,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
self._ensure_callback = ensure_callback
- if defer_decryption and self._sync_db is not None:
- self._sync_exchange_lock.acquire()
- self._setup_sync_decr_pool()
- self._defer_decryption = True
- else:
- # fall back
- defer_decryption = False
-
self.start()
if sync_id is None:
@@ -1120,10 +1121,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
setProxiedObject(self._insert_doc_cb[source_replica_uid],
return_doc_cb)
- # empty the database before starting a new sync
- if defer_decryption is True and not self.clear_to_sync():
- self._sync_decr_pool.empty()
-
self._ensure_connection()
if self._trace_hook: # for tests
self._trace_hook('sync_exchange')
@@ -1257,6 +1254,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
trans_id_after_send = response_dict['new_transaction_id']
# get docs from target
+ if self._sync_db is None:
+ defer_decryption = False
if self.stopped is False:
cur_target_gen, cur_target_trans_id = self._get_remote_docs(
url,
@@ -1266,12 +1265,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._syncer_pool.cleanup()
- # decrypt docs in case of deferred decryption
- if defer_decryption:
- self._sync_decr_pool.wait()
- self._teardown_sync_decr_pool()
- self._sync_exchange_lock.release()
-
# update gen and trans id info in case we just sent and did not
# receive docs.
if gen_after_send is not None and gen_after_send > cur_target_gen:
@@ -1357,7 +1350,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
encr.TABLE_NAME,))
self._sync_db.execute(sql, (doc_id, doc_rev))
- def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total):
+ def _enqueue_encrypted_received_doc(self, doc, gen, trans_id, idx, total):
"""
Save a symmetrically encrypted incoming document into the received
docs table in the sync db. A decryption task will pick it up
@@ -1378,9 +1371,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"Enqueueing doc for decryption: %d/%d."
% (idx + 1, total))
self._sync_decr_pool.insert_encrypted_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id)
+ doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1)
- def _save_received_doc(self, doc, gen, trans_id, idx, total):
+ def _enqueue_received_doc(self, doc, gen, trans_id, idx, total):
"""
Save any incoming document into the received docs table in the sync db.
@@ -1399,23 +1392,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"Enqueueing doc, no decryption needed: %d/%d."
% (idx + 1, total))
self._sync_decr_pool.insert_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id)
+ doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1)
#
# Symmetric decryption of syncing docs
#
- def clear_to_sync(self):
- """
- Return whether sync can proceed (ie, the received db table is empty).
-
- :return: Whether sync can proceed.
- :rtype: bool
- """
- if self._sync_decr_pool:
- return self._sync_decr_pool.clear_to_sync()
- return True
-
def set_decryption_callback(self, cb):
"""
Set callback to be called when the decryption finishes.