diff options
Diffstat (limited to 'client/src/leap/soledad/client/target.py')
-rw-r--r-- | client/src/leap/soledad/client/target.py | 64 |
1 files changed, 23 insertions, 41 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 06cef1ee..17ce718f 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -43,7 +43,8 @@ from leap.soledad.common.document import SoledadDocument from leap.soledad.client.auth import TokenBasedAuth from leap.soledad.client.crypto import is_symmetrically_encrypted from leap.soledad.client.crypto import encrypt_doc, decrypt_doc -from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool +from leap.soledad.client.encdecpool import SyncEncrypterPool +from leap.soledad.client.encdecpool import SyncDecrypterPool from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS from leap.soledad.client.events import signal @@ -787,9 +788,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._crypto = crypto self._stopped = True self._stop_lock = threading.Lock() - self._sync_exchange_lock = threading.Lock() self.source_replica_uid = source_replica_uid - self._defer_decryption = False self._syncer_pool = None # deferred decryption attributes @@ -813,9 +812,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ Tear down the SyncDecrypterPool. """ - if self._sync_decr_pool is not None: - self._sync_decr_pool.close() - self._sync_decr_pool = None + self._sync_decr_pool.close() + self._sync_decr_pool = None def _get_replica_uid(self, url): """ @@ -903,7 +901,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): doc = SoledadDocument(doc_id, rev, content) if is_symmetrically_encrypted(doc): if self._queue_for_decrypt: - self._save_encrypted_received_doc( + self._enqueue_encrypted_received_doc( doc, gen, trans_id, idx, total) else: # defer_decryption is False or no-sync-db fallback @@ -913,7 +911,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # not symmetrically encrypted doc, insert it directly # or save it in the decrypted stage. if self._queue_for_decrypt: - self._save_received_doc(doc, gen, trans_id, idx, total) + self._enqueue_received_doc(doc, gen, trans_id, idx, total) else: self._return_doc_cb(doc, gen, trans_id) # ------------------------------------------------------------- @@ -996,6 +994,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self.stop(fail=True) break + if defer_decryption: + self._setup_sync_decr_pool() + t.doc_syncer.set_request_method( 'get', idx, sync_id, last_known_generation, last_known_trans_id) @@ -1021,6 +1022,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): t.join() if t.success: number_of_changes, _, _ = t.result + if defer_decryption and number_of_changes: + self._sync_decr_pool.set_docs_to_process( + number_of_changes) else: raise t.exception first_request = False @@ -1053,6 +1057,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): new_generation = doc_data['gen'] new_transaction_id = doc_data['trans_id'] + # decrypt docs in case of deferred decryption + if defer_decryption: + self._sync_decr_pool.wait() + self._teardown_sync_decr_pool() + return new_generation, new_transaction_id def sync_exchange(self, docs_by_generations, @@ -1103,14 +1112,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ self._ensure_callback = ensure_callback - if defer_decryption and self._sync_db is not None: - self._sync_exchange_lock.acquire() - self._setup_sync_decr_pool() - self._defer_decryption = True - else: - # fall back - defer_decryption = False - self.start() if sync_id is None: @@ -1120,10 +1121,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): setProxiedObject(self._insert_doc_cb[source_replica_uid], return_doc_cb) - # empty the database before starting a new sync - if defer_decryption is True and not self.clear_to_sync(): - self._sync_decr_pool.empty() - self._ensure_connection() if self._trace_hook: # for tests self._trace_hook('sync_exchange') @@ -1257,6 +1254,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): trans_id_after_send = response_dict['new_transaction_id'] # get docs from target + if self._sync_db is None: + defer_decryption = False if self.stopped is False: cur_target_gen, cur_target_trans_id = self._get_remote_docs( url, @@ -1266,12 +1265,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._syncer_pool.cleanup() - # decrypt docs in case of deferred decryption - if defer_decryption: - self._sync_decr_pool.wait() - self._teardown_sync_decr_pool() - self._sync_exchange_lock.release() - # update gen and trans id info in case we just sent and did not # receive docs. if gen_after_send is not None and gen_after_send > cur_target_gen: @@ -1357,7 +1350,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): encr.TABLE_NAME,)) self._sync_db.execute(sql, (doc_id, doc_rev)) - def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total): + def _enqueue_encrypted_received_doc(self, doc, gen, trans_id, idx, total): """ Save a symmetrically encrypted incoming document into the received docs table in the sync db. A decryption task will pick it up @@ -1378,9 +1371,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): "Enqueueing doc for decryption: %d/%d." % (idx + 1, total)) self._sync_decr_pool.insert_encrypted_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id) + doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1) - def _save_received_doc(self, doc, gen, trans_id, idx, total): + def _enqueue_received_doc(self, doc, gen, trans_id, idx, total): """ Save any incoming document into the received docs table in the sync db. @@ -1399,23 +1392,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): "Enqueueing doc, no decryption needed: %d/%d." % (idx + 1, total)) self._sync_decr_pool.insert_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id) + doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1) # # Symmetric decryption of syncing docs # - def clear_to_sync(self): - """ - Return whether sync can proceed (ie, the received db table is empty). - - :return: Whether sync can proceed. - :rtype: bool - """ - if self._sync_decr_pool: - return self._sync_decr_pool.clear_to_sync() - return True - def set_decryption_callback(self, cb): """ Set callback to be called when the decryption finishes. |