summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/changes/feature_5571_allow-for-interrupting-and-recovering-sync1
-rw-r--r--client/src/leap/soledad/client/__init__.py7
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py52
-rw-r--r--client/src/leap/soledad/client/sync.py262
-rw-r--r--client/src/leap/soledad/client/target.py73
-rw-r--r--common/src/leap/soledad/common/tests/test_sync.py176
-rw-r--r--server/changes/feature_5571_allow-for-interrupting-and-recovering-sync1
7 files changed, 556 insertions, 16 deletions
diff --git a/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync b/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync
new file mode 100644
index 00000000..0087c535
--- /dev/null
+++ b/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync
@@ -0,0 +1 @@
+ o Allow for interrupting and recovering sync (#5517).
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py
index 8881b422..f92317e9 100644
--- a/client/src/leap/soledad/client/__init__.py
+++ b/client/src/leap/soledad/client/__init__.py
@@ -1096,6 +1096,13 @@ class Soledad(object):
signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)
return local_gen
+ def stop_sync(self):
+ """
+ Stop the current syncing process.
+ """
+ if self._db:
+ self._db.stop_sync()
+
def need_sync(self, url):
"""
Return if local db replica differs from remote url's replica.
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 04f8ebf9..0885a35f 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -55,14 +55,14 @@ from contextlib import contextmanager
from pysqlcipher import dbapi2
from u1db.backends import sqlite_backend
-from u1db.sync import Synchronizer
from u1db import errors as u1db_errors
+from leap.soledad.client.sync import Synchronizer
from leap.soledad.client.target import SoledadSyncTarget
from leap.soledad.common.document import SoledadDocument
-logger = logging.getLogger(__name__)
+logger = logging.getLogger(__name__)
# Monkey-patch u1db.backends.sqlite_backend with pysqlcipher.dbapi2
sqlite_backend.dbapi2 = dbapi2
@@ -214,6 +214,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
syncable=syncable)
self.set_document_factory(factory)
self._syncers = {}
+ self._real_sync_state = None
@classmethod
def _open_database(cls, sqlcipher_file, password, document_factory=None,
@@ -359,6 +360,14 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
res = syncer.sync(autocreate=autocreate)
return res
+ def stop_sync(self):
+ """
+ Interrupt all ongoing syncs.
+ """
+ for url in self._syncers:
+ _, syncer = self._syncers[url]
+ syncer.stop()
+
@contextmanager
def syncer(self, url, creds=None):
"""
@@ -379,7 +388,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
:type creds: dict
:return: A synchronizer.
- :rtype: u1db.sync.Synchronizer
+ :rtype: Synchronizer
"""
# we want to store at most one syncer for each url, so we also store a
# hash of the connection credentials and replace the stored syncer for
@@ -881,5 +890,42 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
if self._db_handle is not None:
self._db_handle.close()
+ def _get_sync_state(self):
+ """
+ Get the current sync state.
+
+ :return: The current sync state.
+ :rtype: dict
+ """
+ if self._real_sync_state is not None:
+ return self._real_sync_state
+ c = self._db_handle.cursor()
+ c.execute("SELECT value FROM u1db_config"
+ " WHERE name = 'sync_state'")
+ val = c.fetchone()
+ if val is None:
+ return None
+ self._real_sync_state = json.loads(val[0])
+ return self._real_sync_state
+
+ def _set_sync_state(self, state):
+ """
+ Set the current sync state.
+
+ :param state: The sync state to be set.
+ :type state: dict
+ """
+ c = self._db_handle.cursor()
+ if state is None:
+ c.execute("DELETE FROM u1db_config"
+ " WHERE name = 'sync_state'")
+ else:
+ c.execute("INSERT OR REPLACE INTO u1db_config"
+ " VALUES ('sync_state', ?)",
+ (json.dumps(state),))
+ self._real_sync_state = state
+
+ sync_state = property(
+ _get_sync_state, _set_sync_state, doc="The current sync state.")
sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase)
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
new file mode 100644
index 00000000..6e9e23fa
--- /dev/null
+++ b/client/src/leap/soledad/client/sync.py
@@ -0,0 +1,262 @@
+# -*- coding: utf-8 -*-
+# sync.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Sync infrastructure that can be interrupted and recovered.
+"""
+
+import json
+
+
+from u1db import errors
+from u1db.sync import Synchronizer as U1DBSynchronizer
+
+
+class ClientSyncState(object):
+ """
+ The state of the current sync session, as stored on the client.
+ """
+
+ _private_attrs = [
+ '_db',
+ ]
+
+ _public_attrs = {
+ 'target_replica_uid': None,
+ 'target_gen': None,
+ 'target_trans_id': None,
+ 'target_my_gen': None,
+ 'target_my_trans_id': None,
+ 'target_last_known_gen': None,
+ 'target_last_known_trans_id': None,
+ 'my_gen': None,
+ 'changes': None,
+ 'sent': 0,
+ 'received': 0,
+ }
+
+ @property
+ def _public_attr_keys(self):
+ return [k for k in self._public_attrs]
+
+ def __init__(self, db=None):
+ """
+ Initialize the client sync state.
+
+ :param db: The database where to fetch/store the sync state.
+ :type db: SQLCipherDatabase
+ """
+ self._db = db
+ self._init_state()
+
+ def __setattr__(self, attr, val):
+ """
+ Prevent setting arbitrary attributes.
+
+ :param attr: The attribute name.
+ :type attr: str
+ :param val: The value to be set.
+ :type val: anything
+ """
+ if attr not in self._public_attr_keys + self._private_attrs:
+ raise Exception
+ object.__setattr__(self, attr, val)
+
+ def _init_state(self):
+ """
+ Initialize current sync state, potentially fetching sync info stored
+ in database.
+ """
+ # set local default attributes
+ for attr in self._public_attr_keys:
+ setattr(self, attr, self._public_attrs[attr])
+ # fetch info from stored sync state
+ sync_state = None
+ if self._db is not None:
+ sync_state = self._db.sync_state
+ if sync_state is not None:
+ for attr in self._public_attr_keys:
+ setattr(self, attr, sync_state[attr])
+
+ def save(self):
+ """
+ Save the current sync state in the database.
+ """
+ sync_state = {}
+ for attr in self._public_attr_keys:
+ sync_state[attr] = getattr(self, attr)
+ if self._db is not None:
+ self._db.sync_state = sync_state
+
+ def clear(self):
+ """
+ Clear the sync state info data.
+ """
+ if self._db is not None:
+ self._db.sync_state = None
+ self._init_state()
+
+ def has_stored_info(self):
+ """
+ Return wether there is any sync state info stored on the database.
+
+ :return: Wether there's any sync state info store on db.
+ :rtype: bool
+ """
+ return self._db is not None and self._db.sync_state is not None
+
+ def __str__(self):
+ ', '.join(['%s: %s' % (k, getattr(self, k)) for k in self._public_attr_keys])
+
+class Synchronizer(U1DBSynchronizer):
+ """
+ Collect the state around synchronizing 2 U1DB replicas.
+
+ Modified to allow for interrupting the synchronization process.
+ """
+
+ def stop(self):
+ """
+ Stop the current sync in progress.
+ """
+ self.sync_target.stop()
+
+ def sync(self, autocreate=False):
+ """
+ Synchronize documents between source and target.
+
+ :param autocreate: Wether the target replica should be created or not.
+ :type autocreate: bool
+ """
+ sync_target = self.sync_target
+
+ # recover current sync state from source database
+ sync_state = ClientSyncState(self.source)
+ self.target_replica_uid = sync_state.target_replica_uid
+ target_gen = sync_state.target_gen
+ target_trans_id = sync_state.target_trans_id
+ target_my_gen = sync_state.target_my_gen
+ target_my_trans_id = sync_state.target_my_trans_id
+ target_last_known_gen = sync_state.target_last_known_gen
+ target_last_known_trans_id = \
+ sync_state.target_last_known_trans_id
+ my_gen = sync_state.my_gen
+ changes = sync_state.changes
+ sent = sync_state.sent
+ received = sync_state.received
+
+ # get target identifier, its current generation,
+ # and its last-seen database generation for this source
+ ensure_callback = None
+ if not sync_state.has_stored_info():
+ try:
+ (self.target_replica_uid, target_gen, target_trans_id,
+ target_my_gen, target_my_trans_id) = \
+ sync_target.get_sync_info(self.source._replica_uid)
+ except errors.DatabaseDoesNotExist:
+ if not autocreate:
+ raise
+ # will try to ask sync_exchange() to create the db
+ self.target_replica_uid = None
+ target_gen, target_trans_id = 0, ''
+ target_my_gen, target_my_trans_id = 0, ''
+
+ # make sure we'll have access to target replica uid once it exists
+ if self.target_replica_uid is None:
+
+ def ensure_callback(replica_uid):
+ self.target_replica_uid = replica_uid
+
+ # make sure we're not syncing one replica with itself
+ if self.target_replica_uid == self.source._replica_uid:
+ raise errors.InvalidReplicaUID
+
+ # validate the info the target has about the source replica
+ self.source.validate_gen_and_trans_id(
+ target_my_gen, target_my_trans_id)
+
+ # what's changed since that generation and this current gen
+ if not sync_state.has_stored_info():
+ my_gen, _, changes = self.source.whats_changed(target_my_gen)
+
+ # get source last-seen database generation for the target
+ if not sync_state.has_stored_info():
+ if self.target_replica_uid is None:
+ target_last_known_gen, target_last_known_trans_id = 0, ''
+ else:
+ target_last_known_gen, target_last_known_trans_id = \
+ self.source._get_replica_gen_and_trans_id(
+ self.target_replica_uid)
+
+ # validate transaction ids
+ if not changes and target_last_known_gen == target_gen:
+ if target_trans_id != target_last_known_trans_id:
+ raise errors.InvalidTransactionId
+ return my_gen
+
+ # prepare to send all the changed docs
+ changed_doc_ids = [doc_id for doc_id, _, _ in changes]
+ docs_to_send = self.source.get_docs(
+ changed_doc_ids, check_for_conflicts=False, include_deleted=True)
+ docs_by_generation = []
+ idx = 0
+ for doc in docs_to_send:
+ _, gen, trans = changes[idx]
+ docs_by_generation.append((doc, gen, trans))
+ idx += 1
+ # store current sync state info
+ if not sync_state.has_stored_info():
+ sync_state.target_replica_uid = self.target_replica_uid
+ sync_state.target_gen = target_gen
+ sync_state.target_trans_id = target_trans_id
+ sync_state.target_my_gen = target_my_gen
+ sync_state.target_my_trans_id = target_my_trans_id
+ sync_state.my_gen = my_gen
+ sync_state.changes = changes
+ sync_state.target_last_known_trans_id = \
+ target_last_known_trans_id
+ sync_state.target_last_known_gen = target_last_known_gen
+ sync_state.sent = sent = 0
+ sync_state.received = received = 0
+
+ # exchange documents and try to insert the returned ones with
+ # the target, return target synced-up-to gen.
+ #
+ # The sync_exchange method may be interrupted, in which case it will
+ # return a tuple of Nones.
+ new_gen, new_trans_id = sync_target.sync_exchange(
+ docs_by_generation, self.source._replica_uid,
+ target_last_known_gen, target_last_known_trans_id,
+ self._insert_doc_from_target, ensure_callback=ensure_callback,
+ sync_state=sync_state)
+
+ # save sync state info if the sync was interrupted
+ if new_gen is None and new_trans_id is None:
+ sync_state.save()
+ return my_gen
+
+ # sync exchange was succesfull, remove sync state info from source
+ sync_state.clear()
+
+ # record target synced-up-to generation including applying what we sent
+ self.source._set_replica_gen_and_trans_id(
+ self.target_replica_uid, new_gen, new_trans_id)
+ # if gapless record current reached generation with target
+ self._record_sync_info_with_the_target(my_gen)
+
+ return my_gen
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 7b77055c..06e79e63 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# target.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013, 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -25,6 +25,7 @@ import hashlib
import hmac
import logging
import urllib
+import threading
import simplejson as json
from time import sleep
@@ -313,6 +314,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
HTTPSyncTarget.__init__(self, url, creds)
self._crypto = crypto
+ self._stopped = True
+ self._sync_state = None
+ self._stop_lock = threading.Lock()
def _init_post_request(self, url, action, headers, content_length):
"""
@@ -367,13 +371,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:rtype: list of str
"""
- def _post_get_doc(received):
+ def _post_get_doc():
"""
Get a sync document from server by means of a POST request.
-
- :param received: How many documents have already been received in
- this sync session.
- :type received: int
"""
entries = ['[']
size = 1
@@ -384,7 +384,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
last_known_trans_id=last_known_trans_id,
ensure=ensure_callback is not None)
# inform server of how many documents have already been received
- size += self._prepare(',', entries, received=received)
+ size += self._prepare(
+ ',', entries, received=self._sync_state.received)
entries.append('\r\n]')
size += len(entries[-1])
# send headers
@@ -394,13 +395,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._conn.send(entry)
return self._response()
- received = 0
number_of_changes = None
- while number_of_changes is None or received < number_of_changes:
+ while number_of_changes is None or \
+ self._sync_state.received < number_of_changes:
+ # bail out if sync process was interrupted
+ if self.stopped is True:
+ return None, None
# try to fetch one document from target
- data, _ = _post_get_doc(received)
- received += 1
+ data, _ = _post_get_doc()
+ self._sync_state.received += 1
# decode incoming stream
entries = None
try:
@@ -545,7 +549,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
def sync_exchange(self, docs_by_generations, source_replica_uid,
last_known_generation, last_known_trans_id,
- return_doc_cb, ensure_callback=None):
+ return_doc_cb, ensure_callback=None,
+ sync_state=None):
"""
Find out which documents the remote database does not know about,
encrypt and send them.
@@ -574,6 +579,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:return: The new generation and transaction id of the target replica.
:rtype: tuple
"""
+ self.start()
+ self._sync_state = sync_state
+
self._ensure_connection()
if self._trace_hook: # for tests
self._trace_hook('sync_exchange')
@@ -616,7 +624,15 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
cur_target_gen = last_known_generation
cur_target_trans_id = last_known_trans_id
+ # skip docs that were already sent
+ if self._sync_state.sent > 0:
+ docs_by_generations = docs_by_generations[self._sync_state.sent:]
+
+ # send docs
for doc, gen, trans_id in docs_by_generations:
+ # allow for interrupting the sync process
+ if self.stopped is True:
+ break
# skip non-syncable docs
if isinstance(doc, SoledadDocument) and not doc.syncable:
continue
@@ -632,10 +648,41 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
cur_target_gen, cur_target_trans_id = _post_put_doc(
headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id,
rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id)
+ self._sync_state.sent += 1
+ # get docs from target
cur_target_gen, cur_target_trans_id = self._get_remote_docs(
url,
last_known_generation, last_known_trans_id, headers,
return_doc_cb, ensure_callback)
-
+ self.stop()
return cur_target_gen, cur_target_trans_id
+
+ def start(self):
+ """
+ Mark current sync session as running.
+ """
+ with self._stop_lock:
+ self._stopped = False
+
+ def stop(self):
+ """
+ Mark current sync session as stopped.
+
+ This will eventually interrupt the sync_exchange() method and return
+ enough information to the synchronizer so the sync session can be
+ recovered afterwards.
+ """
+ with self._stop_lock:
+ self._stopped = True
+
+ @property
+ def stopped(self):
+ """
+ Return wether this sync session is stopped.
+
+ :return: Wether this sync session is stopped.
+ :rtype: bool
+ """
+ with self._stop_lock:
+ return self._stopped is True
diff --git a/common/src/leap/soledad/common/tests/test_sync.py b/common/src/leap/soledad/common/tests/test_sync.py
new file mode 100644
index 00000000..fd4a2797
--- /dev/null
+++ b/common/src/leap/soledad/common/tests/test_sync.py
@@ -0,0 +1,176 @@
+# -*- coding: utf-8 -*-
+# test_sync.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import mock
+import os
+import json
+import tempfile
+import threading
+import time
+from urlparse import urljoin
+
+from leap.soledad.common.couch import (
+ CouchServerState,
+ CouchDatabase,
+)
+
+from leap.soledad.common.tests.u1db_tests import (
+ TestCaseWithServer,
+ simple_doc,
+)
+from leap.soledad.common.tests.test_couch import CouchDBTestCase
+from leap.soledad.common.tests.test_target import (
+ make_token_soledad_app,
+ make_leap_document_for_test,
+ token_leap_sync_target,
+)
+
+from leap.soledad.client import (
+ Soledad,
+ target,
+)
+
+
+class InterruptableSyncTestCase(
+ CouchDBTestCase, TestCaseWithServer):
+ """
+ Tests for encrypted sync using Soledad server backed by a couch database.
+ """
+
+ @staticmethod
+ def make_app_with_state(state):
+ return make_token_soledad_app(state)
+
+ make_document_for_test = make_leap_document_for_test
+
+ sync_target = token_leap_sync_target
+
+ def _soledad_instance(self, user='user-uuid', passphrase=u'123',
+ prefix='',
+ secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME,
+ local_db_path='soledad.u1db', server_url='',
+ cert_file=None, auth_token=None, secret_id=None):
+ """
+ Instantiate Soledad.
+ """
+
+ # this callback ensures we save a document which is sent to the shared
+ # db.
+ def _put_doc_side_effect(doc):
+ self._doc_put = doc
+
+ # we need a mocked shared db or else Soledad will try to access the
+ # network to find if there are uploaded secrets.
+ class MockSharedDB(object):
+
+ get_doc = mock.Mock(return_value=None)
+ put_doc = mock.Mock(side_effect=_put_doc_side_effect)
+ lock = mock.Mock(return_value=('atoken', 300))
+ unlock = mock.Mock()
+
+ def __call__(self):
+ return self
+
+ Soledad._shared_db = MockSharedDB()
+ return Soledad(
+ user,
+ passphrase,
+ secrets_path=os.path.join(self.tempdir, prefix, secrets_path),
+ local_db_path=os.path.join(
+ self.tempdir, prefix, local_db_path),
+ server_url=server_url,
+ cert_file=cert_file,
+ auth_token=auth_token,
+ secret_id=secret_id)
+
+ def make_app(self):
+ self.request_state = CouchServerState(self._couch_url, 'shared',
+ 'tokens')
+ return self.make_app_with_state(self.request_state)
+
+ def setUp(self):
+ TestCaseWithServer.setUp(self)
+ CouchDBTestCase.setUp(self)
+ self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
+ self._couch_url = 'http://localhost:' + str(self.wrapper.port)
+
+ def tearDown(self):
+ CouchDBTestCase.tearDown(self)
+ TestCaseWithServer.tearDown(self)
+
+ def test_interruptable_sync(self):
+ """
+ Test if Soledad can sync many smallfiles.
+ """
+
+ class _SyncInterruptor(threading.Thread):
+ """
+ A thread meant to interrupt the sync process.
+ """
+
+ def __init__(self, soledad, couchdb):
+ self._soledad = soledad
+ self._couchdb = couchdb
+ threading.Thread.__init__(self)
+
+ def run(self):
+ while db._get_generation() < 2:
+ time.sleep(1)
+ self._soledad.stop_sync()
+ time.sleep(1)
+
+ number_of_docs = 10
+ self.startServer()
+
+ # instantiate soledad and create a document
+ sol = self._soledad_instance(
+ # token is verified in test_target.make_token_soledad_app
+ auth_token='auth-token'
+ )
+ _, doclist = sol.get_all_docs()
+ self.assertEqual([], doclist)
+
+ # create many small files
+ for i in range(0, number_of_docs):
+ sol.create_doc(json.loads(simple_doc))
+
+ # ensure remote db exists before syncing
+ db = CouchDatabase.open_database(
+ urljoin(self._couch_url, 'user-user-uuid'),
+ create=True,
+ ensure_ddocs=True)
+
+ # create interruptor thread
+ t = _SyncInterruptor(sol, db)
+ t.start()
+
+ # sync with server
+ sol._server_url = self.getURL()
+ sol.sync() # this will be interrupted when couch db gen >= 2
+ t.join()
+
+ # recover the sync process
+ sol.sync()
+
+ gen, doclist = db.get_all_docs()
+ self.assertEqual(number_of_docs, len(doclist))
+
+ # delete remote database
+ db.delete_database()
+ db.close()
+ sol.close()
diff --git a/server/changes/feature_5571_allow-for-interrupting-and-recovering-sync b/server/changes/feature_5571_allow-for-interrupting-and-recovering-sync
new file mode 100644
index 00000000..0087c535
--- /dev/null
+++ b/server/changes/feature_5571_allow-for-interrupting-and-recovering-sync
@@ -0,0 +1 @@
+ o Allow for interrupting and recovering sync (#5517).