From 7d9d827a5f66993863ca0c532c01ad3bf2c4353e Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 3 Jun 2014 18:43:56 -0300 Subject: Replace client sync state by a sync_id. --- client/src/leap/soledad/client/sqlcipher.py | 42 +---- client/src/leap/soledad/client/sync.py | 176 +++------------------ client/src/leap/soledad/client/target.py | 56 +++---- .../soledad/common/ddocs/syncs/updates/state.js | 10 +- .../ddocs/syncs/views/changes_to_return/map.js | 7 +- .../common/ddocs/syncs/views/seen_ids/map.js | 6 +- .../soledad/common/ddocs/syncs/views/state/map.js | 5 +- server/src/leap/soledad/server/sync.py | 28 ++-- 8 files changed, 84 insertions(+), 246 deletions(-) diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 74351116..26238af6 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -57,7 +57,7 @@ from pysqlcipher import dbapi2 from u1db.backends import sqlite_backend from u1db import errors as u1db_errors -from leap.soledad.client.sync import Synchronizer, ClientSyncState +from leap.soledad.client.sync import Synchronizer from leap.soledad.client.target import SoledadSyncTarget from leap.soledad.common.document import SoledadDocument @@ -889,45 +889,5 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): if self._db_handle is not None: self._db_handle.close() - def _get_stored_sync_state(self): - """ - Retrieve the currently stored sync state. - - :return: The current stored sync state or None if there's no stored - state. - :rtype: dict or None - """ - c = self._db_handle.cursor() - c.execute("SELECT value FROM u1db_config" - " WHERE name = 'sync_state'") - val = c.fetchone() - if val is None: - return None - return json.loads(val[0]) - - def _set_stored_sync_state(self, state): - """ - Stored the sync state. - - :param state: The sync state to be stored or None to delete any stored - state. - :type state: dict or None - """ - c = self._db_handle.cursor() - if state is None: - c.execute("DELETE FROM u1db_config" - " WHERE name = 'sync_state'") - else: - c.execute("INSERT OR REPLACE INTO u1db_config" - " VALUES ('sync_state', ?)", - (json.dumps(state),)) - - stored_sync_state = property( - _get_stored_sync_state, _set_stored_sync_state, - doc="The current sync state dict.") - - @property - def sync_state(self): - return ClientSyncState(self) sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 5285d540..56e63416 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -27,103 +27,6 @@ from u1db import errors from u1db.sync import Synchronizer as U1DBSynchronizer -class ClientSyncState(object): - """ - The state of the current sync session, as stored on the client. - """ - - _private_attrs = [ - '_db', - ] - - _public_attrs = { - 'target_replica_uid': None, - 'target_gen': None, - 'target_trans_id': None, - 'target_my_gen': None, - 'target_my_trans_id': None, - 'target_last_known_gen': None, - 'target_last_known_trans_id': None, - 'my_gen': None, - 'changes': None, - 'sent': 0, - 'received': 0, - } - - @property - def _public_attr_keys(self): - return self._public_attrs.keys() - - def __init__(self, db=None): - """ - Initialize the client sync state. - - :param db: The database where to fetch/store the sync state. - :type db: SQLCipherDatabase - """ - self._db = db - self._init_state() - - def __setattr__(self, attr, val): - """ - Prevent setting arbitrary attributes. - - :param attr: The attribute name. - :type attr: str - :param val: The value to be set. - :type val: anything - """ - if attr not in self._public_attr_keys + self._private_attrs: - raise Exception - object.__setattr__(self, attr, val) - - def _init_state(self): - """ - Initialize current sync state, potentially fetching sync info stored - in database. - """ - # set local default attributes - for attr in self._public_attr_keys: - setattr(self, attr, self._public_attrs[attr]) - # fetch info from stored sync state - sync_state_dict = None - if self._db is not None: - sync_state_dict = self._db.stored_sync_state - if sync_state_dict is not None: - for attr in self._public_attr_keys: - setattr(self, attr, sync_state_dict[attr]) - - def save(self): - """ - Save the current sync state in the database. - """ - sync_state_dict = {} - for attr in self._public_attr_keys: - sync_state_dict[attr] = getattr(self, attr) - if self._db is not None: - self._db.stored_sync_state = sync_state_dict - - def clear(self): - """ - Clear the sync state info data. - """ - if self._db is not None: - self._db.stored_sync_state = None - self._init_state() - - def has_stored_info(self): - """ - Return whether there is any sync state info stored on the database. - - :return: Whether there's any sync state info store on db. - :rtype: bool - """ - return self._db is not None and self._db.stored_sync_state is not None - - def __str__(self): - return 'ClientSyncState: %s' % ', '.join( - ['%s: %s' % (k, getattr(self, k)) for k in self._public_attr_keys]) - class Synchronizer(U1DBSynchronizer): """ Collect the state around synchronizing 2 U1DB replicas. @@ -146,36 +49,20 @@ class Synchronizer(U1DBSynchronizer): """ sync_target = self.sync_target - # recover current sync state from source database - sync_state = self.source.sync_state - self.target_replica_uid = sync_state.target_replica_uid - target_gen = sync_state.target_gen - target_trans_id = sync_state.target_trans_id - target_my_gen = sync_state.target_my_gen - target_my_trans_id = sync_state.target_my_trans_id - target_last_known_gen = sync_state.target_last_known_gen - target_last_known_trans_id = \ - sync_state.target_last_known_trans_id - my_gen = sync_state.my_gen - changes = sync_state.changes - sent = sync_state.sent - received = sync_state.received - # get target identifier, its current generation, # and its last-seen database generation for this source ensure_callback = None - if not sync_state.has_stored_info(): - try: - (self.target_replica_uid, target_gen, target_trans_id, - target_my_gen, target_my_trans_id) = \ - sync_target.get_sync_info(self.source._replica_uid) - except errors.DatabaseDoesNotExist: - if not autocreate: - raise - # will try to ask sync_exchange() to create the db - self.target_replica_uid = None - target_gen, target_trans_id = (0, '') - target_my_gen, target_my_trans_id = (0, '') + try: + (self.target_replica_uid, target_gen, target_trans_id, + target_my_gen, target_my_trans_id) = \ + sync_target.get_sync_info(self.source._replica_uid) + except errors.DatabaseDoesNotExist: + if not autocreate: + raise + # will try to ask sync_exchange() to create the db + self.target_replica_uid = None + target_gen, target_trans_id = (0, '') + target_my_gen, target_my_trans_id = (0, '') # make sure we'll have access to target replica uid once it exists if self.target_replica_uid is None: @@ -192,17 +79,15 @@ class Synchronizer(U1DBSynchronizer): target_my_gen, target_my_trans_id) # what's changed since that generation and this current gen - if not sync_state.has_stored_info(): - my_gen, _, changes = self.source.whats_changed(target_my_gen) + my_gen, _, changes = self.source.whats_changed(target_my_gen) # get source last-seen database generation for the target - if not sync_state.has_stored_info(): - if self.target_replica_uid is None: - target_last_known_gen, target_last_known_trans_id = 0, '' - else: - target_last_known_gen, target_last_known_trans_id = \ - self.source._get_replica_gen_and_trans_id( - self.target_replica_uid) + if self.target_replica_uid is None: + target_last_known_gen, target_last_known_trans_id = 0, '' + else: + target_last_known_gen, target_last_known_trans_id = \ + self.source._get_replica_gen_and_trans_id( + self.target_replica_uid) # validate transaction ids if not changes and target_last_known_gen == target_gen: @@ -220,20 +105,6 @@ class Synchronizer(U1DBSynchronizer): _, gen, trans = changes[idx] docs_by_generation.append((doc, gen, trans)) idx += 1 - # store current sync state info - if not sync_state.has_stored_info(): - sync_state.target_replica_uid = self.target_replica_uid - sync_state.target_gen = target_gen - sync_state.target_trans_id = target_trans_id - sync_state.target_my_gen = target_my_gen - sync_state.target_my_trans_id = target_my_trans_id - sync_state.my_gen = my_gen - sync_state.changes = changes - sync_state.target_last_known_trans_id = \ - target_last_known_trans_id - sync_state.target_last_known_gen = target_last_known_gen - sync_state.sent = sent = 0 - sync_state.received = received = 0 # exchange documents and try to insert the returned ones with # the target, return target synced-up-to gen. @@ -243,16 +114,7 @@ class Synchronizer(U1DBSynchronizer): new_gen, new_trans_id = sync_target.sync_exchange( docs_by_generation, self.source._replica_uid, target_last_known_gen, target_last_known_trans_id, - self._insert_doc_from_target, ensure_callback=ensure_callback, - sync_state=sync_state) - - # save sync state info if the sync was interrupted - if new_gen is None and new_trans_id is None: - sync_state.save() - return my_gen - - # sync exchange was succesfull, remove sync state info from source - sync_state.clear() + self._insert_doc_from_target, ensure_callback=ensure_callback) # record target synced-up-to generation including applying what we sent self.source._set_replica_gen_and_trans_id( diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 93de98d3..8f753f74 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -63,7 +63,6 @@ from leap.soledad.client.events import ( SOLEDAD_SYNC_RECEIVE_STATUS, signal, ) -from leap.soledad.client.sync import ClientSyncState logger = logging.getLogger(__name__) @@ -321,7 +320,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): HTTPSyncTarget.__init__(self, url, creds) self._crypto = crypto self._stopped = True - self._sync_state = None self._stop_lock = threading.Lock() def _init_post_request(self, url, action, headers, content_length): @@ -347,7 +345,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._conn.endheaders() def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, - headers, return_doc_cb, ensure_callback=None): + headers, return_doc_cb, ensure_callback, sync_id): """ Fetch sync documents from the remote database and insert them in the local database. @@ -377,7 +375,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :rtype: list of str """ - def _post_get_doc(): + def _post_get_doc(received): """ Get a sync document from server by means of a POST request. """ @@ -388,10 +386,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): '', entries, last_known_generation=last_known_generation, last_known_trans_id=last_known_trans_id, + sync_id=sync_id, ensure=ensure_callback is not None) # inform server of how many documents have already been received size += self._prepare( - ',', entries, received=self._sync_state.received) + ',', entries, received=received) entries.append('\r\n]') size += len(entries[-1]) # send headers @@ -402,14 +401,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): return self._response() number_of_changes = None + received = 0 - while number_of_changes is None or \ - self._sync_state.received < number_of_changes: + new_generation = last_known_generation + new_transaction_id = last_known_trans_id + while number_of_changes is None or received < number_of_changes: # bail out if sync process was interrupted if self.stopped is True: - return None, None + return last_known_generation, last_known_trans_id # try to fetch one document from target - data, _ = _post_get_doc() + data, _ = _post_get_doc(received) # decode incoming stream parts = data.splitlines() if not parts or parts[0] != '[' or parts[-1] != ']': @@ -424,6 +425,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): soledad_assert('new_generation' in metadata) soledad_assert('new_transaction_id' in metadata) number_of_changes = metadata['number_of_changes'] + new_generation = metadata['new_generation'] + new_transaction_id = metadata['new_transaction_id'] except json.JSONDecodeError, AssertionError: raise BrokenSyncStream # make sure we have replica_uid from fresh new dbs @@ -453,12 +456,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # end of symmetric decryption # ------------------------------------------------------------- return_doc_cb(doc, entry['gen'], entry['trans_id']) - self._sync_state.received += 1 + received += 1 signal( SOLEDAD_SYNC_RECEIVE_STATUS, "%d/%d" % - (self._sync_state.received, number_of_changes)) - return metadata['new_generation'], metadata['new_transaction_id'] + (received, number_of_changes)) + return new_generation, new_transaction_id def _request(self, method, url_parts, params=None, body=None, content_type=None): @@ -566,8 +569,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): def sync_exchange(self, docs_by_generations, source_replica_uid, last_known_generation, last_known_trans_id, - return_doc_cb, ensure_callback=None, - sync_state=None): + return_doc_cb, ensure_callback=None): """ Find out which documents the remote database does not know about, encrypt and send them. @@ -596,12 +598,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :return: The new generation and transaction id of the target replica. :rtype: tuple """ - # get the sync state information from client - self._sync_state = sync_state - if self._sync_state is None: - self._sync_state = ClientSyncState() - self.start() + sync_id = str(uuid4()) self._ensure_connection() if self._trace_hook: # for tests @@ -610,7 +608,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): headers = self._sign_request('POST', url, {}) def _post_put_doc(headers, last_known_generation, last_known_trans_id, - id, rev, content, gen, trans_id): + id, rev, content, gen, trans_id, sync_id): """ Put a sync document on server by means of a POST request. @@ -626,6 +624,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): '', entries, last_known_generation=last_known_generation, last_known_trans_id=last_known_trans_id, + sync_id=sync_id, ensure=ensure_callback is not None) # add the document to the request size += self._prepare( @@ -645,11 +644,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): cur_target_gen = last_known_generation cur_target_trans_id = last_known_trans_id - # skip docs that were already sent - if self._sync_state.sent > 0: - docs_by_generations = docs_by_generations[self._sync_state.sent:] - # send docs + sent = 0 + signal( + SOLEDAD_SYNC_SEND_STATUS, + "%d/%d" % (0, len(docs_by_generations))) for doc, gen, trans_id in docs_by_generations: # allow for interrupting the sync process if self.stopped is True: @@ -668,17 +667,18 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # ------------------------------------------------------------- cur_target_gen, cur_target_trans_id = _post_put_doc( headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id, - rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id) - self._sync_state.sent += 1 + rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id, + sync_id=sync_id) + sent += 1 signal( SOLEDAD_SYNC_SEND_STATUS, - "%d/%d" % (self._sync_state.sent, len(docs_by_generations))) + "%d/%d" % (sent, len(docs_by_generations))) # get docs from target cur_target_gen, cur_target_trans_id = self._get_remote_docs( url, last_known_generation, last_known_trans_id, headers, - return_doc_cb, ensure_callback) + return_doc_cb, ensure_callback, sync_id) self.stop() return cur_target_gen, cur_target_trans_id diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/state.js b/common/src/leap/soledad/common/ddocs/syncs/updates/state.js index cb2b6b7b..d62aeb40 100644 --- a/common/src/leap/soledad/common/ddocs/syncs/updates/state.js +++ b/common/src/leap/soledad/common/ddocs/syncs/updates/state.js @@ -29,6 +29,7 @@ * '_rev' '', * 'ongoing_syncs': { * '': { + * 'sync_id': '', * 'seen_ids': [['', [, ...], * 'changes_to_return': { * 'gen': , @@ -59,17 +60,22 @@ function(doc, req) { // parse and validate incoming data var body = JSON.parse(req.body); if (body['source_replica_uid'] == null) - return [null, 'invalid data'] + return [null, 'invalid data']; var source_replica_uid = body['source_replica_uid']; + if (body['sync_id'] == null) + return [null, 'invalid data']; + var sync_id = body['sync_id']; + // trash outdated sync data for that replica if that exists if (doc['ongoing_syncs'][source_replica_uid] != null && - doc['ongoing_syncs'][source_replica_uid] == null) + doc['ongoing_syncs'][source_replica_uid]['sync_id'] != sync_id) delete doc['ongoing_syncs'][source_replica_uid]; // create an entry for that source replica if (doc['ongoing_syncs'][source_replica_uid] == null) doc['ongoing_syncs'][source_replica_uid] = { + 'sync_id': sync_id, 'seen_ids': {}, 'changes_to_return': null, }; diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js index 04ceb2ec..94b7e767 100644 --- a/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js +++ b/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js @@ -2,14 +2,15 @@ function(doc) { if (doc['_id'] == 'u1db_sync_state' && doc['ongoing_syncs'] != null) for (var source_replica_uid in doc['ongoing_syncs']) { var changes = doc['ongoing_syncs'][source_replica_uid]['changes_to_return']; + var sync_id = doc['ongoing_syncs'][source_replica_uid]['sync_id']; if (changes == null) - emit([source_replica_uid, 0], null); + emit([source_replica_uid, sync_id, 0], null); else if (changes.length == 0) - emit([source_replica_uid, 0], []); + emit([source_replica_uid, sync_id, 0], []); else for (var i = 0; i < changes['changes_to_return'].length; i++) emit( - [source_replica_uid, i], + [source_replica_uid, sync_id, i], { 'gen': changes['gen'], 'trans_id': changes['trans_id'], diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/seen_ids/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/seen_ids/map.js index 34c65b3f..16118e88 100644 --- a/common/src/leap/soledad/common/ddocs/syncs/views/seen_ids/map.js +++ b/common/src/leap/soledad/common/ddocs/syncs/views/seen_ids/map.js @@ -1,9 +1,11 @@ function(doc) { if (doc['_id'] == 'u1db_sync_state' && doc['ongoing_syncs'] != null) - for (var source_replica_uid in doc['ongoing_syncs']) + for (var source_replica_uid in doc['ongoing_syncs']) { + var sync_id = doc['ongoing_syncs'][source_replica_uid]['sync_id']; emit( - source_replica_uid, + [source_replica_uid, sync_id], { 'seen_ids': doc['ongoing_syncs'][source_replica_uid]['seen_ids'], }); + } } diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/state/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/state/map.js index 1d8f8e84..e88c6ebb 100644 --- a/common/src/leap/soledad/common/ddocs/syncs/views/state/map.js +++ b/common/src/leap/soledad/common/ddocs/syncs/views/state/map.js @@ -2,11 +2,12 @@ function(doc) { if (doc['_id'] == 'u1db_sync_state' && doc['ongoing_syncs'] != null) for (var source_replica_uid in doc['ongoing_syncs']) { var changes = doc['ongoing_syncs'][source_replica_uid]['changes_to_return']; + var sync_id = doc['ongoing_syncs'][source_replica_uid]['sync_id']; if (changes == null) - emit(source_replica_uid, null); + emit([source_replica_uid, sync_id], null); else emit( - source_replica_uid, + [source_replica_uid, sync_id], { 'gen': changes['gen'], 'trans_id': changes['trans_id'], diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 16926f14..c6928aaa 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -48,7 +48,7 @@ class ServerSyncState(object): called 'u1db_sync_state'. """ - def __init__(self, db, source_replica_uid): + def __init__(self, db, source_replica_uid, sync_id): """ Initialize the sync state object. @@ -59,6 +59,7 @@ class ServerSyncState(object): """ self._db = db self._source_replica_uid = source_replica_uid + self._sync_id = sync_id def _key(self, key): """ @@ -91,6 +92,7 @@ class ServerSyncState(object): with CouchDatabase.sync_info_lock[self._db.replica_uid]: res.put_json( body={ + 'sync_id': self._sync_id, 'source_replica_uid': self._source_replica_uid, key: value, }, @@ -118,7 +120,8 @@ class ServerSyncState(object): """ ddoc_path = ['_design', 'syncs', '_view', 'seen_ids'] resource = self._db._database.resource(*ddoc_path) - response = resource.get_json(key=self._key(self._source_replica_uid)) + response = resource.get_json( + key=self._key([self._source_replica_uid, self._sync_id])) data = response[2] if data['rows']: entry = data['rows'].pop() @@ -160,7 +163,8 @@ class ServerSyncState(object): """ ddoc_path = ['_design', 'syncs', '_view', 'state'] resource = self._db._database.resource(*ddoc_path) - response = resource.get_json(key=self._key(self._source_replica_uid)) + response = resource.get_json( + key=self._key([self._source_replica_uid, self._sync_id])) data = response[2] gen = None trans_id = None @@ -184,7 +188,7 @@ class ServerSyncState(object): resource = self._db._database.resource(*ddoc_path) response = resource.get_json( key=self._key( - [self._source_replica_uid, received])) + [self._source_replica_uid, self._sync_id, received])) data = response[2] if not data['rows']: return None, None, None @@ -197,7 +201,7 @@ class ServerSyncState(object): class SyncExchange(sync.SyncExchange): - def __init__(self, db, source_replica_uid, last_known_generation): + def __init__(self, db, source_replica_uid, last_known_generation, sync_id): """ :param db: The target syncing database. :type db: CouchDatabase @@ -210,11 +214,13 @@ class SyncExchange(sync.SyncExchange): self._db = db self.source_replica_uid = source_replica_uid self.source_last_known_generation = last_known_generation + self.sync_id = sync_id self.new_gen = None self.new_trans_id = None self._trace_hook = None # recover sync state - self._sync_state = ServerSyncState(self._db, self.source_replica_uid) + self._sync_state = ServerSyncState( + self._db, self.source_replica_uid, sync_id) def find_changes_to_return(self, received): @@ -322,9 +328,9 @@ class SyncResource(http_app.SyncResource): @http_app.http_method( last_known_generation=int, last_known_trans_id=http_app.none_or_str, - content_as_args=True) + sync_id=http_app.none_or_str, content_as_args=True) def post_args(self, last_known_generation, last_known_trans_id=None, - ensure=False): + sync_id=None, ensure=False): """ Handle the initial arguments for the sync POST request from client. @@ -348,7 +354,7 @@ class SyncResource(http_app.SyncResource): last_known_generation, last_known_trans_id) # get a sync exchange object self.sync_exch = self.sync_exchange_class( - db, self.source_replica_uid, last_known_generation) + db, self.source_replica_uid, last_known_generation, sync_id) @http_app.http_method(content_as_args=True) def post_put(self, id, rev, content, gen, trans_id): @@ -405,8 +411,8 @@ class SyncResource(http_app.SyncResource): def post_end(self): """ - Return the current generation and transaction_id after inserting a - series of incoming documents. + Return the current generation and transaction_id after inserting one + incoming document. """ self.responder.content_type = 'application/x-soledad-sync-response' self.responder.start_response(200) -- cgit v1.2.3