summary | refs | log | tree | commit | diff
path: root/client/src/leap/soledad
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad')
-rw-r--r-- client/src/leap/soledad/client/__init__.py 10
-rw-r--r-- client/src/leap/soledad/client/sqlcipher.py 72
-rw-r--r-- client/src/leap/soledad/client/sync.py 176
-rw-r--r-- client/src/leap/soledad/client/target.py 60
4 files changed, 79 insertions, 239 deletions
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py
index 2fb33184..0d3a21fd 100644
--- a/client/src/leap/soledad/client/__init__.py
+++ b/client/src/leap/soledad/client/__init__.py
@@ -34,8 +34,6 @@ import urlparse
import hmac
from hashlib import sha256
-from threading import Lock
-from collections import defaultdict
try:
import cchardet as chardet
@@ -224,12 +222,6 @@ class Soledad(object):
Prefix for default values for path.
"""
- syncing_lock = defaultdict(Lock)
- """
- A dictionary that hold locks which avoid multiple sync attempts from the
- same database replica.
- """
-
def __init__(self, uuid, passphrase, secrets_path, local_db_path,
server_url, cert_file, auth_token=None, secret_id=None):
"""
@@ -1064,8 +1056,6 @@ class Soledad(object):
:rtype: str
"""
if self._db:
- # acquire lock before attempt to sync
- with Soledad.syncing_lock[self._db._get_replica_uid()]:
local_gen = self._db.sync(
urlparse.urljoin(self.server_url, 'user-%s' % self._uuid),
creds=self._creds, autocreate=False)
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 74351116..5ffa9c7e 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -52,12 +52,13 @@ import json
from hashlib import sha256
from contextlib import contextmanager
+from collections import defaultdict
from pysqlcipher import dbapi2
from u1db.backends import sqlite_backend
from u1db import errors as u1db_errors
-from leap.soledad.client.sync import Synchronizer, ClientSyncState
+from leap.soledad.client.sync import Synchronizer
from leap.soledad.client.target import SoledadSyncTarget
from leap.soledad.common.document import SoledadDocument
@@ -153,6 +154,13 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
create_doc_lock = threading.Lock()
update_indexes_lock = threading.Lock()
+ syncing_lock = defaultdict(threading.Lock)
+ """
+ A dictionary that hold locks which avoid multiple sync attempts from the
+ same database replica.
+ """
+
+
def __init__(self, sqlcipher_file, password, document_factory=None,
crypto=None, raw_key=False, cipher='aes-256-cbc',
kdf_iter=4000, cipher_page_size=1024):
@@ -343,6 +351,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
"""
Synchronize documents with remote replica exposed at url.
+ There can be at most one instance syncing the same database replica at
+ the same time, so this method will block until the syncing lock can be
+ acquired.
+
:param url: The url of the target replica to sync with.
:type url: str
:param creds: optional dictionary giving credentials.
@@ -355,6 +367,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
:rtype: int
"""
res = None
+ # the following context manager blocks until the syncing lock can be
+ # acquired.
with self.syncer(url, creds=creds) as syncer:
res = syncer.sync(autocreate=autocreate)
return res
@@ -371,10 +385,16 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
def syncer(self, url, creds=None):
"""
Accesor for synchronizer.
+
+ As we reuse the same synchronizer for every sync, there can be only
+ one instance synchronizing the same database replica at the same time.
+ Because of that, this method blocks until the syncing lock can be
+ acquired.
"""
- syncer = self._get_syncer(url, creds=creds)
- yield syncer
- syncer.sync_target.close()
+ with SQLCipherDatabase.syncing_lock[self._get_replica_uid()]:
+ syncer = self._get_syncer(url, creds=creds)
+ yield syncer
+ syncer.sync_target.close()
def _get_syncer(self, url, creds=None):
"""
@@ -401,6 +421,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
creds=creds,
crypto=self._crypto))
self._syncers[url] = (h, syncer)
+ # in order to reuse the same synchronizer multiple times we have to
+ # reset its state (i.e. the number of documents received from target
+ # and inserted in the local replica).
+ syncer.num_inserted = 0
return syncer
def _extra_schema_init(self, c):
@@ -889,45 +913,5 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
if self._db_handle is not None:
self._db_handle.close()
- def _get_stored_sync_state(self):
- """
- Retrieve the currently stored sync state.
-
- :return: The current stored sync state or None if there's no stored
- state.
- :rtype: dict or None
- """
- c = self._db_handle.cursor()
- c.execute("SELECT value FROM u1db_config"
- " WHERE name = 'sync_state'")
- val = c.fetchone()
- if val is None:
- return None
- return json.loads(val[0])
-
- def _set_stored_sync_state(self, state):
- """
- Stored the sync state.
-
- :param state: The sync state to be stored or None to delete any stored
- state.
- :type state: dict or None
- """
- c = self._db_handle.cursor()
- if state is None:
- c.execute("DELETE FROM u1db_config"
- " WHERE name = 'sync_state'")
- else:
- c.execute("INSERT OR REPLACE INTO u1db_config"
- " VALUES ('sync_state', ?)",
- (json.dumps(state),))
-
- stored_sync_state = property(
- _get_stored_sync_state, _set_stored_sync_state,
- doc="The current sync state dict.")
-
- @property
- def sync_state(self):
- return ClientSyncState(self)
sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase)
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index 5285d540..56e63416 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -27,103 +27,6 @@ from u1db import errors
from u1db.sync import Synchronizer as U1DBSynchronizer
-class ClientSyncState(object):
- """
- The state of the current sync session, as stored on the client.
- """
-
- _private_attrs = [
- '_db',
- ]
-
- _public_attrs = {
- 'target_replica_uid': None,
- 'target_gen': None,
- 'target_trans_id': None,
- 'target_my_gen': None,
- 'target_my_trans_id': None,
- 'target_last_known_gen': None,
- 'target_last_known_trans_id': None,
- 'my_gen': None,
- 'changes': None,
- 'sent': 0,
- 'received': 0,
- }
-
- @property
- def _public_attr_keys(self):
- return self._public_attrs.keys()
-
- def __init__(self, db=None):
- """
- Initialize the client sync state.
-
- :param db: The database where to fetch/store the sync state.
- :type db: SQLCipherDatabase
- """
- self._db = db
- self._init_state()
-
- def __setattr__(self, attr, val):
- """
- Prevent setting arbitrary attributes.
-
- :param attr: The attribute name.
- :type attr: str
- :param val: The value to be set.
- :type val: anything
- """
- if attr not in self._public_attr_keys + self._private_attrs:
- raise Exception
- object.__setattr__(self, attr, val)
-
- def _init_state(self):
- """
- Initialize current sync state, potentially fetching sync info stored
- in database.
- """
- # set local default attributes
- for attr in self._public_attr_keys:
- setattr(self, attr, self._public_attrs[attr])
- # fetch info from stored sync state
- sync_state_dict = None
- if self._db is not None:
- sync_state_dict = self._db.stored_sync_state
- if sync_state_dict is not None:
- for attr in self._public_attr_keys:
- setattr(self, attr, sync_state_dict[attr])
-
- def save(self):
- """
- Save the current sync state in the database.
- """
- sync_state_dict = {}
- for attr in self._public_attr_keys:
- sync_state_dict[attr] = getattr(self, attr)
- if self._db is not None:
- self._db.stored_sync_state = sync_state_dict
-
- def clear(self):
- """
- Clear the sync state info data.
- """
- if self._db is not None:
- self._db.stored_sync_state = None
- self._init_state()
-
- def has_stored_info(self):
- """
- Return whether there is any sync state info stored on the database.
-
- :return: Whether there's any sync state info store on db.
- :rtype: bool
- """
- return self._db is not None and self._db.stored_sync_state is not None
-
- def __str__(self):
- return 'ClientSyncState: %s' % ', '.join(
- ['%s: %s' % (k, getattr(self, k)) for k in self._public_attr_keys])
-
class Synchronizer(U1DBSynchronizer):
"""
Collect the state around synchronizing 2 U1DB replicas.
@@ -146,36 +49,20 @@ class Synchronizer(U1DBSynchronizer):
"""
sync_target = self.sync_target
- # recover current sync state from source database
- sync_state = self.source.sync_state
- self.target_replica_uid = sync_state.target_replica_uid
- target_gen = sync_state.target_gen
- target_trans_id = sync_state.target_trans_id
- target_my_gen = sync_state.target_my_gen
- target_my_trans_id = sync_state.target_my_trans_id
- target_last_known_gen = sync_state.target_last_known_gen
- target_last_known_trans_id = \
- sync_state.target_last_known_trans_id
- my_gen = sync_state.my_gen
- changes = sync_state.changes
- sent = sync_state.sent
- received = sync_state.received
-
# get target identifier, its current generation,
# and its last-seen database generation for this source
ensure_callback = None
- if not sync_state.has_stored_info():
- try:
- (self.target_replica_uid, target_gen, target_trans_id,
- target_my_gen, target_my_trans_id) = \
- sync_target.get_sync_info(self.source._replica_uid)
- except errors.DatabaseDoesNotExist:
- if not autocreate:
- raise
- # will try to ask sync_exchange() to create the db
- self.target_replica_uid = None
- target_gen, target_trans_id = (0, '')
- target_my_gen, target_my_trans_id = (0, '')
+ try:
+ (self.target_replica_uid, target_gen, target_trans_id,
+ target_my_gen, target_my_trans_id) = \
+ sync_target.get_sync_info(self.source._replica_uid)
+ except errors.DatabaseDoesNotExist:
+ if not autocreate:
+ raise
+ # will try to ask sync_exchange() to create the db
+ self.target_replica_uid = None
+ target_gen, target_trans_id = (0, '')
+ target_my_gen, target_my_trans_id = (0, '')
# make sure we'll have access to target replica uid once it exists
if self.target_replica_uid is None:
@@ -192,17 +79,15 @@ class Synchronizer(U1DBSynchronizer):
target_my_gen, target_my_trans_id)
# what's changed since that generation and this current gen
- if not sync_state.has_stored_info():
- my_gen, _, changes = self.source.whats_changed(target_my_gen)
+ my_gen, _, changes = self.source.whats_changed(target_my_gen)
# get source last-seen database generation for the target
- if not sync_state.has_stored_info():
- if self.target_replica_uid is None:
- target_last_known_gen, target_last_known_trans_id = 0, ''
- else:
- target_last_known_gen, target_last_known_trans_id = \
- self.source._get_replica_gen_and_trans_id(
- self.target_replica_uid)
+ if self.target_replica_uid is None:
+ target_last_known_gen, target_last_known_trans_id = 0, ''
+ else:
+ target_last_known_gen, target_last_known_trans_id = \
+ self.source._get_replica_gen_and_trans_id(
+ self.target_replica_uid)
# validate transaction ids
if not changes and target_last_known_gen == target_gen:
@@ -220,20 +105,6 @@ class Synchronizer(U1DBSynchronizer):
_, gen, trans = changes[idx]
docs_by_generation.append((doc, gen, trans))
idx += 1
- # store current sync state info
- if not sync_state.has_stored_info():
- sync_state.target_replica_uid = self.target_replica_uid
- sync_state.target_gen = target_gen
- sync_state.target_trans_id = target_trans_id
- sync_state.target_my_gen = target_my_gen
- sync_state.target_my_trans_id = target_my_trans_id
- sync_state.my_gen = my_gen
- sync_state.changes = changes
- sync_state.target_last_known_trans_id = \
- target_last_known_trans_id
- sync_state.target_last_known_gen = target_last_known_gen
- sync_state.sent = sent = 0
- sync_state.received = received = 0
# exchange documents and try to insert the returned ones with
# the target, return target synced-up-to gen.
@@ -243,16 +114,7 @@ class Synchronizer(U1DBSynchronizer):
new_gen, new_trans_id = sync_target.sync_exchange(
docs_by_generation, self.source._replica_uid,
target_last_known_gen, target_last_known_trans_id,
- self._insert_doc_from_target, ensure_callback=ensure_callback,
- sync_state=sync_state)
-
- # save sync state info if the sync was interrupted
- if new_gen is None and new_trans_id is None:
- sync_state.save()
- return my_gen
-
- # sync exchange was succesfull, remove sync state info from source
- sync_state.clear()
+ self._insert_doc_from_target, ensure_callback=ensure_callback)
# record target synced-up-to generation including applying what we sent
self.source._set_replica_gen_and_trans_id(
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 93de98d3..968545b6 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -63,7 +63,6 @@ from leap.soledad.client.events import (
SOLEDAD_SYNC_RECEIVE_STATUS,
signal,
)
-from leap.soledad.client.sync import ClientSyncState
logger = logging.getLogger(__name__)
@@ -321,7 +320,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
HTTPSyncTarget.__init__(self, url, creds)
self._crypto = crypto
self._stopped = True
- self._sync_state = None
self._stop_lock = threading.Lock()
def _init_post_request(self, url, action, headers, content_length):
@@ -347,7 +345,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._conn.endheaders()
def _get_remote_docs(self, url, last_known_generation, last_known_trans_id,
- headers, return_doc_cb, ensure_callback=None):
+ headers, return_doc_cb, ensure_callback, sync_id):
"""
Fetch sync documents from the remote database and insert them in the
local database.
@@ -377,9 +375,13 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:rtype: list of str
"""
- def _post_get_doc():
+ def _post_get_doc(received):
"""
Get a sync document from server by means of a POST request.
+
+ :param received: The number of documents already received in the
+ current sync session.
+ :type received: int
"""
entries = ['[']
size = 1
@@ -388,10 +390,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
'', entries,
last_known_generation=last_known_generation,
last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
ensure=ensure_callback is not None)
# inform server of how many documents have already been received
size += self._prepare(
- ',', entries, received=self._sync_state.received)
+ ',', entries, received=received)
entries.append('\r\n]')
size += len(entries[-1])
# send headers
@@ -402,14 +405,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
return self._response()
number_of_changes = None
+ received = 0
- while number_of_changes is None or \
- self._sync_state.received < number_of_changes:
+ new_generation = last_known_generation
+ new_transaction_id = last_known_trans_id
+ while number_of_changes is None or received < number_of_changes:
# bail out if sync process was interrupted
if self.stopped is True:
- return None, None
+ return last_known_generation, last_known_trans_id
# try to fetch one document from target
- data, _ = _post_get_doc()
+ data, _ = _post_get_doc(received)
# decode incoming stream
parts = data.splitlines()
if not parts or parts[0] != '[' or parts[-1] != ']':
@@ -424,6 +429,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
soledad_assert('new_generation' in metadata)
soledad_assert('new_transaction_id' in metadata)
number_of_changes = metadata['number_of_changes']
+ new_generation = metadata['new_generation']
+ new_transaction_id = metadata['new_transaction_id']
except json.JSONDecodeError, AssertionError:
raise BrokenSyncStream
# make sure we have replica_uid from fresh new dbs
@@ -453,12 +460,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# end of symmetric decryption
# -------------------------------------------------------------
return_doc_cb(doc, entry['gen'], entry['trans_id'])
- self._sync_state.received += 1
+ received += 1
signal(
SOLEDAD_SYNC_RECEIVE_STATUS,
"%d/%d" %
- (self._sync_state.received, number_of_changes))
- return metadata['new_generation'], metadata['new_transaction_id']
+ (received, number_of_changes))
+ return new_generation, new_transaction_id
def _request(self, method, url_parts, params=None, body=None,
content_type=None):
@@ -566,8 +573,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
def sync_exchange(self, docs_by_generations, source_replica_uid,
last_known_generation, last_known_trans_id,
- return_doc_cb, ensure_callback=None,
- sync_state=None):
+ return_doc_cb, ensure_callback=None):
"""
Find out which documents the remote database does not know about,
encrypt and send them.
@@ -596,12 +602,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:return: The new generation and transaction id of the target replica.
:rtype: tuple
"""
- # get the sync state information from client
- self._sync_state = sync_state
- if self._sync_state is None:
- self._sync_state = ClientSyncState()
-
self.start()
+ sync_id = str(uuid4())
self._ensure_connection()
if self._trace_hook: # for tests
@@ -610,7 +612,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
headers = self._sign_request('POST', url, {})
def _post_put_doc(headers, last_known_generation, last_known_trans_id,
- id, rev, content, gen, trans_id):
+ id, rev, content, gen, trans_id, sync_id):
"""
Put a sync document on server by means of a POST request.
@@ -626,6 +628,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
'', entries,
last_known_generation=last_known_generation,
last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
ensure=ensure_callback is not None)
# add the document to the request
size += self._prepare(
@@ -645,11 +648,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
cur_target_gen = last_known_generation
cur_target_trans_id = last_known_trans_id
- # skip docs that were already sent
- if self._sync_state.sent > 0:
- docs_by_generations = docs_by_generations[self._sync_state.sent:]
-
# send docs
+ sent = 0
+ signal(
+ SOLEDAD_SYNC_SEND_STATUS,
+ "%d/%d" % (0, len(docs_by_generations)))
for doc, gen, trans_id in docs_by_generations:
# allow for interrupting the sync process
if self.stopped is True:
@@ -668,17 +671,18 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# -------------------------------------------------------------
cur_target_gen, cur_target_trans_id = _post_put_doc(
headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id,
- rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id)
- self._sync_state.sent += 1
+ rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id,
+ sync_id=sync_id)
+ sent += 1
signal(
SOLEDAD_SYNC_SEND_STATUS,
- "%d/%d" % (self._sync_state.sent, len(docs_by_generations)))
+ "%d/%d" % (sent, len(docs_by_generations)))
# get docs from target
cur_target_gen, cur_target_trans_id = self._get_remote_docs(
url,
last_known_generation, last_known_trans_id, headers,
- return_doc_cb, ensure_callback)
+ return_doc_cb, ensure_callback, sync_id)
self.stop()
return cur_target_gen, cur_target_trans_id