summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py42
-rw-r--r--client/src/leap/soledad/client/sync.py176
-rw-r--r--client/src/leap/soledad/client/target.py56
-rw-r--r--common/src/leap/soledad/common/ddocs/syncs/updates/state.js10
-rw-r--r--common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js7
-rw-r--r--common/src/leap/soledad/common/ddocs/syncs/views/seen_ids/map.js6
-rw-r--r--common/src/leap/soledad/common/ddocs/syncs/views/state/map.js5
-rw-r--r--server/src/leap/soledad/server/sync.py28
8 files changed, 84 insertions, 246 deletions
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 74351116..26238af6 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -57,7 +57,7 @@ from pysqlcipher import dbapi2
from u1db.backends import sqlite_backend
from u1db import errors as u1db_errors
-from leap.soledad.client.sync import Synchronizer, ClientSyncState
+from leap.soledad.client.sync import Synchronizer
from leap.soledad.client.target import SoledadSyncTarget
from leap.soledad.common.document import SoledadDocument
@@ -889,45 +889,5 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
if self._db_handle is not None:
self._db_handle.close()
- def _get_stored_sync_state(self):
- """
- Retrieve the currently stored sync state.
-
- :return: The current stored sync state or None if there's no stored
- state.
- :rtype: dict or None
- """
- c = self._db_handle.cursor()
- c.execute("SELECT value FROM u1db_config"
- " WHERE name = 'sync_state'")
- val = c.fetchone()
- if val is None:
- return None
- return json.loads(val[0])
-
- def _set_stored_sync_state(self, state):
- """
- Stored the sync state.
-
- :param state: The sync state to be stored or None to delete any stored
- state.
- :type state: dict or None
- """
- c = self._db_handle.cursor()
- if state is None:
- c.execute("DELETE FROM u1db_config"
- " WHERE name = 'sync_state'")
- else:
- c.execute("INSERT OR REPLACE INTO u1db_config"
- " VALUES ('sync_state', ?)",
- (json.dumps(state),))
-
- stored_sync_state = property(
- _get_stored_sync_state, _set_stored_sync_state,
- doc="The current sync state dict.")
-
- @property
- def sync_state(self):
- return ClientSyncState(self)
sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase)
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index 5285d540..56e63416 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -27,103 +27,6 @@ from u1db import errors
from u1db.sync import Synchronizer as U1DBSynchronizer
-class ClientSyncState(object):
- """
- The state of the current sync session, as stored on the client.
- """
-
- _private_attrs = [
- '_db',
- ]
-
- _public_attrs = {
- 'target_replica_uid': None,
- 'target_gen': None,
- 'target_trans_id': None,
- 'target_my_gen': None,
- 'target_my_trans_id': None,
- 'target_last_known_gen': None,
- 'target_last_known_trans_id': None,
- 'my_gen': None,
- 'changes': None,
- 'sent': 0,
- 'received': 0,
- }
-
- @property
- def _public_attr_keys(self):
- return self._public_attrs.keys()
-
- def __init__(self, db=None):
- """
- Initialize the client sync state.
-
- :param db: The database where to fetch/store the sync state.
- :type db: SQLCipherDatabase
- """
- self._db = db
- self._init_state()
-
- def __setattr__(self, attr, val):
- """
- Prevent setting arbitrary attributes.
-
- :param attr: The attribute name.
- :type attr: str
- :param val: The value to be set.
- :type val: anything
- """
- if attr not in self._public_attr_keys + self._private_attrs:
- raise Exception
- object.__setattr__(self, attr, val)
-
- def _init_state(self):
- """
- Initialize current sync state, potentially fetching sync info stored
- in database.
- """
- # set local default attributes
- for attr in self._public_attr_keys:
- setattr(self, attr, self._public_attrs[attr])
- # fetch info from stored sync state
- sync_state_dict = None
- if self._db is not None:
- sync_state_dict = self._db.stored_sync_state
- if sync_state_dict is not None:
- for attr in self._public_attr_keys:
- setattr(self, attr, sync_state_dict[attr])
-
- def save(self):
- """
- Save the current sync state in the database.
- """
- sync_state_dict = {}
- for attr in self._public_attr_keys:
- sync_state_dict[attr] = getattr(self, attr)
- if self._db is not None:
- self._db.stored_sync_state = sync_state_dict
-
- def clear(self):
- """
- Clear the sync state info data.
- """
- if self._db is not None:
- self._db.stored_sync_state = None
- self._init_state()
-
- def has_stored_info(self):
- """
- Return whether there is any sync state info stored on the database.
-
- :return: Whether there's any sync state info store on db.
- :rtype: bool
- """
- return self._db is not None and self._db.stored_sync_state is not None
-
- def __str__(self):
- return 'ClientSyncState: %s' % ', '.join(
- ['%s: %s' % (k, getattr(self, k)) for k in self._public_attr_keys])
-
class Synchronizer(U1DBSynchronizer):
"""
Collect the state around synchronizing 2 U1DB replicas.
@@ -146,36 +49,20 @@ class Synchronizer(U1DBSynchronizer):
"""
sync_target = self.sync_target
- # recover current sync state from source database
- sync_state = self.source.sync_state
- self.target_replica_uid = sync_state.target_replica_uid
- target_gen = sync_state.target_gen
- target_trans_id = sync_state.target_trans_id
- target_my_gen = sync_state.target_my_gen
- target_my_trans_id = sync_state.target_my_trans_id
- target_last_known_gen = sync_state.target_last_known_gen
- target_last_known_trans_id = \
- sync_state.target_last_known_trans_id
- my_gen = sync_state.my_gen
- changes = sync_state.changes
- sent = sync_state.sent
- received = sync_state.received
-
# get target identifier, its current generation,
# and its last-seen database generation for this source
ensure_callback = None
- if not sync_state.has_stored_info():
- try:
- (self.target_replica_uid, target_gen, target_trans_id,
- target_my_gen, target_my_trans_id) = \
- sync_target.get_sync_info(self.source._replica_uid)
- except errors.DatabaseDoesNotExist:
- if not autocreate:
- raise
- # will try to ask sync_exchange() to create the db
- self.target_replica_uid = None
- target_gen, target_trans_id = (0, '')
- target_my_gen, target_my_trans_id = (0, '')
+ try:
+ (self.target_replica_uid, target_gen, target_trans_id,
+ target_my_gen, target_my_trans_id) = \
+ sync_target.get_sync_info(self.source._replica_uid)
+ except errors.DatabaseDoesNotExist:
+ if not autocreate:
+ raise
+ # will try to ask sync_exchange() to create the db
+ self.target_replica_uid = None
+ target_gen, target_trans_id = (0, '')
+ target_my_gen, target_my_trans_id = (0, '')
# make sure we'll have access to target replica uid once it exists
if self.target_replica_uid is None:
@@ -192,17 +79,15 @@ class Synchronizer(U1DBSynchronizer):
target_my_gen, target_my_trans_id)
# what's changed since that generation and this current gen
- if not sync_state.has_stored_info():
- my_gen, _, changes = self.source.whats_changed(target_my_gen)
+ my_gen, _, changes = self.source.whats_changed(target_my_gen)
# get source last-seen database generation for the target
- if not sync_state.has_stored_info():
- if self.target_replica_uid is None:
- target_last_known_gen, target_last_known_trans_id = 0, ''
- else:
- target_last_known_gen, target_last_known_trans_id = \
- self.source._get_replica_gen_and_trans_id(
- self.target_replica_uid)
+ if self.target_replica_uid is None:
+ target_last_known_gen, target_last_known_trans_id = 0, ''
+ else:
+ target_last_known_gen, target_last_known_trans_id = \
+ self.source._get_replica_gen_and_trans_id(
+ self.target_replica_uid)
# validate transaction ids
if not changes and target_last_known_gen == target_gen:
@@ -220,20 +105,6 @@ class Synchronizer(U1DBSynchronizer):
_, gen, trans = changes[idx]
docs_by_generation.append((doc, gen, trans))
idx += 1
- # store current sync state info
- if not sync_state.has_stored_info():
- sync_state.target_replica_uid = self.target_replica_uid
- sync_state.target_gen = target_gen
- sync_state.target_trans_id = target_trans_id
- sync_state.target_my_gen = target_my_gen
- sync_state.target_my_trans_id = target_my_trans_id
- sync_state.my_gen = my_gen
- sync_state.changes = changes
- sync_state.target_last_known_trans_id = \
- target_last_known_trans_id
- sync_state.target_last_known_gen = target_last_known_gen
- sync_state.sent = sent = 0
- sync_state.received = received = 0
# exchange documents and try to insert the returned ones with
# the target, return target synced-up-to gen.
@@ -243,16 +114,7 @@ class Synchronizer(U1DBSynchronizer):
new_gen, new_trans_id = sync_target.sync_exchange(
docs_by_generation, self.source._replica_uid,
target_last_known_gen, target_last_known_trans_id,
- self._insert_doc_from_target, ensure_callback=ensure_callback,
- sync_state=sync_state)
-
- # save sync state info if the sync was interrupted
- if new_gen is None and new_trans_id is None:
- sync_state.save()
- return my_gen
-
- # sync exchange was succesfull, remove sync state info from source
- sync_state.clear()
+ self._insert_doc_from_target, ensure_callback=ensure_callback)
# record target synced-up-to generation including applying what we sent
self.source._set_replica_gen_and_trans_id(
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 93de98d3..8f753f74 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -63,7 +63,6 @@ from leap.soledad.client.events import (
SOLEDAD_SYNC_RECEIVE_STATUS,
signal,
)
-from leap.soledad.client.sync import ClientSyncState
logger = logging.getLogger(__name__)
@@ -321,7 +320,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
HTTPSyncTarget.__init__(self, url, creds)
self._crypto = crypto
self._stopped = True
- self._sync_state = None
self._stop_lock = threading.Lock()
def _init_post_request(self, url, action, headers, content_length):
@@ -347,7 +345,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._conn.endheaders()
def _get_remote_docs(self, url, last_known_generation, last_known_trans_id,
- headers, return_doc_cb, ensure_callback=None):
+ headers, return_doc_cb, ensure_callback, sync_id):
"""
Fetch sync documents from the remote database and insert them in the
local database.
@@ -377,7 +375,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:rtype: list of str
"""
- def _post_get_doc():
+ def _post_get_doc(received):
"""
Get a sync document from server by means of a POST request.
"""
@@ -388,10 +386,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
'', entries,
last_known_generation=last_known_generation,
last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
ensure=ensure_callback is not None)
# inform server of how many documents have already been received
size += self._prepare(
- ',', entries, received=self._sync_state.received)
+ ',', entries, received=received)
entries.append('\r\n]')
size += len(entries[-1])
# send headers
@@ -402,14 +401,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
return self._response()
number_of_changes = None
+ received = 0
- while number_of_changes is None or \
- self._sync_state.received < number_of_changes:
+ new_generation = last_known_generation
+ new_transaction_id = last_known_trans_id
+ while number_of_changes is None or received < number_of_changes:
# bail out if sync process was interrupted
if self.stopped is True:
- return None, None
+ return last_known_generation, last_known_trans_id
# try to fetch one document from target
- data, _ = _post_get_doc()
+ data, _ = _post_get_doc(received)
# decode incoming stream
parts = data.splitlines()
if not parts or parts[0] != '[' or parts[-1] != ']':
@@ -424,6 +425,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
soledad_assert('new_generation' in metadata)
soledad_assert('new_transaction_id' in metadata)
number_of_changes = metadata['number_of_changes']
+ new_generation = metadata['new_generation']
+ new_transaction_id = metadata['new_transaction_id']
except json.JSONDecodeError, AssertionError:
raise BrokenSyncStream
# make sure we have replica_uid from fresh new dbs
@@ -453,12 +456,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# end of symmetric decryption
# -------------------------------------------------------------
return_doc_cb(doc, entry['gen'], entry['trans_id'])
- self._sync_state.received += 1
+ received += 1
signal(
SOLEDAD_SYNC_RECEIVE_STATUS,
"%d/%d" %
- (self._sync_state.received, number_of_changes))
- return metadata['new_generation'], metadata['new_transaction_id']
+ (received, number_of_changes))
+ return new_generation, new_transaction_id
def _request(self, method, url_parts, params=None, body=None,
content_type=None):
@@ -566,8 +569,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
def sync_exchange(self, docs_by_generations, source_replica_uid,
last_known_generation, last_known_trans_id,
- return_doc_cb, ensure_callback=None,
- sync_state=None):
+ return_doc_cb, ensure_callback=None):
"""
Find out which documents the remote database does not know about,
encrypt and send them.
@@ -596,12 +598,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:return: The new generation and transaction id of the target replica.
:rtype: tuple
"""
- # get the sync state information from client
- self._sync_state = sync_state
- if self._sync_state is None:
- self._sync_state = ClientSyncState()
-
self.start()
+ sync_id = str(uuid4())
self._ensure_connection()
if self._trace_hook: # for tests
@@ -610,7 +608,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
headers = self._sign_request('POST', url, {})
def _post_put_doc(headers, last_known_generation, last_known_trans_id,
- id, rev, content, gen, trans_id):
+ id, rev, content, gen, trans_id, sync_id):
"""
Put a sync document on server by means of a POST request.
@@ -626,6 +624,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
'', entries,
last_known_generation=last_known_generation,
last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
ensure=ensure_callback is not None)
# add the document to the request
size += self._prepare(
@@ -645,11 +644,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
cur_target_gen = last_known_generation
cur_target_trans_id = last_known_trans_id
- # skip docs that were already sent
- if self._sync_state.sent > 0:
- docs_by_generations = docs_by_generations[self._sync_state.sent:]
-
# send docs
+ sent = 0
+ signal(
+ SOLEDAD_SYNC_SEND_STATUS,
+ "%d/%d" % (0, len(docs_by_generations)))
for doc, gen, trans_id in docs_by_generations:
# allow for interrupting the sync process
if self.stopped is True:
@@ -668,17 +667,18 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# -------------------------------------------------------------
cur_target_gen, cur_target_trans_id = _post_put_doc(
headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id,
- rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id)
- self._sync_state.sent += 1
+ rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id,
+ sync_id=sync_id)
+ sent += 1
signal(
SOLEDAD_SYNC_SEND_STATUS,
- "%d/%d" % (self._sync_state.sent, len(docs_by_generations)))
+ "%d/%d" % (sent, len(docs_by_generations)))
# get docs from target
cur_target_gen, cur_target_trans_id = self._get_remote_docs(
url,
last_known_generation, last_known_trans_id, headers,
- return_doc_cb, ensure_callback)
+ return_doc_cb, ensure_callback, sync_id)
self.stop()
return cur_target_gen, cur_target_trans_id
diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/state.js b/common/src/leap/soledad/common/ddocs/syncs/updates/state.js
index cb2b6b7b..d62aeb40 100644
--- a/common/src/leap/soledad/common/ddocs/syncs/updates/state.js
+++ b/common/src/leap/soledad/common/ddocs/syncs/updates/state.js
@@ -29,6 +29,7 @@
* '_rev' '<str>',
* 'ongoing_syncs': {
* '<source_replica_uid>': {
+ * 'sync_id': '<sync_id>',
* 'seen_ids': [['<doc_id>', <at_gen>[, ...],
* 'changes_to_return': {
* 'gen': <gen>,
@@ -59,17 +60,22 @@ function(doc, req) {
// parse and validate incoming data
var body = JSON.parse(req.body);
if (body['source_replica_uid'] == null)
- return [null, 'invalid data']
+ return [null, 'invalid data'];
var source_replica_uid = body['source_replica_uid'];
+ if (body['sync_id'] == null)
+ return [null, 'invalid data'];
+ var sync_id = body['sync_id'];
+
// trash outdated sync data for that replica if that exists
if (doc['ongoing_syncs'][source_replica_uid] != null &&
- doc['ongoing_syncs'][source_replica_uid] == null)
+ doc['ongoing_syncs'][source_replica_uid]['sync_id'] != sync_id)
delete doc['ongoing_syncs'][source_replica_uid];
// create an entry for that source replica
if (doc['ongoing_syncs'][source_replica_uid] == null)
doc['ongoing_syncs'][source_replica_uid] = {
+ 'sync_id': sync_id,
'seen_ids': {},
'changes_to_return': null,
};
diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js
index 04ceb2ec..94b7e767 100644
--- a/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js
+++ b/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js
@@ -2,14 +2,15 @@ function(doc) {
if (doc['_id'] == 'u1db_sync_state' && doc['ongoing_syncs'] != null)
for (var source_replica_uid in doc['ongoing_syncs']) {
var changes = doc['ongoing_syncs'][source_replica_uid]['changes_to_return'];
+ var sync_id = doc['ongoing_syncs'][source_replica_uid]['sync_id'];
if (changes == null)
- emit([source_replica_uid, 0], null);
+ emit([source_replica_uid, sync_id, 0], null);
else if (changes.length == 0)
- emit([source_replica_uid, 0], []);
+ emit([source_replica_uid, sync_id, 0], []);
else
for (var i = 0; i < changes['changes_to_return'].length; i++)
emit(
- [source_replica_uid, i],
+ [source_replica_uid, sync_id, i],
{
'gen': changes['gen'],
'trans_id': changes['trans_id'],
diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/seen_ids/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/seen_ids/map.js
index 34c65b3f..16118e88 100644
--- a/common/src/leap/soledad/common/ddocs/syncs/views/seen_ids/map.js
+++ b/common/src/leap/soledad/common/ddocs/syncs/views/seen_ids/map.js
@@ -1,9 +1,11 @@
function(doc) {
if (doc['_id'] == 'u1db_sync_state' && doc['ongoing_syncs'] != null)
- for (var source_replica_uid in doc['ongoing_syncs'])
+ for (var source_replica_uid in doc['ongoing_syncs']) {
+ var sync_id = doc['ongoing_syncs'][source_replica_uid]['sync_id'];
emit(
- source_replica_uid,
+ [source_replica_uid, sync_id],
{
'seen_ids': doc['ongoing_syncs'][source_replica_uid]['seen_ids'],
});
+ }
}
diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/state/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/state/map.js
index 1d8f8e84..e88c6ebb 100644
--- a/common/src/leap/soledad/common/ddocs/syncs/views/state/map.js
+++ b/common/src/leap/soledad/common/ddocs/syncs/views/state/map.js
@@ -2,11 +2,12 @@ function(doc) {
if (doc['_id'] == 'u1db_sync_state' && doc['ongoing_syncs'] != null)
for (var source_replica_uid in doc['ongoing_syncs']) {
var changes = doc['ongoing_syncs'][source_replica_uid]['changes_to_return'];
+ var sync_id = doc['ongoing_syncs'][source_replica_uid]['sync_id'];
if (changes == null)
- emit(source_replica_uid, null);
+ emit([source_replica_uid, sync_id], null);
else
emit(
- source_replica_uid,
+ [source_replica_uid, sync_id],
{
'gen': changes['gen'],
'trans_id': changes['trans_id'],
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
index 16926f14..c6928aaa 100644
--- a/server/src/leap/soledad/server/sync.py
+++ b/server/src/leap/soledad/server/sync.py
@@ -48,7 +48,7 @@ class ServerSyncState(object):
called 'u1db_sync_state'.
"""
- def __init__(self, db, source_replica_uid):
+ def __init__(self, db, source_replica_uid, sync_id):
"""
Initialize the sync state object.
@@ -59,6 +59,7 @@ class ServerSyncState(object):
"""
self._db = db
self._source_replica_uid = source_replica_uid
+ self._sync_id = sync_id
def _key(self, key):
"""
@@ -91,6 +92,7 @@ class ServerSyncState(object):
with CouchDatabase.sync_info_lock[self._db.replica_uid]:
res.put_json(
body={
+ 'sync_id': self._sync_id,
'source_replica_uid': self._source_replica_uid,
key: value,
},
@@ -118,7 +120,8 @@ class ServerSyncState(object):
"""
ddoc_path = ['_design', 'syncs', '_view', 'seen_ids']
resource = self._db._database.resource(*ddoc_path)
- response = resource.get_json(key=self._key(self._source_replica_uid))
+ response = resource.get_json(
+ key=self._key([self._source_replica_uid, self._sync_id]))
data = response[2]
if data['rows']:
entry = data['rows'].pop()
@@ -160,7 +163,8 @@ class ServerSyncState(object):
"""
ddoc_path = ['_design', 'syncs', '_view', 'state']
resource = self._db._database.resource(*ddoc_path)
- response = resource.get_json(key=self._key(self._source_replica_uid))
+ response = resource.get_json(
+ key=self._key([self._source_replica_uid, self._sync_id]))
data = response[2]
gen = None
trans_id = None
@@ -184,7 +188,7 @@ class ServerSyncState(object):
resource = self._db._database.resource(*ddoc_path)
response = resource.get_json(
key=self._key(
- [self._source_replica_uid, received]))
+ [self._source_replica_uid, self._sync_id, received]))
data = response[2]
if not data['rows']:
return None, None, None
@@ -197,7 +201,7 @@ class ServerSyncState(object):
class SyncExchange(sync.SyncExchange):
- def __init__(self, db, source_replica_uid, last_known_generation):
+ def __init__(self, db, source_replica_uid, last_known_generation, sync_id):
"""
:param db: The target syncing database.
:type db: CouchDatabase
@@ -210,11 +214,13 @@ class SyncExchange(sync.SyncExchange):
self._db = db
self.source_replica_uid = source_replica_uid
self.source_last_known_generation = last_known_generation
+ self.sync_id = sync_id
self.new_gen = None
self.new_trans_id = None
self._trace_hook = None
# recover sync state
- self._sync_state = ServerSyncState(self._db, self.source_replica_uid)
+ self._sync_state = ServerSyncState(
+ self._db, self.source_replica_uid, sync_id)
def find_changes_to_return(self, received):
@@ -322,9 +328,9 @@ class SyncResource(http_app.SyncResource):
@http_app.http_method(
last_known_generation=int, last_known_trans_id=http_app.none_or_str,
- content_as_args=True)
+ sync_id=http_app.none_or_str, content_as_args=True)
def post_args(self, last_known_generation, last_known_trans_id=None,
- ensure=False):
+ sync_id=None, ensure=False):
"""
Handle the initial arguments for the sync POST request from client.
@@ -348,7 +354,7 @@ class SyncResource(http_app.SyncResource):
last_known_generation, last_known_trans_id)
# get a sync exchange object
self.sync_exch = self.sync_exchange_class(
- db, self.source_replica_uid, last_known_generation)
+ db, self.source_replica_uid, last_known_generation, sync_id)
@http_app.http_method(content_as_args=True)
def post_put(self, id, rev, content, gen, trans_id):
@@ -405,8 +411,8 @@ class SyncResource(http_app.SyncResource):
def post_end(self):
"""
- Return the current generation and transaction_id after inserting a
- series of incoming documents.
+ Return the current generation and transaction_id after inserting one
+ incoming document.
"""
self.responder.content_type = 'application/x-soledad-sync-response'
self.responder.start_response(200)