diff options
Diffstat (limited to 'client')
-rw-r--r-- | client/changes/feature_5571_add-sync-status-signals | 1 | ||||
-rw-r--r-- | client/changes/feature_5571_allow-for-interrupting-and-recovering-sync | 1 | ||||
-rw-r--r-- | client/changes/feature_5571_split-sync-post | 1 | ||||
-rw-r--r-- | client/src/leap/soledad/client/__init__.py | 76 | ||||
-rw-r--r-- | client/src/leap/soledad/client/events.py | 58 | ||||
-rw-r--r-- | client/src/leap/soledad/client/shared_db.py | 4 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 54 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sync.py | 263 | ||||
-rw-r--r-- | client/src/leap/soledad/client/target.py | 365 |
9 files changed, 676 insertions, 147 deletions
diff --git a/client/changes/feature_5571_add-sync-status-signals b/client/changes/feature_5571_add-sync-status-signals new file mode 100644 index 00000000..67bc7d9f --- /dev/null +++ b/client/changes/feature_5571_add-sync-status-signals @@ -0,0 +1 @@ + o Add sync status signals (#5517). diff --git a/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync b/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync new file mode 100644 index 00000000..0087c535 --- /dev/null +++ b/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync @@ -0,0 +1 @@ + o Allow for interrupting and recovering sync (#5517). diff --git a/client/changes/feature_5571_split-sync-post b/client/changes/feature_5571_split-sync-post new file mode 100644 index 00000000..0d7b14dd --- /dev/null +++ b/client/changes/feature_5571_split-sync-post @@ -0,0 +1 @@ + o Split sync in multiple POST requests in client (#5571). diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py index 46e3cd5f..2fb33184 100644 --- a/client/src/leap/soledad/client/__init__.py +++ b/client/src/leap/soledad/client/__init__.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # __init__.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -49,7 +49,11 @@ import scrypt import simplejson as json from leap.common.config import get_path_prefix -from leap.soledad.common import SHARED_DB_NAME +from leap.soledad.common import ( + SHARED_DB_NAME, + soledad_assert, + soledad_assert_type +) from leap.soledad.common.errors import ( InvalidTokenError, NotLockedError, @@ -63,45 +67,17 @@ from leap.soledad.common.crypto import ( MAC_KEY, MAC_METHOD_KEY, ) - -# -# Signaling function -# - -SOLEDAD_CREATING_KEYS = 'Creating keys...' -SOLEDAD_DONE_CREATING_KEYS = 'Done creating keys.' -SOLEDAD_DOWNLOADING_KEYS = 'Downloading keys...' -SOLEDAD_DONE_DOWNLOADING_KEYS = 'Done downloading keys.' -SOLEDAD_UPLOADING_KEYS = 'Uploading keys...' -SOLEDAD_DONE_UPLOADING_KEYS = 'Done uploading keys.' -SOLEDAD_NEW_DATA_TO_SYNC = 'New data available.' -SOLEDAD_DONE_DATA_SYNC = 'Done data sync.' - -# we want to use leap.common.events to emits signals, if it is available. -try: - from leap.common import events - from leap.common.events import signal - SOLEDAD_CREATING_KEYS = events.events_pb2.SOLEDAD_CREATING_KEYS - SOLEDAD_DONE_CREATING_KEYS = events.events_pb2.SOLEDAD_DONE_CREATING_KEYS - SOLEDAD_DOWNLOADING_KEYS = events.events_pb2.SOLEDAD_DOWNLOADING_KEYS - SOLEDAD_DONE_DOWNLOADING_KEYS = \ - events.events_pb2.SOLEDAD_DONE_DOWNLOADING_KEYS - SOLEDAD_UPLOADING_KEYS = events.events_pb2.SOLEDAD_UPLOADING_KEYS - SOLEDAD_DONE_UPLOADING_KEYS = \ - events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS - SOLEDAD_NEW_DATA_TO_SYNC = events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC - SOLEDAD_DONE_DATA_SYNC = events.events_pb2.SOLEDAD_DONE_DATA_SYNC - -except ImportError: - # we define a fake signaling function and fake signal constants that will - # allow for logging signaling attempts in case leap.common.events is not - # available. - - def signal(signal, content=""): - logger.info("Would signal: %s - %s." % (str(signal), content)) - - -from leap.soledad.common import soledad_assert, soledad_assert_type +from leap.soledad.client.events import ( + SOLEDAD_CREATING_KEYS, + SOLEDAD_DONE_CREATING_KEYS, + SOLEDAD_DOWNLOADING_KEYS, + SOLEDAD_DONE_DOWNLOADING_KEYS, + SOLEDAD_UPLOADING_KEYS, + SOLEDAD_DONE_UPLOADING_KEYS, + SOLEDAD_NEW_DATA_TO_SYNC, + SOLEDAD_DONE_DATA_SYNC, + signal, +) from leap.soledad.common.document import SoledadDocument from leap.soledad.client.crypto import SoledadCrypto from leap.soledad.client.shared_db import SoledadSharedDatabase @@ -720,8 +696,9 @@ class Soledad(object): :return: the hash :rtype: str """ - return sha256('%s%s' % - (self._passphrase_as_string(), self.uuid)).hexdigest() + return sha256( + '%s%s' % + (self._passphrase_as_string(), self.uuid)).hexdigest() def _get_secrets_from_shared_db(self): """ @@ -1095,6 +1072,13 @@ class Soledad(object): signal(SOLEDAD_DONE_DATA_SYNC, self._uuid) return local_gen + def stop_sync(self): + """ + Stop the current syncing process. + """ + if self._db: + self._db.stop_sync() + def need_sync(self, url): """ Return if local db replica differs from remote url's replica. @@ -1211,7 +1195,6 @@ class Soledad(object): """ soledad_assert(self.STORAGE_SECRETS_KEY in data) # check mac of the recovery document - #mac_auth = False # XXX ? mac = None if MAC_KEY in data: soledad_assert(data[MAC_KEY] is not None) @@ -1234,7 +1217,6 @@ class Soledad(object): if mac != data[MAC_KEY]: raise WrongMac('Could not authenticate recovery document\'s ' 'contents.') - #mac_auth = True # XXX ? # include secrets in the secret pool. secrets = 0 for secret_id, secret_data in data[self.STORAGE_SECRETS_KEY].items(): @@ -1296,9 +1278,9 @@ class Soledad(object): return self._passphrase.encode('utf-8') -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- # Monkey patching u1db to be able to provide a custom SSL cert -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- # We need a more reasonable timeout (in seconds) SOLEDAD_TIMEOUT = 120 diff --git a/client/src/leap/soledad/client/events.py b/client/src/leap/soledad/client/events.py new file mode 100644 index 00000000..c4c09ac5 --- /dev/null +++ b/client/src/leap/soledad/client/events.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +# signal.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Signaling functions. +""" + + +SOLEDAD_CREATING_KEYS = 'Creating keys...' +SOLEDAD_DONE_CREATING_KEYS = 'Done creating keys.' +SOLEDAD_DOWNLOADING_KEYS = 'Downloading keys...' +SOLEDAD_DONE_DOWNLOADING_KEYS = 'Done downloading keys.' +SOLEDAD_UPLOADING_KEYS = 'Uploading keys...' +SOLEDAD_DONE_UPLOADING_KEYS = 'Done uploading keys.' +SOLEDAD_NEW_DATA_TO_SYNC = 'New data available.' +SOLEDAD_DONE_DATA_SYNC = 'Done data sync.' +SOLEDAD_SYNC_SEND_STATUS = 'Sync: sent one document.' +SOLEDAD_SYNC_RECEIVE_STATUS = 'Sync: received one document.' + +# we want to use leap.common.events to emits signals, if it is available. +try: + from leap.common import events + from leap.common.events import signal + SOLEDAD_CREATING_KEYS = events.proto.SOLEDAD_CREATING_KEYS + SOLEDAD_DONE_CREATING_KEYS = events.proto.SOLEDAD_DONE_CREATING_KEYS + SOLEDAD_DOWNLOADING_KEYS = events.proto.SOLEDAD_DOWNLOADING_KEYS + SOLEDAD_DONE_DOWNLOADING_KEYS = \ + events.proto.SOLEDAD_DONE_DOWNLOADING_KEYS + SOLEDAD_UPLOADING_KEYS = events.proto.SOLEDAD_UPLOADING_KEYS + SOLEDAD_DONE_UPLOADING_KEYS = \ + events.proto.SOLEDAD_DONE_UPLOADING_KEYS + SOLEDAD_NEW_DATA_TO_SYNC = events.proto.SOLEDAD_NEW_DATA_TO_SYNC + SOLEDAD_DONE_DATA_SYNC = events.proto.SOLEDAD_DONE_DATA_SYNC + SOLEDAD_SYNC_SEND_STATUS = events.proto.SOLEDAD_SYNC_SEND_STATUS + SOLEDAD_SYNC_RECEIVE_STATUS = events.proto.SOLEDAD_SYNC_RECEIVE_STATUS + +except ImportError: + # we define a fake signaling function and fake signal constants that will + # allow for logging signaling attempts in case leap.common.events is not + # available. + + def signal(signal, content=""): + logger.info("Would signal: %s - %s." % (str(signal), content)) diff --git a/client/src/leap/soledad/client/shared_db.py b/client/src/leap/soledad/client/shared_db.py index 0753cbb5..52e51c6f 100644 --- a/client/src/leap/soledad/client/shared_db.py +++ b/client/src/leap/soledad/client/shared_db.py @@ -30,9 +30,9 @@ from leap.soledad.common import SHARED_DB_LOCK_DOC_ID_PREFIX from leap.soledad.client.auth import TokenBasedAuth -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- # Soledad shared database -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- class NoTokenForAuth(Exception): """ diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 04f8ebf9..74351116 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -55,14 +55,14 @@ from contextlib import contextmanager from pysqlcipher import dbapi2 from u1db.backends import sqlite_backend -from u1db.sync import Synchronizer from u1db import errors as u1db_errors +from leap.soledad.client.sync import Synchronizer, ClientSyncState from leap.soledad.client.target import SoledadSyncTarget from leap.soledad.common.document import SoledadDocument -logger = logging.getLogger(__name__) +logger = logging.getLogger(__name__) # Monkey-patch u1db.backends.sqlite_backend with pysqlcipher.dbapi2 sqlite_backend.dbapi2 = dbapi2 @@ -359,6 +359,14 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): res = syncer.sync(autocreate=autocreate) return res + def stop_sync(self): + """ + Interrupt all ongoing syncs. + """ + for url in self._syncers: + _, syncer = self._syncers[url] + syncer.stop() + @contextmanager def syncer(self, url, creds=None): """ @@ -379,7 +387,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :type creds: dict :return: A synchronizer. - :rtype: u1db.sync.Synchronizer + :rtype: Synchronizer """ # we want to store at most one syncer for each url, so we also store a # hash of the connection credentials and replace the stored syncer for @@ -881,5 +889,45 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): if self._db_handle is not None: self._db_handle.close() + def _get_stored_sync_state(self): + """ + Retrieve the currently stored sync state. + + :return: The current stored sync state or None if there's no stored + state. + :rtype: dict or None + """ + c = self._db_handle.cursor() + c.execute("SELECT value FROM u1db_config" + " WHERE name = 'sync_state'") + val = c.fetchone() + if val is None: + return None + return json.loads(val[0]) + + def _set_stored_sync_state(self, state): + """ + Stored the sync state. + + :param state: The sync state to be stored or None to delete any stored + state. + :type state: dict or None + """ + c = self._db_handle.cursor() + if state is None: + c.execute("DELETE FROM u1db_config" + " WHERE name = 'sync_state'") + else: + c.execute("INSERT OR REPLACE INTO u1db_config" + " VALUES ('sync_state', ?)", + (json.dumps(state),)) + + stored_sync_state = property( + _get_stored_sync_state, _set_stored_sync_state, + doc="The current sync state dict.") + + @property + def sync_state(self): + return ClientSyncState(self) sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py new file mode 100644 index 00000000..5285d540 --- /dev/null +++ b/client/src/leap/soledad/client/sync.py @@ -0,0 +1,263 @@ +# -*- coding: utf-8 -*- +# sync.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Sync infrastructure that can be interrupted and recovered. +""" + +import json + + +from u1db import errors +from u1db.sync import Synchronizer as U1DBSynchronizer + + +class ClientSyncState(object): + """ + The state of the current sync session, as stored on the client. + """ + + _private_attrs = [ + '_db', + ] + + _public_attrs = { + 'target_replica_uid': None, + 'target_gen': None, + 'target_trans_id': None, + 'target_my_gen': None, + 'target_my_trans_id': None, + 'target_last_known_gen': None, + 'target_last_known_trans_id': None, + 'my_gen': None, + 'changes': None, + 'sent': 0, + 'received': 0, + } + + @property + def _public_attr_keys(self): + return self._public_attrs.keys() + + def __init__(self, db=None): + """ + Initialize the client sync state. + + :param db: The database where to fetch/store the sync state. + :type db: SQLCipherDatabase + """ + self._db = db + self._init_state() + + def __setattr__(self, attr, val): + """ + Prevent setting arbitrary attributes. + + :param attr: The attribute name. + :type attr: str + :param val: The value to be set. + :type val: anything + """ + if attr not in self._public_attr_keys + self._private_attrs: + raise Exception + object.__setattr__(self, attr, val) + + def _init_state(self): + """ + Initialize current sync state, potentially fetching sync info stored + in database. + """ + # set local default attributes + for attr in self._public_attr_keys: + setattr(self, attr, self._public_attrs[attr]) + # fetch info from stored sync state + sync_state_dict = None + if self._db is not None: + sync_state_dict = self._db.stored_sync_state + if sync_state_dict is not None: + for attr in self._public_attr_keys: + setattr(self, attr, sync_state_dict[attr]) + + def save(self): + """ + Save the current sync state in the database. + """ + sync_state_dict = {} + for attr in self._public_attr_keys: + sync_state_dict[attr] = getattr(self, attr) + if self._db is not None: + self._db.stored_sync_state = sync_state_dict + + def clear(self): + """ + Clear the sync state info data. + """ + if self._db is not None: + self._db.stored_sync_state = None + self._init_state() + + def has_stored_info(self): + """ + Return whether there is any sync state info stored on the database. + + :return: Whether there's any sync state info store on db. + :rtype: bool + """ + return self._db is not None and self._db.stored_sync_state is not None + + def __str__(self): + return 'ClientSyncState: %s' % ', '.join( + ['%s: %s' % (k, getattr(self, k)) for k in self._public_attr_keys]) + +class Synchronizer(U1DBSynchronizer): + """ + Collect the state around synchronizing 2 U1DB replicas. + + Modified to allow for interrupting the synchronization process. + """ + + def stop(self): + """ + Stop the current sync in progress. + """ + self.sync_target.stop() + + def sync(self, autocreate=False): + """ + Synchronize documents between source and target. + + :param autocreate: Whether the target replica should be created or not. + :type autocreate: bool + """ + sync_target = self.sync_target + + # recover current sync state from source database + sync_state = self.source.sync_state + self.target_replica_uid = sync_state.target_replica_uid + target_gen = sync_state.target_gen + target_trans_id = sync_state.target_trans_id + target_my_gen = sync_state.target_my_gen + target_my_trans_id = sync_state.target_my_trans_id + target_last_known_gen = sync_state.target_last_known_gen + target_last_known_trans_id = \ + sync_state.target_last_known_trans_id + my_gen = sync_state.my_gen + changes = sync_state.changes + sent = sync_state.sent + received = sync_state.received + + # get target identifier, its current generation, + # and its last-seen database generation for this source + ensure_callback = None + if not sync_state.has_stored_info(): + try: + (self.target_replica_uid, target_gen, target_trans_id, + target_my_gen, target_my_trans_id) = \ + sync_target.get_sync_info(self.source._replica_uid) + except errors.DatabaseDoesNotExist: + if not autocreate: + raise + # will try to ask sync_exchange() to create the db + self.target_replica_uid = None + target_gen, target_trans_id = (0, '') + target_my_gen, target_my_trans_id = (0, '') + + # make sure we'll have access to target replica uid once it exists + if self.target_replica_uid is None: + + def ensure_callback(replica_uid): + self.target_replica_uid = replica_uid + + # make sure we're not syncing one replica with itself + if self.target_replica_uid == self.source._replica_uid: + raise errors.InvalidReplicaUID + + # validate the info the target has about the source replica + self.source.validate_gen_and_trans_id( + target_my_gen, target_my_trans_id) + + # what's changed since that generation and this current gen + if not sync_state.has_stored_info(): + my_gen, _, changes = self.source.whats_changed(target_my_gen) + + # get source last-seen database generation for the target + if not sync_state.has_stored_info(): + if self.target_replica_uid is None: + target_last_known_gen, target_last_known_trans_id = 0, '' + else: + target_last_known_gen, target_last_known_trans_id = \ + self.source._get_replica_gen_and_trans_id( + self.target_replica_uid) + + # validate transaction ids + if not changes and target_last_known_gen == target_gen: + if target_trans_id != target_last_known_trans_id: + raise errors.InvalidTransactionId + return my_gen + + # prepare to send all the changed docs + changed_doc_ids = [doc_id for doc_id, _, _ in changes] + docs_to_send = self.source.get_docs( + changed_doc_ids, check_for_conflicts=False, include_deleted=True) + docs_by_generation = [] + idx = 0 + for doc in docs_to_send: + _, gen, trans = changes[idx] + docs_by_generation.append((doc, gen, trans)) + idx += 1 + # store current sync state info + if not sync_state.has_stored_info(): + sync_state.target_replica_uid = self.target_replica_uid + sync_state.target_gen = target_gen + sync_state.target_trans_id = target_trans_id + sync_state.target_my_gen = target_my_gen + sync_state.target_my_trans_id = target_my_trans_id + sync_state.my_gen = my_gen + sync_state.changes = changes + sync_state.target_last_known_trans_id = \ + target_last_known_trans_id + sync_state.target_last_known_gen = target_last_known_gen + sync_state.sent = sent = 0 + sync_state.received = received = 0 + + # exchange documents and try to insert the returned ones with + # the target, return target synced-up-to gen. + # + # The sync_exchange method may be interrupted, in which case it will + # return a tuple of Nones. + new_gen, new_trans_id = sync_target.sync_exchange( + docs_by_generation, self.source._replica_uid, + target_last_known_gen, target_last_known_trans_id, + self._insert_doc_from_target, ensure_callback=ensure_callback, + sync_state=sync_state) + + # save sync state info if the sync was interrupted + if new_gen is None and new_trans_id is None: + sync_state.save() + return my_gen + + # sync exchange was succesfull, remove sync state info from source + sync_state.clear() + + # record target synced-up-to generation including applying what we sent + self.source._set_replica_gen_and_trans_id( + self.target_replica_uid, new_gen, new_trans_id) + # if gapless record current reached generation with target + self._record_sync_info_with_the_target(my_gen) + + return my_gen diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 3b3d6870..93de98d3 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # target.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -25,9 +25,11 @@ import hashlib import hmac import logging import urllib +import threading import simplejson as json from time import sleep +from uuid import uuid4 from u1db.remote import utils, http_errors from u1db.errors import BrokenSyncStream @@ -56,6 +58,12 @@ from leap.soledad.client.crypto import ( EncryptionMethods, UnknownEncryptionMethod, ) +from leap.soledad.client.events import ( + SOLEDAD_SYNC_SEND_STATUS, + SOLEDAD_SYNC_RECEIVE_STATUS, + signal, +) +from leap.soledad.client.sync import ClientSyncState logger = logging.getLogger(__name__) @@ -149,10 +157,12 @@ def encrypt_doc(crypto, doc): ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY, ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, ENC_IV_KEY: iv, - MAC_KEY: binascii.b2a_hex(mac_doc( # store the mac as hex. - crypto, doc.doc_id, doc.rev, - ciphertext, - MacMethods.HMAC)), + # store the mac as hex. + MAC_KEY: binascii.b2a_hex( + mac_doc( + crypto, doc.doc_id, doc.rev, + ciphertext, + MacMethods.HMAC)), MAC_METHOD_KEY: MacMethods.HMAC, }) @@ -310,23 +320,55 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ HTTPSyncTarget.__init__(self, url, creds) self._crypto = crypto + self._stopped = True + self._sync_state = None + self._stop_lock = threading.Lock() + + def _init_post_request(self, url, action, headers, content_length): + """ + Initiate a syncing POST request. + + :param url: The syncing URL. + :type url: str + :param action: The syncing action, either 'get' or 'receive'. + :type action: str + :param headers: The initial headers to be sent on this request. + :type headers: dict + :param content_length: The content-length of the request. + :type content_length: int + """ + self._conn.putrequest('POST', url) + self._conn.putheader( + 'content-type', 'application/x-soledad-sync-%s' % action) + for header_name, header_value in headers: + self._conn.putheader(header_name, header_value) + self._conn.putheader('accept-encoding', 'gzip') + self._conn.putheader('content-length', str(content_length)) + self._conn.endheaders() - def _parse_sync_stream(self, data, return_doc_cb, ensure_callback=None): + def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, + headers, return_doc_cb, ensure_callback=None): """ - Parse incoming synchronization stream and insert documents in the + Fetch sync documents from the remote database and insert them in the local database. If an incoming document's encryption scheme is equal to EncryptionSchemes.SYMKEY, then this method will decrypt it with Soledad's symmetric key. - :param data: The body of the HTTP response. - :type data: str + :param url: The syncing URL. + :type url: str + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + :param headers: The headers of the HTTP request. + :type headers: dict :param return_doc_cb: A callback to insert docs from target. - :type return_doc_cb: function + :type return_doc_cb: callable :param ensure_callback: A callback to ensure we have the correct - target_replica_uid, if it was just created. - :type ensure_callback: function + target_replica_uid, if it was just created. + :type ensure_callback: callable :raise BrokenSyncStream: If C{data} is malformed. @@ -334,54 +376,105 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): from remote replica. :rtype: list of str """ - parts = data.splitlines() # one at a time - if not parts or parts[0] != '[': - raise BrokenSyncStream - data = parts[1:-1] - comma = False - if data: + + def _post_get_doc(): + """ + Get a sync document from server by means of a POST request. + """ + entries = ['['] + size = 1 + # add remote replica metadata to the request + size += self._prepare( + '', entries, + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + ensure=ensure_callback is not None) + # inform server of how many documents have already been received + size += self._prepare( + ',', entries, received=self._sync_state.received) + entries.append('\r\n]') + size += len(entries[-1]) + # send headers + self._init_post_request(url, 'get', headers, size) + # get document + for entry in entries: + self._conn.send(entry) + return self._response() + + number_of_changes = None + + while number_of_changes is None or \ + self._sync_state.received < number_of_changes: + # bail out if sync process was interrupted + if self.stopped is True: + return None, None + # try to fetch one document from target + data, _ = _post_get_doc() + # decode incoming stream + parts = data.splitlines() + if not parts or parts[0] != '[' or parts[-1] != ']': + raise BrokenSyncStream + data = parts[1:-1] + # decode metadata line, comma = utils.check_and_strip_comma(data[0]) - res = json.loads(line) - if ensure_callback and 'replica_uid' in res: - ensure_callback(res['replica_uid']) - for entry in data[1:]: - if not comma: # missing in between comma - raise BrokenSyncStream - line, comma = utils.check_and_strip_comma(entry) - entry = json.loads(line) - #------------------------------------------------------------- - # symmetric decryption of document's contents - #------------------------------------------------------------- - # if arriving content was symmetrically encrypted, we decrypt - # it. - doc = SoledadDocument( - entry['id'], entry['rev'], entry['content']) - if doc.content and ENC_SCHEME_KEY in doc.content: - if doc.content[ENC_SCHEME_KEY] == \ - EncryptionSchemes.SYMKEY: - doc.set_json(decrypt_doc(self._crypto, doc)) - #------------------------------------------------------------- - # end of symmetric decryption - #------------------------------------------------------------- - return_doc_cb(doc, entry['gen'], entry['trans_id']) - if parts[-1] != ']': + metadata = None try: - partdic = json.loads(parts[-1]) - except ValueError: - pass - else: - if isinstance(partdic, dict): - self._error(partdic) - raise BrokenSyncStream - if not data or comma: # no entries or bad extra comma - raise BrokenSyncStream - return res + metadata = json.loads(line) + soledad_assert('number_of_changes' in metadata) + soledad_assert('new_generation' in metadata) + soledad_assert('new_transaction_id' in metadata) + number_of_changes = metadata['number_of_changes'] + except json.JSONDecodeError, AssertionError: + raise BrokenSyncStream + # make sure we have replica_uid from fresh new dbs + if ensure_callback and 'replica_uid' in metadata: + ensure_callback(metadata['replica_uid']) + # bail out if there are no documents to be received + if number_of_changes == 0: + break + # decrypt incoming document and insert into local database + entry = None + try: + entry = json.loads(data[1]) + except IndexError: + raise BrokenSyncStream + # ------------------------------------------------------------- + # symmetric decryption of document's contents + # ------------------------------------------------------------- + # if arriving content was symmetrically encrypted, we decrypt + # it. + doc = SoledadDocument( + entry['id'], entry['rev'], entry['content']) + if doc.content and ENC_SCHEME_KEY in doc.content: + if doc.content[ENC_SCHEME_KEY] == \ + EncryptionSchemes.SYMKEY: + doc.set_json(decrypt_doc(self._crypto, doc)) + # ------------------------------------------------------------- + # end of symmetric decryption + # ------------------------------------------------------------- + return_doc_cb(doc, entry['gen'], entry['trans_id']) + self._sync_state.received += 1 + signal( + SOLEDAD_SYNC_RECEIVE_STATUS, + "%d/%d" % + (self._sync_state.received, number_of_changes)) + return metadata['new_generation'], metadata['new_transaction_id'] def _request(self, method, url_parts, params=None, body=None, content_type=None): """ - Overloaded method. See u1db docs. - Patched for adding gzip encoding. + Perform an HTTP request. + + :param method: The HTTP request method. + :type method: str + :param url_parts: A list representing the request path. + :type url_parts: list + :param params: Parameters for the URL query string. + :type params: dict + :param body: The body of the request. + :type body: str + :param content-type: The content-type of the request. + :type content-type: str """ self._ensure_connection() @@ -425,8 +518,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): def _response(self): """ - Overloaded method, see u1db docs. - We patched it for decrypting gzip content. + Return the response of the (possibly gzipped) HTTP request. + + :return: The body and headers of the response. + :rtype: tuple """ resp = self._conn.getresponse() body = resp.read() @@ -453,9 +548,26 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): raise errors.Unavailable(body, headers) raise errors.HTTPError(resp.status, body, headers) + def _prepare(self, comma, entries, **dic): + """ + Prepare an entry to be sent through a syncing POST request. + + :param comma: A string to be prepended to the current entry. + :type comma: str + :param entries: A list of entries accumulated to be sent on the + request. + :type entries: list + :param dic: The data to be included in this entry. + :type dic: dict + """ + entry = comma + '\r\n' + json.dumps(dic) + entries.append(entry) + return len(entry) + def sync_exchange(self, docs_by_generations, source_replica_uid, last_known_generation, last_known_trans_id, - return_doc_cb, ensure_callback=None): + return_doc_cb, ensure_callback=None, + sync_state=None): """ Find out which documents the remote database does not know about, encrypt and send them. @@ -484,54 +596,117 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :return: The new generation and transaction id of the target replica. :rtype: tuple """ + # get the sync state information from client + self._sync_state = sync_state + if self._sync_state is None: + self._sync_state = ClientSyncState() + + self.start() + self._ensure_connection() if self._trace_hook: # for tests self._trace_hook('sync_exchange') url = '%s/sync-from/%s' % (self._url.path, source_replica_uid) - self._conn.putrequest('POST', url) - self._conn.putheader('content-type', 'application/x-u1db-sync-stream') - for header_name, header_value in self._sign_request('POST', url, {}): - self._conn.putheader(header_name, header_value) - self._conn.putheader('accept-encoding', 'gzip') - entries = ['['] - size = 1 - - def prepare(**dic): - entry = comma + '\r\n' + json.dumps(dic) - entries.append(entry) - return len(entry) - - comma = '' - size += prepare( - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - ensure=ensure_callback is not None) - comma = ',' + headers = self._sign_request('POST', url, {}) + + def _post_put_doc(headers, last_known_generation, last_known_trans_id, + id, rev, content, gen, trans_id): + """ + Put a sync document on server by means of a POST request. + + :param received: How many documents have already been received in + this sync session. + :type received: int + """ + # prepare to send the document + entries = ['['] + size = 1 + # add remote replica metadata to the request + size += self._prepare( + '', entries, + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + ensure=ensure_callback is not None) + # add the document to the request + size += self._prepare( + ',', entries, + id=id, rev=rev, content=content, gen=gen, trans_id=trans_id) + entries.append('\r\n]') + size += len(entries[-1]) + # send headers + self._init_post_request(url, 'put', headers, size) + # send document + for entry in entries: + self._conn.send(entry) + data, _ = self._response() + data = json.loads(data) + return data[0]['new_generation'], data[0]['new_transaction_id'] + + cur_target_gen = last_known_generation + cur_target_trans_id = last_known_trans_id + + # skip docs that were already sent + if self._sync_state.sent > 0: + docs_by_generations = docs_by_generations[self._sync_state.sent:] + + # send docs for doc, gen, trans_id in docs_by_generations: + # allow for interrupting the sync process + if self.stopped is True: + break # skip non-syncable docs if isinstance(doc, SoledadDocument) and not doc.syncable: continue - #------------------------------------------------------------- + # ------------------------------------------------------------- # symmetric encryption of document's contents - #------------------------------------------------------------- + # ------------------------------------------------------------- doc_json = doc.get_json() if not doc.is_tombstone(): doc_json = encrypt_doc(self._crypto, doc) - #------------------------------------------------------------- + # ------------------------------------------------------------- # end of symmetric encryption - #------------------------------------------------------------- - size += prepare(id=doc.doc_id, rev=doc.rev, - content=doc_json, - gen=gen, trans_id=trans_id) - entries.append('\r\n]') - size += len(entries[-1]) - self._conn.putheader('content-length', str(size)) - self._conn.endheaders() - for entry in entries: - self._conn.send(entry) - entries = None - data, headers = self._response() - - res = self._parse_sync_stream(data, return_doc_cb, ensure_callback) - data = None - return res['new_generation'], res['new_transaction_id'] + # ------------------------------------------------------------- + cur_target_gen, cur_target_trans_id = _post_put_doc( + headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id, + rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id) + self._sync_state.sent += 1 + signal( + SOLEDAD_SYNC_SEND_STATUS, + "%d/%d" % (self._sync_state.sent, len(docs_by_generations))) + + # get docs from target + cur_target_gen, cur_target_trans_id = self._get_remote_docs( + url, + last_known_generation, last_known_trans_id, headers, + return_doc_cb, ensure_callback) + self.stop() + return cur_target_gen, cur_target_trans_id + + def start(self): + """ + Mark current sync session as running. + """ + with self._stop_lock: + self._stopped = False + + def stop(self): + """ + Mark current sync session as stopped. + + This will eventually interrupt the sync_exchange() method and return + enough information to the synchronizer so the sync session can be + recovered afterwards. + """ + with self._stop_lock: + self._stopped = True + + @property + def stopped(self): + """ + Return whether this sync session is stopped. + + :return: Whether this sync session is stopped. + :rtype: bool + """ + with self._stop_lock: + return self._stopped is True |