summaryrefslogtreecommitdiff
path: root/client/src
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2014-06-03 18:43:56 -0300
committerdrebs <drebs@leap.se>2014-06-05 10:45:29 -0300
commit7d9d827a5f66993863ca0c532c01ad3bf2c4353e (patch)
treed712d632a339cb6865e5aa38110f7b56c5f1b7db /client/src
parenta0cbdc8bbba4369c20bb5b285e464c23d6954e17 (diff)
Replace client sync state by a sync_id.
Diffstat (limited to 'client/src')
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py42
-rw-r--r--client/src/leap/soledad/client/sync.py176
-rw-r--r--client/src/leap/soledad/client/target.py56
3 files changed, 48 insertions, 226 deletions
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 74351116..26238af6 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -57,7 +57,7 @@ from pysqlcipher import dbapi2
from u1db.backends import sqlite_backend
from u1db import errors as u1db_errors
-from leap.soledad.client.sync import Synchronizer, ClientSyncState
+from leap.soledad.client.sync import Synchronizer
from leap.soledad.client.target import SoledadSyncTarget
from leap.soledad.common.document import SoledadDocument
@@ -889,45 +889,5 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
if self._db_handle is not None:
self._db_handle.close()
- def _get_stored_sync_state(self):
- """
- Retrieve the currently stored sync state.
-
- :return: The current stored sync state or None if there's no stored
- state.
- :rtype: dict or None
- """
- c = self._db_handle.cursor()
- c.execute("SELECT value FROM u1db_config"
- " WHERE name = 'sync_state'")
- val = c.fetchone()
- if val is None:
- return None
- return json.loads(val[0])
-
- def _set_stored_sync_state(self, state):
- """
- Stored the sync state.
-
- :param state: The sync state to be stored or None to delete any stored
- state.
- :type state: dict or None
- """
- c = self._db_handle.cursor()
- if state is None:
- c.execute("DELETE FROM u1db_config"
- " WHERE name = 'sync_state'")
- else:
- c.execute("INSERT OR REPLACE INTO u1db_config"
- " VALUES ('sync_state', ?)",
- (json.dumps(state),))
-
- stored_sync_state = property(
- _get_stored_sync_state, _set_stored_sync_state,
- doc="The current sync state dict.")
-
- @property
- def sync_state(self):
- return ClientSyncState(self)
sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase)
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index 5285d540..56e63416 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -27,103 +27,6 @@ from u1db import errors
from u1db.sync import Synchronizer as U1DBSynchronizer
-class ClientSyncState(object):
- """
- The state of the current sync session, as stored on the client.
- """
-
- _private_attrs = [
- '_db',
- ]
-
- _public_attrs = {
- 'target_replica_uid': None,
- 'target_gen': None,
- 'target_trans_id': None,
- 'target_my_gen': None,
- 'target_my_trans_id': None,
- 'target_last_known_gen': None,
- 'target_last_known_trans_id': None,
- 'my_gen': None,
- 'changes': None,
- 'sent': 0,
- 'received': 0,
- }
-
- @property
- def _public_attr_keys(self):
- return self._public_attrs.keys()
-
- def __init__(self, db=None):
- """
- Initialize the client sync state.
-
- :param db: The database where to fetch/store the sync state.
- :type db: SQLCipherDatabase
- """
- self._db = db
- self._init_state()
-
- def __setattr__(self, attr, val):
- """
- Prevent setting arbitrary attributes.
-
- :param attr: The attribute name.
- :type attr: str
- :param val: The value to be set.
- :type val: anything
- """
- if attr not in self._public_attr_keys + self._private_attrs:
- raise Exception
- object.__setattr__(self, attr, val)
-
- def _init_state(self):
- """
- Initialize current sync state, potentially fetching sync info stored
- in database.
- """
- # set local default attributes
- for attr in self._public_attr_keys:
- setattr(self, attr, self._public_attrs[attr])
- # fetch info from stored sync state
- sync_state_dict = None
- if self._db is not None:
- sync_state_dict = self._db.stored_sync_state
- if sync_state_dict is not None:
- for attr in self._public_attr_keys:
- setattr(self, attr, sync_state_dict[attr])
-
- def save(self):
- """
- Save the current sync state in the database.
- """
- sync_state_dict = {}
- for attr in self._public_attr_keys:
- sync_state_dict[attr] = getattr(self, attr)
- if self._db is not None:
- self._db.stored_sync_state = sync_state_dict
-
- def clear(self):
- """
- Clear the sync state info data.
- """
- if self._db is not None:
- self._db.stored_sync_state = None
- self._init_state()
-
- def has_stored_info(self):
- """
- Return whether there is any sync state info stored on the database.
-
- :return: Whether there's any sync state info store on db.
- :rtype: bool
- """
- return self._db is not None and self._db.stored_sync_state is not None
-
- def __str__(self):
- return 'ClientSyncState: %s' % ', '.join(
- ['%s: %s' % (k, getattr(self, k)) for k in self._public_attr_keys])
-
class Synchronizer(U1DBSynchronizer):
"""
Collect the state around synchronizing 2 U1DB replicas.
@@ -146,36 +49,20 @@ class Synchronizer(U1DBSynchronizer):
"""
sync_target = self.sync_target
- # recover current sync state from source database
- sync_state = self.source.sync_state
- self.target_replica_uid = sync_state.target_replica_uid
- target_gen = sync_state.target_gen
- target_trans_id = sync_state.target_trans_id
- target_my_gen = sync_state.target_my_gen
- target_my_trans_id = sync_state.target_my_trans_id
- target_last_known_gen = sync_state.target_last_known_gen
- target_last_known_trans_id = \
- sync_state.target_last_known_trans_id
- my_gen = sync_state.my_gen
- changes = sync_state.changes
- sent = sync_state.sent
- received = sync_state.received
-
# get target identifier, its current generation,
# and its last-seen database generation for this source
ensure_callback = None
- if not sync_state.has_stored_info():
- try:
- (self.target_replica_uid, target_gen, target_trans_id,
- target_my_gen, target_my_trans_id) = \
- sync_target.get_sync_info(self.source._replica_uid)
- except errors.DatabaseDoesNotExist:
- if not autocreate:
- raise
- # will try to ask sync_exchange() to create the db
- self.target_replica_uid = None
- target_gen, target_trans_id = (0, '')
- target_my_gen, target_my_trans_id = (0, '')
+ try:
+ (self.target_replica_uid, target_gen, target_trans_id,
+ target_my_gen, target_my_trans_id) = \
+ sync_target.get_sync_info(self.source._replica_uid)
+ except errors.DatabaseDoesNotExist:
+ if not autocreate:
+ raise
+ # will try to ask sync_exchange() to create the db
+ self.target_replica_uid = None
+ target_gen, target_trans_id = (0, '')
+ target_my_gen, target_my_trans_id = (0, '')
# make sure we'll have access to target replica uid once it exists
if self.target_replica_uid is None:
@@ -192,17 +79,15 @@ class Synchronizer(U1DBSynchronizer):
target_my_gen, target_my_trans_id)
# what's changed since that generation and this current gen
- if not sync_state.has_stored_info():
- my_gen, _, changes = self.source.whats_changed(target_my_gen)
+ my_gen, _, changes = self.source.whats_changed(target_my_gen)
# get source last-seen database generation for the target
- if not sync_state.has_stored_info():
- if self.target_replica_uid is None:
- target_last_known_gen, target_last_known_trans_id = 0, ''
- else:
- target_last_known_gen, target_last_known_trans_id = \
- self.source._get_replica_gen_and_trans_id(
- self.target_replica_uid)
+ if self.target_replica_uid is None:
+ target_last_known_gen, target_last_known_trans_id = 0, ''
+ else:
+ target_last_known_gen, target_last_known_trans_id = \
+ self.source._get_replica_gen_and_trans_id(
+ self.target_replica_uid)
# validate transaction ids
if not changes and target_last_known_gen == target_gen:
@@ -220,20 +105,6 @@ class Synchronizer(U1DBSynchronizer):
_, gen, trans = changes[idx]
docs_by_generation.append((doc, gen, trans))
idx += 1
- # store current sync state info
- if not sync_state.has_stored_info():
- sync_state.target_replica_uid = self.target_replica_uid
- sync_state.target_gen = target_gen
- sync_state.target_trans_id = target_trans_id
- sync_state.target_my_gen = target_my_gen
- sync_state.target_my_trans_id = target_my_trans_id
- sync_state.my_gen = my_gen
- sync_state.changes = changes
- sync_state.target_last_known_trans_id = \
- target_last_known_trans_id
- sync_state.target_last_known_gen = target_last_known_gen
- sync_state.sent = sent = 0
- sync_state.received = received = 0
# exchange documents and try to insert the returned ones with
# the target, return target synced-up-to gen.
@@ -243,16 +114,7 @@ class Synchronizer(U1DBSynchronizer):
new_gen, new_trans_id = sync_target.sync_exchange(
docs_by_generation, self.source._replica_uid,
target_last_known_gen, target_last_known_trans_id,
- self._insert_doc_from_target, ensure_callback=ensure_callback,
- sync_state=sync_state)
-
- # save sync state info if the sync was interrupted
- if new_gen is None and new_trans_id is None:
- sync_state.save()
- return my_gen
-
- # sync exchange was succesfull, remove sync state info from source
- sync_state.clear()
+ self._insert_doc_from_target, ensure_callback=ensure_callback)
# record target synced-up-to generation including applying what we sent
self.source._set_replica_gen_and_trans_id(
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 93de98d3..8f753f74 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -63,7 +63,6 @@ from leap.soledad.client.events import (
SOLEDAD_SYNC_RECEIVE_STATUS,
signal,
)
-from leap.soledad.client.sync import ClientSyncState
logger = logging.getLogger(__name__)
@@ -321,7 +320,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
HTTPSyncTarget.__init__(self, url, creds)
self._crypto = crypto
self._stopped = True
- self._sync_state = None
self._stop_lock = threading.Lock()
def _init_post_request(self, url, action, headers, content_length):
@@ -347,7 +345,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._conn.endheaders()
def _get_remote_docs(self, url, last_known_generation, last_known_trans_id,
- headers, return_doc_cb, ensure_callback=None):
+ headers, return_doc_cb, ensure_callback, sync_id):
"""
Fetch sync documents from the remote database and insert them in the
local database.
@@ -377,7 +375,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:rtype: list of str
"""
- def _post_get_doc():
+ def _post_get_doc(received):
"""
Get a sync document from server by means of a POST request.
"""
@@ -388,10 +386,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
'', entries,
last_known_generation=last_known_generation,
last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
ensure=ensure_callback is not None)
# inform server of how many documents have already been received
size += self._prepare(
- ',', entries, received=self._sync_state.received)
+ ',', entries, received=received)
entries.append('\r\n]')
size += len(entries[-1])
# send headers
@@ -402,14 +401,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
return self._response()
number_of_changes = None
+ received = 0
- while number_of_changes is None or \
- self._sync_state.received < number_of_changes:
+ new_generation = last_known_generation
+ new_transaction_id = last_known_trans_id
+ while number_of_changes is None or received < number_of_changes:
# bail out if sync process was interrupted
if self.stopped is True:
- return None, None
+ return last_known_generation, last_known_trans_id
# try to fetch one document from target
- data, _ = _post_get_doc()
+ data, _ = _post_get_doc(received)
# decode incoming stream
parts = data.splitlines()
if not parts or parts[0] != '[' or parts[-1] != ']':
@@ -424,6 +425,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
soledad_assert('new_generation' in metadata)
soledad_assert('new_transaction_id' in metadata)
number_of_changes = metadata['number_of_changes']
+ new_generation = metadata['new_generation']
+ new_transaction_id = metadata['new_transaction_id']
except json.JSONDecodeError, AssertionError:
raise BrokenSyncStream
# make sure we have replica_uid from fresh new dbs
@@ -453,12 +456,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# end of symmetric decryption
# -------------------------------------------------------------
return_doc_cb(doc, entry['gen'], entry['trans_id'])
- self._sync_state.received += 1
+ received += 1
signal(
SOLEDAD_SYNC_RECEIVE_STATUS,
"%d/%d" %
- (self._sync_state.received, number_of_changes))
- return metadata['new_generation'], metadata['new_transaction_id']
+ (received, number_of_changes))
+ return new_generation, new_transaction_id
def _request(self, method, url_parts, params=None, body=None,
content_type=None):
@@ -566,8 +569,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
def sync_exchange(self, docs_by_generations, source_replica_uid,
last_known_generation, last_known_trans_id,
- return_doc_cb, ensure_callback=None,
- sync_state=None):
+ return_doc_cb, ensure_callback=None):
"""
Find out which documents the remote database does not know about,
encrypt and send them.
@@ -596,12 +598,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:return: The new generation and transaction id of the target replica.
:rtype: tuple
"""
- # get the sync state information from client
- self._sync_state = sync_state
- if self._sync_state is None:
- self._sync_state = ClientSyncState()
-
self.start()
+ sync_id = str(uuid4())
self._ensure_connection()
if self._trace_hook: # for tests
@@ -610,7 +608,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
headers = self._sign_request('POST', url, {})
def _post_put_doc(headers, last_known_generation, last_known_trans_id,
- id, rev, content, gen, trans_id):
+ id, rev, content, gen, trans_id, sync_id):
"""
Put a sync document on server by means of a POST request.
@@ -626,6 +624,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
'', entries,
last_known_generation=last_known_generation,
last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
ensure=ensure_callback is not None)
# add the document to the request
size += self._prepare(
@@ -645,11 +644,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
cur_target_gen = last_known_generation
cur_target_trans_id = last_known_trans_id
- # skip docs that were already sent
- if self._sync_state.sent > 0:
- docs_by_generations = docs_by_generations[self._sync_state.sent:]
-
# send docs
+ sent = 0
+ signal(
+ SOLEDAD_SYNC_SEND_STATUS,
+ "%d/%d" % (0, len(docs_by_generations)))
for doc, gen, trans_id in docs_by_generations:
# allow for interrupting the sync process
if self.stopped is True:
@@ -668,17 +667,18 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# -------------------------------------------------------------
cur_target_gen, cur_target_trans_id = _post_put_doc(
headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id,
- rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id)
- self._sync_state.sent += 1
+ rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id,
+ sync_id=sync_id)
+ sent += 1
signal(
SOLEDAD_SYNC_SEND_STATUS,
- "%d/%d" % (self._sync_state.sent, len(docs_by_generations)))
+ "%d/%d" % (sent, len(docs_by_generations)))
# get docs from target
cur_target_gen, cur_target_trans_id = self._get_remote_docs(
url,
last_known_generation, last_known_trans_id, headers,
- return_doc_cb, ensure_callback)
+ return_doc_cb, ensure_callback, sync_id)
self.stop()
return cur_target_gen, cur_target_trans_id