Diffstat (limited to 'client')
-rw-r--r--  client/changes/bug_reset-synchronizer-state |   2
-rw-r--r--  client/src/leap/soledad/client/__init__.py  |  10
-rw-r--r--  client/src/leap/soledad/client/sqlcipher.py |  72
-rw-r--r--  client/src/leap/soledad/client/sync.py      | 176
-rw-r--r--  client/src/leap/soledad/client/target.py    |  60
5 files changed, 81 insertions, 239 deletions
diff --git a/client/changes/bug_reset-synchronizer-state b/client/changes/bug_reset-synchronizer-state
new file mode 100644
index 00000000..9678b36b
--- /dev/null
+++ b/client/changes/bug_reset-synchronizer-state
@@ -0,0 +1,2 @@
+  o Reset synchronizer state in order to reuse the same synchronizer object
+    multiple times.
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py
index 2fb33184..0d3a21fd 100644
--- a/client/src/leap/soledad/client/__init__.py
+++ b/client/src/leap/soledad/client/__init__.py
@@ -34,8 +34,6 @@
 import urlparse
 import hmac
 from hashlib import sha256
-from threading import Lock
-from collections import defaultdict
 
 try:
     import cchardet as chardet
@@ -224,12 +222,6 @@ class Soledad(object):
     Prefix for default values for path.
     """
 
-    syncing_lock = defaultdict(Lock)
-    """
-    A dictionary that hold locks which avoid multiple sync attempts from the
-    same database replica.
-    """
-
     def __init__(self, uuid, passphrase, secrets_path, local_db_path,
                  server_url, cert_file, auth_token=None, secret_id=None):
         """
@@ -1064,8 +1056,6 @@ class Soledad(object):
         :rtype: str
         """
         if self._db:
-            # acquire lock before attempt to sync
-            with Soledad.syncing_lock[self._db._get_replica_uid()]:
                 local_gen = self._db.sync(
                     urlparse.urljoin(self.server_url, 'user-%s' % self._uuid),
                     creds=self._creds, autocreate=False)
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 74351116..5ffa9c7e 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -52,12 +52,13 @@
 import json
 
 from hashlib import sha256
 from contextlib import contextmanager
+from collections import defaultdict
 
 from pysqlcipher import dbapi2
 from u1db.backends import sqlite_backend
 from u1db import errors as u1db_errors
 
-from leap.soledad.client.sync import Synchronizer, ClientSyncState
+from leap.soledad.client.sync import Synchronizer
 from leap.soledad.client.target import SoledadSyncTarget
 from leap.soledad.common.document import SoledadDocument
@@ -153,6 +154,13 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
     create_doc_lock = threading.Lock()
     update_indexes_lock = threading.Lock()
 
+    syncing_lock = defaultdict(threading.Lock)
+    """
+    A dictionary that hold locks which avoid multiple sync attempts from the
+    same database replica.
+    """
+
+
     def __init__(self, sqlcipher_file, password, document_factory=None,
                  crypto=None, raw_key=False, cipher='aes-256-cbc',
                  kdf_iter=4000, cipher_page_size=1024):
@@ -343,6 +351,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
         """
         Synchronize documents with remote replica exposed at url.
 
+        There can be at most one instance syncing the same database replica at
+        the same time, so this method will block until the syncing lock can be
+        acquired.
+
         :param url: The url of the target replica to sync with.
         :type url: str
         :param creds: optional dictionary giving credentials.
@@ -355,6 +367,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
         :rtype: int
         """
         res = None
+        # the following context manager blocks until the syncing lock can be
+        # acquired.
         with self.syncer(url, creds=creds) as syncer:
             res = syncer.sync(autocreate=autocreate)
         return res
@@ -371,10 +385,16 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
     def syncer(self, url, creds=None):
         """
         Accesor for synchronizer.
+
+        As we reuse the same synchronizer for every sync, there can be only
+        one instance synchronizing the same database replica at the same time.
+        Because of that, this method blocks until the syncing lock can be
+        acquired.
         """
-        syncer = self._get_syncer(url, creds=creds)
-        yield syncer
-        syncer.sync_target.close()
+        with SQLCipherDatabase.syncing_lock[self._get_replica_uid()]:
+            syncer = self._get_syncer(url, creds=creds)
+            yield syncer
+            syncer.sync_target.close()
 
     def _get_syncer(self, url, creds=None):
         """
@@ -401,6 +421,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
                                   creds=creds,
                                   crypto=self._crypto))
             self._syncers[url] = (h, syncer)
+        # in order to reuse the same synchronizer multiple times we have to
+        # reset its state (i.e. the number of documents received from target
+        # and inserted in the local replica).
+        syncer.num_inserted = 0
         return syncer
 
     def _extra_schema_init(self, c):
@@ -889,45 +913,5 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
         if self._db_handle is not None:
             self._db_handle.close()
 
-    def _get_stored_sync_state(self):
-        """
-        Retrieve the currently stored sync state.
-
-        :return: The current stored sync state or None if there's no stored
-                 state.
-        :rtype: dict or None
-        """
-        c = self._db_handle.cursor()
-        c.execute("SELECT value FROM u1db_config"
-                  " WHERE name = 'sync_state'")
-        val = c.fetchone()
-        if val is None:
-            return None
-        return json.loads(val[0])
-
-    def _set_stored_sync_state(self, state):
-        """
-        Stored the sync state.
-
-        :param state: The sync state to be stored or None to delete any stored
-                      state.
-        :type state: dict or None
-        """
-        c = self._db_handle.cursor()
-        if state is None:
-            c.execute("DELETE FROM u1db_config"
-                      " WHERE name = 'sync_state'")
-        else:
-            c.execute("INSERT OR REPLACE INTO u1db_config"
-                      " VALUES ('sync_state', ?)",
-                      (json.dumps(state),))
-
-    stored_sync_state = property(
-        _get_stored_sync_state, _set_stored_sync_state,
-        doc="The current sync state dict.")
-
-    @property
-    def sync_state(self):
-        return ClientSyncState(self)
 
 sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase)
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index 5285d540..56e63416 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -27,103 +27,6 @@
 from u1db import errors
 from u1db.sync import Synchronizer as U1DBSynchronizer
 
-class ClientSyncState(object):
-    """
-    The state of the current sync session, as stored on the client.
-    """
-
-    _private_attrs = [
-        '_db',
-    ]
-
-    _public_attrs = {
-        'target_replica_uid': None,
-        'target_gen': None,
-        'target_trans_id': None,
-        'target_my_gen': None,
-        'target_my_trans_id': None,
-        'target_last_known_gen': None,
-        'target_last_known_trans_id': None,
-        'my_gen': None,
-        'changes': None,
-        'sent': 0,
-        'received': 0,
-    }
-
-    @property
-    def _public_attr_keys(self):
-        return self._public_attrs.keys()
-
-    def __init__(self, db=None):
-        """
-        Initialize the client sync state.
-
-        :param db: The database where to fetch/store the sync state.
-        :type db: SQLCipherDatabase
-        """
-        self._db = db
-        self._init_state()
-
-    def __setattr__(self, attr, val):
-        """
-        Prevent setting arbitrary attributes.
-
-        :param attr: The attribute name.
-        :type attr: str
-        :param val: The value to be set.
-        :type val: anything
-        """
-        if attr not in self._public_attr_keys + self._private_attrs:
-            raise Exception
-        object.__setattr__(self, attr, val)
-
-    def _init_state(self):
-        """
-        Initialize current sync state, potentially fetching sync info stored
-        in database.
-        """
-        # set local default attributes
-        for attr in self._public_attr_keys:
-            setattr(self, attr, self._public_attrs[attr])
-        # fetch info from stored sync state
-        sync_state_dict = None
-        if self._db is not None:
-            sync_state_dict = self._db.stored_sync_state
-        if sync_state_dict is not None:
-            for attr in self._public_attr_keys:
-                setattr(self, attr, sync_state_dict[attr])
-
-    def save(self):
-        """
-        Save the current sync state in the database.
-        """
-        sync_state_dict = {}
-        for attr in self._public_attr_keys:
-            sync_state_dict[attr] = getattr(self, attr)
-        if self._db is not None:
-            self._db.stored_sync_state = sync_state_dict
-
-    def clear(self):
-        """
-        Clear the sync state info data.
-        """
-        if self._db is not None:
-            self._db.stored_sync_state = None
-        self._init_state()
-
-    def has_stored_info(self):
-        """
-        Return whether there is any sync state info stored on the database.
-
-        :return: Whether there's any sync state info store on db.
-        :rtype: bool
-        """
-        return self._db is not None and self._db.stored_sync_state is not None
-
-    def __str__(self):
-        return 'ClientSyncState: %s' % ', '.join(
-            ['%s: %s' % (k, getattr(self, k)) for k in self._public_attr_keys])
-
 class Synchronizer(U1DBSynchronizer):
     """
     Collect the state around synchronizing 2 U1DB replicas.
@@ -146,36 +49,20 @@ class Synchronizer(U1DBSynchronizer):
         """
         sync_target = self.sync_target
 
-        # recover current sync state from source database
-        sync_state = self.source.sync_state
-        self.target_replica_uid = sync_state.target_replica_uid
-        target_gen = sync_state.target_gen
-        target_trans_id = sync_state.target_trans_id
-        target_my_gen = sync_state.target_my_gen
-        target_my_trans_id = sync_state.target_my_trans_id
-        target_last_known_gen = sync_state.target_last_known_gen
-        target_last_known_trans_id = \
-            sync_state.target_last_known_trans_id
-        my_gen = sync_state.my_gen
-        changes = sync_state.changes
-        sent = sync_state.sent
-        received = sync_state.received
-
         # get target identifier, its current generation,
         # and its last-seen database generation for this source
         ensure_callback = None
-        if not sync_state.has_stored_info():
-            try:
-                (self.target_replica_uid, target_gen, target_trans_id,
-                 target_my_gen, target_my_trans_id) = \
-                    sync_target.get_sync_info(self.source._replica_uid)
-            except errors.DatabaseDoesNotExist:
-                if not autocreate:
-                    raise
-                # will try to ask sync_exchange() to create the db
-                self.target_replica_uid = None
-                target_gen, target_trans_id = (0, '')
-                target_my_gen, target_my_trans_id = (0, '')
+        try:
+            (self.target_replica_uid, target_gen, target_trans_id,
+             target_my_gen, target_my_trans_id) = \
+                sync_target.get_sync_info(self.source._replica_uid)
+        except errors.DatabaseDoesNotExist:
+            if not autocreate:
+                raise
+            # will try to ask sync_exchange() to create the db
+            self.target_replica_uid = None
+            target_gen, target_trans_id = (0, '')
+            target_my_gen, target_my_trans_id = (0, '')
 
         # make sure we'll have access to target replica uid once it exists
         if self.target_replica_uid is None:
@@ -192,17 +79,15 @@ class Synchronizer(U1DBSynchronizer):
             target_my_gen, target_my_trans_id)
 
         # what's changed since that generation and this current gen
-        if not sync_state.has_stored_info():
-            my_gen, _, changes = self.source.whats_changed(target_my_gen)
+        my_gen, _, changes = self.source.whats_changed(target_my_gen)
 
         # get source last-seen database generation for the target
-        if not sync_state.has_stored_info():
-            if self.target_replica_uid is None:
-                target_last_known_gen, target_last_known_trans_id = 0, ''
-            else:
-                target_last_known_gen, target_last_known_trans_id = \
-                    self.source._get_replica_gen_and_trans_id(
-                        self.target_replica_uid)
+        if self.target_replica_uid is None:
+            target_last_known_gen, target_last_known_trans_id = 0, ''
+        else:
+            target_last_known_gen, target_last_known_trans_id = \
+                self.source._get_replica_gen_and_trans_id(
+                    self.target_replica_uid)
 
         # validate transaction ids
         if not changes and target_last_known_gen == target_gen:
@@ -220,20 +105,6 @@ class Synchronizer(U1DBSynchronizer):
             _, gen, trans = changes[idx]
             docs_by_generation.append((doc, gen, trans))
             idx += 1
-        # store current sync state info
-        if not sync_state.has_stored_info():
-            sync_state.target_replica_uid = self.target_replica_uid
-            sync_state.target_gen = target_gen
-            sync_state.target_trans_id = target_trans_id
-            sync_state.target_my_gen = target_my_gen
-            sync_state.target_my_trans_id = target_my_trans_id
-            sync_state.my_gen = my_gen
-            sync_state.changes = changes
-            sync_state.target_last_known_trans_id = \
-                target_last_known_trans_id
-            sync_state.target_last_known_gen = target_last_known_gen
-            sync_state.sent = sent = 0
-            sync_state.received = received = 0
 
         # exchange documents and try to insert the returned ones with
         # the target, return target synced-up-to gen.
@@ -243,16 +114,7 @@ class Synchronizer(U1DBSynchronizer):
         new_gen, new_trans_id = sync_target.sync_exchange(
             docs_by_generation, self.source._replica_uid,
             target_last_known_gen, target_last_known_trans_id,
-            self._insert_doc_from_target, ensure_callback=ensure_callback,
-            sync_state=sync_state)
-
-        # save sync state info if the sync was interrupted
-        if new_gen is None and new_trans_id is None:
-            sync_state.save()
-            return my_gen
-
-        # sync exchange was succesfull, remove sync state info from source
-        sync_state.clear()
+            self._insert_doc_from_target, ensure_callback=ensure_callback)
 
         # record target synced-up-to generation including applying what we sent
         self.source._set_replica_gen_and_trans_id(
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 93de98d3..968545b6 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -63,7 +63,6 @@ from leap.soledad.client.events import (
     SOLEDAD_SYNC_RECEIVE_STATUS,
     signal,
 )
-from leap.soledad.client.sync import ClientSyncState
 
 
 logger = logging.getLogger(__name__)
@@ -321,7 +320,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         HTTPSyncTarget.__init__(self, url, creds)
         self._crypto = crypto
         self._stopped = True
-        self._sync_state = None
         self._stop_lock = threading.Lock()
 
     def _init_post_request(self, url, action, headers, content_length):
@@ -347,7 +345,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         self._conn.endheaders()
 
     def _get_remote_docs(self, url, last_known_generation, last_known_trans_id,
-                         headers, return_doc_cb, ensure_callback=None):
+                         headers, return_doc_cb, ensure_callback, sync_id):
         """
         Fetch sync documents from the remote database and insert them in the
         local database.
@@ -377,9 +375,13 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         :rtype: list of str
         """
 
-        def _post_get_doc():
+        def _post_get_doc(received):
             """
             Get a sync document from server by means of a POST request.
+
+            :param received: The number of documents already received in the
+                             current sync session.
+            :type received: int
             """
             entries = ['[']
             size = 1
@@ -388,10 +390,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
                 '', entries,
                 last_known_generation=last_known_generation,
                 last_known_trans_id=last_known_trans_id,
+                sync_id=sync_id,
                 ensure=ensure_callback is not None)
             # inform server of how many documents have already been received
             size += self._prepare(
-                ',', entries, received=self._sync_state.received)
+                ',', entries, received=received)
             entries.append('\r\n]')
             size += len(entries[-1])
             # send headers
@@ -402,14 +405,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
             return self._response()
 
         number_of_changes = None
+        received = 0
 
-        while number_of_changes is None or \
-                self._sync_state.received < number_of_changes:
+        new_generation = last_known_generation
+        new_transaction_id = last_known_trans_id
+        while number_of_changes is None or received < number_of_changes:
             # bail out if sync process was interrupted
             if self.stopped is True:
-                return None, None
+                return last_known_generation, last_known_trans_id
             # try to fetch one document from target
-            data, _ = _post_get_doc()
+            data, _ = _post_get_doc(received)
             # decode incoming stream
             parts = data.splitlines()
             if not parts or parts[0] != '[' or parts[-1] != ']':
@@ -424,6 +429,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
                 soledad_assert('new_generation' in metadata)
                 soledad_assert('new_transaction_id' in metadata)
                 number_of_changes = metadata['number_of_changes']
+                new_generation = metadata['new_generation']
+                new_transaction_id = metadata['new_transaction_id']
             except json.JSONDecodeError, AssertionError:
                 raise BrokenSyncStream
             # make sure we have replica_uid from fresh new dbs
@@ -453,12 +460,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
             # end of symmetric decryption
             # -------------------------------------------------------------
             return_doc_cb(doc, entry['gen'], entry['trans_id'])
-            self._sync_state.received += 1
+            received += 1
             signal(
                 SOLEDAD_SYNC_RECEIVE_STATUS,
                 "%d/%d" %
-                (self._sync_state.received, number_of_changes))
-        return metadata['new_generation'], metadata['new_transaction_id']
+                (received, number_of_changes))
+        return new_generation, new_transaction_id
 
     def _request(self, method, url_parts, params=None, body=None,
                  content_type=None):
@@ -566,8 +573,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
 
     def sync_exchange(self, docs_by_generations, source_replica_uid,
                       last_known_generation, last_known_trans_id,
-                      return_doc_cb, ensure_callback=None,
-                      sync_state=None):
+                      return_doc_cb, ensure_callback=None):
         """
         Find out which documents the remote database does not know about,
         encrypt and send them.
@@ -596,12 +602,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         :return: The new generation and transaction id of the target replica.
         :rtype: tuple
         """
 
-        # get the sync state information from client
-        self._sync_state = sync_state
-        if self._sync_state is None:
-            self._sync_state = ClientSyncState()
-
         self.start()
+        sync_id = str(uuid4())
         self._ensure_connection()
         if self._trace_hook:  # for tests
@@ -610,7 +612,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         headers = self._sign_request('POST', url, {})
 
         def _post_put_doc(headers, last_known_generation, last_known_trans_id,
-                          id, rev, content, gen, trans_id):
+                          id, rev, content, gen, trans_id, sync_id):
             """
             Put a sync document on server by means of a POST request.
 
@@ -626,6 +628,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
                 '', entries,
                 last_known_generation=last_known_generation,
                 last_known_trans_id=last_known_trans_id,
+                sync_id=sync_id,
                 ensure=ensure_callback is not None)
             # add the document to the request
             size += self._prepare(
@@ -645,11 +648,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         cur_target_gen = last_known_generation
         cur_target_trans_id = last_known_trans_id
 
-        # skip docs that were already sent
-        if self._sync_state.sent > 0:
-            docs_by_generations = docs_by_generations[self._sync_state.sent:]
-
         # send docs
+        sent = 0
+        signal(
+            SOLEDAD_SYNC_SEND_STATUS,
+            "%d/%d" % (0, len(docs_by_generations)))
         for doc, gen, trans_id in docs_by_generations:
             # allow for interrupting the sync process
             if self.stopped is True:
@@ -668,17 +671,18 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
             # -------------------------------------------------------------
             cur_target_gen, cur_target_trans_id = _post_put_doc(
                 headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id,
-                rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id)
-            self._sync_state.sent += 1
+                rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id,
+                sync_id=sync_id)
+            sent += 1
             signal(
                 SOLEDAD_SYNC_SEND_STATUS,
-                "%d/%d" % (self._sync_state.sent, len(docs_by_generations)))
+                "%d/%d" % (sent, len(docs_by_generations)))
 
         # get docs from target
         cur_target_gen, cur_target_trans_id = self._get_remote_docs(
             url,
             last_known_generation, last_known_trans_id, headers,
-            return_doc_cb, ensure_callback)
+            return_doc_cb, ensure_callback, sync_id)
         self.stop()
         return cur_target_gen, cur_target_trans_id
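
For readers skimming the diff: the per-replica locking that sqlcipher.py now carries (a class-level defaultdict of threading.Lock objects, acquired inside the syncer() context manager) boils down to the minimal sketch below. The ReplicaSyncGuard name and its methods are illustrative only, not part of the Soledad codebase; the sketch just shows why two syncs of the same replica run one after the other while syncs of different replicas can proceed in parallel.

import threading
from collections import defaultdict
from contextlib import contextmanager


class ReplicaSyncGuard(object):
    # One lock per replica uid, created lazily by defaultdict on first
    # access and shared by every instance of the class.
    syncing_lock = defaultdict(threading.Lock)

    def __init__(self, replica_uid):
        self.replica_uid = replica_uid

    @contextmanager
    def syncer(self):
        # Block until no other sync holds this replica's lock, then hand
        # control back to the caller; the lock is released when the
        # with-block exits.
        with ReplicaSyncGuard.syncing_lock[self.replica_uid]:
            yield self

Under this scheme, two threads entering "with ReplicaSyncGuard('replica-1').syncer():" serialize on the same lock, while a guard created for 'replica-2' uses a different lock and is unaffected.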

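On the target.py side, the change replaces the persisted ClientSyncState with per-session bookkeeping: sync_exchange() now mints a fresh sync_id with uuid4() and counts sent and received documents in local variables. A rough sketch of that idea, assuming hypothetical send_doc and fetch_docs callables rather than the real HTTP helpers, could look like this:

from uuid import uuid4


def run_sync_session(docs_by_generation, send_doc, fetch_docs):
    # send_doc and fetch_docs are hypothetical stand-ins: send_doc posts one
    # document to the target, fetch_docs yields the documents the target
    # returns.  All session state lives in local variables, so nothing has
    # to be stored between sync attempts.
    sync_id = str(uuid4())
    sent = 0
    for doc, gen, trans_id in docs_by_generation:
        # tag every request with this session's sync_id
        send_doc(doc, gen, trans_id, sync_id=sync_id)
        sent += 1
    received = 0
    for _doc in fetch_docs(sync_id=sync_id):
        received += 1
    return sent, received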