diff options
-rw-r--r-- | client/src/leap/soledad/client/sync.py | 10 | ||||
-rw-r--r-- | client/src/leap/soledad/client/target.py | 77 |
2 files changed, 56 insertions, 31 deletions
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index be60d0ab..5d545a77 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -32,6 +32,7 @@ Extend u1db Synchronizer with the ability to: import json import logging +import traceback from threading import Lock from u1db import errors @@ -85,13 +86,11 @@ class SoledadSynchronizer(Synchronizer): return self._sync(autocreate=autocreate, defer_decryption=defer_decryption) except Exception: - # We release the lock if there was an error. - # Otherwise, the lock should be released from the function - # `complete_sync`. - self.release_syncing_lock() # re-raising the exceptions to let syqlcipher.sync catch them # (and re-create the syncer instance if needed) raise + finally: + self.release_syncing_lock() def _sync(self, autocreate=False, defer_decryption=True): """ @@ -161,7 +160,6 @@ class SoledadSynchronizer(Synchronizer): if not changes and target_last_known_gen == target_gen: if target_trans_id != target_last_known_trans_id: raise errors.InvalidTransactionId - self.release_syncing_lock() return my_gen # prepare to send all the changed docs @@ -205,6 +203,7 @@ class SoledadSynchronizer(Synchronizer): self.complete_sync() except Exception as e: logger.error("Soledad sync error: %s" % str(e)) + logger.error(traceback.format_exc()) sync_target.stop() finally: sync_target.close() @@ -228,7 +227,6 @@ class SoledadSynchronizer(Synchronizer): # if gapless record current reached generation with target self._record_sync_info_with_the_target(info["my_gen"]) - self.syncing_lock.release() @property def syncing(self): diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 6d8ecfeb..70e4d3a2 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -794,15 +794,21 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self.source_replica_uid = source_replica_uid self._defer_decryption = False + # deferred decryption attributes self._sync_db = None + self._sync_db_write_lock = None + self._decryption_callback = None self._sync_decr_pool = None self._sync_watcher = None - if sync_db and sync_db_write_lock is not None: self._sync_db = sync_db - self._decryption_callback = None self._sync_db_write_lock = sync_db_write_lock + def _setup_sync_decr_pool(self): + """ + Set up the SyncDecrypterPool for deferred decryption. + """ + if self._sync_decr_pool is None: # initialize syncing queue decryption pool self._sync_decr_pool = SyncDecrypterPool( self._crypto, self._sync_db, @@ -810,10 +816,33 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): insert_doc_cb=self._insert_doc_cb) self._sync_decr_pool.set_source_replica_uid( self.source_replica_uid) + + def _teardown_sync_decr_pool(self): + """ + Tear down the SyncDecrypterPool. + """ + if self._sync_decr_pool is not None: + self._sync_decr_pool.close() + self._sync_decr_pool = None + + def _setup_sync_watcher(self): + """ + Set up the sync watcher for deferred decryption. + """ + if self._sync_watcher is None: self._sync_watcher = TimerTask( self._decrypt_syncing_received_docs, delay=self.DECRYPT_TASK_PERIOD) + def _teardown_sync_watcher(self): + """ + Tear down the sync watcher. + """ + if self._sync_watcher is not None: + self._sync_watcher.stop() + self._sync_watcher.shutdown() + self._sync_watcher = None + def _get_replica_uid(self, url): """ Return replica uid from the url, or None. @@ -824,17 +853,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url) return replica_uid_match[0] if len(replica_uid_match) > 0 else None - def close(self): - """ - Cleanly close pool of workers. - """ - if self._sync_watcher is not None: - self._sync_watcher.stop() - self._sync_watcher.shutdown() - if self._sync_decr_pool is not None: - self._sync_decr_pool.close() - HTTPSyncTarget.close(self) - @staticmethod def connect(url, source_replica_uid=None, crypto=None): return SoledadSyncTarget( @@ -1044,9 +1062,18 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): if last_successful_thread is not None: body, _ = last_successful_thread.response parsed_body = json.loads(body) - metadata = parsed_body[0] - new_generation = metadata['new_generation'] - new_transaction_id = metadata['new_transaction_id'] + # get current target gen and trans id in case no documents were + # transferred + if len(parsed_body) == 1: + metadata = parsed_body[0] + new_generation = metadata['new_generation'] + new_transaction_id = metadata['new_transaction_id'] + # get current target gen and trans id from last transferred + # document + else: + doc_data = parsed_body[1] + new_generation = doc_data['gen'] + new_transaction_id = doc_data['trans_id'] return new_generation, new_transaction_id @@ -1097,16 +1124,15 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :rtype: tuple """ self._ensure_callback = ensure_callback + if defer_decryption: - if self._sync_watcher is None: - logger.warning( - "Soledad syncer: can't defer decryption, falling back to " - "normal syncing mode.") - defer_decryption = False - else: - self._sync_exchange_lock.acquire() - self._defer_decryption = True + self._sync_exchange_lock.acquire() + self._setup_sync_decr_pool() + self._setup_sync_watcher() + self._defer_decryption = True + self.start() + if sync_id is None: sync_id = str(uuid4()) self.source_replica_uid = source_replica_uid @@ -1248,8 +1274,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): if defer_decryption: while self.clear_to_sync() is False: sleep(self.DECRYPT_TASK_PERIOD) + self._teardown_sync_watcher() + self._teardown_sync_decr_pool() self._sync_exchange_lock.release() - self._sync_watcher.stop() self.stop() return cur_target_gen, cur_target_trans_id |