From 8e750af78bd8f9a37d1b095712b66e6ecdb3e102 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 29 Apr 2014 13:29:23 -0300 Subject: Cleanup and pep8 fix. --- client/src/leap/soledad/client/__init__.py | 11 +++++------ client/src/leap/soledad/client/shared_db.py | 4 ++-- common/src/leap/soledad/common/errors.py | 1 + server/src/leap/soledad/server/lock_resource.py | 5 ++--- 4 files changed, 10 insertions(+), 11 deletions(-) diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py index 46e3cd5f..8881b422 100644 --- a/client/src/leap/soledad/client/__init__.py +++ b/client/src/leap/soledad/client/__init__.py @@ -720,8 +720,9 @@ class Soledad(object): :return: the hash :rtype: str """ - return sha256('%s%s' % - (self._passphrase_as_string(), self.uuid)).hexdigest() + return sha256( + '%s%s' % + (self._passphrase_as_string(), self.uuid)).hexdigest() def _get_secrets_from_shared_db(self): """ @@ -1211,7 +1212,6 @@ class Soledad(object): """ soledad_assert(self.STORAGE_SECRETS_KEY in data) # check mac of the recovery document - #mac_auth = False # XXX ? mac = None if MAC_KEY in data: soledad_assert(data[MAC_KEY] is not None) @@ -1234,7 +1234,6 @@ class Soledad(object): if mac != data[MAC_KEY]: raise WrongMac('Could not authenticate recovery document\'s ' 'contents.') - #mac_auth = True # XXX ? # include secrets in the secret pool. secrets = 0 for secret_id, secret_data in data[self.STORAGE_SECRETS_KEY].items(): @@ -1296,9 +1295,9 @@ class Soledad(object): return self._passphrase.encode('utf-8') -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- # Monkey patching u1db to be able to provide a custom SSL cert -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- # We need a more reasonable timeout (in seconds) SOLEDAD_TIMEOUT = 120 diff --git a/client/src/leap/soledad/client/shared_db.py b/client/src/leap/soledad/client/shared_db.py index 0753cbb5..52e51c6f 100644 --- a/client/src/leap/soledad/client/shared_db.py +++ b/client/src/leap/soledad/client/shared_db.py @@ -30,9 +30,9 @@ from leap.soledad.common import SHARED_DB_LOCK_DOC_ID_PREFIX from leap.soledad.client.auth import TokenBasedAuth -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- # Soledad shared database -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- class NoTokenForAuth(Exception): """ diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py index 3a7eadd2..ea4bf7f6 100644 --- a/common/src/leap/soledad/common/errors.py +++ b/common/src/leap/soledad/common/errors.py @@ -62,6 +62,7 @@ class InvalidAuthTokenError(errors.Unauthorized): wire_descrition = "invalid auth token" status = 401 + # # LockResource errors # diff --git a/server/src/leap/soledad/server/lock_resource.py b/server/src/leap/soledad/server/lock_resource.py index a7870f77..0a602e26 100644 --- a/server/src/leap/soledad/server/lock_resource.py +++ b/server/src/leap/soledad/server/lock_resource.py @@ -178,9 +178,8 @@ class LockResource(object): error=InvalidTokenError.wire_description) else: self._shared_db.delete_doc(lock_doc) - 
self._responder.send_response_json(200)  # success: should use 204
-                                                # but u1db does not
-                                                # support it.
+            # respond success: should use 204 but u1db does not support it.
+            self._responder.send_response_json(200)
 
     def _remaining(self, lock_doc, now):
         """
-- 
cgit v1.2.3


From f3abf619ddd6be9dee7ed5807b967e06a6d7ef93 Mon Sep 17 00:00:00 2001
From: drebs
Date: Thu, 24 Apr 2014 16:33:29 -0300
Subject: Add split POST sync design docs (#5571).

---
 .../soledad/common/ddocs/syncs/updates/state.js    | 99 ++++++++++++++++++++++
 .../ddocs/syncs/views/changes_to_return/map.js     | 18 ++++
 .../common/ddocs/syncs/views/seen_ids/map.js       |  9 ++
 .../soledad/common/ddocs/syncs/views/state/map.js  | 16 ++++
 4 files changed, 142 insertions(+)
 create mode 100644 common/src/leap/soledad/common/ddocs/syncs/updates/state.js
 create mode 100644 common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js
 create mode 100644 common/src/leap/soledad/common/ddocs/syncs/views/seen_ids/map.js
 create mode 100644 common/src/leap/soledad/common/ddocs/syncs/views/state/map.js

diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/state.js b/common/src/leap/soledad/common/ddocs/syncs/updates/state.js
new file mode 100644
index 00000000..cb2b6b7b
--- /dev/null
+++ b/common/src/leap/soledad/common/ddocs/syncs/updates/state.js
@@ -0,0 +1,99 @@
+/**
+ * This update handler stores information about ongoing synchronization
+ * attempts from distinct source replicas.
+ *
+ * Normally, u1db synchronization occurs during one POST request. In order to
+ * split that into many serial POST requests, we store the state of each sync
+ * in the server, using a document with id 'u1db_sync_state'. To identify
+ * each sync attempt, we use a sync_id sent by the client. If we ever receive
+ * a new sync_id, we trash current data for that source replica and start
+ * over.
+ *
+ * We expect the following in the document body:
+ *
+ *     {
+ *         'source_replica_uid': '<source_replica_uid>',
+ *         'sync_id': '<sync_id>',
+ *         'seen_ids': [['<doc_id>', <gen>], ...],          // optional
+ *         'changes_to_return': {                           // optional
+ *             'gen': <gen>,
+ *             'trans_id': '<trans_id>',
+ *             'changes_to_return': [['<doc_id>', <gen>, '<trans_id>'], ...]
+ *         },
+ *     }
+ *
+ * The format of the final document stored on server is:
+ *
+ *     {
+ *         '_id': 'u1db_sync_state',
+ *         '_rev': '<rev>',
+ *         'ongoing_syncs': {
+ *             '<source_replica_uid>': {
+ *                 'sync_id': '<sync_id>',
+ *                 'seen_ids': [['<doc_id>', <gen>], ...],
+ *                 'changes_to_return': {
+ *                     'gen': <gen>,
+ *                     'trans_id': '<trans_id>',
+ *                     'changes_to_return': [
+ *                         ['<doc_id>', <gen>, '<trans_id>'],
+ *                         ...,
+ *                     ],
+ *                 },
+ *             },
+ *             ...  // info about other source replicas here
+ *         }
+ *     }
+ */
+function(doc, req) {
+
+    // prevent updates to alien documents
+    if (doc != null && doc['_id'] != 'u1db_sync_state')
+        return [null, 'invalid data'];
+
+    // create the document if it doesn't exist
+    if (!doc)
+        doc = {
+            '_id': 'u1db_sync_state',
+            'ongoing_syncs': {},
+        };
+
+    // parse and validate incoming data
+    var body = JSON.parse(req.body);
+    if (body['source_replica_uid'] == null)
+        return [null, 'invalid data'];
+    var source_replica_uid = body['source_replica_uid'];
+    if (body['sync_id'] == null)
+        return [null, 'invalid data'];
+    var sync_id = body['sync_id'];
+
+    // a new sync_id means a new sync attempt (see above), so trash current
+    // data for that source replica and start over
+    if (doc['ongoing_syncs'][source_replica_uid] != null &&
+            doc['ongoing_syncs'][source_replica_uid]['sync_id'] != sync_id)
+        delete doc['ongoing_syncs'][source_replica_uid];
+
+    // create an entry for that source replica
+    if (doc['ongoing_syncs'][source_replica_uid] == null)
+        doc['ongoing_syncs'][source_replica_uid] = {
+            'sync_id': sync_id,
+            'seen_ids': {},
+            'changes_to_return': null,
+        };
+
+    // incoming metadata values are mutually exclusive, so we count how many
+    // arrived and refuse the update unless exactly one was sent
+    var incoming_values = 0;
+    var info = doc['ongoing_syncs'][source_replica_uid];
+
+    // add incoming seen id
+    if ('seen_id' in body) {
+        info['seen_ids'][body['seen_id'][0]] = body['seen_id'][1];
+        incoming_values += 1;
+    }
+
+    // add incoming changes_to_return
+    if ('changes_to_return' in body) {
+        info['changes_to_return'] = body['changes_to_return'];
+        incoming_values += 1;
+    }
+
+    if (incoming_values != 1)
+        return [null, 'invalid data'];
+
+    return [doc, 'ok'];
+}

diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js
new file mode 100644
index 00000000..220345dc
--- /dev/null
+++ b/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js
@@ -0,0 +1,18 @@
+function(doc) {
+    if (doc['_id'] == 'u1db_sync_state' && doc['ongoing_syncs'] != null)
+        for (var source_replica_uid in doc['ongoing_syncs']) {
+            var changes =
+                doc['ongoing_syncs'][source_replica_uid]['changes_to_return'];
+            if (changes == null)
+                emit([source_replica_uid, 0], null);
+            else if (changes.length == 0)
+                emit([source_replica_uid, 0], []);
+            else
+                for (var i = 0; i < changes['changes_to_return'].length; i++)
+                    emit(
+                        [source_replica_uid, i],
+                        {
+                            'gen': changes['gen'],
+                            'trans_id': changes['trans_id'],
+                            'next_change_to_return': changes['changes_to_return'][i],
+                        });
+        }
+}

diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/seen_ids/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/seen_ids/map.js
new file mode 100644
index 00000000..34c65b3f
--- /dev/null
+++ b/common/src/leap/soledad/common/ddocs/syncs/views/seen_ids/map.js
@@ -0,0 +1,9 @@
+function(doc) {
+    if (doc['_id'] == 'u1db_sync_state' && doc['ongoing_syncs'] != null)
+        for (var source_replica_uid in doc['ongoing_syncs'])
+            emit(
+                source_replica_uid,
+                {
+                    'seen_ids': doc['ongoing_syncs'][source_replica_uid]['seen_ids'],
+                });
+}

diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/state/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/state/map.js
new file mode 100644
index 00000000..1d8f8e84
--- /dev/null
+++ b/common/src/leap/soledad/common/ddocs/syncs/views/state/map.js
@@ -0,0 +1,16 @@
+function(doc) {
+    if (doc['_id'] == 'u1db_sync_state' && doc['ongoing_syncs'] != null)
+        for (var source_replica_uid in doc['ongoing_syncs']) {
+            var changes =
doc['ongoing_syncs'][source_replica_uid]['changes_to_return']; + if (changes == null) + emit(source_replica_uid, null); + else + emit( + source_replica_uid, + { + 'gen': changes['gen'], + 'trans_id': changes['trans_id'], + 'number_of_changes': changes['changes_to_return'].length + }); + } +} -- cgit v1.2.3 From 24465b7b2cd77b66f637e22453dd24a2d67c4ce6 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 17 Apr 2014 16:15:04 -0300 Subject: Split sync in multiple POST requests in server (#5571). --- common/src/leap/soledad/common/couch.py | 8 +- server/changes/feature_5571_split-sync-post | 1 + server/src/leap/soledad/server/__init__.py | 137 ++++++++- server/src/leap/soledad/server/sync.py | 462 ++++++++++++++++++++++++++++ 4 files changed, 592 insertions(+), 16 deletions(-) create mode 100644 server/changes/feature_5571_split-sync-post create mode 100644 server/src/leap/soledad/server/sync.py diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 0aa84170..f4696cee 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -371,6 +371,7 @@ class CouchDatabase(CommonBackend): MAX_GET_DOCS_THREADS = 20 update_handler_lock = defaultdict(threading.Lock) + sync_info_lock = defaultdict(threading.Lock) class _GetDocThread(threading.Thread): """ @@ -440,7 +441,8 @@ class CouchDatabase(CommonBackend): if not create: raise DatabaseDoesNotExist() server.create(dbname) - return cls(url, dbname, replica_uid=replica_uid, ensure_ddocs=ensure_ddocs) + return cls( + url, dbname, replica_uid=replica_uid, ensure_ddocs=ensure_ddocs) def __init__(self, url, dbname, replica_uid=None, ensure_ddocs=True): """ @@ -576,6 +578,8 @@ class CouchDatabase(CommonBackend): _replica_uid = property(_get_replica_uid, _set_replica_uid) + replica_uid = property(_get_replica_uid) + def _get_generation(self): """ Return the current generation. @@ -869,7 +873,7 @@ class CouchDatabase(CommonBackend): # Date.prototype.getTime() which was used before inside a couchdb # update handler. (int(time.time() * 1000), - self._allocate_transaction_id())) + self._allocate_transaction_id())) # build the couch document couch_doc = { '_id': doc.doc_id, diff --git a/server/changes/feature_5571_split-sync-post b/server/changes/feature_5571_split-sync-post new file mode 100644 index 00000000..ad269cd4 --- /dev/null +++ b/server/changes/feature_5571_split-sync-post @@ -0,0 +1 @@ + o Split sync in multiple POST requests in server (#5571). diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index c170f230..573afdd6 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -87,8 +87,10 @@ and lock documents on the shared database is handled by """ import configparser +import urlparse +import sys -from u1db.remote import http_app +from u1db.remote import http_app, utils # Keep OpenSSL's tsafe before importing Twisted submodules so we can put # it back if Twisted==12.0.0 messes with it. @@ -99,24 +101,24 @@ from twisted import version if version.base() == "12.0.0": # Put OpenSSL's tsafe back into place. This can probably be removed if we # come to use Twisted>=12.3.0. 
- import sys sys.modules['OpenSSL.tsafe'] = old_tsafe from leap.soledad.server.auth import SoledadTokenAuthMiddleware from leap.soledad.server.gzip_middleware import GzipMiddleware from leap.soledad.server.lock_resource import LockResource +from leap.soledad.server.sync import ( + SyncResource, + MAX_REQUEST_SIZE, + MAX_ENTRY_SIZE, +) from leap.soledad.common import SHARED_DB_NAME from leap.soledad.common.couch import CouchServerState -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- # Soledad WSGI application -#----------------------------------------------------------------------------- - -MAX_REQUEST_SIZE = 200 # in Mb -MAX_ENTRY_SIZE = 200 # in Mb - +# ---------------------------------------------------------------------------- class SoledadApp(http_app.HTTPApp): """ @@ -147,14 +149,121 @@ class SoledadApp(http_app.HTTPApp): return http_app.HTTPApp.__call__(self, environ, start_response) +# ---------------------------------------------------------------------------- +# WSGI resources registration +# ---------------------------------------------------------------------------- + +# monkey patch u1db with a new resource map +http_app.url_to_resource = http_app.URLToResource() + +# register u1db unmodified resources +http_app.url_to_resource.register(http_app.GlobalResource) +http_app.url_to_resource.register(http_app.DatabaseResource) +http_app.url_to_resource.register(http_app.DocsResource) +http_app.url_to_resource.register(http_app.DocResource) + +# register Soledad's new or modified resources http_app.url_to_resource.register(LockResource) -http_app.SyncResource.max_request_size = MAX_REQUEST_SIZE * 1024 * 1024 -http_app.SyncResource.max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024 +http_app.url_to_resource.register(SyncResource) + +# ---------------------------------------------------------------------------- +# Modified HTTP method invocation (to account for splitted sync) +# ---------------------------------------------------------------------------- -#----------------------------------------------------------------------------- +class HTTPInvocationByMethodWithBody( + http_app.HTTPInvocationByMethodWithBody): + """ + Invoke methods on a resource. + """ + + def __call__(self): + """ + Call an HTTP method of a resource. + + This method was rewritten to allow for a sync flow which uses one POST + request for each transferred document (back and forth). + + Usual U1DB sync process transfers all documents from client to server + and back in only one POST request. This is inconvenient for some + reasons, as lack of possibility of gracefully interrupting the sync + process, and possible timeouts for when dealing with large documents + that have to be retrieved and encrypted/decrypted. Because of those, + we split the sync process into many POST requests. 
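+
+        For illustration only (the field values here are hypothetical, not
+        taken from the original code), the body of one such POST request is
+        a JSON list streamed one entry per line, carrying first the sync
+        metadata and then a single document entry:
+
+            [
+            {"last_known_generation": 0, "last_known_trans_id": "", "ensure": false},
+            {"id": "some-doc-id", "rev": "some-rev", "content": "...", "gen": 3, "trans_id": "some-trans-id"}
+            ]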
+ """ + args = urlparse.parse_qsl(self.environ['QUERY_STRING'], + strict_parsing=False) + try: + args = dict( + (k.decode('utf-8'), v.decode('utf-8')) for k, v in args) + except ValueError: + raise http_app.BadRequest() + method = self.environ['REQUEST_METHOD'].lower() + if method in ('get', 'delete'): + meth = self._lookup(method) + return meth(args, None) + else: + # we expect content-length > 0, reconsider if we move + # to support chunked enconding + try: + content_length = int(self.environ['CONTENT_LENGTH']) + except (ValueError, KeyError): + raise http_app.BadRequest + if content_length <= 0: + raise http_app.BadRequest + if content_length > self.max_request_size: + raise http_app.BadRequest + reader = http_app._FencedReader( + self.environ['wsgi.input'], content_length, + self.max_entry_size) + content_type = self.environ.get('CONTENT_TYPE') + if content_type == 'application/json': + meth = self._lookup(method) + body = reader.read_chunk(sys.maxint) + return meth(args, body) + elif content_type.startswith('application/x-soledad-sync'): + # read one line and validate it + body_getline = reader.getline + if body_getline().strip() != '[': + raise http_app.BadRequest() + line = body_getline() + line, comma = utils.check_and_strip_comma(line.strip()) + meth_args = self._lookup('%s_args' % method) + meth_args(args, line) + # handle incoming documents + if content_type == 'application/x-soledad-sync-put': + meth_put = self._lookup('%s_put' % method) + meth_end = self._lookup('%s_end' % method) + while True: + line = body_getline() + entry = line.strip() + if entry == ']': # end of incoming document stream + break + if not entry or not comma: # empty or no prec comma + raise http_app.BadRequest + entry, comma = utils.check_and_strip_comma(entry) + meth_put({}, entry) + if comma or body_getline(): # extra comma or data + raise http_app.BadRequest + return meth_end() + # handle outgoing documents + elif content_type == 'application/x-soledad-sync-get': + line = body_getline() + entry = line.strip() + meth_get = self._lookup('%s_get' % method) + return meth_get({}, line) + else: + raise http_app.BadRequest() + else: + raise http_app.BadRequest() + + +http_app.HTTPInvocationByMethodWithBody = HTTPInvocationByMethodWithBody + + +# ---------------------------------------------------------------------------- # Auxiliary functions -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- def load_configuration(file_path): """ @@ -180,9 +289,9 @@ def load_configuration(file_path): return conf -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- # Run as Twisted WSGI Resource -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- def application(environ, start_response): conf = load_configuration('/etc/leap/soledad-server.conf') diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py new file mode 100644 index 00000000..3b8b69fb --- /dev/null +++ b/server/src/leap/soledad/server/sync.py @@ -0,0 +1,462 @@ +# -*- coding: utf-8 -*- +# sync.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either 
version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +Server side synchronization infrastructure. +""" + +import json + + +from leap.soledad.common.couch import CouchDatabase +from itertools import izip +from u1db import sync, Document +from u1db.remote import http_app + + +MAX_REQUEST_SIZE = 200 # in Mb +MAX_ENTRY_SIZE = 200 # in Mb + + +class ServerSyncState(object): + """ + The state of one sync session, as stored on backend server. + + This object performes queries to distinct design documents: + + _design/syncs/_update/state + _design/syncs/_view/state + _design/syncs/_view/seen_ids + _design/syncs/_view/changes_to_return + + On server side, the ongoing syncs metadata is maintained in a document + called 'u1db_sync_state'. + """ + + def __init__(self, db, source_replica_uid): + """ + Initialize the sync state object. + + :param db: The target syncing database. + :type db: CouchDatabase. + :param source_replica_uid: CouchDatabase + :type source_replica_uid: str + """ + self._db = db + self._source_replica_uid = source_replica_uid + + def _key(self, key): + """ + Format a key to be used on couch views. + + :param key: The lookup key. + :type key: json serializable object + + :return: The properly formatted key. + :rtype: str + """ + return json.dumps(key, separators=(',', ':')) + + def _put_info(self, key, value): + """ + Put some information on the sync state document. + + This method works in conjunction with the + _design/syncs/_update/state update handler couch backend. + + :param key: The key for the info to be put. + :type key: str + :param value: The value for the info to be put. + :type value: str + """ + ddoc_path = [ + '_design', 'syncs', '_update', 'state', + 'u1db_sync_state'] + res = self._db._database.resource(*ddoc_path) + with CouchDatabase.sync_info_lock[self._db.replica_uid]: + res.put_json( + body={ + 'source_replica_uid': self._source_replica_uid, + key: value, + }, + headers={'content-type': 'application/json'}) + + def put_seen_id(self, seen_id, gen): + """ + Put one seen id on the sync state document. + + :param seen_id: The doc_id of a document seen during sync. + :type seen_id: str + :param gen: The corresponding db generation for that document. + :type gen: int + """ + self._put_info( + 'seen_id', + [seen_id, gen]) + + def seen_ids(self): + """ + Return all document ids seen during the sync. + + :return: A list with doc ids seen during the sync. + :rtype: list + """ + ddoc_path = ['_design', 'syncs', '_view', 'seen_ids'] + resource = self._db._database.resource(*ddoc_path) + response = resource.get_json(key=self._key(self._source_replica_uid)) + data = response[2] + if len(data['rows']) > 0: + entry = data['rows'].pop() + return entry['value']['seen_ids'] + return [] + + def put_changes_to_return(self, gen, trans_id, changes_to_return): + """ + Put the calculated changes to return in the backend sync state + document. + + :param gen: The target database generation that will be synced. + :type gen: int + :param trans_id: The target database transaction id that will be + synced. 
+ :type trans_id: str + :param changes_to_return: A list of tuples with the changes to be + returned during the sync process. + :type changes_to_return: list + """ + self._put_info( + 'changes_to_return', + { + 'gen': gen, + 'trans_id': trans_id, + 'changes_to_return': changes_to_return, + } + ) + + def sync_info(self): + """ + Return information about the current sync state. + + :return: The generation and transaction id of the target database + which will be synced, and the number of documents do return, + or a tuple of Nones if those have not already been sent to + server. + :rtype: tuple + """ + ddoc_path = ['_design', 'syncs', '_view', 'state'] + resource = self._db._database.resource(*ddoc_path) + response = resource.get_json(key=self._key(self._source_replica_uid)) + data = response[2] + gen = None + trans_id = None + number_of_changes = None + if len(data['rows']) > 0 and data['rows'][0]['value'] is not None: + value = data['rows'][0]['value'] + gen = value['gen'] + trans_id = value['trans_id'] + number_of_changes = value['number_of_changes'] + return gen, trans_id, number_of_changes + + def next_change_to_return(self, received): + """ + Return the next change to be returned to the source syncing replica. + + :param received: How many documents the source replica has already + received during the current sync process. + :type received: int + """ + ddoc_path = ['_design', 'syncs', '_view', 'changes_to_return'] + resource = self._db._database.resource(*ddoc_path) + response = resource.get_json( + key=self._key( + [self._source_replica_uid, received])) + data = response[2] + if len(data['rows']) == 0: + return None, None, None + value = data['rows'][0]['value'] + gen = value['gen'] + trans_id = value['trans_id'] + next_change_to_return = value['next_change_to_return'] + return gen, trans_id, tuple(next_change_to_return) + + +class SyncExchange(sync.SyncExchange): + + def __init__(self, db, source_replica_uid, last_known_generation): + """ + :param db: The target syncing database. + :type db: CouchDatabase + :param source_replica_uid: The uid of the source syncing replica. + :type source_replica_uid: str + :param last_known_generation: The last target replica generation the + source replica knows about. + :type last_known_generation: int + """ + self._db = db + self.source_replica_uid = source_replica_uid + self.source_last_known_generation = last_known_generation + self.new_gen = None + self.new_trans_id = None + self._trace_hook = None + # recover sync state + self._sync_state = ServerSyncState(self._db, self.source_replica_uid) + # for tests + #self._incoming_trace = [] + #if hasattr(self._db, '_incoming_trace'): + # self._incoming_trace = self._db._incoming_trace + #self._db._last_exchange_log = { + # 'receive': {'docs': self._incoming_trace}, + # 'return': None + # } + + + def find_changes_to_return(self, received): + """ + Find changes to return. + + Find changes since last_known_generation in db generation + order using whats_changed. It excludes documents ids that have + already been considered (superseded by the sender, etc). + + :param received: How many documents the source replica has already + received during the current sync process. + :type received: int + + :return: the generation of this database, which the caller can + consider themselves to be synchronized after processing + allreturned documents, and the amount of documents to be sent + to the source syncing replica. 
+ :rtype: int + """ + if hasattr(self._db, '_last_exchange_log'): + self._db._last_exchange_log['receive'].update({ # for tests + 'last_known_gen': self.source_last_known_generation + }) + # check if changes to return have already been calculated + new_gen, new_trans_id, number_of_changes = self._sync_state.sync_info() + if number_of_changes is None: + self._trace('before whats_changed') + new_gen, new_trans_id, changes = self._db.whats_changed( + self.source_last_known_generation) + self._trace('after whats_changed') + seen_ids = self._sync_state.seen_ids() + # changed docs that weren't superseded by or converged with + changes_to_return = [ + (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes + # there was a subsequent update + if doc_id not in seen_ids or seen_ids.get(doc_id) < gen] + self._sync_state.put_changes_to_return( + new_gen, new_trans_id, changes_to_return) + number_of_changes = len(changes_to_return) + # query server for stored changes + _, _, next_change_to_return = \ + self._sync_state.next_change_to_return(received) + self.new_gen = new_gen + self.new_trans_id = new_trans_id + # and append one change + self.changes_to_return = [] + if next_change_to_return is not None: + self.changes_to_return.append(next_change_to_return) + return self.new_gen, number_of_changes + + def return_one_doc(self, return_doc_cb): + """ + Return one changed document and its last change generation to the + source syncing replica by invoking the callback return_doc_cb. + + This is called once for each document to be transferred from target to + source. + + :param return_doc_cb: is a callback used to return the documents with + their last change generation to the target + replica. + :type return_doc_cb: callable(doc, gen, trans_id) + """ + changes_to_return = self.changes_to_return + # return docs, including conflicts + changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return] + self._trace('before get_docs') + docs = self._db.get_docs( + changed_doc_ids, check_for_conflicts=False, include_deleted=True) + + docs_by_gen = izip( + docs, (gen for _, gen, _ in changes_to_return), + (trans_id for _, _, trans_id in changes_to_return)) + for doc, gen, trans_id in docs_by_gen: + return_doc_cb(doc, gen, trans_id) + # for tests + if hasattr(self._db, '_outgoing_trace'): + self._db._outgoing_trace.append((doc.doc_id, doc.rev)) + # for tests + if hasattr(self._db, '_outgoing_trace'): + self._db._last_exchange_log['return'] = { + 'docs': self._db._outgoing_trace, + 'last_gen': self.new_gen + } + + def insert_doc_from_source(self, doc, source_gen, trans_id): + """Try to insert synced document from source. + + Conflicting documents are not inserted but will be sent over + to the sync source. + + It keeps track of progress by storing the document source + generation as well. + + The 1st step of a sync exchange is to call this repeatedly to + try insert all incoming documents from the source. + + :param doc: A Document object. + :type doc: Document + :param source_gen: The source generation of doc. + :type source_gen: int + :param trans_id: The transaction id of that document change. 
+ :type trans_id: str + """ + state, at_gen = self._db._put_doc_if_newer( + doc, save_conflict=False, replica_uid=self.source_replica_uid, + replica_gen=source_gen, replica_trans_id=trans_id) + if state == 'inserted': + self._sync_state.put_seen_id(doc.doc_id, at_gen) + elif state == 'converged': + # magical convergence + self._sync_state.put_seen_id(doc.doc_id, at_gen) + elif state == 'superseded': + # we have something newer that we will return + pass + else: + # conflict that we will returne + assert state == 'conflicted' + # for tests + if hasattr(self._db, '_incoming_trace') \ + and hasattr(self._db, '_last_exchange_log'): + self._db._incoming_trace.append((doc.doc_id, doc.rev)) + self._db._last_exchange_log['receive'].update({ + 'source_uid': self.source_replica_uid, + 'source_gen': source_gen + }) + + +class SyncResource(http_app.SyncResource): + + max_request_size = MAX_REQUEST_SIZE * 1024 * 1024 + max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024 + + sync_exchange_class = SyncExchange + + @http_app.http_method( + last_known_generation=int, last_known_trans_id=http_app.none_or_str, + content_as_args=True) + def post_args(self, last_known_generation, last_known_trans_id=None, + ensure=False): + """ + Handle the initial arguments for the sync POST request from client. + + :param last_known_generation: The last server replica generation the + client knows about. + :type last_known_generation: int + :param last_known_trans_id: The last server replica transaction_id the + client knows about. + :type last_known_trans_id: str + :param ensure: Wether the server replica should be created if it does + not already exist. + :type ensure: bool + """ + # create or open the database + if ensure: + db, self.replica_uid = self.state.ensure_database(self.dbname) + else: + db = self.state.open_database(self.dbname) + # validate the information the client has about server replica + db.validate_gen_and_trans_id( + last_known_generation, last_known_trans_id) + # get a sync exchange object + self.sync_exch = self.sync_exchange_class( + db, self.source_replica_uid, last_known_generation) + + @http_app.http_method(content_as_args=True) + def post_put(self, id, rev, content, gen, trans_id): + """ + Put one incoming document into the server replica. + + :param id: The id of the incoming document. + :type id: str + :param rev: The revision of the incoming document. + :type rev: str + :param content: The content of the incoming document. + :type content: dict + :param gen: The source replica generation corresponding to the + revision of the incoming document. + :type gen: int + :param trans_id: The source replica transaction id corresponding to + the revision of the incoming document. + :type trans_id: str + """ + doc = Document(id, rev, content) + self.sync_exch.insert_doc_from_source(doc, gen, trans_id) + + @http_app.http_method(received=int, content_as_args=True) + def post_get(self, received): + """ + Return one syncing document to the client. + + :param received: How many documents have already been received by the + client on the current sync session. 
+ :type received: int + """ + + def send_doc(doc, gen, trans_id): + entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(), + gen=gen, trans_id=trans_id) + self.responder.stream_entry(entry) + + new_gen, number_of_changes = \ + self.sync_exch.find_changes_to_return(received) + self.responder.content_type = 'application/x-u1db-sync-response' + self.responder.start_response(200) + self.responder.start_stream(), + header = { + "new_generation": new_gen, + "new_transaction_id": self.sync_exch.new_trans_id, + "number_of_changes": number_of_changes, + } + if self.replica_uid is not None: + header['replica_uid'] = self.replica_uid + self.responder.stream_entry(header) + self.sync_exch.return_one_doc(send_doc) + self.responder.end_stream() + self.responder.finish_response() + + def post_end(self): + """ + Return the current generation and transaction_id after inserting a + series of incoming documents. + """ + self.responder.content_type = 'application/x-soledad-sync-response' + self.responder.start_response(200) + self.responder.start_stream(), + new_gen, new_trans_id = self.sync_exch._db._get_generation_info() + header = { + "new_generation": new_gen, + "new_transaction_id": new_trans_id, + } + if self.replica_uid is not None: + header['replica_uid'] = self.replica_uid + self.responder.stream_entry(header) + self.responder.end_stream() + self.responder.finish_response() -- cgit v1.2.3 From ad748eb838a15b0263fdf18813404d3bee58cd03 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 17 Apr 2014 16:16:20 -0300 Subject: Split sync in multiple POST requests in client (#5571). --- client/changes/feature_5571_split-sync-post | 1 + client/src/leap/soledad/client/target.py | 290 +++++++++++++++++++--------- 2 files changed, 198 insertions(+), 93 deletions(-) create mode 100644 client/changes/feature_5571_split-sync-post diff --git a/client/changes/feature_5571_split-sync-post b/client/changes/feature_5571_split-sync-post new file mode 100644 index 00000000..0d7b14dd --- /dev/null +++ b/client/changes/feature_5571_split-sync-post @@ -0,0 +1 @@ + o Split sync in multiple POST requests in client (#5571). diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 3b3d6870..7b77055c 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -28,6 +28,7 @@ import urllib import simplejson as json from time import sleep +from uuid import uuid4 from u1db.remote import utils, http_errors from u1db.errors import BrokenSyncStream @@ -149,10 +150,12 @@ def encrypt_doc(crypto, doc): ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY, ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, ENC_IV_KEY: iv, - MAC_KEY: binascii.b2a_hex(mac_doc( # store the mac as hex. - crypto, doc.doc_id, doc.rev, - ciphertext, - MacMethods.HMAC)), + # store the mac as hex. + MAC_KEY: binascii.b2a_hex( + mac_doc( + crypto, doc.doc_id, doc.rev, + ciphertext, + MacMethods.HMAC)), MAC_METHOD_KEY: MacMethods.HMAC, }) @@ -311,22 +314,51 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): HTTPSyncTarget.__init__(self, url, creds) self._crypto = crypto - def _parse_sync_stream(self, data, return_doc_cb, ensure_callback=None): + def _init_post_request(self, url, action, headers, content_length): + """ + Initiate a syncing POST request. + + :param url: The syncing URL. + :type url: str + :param action: The syncing action, either 'get' or 'receive'. + :type action: str + :param headers: The initial headers to be sent on this request. 
+ :type headers: dict + :param content_length: The content-length of the request. + :type content_length: int + """ + self._conn.putrequest('POST', url) + self._conn.putheader( + 'content-type', 'application/x-soledad-sync-%s' % action) + for header_name, header_value in headers: + self._conn.putheader(header_name, header_value) + self._conn.putheader('accept-encoding', 'gzip') + self._conn.putheader('content-length', str(content_length)) + self._conn.endheaders() + + def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, + headers, return_doc_cb, ensure_callback=None): """ - Parse incoming synchronization stream and insert documents in the + Fetch sync documents from the remote database and insert them in the local database. If an incoming document's encryption scheme is equal to EncryptionSchemes.SYMKEY, then this method will decrypt it with Soledad's symmetric key. - :param data: The body of the HTTP response. - :type data: str + :param url: The syncing URL. + :type url: str + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + :param headers: The headers of the HTTP request. + :type headers: dict :param return_doc_cb: A callback to insert docs from target. - :type return_doc_cb: function + :type return_doc_cb: callable :param ensure_callback: A callback to ensure we have the correct - target_replica_uid, if it was just created. - :type ensure_callback: function + target_replica_uid, if it was just created. + :type ensure_callback: callable :raise BrokenSyncStream: If C{data} is malformed. @@ -334,54 +366,94 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): from remote replica. :rtype: list of str """ - parts = data.splitlines() # one at a time - if not parts or parts[0] != '[': - raise BrokenSyncStream - data = parts[1:-1] - comma = False - if data: - line, comma = utils.check_and_strip_comma(data[0]) - res = json.loads(line) - if ensure_callback and 'replica_uid' in res: - ensure_callback(res['replica_uid']) - for entry in data[1:]: - if not comma: # missing in between comma - raise BrokenSyncStream - line, comma = utils.check_and_strip_comma(entry) - entry = json.loads(line) - #------------------------------------------------------------- - # symmetric decryption of document's contents - #------------------------------------------------------------- - # if arriving content was symmetrically encrypted, we decrypt - # it. - doc = SoledadDocument( - entry['id'], entry['rev'], entry['content']) - if doc.content and ENC_SCHEME_KEY in doc.content: - if doc.content[ENC_SCHEME_KEY] == \ - EncryptionSchemes.SYMKEY: - doc.set_json(decrypt_doc(self._crypto, doc)) - #------------------------------------------------------------- - # end of symmetric decryption - #------------------------------------------------------------- - return_doc_cb(doc, entry['gen'], entry['trans_id']) - if parts[-1] != ']': + + def _post_get_doc(received): + """ + Get a sync document from server by means of a POST request. + + :param received: How many documents have already been received in + this sync session. 
:type received: int
+            """
+            entries = ['[']
+            size = 1
+            # add remote replica metadata to the request
+            size += self._prepare(
+                '', entries,
+                last_known_generation=last_known_generation,
+                last_known_trans_id=last_known_trans_id,
+                ensure=ensure_callback is not None)
+            # inform server of how many documents have already been received
+            size += self._prepare(',', entries, received=received)
+            entries.append('\r\n]')
+            size += len(entries[-1])
+            # send headers
+            self._init_post_request(url, 'get', headers, size)
+            # get document
+            for entry in entries:
+                self._conn.send(entry)
+            return self._response()
+
+        received = 0
+        number_of_changes = None
+
+        while number_of_changes is None or received < number_of_changes:
+            # try to fetch one document from target
+            data, _ = _post_get_doc(received)
+            received += 1
+            # decode incoming stream
+            entries = None
             try:
-                partdic = json.loads(parts[-1])
+                entries = json.loads(data)
             except ValueError:
-                pass
-            else:
-                if isinstance(partdic, dict):
-                    self._error(partdic)
-            raise BrokenSyncStream
-        if not data or comma:  # no entries or bad extra comma
-            raise BrokenSyncStream
-        return res
+                raise BrokenSyncStream
+            # bail out if there are no documents to be received
+            try:
+                number_of_changes = entries[0]['number_of_changes']
+            except (IndexError, KeyError):
+                raise BrokenSyncStream
+            if number_of_changes == 0:
+                break
+            # decrypt incoming document and insert into local database
+            entry = None
+            try:
+                entry = entries[1]
+            except IndexError:
+                raise BrokenSyncStream
+            if ensure_callback and 'replica_uid' in entries[0]:
+                ensure_callback(entries[0]['replica_uid'])
+            # -------------------------------------------------------------
+            # symmetric decryption of document's contents
+            # -------------------------------------------------------------
+            # if arriving content was symmetrically encrypted, we decrypt
+            # it.
+            doc = SoledadDocument(
+                entry['id'], entry['rev'], entry['content'])
+            if doc.content and ENC_SCHEME_KEY in doc.content:
+                if doc.content[ENC_SCHEME_KEY] == \
+                        EncryptionSchemes.SYMKEY:
+                    doc.set_json(decrypt_doc(self._crypto, doc))
+            # -------------------------------------------------------------
+            # end of symmetric decryption
+            # -------------------------------------------------------------
+            return_doc_cb(doc, entry['gen'], entry['trans_id'])
+        return entries[0]['new_generation'], entries[0]['new_transaction_id']
 
     def _request(self, method, url_parts, params=None, body=None,
                  content_type=None):
         """
-        Overloaded method. See u1db docs.
-        Patched for adding gzip encoding.
+        Perform an HTTP request.
+
+        :param method: The HTTP request method.
+        :type method: str
+        :param url_parts: A list representing the request path.
+        :type url_parts: list
+        :param params: Parameters for the URL query string.
+        :type params: dict
+        :param body: The body of the request.
+        :type body: str
+        :param content_type: The content-type of the request.
+        :type content_type: str
         """
         self._ensure_connection()
@@ -425,8 +497,10 @@
 
     def _response(self):
         """
-        Overloaded method, see u1db docs.
-        We patched it for decrypting gzip content.
+        Return the response of the (possibly gzipped) HTTP request.
+
+        :return: The body and headers of the response.
+ :rtype: tuple """ resp = self._conn.getresponse() body = resp.read() @@ -453,6 +527,22 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): raise errors.Unavailable(body, headers) raise errors.HTTPError(resp.status, body, headers) + def _prepare(self, comma, entries, **dic): + """ + Prepare an entry to be sent through a syncing POST request. + + :param comma: A string to be prepended to the current entry. + :type comma: str + :param entries: A list of entries accumulated to be sent on the + request. + :type entries: list + :param dic: The data to be included in this entry. + :type dic: dict + """ + entry = comma + '\r\n' + json.dumps(dic) + entries.append(entry) + return len(entry) + def sync_exchange(self, docs_by_generations, source_replica_uid, last_known_generation, last_known_trans_id, return_doc_cb, ensure_callback=None): @@ -488,50 +578,64 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): if self._trace_hook: # for tests self._trace_hook('sync_exchange') url = '%s/sync-from/%s' % (self._url.path, source_replica_uid) - self._conn.putrequest('POST', url) - self._conn.putheader('content-type', 'application/x-u1db-sync-stream') - for header_name, header_value in self._sign_request('POST', url, {}): - self._conn.putheader(header_name, header_value) - self._conn.putheader('accept-encoding', 'gzip') - entries = ['['] - size = 1 - - def prepare(**dic): - entry = comma + '\r\n' + json.dumps(dic) - entries.append(entry) - return len(entry) - - comma = '' - size += prepare( - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - ensure=ensure_callback is not None) - comma = ',' + headers = self._sign_request('POST', url, {}) + + def _post_put_doc(headers, last_known_generation, last_known_trans_id, + id, rev, content, gen, trans_id): + """ + Put a sync document on server by means of a POST request. + + :param received: How many documents have already been received in + this sync session. 
+ :type received: int + """ + # prepare to send the document + entries = ['['] + size = 1 + # add remote replica metadata to the request + size += self._prepare( + '', entries, + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + ensure=ensure_callback is not None) + # add the document to the request + size += self._prepare( + ',', entries, + id=id, rev=rev, content=content, gen=gen, trans_id=trans_id) + entries.append('\r\n]') + size += len(entries[-1]) + # send headers + self._init_post_request(url, 'put', headers, size) + # send document + for entry in entries: + self._conn.send(entry) + data, _ = self._response() + data = json.loads(data) + return data[0]['new_generation'], data[0]['new_transaction_id'] + + cur_target_gen = last_known_generation + cur_target_trans_id = last_known_trans_id + for doc, gen, trans_id in docs_by_generations: # skip non-syncable docs if isinstance(doc, SoledadDocument) and not doc.syncable: continue - #------------------------------------------------------------- + # ------------------------------------------------------------- # symmetric encryption of document's contents - #------------------------------------------------------------- + # ------------------------------------------------------------- doc_json = doc.get_json() if not doc.is_tombstone(): doc_json = encrypt_doc(self._crypto, doc) - #------------------------------------------------------------- + # ------------------------------------------------------------- # end of symmetric encryption - #------------------------------------------------------------- - size += prepare(id=doc.doc_id, rev=doc.rev, - content=doc_json, - gen=gen, trans_id=trans_id) - entries.append('\r\n]') - size += len(entries[-1]) - self._conn.putheader('content-length', str(size)) - self._conn.endheaders() - for entry in entries: - self._conn.send(entry) - entries = None - data, headers = self._response() - - res = self._parse_sync_stream(data, return_doc_cb, ensure_callback) - data = None - return res['new_generation'], res['new_transaction_id'] + # ------------------------------------------------------------- + cur_target_gen, cur_target_trans_id = _post_put_doc( + headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id, + rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id) + + cur_target_gen, cur_target_trans_id = self._get_remote_docs( + url, + last_known_generation, last_known_trans_id, headers, + return_doc_cb, ensure_callback) + + return cur_target_gen, cur_target_trans_id -- cgit v1.2.3 From 73d431a035fcdce8d623eefde2d62f28687fdb36 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 30 Apr 2014 12:52:24 -0300 Subject: Allow for interrupting and recovering sync (#5571). 
---
 ...5571_allow-for-interrupting-and-recovering-sync |   1 +
 client/src/leap/soledad/client/__init__.py         |   7 +
 client/src/leap/soledad/client/sqlcipher.py        |  52 +++-
 client/src/leap/soledad/client/sync.py             | 262 +++++++++++++++++++++
 client/src/leap/soledad/client/target.py           |  73 +++++-
 common/src/leap/soledad/common/tests/test_sync.py  | 176 ++++++++++++++
 ...5571_allow-for-interrupting-and-recovering-sync |   1 +
 7 files changed, 556 insertions(+), 16 deletions(-)
 create mode 100644 client/changes/feature_5571_allow-for-interrupting-and-recovering-sync
 create mode 100644 client/src/leap/soledad/client/sync.py
 create mode 100644 common/src/leap/soledad/common/tests/test_sync.py
 create mode 100644 server/changes/feature_5571_allow-for-interrupting-and-recovering-sync

diff --git a/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync b/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync
new file mode 100644
index 00000000..0087c535
--- /dev/null
+++ b/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync
@@ -0,0 +1 @@
+  o Allow for interrupting and recovering sync (#5571).
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py
index 8881b422..f92317e9 100644
--- a/client/src/leap/soledad/client/__init__.py
+++ b/client/src/leap/soledad/client/__init__.py
@@ -1096,6 +1096,13 @@ class Soledad(object):
         signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)
         return local_gen
 
+    def stop_sync(self):
+        """
+        Stop the current syncing process.
+        """
+        if self._db:
+            self._db.stop_sync()
+
     def need_sync(self, url):
         """
         Return if local db replica differs from remote url's replica.
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 04f8ebf9..0885a35f 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -55,14 +55,14 @@
 from contextlib import contextmanager
 
 from pysqlcipher import dbapi2
 from u1db.backends import sqlite_backend
-from u1db.sync import Synchronizer
 from u1db import errors as u1db_errors
 
+from leap.soledad.client.sync import Synchronizer
 from leap.soledad.client.target import SoledadSyncTarget
 from leap.soledad.common.document import SoledadDocument
 
-logger = logging.getLogger(__name__)
 
+logger = logging.getLogger(__name__)
 
 # Monkey-patch u1db.backends.sqlite_backend with pysqlcipher.dbapi2
 sqlite_backend.dbapi2 = dbapi2
@@ -214,6 +214,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
             syncable=syncable)
         self.set_document_factory(factory)
         self._syncers = {}
+        self._real_sync_state = None
 
     @classmethod
     def _open_database(cls, sqlcipher_file, password, document_factory=None,
@@ -359,6 +360,14 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
             res = syncer.sync(autocreate=autocreate)
         return res
 
+    def stop_sync(self):
+        """
+        Interrupt all ongoing syncs.
+        """
+        for url in self._syncers:
+            _, syncer = self._syncers[url]
+            syncer.stop()
+
     @contextmanager
     def syncer(self, url, creds=None):
         """
@@ -379,7 +388,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
         :type creds: dict
 
         :return: A synchronizer.
- :rtype: u1db.sync.Synchronizer + :rtype: Synchronizer """ # we want to store at most one syncer for each url, so we also store a # hash of the connection credentials and replace the stored syncer for @@ -881,5 +890,42 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): if self._db_handle is not None: self._db_handle.close() + def _get_sync_state(self): + """ + Get the current sync state. + + :return: The current sync state. + :rtype: dict + """ + if self._real_sync_state is not None: + return self._real_sync_state + c = self._db_handle.cursor() + c.execute("SELECT value FROM u1db_config" + " WHERE name = 'sync_state'") + val = c.fetchone() + if val is None: + return None + self._real_sync_state = json.loads(val[0]) + return self._real_sync_state + + def _set_sync_state(self, state): + """ + Set the current sync state. + + :param state: The sync state to be set. + :type state: dict + """ + c = self._db_handle.cursor() + if state is None: + c.execute("DELETE FROM u1db_config" + " WHERE name = 'sync_state'") + else: + c.execute("INSERT OR REPLACE INTO u1db_config" + " VALUES ('sync_state', ?)", + (json.dumps(state),)) + self._real_sync_state = state + + sync_state = property( + _get_sync_state, _set_sync_state, doc="The current sync state.") sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py new file mode 100644 index 00000000..6e9e23fa --- /dev/null +++ b/client/src/leap/soledad/client/sync.py @@ -0,0 +1,262 @@ +# -*- coding: utf-8 -*- +# sync.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +Sync infrastructure that can be interrupted and recovered. +""" + +import json + + +from u1db import errors +from u1db.sync import Synchronizer as U1DBSynchronizer + + +class ClientSyncState(object): + """ + The state of the current sync session, as stored on the client. + """ + + _private_attrs = [ + '_db', + ] + + _public_attrs = { + 'target_replica_uid': None, + 'target_gen': None, + 'target_trans_id': None, + 'target_my_gen': None, + 'target_my_trans_id': None, + 'target_last_known_gen': None, + 'target_last_known_trans_id': None, + 'my_gen': None, + 'changes': None, + 'sent': 0, + 'received': 0, + } + + @property + def _public_attr_keys(self): + return [k for k in self._public_attrs] + + def __init__(self, db=None): + """ + Initialize the client sync state. + + :param db: The database where to fetch/store the sync state. + :type db: SQLCipherDatabase + """ + self._db = db + self._init_state() + + def __setattr__(self, attr, val): + """ + Prevent setting arbitrary attributes. + + :param attr: The attribute name. + :type attr: str + :param val: The value to be set. 
:type val: anything
+        """
+        if attr not in self._public_attr_keys + self._private_attrs:
+            raise Exception("invalid attribute: %s" % attr)
+        object.__setattr__(self, attr, val)
+
+    def _init_state(self):
+        """
+        Initialize current sync state, potentially fetching sync info stored
+        in database.
+        """
+        # set local default attributes
+        for attr in self._public_attr_keys:
+            setattr(self, attr, self._public_attrs[attr])
+        # fetch info from stored sync state
+        sync_state = None
+        if self._db is not None:
+            sync_state = self._db.sync_state
+        if sync_state is not None:
+            for attr in self._public_attr_keys:
+                setattr(self, attr, sync_state[attr])
+
+    def save(self):
+        """
+        Save the current sync state in the database.
+        """
+        sync_state = {}
+        for attr in self._public_attr_keys:
+            sync_state[attr] = getattr(self, attr)
+        if self._db is not None:
+            self._db.sync_state = sync_state
+
+    def clear(self):
+        """
+        Clear the sync state info data.
+        """
+        if self._db is not None:
+            self._db.sync_state = None
+        self._init_state()
+
+    def has_stored_info(self):
+        """
+        Return whether there is any sync state info stored on the database.
+
+        :return: Whether there's any sync state info stored on db.
+        :rtype: bool
+        """
+        return self._db is not None and self._db.sync_state is not None
+
+    def __str__(self):
+        return ', '.join(
+            ['%s: %s' % (k, getattr(self, k))
+             for k in self._public_attr_keys])
+
+
+class Synchronizer(U1DBSynchronizer):
+    """
+    Collect the state around synchronizing 2 U1DB replicas.
+
+    Modified to allow for interrupting the synchronization process.
+    """
+
+    def stop(self):
+        """
+        Stop the current sync in progress.
+        """
+        self.sync_target.stop()
+
+    def sync(self, autocreate=False):
+        """
+        Synchronize documents between source and target.
+
+        :param autocreate: Whether the target replica should be created or not.
+ :type autocreate: bool + """ + sync_target = self.sync_target + + # recover current sync state from source database + sync_state = ClientSyncState(self.source) + self.target_replica_uid = sync_state.target_replica_uid + target_gen = sync_state.target_gen + target_trans_id = sync_state.target_trans_id + target_my_gen = sync_state.target_my_gen + target_my_trans_id = sync_state.target_my_trans_id + target_last_known_gen = sync_state.target_last_known_gen + target_last_known_trans_id = \ + sync_state.target_last_known_trans_id + my_gen = sync_state.my_gen + changes = sync_state.changes + sent = sync_state.sent + received = sync_state.received + + # get target identifier, its current generation, + # and its last-seen database generation for this source + ensure_callback = None + if not sync_state.has_stored_info(): + try: + (self.target_replica_uid, target_gen, target_trans_id, + target_my_gen, target_my_trans_id) = \ + sync_target.get_sync_info(self.source._replica_uid) + except errors.DatabaseDoesNotExist: + if not autocreate: + raise + # will try to ask sync_exchange() to create the db + self.target_replica_uid = None + target_gen, target_trans_id = 0, '' + target_my_gen, target_my_trans_id = 0, '' + + # make sure we'll have access to target replica uid once it exists + if self.target_replica_uid is None: + + def ensure_callback(replica_uid): + self.target_replica_uid = replica_uid + + # make sure we're not syncing one replica with itself + if self.target_replica_uid == self.source._replica_uid: + raise errors.InvalidReplicaUID + + # validate the info the target has about the source replica + self.source.validate_gen_and_trans_id( + target_my_gen, target_my_trans_id) + + # what's changed since that generation and this current gen + if not sync_state.has_stored_info(): + my_gen, _, changes = self.source.whats_changed(target_my_gen) + + # get source last-seen database generation for the target + if not sync_state.has_stored_info(): + if self.target_replica_uid is None: + target_last_known_gen, target_last_known_trans_id = 0, '' + else: + target_last_known_gen, target_last_known_trans_id = \ + self.source._get_replica_gen_and_trans_id( + self.target_replica_uid) + + # validate transaction ids + if not changes and target_last_known_gen == target_gen: + if target_trans_id != target_last_known_trans_id: + raise errors.InvalidTransactionId + return my_gen + + # prepare to send all the changed docs + changed_doc_ids = [doc_id for doc_id, _, _ in changes] + docs_to_send = self.source.get_docs( + changed_doc_ids, check_for_conflicts=False, include_deleted=True) + docs_by_generation = [] + idx = 0 + for doc in docs_to_send: + _, gen, trans = changes[idx] + docs_by_generation.append((doc, gen, trans)) + idx += 1 + # store current sync state info + if not sync_state.has_stored_info(): + sync_state.target_replica_uid = self.target_replica_uid + sync_state.target_gen = target_gen + sync_state.target_trans_id = target_trans_id + sync_state.target_my_gen = target_my_gen + sync_state.target_my_trans_id = target_my_trans_id + sync_state.my_gen = my_gen + sync_state.changes = changes + sync_state.target_last_known_trans_id = \ + target_last_known_trans_id + sync_state.target_last_known_gen = target_last_known_gen + sync_state.sent = sent = 0 + sync_state.received = received = 0 + + # exchange documents and try to insert the returned ones with + # the target, return target synced-up-to gen. + # + # The sync_exchange method may be interrupted, in which case it will + # return a tuple of Nones. 
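+        # If that happens, the state saved below lets a later call to
+        # sync() resume from where this one stopped.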
+        new_gen, new_trans_id = sync_target.sync_exchange(
+            docs_by_generation, self.source._replica_uid,
+            target_last_known_gen, target_last_known_trans_id,
+            self._insert_doc_from_target, ensure_callback=ensure_callback,
+            sync_state=sync_state)
+
+        # save sync state info if the sync was interrupted
+        if new_gen is None and new_trans_id is None:
+            sync_state.save()
+            return my_gen
+
+        # sync exchange was successful, remove sync state info from source
+        sync_state.clear()
+
+        # record target synced-up-to generation including applying what we sent
+        self.source._set_replica_gen_and_trans_id(
+            self.target_replica_uid, new_gen, new_trans_id)
+        # if gapless, record current reached generation with target
+        self._record_sync_info_with_the_target(my_gen)
+
+        return my_gen
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 7b77055c..06e79e63 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 # target.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013, 2014 LEAP
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -25,6 +25,7 @@ import hashlib
 import hmac
 import logging
 import urllib
+import threading
 
 import simplejson as json
 from time import sleep
@@ -313,6 +314,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         """
         HTTPSyncTarget.__init__(self, url, creds)
         self._crypto = crypto
+        self._stopped = True
+        self._sync_state = None
+        self._stop_lock = threading.Lock()
 
     def _init_post_request(self, url, action, headers, content_length):
         """
@@ -367,13 +371,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         :rtype: list of str
         """
 
-        def _post_get_doc(received):
+        def _post_get_doc():
             """
             Get a sync document from server by means of a POST request.
-
-            :param received: How many documents have already been received in
-                this sync session.
-            :type received: int
             """
             entries = ['[']
             size = 1
@@ -384,7 +384,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
                 last_known_trans_id=last_known_trans_id,
                 ensure=ensure_callback is not None)
             # inform server of how many documents have already been received
-            size += self._prepare(',', entries, received=received)
+            size += self._prepare(
+                ',', entries, received=self._sync_state.received)
             entries.append('\r\n]')
             size += len(entries[-1])
             # send headers
@@ -394,13 +395,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
                 self._conn.send(entry)
             return self._response()
 
-        received = 0
         number_of_changes = None
-        while number_of_changes is None or received < number_of_changes:
+        while number_of_changes is None or \
+                self._sync_state.received < number_of_changes:
+            # bail out if sync process was interrupted
+            if self.stopped is True:
+                return None, None
             # try to fetch one document from target
-            data, _ = _post_get_doc(received)
-            received += 1
+            data, _ = _post_get_doc()
+            self._sync_state.received += 1
             # decode incoming stream
             entries = None
             try:
@@ -545,7 +549,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
 
     def sync_exchange(self, docs_by_generations, source_replica_uid,
                       last_known_generation, last_known_trans_id,
-                      return_doc_cb, ensure_callback=None):
+                      return_doc_cb, ensure_callback=None,
+                      sync_state=None):
         """
         Find out which documents the remote database does not know about,
         encrypt and send them.
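
Illustration (not part of this patch): the state handling in sync.py and the
new start()/stop() methods above together form a stop/resume protocol. A
minimal sketch of the intended flow, assuming a hypothetical source_db (an
opened SQLCipherDatabase) and st (a connected, authenticated
SoledadSyncTarget):

    import threading
    import time

    from leap.soledad.client.sync import Synchronizer

    synchronizer = Synchronizer(source_db, st)

    def interrupt_soon():
        time.sleep(1)
        st.stop()  # sync_exchange() will then return (None, None)

    threading.Thread(target=interrupt_soon).start()
    synchronizer.sync()  # interrupted: session state is saved in source_db
    synchronizer.sync()  # a later call resumes from the saved state
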
@@ -574,6 +579,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :return: The new generation and transaction id of the target replica. :rtype: tuple """ + self.start() + self._sync_state = sync_state + self._ensure_connection() if self._trace_hook: # for tests self._trace_hook('sync_exchange') @@ -616,7 +624,15 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): cur_target_gen = last_known_generation cur_target_trans_id = last_known_trans_id + # skip docs that were already sent + if self._sync_state.sent > 0: + docs_by_generations = docs_by_generations[self._sync_state.sent:] + + # send docs for doc, gen, trans_id in docs_by_generations: + # allow for interrupting the sync process + if self.stopped is True: + break # skip non-syncable docs if isinstance(doc, SoledadDocument) and not doc.syncable: continue @@ -632,10 +648,41 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): cur_target_gen, cur_target_trans_id = _post_put_doc( headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id) + self._sync_state.sent += 1 + # get docs from target cur_target_gen, cur_target_trans_id = self._get_remote_docs( url, last_known_generation, last_known_trans_id, headers, return_doc_cb, ensure_callback) - + self.stop() return cur_target_gen, cur_target_trans_id + + def start(self): + """ + Mark current sync session as running. + """ + with self._stop_lock: + self._stopped = False + + def stop(self): + """ + Mark current sync session as stopped. + + This will eventually interrupt the sync_exchange() method and return + enough information to the synchronizer so the sync session can be + recovered afterwards. + """ + with self._stop_lock: + self._stopped = True + + @property + def stopped(self): + """ + Return wether this sync session is stopped. + + :return: Wether this sync session is stopped. + :rtype: bool + """ + with self._stop_lock: + return self._stopped is True diff --git a/common/src/leap/soledad/common/tests/test_sync.py b/common/src/leap/soledad/common/tests/test_sync.py new file mode 100644 index 00000000..fd4a2797 --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_sync.py @@ -0,0 +1,176 @@ +# -*- coding: utf-8 -*- +# test_sync.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
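+#
+# These tests start a real sync against a couch-backed Soledad server,
+# interrupt it from a helper thread, and then check that a second call to
+# sync() recovers and delivers all documents.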
+
+
+import mock
+import os
+import json
+import tempfile
+import threading
+import time
+from urlparse import urljoin
+
+from leap.soledad.common.couch import (
+    CouchServerState,
+    CouchDatabase,
+)
+
+from leap.soledad.common.tests.u1db_tests import (
+    TestCaseWithServer,
+    simple_doc,
+)
+from leap.soledad.common.tests.test_couch import CouchDBTestCase
+from leap.soledad.common.tests.test_target import (
+    make_token_soledad_app,
+    make_leap_document_for_test,
+    token_leap_sync_target,
+)
+
+from leap.soledad.client import (
+    Soledad,
+    target,
+)
+
+
+class InterruptableSyncTestCase(
+        CouchDBTestCase, TestCaseWithServer):
+    """
+    Tests for interrupting and recovering a sync session, using a Soledad
+    server backed by a couch database.
+    """
+
+    @staticmethod
+    def make_app_with_state(state):
+        return make_token_soledad_app(state)
+
+    make_document_for_test = make_leap_document_for_test
+
+    sync_target = token_leap_sync_target
+
+    def _soledad_instance(self, user='user-uuid', passphrase=u'123',
+                          prefix='',
+                          secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME,
+                          local_db_path='soledad.u1db', server_url='',
+                          cert_file=None, auth_token=None, secret_id=None):
+        """
+        Instantiate Soledad.
+        """
+
+        # this callback ensures we save a document which is sent to the shared
+        # db.
+        def _put_doc_side_effect(doc):
+            self._doc_put = doc
+
+        # we need a mocked shared db or else Soledad will try to access the
+        # network to find if there are uploaded secrets.
+        class MockSharedDB(object):
+
+            get_doc = mock.Mock(return_value=None)
+            put_doc = mock.Mock(side_effect=_put_doc_side_effect)
+            lock = mock.Mock(return_value=('atoken', 300))
+            unlock = mock.Mock()
+
+            def __call__(self):
+                return self
+
+        Soledad._shared_db = MockSharedDB()
+        return Soledad(
+            user,
+            passphrase,
+            secrets_path=os.path.join(self.tempdir, prefix, secrets_path),
+            local_db_path=os.path.join(
+                self.tempdir, prefix, local_db_path),
+            server_url=server_url,
+            cert_file=cert_file,
+            auth_token=auth_token,
+            secret_id=secret_id)
+
+    def make_app(self):
+        self.request_state = CouchServerState(self._couch_url, 'shared',
+                                              'tokens')
+        return self.make_app_with_state(self.request_state)
+
+    def setUp(self):
+        TestCaseWithServer.setUp(self)
+        CouchDBTestCase.setUp(self)
+        self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
+        self._couch_url = 'http://localhost:' + str(self.wrapper.port)
+
+    def tearDown(self):
+        CouchDBTestCase.tearDown(self)
+        TestCaseWithServer.tearDown(self)
+
+    def test_interruptable_sync(self):
+        """
+        Test that an ongoing sync can be interrupted and later recovered.
+        """
+
+        class _SyncInterruptor(threading.Thread):
+            """
+            A thread meant to interrupt the sync process.
+ """ + + def __init__(self, soledad, couchdb): + self._soledad = soledad + self._couchdb = couchdb + threading.Thread.__init__(self) + + def run(self): + while db._get_generation() < 2: + time.sleep(1) + self._soledad.stop_sync() + time.sleep(1) + + number_of_docs = 10 + self.startServer() + + # instantiate soledad and create a document + sol = self._soledad_instance( + # token is verified in test_target.make_token_soledad_app + auth_token='auth-token' + ) + _, doclist = sol.get_all_docs() + self.assertEqual([], doclist) + + # create many small files + for i in range(0, number_of_docs): + sol.create_doc(json.loads(simple_doc)) + + # ensure remote db exists before syncing + db = CouchDatabase.open_database( + urljoin(self._couch_url, 'user-user-uuid'), + create=True, + ensure_ddocs=True) + + # create interruptor thread + t = _SyncInterruptor(sol, db) + t.start() + + # sync with server + sol._server_url = self.getURL() + sol.sync() # this will be interrupted when couch db gen >= 2 + t.join() + + # recover the sync process + sol.sync() + + gen, doclist = db.get_all_docs() + self.assertEqual(number_of_docs, len(doclist)) + + # delete remote database + db.delete_database() + db.close() + sol.close() diff --git a/server/changes/feature_5571_allow-for-interrupting-and-recovering-sync b/server/changes/feature_5571_allow-for-interrupting-and-recovering-sync new file mode 100644 index 00000000..0087c535 --- /dev/null +++ b/server/changes/feature_5571_allow-for-interrupting-and-recovering-sync @@ -0,0 +1 @@ + o Allow for interrupting and recovering sync (#5517). -- cgit v1.2.3 From 30f25eb50ade884d807666cb7d6c17c09d56f834 Mon Sep 17 00:00:00 2001 From: drebs Date: Fri, 2 May 2014 14:35:26 -0300 Subject: Add sync status signals (#5517). --- .../changes/feature_5571_add-sync-status-signals | 1 + client/src/leap/soledad/client/__init__.py | 58 +++++++--------------- client/src/leap/soledad/client/events.py | 58 ++++++++++++++++++++++ client/src/leap/soledad/client/target.py | 13 ++++- 4 files changed, 88 insertions(+), 42 deletions(-) create mode 100644 client/changes/feature_5571_add-sync-status-signals create mode 100644 client/src/leap/soledad/client/events.py diff --git a/client/changes/feature_5571_add-sync-status-signals b/client/changes/feature_5571_add-sync-status-signals new file mode 100644 index 00000000..67bc7d9f --- /dev/null +++ b/client/changes/feature_5571_add-sync-status-signals @@ -0,0 +1 @@ + o Add sync status signals (#5517). diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py index f92317e9..2fb33184 100644 --- a/client/src/leap/soledad/client/__init__.py +++ b/client/src/leap/soledad/client/__init__.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # __init__.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -49,7 +49,11 @@ import scrypt import simplejson as json from leap.common.config import get_path_prefix -from leap.soledad.common import SHARED_DB_NAME +from leap.soledad.common import ( + SHARED_DB_NAME, + soledad_assert, + soledad_assert_type +) from leap.soledad.common.errors import ( InvalidTokenError, NotLockedError, @@ -63,45 +67,17 @@ from leap.soledad.common.crypto import ( MAC_KEY, MAC_METHOD_KEY, ) - -# -# Signaling function -# - -SOLEDAD_CREATING_KEYS = 'Creating keys...' -SOLEDAD_DONE_CREATING_KEYS = 'Done creating keys.' 
-SOLEDAD_DOWNLOADING_KEYS = 'Downloading keys...' -SOLEDAD_DONE_DOWNLOADING_KEYS = 'Done downloading keys.' -SOLEDAD_UPLOADING_KEYS = 'Uploading keys...' -SOLEDAD_DONE_UPLOADING_KEYS = 'Done uploading keys.' -SOLEDAD_NEW_DATA_TO_SYNC = 'New data available.' -SOLEDAD_DONE_DATA_SYNC = 'Done data sync.' - -# we want to use leap.common.events to emits signals, if it is available. -try: - from leap.common import events - from leap.common.events import signal - SOLEDAD_CREATING_KEYS = events.events_pb2.SOLEDAD_CREATING_KEYS - SOLEDAD_DONE_CREATING_KEYS = events.events_pb2.SOLEDAD_DONE_CREATING_KEYS - SOLEDAD_DOWNLOADING_KEYS = events.events_pb2.SOLEDAD_DOWNLOADING_KEYS - SOLEDAD_DONE_DOWNLOADING_KEYS = \ - events.events_pb2.SOLEDAD_DONE_DOWNLOADING_KEYS - SOLEDAD_UPLOADING_KEYS = events.events_pb2.SOLEDAD_UPLOADING_KEYS - SOLEDAD_DONE_UPLOADING_KEYS = \ - events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS - SOLEDAD_NEW_DATA_TO_SYNC = events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC - SOLEDAD_DONE_DATA_SYNC = events.events_pb2.SOLEDAD_DONE_DATA_SYNC - -except ImportError: - # we define a fake signaling function and fake signal constants that will - # allow for logging signaling attempts in case leap.common.events is not - # available. - - def signal(signal, content=""): - logger.info("Would signal: %s - %s." % (str(signal), content)) - - -from leap.soledad.common import soledad_assert, soledad_assert_type +from leap.soledad.client.events import ( + SOLEDAD_CREATING_KEYS, + SOLEDAD_DONE_CREATING_KEYS, + SOLEDAD_DOWNLOADING_KEYS, + SOLEDAD_DONE_DOWNLOADING_KEYS, + SOLEDAD_UPLOADING_KEYS, + SOLEDAD_DONE_UPLOADING_KEYS, + SOLEDAD_NEW_DATA_TO_SYNC, + SOLEDAD_DONE_DATA_SYNC, + signal, +) from leap.soledad.common.document import SoledadDocument from leap.soledad.client.crypto import SoledadCrypto from leap.soledad.client.shared_db import SoledadSharedDatabase diff --git a/client/src/leap/soledad/client/events.py b/client/src/leap/soledad/client/events.py new file mode 100644 index 00000000..c4c09ac5 --- /dev/null +++ b/client/src/leap/soledad/client/events.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +# signal.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +Signaling functions. +""" + + +SOLEDAD_CREATING_KEYS = 'Creating keys...' +SOLEDAD_DONE_CREATING_KEYS = 'Done creating keys.' +SOLEDAD_DOWNLOADING_KEYS = 'Downloading keys...' +SOLEDAD_DONE_DOWNLOADING_KEYS = 'Done downloading keys.' +SOLEDAD_UPLOADING_KEYS = 'Uploading keys...' +SOLEDAD_DONE_UPLOADING_KEYS = 'Done uploading keys.' +SOLEDAD_NEW_DATA_TO_SYNC = 'New data available.' +SOLEDAD_DONE_DATA_SYNC = 'Done data sync.' +SOLEDAD_SYNC_SEND_STATUS = 'Sync: sent one document.' +SOLEDAD_SYNC_RECEIVE_STATUS = 'Sync: received one document.' + +# we want to use leap.common.events to emits signals, if it is available. 
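+# For example, signal(SOLEDAD_SYNC_SEND_STATUS, "3/10") either emits a real
+# leap.common event or, with the fallback defined below, merely logs the
+# attempt ("3/10" is an illustrative progress payload as sent by target.py).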
+try:
+    from leap.common import events
+    from leap.common.events import signal
+    SOLEDAD_CREATING_KEYS = events.proto.SOLEDAD_CREATING_KEYS
+    SOLEDAD_DONE_CREATING_KEYS = events.proto.SOLEDAD_DONE_CREATING_KEYS
+    SOLEDAD_DOWNLOADING_KEYS = events.proto.SOLEDAD_DOWNLOADING_KEYS
+    SOLEDAD_DONE_DOWNLOADING_KEYS = \
+        events.proto.SOLEDAD_DONE_DOWNLOADING_KEYS
+    SOLEDAD_UPLOADING_KEYS = events.proto.SOLEDAD_UPLOADING_KEYS
+    SOLEDAD_DONE_UPLOADING_KEYS = \
+        events.proto.SOLEDAD_DONE_UPLOADING_KEYS
+    SOLEDAD_NEW_DATA_TO_SYNC = events.proto.SOLEDAD_NEW_DATA_TO_SYNC
+    SOLEDAD_DONE_DATA_SYNC = events.proto.SOLEDAD_DONE_DATA_SYNC
+    SOLEDAD_SYNC_SEND_STATUS = events.proto.SOLEDAD_SYNC_SEND_STATUS
+    SOLEDAD_SYNC_RECEIVE_STATUS = events.proto.SOLEDAD_SYNC_RECEIVE_STATUS
+
+except ImportError:
+    # we define a fake signaling function and fake signal constants that will
+    # allow for logging signaling attempts in case leap.common.events is not
+    # available.
+    import logging
+    logger = logging.getLogger(__name__)
+
+    def signal(signal, content=""):
+        logger.info("Would signal: %s - %s." % (str(signal), content))
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 06e79e63..e27ced08 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -58,6 +58,11 @@ from leap.soledad.client.crypto import (
     EncryptionMethods,
     UnknownEncryptionMethod,
 )
+from leap.soledad.client.events import (
+    SOLEDAD_SYNC_SEND_STATUS,
+    SOLEDAD_SYNC_RECEIVE_STATUS,
+    signal,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -404,7 +409,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
                 return None, None
             # try to fetch one document from target
             data, _ = _post_get_doc()
-            self._sync_state.received += 1
             # decode incoming stream
             entries = None
             try:
@@ -441,6 +445,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
             # end of symmetric decryption
             # -------------------------------------------------------------
             return_doc_cb(doc, entry['gen'], entry['trans_id'])
+            self._sync_state.received += 1
+            signal(
+                SOLEDAD_SYNC_RECEIVE_STATUS,
+                "%d/%d" % (self._sync_state.received, number_of_changes))
         return entries[0]['new_generation'], entries[0]['new_transaction_id']
 
     def _request(self, method, url_parts, params=None, body=None,
@@ -649,6 +657,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
                     headers, cur_target_gen, cur_target_trans_id,
                     id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen,
                     trans_id=trans_id)
                 self._sync_state.sent += 1
+                signal(
+                    SOLEDAD_SYNC_SEND_STATUS,
+                    "%d/%d" % (self._sync_state.sent, len(docs_by_generations)))
 
         # get docs from target
         cur_target_gen, cur_target_trans_id = self._get_remote_docs(
--
cgit v1.2.3


From adec26112cb0338405f6e6916c06196214cdf348 Mon Sep 17 00:00:00 2001
From: drebs
Date: Thu, 22 May 2014 18:20:58 -0300
Subject: fix allow interrupt in target client

---
 client/src/leap/soledad/client/target.py | 39 +++++++++++++++++++++-----------
 1 file changed, 26 insertions(+), 13 deletions(-)

diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index e27ced08..a3b8ed00 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -63,6 +63,7 @@ from leap.soledad.client.events import (
     SOLEDAD_SYNC_RECEIVE_STATUS,
     signal,
 )
+from leap.soledad.client.sync import ClientSyncState
 
 logger = logging.getLogger(__name__)
 
@@ -410,26 +411,33 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
             # try to fetch one document from target
             data, _ = _post_get_doc()
             # decode incoming stream
-            entries = None
-            try:
-                entries = json.loads(data)
-            except ValueError:
+            parts = data.splitlines()
+            if not parts or parts[0] != '[' or parts[-1] != ']':
                 raise BrokenSyncStream
-            # bail out if there are no documents to be received
+            data = parts[1:-1]
+            # decode metadata
+            line, comma = utils.check_and_strip_comma(data[0])
+            metadata = None
             try:
-                number_of_changes = entries[0]['number_of_changes']
-            except IndexError, KeyError:
+                metadata = json.loads(line)
+                soledad_assert('number_of_changes' in metadata)
+                soledad_assert('new_generation' in metadata)
+                soledad_assert('new_transaction_id' in metadata)
+                number_of_changes = metadata['number_of_changes']
+            except (json.JSONDecodeError, AssertionError):
                 raise BrokenSyncStream
+            # make sure we have replica_uid from fresh new dbs
+            if ensure_callback and 'replica_uid' in metadata:
+                ensure_callback(metadata['replica_uid'])
+            # bail out if there are no documents to be received
             if number_of_changes == 0:
                 break
             # decrypt incoming document and insert into local database
             entry = None
             try:
-                entry = entries[1]
+                entry = json.loads(data[1])
             except IndexError:
                 raise BrokenSyncStream
-            if ensure_callback and 'replica_uid' in res:
-                ensure_callback(res['replica_uid'])
             # -------------------------------------------------------------
             # symmetric decryption of document's contents
             # -------------------------------------------------------------
@@ -448,8 +456,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
             self._sync_state.received += 1
             signal(
                 SOLEDAD_SYNC_RECEIVE_STATUS,
-                "%d/%d" % (self._sync_state.received, number_of_changes))
-        return entries[0]['new_generation'], entries[0]['new_transaction_id']
+                "%d/%d" %
+                (self._sync_state.received, number_of_changes))
+        return metadata['new_generation'], metadata['new_transaction_id']
 
     def _request(self, method, url_parts, params=None, body=None,
                  content_type=None):
@@ -587,8 +596,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         :return: The new generation and transaction id of the target replica.
         :rtype: tuple
         """
-        self.start()
+        # get the sync state information from client
         self._sync_state = sync_state
+        if self._sync_state is None:
+            self._sync_state = ClientSyncState()
+
+        self.start()
 
         self._ensure_connection()
         if self._trace_hook:  # for tests
--
cgit v1.2.3


From 7bbdd02cdaa4d29bb83660d52c11a1d5f4357c97 Mon Sep 17 00:00:00 2001
From: drebs
Date: Thu, 22 May 2014 18:27:58 -0300
Subject: Properly raise when couch db does not exist.

---
 common/src/leap/soledad/common/couch.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index f4696cee..3bc1f543 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -467,6 +467,10 @@ class CouchDatabase(CommonBackend):
         self._database = Database(
             urljoin(self._url, self._dbname),
             self._session)
+        try:
+            self._database.info()
+        except ResourceNotFound:
+            raise DatabaseDoesNotExist()
         if replica_uid is not None:
             self._set_replica_uid(replica_uid)
         if ensure_ddocs:
@@ -1541,8 +1545,8 @@ class CouchServerState(ServerState):
         :param dbname: The name of the database to ensure.
         :type dbname: str
 
-        :return: The CouchDatabase object and the replica uid.
-        :rtype: (CouchDatabase, str)
+        :raise Unauthorized: Always, because Soledad server is not allowed to
+            create databases.
""" raise Unauthorized() @@ -1552,6 +1556,9 @@ class CouchServerState(ServerState): :param dbname: The name of the database to delete. :type dbname: str + + :raise Unauthorized: Always, because Soledad server is not allowed to + delete databases. """ raise Unauthorized() -- cgit v1.2.3 From bb8edd8655a1723c8944c898b1249099759ebf59 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 22 May 2014 18:43:14 -0300 Subject: Fix tests for new sync process. --- common/setup.py | 2 +- common/src/leap/soledad/common/tests/__init__.py | 44 +++++ .../src/leap/soledad/common/tests/server_state.py | 80 ++++++++++ common/src/leap/soledad/common/tests/test_couch.py | 13 +- .../leap/soledad/common/tests/test_sqlcipher.py | 69 ++++---- .../src/leap/soledad/common/tests/test_target.py | 160 ++++++++++--------- .../soledad/common/tests/u1db_tests/test_sync.py | 4 +- common/src/leap/soledad/common/tests/util.py | 177 +++++++++++++++++++++ server/src/leap/soledad/server/__init__.py | 1 + 9 files changed, 424 insertions(+), 126 deletions(-) create mode 100644 common/src/leap/soledad/common/tests/server_state.py create mode 100644 common/src/leap/soledad/common/tests/util.py diff --git a/common/setup.py b/common/setup.py index e142d958..6ee166ef 100644 --- a/common/setup.py +++ b/common/setup.py @@ -285,7 +285,7 @@ setup( namespace_packages=["leap", "leap.soledad"], packages=find_packages('src', exclude=['leap.soledad.common.tests']), package_dir={'': 'src'}, - test_suite='leap.soledad.common.tests', + test_suite='leap.soledad.common.tests.load_tests', install_requires=utils.parse_requirements(), tests_require=utils.parse_requirements( reqfiles=['pkg/requirements-testing.pip']), diff --git a/common/src/leap/soledad/common/tests/__init__.py b/common/src/leap/soledad/common/tests/__init__.py index 88f98272..a38bdaed 100644 --- a/common/src/leap/soledad/common/tests/__init__.py +++ b/common/src/leap/soledad/common/tests/__init__.py @@ -1,3 +1,21 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + """ Tests to make sure Soledad provides U1DB functionality and more. """ @@ -273,3 +291,29 @@ RZXoH+FTg9UAW87eqU610npOkT6cRaBxaMK/mDtGNdc= =JTFu -----END PGP PRIVATE KEY BLOCK----- """ + + +def load_tests(): + """ + Build a test suite that includes all tests in leap.soledad.common.tests + but does not include tests in the u1db_tests/ subfolder. The reason for + not including those tests are: + + 1. they by themselves only test u1db functionality in the u1db module + (despite we use them as basis for testing soledad functionalities). + + 2. they would fail because we monkey patch u1db's remote http server + to add soledad functionality we need. 
+ """ + import unittest + import glob + import imp + tests_prefix = os.path.join( + '.', 'src', 'leap', 'soledad', 'common', 'tests') + suite = unittest.TestSuite() + for testcase in glob.glob(os.path.join(tests_prefix, 'test_*.py')): + modname = os.path.basename(os.path.splitext(testcase)[0]) + f, pathname, description = imp.find_module(modname, [tests_prefix]) + module = imp.load_module(modname, f, pathname, description) + suite.addTest(unittest.TestLoader().loadTestsFromModule(module)) + return suite diff --git a/common/src/leap/soledad/common/tests/server_state.py b/common/src/leap/soledad/common/tests/server_state.py new file mode 100644 index 00000000..2bc15377 --- /dev/null +++ b/common/src/leap/soledad/common/tests/server_state.py @@ -0,0 +1,80 @@ +# -*- coding: utf-8 -*- +# server_state.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +State for servers to be used in tests. +""" + + +import os +import errno +import tempfile + + +from u1db.remote.server_state import ServerState +from leap.soledad.common.tests.util import ( + copy_sqlcipher_database_for_test, +) + + +class ServerStateForTests(ServerState): + """Passed to a Request when it is instantiated. + + This is used to track server-side state, such as working-directory, open + databases, etc. + """ + + def __init__(self): + self._workingdir = tempfile.mkdtemp() + + def _relpath(self, relpath): + return os.path.join(self._workingdir, relpath) + + def open_database(self, path): + """Open a database at the given location.""" + from leap.soledad.client.sqlcipher import SQLCipherDatabase + return SQLCipherDatabase.open_database(path, '123', False) + + def create_database(self, path): + """Create a database at the given location.""" + from leap.soledad.client.sqlcipher import SQLCipherDatabase + return SQLCipherDatabase.open_database(path, '123', True) + + def check_database(self, path): + """Check if the database at the given location exists. + + Simply returns if it does or raises DatabaseDoesNotExist. 
+ """ + db = self.open_database(path) + db.close() + + def ensure_database(self, path): + """Ensure database at the given location.""" + from leap.soledad.client.sqlcipher import SQLCipherDatabase + full_path = self._relpath(path) + db = SQLCipherDatabase.open_database(full_path, '123', False) + return db, db._replica_uid + + def delete_database(self, path): + """Delete database at the given location.""" + from leap.u1db.backends import sqlite_backend + full_path = self._relpath(path) + sqlite_backend.SQLiteDatabase.delete_database(full_path) + + def _copy_database(self, db): + return copy_sqlcipher_database_for_test(None, db) diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index 17d4a519..a1fa9568 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -249,9 +249,7 @@ class CouchTests(test_backends.AllDatabaseTests, CouchDBTestCase): # if current test is `test_close` we have to use saved objects to # delete the database because the close() method will have removed the # references needed to do it using the CouchDatabase. - if self.id() == \ - 'leap.soledad.common.tests.test_couch.CouchTests.' \ - 'test_close(couch)': + if self.id().endswith('test_couch.CouchTests.test_close(couch)'): session = couch.Session() server = Server(url=self._url, session=session) del(server[self._dbname]) @@ -365,8 +363,6 @@ class CouchDatabaseSyncTargetTests(test_sync.DatabaseSyncTargetTests, # The following tests need that the database have an index, so we fake one. -old_class = couch.CouchDatabase - from u1db.backends.inmemory import InMemoryIndex @@ -444,7 +440,12 @@ class IndexedCouchDatabase(couch.CouchDatabase): return list(set([tuple(key.split('\x01')) for key in keys])) -couch.CouchDatabase = IndexedCouchDatabase +# monkey patch CouchDatabase (once) to include virtual indexes +if getattr(couch.CouchDatabase, '_old_class', None) is None: + old_class = couch.CouchDatabase + IndexedCouchDatabase._old_class = old_class + couch.CouchDatabase = IndexedCouchDatabase + sync_scenarios = [] for name, scenario in COUCH_SCENARIOS: diff --git a/common/src/leap/soledad/common/tests/test_sqlcipher.py b/common/src/leap/soledad/common/tests/test_sqlcipher.py index c79a6045..891aca0f 100644 --- a/common/src/leap/soledad/common/tests/test_sqlcipher.py +++ b/common/src/leap/soledad/common/tests/test_sqlcipher.py @@ -30,6 +30,7 @@ import threading from pysqlcipher import dbapi2 from StringIO import StringIO +from urlparse import urljoin # u1db stuff. @@ -54,19 +55,26 @@ from leap.soledad.common.crypto import ( ENC_JSON_KEY, ENC_SCHEME_KEY, ) -from leap.soledad.client.target import decrypt_doc +from leap.soledad.client.target import ( + decrypt_doc, + SoledadSyncTarget, +) # u1db tests stuff. 
+from leap.common.testing.basetest import BaseLeapTest from leap.soledad.common.tests import u1db_tests as tests, BaseSoledadTest from leap.soledad.common.tests.u1db_tests import test_sqlite_backend from leap.soledad.common.tests.u1db_tests import test_backends from leap.soledad.common.tests.u1db_tests import test_open from leap.soledad.common.tests.u1db_tests import test_sync -from leap.soledad.client.target import SoledadSyncTarget -from leap.common.testing.basetest import BaseLeapTest - -PASSWORD = '123456' +from leap.soledad.common.tests.util import ( + make_sqlcipher_database_for_test, + copy_sqlcipher_database_for_test, + make_soledad_app, + SoledadWithCouchServerMixin, + PASSWORD, +) #----------------------------------------------------------------------------- @@ -88,32 +96,6 @@ class TestSQLCipherBackendImpl(tests.TestCase): # The following tests come from `u1db.tests.test_backends`. #----------------------------------------------------------------------------- -def make_sqlcipher_database_for_test(test, replica_uid): - db = SQLCipherDatabase(':memory:', PASSWORD) - db._set_replica_uid(replica_uid) - return db - - -def copy_sqlcipher_database_for_test(test, db): - # DO NOT COPY OR REUSE THIS CODE OUTSIDE TESTS: COPYING U1DB DATABASES IS - # THE WRONG THING TO DO, THE ONLY REASON WE DO SO HERE IS TO TEST THAT WE - # CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS RATHER THAN - # CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND NINJA TO YOUR - # HOUSE. - new_db = SQLCipherDatabase(':memory:', PASSWORD) - tmpfile = StringIO() - for line in db._db_handle.iterdump(): - if not 'sqlite_sequence' in line: # work around bug in iterdump - tmpfile.write('%s\n' % line) - tmpfile.seek(0) - new_db._db_handle = dbapi2.connect(':memory:') - new_db._db_handle.cursor().executescript(tmpfile.read()) - new_db._db_handle.commit() - new_db._set_replica_uid(db._replica_uid) - new_db._factory = db._factory - return new_db - - def make_document_for_test(test, doc_id, rev, content, has_conflicts=False): return SoledadDocument(doc_id, rev, content, has_conflicts=has_conflicts) @@ -451,7 +433,7 @@ sync_scenarios.append(('pyleap', { 'copy_database_for_test': test_sync.copy_database_for_http_test, 'make_document_for_test': make_document_for_test, 'make_app_with_state': tests.test_remote_sync_target.make_http_app, - 'do_sync': sync_via_synchronizer_and_leap, + 'do_sync': test_sync.sync_via_synchronizer, })) @@ -616,7 +598,7 @@ class SQLCipherDatabaseSyncTests( # update on 1 doc1.set_json('{"a": 3}') self.db1.put_doc(doc1) - # conflicts + # conflicts self.sync(self.db2, self.db1) self.sync(db3, self.db1) self.assertTrue(self.db2.get_doc('the-doc').has_conflicts) @@ -658,32 +640,35 @@ class SQLCipherDatabaseSyncTests( 'return': {'docs': [], 'last_gen': 1}}) -def _make_local_db_and_leap_target(test, path='test'): +def _make_local_db_and_token_http_target(test, path='test'): test.startServer() db = test.request_state._create_database(os.path.basename(path)) - st = SoledadSyncTarget.connect(test.getURL(path), test._soledad._crypto) + st = SoledadSyncTarget.connect( + test.getURL(path), crypto=test._soledad._crypto) st.set_token_credentials('user-uuid', 'auth-token') return db, st target_scenarios = [ ('leap', { - 'create_db_and_target': _make_local_db_and_leap_target, - 'make_app_with_state': tests.test_remote_sync_target.make_http_app}), + 'create_db_and_target': _make_local_db_and_token_http_target, +# 'make_app_with_state': tests.test_remote_sync_target.make_http_app, + 'make_app_with_state': 
make_soledad_app, + 'do_sync': test_sync.sync_via_synchronizer}), ] class SQLCipherSyncTargetTests( - test_sync.DatabaseSyncTargetTests, BaseSoledadTest): + SoledadWithCouchServerMixin, test_sync.DatabaseSyncTargetTests): scenarios = (tests.multiply_scenarios(SQLCIPHER_SCENARIOS, target_scenarios)) - def setUp(self): - test_sync.DatabaseSyncTargetTests.setUp(self) + whitebox = False - def tearDown(self): - test_sync.DatabaseSyncTargetTests.tearDown(self) + def setUp(self): + self.main_test_class = test_sync.DatabaseSyncTargetTests + SoledadWithCouchServerMixin.setUp(self) def test_sync_exchange(self): """ diff --git a/common/src/leap/soledad/common/tests/test_target.py b/common/src/leap/soledad/common/tests/test_target.py index c1e00d52..3457a3e1 100644 --- a/common/src/leap/soledad/common/tests/test_target.py +++ b/common/src/leap/soledad/common/tests/test_target.py @@ -27,6 +27,7 @@ import simplejson as json import cStringIO +from u1db import SyncTarget from u1db.sync import Synchronizer from u1db.remote import ( http_client, @@ -39,14 +40,20 @@ from leap.soledad.client import ( target, auth, VerifiedHTTPSConnection, + sync, ) from leap.soledad.common.document import SoledadDocument -from leap.soledad.server import SoledadApp from leap.soledad.server.auth import SoledadTokenAuthMiddleware from leap.soledad.common.tests import u1db_tests as tests from leap.soledad.common.tests import BaseSoledadTest +from leap.soledad.common.tests.util import ( + make_sqlcipher_database_for_test, + make_soledad_app, + make_token_soledad_app, + SoledadWithCouchServerMixin, +) from leap.soledad.common.tests.u1db_tests import test_backends from leap.soledad.common.tests.u1db_tests import test_http_database from leap.soledad.common.tests.u1db_tests import test_http_client @@ -54,6 +61,10 @@ from leap.soledad.common.tests.u1db_tests import test_document from leap.soledad.common.tests.u1db_tests import test_remote_sync_target from leap.soledad.common.tests.u1db_tests import test_https from leap.soledad.common.tests.u1db_tests import test_sync +from leap.soledad.common.tests.test_couch import ( + CouchDBTestCase, + CouchDBWrapper, +) #----------------------------------------------------------------------------- @@ -66,28 +77,6 @@ def make_leap_document_for_test(test, doc_id, rev, content, doc_id, rev, content, has_conflicts=has_conflicts) -def make_soledad_app(state): - return SoledadApp(state) - - -def make_token_soledad_app(state): - app = SoledadApp(state) - - def _verify_authentication_data(uuid, auth_data): - if uuid == 'user-uuid' and auth_data == 'auth-token': - return True - return False - - # we test for action authorization in leap.soledad.common.tests.test_server - def _verify_authorization(uuid, environ): - return True - - application = SoledadTokenAuthMiddleware(app) - application._verify_authentication_data = _verify_authentication_data - application._verify_authorization = _verify_authorization - return application - - LEAP_SCENARIOS = [ ('http', { 'make_database_for_test': test_backends.make_http_database_for_test, @@ -362,16 +351,47 @@ def token_leap_sync_target(test, path): return st +def make_local_db_and_soledad_target(test, path='test'): + test.startServer() + db = test.request_state._create_database(os.path.basename(path)) + st = target.SoledadSyncTarget.connect( + test.getURL(path), crypto=test._soledad._crypto) + return db, st + + +def make_local_db_and_token_soledad_target(test): + db, st = make_local_db_and_soledad_target(test, 'test') + st.set_token_credentials('user-uuid', 
'auth-token') + return db, st + + class TestSoledadSyncTarget( - test_remote_sync_target.TestRemoteSyncTargets, BaseSoledadTest): + SoledadWithCouchServerMixin, + test_remote_sync_target.TestRemoteSyncTargets): scenarios = [ ('token_soledad', {'make_app_with_state': make_token_soledad_app, 'make_document_for_test': make_leap_document_for_test, + 'create_db_and_target': make_local_db_and_token_soledad_target, + 'make_database_for_test': make_sqlcipher_database_for_test, 'sync_target': token_leap_sync_target}), ] + def setUp(self): + tests.TestCaseWithServer.setUp(self) + self.main_test_class = test_remote_sync_target.TestRemoteSyncTargets + SoledadWithCouchServerMixin.setUp(self) + self.startServer() + self.db1 = make_sqlcipher_database_for_test(self, 'test1') + self.db2 = self.request_state._create_database('test2') + + def tearDown(self): + SoledadWithCouchServerMixin.tearDown(self) + tests.TestCaseWithServer.tearDown(self) + db, _ = self.request_state.ensure_database('test2') + db.delete_database() + def test_sync_exchange_send(self): """ Test for sync exchanging send of document. @@ -383,7 +403,7 @@ class TestSoledadSyncTarget( remote_target = self.getSyncTarget('test') other_docs = [] - def receive_doc(doc): + def receive_doc(doc, gen, trans_id): other_docs.append((doc.doc_id, doc.rev, doc.get_json())) doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}') @@ -398,7 +418,10 @@ class TestSoledadSyncTarget( """ Test for sync exchange failure and retry. - This test was adapted to decrypt remote content before assert. + This test was adapted to: + - decrypt remote content before assert. + - not expect a bounced document because soledad has stateful + recoverable sync. """ self.startServer() @@ -412,7 +435,7 @@ class TestSoledadSyncTarget( _put_doc_if_newer = db._put_doc_if_newer trigger_ids = ['doc-here2'] - def bomb_put_doc_if_newer(doc, save_conflict, + def bomb_put_doc_if_newer(self, doc, save_conflict, replica_uid=None, replica_gen=None, replica_trans_id=None): if doc.doc_id in trigger_ids: @@ -421,7 +444,9 @@ class TestSoledadSyncTarget( replica_uid=replica_uid, replica_gen=replica_gen, replica_trans_id=replica_trans_id) - self.patch(db, '_put_doc_if_newer', bomb_put_doc_if_newer) + from leap.soledad.common.tests.test_couch import IndexedCouchDatabase + self.patch( + IndexedCouchDatabase, '_put_doc_if_newer', bomb_put_doc_if_newer) remote_target = self.getSyncTarget('test') other_changes = [] @@ -455,10 +480,11 @@ class TestSoledadSyncTarget( self.assertEqual( (11, 'T-sud'), db._get_replica_gen_and_trans_id('replica')) self.assertEqual(2, new_gen) - # bounced back to us - self.assertEqual( - ('doc-here', 'replica:1', '{"value": "here"}', 1), - other_changes[0][:-1]) + # we do not expect the document to be bounced back because soledad has + # stateful sync + #self.assertEqual( + # ('doc-here', 'replica:1', '{"value": "here"}', 1), + # other_changes[0][:-1]) def test_sync_exchange_send_ensure_callback(self): """ @@ -471,7 +497,7 @@ class TestSoledadSyncTarget( other_docs = [] replica_uid_box = [] - def receive_doc(doc): + def receive_doc(doc, gen, trans_id): other_docs.append((doc.doc_id, doc.rev, doc.get_json())) def ensure_cb(replica_uid): @@ -489,6 +515,11 @@ class TestSoledadSyncTarget( self.assertGetEncryptedDoc( db, 'doc-here', 'replica:1', '{"value": "here"}', False) + def test_sync_exchange_in_stream_error(self): + # we bypass this test because our sync_exchange process does not + # return u1db error 503 "unavailable" for now. 
+ pass + #----------------------------------------------------------------------------- # The following tests come from `u1db.tests.test_https`. @@ -595,42 +626,34 @@ class TestHTTPDatabaseWithCreds( # The following tests come from `u1db.tests.test_sync`. #----------------------------------------------------------------------------- -def _make_local_db_and_leap_target(test, path='test'): - test.startServer() - db = test.request_state._create_database(os.path.basename(path)) - st = target.SoledadSyncTarget.connect( - test.getURL(path), crypto=test._soledad._crypto) - return db, st - - -def _make_local_db_and_token_leap_target(test): - db, st = _make_local_db_and_leap_target(test, 'test') - st.set_token_credentials('user-uuid', 'auth-token') - return db, st - - target_scenarios = [ ('token_leap', {'create_db_and_target': - _make_local_db_and_token_leap_target, - 'make_app_with_state': make_token_soledad_app}), + make_local_db_and_token_soledad_target, + 'make_app_with_state': make_soledad_app}), ] class SoledadDatabaseSyncTargetTests( - test_sync.DatabaseSyncTargetTests, BaseSoledadTest): + SoledadWithCouchServerMixin, test_sync.DatabaseSyncTargetTests): scenarios = ( tests.multiply_scenarios( tests.DatabaseBaseTests.scenarios, target_scenarios)) + whitebox = False + + def setUp(self): + self.main_test_class = test_sync.DatabaseSyncTargetTests + SoledadWithCouchServerMixin.setUp(self) + def test_sync_exchange(self): """ Test sync exchange. This test was adapted to decrypt remote content before assert. """ - sol = _make_local_db_and_leap_target(self) + sol, _ = make_local_db_and_soledad_target(self) docs_by_gen = [ (self.make_document('doc-id', 'replica:1', tests.simple_doc), 10, 'T-sid')] @@ -703,17 +726,15 @@ class SoledadDatabaseSyncTargetTests( [(doc.doc_id, doc.rev), (doc2.doc_id, doc2.rev)]}) -class TestSoledadDbSync(test_sync.TestDbSync, BaseSoledadTest): +class TestSoledadDbSync( + SoledadWithCouchServerMixin, test_sync.TestDbSync): """Test db.sync remote sync shortcut""" scenarios = [ - ('py-http', { - 'make_app_with_state': make_soledad_app, - 'make_database_for_test': tests.make_memory_database_for_test, - }), ('py-token-http', { + 'create_db_and_target': make_local_db_and_token_soledad_target, 'make_app_with_state': make_token_soledad_app, - 'make_database_for_test': tests.make_memory_database_for_test, + 'make_database_for_test': make_sqlcipher_database_for_test, 'token': True }), ] @@ -721,6 +742,10 @@ class TestSoledadDbSync(test_sync.TestDbSync, BaseSoledadTest): oauth = False token = False + def setUp(self): + self.main_test_class = test_sync.TestDbSync + SoledadWithCouchServerMixin.setUp(self) + def do_sync(self, target_name): """ Perform sync using SoledadSyncTarget and Token auth. @@ -748,7 +773,7 @@ class TestSoledadDbSync(test_sync.TestDbSync, BaseSoledadTest): """ doc1 = self.db.create_doc_from_json(tests.simple_doc) doc2 = self.db2.create_doc_from_json(tests.nested_doc) - local_gen_before_sync = self.do_sync('test2.db') + local_gen_before_sync = self.do_sync('test2') gen, _, changes = self.db.whats_changed(local_gen_before_sync) self.assertEqual(1, len(changes)) self.assertEqual(doc2.doc_id, changes[0][0]) @@ -760,24 +785,9 @@ class TestSoledadDbSync(test_sync.TestDbSync, BaseSoledadTest): def test_db_sync_autocreate(self): """ - Test sync. - - Adapted to check for encrypted content. + We bypass this test because we never need to autocreate databases. 
""" - doc1 = self.db.create_doc_from_json(tests.simple_doc) - local_gen_before_sync = self.do_sync('test3.db') - gen, _, changes = self.db.whats_changed(local_gen_before_sync) - self.assertEqual(0, gen - local_gen_before_sync) - db3 = self.request_state.open_database('test3.db') - gen, _, changes = db3.whats_changed() - self.assertEqual(1, len(changes)) - self.assertEqual(doc1.doc_id, changes[0][0]) - self.assertGetEncryptedDoc( - db3, doc1.doc_id, doc1.rev, tests.simple_doc, False) - t_gen, _ = self.db._get_replica_gen_and_trans_id('test3.db') - s_gen, _ = db3._get_replica_gen_and_trans_id('test1') - self.assertEqual(1, t_gen) - self.assertEqual(1, s_gen) + pass load_tests = tests.load_with_scenarios diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py b/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py index a37c36db..633fd8dd 100644 --- a/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py +++ b/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py @@ -1155,12 +1155,12 @@ class TestDbSync(tests.TestCaseWithServer): super(TestDbSync, self).setUp() self.startServer() self.db = self.make_database_for_test(self, 'test1') - self.db2 = self.request_state._create_database('test2.db') + self.db2 = self.request_state._create_database('test2') def test_db_sync(self): doc1 = self.db.create_doc_from_json(tests.simple_doc) doc2 = self.db2.create_doc_from_json(tests.nested_doc) - local_gen_before_sync = self.do_sync('test2.db') + local_gen_before_sync = self.do_sync('test2') gen, _, changes = self.db.whats_changed(local_gen_before_sync) self.assertEqual(1, len(changes)) self.assertEqual(doc2.doc_id, changes[0][0]) diff --git a/common/src/leap/soledad/common/tests/util.py b/common/src/leap/soledad/common/tests/util.py new file mode 100644 index 00000000..249cbdaa --- /dev/null +++ b/common/src/leap/soledad/common/tests/util.py @@ -0,0 +1,177 @@ +# -*- coding: utf-8 -*- +# util.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +Utilities used by multiple test suites. 
+""" + + +import tempfile +import shutil +from urlparse import urljoin + +from StringIO import StringIO +from pysqlcipher import dbapi2 +from u1db.errors import DatabaseDoesNotExist + + +from leap.soledad.common import soledad_assert +from leap.soledad.common.couch import CouchDatabase, CouchServerState +from leap.soledad.server import SoledadApp +from leap.soledad.server.auth import SoledadTokenAuthMiddleware + + +from leap.soledad.common.tests import u1db_tests as tests, BaseSoledadTest +from leap.soledad.common.tests.test_couch import CouchDBWrapper, CouchDBTestCase + + +from leap.soledad.client.sqlcipher import SQLCipherDatabase + + +PASSWORD = '123456' + + +def make_sqlcipher_database_for_test(test, replica_uid): + db = SQLCipherDatabase(':memory:', PASSWORD) + db._set_replica_uid(replica_uid) + return db + + +def copy_sqlcipher_database_for_test(test, db): + # DO NOT COPY OR REUSE THIS CODE OUTSIDE TESTS: COPYING U1DB DATABASES IS + # THE WRONG THING TO DO, THE ONLY REASON WE DO SO HERE IS TO TEST THAT WE + # CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS RATHER THAN + # CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND NINJA TO YOUR + # HOUSE. + new_db = SQLCipherDatabase(':memory:', PASSWORD) + tmpfile = StringIO() + for line in db._db_handle.iterdump(): + if not 'sqlite_sequence' in line: # work around bug in iterdump + tmpfile.write('%s\n' % line) + tmpfile.seek(0) + new_db._db_handle = dbapi2.connect(':memory:') + new_db._db_handle.cursor().executescript(tmpfile.read()) + new_db._db_handle.commit() + new_db._set_replica_uid(db._replica_uid) + new_db._factory = db._factory + return new_db + + +def make_soledad_app(state): + return SoledadApp(state) + + +def make_token_soledad_app(state): + app = SoledadApp(state) + + def _verify_authentication_data(uuid, auth_data): + if uuid == 'user-uuid' and auth_data == 'auth-token': + return True + return False + + # we test for action authorization in leap.soledad.common.tests.test_server + def _verify_authorization(uuid, environ): + return True + + application = SoledadTokenAuthMiddleware(app) + application._verify_authentication_data = _verify_authentication_data + application._verify_authorization = _verify_authorization + return application + + +class CouchServerStateForTests(CouchServerState): + """ + This is a slightly modified CouchDB server state that allows for creating + a database. + + Ordinarily, the CouchDB server state does not allow some operations, + because for security purposes the Soledad Server should not even have + enough permissions to perform them. For tests, we allow database creation, + otherwise we'd have to create those databases in setUp/tearDown methods, + which is less pleasant than allowing the db to be automatically created. + """ + + def _create_database(self, dbname): + return CouchDatabase.open_database( + urljoin(self._couch_url, dbname), + True, + replica_uid=dbname, + ensure_ddocs=True) + + def ensure_database(self, dbname): + db = self._create_database(dbname) + return db, db.replica_uid + + +class SoledadWithCouchServerMixin( + BaseSoledadTest, + CouchDBTestCase): + + @classmethod + def setUpClass(cls): + """ + Make sure we have a CouchDB instance for a test. + """ + # from BaseLeapTest + cls.tempdir = tempfile.mkdtemp(prefix="leap_tests-") + # from CouchDBTestCase + cls.wrapper = CouchDBWrapper() + cls.wrapper.start() + #self.db = self.wrapper.db + + @classmethod + def tearDownClass(cls): + """ + Stop CouchDB instance for test. 
+ """ + # from BaseLeapTest + soledad_assert( + cls.tempdir.startswith('/tmp/leap_tests-'), + "beware! tried to remove a dir which does not " + "live in temporal folder!") + shutil.rmtree(cls.tempdir) + # from CouchDBTestCase + cls.wrapper.stop() + + def setUp(self): + BaseSoledadTest.setUp(self) + CouchDBTestCase.setUp(self) + main_test_class = getattr(self, 'main_test_class', None) + if main_test_class is not None: + main_test_class.setUp(self) + self._couch_url = 'http://localhost:%d' % self.wrapper.port + + def tearDown(self): + BaseSoledadTest.tearDown(self) + CouchDBTestCase.tearDown(self) + main_test_class = getattr(self, 'main_test_class', None) + if main_test_class is not None: + main_test_class.tearDown(self) + # delete the test database + try: + db = CouchDatabase(self._couch_url, 'test') + db.delete_database() + except DatabaseDoesNotExist: + pass + + def make_app(self): + couch_url = urljoin( + 'http://localhost:' + str(self.wrapper.port), 'tests') + self.request_state = CouchServerStateForTests( + couch_url, 'shared', 'tokens') + return self.make_app_with_state(self.request_state) diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index 573afdd6..cd006f51 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -258,6 +258,7 @@ class HTTPInvocationByMethodWithBody( raise http_app.BadRequest() +# monkey patch server with new http invocation http_app.HTTPInvocationByMethodWithBody = HTTPInvocationByMethodWithBody -- cgit v1.2.3 From 1427c60d7dff3fcfa6c30e16fd9815fd4787b458 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 28 May 2014 15:24:34 -0300 Subject: Fix stuff from kali's review. --- client/src/leap/soledad/client/sync.py | 15 +++--- client/src/leap/soledad/client/target.py | 4 +- .../ddocs/syncs/views/changes_to_return/map.js | 1 + server/src/leap/soledad/server/sync.py | 57 ++++------------------ 4 files changed, 20 insertions(+), 57 deletions(-) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 6e9e23fa..c29dd769 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -52,7 +52,7 @@ class ClientSyncState(object): @property def _public_attr_keys(self): - return [k for k in self._public_attrs] + return self._public_attrs.keys() def __init__(self, db=None): """ @@ -113,15 +113,16 @@ class ClientSyncState(object): def has_stored_info(self): """ - Return wether there is any sync state info stored on the database. + Return whether there is any sync state info stored on the database. - :return: Wether there's any sync state info store on db. + :return: Whether there's any sync state info store on db. :rtype: bool """ return self._db is not None and self._db.sync_state is not None def __str__(self): - ', '.join(['%s: %s' % (k, getattr(self, k)) for k in self._public_attr_keys]) + return 'ClientSyncState: %s' % ', '.join( + ['%s: %s' % (k, getattr(self, k)) for k in self._public_attr_keys]) class Synchronizer(U1DBSynchronizer): """ @@ -140,7 +141,7 @@ class Synchronizer(U1DBSynchronizer): """ Synchronize documents between source and target. - :param autocreate: Wether the target replica should be created or not. + :param autocreate: Whether the target replica should be created or not. 
:type autocreate: bool """ sync_target = self.sync_target @@ -173,8 +174,8 @@ class Synchronizer(U1DBSynchronizer): raise # will try to ask sync_exchange() to create the db self.target_replica_uid = None - target_gen, target_trans_id = 0, '' - target_my_gen, target_my_trans_id = 0, '' + target_gen, target_trans_id = (0, '') + target_my_gen, target_my_trans_id = (0, '') # make sure we'll have access to target replica uid once it exists if self.target_replica_uid is None: diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index a3b8ed00..93de98d3 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -703,9 +703,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): @property def stopped(self): """ - Return wether this sync session is stopped. + Return whether this sync session is stopped. - :return: Wether this sync session is stopped. + :return: Whether this sync session is stopped. :rtype: bool """ with self._stop_lock: diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js index 220345dc..04ceb2ec 100644 --- a/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js +++ b/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js @@ -6,6 +6,7 @@ function(doc) { emit([source_replica_uid, 0], null); else if (changes.length == 0) emit([source_replica_uid, 0], []); + else for (var i = 0; i < changes['changes_to_return'].length; i++) emit( [source_replica_uid, i], diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 3b8b69fb..16926f14 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -120,7 +120,7 @@ class ServerSyncState(object): resource = self._db._database.resource(*ddoc_path) response = resource.get_json(key=self._key(self._source_replica_uid)) data = response[2] - if len(data['rows']) > 0: + if data['rows']: entry = data['rows'].pop() return entry['value']['seen_ids'] return [] @@ -153,7 +153,7 @@ class ServerSyncState(object): Return information about the current sync state. :return: The generation and transaction id of the target database - which will be synced, and the number of documents do return, + which will be synced, and the number of documents to return, or a tuple of Nones if those have not already been sent to server. 
:rtype: tuple @@ -165,7 +165,7 @@ class ServerSyncState(object): gen = None trans_id = None number_of_changes = None - if len(data['rows']) > 0 and data['rows'][0]['value'] is not None: + if data['rows'] and data['rows'][0]['value'] is not None: value = data['rows'][0]['value'] gen = value['gen'] trans_id = value['trans_id'] @@ -186,7 +186,7 @@ class ServerSyncState(object): key=self._key( [self._source_replica_uid, received])) data = response[2] - if len(data['rows']) == 0: + if not data['rows']: return None, None, None value = data['rows'][0]['value'] gen = value['gen'] @@ -215,14 +215,6 @@ class SyncExchange(sync.SyncExchange): self._trace_hook = None # recover sync state self._sync_state = ServerSyncState(self._db, self.source_replica_uid) - # for tests - #self._incoming_trace = [] - #if hasattr(self._db, '_incoming_trace'): - # self._incoming_trace = self._db._incoming_trace - #self._db._last_exchange_log = { - # 'receive': {'docs': self._incoming_trace}, - # 'return': None - # } def find_changes_to_return(self, received): @@ -243,10 +235,6 @@ class SyncExchange(sync.SyncExchange): to the source syncing replica. :rtype: int """ - if hasattr(self._db, '_last_exchange_log'): - self._db._last_exchange_log['receive'].update({ # for tests - 'last_known_gen': self.source_last_known_generation - }) # check if changes to return have already been calculated new_gen, new_trans_id, number_of_changes = self._sync_state.sync_info() if number_of_changes is None: @@ -269,9 +257,7 @@ class SyncExchange(sync.SyncExchange): self.new_gen = new_gen self.new_trans_id = new_trans_id # and append one change - self.changes_to_return = [] - if next_change_to_return is not None: - self.changes_to_return.append(next_change_to_return) + self.change_to_return = next_change_to_return return self.new_gen, number_of_changes def return_one_doc(self, return_doc_cb): @@ -287,27 +273,10 @@ class SyncExchange(sync.SyncExchange): replica. :type return_doc_cb: callable(doc, gen, trans_id) """ - changes_to_return = self.changes_to_return - # return docs, including conflicts - changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return] - self._trace('before get_docs') - docs = self._db.get_docs( - changed_doc_ids, check_for_conflicts=False, include_deleted=True) - - docs_by_gen = izip( - docs, (gen for _, gen, _ in changes_to_return), - (trans_id for _, _, trans_id in changes_to_return)) - for doc, gen, trans_id in docs_by_gen: + if self.change_to_return is not None: + changed_doc_id, gen, trans_id = self.change_to_return + doc = self._db.get_doc(changed_doc_id, include_deleted=True) return_doc_cb(doc, gen, trans_id) - # for tests - if hasattr(self._db, '_outgoing_trace'): - self._db._outgoing_trace.append((doc.doc_id, doc.rev)) - # for tests - if hasattr(self._db, '_outgoing_trace'): - self._db._last_exchange_log['return'] = { - 'docs': self._db._outgoing_trace, - 'last_gen': self.new_gen - } def insert_doc_from_source(self, doc, source_gen, trans_id): """Try to insert synced document from source. 
@@ -342,14 +311,6 @@ class SyncExchange(sync.SyncExchange):
         else:
             # conflict that we will return
             assert state == 'conflicted'
-        # for tests
-        if hasattr(self._db, '_incoming_trace') \
-                and hasattr(self._db, '_last_exchange_log'):
-            self._db._incoming_trace.append((doc.doc_id, doc.rev))
-            self._db._last_exchange_log['receive'].update({
-                'source_uid': self.source_replica_uid,
-                'source_gen': source_gen
-            })
 
 
 class SyncResource(http_app.SyncResource):
@@ -373,7 +334,7 @@ class SyncResource(http_app.SyncResource):
         :param last_known_trans_id: The last server replica transaction_id
                                     the client knows about.
         :type last_known_trans_id: str
-        :param ensure: Wether the server replica should be created if it does
+        :param ensure: Whether the server replica should be created if it does
                        not already exist.
         :type ensure: bool
         """
-- 
cgit v1.2.3


From 951ba59425a40d29cf6aea1d5ea56c92ef2404c1 Mon Sep 17 00:00:00 2001
From: drebs
Date: Wed, 28 May 2014 16:00:48 -0300
Subject: Turn SQLCipher.sync_state into a ClientSyncState instance.

---
 client/src/leap/soledad/client/sqlcipher.py | 36 +++++++++++++++--------------
 client/src/leap/soledad/client/sync.py      | 20 ++++++++--------
 2 files changed, 29 insertions(+), 27 deletions(-)

diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 0885a35f..74351116 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -57,7 +57,7 @@ from pysqlcipher import dbapi2
 from u1db.backends import sqlite_backend
 from u1db import errors as u1db_errors
 
-from leap.soledad.client.sync import Synchronizer
+from leap.soledad.client.sync import Synchronizer, ClientSyncState
 from leap.soledad.client.target import SoledadSyncTarget
 from leap.soledad.common.document import SoledadDocument
 
@@ -214,7 +214,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
             syncable=syncable)
         self.set_document_factory(factory)
         self._syncers = {}
-        self._real_sync_state = None
 
     @classmethod
     def _open_database(cls, sqlcipher_file, password, document_factory=None,
@@ -890,30 +889,29 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
         if self._db_handle is not None:
             self._db_handle.close()
 
-    def _get_sync_state(self):
+    def _get_stored_sync_state(self):
         """
-        Get the current sync state.
+        Retrieve the currently stored sync state.
 
-        :return: The current sync state.
-        :rtype: dict
+        :return: The current stored sync state or None if there's no stored
+            state.
+        :rtype: dict or None
         """
-        if self._real_sync_state is not None:
-            return self._real_sync_state
         c = self._db_handle.cursor()
         c.execute("SELECT value FROM u1db_config"
                   " WHERE name = 'sync_state'")
         val = c.fetchone()
         if val is None:
             return None
-        self._real_sync_state = json.loads(val[0])
-        return self._real_sync_state
+        return json.loads(val[0])
 
-    def _set_sync_state(self, state):
+    def _set_stored_sync_state(self, state):
         """
-        Set the current sync state.
+        Store the sync state.
 
-        :param state: The sync state to be set.
-        :type state: dict
+        :param state: The sync state to be stored or None to delete any stored
+            state.
+ :type state: dict or None """ c = self._db_handle.cursor() if state is None: @@ -923,9 +921,13 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): c.execute("INSERT OR REPLACE INTO u1db_config" " VALUES ('sync_state', ?)", (json.dumps(state),)) - self._real_sync_state = state - sync_state = property( - _get_sync_state, _set_sync_state, doc="The current sync state.") + stored_sync_state = property( + _get_stored_sync_state, _set_stored_sync_state, + doc="The current sync state dict.") + + @property + def sync_state(self): + return ClientSyncState(self) sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index c29dd769..5285d540 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -86,29 +86,29 @@ class ClientSyncState(object): for attr in self._public_attr_keys: setattr(self, attr, self._public_attrs[attr]) # fetch info from stored sync state - sync_state = None + sync_state_dict = None if self._db is not None: - sync_state = self._db.sync_state - if sync_state is not None: + sync_state_dict = self._db.stored_sync_state + if sync_state_dict is not None: for attr in self._public_attr_keys: - setattr(self, attr, sync_state[attr]) + setattr(self, attr, sync_state_dict[attr]) def save(self): """ Save the current sync state in the database. """ - sync_state = {} + sync_state_dict = {} for attr in self._public_attr_keys: - sync_state[attr] = getattr(self, attr) + sync_state_dict[attr] = getattr(self, attr) if self._db is not None: - self._db.sync_state = sync_state + self._db.stored_sync_state = sync_state_dict def clear(self): """ Clear the sync state info data. """ if self._db is not None: - self._db.sync_state = None + self._db.stored_sync_state = None self._init_state() def has_stored_info(self): @@ -118,7 +118,7 @@ class ClientSyncState(object): :return: Whether there's any sync state info store on db. :rtype: bool """ - return self._db is not None and self._db.sync_state is not None + return self._db is not None and self._db.stored_sync_state is not None def __str__(self): return 'ClientSyncState: %s' % ', '.join( @@ -147,7 +147,7 @@ class Synchronizer(U1DBSynchronizer): sync_target = self.sync_target # recover current sync state from source database - sync_state = ClientSyncState(self.source) + sync_state = self.source.sync_state self.target_replica_uid = sync_state.target_replica_uid target_gen = sync_state.target_gen target_trans_id = sync_state.target_trans_id -- cgit v1.2.3 From fe0628eaccfa71ac20b0878dbc01f48e1a209d2d Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 3 Jun 2014 13:34:47 -0300 Subject: Use a fresh resource for multipart puts (#5739). --- common/src/leap/soledad/common/couch.py | 45 ++++++++++++++------------------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 3bc1f543..b51b32f3 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -40,7 +40,9 @@ from couchdb.http import ( ResourceConflict, ResourceNotFound, ServerError, - Session as CouchHTTPSession, + Session, + urljoin as couch_urljoin, + Resource, ) from u1db import query_parser, vectorclock from u1db.errors import ( @@ -333,17 +335,6 @@ class MultipartWriter(object): self.headers[name] = value -class Session(CouchHTTPSession): - """ - An HTTP session that can be closed. 
- """ - - def close_connections(self): - for key, conns in list(self.conns.items()): - for conn in conns: - conn.close() - - @contextmanager def couch_server(url): """ @@ -359,7 +350,6 @@ def couch_server(url): session = Session(timeout=COUCH_TIMEOUT) server = Server(url=url, session=session) yield server - session.close_connections() class CouchDatabase(CommonBackend): @@ -511,7 +501,6 @@ class CouchDatabase(CommonBackend): """ with couch_server(self._url) as server: del(server[self._dbname]) - self.close_connections() def close(self): """ @@ -520,20 +509,12 @@ class CouchDatabase(CommonBackend): :return: True if db was succesfully closed. :rtype: bool """ - self.close_connections() self._url = None self._full_commit = None self._session = None self._database = None return True - def close_connections(self): - """ - Close all open connections to the couch server. - """ - if self._session is not None: - self._session.close_connections() - def __del__(self): """ Close the database upon garbage collection. @@ -897,11 +878,9 @@ class CouchDatabase(CommonBackend): envelope.close() # try to save and fail if there's a revision conflict try: - self._database.resource.put_json( + resource = self._new_resource() + resource.put_json( doc.doc_id, body=buf.getvalue(), headers=envelope.headers) - # What follows is a workaround for an ugly bug. See: - # https://leap.se/code/issues/5448 - self.close_connections() except ResourceConflict: raise RevisionConflict() @@ -1473,6 +1452,20 @@ class CouchDatabase(CommonBackend): continue yield t._doc + def _new_resource(self, *path): + """ + Return a new resource for accessing a couch database. + + :return: A resource for accessing a couch database. + :rtype: couchdb.http.Resource + """ + # Workaround for: https://leap.se/code/issues/5448 + url = couch_urljoin(self._database.resource.url, *path) + resource = Resource(url, Session(timeout=COUCH_TIMEOUT)) + resource.credentials = self._database.resource.credentials + resource.headers = self._database.resource.headers.copy() + return resource + class CouchSyncTarget(CommonSyncTarget): """ -- cgit v1.2.3 From fec4e0624d3bb80d19d106cf84d20a71d58623bc Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 3 Jun 2014 13:37:56 -0300 Subject: Fix package build script. --- scripts/build_debian_package.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/build_debian_package.sh b/scripts/build_debian_package.sh index cc62c3ac..1ec9b00a 100755 --- a/scripts/build_debian_package.sh +++ b/scripts/build_debian_package.sh @@ -26,7 +26,7 @@ export GIT_DIR=${workdir}/soledad/.git export GIT_WORK_TREE=${workdir}/soledad git remote add leapcode ${SOLEDAD_MAIN_REPO} git fetch leapcode -git checkout debian +git checkout -b debian leapcode/debian git merge --no-edit ${branch} (cd ${workdir}/soledad && debuild -uc -us) echo "Packages generated in ${workdir}" -- cgit v1.2.3 From 9bf2160ca7ecc0e10aac7f85cdc4967696a03042 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 3 Jun 2014 13:49:30 -0300 Subject: Add password option to client db script. 
--- scripts/db_access/client_side_db.py | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/scripts/db_access/client_side_db.py b/scripts/db_access/client_side_db.py index 2bf4ab5e..9aadd5fe 100644 --- a/scripts/db_access/client_side_db.py +++ b/scripts/db_access/client_side_db.py @@ -118,30 +118,37 @@ def get_soledad_instance(username, provider, passphrase, basedir): auth_token=token) +class ValidateUserHandle(argparse.Action): + def __call__(self, parser, namespace, values, option_string=None): + m = re.compile('^([^@]+)@([^@]+\.[^@]+)$') + res = m.match(values) + if res == None: + parser.error('User handle should have the form user@provider.') + setattr(namespace, 'username', res.groups()[0]) + setattr(namespace, 'provider', res.groups()[1]) + + # main program if __name__ == '__main__': - class ValidateUserHandle(argparse.Action): - def __call__(self, parser, namespace, values, option_string=None): - m = re.compile('^([^@]+)@([^@]+\.[^@]+)$') - res = m.match(values) - if res == None: - parser.error('User handle should have the form user@provider.') - setattr(namespace, 'username', res.groups()[0]) - setattr(namespace, 'provider', res.groups()[1]) - # parse command line parser = argparse.ArgumentParser() parser.add_argument( 'user@provider', action=ValidateUserHandle, help='the user handle') parser.add_argument( - '-b', dest='basedir', required=False, default=None, help='the user handle') + '-b', dest='basedir', required=False, default=None, + help='soledad base directory') + parser.add_argument( + '-p', dest='passphrase', required=False, default=None, + help='the user passphrase') args = parser.parse_args() # get the password - passphrase = getpass.getpass( - 'Password for %s@%s: ' % (args.username, args.provider)) + passphrase = args.passphrase + if passphrase is None: + passphrase = getpass.getpass( + 'Password for %s@%s: ' % (args.username, args.provider)) # get the basedir basedir = args.basedir -- cgit v1.2.3 From a0cbdc8bbba4369c20bb5b285e464c23d6954e17 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 3 Jun 2014 16:13:22 -0300 Subject: Remove close_connections() from tests. --- common/src/leap/soledad/common/tests/test_couch.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index a1fa9568..3b1e5a06 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -219,7 +219,6 @@ def copy_couch_database_for_test(test, db): new_couch_db.put_attachment(new_doc, att, filename=att_name) # cleanup connections to prevent file descriptor leaking - session.close_connections() return new_db @@ -253,7 +252,6 @@ class CouchTests(test_backends.AllDatabaseTests, CouchDBTestCase): session = couch.Session() server = Server(url=self._url, session=session) del(server[self._dbname]) - session.close_connections() else: self.db.delete_database() test_backends.AllDatabaseTests.tearDown(self) -- cgit v1.2.3 From 7d9d827a5f66993863ca0c532c01ad3bf2c4353e Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 3 Jun 2014 18:43:56 -0300 Subject: Replace client sync state by a sync_id. 
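
In rough outline (a sketch of the scheme implemented by the diffs below,
not a verbatim excerpt): rather than persisting the whole sync state on the
client, each sync session is now identified by a fresh random token sent
along with every POST. When the server sees a new sync_id for a source
replica it already holds state for, it discards the stale state and starts
over.

    from uuid import uuid4

    # one fresh identifier per sync session; the server keys its
    # per-replica sync state on (source_replica_uid, sync_id)
    sync_id = str(uuid4())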
--- client/src/leap/soledad/client/sqlcipher.py | 42 +---- client/src/leap/soledad/client/sync.py | 176 +++------------------ client/src/leap/soledad/client/target.py | 56 +++---- .../soledad/common/ddocs/syncs/updates/state.js | 10 +- .../ddocs/syncs/views/changes_to_return/map.js | 7 +- .../common/ddocs/syncs/views/seen_ids/map.js | 6 +- .../soledad/common/ddocs/syncs/views/state/map.js | 5 +- server/src/leap/soledad/server/sync.py | 28 ++-- 8 files changed, 84 insertions(+), 246 deletions(-) diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 74351116..26238af6 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -57,7 +57,7 @@ from pysqlcipher import dbapi2 from u1db.backends import sqlite_backend from u1db import errors as u1db_errors -from leap.soledad.client.sync import Synchronizer, ClientSyncState +from leap.soledad.client.sync import Synchronizer from leap.soledad.client.target import SoledadSyncTarget from leap.soledad.common.document import SoledadDocument @@ -889,45 +889,5 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): if self._db_handle is not None: self._db_handle.close() - def _get_stored_sync_state(self): - """ - Retrieve the currently stored sync state. - - :return: The current stored sync state or None if there's no stored - state. - :rtype: dict or None - """ - c = self._db_handle.cursor() - c.execute("SELECT value FROM u1db_config" - " WHERE name = 'sync_state'") - val = c.fetchone() - if val is None: - return None - return json.loads(val[0]) - - def _set_stored_sync_state(self, state): - """ - Stored the sync state. - - :param state: The sync state to be stored or None to delete any stored - state. - :type state: dict or None - """ - c = self._db_handle.cursor() - if state is None: - c.execute("DELETE FROM u1db_config" - " WHERE name = 'sync_state'") - else: - c.execute("INSERT OR REPLACE INTO u1db_config" - " VALUES ('sync_state', ?)", - (json.dumps(state),)) - - stored_sync_state = property( - _get_stored_sync_state, _set_stored_sync_state, - doc="The current sync state dict.") - - @property - def sync_state(self): - return ClientSyncState(self) sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 5285d540..56e63416 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -27,103 +27,6 @@ from u1db import errors from u1db.sync import Synchronizer as U1DBSynchronizer -class ClientSyncState(object): - """ - The state of the current sync session, as stored on the client. - """ - - _private_attrs = [ - '_db', - ] - - _public_attrs = { - 'target_replica_uid': None, - 'target_gen': None, - 'target_trans_id': None, - 'target_my_gen': None, - 'target_my_trans_id': None, - 'target_last_known_gen': None, - 'target_last_known_trans_id': None, - 'my_gen': None, - 'changes': None, - 'sent': 0, - 'received': 0, - } - - @property - def _public_attr_keys(self): - return self._public_attrs.keys() - - def __init__(self, db=None): - """ - Initialize the client sync state. - - :param db: The database where to fetch/store the sync state. - :type db: SQLCipherDatabase - """ - self._db = db - self._init_state() - - def __setattr__(self, attr, val): - """ - Prevent setting arbitrary attributes. - - :param attr: The attribute name. - :type attr: str - :param val: The value to be set. 
- :type val: anything - """ - if attr not in self._public_attr_keys + self._private_attrs: - raise Exception - object.__setattr__(self, attr, val) - - def _init_state(self): - """ - Initialize current sync state, potentially fetching sync info stored - in database. - """ - # set local default attributes - for attr in self._public_attr_keys: - setattr(self, attr, self._public_attrs[attr]) - # fetch info from stored sync state - sync_state_dict = None - if self._db is not None: - sync_state_dict = self._db.stored_sync_state - if sync_state_dict is not None: - for attr in self._public_attr_keys: - setattr(self, attr, sync_state_dict[attr]) - - def save(self): - """ - Save the current sync state in the database. - """ - sync_state_dict = {} - for attr in self._public_attr_keys: - sync_state_dict[attr] = getattr(self, attr) - if self._db is not None: - self._db.stored_sync_state = sync_state_dict - - def clear(self): - """ - Clear the sync state info data. - """ - if self._db is not None: - self._db.stored_sync_state = None - self._init_state() - - def has_stored_info(self): - """ - Return whether there is any sync state info stored on the database. - - :return: Whether there's any sync state info store on db. - :rtype: bool - """ - return self._db is not None and self._db.stored_sync_state is not None - - def __str__(self): - return 'ClientSyncState: %s' % ', '.join( - ['%s: %s' % (k, getattr(self, k)) for k in self._public_attr_keys]) - class Synchronizer(U1DBSynchronizer): """ Collect the state around synchronizing 2 U1DB replicas. @@ -146,36 +49,20 @@ class Synchronizer(U1DBSynchronizer): """ sync_target = self.sync_target - # recover current sync state from source database - sync_state = self.source.sync_state - self.target_replica_uid = sync_state.target_replica_uid - target_gen = sync_state.target_gen - target_trans_id = sync_state.target_trans_id - target_my_gen = sync_state.target_my_gen - target_my_trans_id = sync_state.target_my_trans_id - target_last_known_gen = sync_state.target_last_known_gen - target_last_known_trans_id = \ - sync_state.target_last_known_trans_id - my_gen = sync_state.my_gen - changes = sync_state.changes - sent = sync_state.sent - received = sync_state.received - # get target identifier, its current generation, # and its last-seen database generation for this source ensure_callback = None - if not sync_state.has_stored_info(): - try: - (self.target_replica_uid, target_gen, target_trans_id, - target_my_gen, target_my_trans_id) = \ - sync_target.get_sync_info(self.source._replica_uid) - except errors.DatabaseDoesNotExist: - if not autocreate: - raise - # will try to ask sync_exchange() to create the db - self.target_replica_uid = None - target_gen, target_trans_id = (0, '') - target_my_gen, target_my_trans_id = (0, '') + try: + (self.target_replica_uid, target_gen, target_trans_id, + target_my_gen, target_my_trans_id) = \ + sync_target.get_sync_info(self.source._replica_uid) + except errors.DatabaseDoesNotExist: + if not autocreate: + raise + # will try to ask sync_exchange() to create the db + self.target_replica_uid = None + target_gen, target_trans_id = (0, '') + target_my_gen, target_my_trans_id = (0, '') # make sure we'll have access to target replica uid once it exists if self.target_replica_uid is None: @@ -192,17 +79,15 @@ class Synchronizer(U1DBSynchronizer): target_my_gen, target_my_trans_id) # what's changed since that generation and this current gen - if not sync_state.has_stored_info(): - my_gen, _, changes = self.source.whats_changed(target_my_gen) + 
my_gen, _, changes = self.source.whats_changed(target_my_gen) # get source last-seen database generation for the target - if not sync_state.has_stored_info(): - if self.target_replica_uid is None: - target_last_known_gen, target_last_known_trans_id = 0, '' - else: - target_last_known_gen, target_last_known_trans_id = \ - self.source._get_replica_gen_and_trans_id( - self.target_replica_uid) + if self.target_replica_uid is None: + target_last_known_gen, target_last_known_trans_id = 0, '' + else: + target_last_known_gen, target_last_known_trans_id = \ + self.source._get_replica_gen_and_trans_id( + self.target_replica_uid) # validate transaction ids if not changes and target_last_known_gen == target_gen: @@ -220,20 +105,6 @@ class Synchronizer(U1DBSynchronizer): _, gen, trans = changes[idx] docs_by_generation.append((doc, gen, trans)) idx += 1 - # store current sync state info - if not sync_state.has_stored_info(): - sync_state.target_replica_uid = self.target_replica_uid - sync_state.target_gen = target_gen - sync_state.target_trans_id = target_trans_id - sync_state.target_my_gen = target_my_gen - sync_state.target_my_trans_id = target_my_trans_id - sync_state.my_gen = my_gen - sync_state.changes = changes - sync_state.target_last_known_trans_id = \ - target_last_known_trans_id - sync_state.target_last_known_gen = target_last_known_gen - sync_state.sent = sent = 0 - sync_state.received = received = 0 # exchange documents and try to insert the returned ones with # the target, return target synced-up-to gen. @@ -243,16 +114,7 @@ class Synchronizer(U1DBSynchronizer): new_gen, new_trans_id = sync_target.sync_exchange( docs_by_generation, self.source._replica_uid, target_last_known_gen, target_last_known_trans_id, - self._insert_doc_from_target, ensure_callback=ensure_callback, - sync_state=sync_state) - - # save sync state info if the sync was interrupted - if new_gen is None and new_trans_id is None: - sync_state.save() - return my_gen - - # sync exchange was succesfull, remove sync state info from source - sync_state.clear() + self._insert_doc_from_target, ensure_callback=ensure_callback) # record target synced-up-to generation including applying what we sent self.source._set_replica_gen_and_trans_id( diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 93de98d3..8f753f74 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -63,7 +63,6 @@ from leap.soledad.client.events import ( SOLEDAD_SYNC_RECEIVE_STATUS, signal, ) -from leap.soledad.client.sync import ClientSyncState logger = logging.getLogger(__name__) @@ -321,7 +320,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): HTTPSyncTarget.__init__(self, url, creds) self._crypto = crypto self._stopped = True - self._sync_state = None self._stop_lock = threading.Lock() def _init_post_request(self, url, action, headers, content_length): @@ -347,7 +345,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._conn.endheaders() def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, - headers, return_doc_cb, ensure_callback=None): + headers, return_doc_cb, ensure_callback, sync_id): """ Fetch sync documents from the remote database and insert them in the local database. @@ -377,7 +375,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :rtype: list of str """ - def _post_get_doc(): + def _post_get_doc(received): """ Get a sync document from server by means of a POST request. 
""" @@ -388,10 +386,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): '', entries, last_known_generation=last_known_generation, last_known_trans_id=last_known_trans_id, + sync_id=sync_id, ensure=ensure_callback is not None) # inform server of how many documents have already been received size += self._prepare( - ',', entries, received=self._sync_state.received) + ',', entries, received=received) entries.append('\r\n]') size += len(entries[-1]) # send headers @@ -402,14 +401,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): return self._response() number_of_changes = None + received = 0 - while number_of_changes is None or \ - self._sync_state.received < number_of_changes: + new_generation = last_known_generation + new_transaction_id = last_known_trans_id + while number_of_changes is None or received < number_of_changes: # bail out if sync process was interrupted if self.stopped is True: - return None, None + return last_known_generation, last_known_trans_id # try to fetch one document from target - data, _ = _post_get_doc() + data, _ = _post_get_doc(received) # decode incoming stream parts = data.splitlines() if not parts or parts[0] != '[' or parts[-1] != ']': @@ -424,6 +425,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): soledad_assert('new_generation' in metadata) soledad_assert('new_transaction_id' in metadata) number_of_changes = metadata['number_of_changes'] + new_generation = metadata['new_generation'] + new_transaction_id = metadata['new_transaction_id'] except json.JSONDecodeError, AssertionError: raise BrokenSyncStream # make sure we have replica_uid from fresh new dbs @@ -453,12 +456,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # end of symmetric decryption # ------------------------------------------------------------- return_doc_cb(doc, entry['gen'], entry['trans_id']) - self._sync_state.received += 1 + received += 1 signal( SOLEDAD_SYNC_RECEIVE_STATUS, "%d/%d" % - (self._sync_state.received, number_of_changes)) - return metadata['new_generation'], metadata['new_transaction_id'] + (received, number_of_changes)) + return new_generation, new_transaction_id def _request(self, method, url_parts, params=None, body=None, content_type=None): @@ -566,8 +569,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): def sync_exchange(self, docs_by_generations, source_replica_uid, last_known_generation, last_known_trans_id, - return_doc_cb, ensure_callback=None, - sync_state=None): + return_doc_cb, ensure_callback=None): """ Find out which documents the remote database does not know about, encrypt and send them. @@ -596,12 +598,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :return: The new generation and transaction id of the target replica. :rtype: tuple """ - # get the sync state information from client - self._sync_state = sync_state - if self._sync_state is None: - self._sync_state = ClientSyncState() - self.start() + sync_id = str(uuid4()) self._ensure_connection() if self._trace_hook: # for tests @@ -610,7 +608,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): headers = self._sign_request('POST', url, {}) def _post_put_doc(headers, last_known_generation, last_known_trans_id, - id, rev, content, gen, trans_id): + id, rev, content, gen, trans_id, sync_id): """ Put a sync document on server by means of a POST request. 
@@ -626,6 +624,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): '', entries, last_known_generation=last_known_generation, last_known_trans_id=last_known_trans_id, + sync_id=sync_id, ensure=ensure_callback is not None) # add the document to the request size += self._prepare( @@ -645,11 +644,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): cur_target_gen = last_known_generation cur_target_trans_id = last_known_trans_id - # skip docs that were already sent - if self._sync_state.sent > 0: - docs_by_generations = docs_by_generations[self._sync_state.sent:] - # send docs + sent = 0 + signal( + SOLEDAD_SYNC_SEND_STATUS, + "%d/%d" % (0, len(docs_by_generations))) for doc, gen, trans_id in docs_by_generations: # allow for interrupting the sync process if self.stopped is True: @@ -668,17 +667,18 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # ------------------------------------------------------------- cur_target_gen, cur_target_trans_id = _post_put_doc( headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id, - rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id) - self._sync_state.sent += 1 + rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id, + sync_id=sync_id) + sent += 1 signal( SOLEDAD_SYNC_SEND_STATUS, - "%d/%d" % (self._sync_state.sent, len(docs_by_generations))) + "%d/%d" % (sent, len(docs_by_generations))) # get docs from target cur_target_gen, cur_target_trans_id = self._get_remote_docs( url, last_known_generation, last_known_trans_id, headers, - return_doc_cb, ensure_callback) + return_doc_cb, ensure_callback, sync_id) self.stop() return cur_target_gen, cur_target_trans_id diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/state.js b/common/src/leap/soledad/common/ddocs/syncs/updates/state.js index cb2b6b7b..d62aeb40 100644 --- a/common/src/leap/soledad/common/ddocs/syncs/updates/state.js +++ b/common/src/leap/soledad/common/ddocs/syncs/updates/state.js @@ -29,6 +29,7 @@ * '_rev' '', * 'ongoing_syncs': { * '': { + * 'sync_id': '', * 'seen_ids': [['', [, ...], * 'changes_to_return': { * 'gen': , @@ -59,17 +60,22 @@ function(doc, req) { // parse and validate incoming data var body = JSON.parse(req.body); if (body['source_replica_uid'] == null) - return [null, 'invalid data'] + return [null, 'invalid data']; var source_replica_uid = body['source_replica_uid']; + if (body['sync_id'] == null) + return [null, 'invalid data']; + var sync_id = body['sync_id']; + // trash outdated sync data for that replica if that exists if (doc['ongoing_syncs'][source_replica_uid] != null && - doc['ongoing_syncs'][source_replica_uid] == null) + doc['ongoing_syncs'][source_replica_uid]['sync_id'] != sync_id) delete doc['ongoing_syncs'][source_replica_uid]; // create an entry for that source replica if (doc['ongoing_syncs'][source_replica_uid] == null) doc['ongoing_syncs'][source_replica_uid] = { + 'sync_id': sync_id, 'seen_ids': {}, 'changes_to_return': null, }; diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js index 04ceb2ec..94b7e767 100644 --- a/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js +++ b/common/src/leap/soledad/common/ddocs/syncs/views/changes_to_return/map.js @@ -2,14 +2,15 @@ function(doc) { if (doc['_id'] == 'u1db_sync_state' && doc['ongoing_syncs'] != null) for (var source_replica_uid in doc['ongoing_syncs']) { var changes = doc['ongoing_syncs'][source_replica_uid]['changes_to_return']; 
+ var sync_id = doc['ongoing_syncs'][source_replica_uid]['sync_id']; if (changes == null) - emit([source_replica_uid, 0], null); + emit([source_replica_uid, sync_id, 0], null); else if (changes.length == 0) - emit([source_replica_uid, 0], []); + emit([source_replica_uid, sync_id, 0], []); else for (var i = 0; i < changes['changes_to_return'].length; i++) emit( - [source_replica_uid, i], + [source_replica_uid, sync_id, i], { 'gen': changes['gen'], 'trans_id': changes['trans_id'], diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/seen_ids/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/seen_ids/map.js index 34c65b3f..16118e88 100644 --- a/common/src/leap/soledad/common/ddocs/syncs/views/seen_ids/map.js +++ b/common/src/leap/soledad/common/ddocs/syncs/views/seen_ids/map.js @@ -1,9 +1,11 @@ function(doc) { if (doc['_id'] == 'u1db_sync_state' && doc['ongoing_syncs'] != null) - for (var source_replica_uid in doc['ongoing_syncs']) + for (var source_replica_uid in doc['ongoing_syncs']) { + var sync_id = doc['ongoing_syncs'][source_replica_uid]['sync_id']; emit( - source_replica_uid, + [source_replica_uid, sync_id], { 'seen_ids': doc['ongoing_syncs'][source_replica_uid]['seen_ids'], }); + } } diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/state/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/state/map.js index 1d8f8e84..e88c6ebb 100644 --- a/common/src/leap/soledad/common/ddocs/syncs/views/state/map.js +++ b/common/src/leap/soledad/common/ddocs/syncs/views/state/map.js @@ -2,11 +2,12 @@ function(doc) { if (doc['_id'] == 'u1db_sync_state' && doc['ongoing_syncs'] != null) for (var source_replica_uid in doc['ongoing_syncs']) { var changes = doc['ongoing_syncs'][source_replica_uid]['changes_to_return']; + var sync_id = doc['ongoing_syncs'][source_replica_uid]['sync_id']; if (changes == null) - emit(source_replica_uid, null); + emit([source_replica_uid, sync_id], null); else emit( - source_replica_uid, + [source_replica_uid, sync_id], { 'gen': changes['gen'], 'trans_id': changes['trans_id'], diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 16926f14..c6928aaa 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -48,7 +48,7 @@ class ServerSyncState(object): called 'u1db_sync_state'. """ - def __init__(self, db, source_replica_uid): + def __init__(self, db, source_replica_uid, sync_id): """ Initialize the sync state object. 
@@ -59,6 +59,7 @@ class ServerSyncState(object): """ self._db = db self._source_replica_uid = source_replica_uid + self._sync_id = sync_id def _key(self, key): """ @@ -91,6 +92,7 @@ class ServerSyncState(object): with CouchDatabase.sync_info_lock[self._db.replica_uid]: res.put_json( body={ + 'sync_id': self._sync_id, 'source_replica_uid': self._source_replica_uid, key: value, }, @@ -118,7 +120,8 @@ class ServerSyncState(object): """ ddoc_path = ['_design', 'syncs', '_view', 'seen_ids'] resource = self._db._database.resource(*ddoc_path) - response = resource.get_json(key=self._key(self._source_replica_uid)) + response = resource.get_json( + key=self._key([self._source_replica_uid, self._sync_id])) data = response[2] if data['rows']: entry = data['rows'].pop() @@ -160,7 +163,8 @@ class ServerSyncState(object): """ ddoc_path = ['_design', 'syncs', '_view', 'state'] resource = self._db._database.resource(*ddoc_path) - response = resource.get_json(key=self._key(self._source_replica_uid)) + response = resource.get_json( + key=self._key([self._source_replica_uid, self._sync_id])) data = response[2] gen = None trans_id = None @@ -184,7 +188,7 @@ class ServerSyncState(object): resource = self._db._database.resource(*ddoc_path) response = resource.get_json( key=self._key( - [self._source_replica_uid, received])) + [self._source_replica_uid, self._sync_id, received])) data = response[2] if not data['rows']: return None, None, None @@ -197,7 +201,7 @@ class ServerSyncState(object): class SyncExchange(sync.SyncExchange): - def __init__(self, db, source_replica_uid, last_known_generation): + def __init__(self, db, source_replica_uid, last_known_generation, sync_id): """ :param db: The target syncing database. :type db: CouchDatabase @@ -210,11 +214,13 @@ class SyncExchange(sync.SyncExchange): self._db = db self.source_replica_uid = source_replica_uid self.source_last_known_generation = last_known_generation + self.sync_id = sync_id self.new_gen = None self.new_trans_id = None self._trace_hook = None # recover sync state - self._sync_state = ServerSyncState(self._db, self.source_replica_uid) + self._sync_state = ServerSyncState( + self._db, self.source_replica_uid, sync_id) def find_changes_to_return(self, received): @@ -322,9 +328,9 @@ class SyncResource(http_app.SyncResource): @http_app.http_method( last_known_generation=int, last_known_trans_id=http_app.none_or_str, - content_as_args=True) + sync_id=http_app.none_or_str, content_as_args=True) def post_args(self, last_known_generation, last_known_trans_id=None, - ensure=False): + sync_id=None, ensure=False): """ Handle the initial arguments for the sync POST request from client. @@ -348,7 +354,7 @@ class SyncResource(http_app.SyncResource): last_known_generation, last_known_trans_id) # get a sync exchange object self.sync_exch = self.sync_exchange_class( - db, self.source_replica_uid, last_known_generation) + db, self.source_replica_uid, last_known_generation, sync_id) @http_app.http_method(content_as_args=True) def post_put(self, id, rev, content, gen, trans_id): @@ -405,8 +411,8 @@ class SyncResource(http_app.SyncResource): def post_end(self): """ - Return the current generation and transaction_id after inserting a - series of incoming documents. + Return the current generation and transaction_id after inserting one + incoming document. 
""" self.responder.content_type = 'application/x-soledad-sync-response' self.responder.start_response(200) -- cgit v1.2.3 From a35176a298480676d16fe195971ed89b21a78357 Mon Sep 17 00:00:00 2001 From: drebs Date: Fri, 9 Aug 2013 13:29:01 +0200 Subject: Make server auth time-insensitive. --- server/src/leap/soledad/server/auth.py | 16 ++++++++++++---- .../changes/feature_3399-check-auth-in-constant-way | 1 + 2 files changed, 13 insertions(+), 4 deletions(-) create mode 100644 soledad_server/changes/feature_3399-check-auth-in-constant-way diff --git a/server/src/leap/soledad/server/auth.py b/server/src/leap/soledad/server/auth.py index e9d2b032..57f600a1 100644 --- a/server/src/leap/soledad/server/auth.py +++ b/server/src/leap/soledad/server/auth.py @@ -30,6 +30,7 @@ from abc import ABCMeta, abstractmethod from routes.mapper import Mapper from couchdb.client import Server from twisted.python import log +from hashlib import sha512 from leap.soledad.common import ( @@ -415,10 +416,17 @@ class SoledadTokenAuthMiddleware(SoledadAuthMiddleware): server = Server(url=self._app.state.couch_url) dbname = self.TOKENS_DB db = server[dbname] - token = db.get(token) - if token is None or \ - token[self.TOKENS_TYPE_KEY] != self.TOKENS_TYPE_DEF or \ - token[self.TOKENS_USER_ID_KEY] != uuid: + # lookup key is a hash of the token to prevent timing attacks. + token = db.get(sha512(token).hexdigest()) + if token is None: + raise InvalidAuthTokenError() + # we compare uuid hashes to avoid possible timing attacks that + # might exploit python's builtin comparison operator behaviour, + # which fails immediatelly when non-matching bytes are found. + couch_uuid_hash = sha512(token[self.TOKENS_USER_ID_KEY]).digest() + req_uuid_hash = sha512(uuid).digest() + if token[self.TOKENS_TYPE_KEY] != self.TOKENS_TYPE_DEF \ + or couch_uuid_hash != req_uuid_hash: raise InvalidAuthTokenError() return True diff --git a/soledad_server/changes/feature_3399-check-auth-in-constant-way b/soledad_server/changes/feature_3399-check-auth-in-constant-way new file mode 100644 index 00000000..ebd18680 --- /dev/null +++ b/soledad_server/changes/feature_3399-check-auth-in-constant-way @@ -0,0 +1 @@ + o Authenticate in time-insensitive manner. Closes #3399. -- cgit v1.2.3 From ea2e14430bb26c51611d7c5267489d264caa9bf9 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 5 Jun 2014 10:01:57 -0300 Subject: Add changes file. --- common/changes/bug_5739_fix-multipart-problem | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 common/changes/bug_5739_fix-multipart-problem diff --git a/common/changes/bug_5739_fix-multipart-problem b/common/changes/bug_5739_fix-multipart-problem new file mode 100644 index 00000000..449e09b8 --- /dev/null +++ b/common/changes/bug_5739_fix-multipart-problem @@ -0,0 +1,2 @@ + o Use a dedicated HTTP resource for couch multipart PUTs to avoid bigcouch + bug (#5739). -- cgit v1.2.3 From 68443c469e224d93fc36c7c1014191e883edaf67 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 5 Jun 2014 10:11:13 -0300 Subject: Reset synchronizer state in order to reuse the same synchronizer multiple times. 
--- client/changes/bug_reset-synchronizer-state | 2 ++ client/src/leap/soledad/client/__init__.py | 1 + client/src/leap/soledad/client/sqlcipher.py | 4 ++++ 3 files changed, 7 insertions(+) create mode 100644 client/changes/bug_reset-synchronizer-state diff --git a/client/changes/bug_reset-synchronizer-state b/client/changes/bug_reset-synchronizer-state new file mode 100644 index 00000000..9678b36b --- /dev/null +++ b/client/changes/bug_reset-synchronizer-state @@ -0,0 +1,2 @@ + o Reset synchronizer state in order to reuse the same synchronizer object + multiple times. diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py index 2fb33184..656c0e77 100644 --- a/client/src/leap/soledad/client/__init__.py +++ b/client/src/leap/soledad/client/__init__.py @@ -1065,6 +1065,7 @@ class Soledad(object): """ if self._db: # acquire lock before attempt to sync + # TODO: move this lock to inside SQLCipherDatabase. with Soledad.syncing_lock[self._db._get_replica_uid()]: local_gen = self._db.sync( urlparse.urljoin(self.server_url, 'user-%s' % self._uuid), diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 26238af6..576b51ad 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -401,6 +401,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): creds=creds, crypto=self._crypto)) self._syncers[url] = (h, syncer) + # in order to reuse the same synchronizer multiple times we have to + # reset its state (i.e. the number of documents received from target + # and inserted in the local replica). + syncer.num_inserted = 0 return syncer def _extra_schema_init(self, c): -- cgit v1.2.3 From 08a2f350690b3a66212f3d4f63b24b20f682f88e Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 5 Jun 2014 10:21:29 -0300 Subject: Move the syncing lock to inside SQLCipherDatabase. --- client/src/leap/soledad/client/__init__.py | 11 ----------- client/src/leap/soledad/client/sqlcipher.py | 26 +++++++++++++++++++++++--- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py index 656c0e77..0d3a21fd 100644 --- a/client/src/leap/soledad/client/__init__.py +++ b/client/src/leap/soledad/client/__init__.py @@ -34,8 +34,6 @@ import urlparse import hmac from hashlib import sha256 -from threading import Lock -from collections import defaultdict try: import cchardet as chardet @@ -224,12 +222,6 @@ class Soledad(object): Prefix for default values for path. """ - syncing_lock = defaultdict(Lock) - """ - A dictionary that hold locks which avoid multiple sync attempts from the - same database replica. - """ - def __init__(self, uuid, passphrase, secrets_path, local_db_path, server_url, cert_file, auth_token=None, secret_id=None): """ @@ -1064,9 +1056,6 @@ class Soledad(object): :rtype: str """ if self._db: - # acquire lock before attempt to sync - # TODO: move this lock to inside SQLCipherDatabase. 
-            with Soledad.syncing_lock[self._db._get_replica_uid()]:
             local_gen = self._db.sync(
                 urlparse.urljoin(self.server_url, 'user-%s' % self._uuid),
                 creds=self._creds, autocreate=False)
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 576b51ad..5ffa9c7e 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -52,6 +52,7 @@ import json
 
 from hashlib import sha256
 from contextlib import contextmanager
+from collections import defaultdict
 
 from pysqlcipher import dbapi2
 from u1db.backends import sqlite_backend
@@ -153,6 +154,13 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
     create_doc_lock = threading.Lock()
     update_indexes_lock = threading.Lock()
 
+    syncing_lock = defaultdict(threading.Lock)
+    """
+    A dictionary that holds locks which avoid multiple sync attempts from the
+    same database replica.
+    """
+
+
     def __init__(self, sqlcipher_file, password, document_factory=None,
                  crypto=None, raw_key=False, cipher='aes-256-cbc',
                  kdf_iter=4000, cipher_page_size=1024):
@@ -343,6 +351,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
         """
         Synchronize documents with remote replica exposed at url.
 
+        There can be at most one instance syncing the same database replica at
+        the same time, so this method will block until the syncing lock can be
+        acquired.
+
         :param url: The url of the target replica to sync with.
         :type url: str
         :param creds: optional dictionary giving credentials.
@@ -355,6 +367,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
         :rtype: int
         """
         res = None
+        # the following context manager blocks until the syncing lock can be
+        # acquired.
         with self.syncer(url, creds=creds) as syncer:
             res = syncer.sync(autocreate=autocreate)
         return res
@@ -371,10 +385,16 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
     def syncer(self, url, creds=None):
         """
         Accessor for synchronizer.
+
+        As we reuse the same synchronizer for every sync, there can be only
+        one instance synchronizing the same database replica at the same time.
+        Because of that, this method blocks until the syncing lock can be
+        acquired.
         """
-        syncer = self._get_syncer(url, creds=creds)
-        yield syncer
-        syncer.sync_target.close()
+        with SQLCipherDatabase.syncing_lock[self._get_replica_uid()]:
+            syncer = self._get_syncer(url, creds=creds)
+            yield syncer
+            syncer.sync_target.close()
 
     def _get_syncer(self, url, creds=None):
         """
-- 
cgit v1.2.3


From 9a4fd9e1e7580f4f31acbaea7d3315302c01e42e Mon Sep 17 00:00:00 2001
From: drebs
Date: Thu, 5 Jun 2014 13:08:59 -0300
Subject: Reorganize profiling scripts.
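
The moving-average helper is unchanged by this move; it only goes up one
directory so other profiling scripts can share it, with a symlink left
behind in backends_cpu_usage, and ValidateUserHandle likewise moves to a
shared util module. Expected behaviour of the helper, as exercised by its
own test suite:

    >>> from movingaverage import movingaverage
    >>> list(movingaverage([40, 30, 50, 46, 39, 44], 3))
    [40.0, 42.0, 45.0, 43.0]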
--- scripts/db_access/client_side_db.py | 14 +- .../profiling/backends_cpu_usage/movingaverage.py | 210 +-------------------- scripts/profiling/movingaverage.py | 209 ++++++++++++++++++++ 3 files changed, 214 insertions(+), 219 deletions(-) mode change 100644 => 120000 scripts/profiling/backends_cpu_usage/movingaverage.py create mode 100644 scripts/profiling/movingaverage.py diff --git a/scripts/db_access/client_side_db.py b/scripts/db_access/client_side_db.py index 9aadd5fe..6c456c41 100644 --- a/scripts/db_access/client_side_db.py +++ b/scripts/db_access/client_side_db.py @@ -15,10 +15,14 @@ import srp._pysrp as srp import binascii import logging + from leap.common.config import get_path_prefix from leap.soledad.client import Soledad +from util import ValidateUserHandle + + # create a logger logger = logging.getLogger(__name__) LOG_FORMAT = '%(asctime)s %(message)s' @@ -118,16 +122,6 @@ def get_soledad_instance(username, provider, passphrase, basedir): auth_token=token) -class ValidateUserHandle(argparse.Action): - def __call__(self, parser, namespace, values, option_string=None): - m = re.compile('^([^@]+)@([^@]+\.[^@]+)$') - res = m.match(values) - if res == None: - parser.error('User handle should have the form user@provider.') - setattr(namespace, 'username', res.groups()[0]) - setattr(namespace, 'provider', res.groups()[1]) - - # main program if __name__ == '__main__': diff --git a/scripts/profiling/backends_cpu_usage/movingaverage.py b/scripts/profiling/backends_cpu_usage/movingaverage.py deleted file mode 100644 index bac1b3e1..00000000 --- a/scripts/profiling/backends_cpu_usage/movingaverage.py +++ /dev/null @@ -1,209 +0,0 @@ -#!/usr/bin/env python -# -# Sean Reifschneider, tummy.com, ltd. -# Released into the Public Domain, 2011-02-06 - -import itertools -from itertools import islice -from collections import deque - - -######################################################### -def movingaverage(data, subset_size, data_is_list = None, - avoid_fp_drift = True): - '''Return the moving averages of the data, with a window size of - `subset_size`. `subset_size` must be an integer greater than 0 and - less than the length of the input data, or a ValueError will be raised. - - `data_is_list` can be used to tune the algorithm for list or iteratable - as an input. The default value, `None` will auto-detect this. - The algorithm used if `data` is a list is almost twice as fast as if - it is an iteratable. - - `avoid_fp_drift`, if True (the default) sums every sub-set rather than - keeping a "rolling sum" (which may be subject to floating-point drift). - While more correct, it is also dramatically slower for subset sizes - much larger than 20. - - NOTE: You really should consider setting `avoid_fp_drift = False` unless - you are dealing with very small numbers (say, far smaller than 0.00001) - or require extreme accuracy at the cost of execution time. For - `subset_size` < 20, the performance difference is very small. - ''' - if subset_size < 1: - raise ValueError('subset_size must be 1 or larger') - - if data_is_list is None: - data_is_list = hasattr(data, '__getslice__') - - divisor = float(subset_size) - if data_is_list: - # This only works if we can re-access old elements, but is much faster. - # In other words, it can't be just an iterable, it needs to be a list. 
- - if subset_size > len(data): - raise ValueError('subset_size must be smaller than data set size') - - if avoid_fp_drift: - for x in range(subset_size, len(data) + 1): - yield sum(data[x - subset_size:x]) / divisor - else: - cur = sum(data[0:subset_size]) - yield cur / divisor - for x in range(subset_size, len(data)): - cur += data[x] - data[x - subset_size] - yield cur / divisor - else: - # Based on the recipe at: - # http://docs.python.org/library/collections.html#deque-recipes - it = iter(data) - d = deque(islice(it, subset_size)) - - if subset_size > len(d): - raise ValueError('subset_size must be smaller than data set size') - - if avoid_fp_drift: - yield sum(d) / divisor - for elem in it: - d.popleft() - d.append(elem) - yield sum(d) / divisor - else: - s = sum(d) - yield s / divisor - for elem in it: - s += elem - d.popleft() - d.append(elem) - yield s / divisor - - -########################## -if __name__ == '__main__': - import unittest - - class TestMovingAverage(unittest.TestCase): - #################### - def test_List(self): - try: - list(movingaverage([1,2,3], 0)) - self.fail('Did not raise ValueError on subset_size=0') - except ValueError: - pass - - try: - list(movingaverage([1,2,3,4,5,6], 7)) - self.fail('Did not raise ValueError on subset_size > len(data)') - except ValueError: - pass - - self.assertEqual(list(movingaverage([1,2,3,4,5,6], 1)), [1,2,3,4,5,6]) - self.assertEqual(list(movingaverage([1,2,3,4,5,6], 2)), - [1.5,2.5,3.5,4.5,5.5]) - self.assertEqual(list(movingaverage(map(float, [1,2,3,4,5,6]), 2)), - [1.5,2.5,3.5,4.5,5.5]) - self.assertEqual(list(movingaverage([1,2,3,4,5,6], 3)), [2,3,4,5]) - self.assertEqual(list(movingaverage([1,2,3,4,5,6], 4)), [2.5,3.5,4.5]) - self.assertEqual(list(movingaverage([1,2,3,4,5,6], 5)), [3,4]) - self.assertEqual(list(movingaverage([1,2,3,4,5,6], 6)), [3.5]) - - self.assertEqual(list(movingaverage([40, 30, 50, 46, 39, 44], - 3, False)), [40.0,42.0,45.0,43.0]) - self.assertEqual(list(movingaverage([40, 30, 50, 46, 39, 44], - 3, True)), [40.0,42.0,45.0,43.0]) - - - ###################### - def test_XRange(self): - try: - list(movingaverage(xrange(1, 4), 0)) - self.fail('Did not raise ValueError on subset_size=0') - except ValueError: - pass - - try: - list(movingaverage(xrange(1, 7), 7)) - self.fail('Did not raise ValueError on subset_size > len(data)') - except ValueError: - pass - - self.assertEqual(list(movingaverage(xrange(1, 7), 1)), [1,2,3,4,5,6]) - self.assertEqual(list(movingaverage(xrange(1, 7), 2)), - [1.5,2.5,3.5,4.5,5.5]) - self.assertEqual(list(movingaverage(iter(map(float, xrange(1, 7))), - 2)), [1.5,2.5,3.5,4.5,5.5]) - self.assertEqual(list(movingaverage(xrange(1, 7), 3)), [2,3,4,5]) - self.assertEqual(list(movingaverage(xrange(1, 7), 4)), [2.5,3.5,4.5]) - self.assertEqual(list(movingaverage(xrange(1, 7), 5)), [3,4]) - self.assertEqual(list(movingaverage(xrange(1, 7), 6)), [3.5]) - - - ########################### - def test_ListRolling(self): - try: - list(movingaverage([1,2,3], 0, avoid_fp_drift = False)) - self.fail('Did not raise ValueError on subset_size=0') - except ValueError: - pass - - try: - list(movingaverage([1,2,3,4,5,6], 7, avoid_fp_drift = False)) - self.fail('Did not raise ValueError on subset_size > len(data)') - except ValueError: - pass - - self.assertEqual(list(movingaverage([1,2,3,4,5,6], 1, - avoid_fp_drift = False)), [1,2,3,4,5,6]) - self.assertEqual(list(movingaverage([1,2,3,4,5,6], 2, - avoid_fp_drift = False)), - [1.5,2.5,3.5,4.5,5.5]) - self.assertEqual(list(movingaverage(map(float, 
[1,2,3,4,5,6]), 2, - avoid_fp_drift = False)), [1.5,2.5,3.5,4.5,5.5]) - self.assertEqual(list(movingaverage([1,2,3,4,5,6], 3, - avoid_fp_drift = False)), [2,3,4,5]) - self.assertEqual(list(movingaverage([1,2,3,4,5,6], 4, - avoid_fp_drift = False)), [2.5,3.5,4.5]) - self.assertEqual(list(movingaverage([1,2,3,4,5,6], 5, - avoid_fp_drift = False)), [3,4]) - self.assertEqual(list(movingaverage([1,2,3,4,5,6], 6, - avoid_fp_drift = False)), [3.5]) - - self.assertEqual(list(movingaverage([40, 30, 50, 46, 39, 44], - 3, False, avoid_fp_drift = False)), [40.0,42.0,45.0,43.0]) - self.assertEqual(list(movingaverage([40, 30, 50, 46, 39, 44], - 3, True, avoid_fp_drift = False)), [40.0,42.0,45.0,43.0]) - - - ############################# - def test_XRangeRolling(self): - try: - list(movingaverage(xrange(1, 4), 0, avoid_fp_drift = False)) - self.fail('Did not raise ValueError on subset_size=0') - except ValueError: - pass - - try: - list(movingaverage(xrange(1, 7), 7, avoid_fp_drift = False)) - self.fail('Did not raise ValueError on subset_size > len(data)') - except ValueError: - pass - - self.assertEqual(list(movingaverage(xrange(1, 7), 1, - avoid_fp_drift = False)), [1,2,3,4,5,6]) - self.assertEqual(list(movingaverage(xrange(1, 7), 2, - avoid_fp_drift = False)), [1.5,2.5,3.5,4.5,5.5]) - self.assertEqual(list(movingaverage(iter(map(float, xrange(1, 7))), - 2, avoid_fp_drift = False)), [1.5,2.5,3.5,4.5,5.5]) - self.assertEqual(list(movingaverage(xrange(1, 7), 3, - avoid_fp_drift = False)), [2,3,4,5]) - self.assertEqual(list(movingaverage(xrange(1, 7), 4, - avoid_fp_drift = False)), [2.5,3.5,4.5]) - self.assertEqual(list(movingaverage(xrange(1, 7), 5, - avoid_fp_drift = False)), [3,4]) - self.assertEqual(list(movingaverage(xrange(1, 7), 6, - avoid_fp_drift = False)), [3.5]) - - - ###################################################################### - suite = unittest.TestLoader().loadTestsFromTestCase(TestMovingAverage) - unittest.TextTestRunner(verbosity = 2).run(suite) - diff --git a/scripts/profiling/backends_cpu_usage/movingaverage.py b/scripts/profiling/backends_cpu_usage/movingaverage.py new file mode 120000 index 00000000..098b0a01 --- /dev/null +++ b/scripts/profiling/backends_cpu_usage/movingaverage.py @@ -0,0 +1 @@ +../movingaverage.py \ No newline at end of file diff --git a/scripts/profiling/movingaverage.py b/scripts/profiling/movingaverage.py new file mode 100644 index 00000000..bac1b3e1 --- /dev/null +++ b/scripts/profiling/movingaverage.py @@ -0,0 +1,209 @@ +#!/usr/bin/env python +# +# Sean Reifschneider, tummy.com, ltd. +# Released into the Public Domain, 2011-02-06 + +import itertools +from itertools import islice +from collections import deque + + +######################################################### +def movingaverage(data, subset_size, data_is_list = None, + avoid_fp_drift = True): + '''Return the moving averages of the data, with a window size of + `subset_size`. `subset_size` must be an integer greater than 0 and + less than the length of the input data, or a ValueError will be raised. + + `data_is_list` can be used to tune the algorithm for list or iteratable + as an input. The default value, `None` will auto-detect this. + The algorithm used if `data` is a list is almost twice as fast as if + it is an iteratable. + + `avoid_fp_drift`, if True (the default) sums every sub-set rather than + keeping a "rolling sum" (which may be subject to floating-point drift). + While more correct, it is also dramatically slower for subset sizes + much larger than 20. 
+
+    NOTE: You really should consider setting `avoid_fp_drift = False` unless
+    you are dealing with very small numbers (say, far smaller than 0.00001)
+    or require extreme accuracy at the cost of execution time.  For
+    `subset_size` < 20, the performance difference is very small.
+    '''
+    if subset_size < 1:
+        raise ValueError('subset_size must be 1 or larger')
+
+    if data_is_list is None:
+        data_is_list = hasattr(data, '__getslice__')
+
+    divisor = float(subset_size)
+    if data_is_list:
+        #  This only works if we can re-access old elements, but is much faster.
+        #  In other words, it can't be just an iterable, it needs to be a list.
+
+        if subset_size > len(data):
+            raise ValueError('subset_size must be smaller than data set size')
+
+        if avoid_fp_drift:
+            for x in range(subset_size, len(data) + 1):
+                yield sum(data[x - subset_size:x]) / divisor
+        else:
+            cur = sum(data[0:subset_size])
+            yield cur / divisor
+            for x in range(subset_size, len(data)):
+                cur += data[x] - data[x - subset_size]
+                yield cur / divisor
+    else:
+        #  Based on the recipe at:
+        #    http://docs.python.org/library/collections.html#deque-recipes
+        it = iter(data)
+        d = deque(islice(it, subset_size))
+
+        if subset_size > len(d):
+            raise ValueError('subset_size must be smaller than data set size')
+
+        if avoid_fp_drift:
+            yield sum(d) / divisor
+            for elem in it:
+                d.popleft()
+                d.append(elem)
+                yield sum(d) / divisor
+        else:
+            s = sum(d)
+            yield s / divisor
+            for elem in it:
+                s += elem - d.popleft()
+                d.append(elem)
+                yield s / divisor
+
+
+##########################
+if __name__ == '__main__':
+    import unittest
+
+    class TestMovingAverage(unittest.TestCase):
+        ####################
+        def test_List(self):
+            try:
+                list(movingaverage([1,2,3], 0))
+                self.fail('Did not raise ValueError on subset_size=0')
+            except ValueError:
+                pass
+
+            try:
+                list(movingaverage([1,2,3,4,5,6], 7))
+                self.fail('Did not raise ValueError on subset_size > len(data)')
+            except ValueError:
+                pass
+
+            self.assertEqual(list(movingaverage([1,2,3,4,5,6], 1)), [1,2,3,4,5,6])
+            self.assertEqual(list(movingaverage([1,2,3,4,5,6], 2)),
+                    [1.5,2.5,3.5,4.5,5.5])
+            self.assertEqual(list(movingaverage(map(float, [1,2,3,4,5,6]), 2)),
+                    [1.5,2.5,3.5,4.5,5.5])
+            self.assertEqual(list(movingaverage([1,2,3,4,5,6], 3)), [2,3,4,5])
+            self.assertEqual(list(movingaverage([1,2,3,4,5,6], 4)), [2.5,3.5,4.5])
+            self.assertEqual(list(movingaverage([1,2,3,4,5,6], 5)), [3,4])
+            self.assertEqual(list(movingaverage([1,2,3,4,5,6], 6)), [3.5])
+
+            self.assertEqual(list(movingaverage([40, 30, 50, 46, 39, 44],
+                    3, False)), [40.0,42.0,45.0,43.0])
+            self.assertEqual(list(movingaverage([40, 30, 50, 46, 39, 44],
+                    3, True)), [40.0,42.0,45.0,43.0])
+
+
+        ######################
+        def test_XRange(self):
+            try:
+                list(movingaverage(xrange(1, 4), 0))
+                self.fail('Did not raise ValueError on subset_size=0')
+            except ValueError:
+                pass
+
+            try:
+                list(movingaverage(xrange(1, 7), 7))
+                self.fail('Did not raise ValueError on subset_size > len(data)')
+            except ValueError:
+                pass
+
+            self.assertEqual(list(movingaverage(xrange(1, 7), 1)), [1,2,3,4,5,6])
+            self.assertEqual(list(movingaverage(xrange(1, 7), 2)),
+                    [1.5,2.5,3.5,4.5,5.5])
+            self.assertEqual(list(movingaverage(iter(map(float, xrange(1, 7))),
+                    2)), [1.5,2.5,3.5,4.5,5.5])
+            self.assertEqual(list(movingaverage(xrange(1, 7), 3)), [2,3,4,5])
+            self.assertEqual(list(movingaverage(xrange(1, 7), 4)), [2.5,3.5,4.5])
+            self.assertEqual(list(movingaverage(xrange(1, 7), 5)), [3,4])
+            self.assertEqual(list(movingaverage(xrange(1, 7), 6)), [3.5])
+
+
+        ###########################
+        def test_ListRolling(self):
+            try:
+                list(movingaverage([1,2,3], 0, avoid_fp_drift = False))
+                self.fail('Did not raise ValueError on subset_size=0')
+            except ValueError:
+                pass
+
+            try:
+                list(movingaverage([1,2,3,4,5,6], 7, avoid_fp_drift = False))
+                self.fail('Did not raise ValueError on subset_size > len(data)')
+            except ValueError:
+                pass
+
+            self.assertEqual(list(movingaverage([1,2,3,4,5,6], 1,
+                    avoid_fp_drift = False)), [1,2,3,4,5,6])
+            self.assertEqual(list(movingaverage([1,2,3,4,5,6], 2,
+                    avoid_fp_drift = False)),
+                    [1.5,2.5,3.5,4.5,5.5])
+            self.assertEqual(list(movingaverage(map(float, [1,2,3,4,5,6]), 2,
+                    avoid_fp_drift = False)), [1.5,2.5,3.5,4.5,5.5])
+            self.assertEqual(list(movingaverage([1,2,3,4,5,6], 3,
+                    avoid_fp_drift = False)), [2,3,4,5])
+            self.assertEqual(list(movingaverage([1,2,3,4,5,6], 4,
+                    avoid_fp_drift = False)), [2.5,3.5,4.5])
+            self.assertEqual(list(movingaverage([1,2,3,4,5,6], 5,
+                    avoid_fp_drift = False)), [3,4])
+            self.assertEqual(list(movingaverage([1,2,3,4,5,6], 6,
+                    avoid_fp_drift = False)), [3.5])
+
+            self.assertEqual(list(movingaverage([40, 30, 50, 46, 39, 44],
+                    3, False, avoid_fp_drift = False)), [40.0,42.0,45.0,43.0])
+            self.assertEqual(list(movingaverage([40, 30, 50, 46, 39, 44],
+                    3, True, avoid_fp_drift = False)), [40.0,42.0,45.0,43.0])
+
+
+        #############################
+        def test_XRangeRolling(self):
+            try:
+                list(movingaverage(xrange(1, 4), 0, avoid_fp_drift = False))
+                self.fail('Did not raise ValueError on subset_size=0')
+            except ValueError:
+                pass
+
+            try:
+                list(movingaverage(xrange(1, 7), 7, avoid_fp_drift = False))
+                self.fail('Did not raise ValueError on subset_size > len(data)')
+            except ValueError:
+                pass
+
+            self.assertEqual(list(movingaverage(xrange(1, 7), 1,
+                    avoid_fp_drift = False)), [1,2,3,4,5,6])
+            self.assertEqual(list(movingaverage(xrange(1, 7), 2,
+                    avoid_fp_drift = False)), [1.5,2.5,3.5,4.5,5.5])
+            self.assertEqual(list(movingaverage(iter(map(float, xrange(1, 7))),
+                    2, avoid_fp_drift = False)), [1.5,2.5,3.5,4.5,5.5])
+            self.assertEqual(list(movingaverage(xrange(1, 7), 3,
+                    avoid_fp_drift = False)), [2,3,4,5])
+            self.assertEqual(list(movingaverage(xrange(1, 7), 4,
+                    avoid_fp_drift = False)), [2.5,3.5,4.5])
+            self.assertEqual(list(movingaverage(xrange(1, 7), 5,
+                    avoid_fp_drift = False)), [3,4])
+            self.assertEqual(list(movingaverage(xrange(1, 7), 6,
+                    avoid_fp_drift = False)), [3.5])
+
+
+    ######################################################################
+    suite = unittest.TestLoader().loadTestsFromTestCase(TestMovingAverage)
+    unittest.TextTestRunner(verbosity = 2).run(suite)
+
-- cgit v1.2.3


From bff6444cc2a119a49f5c259644ffc2d6862f779e Mon Sep 17 00:00:00 2001
From: drebs
Date: Thu, 5 Jun 2014 14:57:08 -0300
Subject: Add missing doc.

---
 client/src/leap/soledad/client/target.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 8f753f74..968545b6 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -378,6 +378,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         def _post_get_doc(received):
             """
             Get a sync document from server by means of a POST request.
+
+            :param received: The number of documents already received in the
+                current sync session.
+            :type received: int
             """
             entries = ['[']
             size = 1
-- cgit v1.2.3


From 4dc1cd8662a34893051d2f520b73a0bd0774215c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Touceda?=
Date: Fri, 6 Jun 2014 14:10:24 -0300
Subject: Fold in changes

---
 CHANGELOG                                               | 17 +++++++++++++++++
 client/changes/bug_reset-synchronizer-state             |  2 --
 client/changes/feature_5571_add-sync-status-signals     |  1 -
 ...ture_5571_allow-for-interrupting-and-recovering-sync |  1 -
 client/changes/feature_5571_split-sync-post             |  1 -
 common/changes/bug_5739_fix-multipart-problem           |  2 --
 ...ture_5571_allow-for-interrupting-and-recovering-sync |  1 -
 server/changes/feature_5571_split-sync-post             |  1 -
 .../changes/feature_3399-check-auth-in-constant-way     |  1 -
 9 files changed, 17 insertions(+), 10 deletions(-)
 delete mode 100644 client/changes/bug_reset-synchronizer-state
 delete mode 100644 client/changes/feature_5571_add-sync-status-signals
 delete mode 100644 client/changes/feature_5571_allow-for-interrupting-and-recovering-sync
 delete mode 100644 client/changes/feature_5571_split-sync-post
 delete mode 100644 common/changes/bug_5739_fix-multipart-problem
 delete mode 100644 server/changes/feature_5571_allow-for-interrupting-and-recovering-sync
 delete mode 100644 server/changes/feature_5571_split-sync-post
 delete mode 100644 soledad_server/changes/feature_3399-check-auth-in-constant-way

diff --git a/CHANGELOG b/CHANGELOG
index ff1ba240..768e31af 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,3 +1,20 @@
+0.5.2 Jun 6, 2014:
+Client:
+  o Reset synchronizer state in order to reuse the same synchronizer
+    object multiple times.
+  o Add sync status signals. Closes #5517.
+  o Allow for interrupting and recovering sync. Closes #5517.
+  o Split sync in multiple POST requests in client. Closes #5571.
+
+Common:
+  o Use a dedicated HTTP resource for couch multipart PUTs to avoid bigcouch
+    bug. Fixes #5739.
+
+Server:
+  o Allow for interrupting and recovering sync. Closes #5517.
+  o Split sync in multiple POST requests in server. Closes #5571.
+  o Authenticate in time-insensitive manner. Closes #3399.
+
 0.5.1 May 16, 2014:
 Client:
   o Close connection with server after syncing to avoid client hanging
diff --git a/client/changes/bug_reset-synchronizer-state b/client/changes/bug_reset-synchronizer-state
deleted file mode 100644
index 9678b36b..00000000
--- a/client/changes/bug_reset-synchronizer-state
+++ /dev/null
@@ -1,2 +0,0 @@
-  o Reset synchronizer state in order to reuse the same synchronizer object
-    multiple times.
diff --git a/client/changes/feature_5571_add-sync-status-signals b/client/changes/feature_5571_add-sync-status-signals
deleted file mode 100644
index 67bc7d9f..00000000
--- a/client/changes/feature_5571_add-sync-status-signals
+++ /dev/null
@@ -1 +0,0 @@
-  o Add sync status signals (#5517).
diff --git a/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync b/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync
deleted file mode 100644
index 0087c535..00000000
--- a/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync
+++ /dev/null
@@ -1 +0,0 @@
-  o Allow for interrupting and recovering sync (#5517).
diff --git a/client/changes/feature_5571_split-sync-post b/client/changes/feature_5571_split-sync-post
deleted file mode 100644
index 0d7b14dd..00000000
--- a/client/changes/feature_5571_split-sync-post
+++ /dev/null
@@ -1 +0,0 @@
-  o Split sync in multiple POST requests in client (#5571).
diff --git a/common/changes/bug_5739_fix-multipart-problem b/common/changes/bug_5739_fix-multipart-problem deleted file mode 100644 index 449e09b8..00000000 --- a/common/changes/bug_5739_fix-multipart-problem +++ /dev/null @@ -1,2 +0,0 @@ - o Use a dedicated HTTP resource for couch multipart PUTs to avoid bigcouch - bug (#5739). diff --git a/server/changes/feature_5571_allow-for-interrupting-and-recovering-sync b/server/changes/feature_5571_allow-for-interrupting-and-recovering-sync deleted file mode 100644 index 0087c535..00000000 --- a/server/changes/feature_5571_allow-for-interrupting-and-recovering-sync +++ /dev/null @@ -1 +0,0 @@ - o Allow for interrupting and recovering sync (#5517). diff --git a/server/changes/feature_5571_split-sync-post b/server/changes/feature_5571_split-sync-post deleted file mode 100644 index ad269cd4..00000000 --- a/server/changes/feature_5571_split-sync-post +++ /dev/null @@ -1 +0,0 @@ - o Split sync in multiple POST requests in server (#5571). diff --git a/soledad_server/changes/feature_3399-check-auth-in-constant-way b/soledad_server/changes/feature_3399-check-auth-in-constant-way deleted file mode 100644 index ebd18680..00000000 --- a/soledad_server/changes/feature_3399-check-auth-in-constant-way +++ /dev/null @@ -1 +0,0 @@ - o Authenticate in time-insensitive manner. Closes #3399. -- cgit v1.2.3
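For reference, a minimal usage sketch of the movingaverage() generator added
by the patch above (not part of any patch in this series; it assumes the
module is importable, e.g. when run from scripts/profiling/, and uses
Python 2 to match the module's own test suite):

    from movingaverage import movingaverage

    # A window of 3 over six samples yields 6 - 3 + 1 == 4 averages.
    print list(movingaverage([40, 30, 50, 46, 39, 44], 3))
    # [40.0, 42.0, 45.0, 43.0]

    # The rolling-sum variant returns the same values here, but trades
    # floating-point safety for speed on windows much larger than 20.
    print list(movingaverage([40, 30, 50, 46, 39, 44], 3, avoid_fp_drift=False))
    # [40.0, 42.0, 45.0, 43.0]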
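And a small sketch of why the avoid_fp_drift flag exists: a rolling sum
carries rounding error from one window to the next, while re-summing each
window bounds the error per result. The printed difference depends on the
platform's float arithmetic and may be zero for inputs whose increments
cancel exactly; the data below is chosen so they do not:

    from movingaverage import movingaverage

    data = [0.1, 0.2, 0.3] * 40000
    resummed = list(movingaverage(data, 10))                       # sums each window
    rolling = list(movingaverage(data, 10, avoid_fp_drift=False))  # rolling sum
    # Any nonzero maximum here is accumulated floating-point drift.
    print max(abs(a - b) for a, b in zip(resummed, rolling))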