diff options
author | drebs <drebs@leap.se> | 2014-04-30 12:52:24 -0300 |
---|---|---|
committer | drebs <drebs@leap.se> | 2014-05-22 18:44:03 -0300 |
commit | 73d431a035fcdce8d623eefde2d62f28687fdb36 (patch) | |
tree | 03822bf442514677af15fbae1d62b7196b45f5aa | |
parent | ad748eb838a15b0263fdf18813404d3bee58cd03 (diff) |
Allow for interrupting and recovering sync (#5571).
-rw-r--r-- | client/changes/feature_5571_allow-for-interrupting-and-recovering-sync | 1 | ||||
-rw-r--r-- | client/src/leap/soledad/client/__init__.py | 7 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 52 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sync.py | 262 | ||||
-rw-r--r-- | client/src/leap/soledad/client/target.py | 73 | ||||
-rw-r--r-- | common/src/leap/soledad/common/tests/test_sync.py | 176 | ||||
-rw-r--r-- | server/changes/feature_5571_allow-for-interrupting-and-recovering-sync | 1 |
7 files changed, 556 insertions, 16 deletions
diff --git a/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync b/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync new file mode 100644 index 00000000..0087c535 --- /dev/null +++ b/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync @@ -0,0 +1 @@ + o Allow for interrupting and recovering sync (#5517). diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py index 8881b422..f92317e9 100644 --- a/client/src/leap/soledad/client/__init__.py +++ b/client/src/leap/soledad/client/__init__.py @@ -1096,6 +1096,13 @@ class Soledad(object): signal(SOLEDAD_DONE_DATA_SYNC, self._uuid) return local_gen + def stop_sync(self): + """ + Stop the current syncing process. + """ + if self._db: + self._db.stop_sync() + def need_sync(self, url): """ Return if local db replica differs from remote url's replica. diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 04f8ebf9..0885a35f 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -55,14 +55,14 @@ from contextlib import contextmanager from pysqlcipher import dbapi2 from u1db.backends import sqlite_backend -from u1db.sync import Synchronizer from u1db import errors as u1db_errors +from leap.soledad.client.sync import Synchronizer from leap.soledad.client.target import SoledadSyncTarget from leap.soledad.common.document import SoledadDocument -logger = logging.getLogger(__name__) +logger = logging.getLogger(__name__) # Monkey-patch u1db.backends.sqlite_backend with pysqlcipher.dbapi2 sqlite_backend.dbapi2 = dbapi2 @@ -214,6 +214,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): syncable=syncable) self.set_document_factory(factory) self._syncers = {} + self._real_sync_state = None @classmethod def _open_database(cls, sqlcipher_file, password, document_factory=None, @@ -359,6 +360,14 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): res = syncer.sync(autocreate=autocreate) return res + def stop_sync(self): + """ + Interrupt all ongoing syncs. + """ + for url in self._syncers: + _, syncer = self._syncers[url] + syncer.stop() + @contextmanager def syncer(self, url, creds=None): """ @@ -379,7 +388,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :type creds: dict :return: A synchronizer. - :rtype: u1db.sync.Synchronizer + :rtype: Synchronizer """ # we want to store at most one syncer for each url, so we also store a # hash of the connection credentials and replace the stored syncer for @@ -881,5 +890,42 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): if self._db_handle is not None: self._db_handle.close() + def _get_sync_state(self): + """ + Get the current sync state. + + :return: The current sync state. + :rtype: dict + """ + if self._real_sync_state is not None: + return self._real_sync_state + c = self._db_handle.cursor() + c.execute("SELECT value FROM u1db_config" + " WHERE name = 'sync_state'") + val = c.fetchone() + if val is None: + return None + self._real_sync_state = json.loads(val[0]) + return self._real_sync_state + + def _set_sync_state(self, state): + """ + Set the current sync state. + + :param state: The sync state to be set. + :type state: dict + """ + c = self._db_handle.cursor() + if state is None: + c.execute("DELETE FROM u1db_config" + " WHERE name = 'sync_state'") + else: + c.execute("INSERT OR REPLACE INTO u1db_config" + " VALUES ('sync_state', ?)", + (json.dumps(state),)) + self._real_sync_state = state + + sync_state = property( + _get_sync_state, _set_sync_state, doc="The current sync state.") sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py new file mode 100644 index 00000000..6e9e23fa --- /dev/null +++ b/client/src/leap/soledad/client/sync.py @@ -0,0 +1,262 @@ +# -*- coding: utf-8 -*- +# sync.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Sync infrastructure that can be interrupted and recovered. +""" + +import json + + +from u1db import errors +from u1db.sync import Synchronizer as U1DBSynchronizer + + +class ClientSyncState(object): + """ + The state of the current sync session, as stored on the client. + """ + + _private_attrs = [ + '_db', + ] + + _public_attrs = { + 'target_replica_uid': None, + 'target_gen': None, + 'target_trans_id': None, + 'target_my_gen': None, + 'target_my_trans_id': None, + 'target_last_known_gen': None, + 'target_last_known_trans_id': None, + 'my_gen': None, + 'changes': None, + 'sent': 0, + 'received': 0, + } + + @property + def _public_attr_keys(self): + return [k for k in self._public_attrs] + + def __init__(self, db=None): + """ + Initialize the client sync state. + + :param db: The database where to fetch/store the sync state. + :type db: SQLCipherDatabase + """ + self._db = db + self._init_state() + + def __setattr__(self, attr, val): + """ + Prevent setting arbitrary attributes. + + :param attr: The attribute name. + :type attr: str + :param val: The value to be set. + :type val: anything + """ + if attr not in self._public_attr_keys + self._private_attrs: + raise Exception + object.__setattr__(self, attr, val) + + def _init_state(self): + """ + Initialize current sync state, potentially fetching sync info stored + in database. + """ + # set local default attributes + for attr in self._public_attr_keys: + setattr(self, attr, self._public_attrs[attr]) + # fetch info from stored sync state + sync_state = None + if self._db is not None: + sync_state = self._db.sync_state + if sync_state is not None: + for attr in self._public_attr_keys: + setattr(self, attr, sync_state[attr]) + + def save(self): + """ + Save the current sync state in the database. + """ + sync_state = {} + for attr in self._public_attr_keys: + sync_state[attr] = getattr(self, attr) + if self._db is not None: + self._db.sync_state = sync_state + + def clear(self): + """ + Clear the sync state info data. + """ + if self._db is not None: + self._db.sync_state = None + self._init_state() + + def has_stored_info(self): + """ + Return wether there is any sync state info stored on the database. + + :return: Wether there's any sync state info store on db. + :rtype: bool + """ + return self._db is not None and self._db.sync_state is not None + + def __str__(self): + ', '.join(['%s: %s' % (k, getattr(self, k)) for k in self._public_attr_keys]) + +class Synchronizer(U1DBSynchronizer): + """ + Collect the state around synchronizing 2 U1DB replicas. + + Modified to allow for interrupting the synchronization process. + """ + + def stop(self): + """ + Stop the current sync in progress. + """ + self.sync_target.stop() + + def sync(self, autocreate=False): + """ + Synchronize documents between source and target. + + :param autocreate: Wether the target replica should be created or not. + :type autocreate: bool + """ + sync_target = self.sync_target + + # recover current sync state from source database + sync_state = ClientSyncState(self.source) + self.target_replica_uid = sync_state.target_replica_uid + target_gen = sync_state.target_gen + target_trans_id = sync_state.target_trans_id + target_my_gen = sync_state.target_my_gen + target_my_trans_id = sync_state.target_my_trans_id + target_last_known_gen = sync_state.target_last_known_gen + target_last_known_trans_id = \ + sync_state.target_last_known_trans_id + my_gen = sync_state.my_gen + changes = sync_state.changes + sent = sync_state.sent + received = sync_state.received + + # get target identifier, its current generation, + # and its last-seen database generation for this source + ensure_callback = None + if not sync_state.has_stored_info(): + try: + (self.target_replica_uid, target_gen, target_trans_id, + target_my_gen, target_my_trans_id) = \ + sync_target.get_sync_info(self.source._replica_uid) + except errors.DatabaseDoesNotExist: + if not autocreate: + raise + # will try to ask sync_exchange() to create the db + self.target_replica_uid = None + target_gen, target_trans_id = 0, '' + target_my_gen, target_my_trans_id = 0, '' + + # make sure we'll have access to target replica uid once it exists + if self.target_replica_uid is None: + + def ensure_callback(replica_uid): + self.target_replica_uid = replica_uid + + # make sure we're not syncing one replica with itself + if self.target_replica_uid == self.source._replica_uid: + raise errors.InvalidReplicaUID + + # validate the info the target has about the source replica + self.source.validate_gen_and_trans_id( + target_my_gen, target_my_trans_id) + + # what's changed since that generation and this current gen + if not sync_state.has_stored_info(): + my_gen, _, changes = self.source.whats_changed(target_my_gen) + + # get source last-seen database generation for the target + if not sync_state.has_stored_info(): + if self.target_replica_uid is None: + target_last_known_gen, target_last_known_trans_id = 0, '' + else: + target_last_known_gen, target_last_known_trans_id = \ + self.source._get_replica_gen_and_trans_id( + self.target_replica_uid) + + # validate transaction ids + if not changes and target_last_known_gen == target_gen: + if target_trans_id != target_last_known_trans_id: + raise errors.InvalidTransactionId + return my_gen + + # prepare to send all the changed docs + changed_doc_ids = [doc_id for doc_id, _, _ in changes] + docs_to_send = self.source.get_docs( + changed_doc_ids, check_for_conflicts=False, include_deleted=True) + docs_by_generation = [] + idx = 0 + for doc in docs_to_send: + _, gen, trans = changes[idx] + docs_by_generation.append((doc, gen, trans)) + idx += 1 + # store current sync state info + if not sync_state.has_stored_info(): + sync_state.target_replica_uid = self.target_replica_uid + sync_state.target_gen = target_gen + sync_state.target_trans_id = target_trans_id + sync_state.target_my_gen = target_my_gen + sync_state.target_my_trans_id = target_my_trans_id + sync_state.my_gen = my_gen + sync_state.changes = changes + sync_state.target_last_known_trans_id = \ + target_last_known_trans_id + sync_state.target_last_known_gen = target_last_known_gen + sync_state.sent = sent = 0 + sync_state.received = received = 0 + + # exchange documents and try to insert the returned ones with + # the target, return target synced-up-to gen. + # + # The sync_exchange method may be interrupted, in which case it will + # return a tuple of Nones. + new_gen, new_trans_id = sync_target.sync_exchange( + docs_by_generation, self.source._replica_uid, + target_last_known_gen, target_last_known_trans_id, + self._insert_doc_from_target, ensure_callback=ensure_callback, + sync_state=sync_state) + + # save sync state info if the sync was interrupted + if new_gen is None and new_trans_id is None: + sync_state.save() + return my_gen + + # sync exchange was succesfull, remove sync state info from source + sync_state.clear() + + # record target synced-up-to generation including applying what we sent + self.source._set_replica_gen_and_trans_id( + self.target_replica_uid, new_gen, new_trans_id) + # if gapless record current reached generation with target + self._record_sync_info_with_the_target(my_gen) + + return my_gen diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 7b77055c..06e79e63 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # target.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -25,6 +25,7 @@ import hashlib import hmac import logging import urllib +import threading import simplejson as json from time import sleep @@ -313,6 +314,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ HTTPSyncTarget.__init__(self, url, creds) self._crypto = crypto + self._stopped = True + self._sync_state = None + self._stop_lock = threading.Lock() def _init_post_request(self, url, action, headers, content_length): """ @@ -367,13 +371,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :rtype: list of str """ - def _post_get_doc(received): + def _post_get_doc(): """ Get a sync document from server by means of a POST request. - - :param received: How many documents have already been received in - this sync session. - :type received: int """ entries = ['['] size = 1 @@ -384,7 +384,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): last_known_trans_id=last_known_trans_id, ensure=ensure_callback is not None) # inform server of how many documents have already been received - size += self._prepare(',', entries, received=received) + size += self._prepare( + ',', entries, received=self._sync_state.received) entries.append('\r\n]') size += len(entries[-1]) # send headers @@ -394,13 +395,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._conn.send(entry) return self._response() - received = 0 number_of_changes = None - while number_of_changes is None or received < number_of_changes: + while number_of_changes is None or \ + self._sync_state.received < number_of_changes: + # bail out if sync process was interrupted + if self.stopped is True: + return None, None # try to fetch one document from target - data, _ = _post_get_doc(received) - received += 1 + data, _ = _post_get_doc() + self._sync_state.received += 1 # decode incoming stream entries = None try: @@ -545,7 +549,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): def sync_exchange(self, docs_by_generations, source_replica_uid, last_known_generation, last_known_trans_id, - return_doc_cb, ensure_callback=None): + return_doc_cb, ensure_callback=None, + sync_state=None): """ Find out which documents the remote database does not know about, encrypt and send them. @@ -574,6 +579,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :return: The new generation and transaction id of the target replica. :rtype: tuple """ + self.start() + self._sync_state = sync_state + self._ensure_connection() if self._trace_hook: # for tests self._trace_hook('sync_exchange') @@ -616,7 +624,15 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): cur_target_gen = last_known_generation cur_target_trans_id = last_known_trans_id + # skip docs that were already sent + if self._sync_state.sent > 0: + docs_by_generations = docs_by_generations[self._sync_state.sent:] + + # send docs for doc, gen, trans_id in docs_by_generations: + # allow for interrupting the sync process + if self.stopped is True: + break # skip non-syncable docs if isinstance(doc, SoledadDocument) and not doc.syncable: continue @@ -632,10 +648,41 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): cur_target_gen, cur_target_trans_id = _post_put_doc( headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id) + self._sync_state.sent += 1 + # get docs from target cur_target_gen, cur_target_trans_id = self._get_remote_docs( url, last_known_generation, last_known_trans_id, headers, return_doc_cb, ensure_callback) - + self.stop() return cur_target_gen, cur_target_trans_id + + def start(self): + """ + Mark current sync session as running. + """ + with self._stop_lock: + self._stopped = False + + def stop(self): + """ + Mark current sync session as stopped. + + This will eventually interrupt the sync_exchange() method and return + enough information to the synchronizer so the sync session can be + recovered afterwards. + """ + with self._stop_lock: + self._stopped = True + + @property + def stopped(self): + """ + Return wether this sync session is stopped. + + :return: Wether this sync session is stopped. + :rtype: bool + """ + with self._stop_lock: + return self._stopped is True diff --git a/common/src/leap/soledad/common/tests/test_sync.py b/common/src/leap/soledad/common/tests/test_sync.py new file mode 100644 index 00000000..fd4a2797 --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_sync.py @@ -0,0 +1,176 @@ +# -*- coding: utf-8 -*- +# test_sync.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import mock +import os +import json +import tempfile +import threading +import time +from urlparse import urljoin + +from leap.soledad.common.couch import ( + CouchServerState, + CouchDatabase, +) + +from leap.soledad.common.tests.u1db_tests import ( + TestCaseWithServer, + simple_doc, +) +from leap.soledad.common.tests.test_couch import CouchDBTestCase +from leap.soledad.common.tests.test_target import ( + make_token_soledad_app, + make_leap_document_for_test, + token_leap_sync_target, +) + +from leap.soledad.client import ( + Soledad, + target, +) + + +class InterruptableSyncTestCase( + CouchDBTestCase, TestCaseWithServer): + """ + Tests for encrypted sync using Soledad server backed by a couch database. + """ + + @staticmethod + def make_app_with_state(state): + return make_token_soledad_app(state) + + make_document_for_test = make_leap_document_for_test + + sync_target = token_leap_sync_target + + def _soledad_instance(self, user='user-uuid', passphrase=u'123', + prefix='', + secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME, + local_db_path='soledad.u1db', server_url='', + cert_file=None, auth_token=None, secret_id=None): + """ + Instantiate Soledad. + """ + + # this callback ensures we save a document which is sent to the shared + # db. + def _put_doc_side_effect(doc): + self._doc_put = doc + + # we need a mocked shared db or else Soledad will try to access the + # network to find if there are uploaded secrets. + class MockSharedDB(object): + + get_doc = mock.Mock(return_value=None) + put_doc = mock.Mock(side_effect=_put_doc_side_effect) + lock = mock.Mock(return_value=('atoken', 300)) + unlock = mock.Mock() + + def __call__(self): + return self + + Soledad._shared_db = MockSharedDB() + return Soledad( + user, + passphrase, + secrets_path=os.path.join(self.tempdir, prefix, secrets_path), + local_db_path=os.path.join( + self.tempdir, prefix, local_db_path), + server_url=server_url, + cert_file=cert_file, + auth_token=auth_token, + secret_id=secret_id) + + def make_app(self): + self.request_state = CouchServerState(self._couch_url, 'shared', + 'tokens') + return self.make_app_with_state(self.request_state) + + def setUp(self): + TestCaseWithServer.setUp(self) + CouchDBTestCase.setUp(self) + self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") + self._couch_url = 'http://localhost:' + str(self.wrapper.port) + + def tearDown(self): + CouchDBTestCase.tearDown(self) + TestCaseWithServer.tearDown(self) + + def test_interruptable_sync(self): + """ + Test if Soledad can sync many smallfiles. + """ + + class _SyncInterruptor(threading.Thread): + """ + A thread meant to interrupt the sync process. + """ + + def __init__(self, soledad, couchdb): + self._soledad = soledad + self._couchdb = couchdb + threading.Thread.__init__(self) + + def run(self): + while db._get_generation() < 2: + time.sleep(1) + self._soledad.stop_sync() + time.sleep(1) + + number_of_docs = 10 + self.startServer() + + # instantiate soledad and create a document + sol = self._soledad_instance( + # token is verified in test_target.make_token_soledad_app + auth_token='auth-token' + ) + _, doclist = sol.get_all_docs() + self.assertEqual([], doclist) + + # create many small files + for i in range(0, number_of_docs): + sol.create_doc(json.loads(simple_doc)) + + # ensure remote db exists before syncing + db = CouchDatabase.open_database( + urljoin(self._couch_url, 'user-user-uuid'), + create=True, + ensure_ddocs=True) + + # create interruptor thread + t = _SyncInterruptor(sol, db) + t.start() + + # sync with server + sol._server_url = self.getURL() + sol.sync() # this will be interrupted when couch db gen >= 2 + t.join() + + # recover the sync process + sol.sync() + + gen, doclist = db.get_all_docs() + self.assertEqual(number_of_docs, len(doclist)) + + # delete remote database + db.delete_database() + db.close() + sol.close() diff --git a/server/changes/feature_5571_allow-for-interrupting-and-recovering-sync b/server/changes/feature_5571_allow-for-interrupting-and-recovering-sync new file mode 100644 index 00000000..0087c535 --- /dev/null +++ b/server/changes/feature_5571_allow-for-interrupting-and-recovering-sync @@ -0,0 +1 @@ + o Allow for interrupting and recovering sync (#5517). |