diff options
author | drebs <drebs@leap.se> | 2014-07-10 11:59:31 -0300 |
---|---|---|
committer | drebs <drebs@leap.se> | 2014-07-10 14:14:51 -0300 |
commit | d870e1353270c2c468f78f97e4b33f8008905608 (patch) | |
tree | ded194e851448a90b0bb43ceff5b3eb73708d30b /client/src/leap/soledad/client/target.py | |
parent | d0b57fe6babdadc39342afe080dd29a3c95f786e (diff) |
Fix recovery from failed sync.
Diffstat (limited to 'client/src/leap/soledad/client/target.py')
-rw-r--r-- | client/src/leap/soledad/client/target.py | 77 |
1 files changed, 52 insertions, 25 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 6d8ecfeb..70e4d3a2 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -794,15 +794,21 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self.source_replica_uid = source_replica_uid self._defer_decryption = False + # deferred decryption attributes self._sync_db = None + self._sync_db_write_lock = None + self._decryption_callback = None self._sync_decr_pool = None self._sync_watcher = None - if sync_db and sync_db_write_lock is not None: self._sync_db = sync_db - self._decryption_callback = None self._sync_db_write_lock = sync_db_write_lock + def _setup_sync_decr_pool(self): + """ + Set up the SyncDecrypterPool for deferred decryption. + """ + if self._sync_decr_pool is None: # initialize syncing queue decryption pool self._sync_decr_pool = SyncDecrypterPool( self._crypto, self._sync_db, @@ -810,10 +816,33 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): insert_doc_cb=self._insert_doc_cb) self._sync_decr_pool.set_source_replica_uid( self.source_replica_uid) + + def _teardown_sync_decr_pool(self): + """ + Tear down the SyncDecrypterPool. + """ + if self._sync_decr_pool is not None: + self._sync_decr_pool.close() + self._sync_decr_pool = None + + def _setup_sync_watcher(self): + """ + Set up the sync watcher for deferred decryption. + """ + if self._sync_watcher is None: self._sync_watcher = TimerTask( self._decrypt_syncing_received_docs, delay=self.DECRYPT_TASK_PERIOD) + def _teardown_sync_watcher(self): + """ + Tear down the sync watcher. + """ + if self._sync_watcher is not None: + self._sync_watcher.stop() + self._sync_watcher.shutdown() + self._sync_watcher = None + def _get_replica_uid(self, url): """ Return replica uid from the url, or None. @@ -824,17 +853,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url) return replica_uid_match[0] if len(replica_uid_match) > 0 else None - def close(self): - """ - Cleanly close pool of workers. - """ - if self._sync_watcher is not None: - self._sync_watcher.stop() - self._sync_watcher.shutdown() - if self._sync_decr_pool is not None: - self._sync_decr_pool.close() - HTTPSyncTarget.close(self) - @staticmethod def connect(url, source_replica_uid=None, crypto=None): return SoledadSyncTarget( @@ -1044,9 +1062,18 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): if last_successful_thread is not None: body, _ = last_successful_thread.response parsed_body = json.loads(body) - metadata = parsed_body[0] - new_generation = metadata['new_generation'] - new_transaction_id = metadata['new_transaction_id'] + # get current target gen and trans id in case no documents were + # transferred + if len(parsed_body) == 1: + metadata = parsed_body[0] + new_generation = metadata['new_generation'] + new_transaction_id = metadata['new_transaction_id'] + # get current target gen and trans id from last transferred + # document + else: + doc_data = parsed_body[1] + new_generation = doc_data['gen'] + new_transaction_id = doc_data['trans_id'] return new_generation, new_transaction_id @@ -1097,16 +1124,15 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :rtype: tuple """ self._ensure_callback = ensure_callback + if defer_decryption: - if self._sync_watcher is None: - logger.warning( - "Soledad syncer: can't defer decryption, falling back to " - "normal syncing mode.") - defer_decryption = False - else: - self._sync_exchange_lock.acquire() - self._defer_decryption = True + self._sync_exchange_lock.acquire() + self._setup_sync_decr_pool() + self._setup_sync_watcher() + self._defer_decryption = True + self.start() + if sync_id is None: sync_id = str(uuid4()) self.source_replica_uid = source_replica_uid @@ -1248,8 +1274,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): if defer_decryption: while self.clear_to_sync() is False: sleep(self.DECRYPT_TASK_PERIOD) + self._teardown_sync_watcher() + self._teardown_sync_decr_pool() self._sync_exchange_lock.release() - self._sync_watcher.stop() self.stop() return cur_target_gen, cur_target_trans_id |