| author    | drebs <drebs@leap.se>                    | 2014-04-30 12:52:24 -0300 |
| committer | drebs <drebs@leap.se>                    | 2014-05-22 18:44:03 -0300 |
| commit    | 73d431a035fcdce8d623eefde2d62f28687fdb36 |                           |
| tree      | 03822bf442514677af15fbae1d62b7196b45f5aa |                           |
| parent    | ad748eb838a15b0263fdf18813404d3bee58cd03 |                           |
Allow for interrupting and recovering sync (#5571).
| -rw-r--r-- | client/changes/feature_5571_allow-for-interrupting-and-recovering-sync |   1 |
| -rw-r--r-- | client/src/leap/soledad/client/__init__.py                             |   7 |
| -rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py                            |  52 |
| -rw-r--r-- | client/src/leap/soledad/client/sync.py                                 | 262 |
| -rw-r--r-- | client/src/leap/soledad/client/target.py                               |  73 |
| -rw-r--r-- | common/src/leap/soledad/common/tests/test_sync.py                      | 176 |
| -rw-r--r-- | server/changes/feature_5571_allow-for-interrupting-and-recovering-sync |   1 |
7 files changed, 556 insertions, 16 deletions
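The client-facing effect of the change is easiest to see from the new test case added further down: a running sync can be interrupted with Soledad.stop_sync() and picked up again by simply calling sync() once more. The snippet below is a condensed sketch based on that test, not part of the commit itself; the constructor arguments, paths, and server URL are placeholder values.

    import threading
    import time

    from leap.soledad.client import Soledad

    # Hypothetical client setup; real values come from the test fixtures.
    sol = Soledad('user-uuid', u'123',
                  secrets_path='secrets.json',
                  local_db_path='soledad.u1db',
                  server_url='http://localhost:2424',
                  cert_file=None,
                  auth_token='auth-token')

    def interrupt_later():
        # interrupt the sync from another thread once it is under way
        time.sleep(1)
        sol.stop_sync()  # new API added by this commit

    t = threading.Thread(target=interrupt_later)
    t.start()
    sol.sync()   # may return early once stop_sync() is called
    t.join()

    sol.sync()   # a later call resumes from the persisted sync state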
diff --git a/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync b/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync
new file mode 100644
index 00000000..0087c535
--- /dev/null
+++ b/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync
@@ -0,0 +1 @@
+  o Allow for interrupting and recovering sync (#5517).
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py
index 8881b422..f92317e9 100644
--- a/client/src/leap/soledad/client/__init__.py
+++ b/client/src/leap/soledad/client/__init__.py
@@ -1096,6 +1096,13 @@ class Soledad(object):
                 signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)
                 return local_gen
 
+    def stop_sync(self):
+        """
+        Stop the current syncing process.
+        """
+        if self._db:
+            self._db.stop_sync()
+
     def need_sync(self, url):
         """
         Return if local db replica differs from remote url's replica.
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 04f8ebf9..0885a35f 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -55,14 +55,14 @@ from contextlib import contextmanager
 
 from pysqlcipher import dbapi2
 from u1db.backends import sqlite_backend
-from u1db.sync import Synchronizer
 from u1db import errors as u1db_errors
 
+from leap.soledad.client.sync import Synchronizer
 from leap.soledad.client.target import SoledadSyncTarget
 from leap.soledad.common.document import SoledadDocument
 
-logger = logging.getLogger(__name__)
+logger = logging.getLogger(__name__)
 
 # Monkey-patch u1db.backends.sqlite_backend with pysqlcipher.dbapi2
 sqlite_backend.dbapi2 = dbapi2
@@ -214,6 +214,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
                                    syncable=syncable)
         self.set_document_factory(factory)
         self._syncers = {}
+        self._real_sync_state = None
 
     @classmethod
     def _open_database(cls, sqlcipher_file, password, document_factory=None,
@@ -359,6 +360,14 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
             res = syncer.sync(autocreate=autocreate)
         return res
 
+    def stop_sync(self):
+        """
+        Interrupt all ongoing syncs.
+        """
+        for url in self._syncers:
+            _, syncer = self._syncers[url]
+            syncer.stop()
+
     @contextmanager
     def syncer(self, url, creds=None):
         """
@@ -379,7 +388,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
         :type creds: dict
 
         :return: A synchronizer.
-        :rtype: u1db.sync.Synchronizer
+        :rtype: Synchronizer
         """
         # we want to store at most one syncer for each url, so we also store a
         # hash of the connection credentials and replace the stored syncer for
@@ -881,5 +890,42 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
         if self._db_handle is not None:
             self._db_handle.close()
 
+    def _get_sync_state(self):
+        """
+        Get the current sync state.
+
+        :return: The current sync state.
+        :rtype: dict
+        """
+        if self._real_sync_state is not None:
+            return self._real_sync_state
+        c = self._db_handle.cursor()
+        c.execute("SELECT value FROM u1db_config"
+                  " WHERE name = 'sync_state'")
+        val = c.fetchone()
+        if val is None:
+            return None
+        self._real_sync_state = json.loads(val[0])
+        return self._real_sync_state
+
+    def _set_sync_state(self, state):
+        """
+        Set the current sync state.
+
+        :param state: The sync state to be set.
+        :type state: dict
+        """
+        c = self._db_handle.cursor()
+        if state is None:
+            c.execute("DELETE FROM u1db_config"
+                      " WHERE name = 'sync_state'")
+        else:
+            c.execute("INSERT OR REPLACE INTO u1db_config"
+                      " VALUES ('sync_state', ?)",
+                      (json.dumps(state),))
+        self._real_sync_state = state
+
+    sync_state = property(
+        _get_sync_state, _set_sync_state, doc="The current sync state.")
 
 sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase)
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
new file mode 100644
index 00000000..6e9e23fa
--- /dev/null
+++ b/client/src/leap/soledad/client/sync.py
@@ -0,0 +1,262 @@
+# -*- coding: utf-8 -*-
+# sync.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Sync infrastructure that can be interrupted and recovered.
+"""
+
+import json
+
+
+from u1db import errors
+from u1db.sync import Synchronizer as U1DBSynchronizer
+
+
+class ClientSyncState(object):
+    """
+    The state of the current sync session, as stored on the client.
+    """
+
+    _private_attrs = [
+        '_db',
+    ]
+
+    _public_attrs = {
+        'target_replica_uid': None,
+        'target_gen': None,
+        'target_trans_id': None,
+        'target_my_gen': None,
+        'target_my_trans_id': None,
+        'target_last_known_gen': None,
+        'target_last_known_trans_id': None,
+        'my_gen': None,
+        'changes': None,
+        'sent': 0,
+        'received': 0,
+    }
+
+    @property
+    def _public_attr_keys(self):
+        return [k for k in self._public_attrs]
+
+    def __init__(self, db=None):
+        """
+        Initialize the client sync state.
+
+        :param db: The database where to fetch/store the sync state.
+        :type db: SQLCipherDatabase
+        """
+        self._db = db
+        self._init_state()
+
+    def __setattr__(self, attr, val):
+        """
+        Prevent setting arbitrary attributes.
+
+        :param attr: The attribute name.
+        :type attr: str
+        :param val: The value to be set.
+        :type val: anything
+        """
+        if attr not in self._public_attr_keys + self._private_attrs:
+            raise Exception
+        object.__setattr__(self, attr, val)
+
+    def _init_state(self):
+        """
+        Initialize current sync state, potentially fetching sync info stored
+        in database.
+        """
+        # set local default attributes
+        for attr in self._public_attr_keys:
+            setattr(self, attr, self._public_attrs[attr])
+        # fetch info from stored sync state
+        sync_state = None
+        if self._db is not None:
+            sync_state = self._db.sync_state
+        if sync_state is not None:
+            for attr in self._public_attr_keys:
+                setattr(self, attr, sync_state[attr])
+
+    def save(self):
+        """
+        Save the current sync state in the database.
+        """
+        sync_state = {}
+        for attr in self._public_attr_keys:
+            sync_state[attr] = getattr(self, attr)
+        if self._db is not None:
+            self._db.sync_state = sync_state
+
+    def clear(self):
+        """
+        Clear the sync state info data.
+        """
+        if self._db is not None:
+            self._db.sync_state = None
+        self._init_state()
+
+    def has_stored_info(self):
+        """
+        Return wether there is any sync state info stored on the database.
+
+        :return: Wether there's any sync state info store on db.
+        :rtype: bool
+        """
+        return self._db is not None and self._db.sync_state is not None
+
+    def __str__(self):
+        ', '.join(['%s: %s' % (k, getattr(self, k)) for k in self._public_attr_keys])
+
+class Synchronizer(U1DBSynchronizer):
+    """
+    Collect the state around synchronizing 2 U1DB replicas.
+
+    Modified to allow for interrupting the synchronization process.
+    """
+
+    def stop(self):
+        """
+        Stop the current sync in progress.
+        """
+        self.sync_target.stop()
+
+    def sync(self, autocreate=False):
+        """
+        Synchronize documents between source and target.
+
+        :param autocreate: Wether the target replica should be created or not.
+        :type autocreate: bool
+        """
+        sync_target = self.sync_target
+
+        # recover current sync state from source database
+        sync_state = ClientSyncState(self.source)
+        self.target_replica_uid = sync_state.target_replica_uid
+        target_gen = sync_state.target_gen
+        target_trans_id = sync_state.target_trans_id
+        target_my_gen = sync_state.target_my_gen
+        target_my_trans_id = sync_state.target_my_trans_id
+        target_last_known_gen = sync_state.target_last_known_gen
+        target_last_known_trans_id = \
+            sync_state.target_last_known_trans_id
+        my_gen = sync_state.my_gen
+        changes = sync_state.changes
+        sent = sync_state.sent
+        received = sync_state.received
+
+        # get target identifier, its current generation,
+        # and its last-seen database generation for this source
+        ensure_callback = None
+        if not sync_state.has_stored_info():
+            try:
+                (self.target_replica_uid, target_gen, target_trans_id,
+                 target_my_gen, target_my_trans_id) = \
+                    sync_target.get_sync_info(self.source._replica_uid)
+            except errors.DatabaseDoesNotExist:
+                if not autocreate:
+                    raise
+                # will try to ask sync_exchange() to create the db
+                self.target_replica_uid = None
+                target_gen, target_trans_id = 0, ''
+                target_my_gen, target_my_trans_id = 0, ''
+
+        # make sure we'll have access to target replica uid once it exists
+        if self.target_replica_uid is None:
+
+            def ensure_callback(replica_uid):
+                self.target_replica_uid = replica_uid
+
+        # make sure we're not syncing one replica with itself
+        if self.target_replica_uid == self.source._replica_uid:
+            raise errors.InvalidReplicaUID
+
+        # validate the info the target has about the source replica
+        self.source.validate_gen_and_trans_id(
+            target_my_gen, target_my_trans_id)
+
+        # what's changed since that generation and this current gen
+        if not sync_state.has_stored_info():
+            my_gen, _, changes = self.source.whats_changed(target_my_gen)
+
+        # get source last-seen database generation for the target
+        if not sync_state.has_stored_info():
+            if self.target_replica_uid is None:
+                target_last_known_gen, target_last_known_trans_id = 0, ''
+            else:
+                target_last_known_gen, target_last_known_trans_id = \
+                    self.source._get_replica_gen_and_trans_id(
+                        self.target_replica_uid)
+
+        # validate transaction ids
+        if not changes and target_last_known_gen == target_gen:
+            if target_trans_id != target_last_known_trans_id:
+                raise errors.InvalidTransactionId
+            return my_gen
+
+        # prepare to send all the changed docs
+        changed_doc_ids = [doc_id for doc_id, _, _ in changes]
+        docs_to_send = self.source.get_docs(
+            changed_doc_ids, check_for_conflicts=False, include_deleted=True)
+        docs_by_generation = []
+        idx = 0
+        for doc in docs_to_send:
+            _, gen, trans = changes[idx]
+            docs_by_generation.append((doc, gen, trans))
+            idx += 1
+        # store current sync state info
+        if not sync_state.has_stored_info():
+            sync_state.target_replica_uid = self.target_replica_uid
+            sync_state.target_gen = target_gen
+            sync_state.target_trans_id = target_trans_id
+            sync_state.target_my_gen = target_my_gen
+            sync_state.target_my_trans_id = target_my_trans_id
+            sync_state.my_gen = my_gen
+            sync_state.changes = changes
+            sync_state.target_last_known_trans_id = \
+                target_last_known_trans_id
+            sync_state.target_last_known_gen = target_last_known_gen
+            sync_state.sent = sent = 0
+            sync_state.received = received = 0
+
+        # exchange documents and try to insert the returned ones with
+        # the target, return target synced-up-to gen.
+        #
+        # The sync_exchange method may be interrupted, in which case it will
+        # return a tuple of Nones.
+        new_gen, new_trans_id = sync_target.sync_exchange(
+            docs_by_generation, self.source._replica_uid,
+            target_last_known_gen, target_last_known_trans_id,
+            self._insert_doc_from_target, ensure_callback=ensure_callback,
+            sync_state=sync_state)
+
+        # save sync state info if the sync was interrupted
+        if new_gen is None and new_trans_id is None:
+            sync_state.save()
+            return my_gen
+
+        # sync exchange was succesfull, remove sync state info from source
+        sync_state.clear()
+
+        # record target synced-up-to generation including applying what we sent
+        self.source._set_replica_gen_and_trans_id(
+            self.target_replica_uid, new_gen, new_trans_id)
+        # if gapless record current reached generation with target
+        self._record_sync_info_with_the_target(my_gen)
+
+        return my_gen
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 7b77055c..06e79e63 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 # target.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013, 2014 LEAP
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -25,6 +25,7 @@ import hashlib
 import hmac
 import logging
 import urllib
+import threading
 
 import simplejson as json
 from time import sleep
@@ -313,6 +314,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         """
         HTTPSyncTarget.__init__(self, url, creds)
         self._crypto = crypto
+        self._stopped = True
+        self._sync_state = None
+        self._stop_lock = threading.Lock()
 
     def _init_post_request(self, url, action, headers, content_length):
         """
@@ -367,13 +371,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         :rtype: list of str
         """
 
-        def _post_get_doc(received):
+        def _post_get_doc():
             """
             Get a sync document from server by means of a POST request.
-
-            :param received: How many documents have already been received in
-                             this sync session.
-            :type received: int
             """
             entries = ['[']
             size = 1
@@ -384,7 +384,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
                 last_known_trans_id=last_known_trans_id,
                 ensure=ensure_callback is not None)
             # inform server of how many documents have already been received
-            size += self._prepare(',', entries, received=received)
+            size += self._prepare(
+                ',', entries, received=self._sync_state.received)
             entries.append('\r\n]')
             size += len(entries[-1])
             # send headers
@@ -394,13 +395,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
                 self._conn.send(entry)
             return self._response()
 
-        received = 0
         number_of_changes = None
-        while number_of_changes is None or received < number_of_changes:
+        while number_of_changes is None or \
+                self._sync_state.received < number_of_changes:
+            # bail out if sync process was interrupted
+            if self.stopped is True:
+                return None, None
             # try to fetch one document from target
-            data, _ = _post_get_doc(received)
-            received += 1
+            data, _ = _post_get_doc()
+            self._sync_state.received += 1
             # decode incoming stream
             entries = None
             try:
@@ -545,7 +549,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
 
     def sync_exchange(self, docs_by_generations, source_replica_uid,
                       last_known_generation, last_known_trans_id,
-                      return_doc_cb, ensure_callback=None):
+                      return_doc_cb, ensure_callback=None,
+                      sync_state=None):
         """
         Find out which documents the remote database does not know about,
         encrypt and send them.
@@ -574,6 +579,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         :return: The new generation and transaction id of the target replica.
         :rtype: tuple
         """
+        self.start()
+        self._sync_state = sync_state
+
         self._ensure_connection()
         if self._trace_hook:  # for tests
             self._trace_hook('sync_exchange')
@@ -616,7 +624,15 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         cur_target_gen = last_known_generation
         cur_target_trans_id = last_known_trans_id
 
+        # skip docs that were already sent
+        if self._sync_state.sent > 0:
+            docs_by_generations = docs_by_generations[self._sync_state.sent:]
+
+        # send docs
         for doc, gen, trans_id in docs_by_generations:
+            # allow for interrupting the sync process
+            if self.stopped is True:
+                break
             # skip non-syncable docs
             if isinstance(doc, SoledadDocument) and not doc.syncable:
                 continue
@@ -632,10 +648,41 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
             cur_target_gen, cur_target_trans_id = _post_put_doc(
                 headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id,
                 rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id)
+            self._sync_state.sent += 1
 
+        # get docs from target
         cur_target_gen, cur_target_trans_id = self._get_remote_docs(
             url,
             last_known_generation, last_known_trans_id, headers,
             return_doc_cb, ensure_callback)
-
+        self.stop()
         return cur_target_gen, cur_target_trans_id
+
+    def start(self):
+        """
+        Mark current sync session as running.
+        """
+        with self._stop_lock:
+            self._stopped = False
+
+    def stop(self):
+        """
+        Mark current sync session as stopped.
+
+        This will eventually interrupt the sync_exchange() method and return
+        enough information to the synchronizer so the sync session can be
+        recovered afterwards.
+        """
+        with self._stop_lock:
+            self._stopped = True
+
+    @property
+    def stopped(self):
+        """
+        Return wether this sync session is stopped.
+
+        :return: Wether this sync session is stopped.
+        :rtype: bool
+        """
+        with self._stop_lock:
+            return self._stopped is True
diff --git a/common/src/leap/soledad/common/tests/test_sync.py b/common/src/leap/soledad/common/tests/test_sync.py
new file mode 100644
index 00000000..fd4a2797
--- /dev/null
+++ b/common/src/leap/soledad/common/tests/test_sync.py
@@ -0,0 +1,176 @@
+# -*- coding: utf-8 -*-
+# test_sync.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import mock
+import os
+import json
+import tempfile
+import threading
+import time
+from urlparse import urljoin
+
+from leap.soledad.common.couch import (
+    CouchServerState,
+    CouchDatabase,
+)
+
+from leap.soledad.common.tests.u1db_tests import (
+    TestCaseWithServer,
+    simple_doc,
+)
+from leap.soledad.common.tests.test_couch import CouchDBTestCase
+from leap.soledad.common.tests.test_target import (
+    make_token_soledad_app,
+    make_leap_document_for_test,
+    token_leap_sync_target,
+)
+
+from leap.soledad.client import (
+    Soledad,
+    target,
+)
+
+
+class InterruptableSyncTestCase(
+        CouchDBTestCase, TestCaseWithServer):
+    """
+    Tests for encrypted sync using Soledad server backed by a couch database.
+    """
+
+    @staticmethod
+    def make_app_with_state(state):
+        return make_token_soledad_app(state)
+
+    make_document_for_test = make_leap_document_for_test
+
+    sync_target = token_leap_sync_target
+
+    def _soledad_instance(self, user='user-uuid', passphrase=u'123',
+                          prefix='',
+                          secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME,
+                          local_db_path='soledad.u1db', server_url='',
+                          cert_file=None, auth_token=None, secret_id=None):
+        """
+        Instantiate Soledad.
+        """
+
+        # this callback ensures we save a document which is sent to the shared
+        # db.
+        def _put_doc_side_effect(doc):
+            self._doc_put = doc
+
+        # we need a mocked shared db or else Soledad will try to access the
+        # network to find if there are uploaded secrets.
+        class MockSharedDB(object):
+
+            get_doc = mock.Mock(return_value=None)
+            put_doc = mock.Mock(side_effect=_put_doc_side_effect)
+            lock = mock.Mock(return_value=('atoken', 300))
+            unlock = mock.Mock()
+
+            def __call__(self):
+                return self
+
+        Soledad._shared_db = MockSharedDB()
+        return Soledad(
+            user,
+            passphrase,
+            secrets_path=os.path.join(self.tempdir, prefix, secrets_path),
+            local_db_path=os.path.join(
+                self.tempdir, prefix, local_db_path),
+            server_url=server_url,
+            cert_file=cert_file,
+            auth_token=auth_token,
+            secret_id=secret_id)
+
+    def make_app(self):
+        self.request_state = CouchServerState(self._couch_url, 'shared',
+                                              'tokens')
+        return self.make_app_with_state(self.request_state)
+
+    def setUp(self):
+        TestCaseWithServer.setUp(self)
+        CouchDBTestCase.setUp(self)
+        self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
+        self._couch_url = 'http://localhost:' + str(self.wrapper.port)
+
+    def tearDown(self):
+        CouchDBTestCase.tearDown(self)
+        TestCaseWithServer.tearDown(self)
+
+    def test_interruptable_sync(self):
+        """
+        Test if Soledad can sync many smallfiles.
+        """
+
+        class _SyncInterruptor(threading.Thread):
+            """
+            A thread meant to interrupt the sync process.
+            """
+
+            def __init__(self, soledad, couchdb):
+                self._soledad = soledad
+                self._couchdb = couchdb
+                threading.Thread.__init__(self)
+
+            def run(self):
+                while db._get_generation() < 2:
+                    time.sleep(1)
+                self._soledad.stop_sync()
+                time.sleep(1)
+
+        number_of_docs = 10
+        self.startServer()
+
+        # instantiate soledad and create a document
+        sol = self._soledad_instance(
+            # token is verified in test_target.make_token_soledad_app
+            auth_token='auth-token'
+        )
+        _, doclist = sol.get_all_docs()
+        self.assertEqual([], doclist)
+
+        # create many small files
+        for i in range(0, number_of_docs):
+            sol.create_doc(json.loads(simple_doc))
+
+        # ensure remote db exists before syncing
+        db = CouchDatabase.open_database(
+            urljoin(self._couch_url, 'user-user-uuid'),
+            create=True,
+            ensure_ddocs=True)
+
+        # create interruptor thread
+        t = _SyncInterruptor(sol, db)
+        t.start()
+
+        # sync with server
+        sol._server_url = self.getURL()
+        sol.sync()  # this will be interrupted when couch db gen >= 2
+        t.join()
+
+        # recover the sync process
+        sol.sync()
+
+        gen, doclist = db.get_all_docs()
+        self.assertEqual(number_of_docs, len(doclist))
+
+        # delete remote database
+        db.delete_database()
+        db.close()
+        sol.close()
diff --git a/server/changes/feature_5571_allow-for-interrupting-and-recovering-sync b/server/changes/feature_5571_allow-for-interrupting-and-recovering-sync
new file mode 100644
index 00000000..0087c535
--- /dev/null
+++ b/server/changes/feature_5571_allow-for-interrupting-and-recovering-sync
@@ -0,0 +1 @@
+  o Allow for interrupting and recovering sync (#5517).
