From 24465b7b2cd77b66f637e22453dd24a2d67c4ce6 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 17 Apr 2014 16:15:04 -0300 Subject: Split sync in multiple POST requests in server (#5571). --- server/changes/feature_5571_split-sync-post | 1 + server/src/leap/soledad/server/__init__.py | 137 ++++++++- server/src/leap/soledad/server/sync.py | 462 ++++++++++++++++++++++++++++ 3 files changed, 586 insertions(+), 14 deletions(-) create mode 100644 server/changes/feature_5571_split-sync-post create mode 100644 server/src/leap/soledad/server/sync.py (limited to 'server') diff --git a/server/changes/feature_5571_split-sync-post b/server/changes/feature_5571_split-sync-post new file mode 100644 index 00000000..ad269cd4 --- /dev/null +++ b/server/changes/feature_5571_split-sync-post @@ -0,0 +1 @@ + o Split sync in multiple POST requests in server (#5571). diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index c170f230..573afdd6 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -87,8 +87,10 @@ and lock documents on the shared database is handled by """ import configparser +import urlparse +import sys -from u1db.remote import http_app +from u1db.remote import http_app, utils # Keep OpenSSL's tsafe before importing Twisted submodules so we can put # it back if Twisted==12.0.0 messes with it. @@ -99,24 +101,24 @@ from twisted import version if version.base() == "12.0.0": # Put OpenSSL's tsafe back into place. This can probably be removed if we # come to use Twisted>=12.3.0. - import sys sys.modules['OpenSSL.tsafe'] = old_tsafe from leap.soledad.server.auth import SoledadTokenAuthMiddleware from leap.soledad.server.gzip_middleware import GzipMiddleware from leap.soledad.server.lock_resource import LockResource +from leap.soledad.server.sync import ( + SyncResource, + MAX_REQUEST_SIZE, + MAX_ENTRY_SIZE, +) from leap.soledad.common import SHARED_DB_NAME from leap.soledad.common.couch import CouchServerState -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- # Soledad WSGI application -#----------------------------------------------------------------------------- - -MAX_REQUEST_SIZE = 200 # in Mb -MAX_ENTRY_SIZE = 200 # in Mb - +# ---------------------------------------------------------------------------- class SoledadApp(http_app.HTTPApp): """ @@ -147,14 +149,121 @@ class SoledadApp(http_app.HTTPApp): return http_app.HTTPApp.__call__(self, environ, start_response) +# ---------------------------------------------------------------------------- +# WSGI resources registration +# ---------------------------------------------------------------------------- + +# monkey patch u1db with a new resource map +http_app.url_to_resource = http_app.URLToResource() + +# register u1db unmodified resources +http_app.url_to_resource.register(http_app.GlobalResource) +http_app.url_to_resource.register(http_app.DatabaseResource) +http_app.url_to_resource.register(http_app.DocsResource) +http_app.url_to_resource.register(http_app.DocResource) + +# register Soledad's new or modified resources http_app.url_to_resource.register(LockResource) -http_app.SyncResource.max_request_size = MAX_REQUEST_SIZE * 1024 * 1024 -http_app.SyncResource.max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024 +http_app.url_to_resource.register(SyncResource) + +# ---------------------------------------------------------------------------- +# Modified HTTP method invocation (to account for splitted sync) +# ---------------------------------------------------------------------------- -#----------------------------------------------------------------------------- +class HTTPInvocationByMethodWithBody( + http_app.HTTPInvocationByMethodWithBody): + """ + Invoke methods on a resource. + """ + + def __call__(self): + """ + Call an HTTP method of a resource. + + This method was rewritten to allow for a sync flow which uses one POST + request for each transferred document (back and forth). + + Usual U1DB sync process transfers all documents from client to server + and back in only one POST request. This is inconvenient for some + reasons, as lack of possibility of gracefully interrupting the sync + process, and possible timeouts for when dealing with large documents + that have to be retrieved and encrypted/decrypted. Because of those, + we split the sync process into many POST requests. + """ + args = urlparse.parse_qsl(self.environ['QUERY_STRING'], + strict_parsing=False) + try: + args = dict( + (k.decode('utf-8'), v.decode('utf-8')) for k, v in args) + except ValueError: + raise http_app.BadRequest() + method = self.environ['REQUEST_METHOD'].lower() + if method in ('get', 'delete'): + meth = self._lookup(method) + return meth(args, None) + else: + # we expect content-length > 0, reconsider if we move + # to support chunked enconding + try: + content_length = int(self.environ['CONTENT_LENGTH']) + except (ValueError, KeyError): + raise http_app.BadRequest + if content_length <= 0: + raise http_app.BadRequest + if content_length > self.max_request_size: + raise http_app.BadRequest + reader = http_app._FencedReader( + self.environ['wsgi.input'], content_length, + self.max_entry_size) + content_type = self.environ.get('CONTENT_TYPE') + if content_type == 'application/json': + meth = self._lookup(method) + body = reader.read_chunk(sys.maxint) + return meth(args, body) + elif content_type.startswith('application/x-soledad-sync'): + # read one line and validate it + body_getline = reader.getline + if body_getline().strip() != '[': + raise http_app.BadRequest() + line = body_getline() + line, comma = utils.check_and_strip_comma(line.strip()) + meth_args = self._lookup('%s_args' % method) + meth_args(args, line) + # handle incoming documents + if content_type == 'application/x-soledad-sync-put': + meth_put = self._lookup('%s_put' % method) + meth_end = self._lookup('%s_end' % method) + while True: + line = body_getline() + entry = line.strip() + if entry == ']': # end of incoming document stream + break + if not entry or not comma: # empty or no prec comma + raise http_app.BadRequest + entry, comma = utils.check_and_strip_comma(entry) + meth_put({}, entry) + if comma or body_getline(): # extra comma or data + raise http_app.BadRequest + return meth_end() + # handle outgoing documents + elif content_type == 'application/x-soledad-sync-get': + line = body_getline() + entry = line.strip() + meth_get = self._lookup('%s_get' % method) + return meth_get({}, line) + else: + raise http_app.BadRequest() + else: + raise http_app.BadRequest() + + +http_app.HTTPInvocationByMethodWithBody = HTTPInvocationByMethodWithBody + + +# ---------------------------------------------------------------------------- # Auxiliary functions -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- def load_configuration(file_path): """ @@ -180,9 +289,9 @@ def load_configuration(file_path): return conf -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- # Run as Twisted WSGI Resource -#----------------------------------------------------------------------------- +# ---------------------------------------------------------------------------- def application(environ, start_response): conf = load_configuration('/etc/leap/soledad-server.conf') diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py new file mode 100644 index 00000000..3b8b69fb --- /dev/null +++ b/server/src/leap/soledad/server/sync.py @@ -0,0 +1,462 @@ +# -*- coding: utf-8 -*- +# sync.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +Server side synchronization infrastructure. +""" + +import json + + +from leap.soledad.common.couch import CouchDatabase +from itertools import izip +from u1db import sync, Document +from u1db.remote import http_app + + +MAX_REQUEST_SIZE = 200 # in Mb +MAX_ENTRY_SIZE = 200 # in Mb + + +class ServerSyncState(object): + """ + The state of one sync session, as stored on backend server. + + This object performes queries to distinct design documents: + + _design/syncs/_update/state + _design/syncs/_view/state + _design/syncs/_view/seen_ids + _design/syncs/_view/changes_to_return + + On server side, the ongoing syncs metadata is maintained in a document + called 'u1db_sync_state'. + """ + + def __init__(self, db, source_replica_uid): + """ + Initialize the sync state object. + + :param db: The target syncing database. + :type db: CouchDatabase. + :param source_replica_uid: CouchDatabase + :type source_replica_uid: str + """ + self._db = db + self._source_replica_uid = source_replica_uid + + def _key(self, key): + """ + Format a key to be used on couch views. + + :param key: The lookup key. + :type key: json serializable object + + :return: The properly formatted key. + :rtype: str + """ + return json.dumps(key, separators=(',', ':')) + + def _put_info(self, key, value): + """ + Put some information on the sync state document. + + This method works in conjunction with the + _design/syncs/_update/state update handler couch backend. + + :param key: The key for the info to be put. + :type key: str + :param value: The value for the info to be put. + :type value: str + """ + ddoc_path = [ + '_design', 'syncs', '_update', 'state', + 'u1db_sync_state'] + res = self._db._database.resource(*ddoc_path) + with CouchDatabase.sync_info_lock[self._db.replica_uid]: + res.put_json( + body={ + 'source_replica_uid': self._source_replica_uid, + key: value, + }, + headers={'content-type': 'application/json'}) + + def put_seen_id(self, seen_id, gen): + """ + Put one seen id on the sync state document. + + :param seen_id: The doc_id of a document seen during sync. + :type seen_id: str + :param gen: The corresponding db generation for that document. + :type gen: int + """ + self._put_info( + 'seen_id', + [seen_id, gen]) + + def seen_ids(self): + """ + Return all document ids seen during the sync. + + :return: A list with doc ids seen during the sync. + :rtype: list + """ + ddoc_path = ['_design', 'syncs', '_view', 'seen_ids'] + resource = self._db._database.resource(*ddoc_path) + response = resource.get_json(key=self._key(self._source_replica_uid)) + data = response[2] + if len(data['rows']) > 0: + entry = data['rows'].pop() + return entry['value']['seen_ids'] + return [] + + def put_changes_to_return(self, gen, trans_id, changes_to_return): + """ + Put the calculated changes to return in the backend sync state + document. + + :param gen: The target database generation that will be synced. + :type gen: int + :param trans_id: The target database transaction id that will be + synced. + :type trans_id: str + :param changes_to_return: A list of tuples with the changes to be + returned during the sync process. + :type changes_to_return: list + """ + self._put_info( + 'changes_to_return', + { + 'gen': gen, + 'trans_id': trans_id, + 'changes_to_return': changes_to_return, + } + ) + + def sync_info(self): + """ + Return information about the current sync state. + + :return: The generation and transaction id of the target database + which will be synced, and the number of documents do return, + or a tuple of Nones if those have not already been sent to + server. + :rtype: tuple + """ + ddoc_path = ['_design', 'syncs', '_view', 'state'] + resource = self._db._database.resource(*ddoc_path) + response = resource.get_json(key=self._key(self._source_replica_uid)) + data = response[2] + gen = None + trans_id = None + number_of_changes = None + if len(data['rows']) > 0 and data['rows'][0]['value'] is not None: + value = data['rows'][0]['value'] + gen = value['gen'] + trans_id = value['trans_id'] + number_of_changes = value['number_of_changes'] + return gen, trans_id, number_of_changes + + def next_change_to_return(self, received): + """ + Return the next change to be returned to the source syncing replica. + + :param received: How many documents the source replica has already + received during the current sync process. + :type received: int + """ + ddoc_path = ['_design', 'syncs', '_view', 'changes_to_return'] + resource = self._db._database.resource(*ddoc_path) + response = resource.get_json( + key=self._key( + [self._source_replica_uid, received])) + data = response[2] + if len(data['rows']) == 0: + return None, None, None + value = data['rows'][0]['value'] + gen = value['gen'] + trans_id = value['trans_id'] + next_change_to_return = value['next_change_to_return'] + return gen, trans_id, tuple(next_change_to_return) + + +class SyncExchange(sync.SyncExchange): + + def __init__(self, db, source_replica_uid, last_known_generation): + """ + :param db: The target syncing database. + :type db: CouchDatabase + :param source_replica_uid: The uid of the source syncing replica. + :type source_replica_uid: str + :param last_known_generation: The last target replica generation the + source replica knows about. + :type last_known_generation: int + """ + self._db = db + self.source_replica_uid = source_replica_uid + self.source_last_known_generation = last_known_generation + self.new_gen = None + self.new_trans_id = None + self._trace_hook = None + # recover sync state + self._sync_state = ServerSyncState(self._db, self.source_replica_uid) + # for tests + #self._incoming_trace = [] + #if hasattr(self._db, '_incoming_trace'): + # self._incoming_trace = self._db._incoming_trace + #self._db._last_exchange_log = { + # 'receive': {'docs': self._incoming_trace}, + # 'return': None + # } + + + def find_changes_to_return(self, received): + """ + Find changes to return. + + Find changes since last_known_generation in db generation + order using whats_changed. It excludes documents ids that have + already been considered (superseded by the sender, etc). + + :param received: How many documents the source replica has already + received during the current sync process. + :type received: int + + :return: the generation of this database, which the caller can + consider themselves to be synchronized after processing + allreturned documents, and the amount of documents to be sent + to the source syncing replica. + :rtype: int + """ + if hasattr(self._db, '_last_exchange_log'): + self._db._last_exchange_log['receive'].update({ # for tests + 'last_known_gen': self.source_last_known_generation + }) + # check if changes to return have already been calculated + new_gen, new_trans_id, number_of_changes = self._sync_state.sync_info() + if number_of_changes is None: + self._trace('before whats_changed') + new_gen, new_trans_id, changes = self._db.whats_changed( + self.source_last_known_generation) + self._trace('after whats_changed') + seen_ids = self._sync_state.seen_ids() + # changed docs that weren't superseded by or converged with + changes_to_return = [ + (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes + # there was a subsequent update + if doc_id not in seen_ids or seen_ids.get(doc_id) < gen] + self._sync_state.put_changes_to_return( + new_gen, new_trans_id, changes_to_return) + number_of_changes = len(changes_to_return) + # query server for stored changes + _, _, next_change_to_return = \ + self._sync_state.next_change_to_return(received) + self.new_gen = new_gen + self.new_trans_id = new_trans_id + # and append one change + self.changes_to_return = [] + if next_change_to_return is not None: + self.changes_to_return.append(next_change_to_return) + return self.new_gen, number_of_changes + + def return_one_doc(self, return_doc_cb): + """ + Return one changed document and its last change generation to the + source syncing replica by invoking the callback return_doc_cb. + + This is called once for each document to be transferred from target to + source. + + :param return_doc_cb: is a callback used to return the documents with + their last change generation to the target + replica. + :type return_doc_cb: callable(doc, gen, trans_id) + """ + changes_to_return = self.changes_to_return + # return docs, including conflicts + changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return] + self._trace('before get_docs') + docs = self._db.get_docs( + changed_doc_ids, check_for_conflicts=False, include_deleted=True) + + docs_by_gen = izip( + docs, (gen for _, gen, _ in changes_to_return), + (trans_id for _, _, trans_id in changes_to_return)) + for doc, gen, trans_id in docs_by_gen: + return_doc_cb(doc, gen, trans_id) + # for tests + if hasattr(self._db, '_outgoing_trace'): + self._db._outgoing_trace.append((doc.doc_id, doc.rev)) + # for tests + if hasattr(self._db, '_outgoing_trace'): + self._db._last_exchange_log['return'] = { + 'docs': self._db._outgoing_trace, + 'last_gen': self.new_gen + } + + def insert_doc_from_source(self, doc, source_gen, trans_id): + """Try to insert synced document from source. + + Conflicting documents are not inserted but will be sent over + to the sync source. + + It keeps track of progress by storing the document source + generation as well. + + The 1st step of a sync exchange is to call this repeatedly to + try insert all incoming documents from the source. + + :param doc: A Document object. + :type doc: Document + :param source_gen: The source generation of doc. + :type source_gen: int + :param trans_id: The transaction id of that document change. + :type trans_id: str + """ + state, at_gen = self._db._put_doc_if_newer( + doc, save_conflict=False, replica_uid=self.source_replica_uid, + replica_gen=source_gen, replica_trans_id=trans_id) + if state == 'inserted': + self._sync_state.put_seen_id(doc.doc_id, at_gen) + elif state == 'converged': + # magical convergence + self._sync_state.put_seen_id(doc.doc_id, at_gen) + elif state == 'superseded': + # we have something newer that we will return + pass + else: + # conflict that we will returne + assert state == 'conflicted' + # for tests + if hasattr(self._db, '_incoming_trace') \ + and hasattr(self._db, '_last_exchange_log'): + self._db._incoming_trace.append((doc.doc_id, doc.rev)) + self._db._last_exchange_log['receive'].update({ + 'source_uid': self.source_replica_uid, + 'source_gen': source_gen + }) + + +class SyncResource(http_app.SyncResource): + + max_request_size = MAX_REQUEST_SIZE * 1024 * 1024 + max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024 + + sync_exchange_class = SyncExchange + + @http_app.http_method( + last_known_generation=int, last_known_trans_id=http_app.none_or_str, + content_as_args=True) + def post_args(self, last_known_generation, last_known_trans_id=None, + ensure=False): + """ + Handle the initial arguments for the sync POST request from client. + + :param last_known_generation: The last server replica generation the + client knows about. + :type last_known_generation: int + :param last_known_trans_id: The last server replica transaction_id the + client knows about. + :type last_known_trans_id: str + :param ensure: Wether the server replica should be created if it does + not already exist. + :type ensure: bool + """ + # create or open the database + if ensure: + db, self.replica_uid = self.state.ensure_database(self.dbname) + else: + db = self.state.open_database(self.dbname) + # validate the information the client has about server replica + db.validate_gen_and_trans_id( + last_known_generation, last_known_trans_id) + # get a sync exchange object + self.sync_exch = self.sync_exchange_class( + db, self.source_replica_uid, last_known_generation) + + @http_app.http_method(content_as_args=True) + def post_put(self, id, rev, content, gen, trans_id): + """ + Put one incoming document into the server replica. + + :param id: The id of the incoming document. + :type id: str + :param rev: The revision of the incoming document. + :type rev: str + :param content: The content of the incoming document. + :type content: dict + :param gen: The source replica generation corresponding to the + revision of the incoming document. + :type gen: int + :param trans_id: The source replica transaction id corresponding to + the revision of the incoming document. + :type trans_id: str + """ + doc = Document(id, rev, content) + self.sync_exch.insert_doc_from_source(doc, gen, trans_id) + + @http_app.http_method(received=int, content_as_args=True) + def post_get(self, received): + """ + Return one syncing document to the client. + + :param received: How many documents have already been received by the + client on the current sync session. + :type received: int + """ + + def send_doc(doc, gen, trans_id): + entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(), + gen=gen, trans_id=trans_id) + self.responder.stream_entry(entry) + + new_gen, number_of_changes = \ + self.sync_exch.find_changes_to_return(received) + self.responder.content_type = 'application/x-u1db-sync-response' + self.responder.start_response(200) + self.responder.start_stream(), + header = { + "new_generation": new_gen, + "new_transaction_id": self.sync_exch.new_trans_id, + "number_of_changes": number_of_changes, + } + if self.replica_uid is not None: + header['replica_uid'] = self.replica_uid + self.responder.stream_entry(header) + self.sync_exch.return_one_doc(send_doc) + self.responder.end_stream() + self.responder.finish_response() + + def post_end(self): + """ + Return the current generation and transaction_id after inserting a + series of incoming documents. + """ + self.responder.content_type = 'application/x-soledad-sync-response' + self.responder.start_response(200) + self.responder.start_stream(), + new_gen, new_trans_id = self.sync_exch._db._get_generation_info() + header = { + "new_generation": new_gen, + "new_transaction_id": new_trans_id, + } + if self.replica_uid is not None: + header['replica_uid'] = self.replica_uid + self.responder.stream_entry(header) + self.responder.end_stream() + self.responder.finish_response() -- cgit v1.2.3