diff options
25 files changed, 628 insertions, 617 deletions
@@ -1,9 +1,9 @@ -Soledad +Soledad ================================================================== *Synchronization Of Locally Encrypted Data Among Devices* Soledad is the part of LEAP that allows application data to be -securely shared among devices. It provides, to other parts of the +securely shared among devices. It provides, to other parts of the LEAP project, an API for data storage and sync. This software is under development. @@ -53,5 +53,39 @@ to run tests in development mode you must do the following:: scripts/develop_mode.sh ./run_tests.sh -Note that to run CouchDB tests, be sure you have ``CouchDB`` installed on your +Note that to run CouchDB tests, be sure you have `CouchDB`_ installed on your system. + +.. _`CouchDB`: https://couchdb.apache.org/ + +Privileges +----- +In order to prevent privilege escalation, Soledad should not be run as a +database administrator. This implies the following side effects: + +----------------- +Database creation: +----------------- +Can be done via a script located in ``server/pkg/create-user-db`` +It reads a netrc file that should be placed on +``/etc/couchdb/couchdb-admin.netrc``. +That file holds the admin credentials in netrc format and should be accessible +only by 'soledad-admin' user. + +The debian package will do the following in order to automate this: + +* create a user ``soledad-admin`` +* make this script available as ``create-user-db`` in ``/usr/bin`` +* grant restricted sudo access, that only enables user ``soledad`` to call this + exact command via ``soledad-admin`` user. + +The server side process, configured via ``/etc/soledad/soledad-server.conf``, will +then use a parameter called 'create_cmd' to know which command is used to +allocate new databases. All steps of creation process is then handled +automatically by the server, following the same logic as u1db server. + +------------------ +Database deletion: +------------------ +No code at all handles this and privilege to do so needs to be removed as +explained before. This can be automated via a simple cron job. diff --git a/client/changes/bug_7503-do-not-signal-sync-complete b/client/changes/bug_7503-do-not-signal-sync-complete new file mode 100644 index 00000000..4cc361e0 --- /dev/null +++ b/client/changes/bug_7503-do-not-signal-sync-complete @@ -0,0 +1 @@ +o Do not signal sync completion if sync failed. Closes: #7503 diff --git a/client/changes/bug_missing_design_doc_handler b/client/changes/bug_missing_design_doc_handler new file mode 100644 index 00000000..72e42b85 --- /dev/null +++ b/client/changes/bug_missing_design_doc_handler @@ -0,0 +1 @@ +o Handle missing design doc at GET (get_sync_info). Soledad server can handle this during sync. diff --git a/common/changes/create_db_cmd b/common/changes/create_db_cmd new file mode 100644 index 00000000..00bbdf71 --- /dev/null +++ b/common/changes/create_db_cmd @@ -0,0 +1,2 @@ + o Add a sanitized command executor for database creation and re-enable + user database creation on CouchServerState via command line. diff --git a/common/src/leap/soledad/common/command.py b/common/src/leap/soledad/common/command.py new file mode 100644 index 00000000..811bf135 --- /dev/null +++ b/common/src/leap/soledad/common/command.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- +# command.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Utility to sanitize and run shell commands. +""" + + +import subprocess + + +def exec_validated_cmd(cmd, argument, validator=None): + """ + Executes cmd, validating argument with a validator function. + + :param cmd: command. + :type dbname: str + :param argument: argument. + :type argument: str + :param validator: optional function to validate argument + :type validator: function + + :return: exit code and stdout or stderr (if code != 0) + :rtype: (int, str) + """ + if validator and not validator(argument): + return 1, "invalid argument" + command = cmd.split(' ') + command.append(argument) + try: + process = subprocess.Popen(command, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + except OSError, e: + return 1, e + (out, err) = process.communicate() + code = process.wait() + if code is not 0: + return code, err + else: + return code, out diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 1c762036..3dee1473 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -26,13 +26,12 @@ import logging import binascii import time import sys -import threading from StringIO import StringIO -from collections import defaultdict from urlparse import urljoin from contextlib import contextmanager +from multiprocessing.pool import ThreadPool from couchdb.client import Server, Database @@ -61,6 +60,7 @@ from u1db.remote.server_state import ServerState from leap.soledad.common import ddocs, errors +from leap.soledad.common.command import exec_validated_cmd from leap.soledad.common.document import SoledadDocument @@ -103,6 +103,7 @@ class CouchDocument(SoledadDocument): SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts) self.couch_rev = None self.transactions = None + self._conflicts = None def get_conflicts(self): """ @@ -111,7 +112,7 @@ class CouchDocument(SoledadDocument): :return: The conflicted versions of the document. :rtype: [CouchDocument] """ - return self._conflicts + return self._conflicts or [] def set_conflicts(self, conflicts): """ @@ -357,61 +358,19 @@ def couch_server(url): :type url: str """ session = Session(timeout=COUCH_TIMEOUT) - server = Server(url=url, session=session) + server = Server(url=url, full_commit=False, session=session) yield server +THREAD_POOL = ThreadPool(20) + + class CouchDatabase(CommonBackend): """ A U1DB implementation that uses CouchDB as its persistence layer. """ - # We spawn threads to parallelize the CouchDatabase.get_docs() method - MAX_GET_DOCS_THREADS = 20 - - update_handler_lock = defaultdict(threading.Lock) - sync_info_lock = defaultdict(threading.Lock) - - class _GetDocThread(threading.Thread): - - """ - A thread that gets a document from a database. - - TODO: switch this for a twisted deferred to thread. This depends on - replacing python-couchdb for paisley in this module. - """ - - def __init__(self, db, doc_id, check_for_conflicts, - release_fun): - """ - :param db: The database from where to get the document. - :type db: CouchDatabase - :param doc_id: The doc_id of the document to be retrieved. - :type doc_id: str - :param check_for_conflicts: Whether the get_doc() method should - check for existing conflicts. - :type check_for_conflicts: bool - :param release_fun: A function that releases a semaphore, to be - called after the document is fetched. - :type release_fun: function - """ - threading.Thread.__init__(self) - self._db = db - self._doc_id = doc_id - self._check_for_conflicts = check_for_conflicts - self._release_fun = release_fun - self._doc = None - - def run(self): - """ - Fetch the document, store it as a property, and call the release - function. - """ - self._doc = self._db._get_doc( - self._doc_id, self._check_for_conflicts) - self._release_fun() - @classmethod def open_database(cls, url, create, replica_uid=None, ensure_ddocs=False): """ @@ -445,7 +404,7 @@ class CouchDatabase(CommonBackend): return cls( url, dbname, replica_uid=replica_uid, ensure_ddocs=ensure_ddocs) - def __init__(self, url, dbname, replica_uid=None, ensure_ddocs=True): + def __init__(self, url, dbname, replica_uid=None, ensure_ddocs=False): """ Create a new Couch data container. @@ -476,9 +435,24 @@ class CouchDatabase(CommonBackend): self._set_replica_uid(replica_uid) if ensure_ddocs: self.ensure_ddocs_on_db() - # initialize a thread pool for parallelizing get_docs() - self._sem_pool = threading.BoundedSemaphore( - value=self.MAX_GET_DOCS_THREADS) + self.ensure_security_ddoc() + self._cache = None + + @property + def cache(self): + if self._cache is not None: + return self._cache + else: + return {} + + def init_caching(self, cache): + """ + Start using cache by setting internal _cache attribute. + + :param cache: the cache instance, anything that behaves like a dict + :type cache: dict + """ + self._cache = cache def ensure_ddocs_on_db(self): """ @@ -487,13 +461,29 @@ class CouchDatabase(CommonBackend): """ for ddoc_name in ['docs', 'syncs', 'transactions']: try: - self._database.info(ddoc_name) + self._database.resource('_design', + ddoc_name, '_info').get_json() except ResourceNotFound: ddoc = json.loads( binascii.a2b_base64( getattr(ddocs, ddoc_name))) self._database.save(ddoc) + def ensure_security_ddoc(self): + """ + Make sure that only soledad user is able to access this database as + an unprivileged member, meaning that administration access will + be forbidden even inside an user database. + The goal is to make sure that only the lowest access level is given + to the unprivileged CouchDB user set on the server process. + This is achieved by creating a _security design document, see: + http://docs.couchdb.org/en/latest/api/database/security.html + """ + security = self._database.resource.get_json('_security')[2] + security['members'] = {'names': ['soledad'], 'roles': []} + security['admins'] = {'names': [], 'roles': []} + self._database.resource.put_json('_security', body=security) + def get_sync_target(self): """ Return a SyncTarget object, for another u1db to synchronize with. @@ -557,10 +547,14 @@ class CouchDatabase(CommonBackend): :rtype: str """ if self._real_replica_uid is not None: + self.cache[self._url] = {'replica_uid': self._real_replica_uid} return self._real_replica_uid + if self._url in self.cache: + return self.cache[self._url]['replica_uid'] try: # grab replica_uid from server doc = self._database['u1db_config'] + self.cache[self._url] = doc self._real_replica_uid = doc['replica_uid'] return self._real_replica_uid except ResourceNotFound: @@ -595,10 +589,13 @@ class CouchDatabase(CommonBackend): unknown reason. """ # query a couch list function + if self.replica_uid + '_gen' in self.cache: + return self.cache[self.replica_uid + '_gen']['generation'] ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] res = self._database.resource(*ddoc_path) try: response = res.get_json() + self.cache[self.replica_uid + '_gen'] = response[2] return response[2]['generation'] except ResourceNotFound as e: raise_missing_design_doc_error(e, ddoc_path) @@ -626,11 +623,15 @@ class CouchDatabase(CommonBackend): design document for an yet unknown reason. """ + if self.replica_uid + '_gen' in self.cache: + response = self.cache[self.replica_uid + '_gen'] + return (response['generation'], response['transaction_id']) # query a couch list function ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] res = self._database.resource(*ddoc_path) try: response = res.get_json() + self.cache[self.replica_uid + '_gen'] = response[2] return (response[2]['generation'], response[2]['transaction_id']) except ResourceNotFound as e: raise_missing_design_doc_error(e, ddoc_path) @@ -734,6 +735,10 @@ class CouchDatabase(CommonBackend): attachments=True)[2] except ResourceNotFound: return None + return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts) + + def __parse_doc_from_couch(self, result, doc_id, + check_for_conflicts=False): # restrict to u1db documents if 'u1db_rev' not in result: return None @@ -798,11 +803,8 @@ class CouchDatabase(CommonBackend): """ generation = self._get_generation() - results = [] - for row in self._database.view('_all_docs'): - doc = self.get_doc(row.id, include_deleted=include_deleted) - if doc is not None: - results.append(doc) + results = list(self.get_docs(self._database, + include_deleted=include_deleted)) return (generation, results) def _put_doc(self, old_doc, doc): @@ -887,9 +889,13 @@ class CouchDatabase(CommonBackend): try: resource = self._new_resource() resource.put_json( - doc.doc_id, body=buf.getvalue(), headers=envelope.headers) + doc.doc_id, body=str(buf.getvalue()), headers=envelope.headers) except ResourceConflict: raise RevisionConflict() + if self.replica_uid + '_gen' in self.cache: + gen_info = self.cache[self.replica_uid + '_gen'] + gen_info['generation'] += 1 + gen_info['transaction_id'] = transactions[-1][1] def put_doc(self, doc): """ @@ -1092,14 +1098,22 @@ class CouchDatabase(CommonBackend): synchronized with the replica, this is (0, ''). :rtype: (int, str) """ - # query a couch view - result = self._database.view('syncs/log') - if len(result[other_replica_uid].rows) == 0: - return (0, '') - return ( - result[other_replica_uid].rows[0]['value']['known_generation'], - result[other_replica_uid].rows[0]['value']['known_transaction_id'] - ) + if other_replica_uid in self.cache: + return self.cache[other_replica_uid] + + doc_id = 'u1db_sync_%s' % other_replica_uid + try: + doc = self._database[doc_id] + except ResourceNotFound: + doc = { + '_id': doc_id, + 'generation': 0, + 'transaction_id': '', + } + self._database.save(doc) + result = doc['generation'], doc['transaction_id'] + self.cache[other_replica_uid] = result + return result def _set_replica_gen_and_trans_id(self, other_replica_uid, other_generation, other_transaction_id, @@ -1159,42 +1173,17 @@ class CouchDatabase(CommonBackend): :type doc_idx: int :param sync_id: The id of the current sync session. :type sync_id: str - - :raise MissingDesignDocError: Raised when tried to access a missing - design document. - :raise MissingDesignDocListFunctionError: Raised when trying to access - a missing list function on a - design document. - :raise MissingDesignDocNamedViewError: Raised when trying to access a - missing named view on a design - document. - :raise MissingDesignDocDeletedError: Raised when trying to access a - deleted design document. - :raise MissingDesignDocUnknownError: Raised when failed to access a - design document for an yet - unknown reason. """ - # query a couch update function - ddoc_path = ['_design', 'syncs', '_update', 'put', 'u1db_sync_log'] - res = self._database.resource(*ddoc_path) + self.cache[other_replica_uid] = (other_generation, + other_transaction_id) + doc_id = 'u1db_sync_%s' % other_replica_uid try: - with CouchDatabase.update_handler_lock[self._get_replica_uid()]: - body = { - 'other_replica_uid': other_replica_uid, - 'other_generation': other_generation, - 'other_transaction_id': other_transaction_id, - } - if number_of_docs is not None: - body['number_of_docs'] = number_of_docs - if doc_idx is not None: - body['doc_idx'] = doc_idx - if sync_id is not None: - body['sync_id'] = sync_id - res.put_json( - body=body, - headers={'content-type': 'application/json'}) - except ResourceNotFound as e: - raise_missing_design_doc_error(e, ddoc_path) + doc = self._database[doc_id] + except ResourceNotFound: + doc = {'_id': doc_id} + doc['generation'] = other_generation + doc['transaction_id'] = other_transaction_id + self._database.save(doc) def _force_doc_sync_conflict(self, doc): """ @@ -1203,10 +1192,11 @@ class CouchDatabase(CommonBackend): :param doc: The document to be put. :type doc: CouchDocument """ - my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) - doc.prune_conflicts( - vectorclock.VectorClockRev(doc.rev), self._replica_uid) - doc.add_conflict(my_doc) + my_doc = self._get_doc(doc.doc_id) + self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) + doc.add_conflict(self._factory(doc.doc_id, my_doc.rev, + my_doc.get_json())) + doc.has_conflicts = True self._put_doc(my_doc, doc) def resolve_doc(self, doc, conflicted_doc_revs): @@ -1320,42 +1310,27 @@ class CouchDatabase(CommonBackend): """ if not isinstance(doc, CouchDocument): doc = self._factory(doc.doc_id, doc.rev, doc.get_json()) - self._save_source_info(replica_uid, replica_gen, - replica_trans_id, number_of_docs, - doc_idx, sync_id) my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) - if my_doc is not None: - my_doc.set_conflicts( - self.get_doc_conflicts(my_doc.doc_id, my_doc.couch_rev)) - state, save_doc = _process_incoming_doc( - my_doc, doc, save_conflict, self.replica_uid) - if save_doc: - self._put_doc(my_doc, save_doc) - doc.update(save_doc) - return state, self._get_generation() - - def _save_source_info(self, replica_uid, replica_gen, replica_trans_id, - number_of_docs, doc_idx, sync_id): - """ - Validate and save source information. - """ - self._validate_source(replica_uid, replica_gen, replica_trans_id) - self._set_replica_gen_and_trans_id( - replica_uid, replica_gen, replica_trans_id, - number_of_docs=number_of_docs, doc_idx=doc_idx, - sync_id=sync_id) + if my_doc: + doc.set_conflicts(my_doc.get_conflicts()) + return CommonBackend._put_doc_if_newer(self, doc, save_conflict, + replica_uid, replica_gen, + replica_trans_id) + + def _put_and_update_indexes(self, cur_doc, doc): + self._put_doc(cur_doc, doc) def get_docs(self, doc_ids, check_for_conflicts=True, include_deleted=False): """ Get the JSON content for many documents. - :param doc_ids: A list of document identifiers. + :param doc_ids: A list of document identifiers or None for all. :type doc_ids: list :param check_for_conflicts: If set to False, then the conflict check will be skipped, and 'None' will be returned instead of True/False. - :type check_for_conflictsa: bool + :type check_for_conflicts: bool :param include_deleted: If set to True, deleted documents will be returned with empty content. Otherwise deleted documents will not be included in the results. @@ -1374,20 +1349,14 @@ class CouchDatabase(CommonBackend): # This will not be needed when/if we switch from python-couchdb to # paisley. time.strptime('Mar 8 1917', '%b %d %Y') - # spawn threads to retrieve docs - threads = [] - for doc_id in doc_ids: - self._sem_pool.acquire() - t = self._GetDocThread(self, doc_id, check_for_conflicts, - self._sem_pool.release) - t.start() - threads.append(t) - # join threads and yield docs - for t in threads: - t.join() - if t._doc.is_tombstone() and not include_deleted: + get_one = lambda doc_id: self._get_doc(doc_id, check_for_conflicts) + docs = [THREAD_POOL.apply_async(get_one, [doc_id]) + for doc_id in doc_ids] + for doc in docs: + doc = doc.get() + if not doc or not include_deleted and doc.is_tombstone(): continue - yield t._doc + yield doc def _prune_conflicts(self, doc, doc_vcr): """ @@ -1434,13 +1403,27 @@ class CouchSyncTarget(CommonSyncTarget): source_replica_transaction_id) +def is_db_name_valid(name): + """ + Validate a user database using a regular expression. + + :param name: database name. + :type name: str + + :return: boolean for name vailidity + :rtype: bool + """ + db_name_regex = "^user-[a-f0-9]+$" + return re.match(db_name_regex, name) is not None + + class CouchServerState(ServerState): """ Inteface of the WSGI server with the CouchDB backend. """ - def __init__(self, couch_url): + def __init__(self, couch_url, create_cmd=None): """ Initialize the couch server state. @@ -1448,6 +1431,7 @@ class CouchServerState(ServerState): :type couch_url: str """ self.couch_url = couch_url + self.create_cmd = create_cmd def open_database(self, dbname): """ @@ -1459,29 +1443,38 @@ class CouchServerState(ServerState): :return: The CouchDatabase object. :rtype: CouchDatabase """ - return CouchDatabase( + db = CouchDatabase( self.couch_url, dbname, ensure_ddocs=False) + return db def ensure_database(self, dbname): """ Ensure couch database exists. - Usually, this method is used by the server to ensure the existence of - a database. In our setup, the Soledad user that accesses the underlying - couch server should never have permission to create (or delete) - databases. But, in case it ever does, by raising an exception here we - have one more guarantee that no modified client will be able to - enforce creation of a database when syncing. - :param dbname: The name of the database to ensure. :type dbname: str - :raise Unauthorized: Always, because Soledad server is not allowed to - create databases. + :raise Unauthorized: If disabled or other error was raised. + + :return: The CouchDatabase object and its replica_uid. + :rtype: (CouchDatabase, str) """ - raise Unauthorized() + if not self.create_cmd: + raise Unauthorized() + else: + code, out = exec_validated_cmd(self.create_cmd, dbname, + validator=is_db_name_valid) + if code is not 0: + logger.error(""" + Error while creating database (%s) with (%s) command. + Output: %s + Exit code: %d + """ % (dbname, self.create_cmd, out, code)) + raise Unauthorized() + db = self.open_database(dbname) + return db, db.replica_uid def delete_database(self, dbname): """ @@ -1494,53 +1487,3 @@ class CouchServerState(ServerState): delete databases. """ raise Unauthorized() - - -def _process_incoming_doc(my_doc, other_doc, save_conflict, replica_uid): - """ - Check document, save and return state. - """ - # at this point, `doc` has arrived from the other syncing party, and - # we will decide what to do with it. - # First, we prepare the arriving doc to update couch database. - new_doc = CouchDocument( - other_doc.doc_id, other_doc.rev, other_doc.get_json()) - if my_doc is None: - return 'inserted', new_doc - new_doc.couch_rev = my_doc.couch_rev - new_doc.set_conflicts(my_doc.get_conflicts()) - # fetch conflicts because we will eventually manipulate them - # from now on, it works just like u1db sqlite backend - doc_vcr = vectorclock.VectorClockRev(new_doc.rev) - cur_vcr = vectorclock.VectorClockRev(my_doc.rev) - if doc_vcr.is_newer(cur_vcr): - rev = new_doc.rev - new_doc.prune_conflicts(doc_vcr, replica_uid) - if new_doc.rev != rev: - # conflicts have been autoresolved - return 'superseded', new_doc - else: - return'inserted', new_doc - elif new_doc.rev == my_doc.rev: - # magical convergence - return 'converged', None - elif cur_vcr.is_newer(doc_vcr): - # Don't add this to seen_ids, because we have something newer, - # so we should send it back, and we should not generate a - # conflict - other_doc.update(new_doc) - return 'superseded', None - elif my_doc.same_content_as(new_doc): - # the documents have been edited to the same thing at both ends - doc_vcr.maximize(cur_vcr) - doc_vcr.increment(replica_uid) - new_doc.rev = doc_vcr.as_str() - return 'superseded', new_doc - else: - if save_conflict: - new_doc.prune_conflicts( - vectorclock.VectorClockRev(new_doc.rev), replica_uid) - new_doc.add_conflict(my_doc) - return 'conflicted', new_doc - other_doc.update(new_doc) - return 'conflicted', None diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js deleted file mode 100644 index b0ae2de6..00000000 --- a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js +++ /dev/null @@ -1,151 +0,0 @@ -/** - * The u1db_sync_log document stores both the actual sync log and a list of - * pending updates to the log, in case we receive incoming documents out of - * the correct order (i.e. if there are parallel PUTs during the sync - * process). - * - * The structure of the document is the following: - * - * { - * 'syncs': [ - * ['<replica_uid>', <gen>, '<trans_id>'], - * ... - * ], - * 'pending': { - * 'other_replica_uid': { - * 'sync_id': '<sync_id>', - * 'log': [[<gen>, '<trans_id>'], ...] - * }, - * ... - * } - * } - * - * The update function below does the following: - * - * 0. If we do not receive a sync_id, we just update the 'syncs' list with - * the incoming info about the source replica state. - * - * 1. Otherwise, if the incoming sync_id differs from current stored - * sync_id, then we assume that the previous sync session for that source - * replica was interrupted and discard all pending data. - * - * 2. Then we append incoming info as pending data for that source replica - * and current sync_id, and sort the pending data by generation. - * - * 3. Then we go through pending data and find the most recent generation - * that we can use to update the actual sync log. - * - * 4. Finally, we insert the most up to date information into the sync log. - */ -function(doc, req){ - - // create the document if it doesn't exist - if (!doc) { - doc = {} - doc['_id'] = 'u1db_sync_log'; - doc['syncs'] = []; - } - - // get and validate incoming info - var body = JSON.parse(req.body); - var other_replica_uid = body['other_replica_uid']; - var other_generation = parseInt(body['other_generation']); - var other_transaction_id = body['other_transaction_id'] - var sync_id = body['sync_id']; - var number_of_docs = body['number_of_docs']; - var doc_idx = body['doc_idx']; - - // parse integers - if (number_of_docs != null) - number_of_docs = parseInt(number_of_docs); - if (doc_idx != null) - doc_idx = parseInt(doc_idx); - - if (other_replica_uid == null - || other_generation == null - || other_transaction_id == null) - return [null, 'invalid data']; - - // create slot for pending logs - if (doc['pending'] == null) - doc['pending'] = {}; - - // these are the values that will be actually inserted - var current_gen = other_generation; - var current_trans_id = other_transaction_id; - - /*------------- Wait for sequential values before storing -------------*/ - - // we just try to obtain pending log if we received a sync_id - if (sync_id != null) { - - // create slot for current source and sync_id pending log - if (doc['pending'][other_replica_uid] == null - || doc['pending'][other_replica_uid]['sync_id'] != sync_id) { - doc['pending'][other_replica_uid] = { - 'sync_id': sync_id, - 'log': [], - 'last_doc_idx': 0, - } - } - - // append incoming data to pending log - doc['pending'][other_replica_uid]['log'].push([ - other_generation, - other_transaction_id, - doc_idx, - ]) - - // sort pending log according to generation - doc['pending'][other_replica_uid]['log'].sort(function(a, b) { - return a[0] - b[0]; - }); - - // get most up-to-date information from pending log - var last_doc_idx = doc['pending'][other_replica_uid]['last_doc_idx']; - var pending_idx = doc['pending'][other_replica_uid]['log'][0][2]; - - current_gen = null; - current_trans_id = null; - - while (last_doc_idx + 1 == pending_idx) { - pending = doc['pending'][other_replica_uid]['log'].shift() - current_gen = pending[0]; - current_trans_id = pending[1]; - last_doc_idx = pending[2] - if (doc['pending'][other_replica_uid]['log'].length == 0) - break; - pending_idx = doc['pending'][other_replica_uid]['log'][0][2]; - } - - // leave the sync log untouched if we still did not receive enough docs - if (current_gen == null) - return [doc, 'ok']; - - // update last index of received doc - doc['pending'][other_replica_uid]['last_doc_idx'] = last_doc_idx; - - // eventually remove all pending data from that replica - if (last_doc_idx == number_of_docs) - delete doc['pending'][other_replica_uid] - } - - /*--------------- Store source replica info on sync log ---------------*/ - - // remove outdated info - doc['syncs'] = doc['syncs'].filter( - function (entry) { - return entry[0] != other_replica_uid; - } - ); - - // store in log - doc['syncs'].push([ - other_replica_uid, - current_gen, - current_trans_id - ]); - - return [doc, 'ok']; -} - diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js deleted file mode 100644 index a63c7cf4..00000000 --- a/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js +++ /dev/null @@ -1,12 +0,0 @@ -function(doc) { - if (doc._id == 'u1db_sync_log') { - if (doc.syncs) - doc.syncs.forEach(function (entry) { - emit(entry[0], - { - 'known_generation': entry[1], - 'known_transaction_id': entry[2] - }); - }); - } -} diff --git a/common/src/leap/soledad/common/tests/test_command.py b/common/src/leap/soledad/common/tests/test_command.py new file mode 100644 index 00000000..c386bdd2 --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_command.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +# test_command.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for command execution using a validator function for arguments. +""" +from twisted.trial import unittest +from leap.soledad.common.command import exec_validated_cmd + + +class ExecuteValidatedCommandTest(unittest.TestCase): + + def test_argument_validation(self): + validator = lambda arg: True if arg is 'valid' else False + status, out = exec_validated_cmd("command", "invalid arg", validator) + self.assertEquals(status, 1) + self.assertEquals(out, "invalid argument") + status, out = exec_validated_cmd("echo", "valid", validator) + self.assertEquals(status, 0) + self.assertEquals(out, "valid\n") + + def test_return_status_code_success(self): + status, out = exec_validated_cmd("echo", "arg") + self.assertEquals(status, 0) + self.assertEquals(out, "arg\n") + + def test_handle_command_with_spaces(self): + status, out = exec_validated_cmd("echo I am", "an argument") + self.assertEquals(status, 0, out) + self.assertEquals(out, "I am an argument\n") + + def test_handle_oserror_on_invalid_command(self): + status, out = exec_validated_cmd("inexistent command with", "args") + self.assertEquals(status, 1) + self.assertIn("No such file or directory", out) + + def test_return_status_code_number_on_failure(self): + status, out = exec_validated_cmd("ls", "user-bebacafe") + self.assertNotEquals(status, 0) + self.assertIn('No such file or directory\n', out) diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index a08ffd16..b4797f5e 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -28,6 +28,8 @@ from couchdb.client import Server from uuid import uuid4 from testscenarios import TestWithScenarios +from twisted.trial import unittest +from mock import Mock from u1db import errors as u1db_errors from u1db import SyncTarget @@ -1359,10 +1361,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): self.assertRaises( errors.MissingDesignDocError, self.db.whats_changed) - # _do_set_replica_gen_and_trans_id() - self.assertRaises( - errors.MissingDesignDocError, - self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) def test_missing_design_doc_functions_raises(self): """ @@ -1489,10 +1487,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): self.assertRaises( errors.MissingDesignDocDeletedError, self.db.whats_changed) - # _do_set_replica_gen_and_trans_id() - self.assertRaises( - errors.MissingDesignDocDeletedError, - self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) def test_ensure_ddoc_independently(self): """ @@ -1506,3 +1500,46 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): self.db._get_transaction_log) self.create_db(ensure=True, dbname=self.db._dbname) self.db._get_transaction_log() + + def test_ensure_security_doc(self): + """ + Ensure_security creates a _security ddoc to ensure that only soledad + will have the lowest privileged access to an user db. + """ + self.create_db(ensure=False) + self.assertFalse(self.db._database.resource.get_json('_security')[2]) + self.db.ensure_security_ddoc() + security_ddoc = self.db._database.resource.get_json('_security')[2] + self.assertIn('admins', security_ddoc) + self.assertFalse(security_ddoc['admins']['names']) + self.assertIn('members', security_ddoc) + self.assertIn('soledad', security_ddoc['members']['names']) + + +class DatabaseNameValidationTest(unittest.TestCase): + + def test_database_name_validation(self): + self.assertFalse(couch.is_db_name_valid("user-deadbeef | cat /secret")) + self.assertTrue(couch.is_db_name_valid("user-cafe1337")) + + +class CommandBasedDBCreationTest(unittest.TestCase): + + def test_ensure_db_using_custom_command(self): + state = couch.CouchServerState("url", create_cmd="echo") + mock_db = Mock() + mock_db.replica_uid = 'replica_uid' + state.open_database = Mock(return_value=mock_db) + db, replica_uid = state.ensure_database("user-1337") # works + self.assertEquals(mock_db, db) + self.assertEquals(mock_db.replica_uid, replica_uid) + + def test_raises_unauthorized_on_failure(self): + state = couch.CouchServerState("url", create_cmd="inexistent") + self.assertRaises(u1db_errors.Unauthorized, + state.ensure_database, "user-1337") + + def test_raises_unauthorized_by_default(self): + state = couch.CouchServerState("url") + self.assertRaises(u1db_errors.Unauthorized, + state.ensure_database, "user-1337") diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py index 25f709ca..507f2984 100644 --- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py +++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py @@ -35,17 +35,11 @@ from leap.soledad.common.tests.util import ( ) from leap.soledad.common.tests.test_couch import CouchDBTestCase from leap.soledad.common.tests.u1db_tests import TestCaseWithServer -from leap.soledad.common.tests.test_server import _couch_ensure_database REPEAT_TIMES = 20 -# monkey path CouchServerState so it can ensure databases. - -CouchServerState.ensure_database = _couch_ensure_database - - class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): @staticmethod @@ -163,6 +157,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): 2, len(filter(lambda t: t[0] == doc_id, transaction_log))) + @defer.inlineCallbacks def test_correct_sync_log_after_sequential_syncs(self): """ Assert that the sync_log increases accordingly with sequential syncs. @@ -170,21 +165,21 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): sol = self._soledad_instance( auth_token='auth-token', server_url=self.getURL()) + source_replica_uid = sol._dbpool.replica_uid - def _create_docs(results): + def _create_docs(): deferreds = [] for i in xrange(0, REPEAT_TIMES): deferreds.append(sol.create_doc({})) - return defer.DeferredList(deferreds) + return defer.gatherResults(deferreds) def _assert_transaction_and_sync_logs(results, sync_idx): # assert sizes of transaction and sync logs self.assertEqual( sync_idx * REPEAT_TIMES, len(self.db._get_transaction_log())) - self.assertEqual( - 1 if sync_idx > 0 else 0, - len(self.db._database.view('syncs/log').rows)) + gen, _ = self.db._get_replica_gen_and_trans_id(source_replica_uid) + self.assertEqual(sync_idx * REPEAT_TIMES, gen) def _assert_sync(results, sync_idx): gen, docs = results @@ -193,40 +188,28 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): # assert sizes of transaction and sync logs self.assertEqual((sync_idx + 1) * REPEAT_TIMES, len(self.db._get_transaction_log())) - sync_log_rows = self.db._database.view('syncs/log').rows - sync_log = sync_log_rows[0].value - replica_uid = sync_log_rows[0].key - known_gen = sync_log['known_generation'] - known_trans_id = sync_log['known_transaction_id'] - # assert sync_log has exactly 1 row - self.assertEqual(1, len(sync_log_rows)) - # assert it has the correct replica_uid, gen and trans_id - self.assertEqual(sol._dbpool.replica_uid, replica_uid) + target_known_gen, target_known_trans_id = \ + self.db._get_replica_gen_and_trans_id(source_replica_uid) + # assert it has the correct gen and trans_id conn_key = sol._dbpool._u1dbconnections.keys().pop() conn = sol._dbpool._u1dbconnections[conn_key] sol_gen, sol_trans_id = conn._get_generation_info() - self.assertEqual(sol_gen, known_gen) - self.assertEqual(sol_trans_id, known_trans_id) - - # create some documents - d = _create_docs(None) + self.assertEqual(sol_gen, target_known_gen) + self.assertEqual(sol_trans_id, target_known_trans_id) # sync first time and assert success - d.addCallback(_assert_transaction_and_sync_logs, 0) - d.addCallback(lambda _: sol.sync()) - d.addCallback(lambda _: sol.get_all_docs()) - d.addCallback(_assert_sync, 0) + results = yield _create_docs() + _assert_transaction_and_sync_logs(results, 0) + yield sol.sync() + results = yield sol.get_all_docs() + _assert_sync(results, 0) # create more docs, sync second time and assert success - d.addCallback(_create_docs) - d.addCallback(_assert_transaction_and_sync_logs, 1) - d.addCallback(lambda _: sol.sync()) - d.addCallback(lambda _: sol.get_all_docs()) - d.addCallback(_assert_sync, 1) - - d.addCallback(lambda _: sol.close()) - - return d + results = yield _create_docs() + _assert_transaction_and_sync_logs(results, 1) + yield sol.sync() + results = yield sol.get_all_docs() + _assert_sync(results, 1) # # Concurrency tests diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py index f512d6c1..19d2907d 100644 --- a/common/src/leap/soledad/common/tests/test_server.py +++ b/common/src/leap/soledad/common/tests/test_server.py @@ -46,18 +46,6 @@ from leap.soledad.server import LockResource from leap.soledad.server.auth import URLToAuthorization -# monkey path CouchServerState so it can ensure databases. - -def _couch_ensure_database(self, dbname): - db = CouchDatabase.open_database( - self.couch_url + '/' + dbname, - create=True, - ensure_ddocs=True) - return db, db._replica_uid - -CouchServerState.ensure_database = _couch_ensure_database - - class ServerAuthorizationTestCase(BaseSoledadTest): """ diff --git a/scripts/db_access/reset_db.py b/scripts/db_access/reset_db.py index 7c6d281b..c48b511e 100644 --- a/scripts/db_access/reset_db.py +++ b/scripts/db_access/reset_db.py @@ -63,7 +63,7 @@ def get_url(empty): if empty is False: # get couch url cp = ConfigParser() - cp.read('/etc/leap/soledad-server.conf') + cp.read('/etc/soledad/soledad-server.conf') url = cp.get('soledad-server', 'couch_url') else: with open('/etc/couchdb/couchdb.netrc') as f: diff --git a/scripts/db_access/server_side_db.py b/scripts/db_access/server_side_db.py index 18641a0f..fcdd14b6 100644 --- a/scripts/db_access/server_side_db.py +++ b/scripts/db_access/server_side_db.py @@ -1,7 +1,7 @@ #!/usr/bin/python # This script gives server-side access to one Soledad user database by using -# the configuration stored in /etc/leap/soledad-server.conf. +# the configuration stored in /etc/soledad/soledad-server.conf. # # Use it like this: # @@ -20,7 +20,7 @@ uuid = sys.argv[1] # get couch url cp = ConfigParser() -cp.read('/etc/leap/soledad-server.conf') +cp.read('/etc/soledad/soledad-server.conf') url = cp.get('soledad-server', 'couch_url') # access user db diff --git a/scripts/ddocs/update_design_docs.py b/scripts/ddocs/update_design_docs.py index 2e2fa8f0..281482b8 100644 --- a/scripts/ddocs/update_design_docs.py +++ b/scripts/ddocs/update_design_docs.py @@ -50,7 +50,7 @@ def _parse_args(): def _get_url(): # get couch url cp = ConfigParser() - cp.read('/etc/leap/soledad-server.conf') + cp.read('/etc/soledad/soledad-server.conf') url = urlparse(cp.get('soledad-server', 'couch_url')) # get admin password netloc = re.sub('^.*@', '', url.netloc) diff --git a/scripts/profiling/doc_put_memory_usage/find_max_upload_size.py b/scripts/profiling/doc_put_memory_usage/find_max_upload_size.py index 02c68015..1a603fd0 100755 --- a/scripts/profiling/doc_put_memory_usage/find_max_upload_size.py +++ b/scripts/profiling/doc_put_memory_usage/find_max_upload_size.py @@ -30,7 +30,7 @@ from socket import error as socket_error from leap.soledad.common.couch import CouchDatabase -SOLEDAD_CONFIG_FILE = '/etc/leap/soledad-server.conf' +SOLEDAD_CONFIG_FILE = '/etc/soledad/soledad-server.conf' PREFIX = '/tmp/soledad_test' LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s' RETRIES = 3 # number of times to retry uploading a document of a certain diff --git a/server/changes/change_soledad_configdir b/server/changes/change_soledad_configdir new file mode 100644 index 00000000..710b9ac8 --- /dev/null +++ b/server/changes/change_soledad_configdir @@ -0,0 +1,2 @@ +o Moves config directory from /etc/leap to /etc/soledad + resolves #7509 diff --git a/server/changes/create_db_cmd b/server/changes/create_db_cmd new file mode 100644 index 00000000..964a7906 --- /dev/null +++ b/server/changes/create_db_cmd @@ -0,0 +1,4 @@ + o Adds a new config parameter 'create_cmd', which allows sysadmin to specify + which command will create a database. That command was added in + pkg/create-user-db and debian package automates steps needed for sudo access. + o Read netrc path from configuration file for create-user-db command. diff --git a/server/pkg/create-user-db b/server/pkg/create-user-db new file mode 100755 index 00000000..7eafc945 --- /dev/null +++ b/server/pkg/create-user-db @@ -0,0 +1,60 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# create-user-db +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import os +import sys +import netrc +import argparse +from leap.soledad.common.couch import CouchDatabase +from leap.soledad.common.couch import is_db_name_valid +from leap.soledad.server import load_configuration + + +description = """ +Creates a user database. +This is meant to be used by Soledad Server. +""" +parser = argparse.ArgumentParser(description=description) +parser.add_argument('dbname', metavar='user-d34db33f', type=str, + help='database name on the format user-{uuid4}') +NETRC_PATH = load_configuration('/etc/soledad/soledad-server.conf')['admin_netrc'] + + +def url_for_db(dbname): + if not os.path.exists(NETRC_PATH): + print ('netrc not found in %s' % NETRC_PATH) + sys.exit(1) + parsed_netrc = netrc.netrc(NETRC_PATH) + host, (login, _, password) = parsed_netrc.hosts.items()[0] + url = ('http://%(login)s:%(password)s@%(host)s:5984/%(dbname)s' % { + 'login': login, + 'password': password, + 'host': host, + 'dbname': dbname}) + return url + + +if __name__ == '__main__': + args = parser.parse_args() + if not is_db_name_valid(args.dbname): + print ("Invalid name! %s" % args.dbname) + sys.exit(1) + url = url_for_db(args.dbname) + db = CouchDatabase.open_database(url=url, create=True, + replica_uid=None, ensure_ddocs=True) + print ('success! Created %s, replica_uid: %s' % + (db._dbname, db.replica_uid)) diff --git a/server/pkg/requirements.pip b/server/pkg/requirements.pip index d75678b2..58834d0e 100644 --- a/server/pkg/requirements.pip +++ b/server/pkg/requirements.pip @@ -1,9 +1,11 @@ configparser -couchdb u1db routes PyOpenSSL twisted +#pinned for wheezy compatibility +Beaker==1.6.3 #wheezy +couchdb==0.8 #wheezy # XXX -- fix me! # oauth is not strictly needed by us, but we need it until u1db adds it to its diff --git a/server/pkg/soledad-server b/server/pkg/soledad-server index 811ad55b..74ed122e 100644 --- a/server/pkg/soledad-server +++ b/server/pkg/soledad-server @@ -11,12 +11,12 @@ PATH=/sbin:/bin:/usr/sbin:/usr/bin PIDFILE=/var/run/soledad.pid -RUNDIR=/var/lib/soledad/ OBJ=leap.soledad.server.application LOGFILE=/var/log/soledad.log HTTPS_PORT=2424 -CERT_PATH=/etc/leap/soledad-server.pem -PRIVKEY_PATH=/etc/leap/soledad-server.key +CONFDIR=/etc/soledad +CERT_PATH="${CONFDIR}/soledad-server.pem" +PRIVKEY_PATH="${CONFDIR}/soledad-server.key" TWISTD_PATH=/usr/bin/twistd HOME=/var/lib/soledad/ SSL_METHOD=SSLv23_METHOD @@ -25,7 +25,7 @@ GROUP=soledad [ -r /etc/default/soledad ] && . /etc/default/soledad -test -r /etc/leap/ || exit 0 +test -r ${CONFDIR} || exit 0 . /lib/lsb/init-functions diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index 1b795016..f64d07bf 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -283,18 +283,20 @@ def load_configuration(file_path): @return: A dictionary with the configuration. @rtype: dict """ - conf = { + defaults = { 'couch_url': 'http://localhost:5984', + 'create_cmd': None, + 'admin_netrc': '/etc/couchdb/couchdb-admin.netrc', } config = configparser.ConfigParser() config.read(file_path) if 'soledad-server' in config: - for key in conf: + for key in defaults: if key in config['soledad-server']: - conf[key] = config['soledad-server'][key] + defaults[key] = config['soledad-server'][key] # TODO: implement basic parsing/sanitization of options comming from # config file. - return conf + return defaults # ---------------------------------------------------------------------------- @@ -302,8 +304,8 @@ def load_configuration(file_path): # ---------------------------------------------------------------------------- def application(environ, start_response): - conf = load_configuration('/etc/leap/soledad-server.conf') - state = CouchServerState(conf['couch_url']) + conf = load_configuration('/etc/soledad/soledad-server.conf') + state = CouchServerState(conf['couch_url'], create_cmd=conf['create_cmd']) # WSGI application that may be used by `twistd -web` application = GzipMiddleware( SoledadTokenAuthMiddleware(SoledadApp(state))) diff --git a/server/src/leap/soledad/server/caching.py b/server/src/leap/soledad/server/caching.py new file mode 100644 index 00000000..9a049a39 --- /dev/null +++ b/server/src/leap/soledad/server/caching.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +# caching.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Server side caching. Using beaker for now. +""" +from beaker.cache import CacheManager + + +def setup_caching(): + _cache_manager = CacheManager(type='memory') + return _cache_manager + + +_cache_manager = setup_caching() + + +def get_cache_for(key, expire=3600): + return _cache_manager.get_cache(key, expire=expire) diff --git a/server/src/leap/soledad/server/state.py b/server/src/leap/soledad/server/state.py new file mode 100644 index 00000000..f269b77e --- /dev/null +++ b/server/src/leap/soledad/server/state.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- +# state.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Server side synchronization infrastructure. +""" +from leap.soledad.server import caching + + +class ServerSyncState(object): + """ + The state of one sync session, as stored on backend server. + + On server side, the ongoing syncs metadata is maintained in + a caching layer. + """ + + def __init__(self, source_replica_uid, sync_id): + """ + Initialize the sync state object. + + :param sync_id: The id of current sync + :type sync_id: str + :param source_replica_uid: The source replica uid + :type source_replica_uid: str + """ + self._source_replica_uid = source_replica_uid + self._sync_id = sync_id + caching_key = source_replica_uid + sync_id + self._storage = caching.get_cache_for(caching_key) + + def _put_dict_info(self, key, value): + """ + Put some information about the sync state. + + :param key: The key for the info to be put. + :type key: str + :param value: The value for the info to be put. + :type value: str + """ + if key not in self._storage: + self._storage[key] = [] + info_list = self._storage.get(key) + info_list.append(value) + self._storage[key] = info_list + + def put_seen_id(self, seen_id, gen): + """ + Put one seen id on the sync state. + + :param seen_id: The doc_id of a document seen during sync. + :type seen_id: str + :param gen: The corresponding db generation. + :type gen: int + """ + self._put_dict_info( + 'seen_id', + (seen_id, gen)) + + def seen_ids(self): + """ + Return all document ids seen during the sync. + + :return: A dict with doc ids seen during the sync. + :rtype: dict + """ + if 'seen_id' in self._storage: + seen_ids = self._storage.get('seen_id') + else: + seen_ids = [] + return dict(seen_ids) + + def put_changes_to_return(self, gen, trans_id, changes_to_return): + """ + Put the calculated changes to return in the backend sync state. + + :param gen: The target database generation that will be synced. + :type gen: int + :param trans_id: The target database transaction id that will be + synced. + :type trans_id: str + :param changes_to_return: A list of tuples with the changes to be + returned during the sync process. + :type changes_to_return: list + """ + self._put_dict_info( + 'changes_to_return', + { + 'gen': gen, + 'trans_id': trans_id, + 'changes_to_return': changes_to_return, + } + ) + + def sync_info(self): + """ + Return information about the current sync state. + + :return: The generation and transaction id of the target database + which will be synced, and the number of documents to return, + or a tuple of Nones if those have not already been sent to + server. + :rtype: tuple + """ + gen = trans_id = number_of_changes = None + if 'changes_to_return' in self._storage: + info = self._storage.get('changes_to_return')[0] + gen = info['gen'] + trans_id = info['trans_id'] + number_of_changes = len(info['changes_to_return']) + return gen, trans_id, number_of_changes + + def next_change_to_return(self, received): + """ + Return the next change to be returned to the source syncing replica. + + :param received: How many documents the source replica has already + received during the current sync process. + :type received: int + """ + gen = trans_id = next_change_to_return = None + if 'changes_to_return' in self._storage: + info = self._storage.get('changes_to_return')[0] + gen = info['gen'] + trans_id = info['trans_id'] + if received < len(info['changes_to_return']): + next_change_to_return = (info['changes_to_return'][received]) + return gen, trans_id, next_change_to_return diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 18c4ee40..92b29102 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -17,183 +17,16 @@ """ Server side synchronization infrastructure. """ -import json - -from leap.soledad.common.couch import CouchDatabase from u1db import sync, Document from u1db.remote import http_app +from leap.soledad.server.state import ServerSyncState +from leap.soledad.server.caching import get_cache_for MAX_REQUEST_SIZE = 200 # in Mb MAX_ENTRY_SIZE = 200 # in Mb -class ServerSyncState(object): - """ - The state of one sync session, as stored on backend server. - - This object performes queries to distinct design documents: - - _design/syncs/_update/state - _design/syncs/_view/state - _design/syncs/_view/seen_ids - _design/syncs/_view/changes_to_return - - On server side, the ongoing syncs metadata is maintained in a document - called 'u1db_sync_state'. - """ - - def __init__(self, db, source_replica_uid, sync_id): - """ - Initialize the sync state object. - - :param db: The target syncing database. - :type db: CouchDatabase. - :param source_replica_uid: CouchDatabase - :type source_replica_uid: str - """ - self._db = db - self._source_replica_uid = source_replica_uid - self._sync_id = sync_id - - def _key(self, key): - """ - Format a key to be used on couch views. - - :param key: The lookup key. - :type key: json serializable object - - :return: The properly formatted key. - :rtype: str - """ - return json.dumps(key, separators=(',', ':')) - - def _put_info(self, key, value): - """ - Put some information on the sync state document. - - This method works in conjunction with the - _design/syncs/_update/state update handler couch backend. - - :param key: The key for the info to be put. - :type key: str - :param value: The value for the info to be put. - :type value: str - """ - ddoc_path = [ - '_design', 'syncs', '_update', 'state', - 'u1db_sync_state'] - res = self._db._database.resource(*ddoc_path) - with CouchDatabase.sync_info_lock[self._db.replica_uid]: - res.put_json( - body={ - 'sync_id': self._sync_id, - 'source_replica_uid': self._source_replica_uid, - key: value, - }, - headers={'content-type': 'application/json'}) - - def put_seen_id(self, seen_id, gen): - """ - Put one seen id on the sync state document. - - :param seen_id: The doc_id of a document seen during sync. - :type seen_id: str - :param gen: The corresponding db generation for that document. - :type gen: int - """ - self._put_info( - 'seen_id', - [seen_id, gen]) - - def seen_ids(self): - """ - Return all document ids seen during the sync. - - :return: A list with doc ids seen during the sync. - :rtype: list - """ - ddoc_path = ['_design', 'syncs', '_view', 'seen_ids'] - resource = self._db._database.resource(*ddoc_path) - response = resource.get_json( - key=self._key([self._source_replica_uid, self._sync_id])) - data = response[2] - if data['rows']: - entry = data['rows'].pop() - return entry['value']['seen_ids'] - return [] - - def put_changes_to_return(self, gen, trans_id, changes_to_return): - """ - Put the calculated changes to return in the backend sync state - document. - - :param gen: The target database generation that will be synced. - :type gen: int - :param trans_id: The target database transaction id that will be - synced. - :type trans_id: str - :param changes_to_return: A list of tuples with the changes to be - returned during the sync process. - :type changes_to_return: list - """ - self._put_info( - 'changes_to_return', - { - 'gen': gen, - 'trans_id': trans_id, - 'changes_to_return': changes_to_return, - } - ) - - def sync_info(self): - """ - Return information about the current sync state. - - :return: The generation and transaction id of the target database - which will be synced, and the number of documents to return, - or a tuple of Nones if those have not already been sent to - server. - :rtype: tuple - """ - ddoc_path = ['_design', 'syncs', '_view', 'state'] - resource = self._db._database.resource(*ddoc_path) - response = resource.get_json( - key=self._key([self._source_replica_uid, self._sync_id])) - data = response[2] - gen = None - trans_id = None - number_of_changes = None - if data['rows'] and data['rows'][0]['value'] is not None: - value = data['rows'][0]['value'] - gen = value['gen'] - trans_id = value['trans_id'] - number_of_changes = value['number_of_changes'] - return gen, trans_id, number_of_changes - - def next_change_to_return(self, received): - """ - Return the next change to be returned to the source syncing replica. - - :param received: How many documents the source replica has already - received during the current sync process. - :type received: int - """ - ddoc_path = ['_design', 'syncs', '_view', 'changes_to_return'] - resource = self._db._database.resource(*ddoc_path) - response = resource.get_json( - key=self._key( - [self._source_replica_uid, self._sync_id, received])) - data = response[2] - if not data['rows']: - return None, None, None - value = data['rows'][0]['value'] - gen = value['gen'] - trans_id = value['trans_id'] - next_change_to_return = value['next_change_to_return'] - return gen, trans_id, tuple(next_change_to_return) - - class SyncExchange(sync.SyncExchange): def __init__(self, db, source_replica_uid, last_known_generation, sync_id): @@ -216,8 +49,7 @@ class SyncExchange(sync.SyncExchange): self.new_trans_id = None self._trace_hook = None # recover sync state - self._sync_state = ServerSyncState( - self._db, self.source_replica_uid, sync_id) + self._sync_state = ServerSyncState(self.source_replica_uid, sync_id) def find_changes_to_return(self, received): """ @@ -353,10 +185,12 @@ class SyncResource(http_app.SyncResource): :type ensure: bool """ # create or open the database + cache = get_cache_for('db-' + sync_id + self.dbname, expire=120) if ensure: db, self.replica_uid = self.state.ensure_database(self.dbname) else: db = self.state.open_database(self.dbname) + db.init_caching(cache) # validate the information the client has about server replica db.validate_gen_and_trans_id( last_known_generation, last_known_trans_id) |