From 8e750af78bd8f9a37d1b095712b66e6ecdb3e102 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 29 Apr 2014 13:29:23 -0300 Subject: Cleanup and pep8 fix. --- server/src/leap/soledad/server/lock_resource.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'server/src') diff --git a/server/src/leap/soledad/server/lock_resource.py b/server/src/leap/soledad/server/lock_resource.py index a7870f77..0a602e26 100644 --- a/server/src/leap/soledad/server/lock_resource.py +++ b/server/src/leap/soledad/server/lock_resource.py @@ -178,9 +178,8 @@ class LockResource(object): error=InvalidTokenError.wire_description) else: self._shared_db.delete_doc(lock_doc) - self._responder.send_response_json(200) # success: should use 204 - # but u1db does not - # support it. + # respond success: should use 204 but u1db does not support it. + self._responder.send_response_json(200) def _remaining(self, lock_doc, now): """ -- cgit v1.2.3 From 24465b7b2cd77b66f637e22453dd24a2d67c4ce6 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 17 Apr 2014 16:15:04 -0300 Subject: Split sync in multiple POST requests in server (#5571). --- server/src/leap/soledad/server/__init__.py | 137 ++++++++- server/src/leap/soledad/server/sync.py | 462 +++++++++++++++++++++++++++++ 2 files changed, 585 insertions(+), 14 deletions(-) create mode 100644 server/src/leap/soledad/server/sync.py (limited to 'server/src') diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index c170f230..573afdd6 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -87,8 +87,10 @@ and lock documents on the shared database is handled by """ import configparser +import urlparse +import sys -from u1db.remote import http_app +from u1db.remote import http_app, utils # Keep OpenSSL's tsafe before importing Twisted submodules so we can put # it back if Twisted==12.0.0 messes with it. @@ -99,24 +101,24 @@ from twisted import version if version.base() == "12.0.0": # Put OpenSSL's tsafe back into place. This can probably be removed if we # come to use Twisted>=12.3.0. - import sys sys.modules['OpenSSL.tsafe'] = old_tsafe from leap.soledad.server.auth import SoledadTokenAuthMiddleware from leap.soledad.server.gzip_middleware import GzipMiddleware from leap.soledad.server.lock_resource import LockResource +from leap.soledad.server.sync import ( + SyncResource, + MAX_REQUEST_SIZE, + MAX_ENTRY_SIZE, +) from leap.soledad.common import SHARED_DB_NAME from leap.soledad.common.couch import CouchServerState -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- # Soledad WSGI application -#----------------------------------------------------------------------------- - -MAX_REQUEST_SIZE = 200 # in Mb -MAX_ENTRY_SIZE = 200 # in Mb - +# ---------------------------------------------------------------------------- class SoledadApp(http_app.HTTPApp): """ @@ -147,14 +149,121 @@ class SoledadApp(http_app.HTTPApp): return http_app.HTTPApp.__call__(self, environ, start_response) +# ---------------------------------------------------------------------------- +# WSGI resources registration +# ---------------------------------------------------------------------------- + +# monkey patch u1db with a new resource map +http_app.url_to_resource = http_app.URLToResource() + +# register u1db unmodified resources +http_app.url_to_resource.register(http_app.GlobalResource) +http_app.url_to_resource.register(http_app.DatabaseResource) +http_app.url_to_resource.register(http_app.DocsResource) +http_app.url_to_resource.register(http_app.DocResource) + +# register Soledad's new or modified resources http_app.url_to_resource.register(LockResource) -http_app.SyncResource.max_request_size = MAX_REQUEST_SIZE * 1024 * 1024 -http_app.SyncResource.max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024 +http_app.url_to_resource.register(SyncResource) + +# ---------------------------------------------------------------------------- +# Modified HTTP method invocation (to account for splitted sync) +# ---------------------------------------------------------------------------- -#----------------------------------------------------------------------------- +class HTTPInvocationByMethodWithBody( + http_app.HTTPInvocationByMethodWithBody): + """ + Invoke methods on a resource. + """ + + def __call__(self): + """ + Call an HTTP method of a resource. + + This method was rewritten to allow for a sync flow which uses one POST + request for each transferred document (back and forth). + + Usual U1DB sync process transfers all documents from client to server + and back in only one POST request. This is inconvenient for some + reasons, as lack of possibility of gracefully interrupting the sync + process, and possible timeouts for when dealing with large documents + that have to be retrieved and encrypted/decrypted. Because of those, + we split the sync process into many POST requests. + """ + args = urlparse.parse_qsl(self.environ['QUERY_STRING'], + strict_parsing=False) + try: + args = dict( + (k.decode('utf-8'), v.decode('utf-8')) for k, v in args) + except ValueError: + raise http_app.BadRequest() + method = self.environ['REQUEST_METHOD'].lower() + if method in ('get', 'delete'): + meth = self._lookup(method) + return meth(args, None) + else: + # we expect content-length > 0, reconsider if we move + # to support chunked enconding + try: + content_length = int(self.environ['CONTENT_LENGTH']) + except (ValueError, KeyError): + raise http_app.BadRequest + if content_length <= 0: + raise http_app.BadRequest + if content_length > self.max_request_size: + raise http_app.BadRequest + reader = http_app._FencedReader( + self.environ['wsgi.input'], content_length, + self.max_entry_size) + content_type = self.environ.get('CONTENT_TYPE') + if content_type == 'application/json': + meth = self._lookup(method) + body = reader.read_chunk(sys.maxint) + return meth(args, body) + elif content_type.startswith('application/x-soledad-sync'): + # read one line and validate it + body_getline = reader.getline + if body_getline().strip() != '[': + raise http_app.BadRequest() + line = body_getline() + line, comma = utils.check_and_strip_comma(line.strip()) + meth_args = self._lookup('%s_args' % method) + meth_args(args, line) + # handle incoming documents + if content_type == 'application/x-soledad-sync-put': + meth_put = self._lookup('%s_put' % method) + meth_end = self._lookup('%s_end' % method) + while True: + line = body_getline() + entry = line.strip() + if entry == ']': # end of incoming document stream + break + if not entry or not comma: # empty or no prec comma + raise http_app.BadRequest + entry, comma = utils.check_and_strip_comma(entry) + meth_put({}, entry) + if comma or body_getline(): # extra comma or data + raise http_app.BadRequest + return meth_end() + # handle outgoing documents + elif content_type == 'application/x-soledad-sync-get': + line = body_getline() + entry = line.strip() + meth_get = self._lookup('%s_get' % method) + return meth_get({}, line) + else: + raise http_app.BadRequest() + else: + raise http_app.BadRequest() + + +http_app.HTTPInvocationByMethodWithBody = HTTPInvocationByMethodWithBody + + +# ---------------------------------------------------------------------------- # Auxiliary functions -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- def load_configuration(file_path): """ @@ -180,9 +289,9 @@ def load_configuration(file_path): return conf -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- # Run as Twisted WSGI Resource -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- def application(environ, start_response): conf = load_configuration('/etc/leap/soledad-server.conf') diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py new file mode 100644 index 00000000..3b8b69fb --- /dev/null +++ b/server/src/leap/soledad/server/sync.py @@ -0,0 +1,462 @@ +# -*- coding: utf-8 -*- +# sync.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +Server side synchronization infrastructure. +""" + +import json + + +from leap.soledad.common.couch import CouchDatabase +from itertools import izip +from u1db import sync, Document +from u1db.remote import http_app + + +MAX_REQUEST_SIZE = 200 # in Mb +MAX_ENTRY_SIZE = 200 # in Mb + + +class ServerSyncState(object): + """ + The state of one sync session, as stored on backend server. + + This object performes queries to distinct design documents: + + _design/syncs/_update/state + _design/syncs/_view/state + _design/syncs/_view/seen_ids + _design/syncs/_view/changes_to_return + + On server side, the ongoing syncs metadata is maintained in a document + called 'u1db_sync_state'. + """ + + def __init__(self, db, source_replica_uid): + """ + Initialize the sync state object. + + :param db: The target syncing database. + :type db: CouchDatabase. + :param source_replica_uid: CouchDatabase + :type source_replica_uid: str + """ + self._db = db + self._source_replica_uid = source_replica_uid + + def _key(self, key): + """ + Format a key to be used on couch views. + + :param key: The lookup key. + :type key: json serializable object + + :return: The properly formatted key. + :rtype: str + """ + return json.dumps(key, separators=(',', ':')) + + def _put_info(self, key, value): + """ + Put some information on the sync state document. + + This method works in conjunction with the + _design/syncs/_update/state update handler couch backend. + + :param key: The key for the info to be put. + :type key: str + :param value: The value for the info to be put. + :type value: str + """ + ddoc_path = [ + '_design', 'syncs', '_update', 'state', + 'u1db_sync_state'] + res = self._db._database.resource(*ddoc_path) + with CouchDatabase.sync_info_lock[self._db.replica_uid]: + res.put_json( + body={ + 'source_replica_uid': self._source_replica_uid, + key: value, + }, + headers={'content-type': 'application/json'}) + + def put_seen_id(self, seen_id, gen): + """ + Put one seen id on the sync state document. + + :param seen_id: The doc_id of a document seen during sync. + :type seen_id: str + :param gen: The corresponding db generation for that document. + :type gen: int + """ + self._put_info( + 'seen_id', + [seen_id, gen]) + + def seen_ids(self): + """ + Return all document ids seen during the sync. + + :return: A list with doc ids seen during the sync. + :rtype: list + """ + ddoc_path = ['_design', 'syncs', '_view', 'seen_ids'] + resource = self._db._database.resource(*ddoc_path) + response = resource.get_json(key=self._key(self._source_replica_uid)) + data = response[2] + if len(data['rows']) > 0: + entry = data['rows'].pop() + return entry['value']['seen_ids'] + return [] + + def put_changes_to_return(self, gen, trans_id, changes_to_return): + """ + Put the calculated changes to return in the backend sync state + document. + + :param gen: The target database generation that will be synced. + :type gen: int + :param trans_id: The target database transaction id that will be + synced. + :type trans_id: str + :param changes_to_return: A list of tuples with the changes to be + returned during the sync process. + :type changes_to_return: list + """ + self._put_info( + 'changes_to_return', + { + 'gen': gen, + 'trans_id': trans_id, + 'changes_to_return': changes_to_return, + } + ) + + def sync_info(self): + """ + Return information about the current sync state. + + :return: The generation and transaction id of the target database + which will be synced, and the number of documents do return, + or a tuple of Nones if those have not already been sent to + server. + :rtype: tuple + """ + ddoc_path = ['_design', 'syncs', '_view', 'state'] + resource = self._db._database.resource(*ddoc_path) + response = resource.get_json(key=self._key(self._source_replica_uid)) + data = response[2] + gen = None + trans_id = None + number_of_changes = None + if len(data['rows']) > 0 and data['rows'][0]['value'] is not None: + value = data['rows'][0]['value'] + gen = value['gen'] + trans_id = value['trans_id'] + number_of_changes = value['number_of_changes'] + return gen, trans_id, number_of_changes + + def next_change_to_return(self, received): + """ + Return the next change to be returned to the source syncing replica. + + :param received: How many documents the source replica has already + received during the current sync process. + :type received: int + """ + ddoc_path = ['_design', 'syncs', '_view', 'changes_to_return'] + resource = self._db._database.resource(*ddoc_path) + response = resource.get_json( + key=self._key( + [self._source_replica_uid, received])) + data = response[2] + if len(data['rows']) == 0: + return None, None, None + value = data['rows'][0]['value'] + gen = value['gen'] + trans_id = value['trans_id'] + next_change_to_return = value['next_change_to_return'] + return gen, trans_id, tuple(next_change_to_return) + + +class SyncExchange(sync.SyncExchange): + + def __init__(self, db, source_replica_uid, last_known_generation): + """ + :param db: The target syncing database. + :type db: CouchDatabase + :param source_replica_uid: The uid of the source syncing replica. + :type source_replica_uid: str + :param last_known_generation: The last target replica generation the + source replica knows about. + :type last_known_generation: int + """ + self._db = db + self.source_replica_uid = source_replica_uid + self.source_last_known_generation = last_known_generation + self.new_gen = None + self.new_trans_id = None + self._trace_hook = None + # recover sync state + self._sync_state = ServerSyncState(self._db, self.source_replica_uid) + # for tests + #self._incoming_trace = [] + #if hasattr(self._db, '_incoming_trace'): + # self._incoming_trace = self._db._incoming_trace + #self._db._last_exchange_log = { + # 'receive': {'docs': self._incoming_trace}, + # 'return': None + # } + + + def find_changes_to_return(self, received): + """ + Find changes to return. + + Find changes since last_known_generation in db generation + order using whats_changed. It excludes documents ids that have + already been considered (superseded by the sender, etc). + + :param received: How many documents the source replica has already + received during the current sync process. + :type received: int + + :return: the generation of this database, which the caller can + consider themselves to be synchronized after processing + allreturned documents, and the amount of documents to be sent + to the source syncing replica. + :rtype: int + """ + if hasattr(self._db, '_last_exchange_log'): + self._db._last_exchange_log['receive'].update({ # for tests + 'last_known_gen': self.source_last_known_generation + }) + # check if changes to return have already been calculated + new_gen, new_trans_id, number_of_changes = self._sync_state.sync_info() + if number_of_changes is None: + self._trace('before whats_changed') + new_gen, new_trans_id, changes = self._db.whats_changed( + self.source_last_known_generation) + self._trace('after whats_changed') + seen_ids = self._sync_state.seen_ids() + # changed docs that weren't superseded by or converged with + changes_to_return = [ + (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes + # there was a subsequent update + if doc_id not in seen_ids or seen_ids.get(doc_id) < gen] + self._sync_state.put_changes_to_return( + new_gen, new_trans_id, changes_to_return) + number_of_changes = len(changes_to_return) + # query server for stored changes + _, _, next_change_to_return = \ + self._sync_state.next_change_to_return(received) + self.new_gen = new_gen + self.new_trans_id = new_trans_id + # and append one change + self.changes_to_return = [] + if next_change_to_return is not None: + self.changes_to_return.append(next_change_to_return) + return self.new_gen, number_of_changes + + def return_one_doc(self, return_doc_cb): + """ + Return one changed document and its last change generation to the + source syncing replica by invoking the callback return_doc_cb. + + This is called once for each document to be transferred from target to + source. + + :param return_doc_cb: is a callback used to return the documents with + their last change generation to the target + replica. + :type return_doc_cb: callable(doc, gen, trans_id) + """ + changes_to_return = self.changes_to_return + # return docs, including conflicts + changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return] + self._trace('before get_docs') + docs = self._db.get_docs( + changed_doc_ids, check_for_conflicts=False, include_deleted=True) + + docs_by_gen = izip( + docs, (gen for _, gen, _ in changes_to_return), + (trans_id for _, _, trans_id in changes_to_return)) + for doc, gen, trans_id in docs_by_gen: + return_doc_cb(doc, gen, trans_id) + # for tests + if hasattr(self._db, '_outgoing_trace'): + self._db._outgoing_trace.append((doc.doc_id, doc.rev)) + # for tests + if hasattr(self._db, '_outgoing_trace'): + self._db._last_exchange_log['return'] = { + 'docs': self._db._outgoing_trace, + 'last_gen': self.new_gen + } + + def insert_doc_from_source(self, doc, source_gen, trans_id): + """Try to insert synced document from source. + + Conflicting documents are not inserted but will be sent over + to the sync source. + + It keeps track of progress by storing the document source + generation as well. + + The 1st step of a sync exchange is to call this repeatedly to + try insert all incoming documents from the source. + + :param doc: A Document object. + :type doc: Document + :param source_gen: The source generation of doc. + :type source_gen: int + :param trans_id: The transaction id of that document change. + :type trans_id: str + """ + state, at_gen = self._db._put_doc_if_newer( + doc, save_conflict=False, replica_uid=self.source_replica_uid, + replica_gen=source_gen, replica_trans_id=trans_id) + if state == 'inserted': + self._sync_state.put_seen_id(doc.doc_id, at_gen) + elif state == 'converged': + # magical convergence + self._sync_state.put_seen_id(doc.doc_id, at_gen) + elif state == 'superseded': + # we have something newer that we will return + pass + else: + # conflict that we will returne + assert state == 'conflicted' + # for tests + if hasattr(self._db, '_incoming_trace') \ + and hasattr(self._db, '_last_exchange_log'): + self._db._incoming_trace.append((doc.doc_id, doc.rev)) + self._db._last_exchange_log['receive'].update({ + 'source_uid': self.source_replica_uid, + 'source_gen': source_gen + }) + + +class SyncResource(http_app.SyncResource): + + max_request_size = MAX_REQUEST_SIZE * 1024 * 1024 + max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024 + + sync_exchange_class = SyncExchange + + @http_app.http_method( + last_known_generation=int, last_known_trans_id=http_app.none_or_str, + content_as_args=True) + def post_args(self, last_known_generation, last_known_trans_id=None, + ensure=False): + """ + Handle the initial arguments for the sync POST request from client. + + :param last_known_generation: The last server replica generation the + client knows about. + :type last_known_generation: int + :param last_known_trans_id: The last server replica transaction_id the + client knows about. + :type last_known_trans_id: str + :param ensure: Wether the server replica should be created if it does + not already exist. + :type ensure: bool + """ + # create or open the database + if ensure: + db, self.replica_uid = self.state.ensure_database(self.dbname) + else: + db = self.state.open_database(self.dbname) + # validate the information the client has about server replica + db.validate_gen_and_trans_id( + last_known_generation, last_known_trans_id) + # get a sync exchange object + self.sync_exch = self.sync_exchange_class( + db, self.source_replica_uid, last_known_generation) + + @http_app.http_method(content_as_args=True) + def post_put(self, id, rev, content, gen, trans_id): + """ + Put one incoming document into the server replica. + + :param id: The id of the incoming document. + :type id: str + :param rev: The revision of the incoming document. + :type rev: str + :param content: The content of the incoming document. + :type content: dict + :param gen: The source replica generation corresponding to the + revision of the incoming document. + :type gen: int + :param trans_id: The source replica transaction id corresponding to + the revision of the incoming document. + :type trans_id: str + """ + doc = Document(id, rev, content) + self.sync_exch.insert_doc_from_source(doc, gen, trans_id) + + @http_app.http_method(received=int, content_as_args=True) + def post_get(self, received): + """ + Return one syncing document to the client. + + :param received: How many documents have already been received by the + client on the current sync session. + :type received: int + """ + + def send_doc(doc, gen, trans_id): + entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(), + gen=gen, trans_id=trans_id) + self.responder.stream_entry(entry) + + new_gen, number_of_changes = \ + self.sync_exch.find_changes_to_return(received) + self.responder.content_type = 'application/x-u1db-sync-response' + self.responder.start_response(200) + self.responder.start_stream(), + header = { + "new_generation": new_gen, + "new_transaction_id": self.sync_exch.new_trans_id, + "number_of_changes": number_of_changes, + } + if self.replica_uid is not None: + header['replica_uid'] = self.replica_uid + self.responder.stream_entry(header) + self.sync_exch.return_one_doc(send_doc) + self.responder.end_stream() + self.responder.finish_response() + + def post_end(self): + """ + Return the current generation and transaction_id after inserting a + series of incoming documents. + """ + self.responder.content_type = 'application/x-soledad-sync-response' + self.responder.start_response(200) + self.responder.start_stream(), + new_gen, new_trans_id = self.sync_exch._db._get_generation_info() + header = { + "new_generation": new_gen, + "new_transaction_id": new_trans_id, + } + if self.replica_uid is not None: + header['replica_uid'] = self.replica_uid + self.responder.stream_entry(header) + self.responder.end_stream() + self.responder.finish_response() -- cgit v1.2.3 From bb8edd8655a1723c8944c898b1249099759ebf59 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 22 May 2014 18:43:14 -0300 Subject: Fix tests for new sync process. --- server/src/leap/soledad/server/__init__.py | 1 + 1 file changed, 1 insertion(+) (limited to 'server/src') diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index 573afdd6..cd006f51 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -258,6 +258,7 @@ class HTTPInvocationByMethodWithBody( raise http_app.BadRequest() +# monkey patch server with new http invocation http_app.HTTPInvocationByMethodWithBody = HTTPInvocationByMethodWithBody -- cgit v1.2.3 From 1427c60d7dff3fcfa6c30e16fd9815fd4787b458 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 28 May 2014 15:24:34 -0300 Subject: Fix stuff from kali's review. --- server/src/leap/soledad/server/sync.py | 57 ++++++---------------------------- 1 file changed, 9 insertions(+), 48 deletions(-) (limited to 'server/src') diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 3b8b69fb..16926f14 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -120,7 +120,7 @@ class ServerSyncState(object): resource = self._db._database.resource(*ddoc_path) response = resource.get_json(key=self._key(self._source_replica_uid)) data = response[2] - if len(data['rows']) > 0: + if data['rows']: entry = data['rows'].pop() return entry['value']['seen_ids'] return [] @@ -153,7 +153,7 @@ class ServerSyncState(object): Return information about the current sync state. :return: The generation and transaction id of the target database - which will be synced, and the number of documents do return, + which will be synced, and the number of documents to return, or a tuple of Nones if those have not already been sent to server. :rtype: tuple @@ -165,7 +165,7 @@ class ServerSyncState(object): gen = None trans_id = None number_of_changes = None - if len(data['rows']) > 0 and data['rows'][0]['value'] is not None: + if data['rows'] and data['rows'][0]['value'] is not None: value = data['rows'][0]['value'] gen = value['gen'] trans_id = value['trans_id'] @@ -186,7 +186,7 @@ class ServerSyncState(object): key=self._key( [self._source_replica_uid, received])) data = response[2] - if len(data['rows']) == 0: + if not data['rows']: return None, None, None value = data['rows'][0]['value'] gen = value['gen'] @@ -215,14 +215,6 @@ class SyncExchange(sync.SyncExchange): self._trace_hook = None # recover sync state self._sync_state = ServerSyncState(self._db, self.source_replica_uid) - # for tests - #self._incoming_trace = [] - #if hasattr(self._db, '_incoming_trace'): - # self._incoming_trace = self._db._incoming_trace - #self._db._last_exchange_log = { - # 'receive': {'docs': self._incoming_trace}, - # 'return': None - # } def find_changes_to_return(self, received): @@ -243,10 +235,6 @@ class SyncExchange(sync.SyncExchange): to the source syncing replica. :rtype: int """ - if hasattr(self._db, '_last_exchange_log'): - self._db._last_exchange_log['receive'].update({ # for tests - 'last_known_gen': self.source_last_known_generation - }) # check if changes to return have already been calculated new_gen, new_trans_id, number_of_changes = self._sync_state.sync_info() if number_of_changes is None: @@ -269,9 +257,7 @@ class SyncExchange(sync.SyncExchange): self.new_gen = new_gen self.new_trans_id = new_trans_id # and append one change - self.changes_to_return = [] - if next_change_to_return is not None: - self.changes_to_return.append(next_change_to_return) + self.change_to_return = next_change_to_return return self.new_gen, number_of_changes def return_one_doc(self, return_doc_cb): @@ -287,27 +273,10 @@ class SyncExchange(sync.SyncExchange): replica. :type return_doc_cb: callable(doc, gen, trans_id) """ - changes_to_return = self.changes_to_return - # return docs, including conflicts - changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return] - self._trace('before get_docs') - docs = self._db.get_docs( - changed_doc_ids, check_for_conflicts=False, include_deleted=True) - - docs_by_gen = izip( - docs, (gen for _, gen, _ in changes_to_return), - (trans_id for _, _, trans_id in changes_to_return)) - for doc, gen, trans_id in docs_by_gen: + if self.change_to_return is not None: + changed_doc_id, gen, trans_id = self.change_to_return + doc = self._db.get_doc(changed_doc_id, include_deleted=True) return_doc_cb(doc, gen, trans_id) - # for tests - if hasattr(self._db, '_outgoing_trace'): - self._db._outgoing_trace.append((doc.doc_id, doc.rev)) - # for tests - if hasattr(self._db, '_outgoing_trace'): - self._db._last_exchange_log['return'] = { - 'docs': self._db._outgoing_trace, - 'last_gen': self.new_gen - } def insert_doc_from_source(self, doc, source_gen, trans_id): """Try to insert synced document from source. @@ -342,14 +311,6 @@ class SyncExchange(sync.SyncExchange): else: # conflict that we will returne assert state == 'conflicted' - # for tests - if hasattr(self._db, '_incoming_trace') \ - and hasattr(self._db, '_last_exchange_log'): - self._db._incoming_trace.append((doc.doc_id, doc.rev)) - self._db._last_exchange_log['receive'].update({ - 'source_uid': self.source_replica_uid, - 'source_gen': source_gen - }) class SyncResource(http_app.SyncResource): @@ -373,7 +334,7 @@ class SyncResource(http_app.SyncResource): :param last_known_trans_id: The last server replica transaction_id the client knows about. :type last_known_trans_id: str - :param ensure: Wether the server replica should be created if it does + :param ensure: Whether the server replica should be created if it does not already exist. :type ensure: bool """ -- cgit v1.2.3 From 7d9d827a5f66993863ca0c532c01ad3bf2c4353e Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 3 Jun 2014 18:43:56 -0300 Subject: Replace client sync state by a sync_id. --- server/src/leap/soledad/server/sync.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) (limited to 'server/src') diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 16926f14..c6928aaa 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -48,7 +48,7 @@ class ServerSyncState(object): called 'u1db_sync_state'. """ - def __init__(self, db, source_replica_uid): + def __init__(self, db, source_replica_uid, sync_id): """ Initialize the sync state object. @@ -59,6 +59,7 @@ class ServerSyncState(object): """ self._db = db self._source_replica_uid = source_replica_uid + self._sync_id = sync_id def _key(self, key): """ @@ -91,6 +92,7 @@ class ServerSyncState(object): with CouchDatabase.sync_info_lock[self._db.replica_uid]: res.put_json( body={ + 'sync_id': self._sync_id, 'source_replica_uid': self._source_replica_uid, key: value, }, @@ -118,7 +120,8 @@ class ServerSyncState(object): """ ddoc_path = ['_design', 'syncs', '_view', 'seen_ids'] resource = self._db._database.resource(*ddoc_path) - response = resource.get_json(key=self._key(self._source_replica_uid)) + response = resource.get_json( + key=self._key([self._source_replica_uid, self._sync_id])) data = response[2] if data['rows']: entry = data['rows'].pop() @@ -160,7 +163,8 @@ class ServerSyncState(object): """ ddoc_path = ['_design', 'syncs', '_view', 'state'] resource = self._db._database.resource(*ddoc_path) - response = resource.get_json(key=self._key(self._source_replica_uid)) + response = resource.get_json( + key=self._key([self._source_replica_uid, self._sync_id])) data = response[2] gen = None trans_id = None @@ -184,7 +188,7 @@ class ServerSyncState(object): resource = self._db._database.resource(*ddoc_path) response = resource.get_json( key=self._key( - [self._source_replica_uid, received])) + [self._source_replica_uid, self._sync_id, received])) data = response[2] if not data['rows']: return None, None, None @@ -197,7 +201,7 @@ class ServerSyncState(object): class SyncExchange(sync.SyncExchange): - def __init__(self, db, source_replica_uid, last_known_generation): + def __init__(self, db, source_replica_uid, last_known_generation, sync_id): """ :param db: The target syncing database. :type db: CouchDatabase @@ -210,11 +214,13 @@ class SyncExchange(sync.SyncExchange): self._db = db self.source_replica_uid = source_replica_uid self.source_last_known_generation = last_known_generation + self.sync_id = sync_id self.new_gen = None self.new_trans_id = None self._trace_hook = None # recover sync state - self._sync_state = ServerSyncState(self._db, self.source_replica_uid) + self._sync_state = ServerSyncState( + self._db, self.source_replica_uid, sync_id) def find_changes_to_return(self, received): @@ -322,9 +328,9 @@ class SyncResource(http_app.SyncResource): @http_app.http_method( last_known_generation=int, last_known_trans_id=http_app.none_or_str, - content_as_args=True) + sync_id=http_app.none_or_str, content_as_args=True) def post_args(self, last_known_generation, last_known_trans_id=None, - ensure=False): + sync_id=None, ensure=False): """ Handle the initial arguments for the sync POST request from client. @@ -348,7 +354,7 @@ class SyncResource(http_app.SyncResource): last_known_generation, last_known_trans_id) # get a sync exchange object self.sync_exch = self.sync_exchange_class( - db, self.source_replica_uid, last_known_generation) + db, self.source_replica_uid, last_known_generation, sync_id) @http_app.http_method(content_as_args=True) def post_put(self, id, rev, content, gen, trans_id): @@ -405,8 +411,8 @@ class SyncResource(http_app.SyncResource): def post_end(self): """ - Return the current generation and transaction_id after inserting a - series of incoming documents. + Return the current generation and transaction_id after inserting one + incoming document. """ self.responder.content_type = 'application/x-soledad-sync-response' self.responder.start_response(200) -- cgit v1.2.3 From a35176a298480676d16fe195971ed89b21a78357 Mon Sep 17 00:00:00 2001 From: drebs Date: Fri, 9 Aug 2013 13:29:01 +0200 Subject: Make server auth time-insensitive. --- server/src/leap/soledad/server/auth.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) (limited to 'server/src') diff --git a/server/src/leap/soledad/server/auth.py b/server/src/leap/soledad/server/auth.py index e9d2b032..57f600a1 100644 --- a/server/src/leap/soledad/server/auth.py +++ b/server/src/leap/soledad/server/auth.py @@ -30,6 +30,7 @@ from abc import ABCMeta, abstractmethod from routes.mapper import Mapper from couchdb.client import Server from twisted.python import log +from hashlib import sha512 from leap.soledad.common import ( @@ -415,10 +416,17 @@ class SoledadTokenAuthMiddleware(SoledadAuthMiddleware): server = Server(url=self._app.state.couch_url) dbname = self.TOKENS_DB db = server[dbname] - token = db.get(token) - if token is None or \ - token[self.TOKENS_TYPE_KEY] != self.TOKENS_TYPE_DEF or \ - token[self.TOKENS_USER_ID_KEY] != uuid: + # lookup key is a hash of the token to prevent timing attacks. + token = db.get(sha512(token).hexdigest()) + if token is None: + raise InvalidAuthTokenError() + # we compare uuid hashes to avoid possible timing attacks that + # might exploit python's builtin comparison operator behaviour, + # which fails immediatelly when non-matching bytes are found. + couch_uuid_hash = sha512(token[self.TOKENS_USER_ID_KEY]).digest() + req_uuid_hash = sha512(uuid).digest() + if token[self.TOKENS_TYPE_KEY] != self.TOKENS_TYPE_DEF \ + or couch_uuid_hash != req_uuid_hash: raise InvalidAuthTokenError() return True -- cgit v1.2.3