diff options
Diffstat (limited to 'client')
| -rw-r--r-- | client/src/leap/soledad/client/sync.py | 10 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/target.py | 77 | 
2 files changed, 56 insertions, 31 deletions
| diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index be60d0ab..5d545a77 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -32,6 +32,7 @@ Extend u1db Synchronizer with the ability to:  import json  import logging +import traceback  from threading import Lock  from u1db import errors @@ -85,13 +86,11 @@ class SoledadSynchronizer(Synchronizer):              return self._sync(autocreate=autocreate,                                defer_decryption=defer_decryption)          except Exception: -            # We release the lock if there was an error. -            # Otherwise, the lock should be released from the function -            # `complete_sync`. -            self.release_syncing_lock()              # re-raising the exceptions to let syqlcipher.sync catch them              # (and re-create the syncer instance if needed)              raise +        finally: +            self.release_syncing_lock()      def _sync(self, autocreate=False, defer_decryption=True):          """ @@ -161,7 +160,6 @@ class SoledadSynchronizer(Synchronizer):          if not changes and target_last_known_gen == target_gen:              if target_trans_id != target_last_known_trans_id:                  raise errors.InvalidTransactionId -            self.release_syncing_lock()              return my_gen          # prepare to send all the changed docs @@ -205,6 +203,7 @@ class SoledadSynchronizer(Synchronizer):              self.complete_sync()          except Exception as e:              logger.error("Soledad sync error: %s" % str(e)) +            logger.error(traceback.format_exc())              sync_target.stop()          finally:              sync_target.close() @@ -228,7 +227,6 @@ class SoledadSynchronizer(Synchronizer):          # if gapless record current reached generation with target          self._record_sync_info_with_the_target(info["my_gen"]) -        self.syncing_lock.release()      @property      def syncing(self): diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 6d8ecfeb..70e4d3a2 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -794,15 +794,21 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          self.source_replica_uid = source_replica_uid          self._defer_decryption = False +        # deferred decryption attributes          self._sync_db = None +        self._sync_db_write_lock = None +        self._decryption_callback = None          self._sync_decr_pool = None          self._sync_watcher = None -          if sync_db and sync_db_write_lock is not None:              self._sync_db = sync_db -            self._decryption_callback = None              self._sync_db_write_lock = sync_db_write_lock +    def _setup_sync_decr_pool(self): +        """ +        Set up the SyncDecrypterPool for deferred decryption. +        """ +        if self._sync_decr_pool is None:              # initialize syncing queue decryption pool              self._sync_decr_pool = SyncDecrypterPool(                  self._crypto, self._sync_db, @@ -810,10 +816,33 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):                  insert_doc_cb=self._insert_doc_cb)              self._sync_decr_pool.set_source_replica_uid(                  self.source_replica_uid) + +    def _teardown_sync_decr_pool(self): +        """ +        Tear down the SyncDecrypterPool. +        """ +        if self._sync_decr_pool is not None: +            self._sync_decr_pool.close() +            self._sync_decr_pool = None + +    def _setup_sync_watcher(self): +        """ +        Set up the sync watcher for deferred decryption. +        """ +        if self._sync_watcher is None:              self._sync_watcher = TimerTask(                  self._decrypt_syncing_received_docs,                  delay=self.DECRYPT_TASK_PERIOD) +    def _teardown_sync_watcher(self): +        """ +        Tear down the sync watcher. +        """ +        if self._sync_watcher is not None: +            self._sync_watcher.stop() +            self._sync_watcher.shutdown() +            self._sync_watcher = None +      def _get_replica_uid(self, url):          """          Return replica uid from the url, or None. @@ -824,17 +853,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url)          return replica_uid_match[0] if len(replica_uid_match) > 0 else None -    def close(self): -        """ -        Cleanly close pool of workers. -        """ -        if self._sync_watcher is not None: -            self._sync_watcher.stop() -            self._sync_watcher.shutdown() -        if self._sync_decr_pool is not None: -            self._sync_decr_pool.close() -        HTTPSyncTarget.close(self) -      @staticmethod      def connect(url, source_replica_uid=None, crypto=None):          return SoledadSyncTarget( @@ -1044,9 +1062,18 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          if last_successful_thread is not None:              body, _ = last_successful_thread.response              parsed_body = json.loads(body) -            metadata = parsed_body[0] -            new_generation = metadata['new_generation'] -            new_transaction_id = metadata['new_transaction_id'] +            # get current target gen and trans id in case no documents were +            # transferred +            if len(parsed_body) == 1: +                metadata = parsed_body[0] +                new_generation = metadata['new_generation'] +                new_transaction_id = metadata['new_transaction_id'] +            # get current target gen and trans id from last transferred +            # document +            else: +                doc_data = parsed_body[1] +                new_generation = doc_data['gen'] +                new_transaction_id = doc_data['trans_id']          return new_generation, new_transaction_id @@ -1097,16 +1124,15 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          :rtype: tuple          """          self._ensure_callback = ensure_callback +          if defer_decryption: -            if self._sync_watcher is None: -                logger.warning( -                    "Soledad syncer: can't defer decryption, falling back to " -                    "normal syncing mode.") -                defer_decryption = False -            else: -                self._sync_exchange_lock.acquire() -                self._defer_decryption = True +            self._sync_exchange_lock.acquire() +            self._setup_sync_decr_pool() +            self._setup_sync_watcher() +            self._defer_decryption = True +          self.start() +          if sync_id is None:              sync_id = str(uuid4())          self.source_replica_uid = source_replica_uid @@ -1248,8 +1274,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          if defer_decryption:              while self.clear_to_sync() is False:                  sleep(self.DECRYPT_TASK_PERIOD) +            self._teardown_sync_watcher() +            self._teardown_sync_decr_pool()              self._sync_exchange_lock.release() -            self._sync_watcher.stop()          self.stop()          return cur_target_gen, cur_target_trans_id | 
