diff options
Diffstat (limited to 'server/src/leap/soledad/server/sync.py')
-rw-r--r-- | server/src/leap/soledad/server/sync.py | 429 |
1 files changed, 429 insertions, 0 deletions
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py new file mode 100644 index 00000000..c6928aaa --- /dev/null +++ b/server/src/leap/soledad/server/sync.py @@ -0,0 +1,429 @@ +# -*- coding: utf-8 -*- +# sync.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Server side synchronization infrastructure. +""" + +import json + + +from leap.soledad.common.couch import CouchDatabase +from itertools import izip +from u1db import sync, Document +from u1db.remote import http_app + + +MAX_REQUEST_SIZE = 200 # in Mb +MAX_ENTRY_SIZE = 200 # in Mb + + +class ServerSyncState(object): + """ + The state of one sync session, as stored on backend server. + + This object performes queries to distinct design documents: + + _design/syncs/_update/state + _design/syncs/_view/state + _design/syncs/_view/seen_ids + _design/syncs/_view/changes_to_return + + On server side, the ongoing syncs metadata is maintained in a document + called 'u1db_sync_state'. + """ + + def __init__(self, db, source_replica_uid, sync_id): + """ + Initialize the sync state object. + + :param db: The target syncing database. + :type db: CouchDatabase. + :param source_replica_uid: CouchDatabase + :type source_replica_uid: str + """ + self._db = db + self._source_replica_uid = source_replica_uid + self._sync_id = sync_id + + def _key(self, key): + """ + Format a key to be used on couch views. + + :param key: The lookup key. + :type key: json serializable object + + :return: The properly formatted key. + :rtype: str + """ + return json.dumps(key, separators=(',', ':')) + + def _put_info(self, key, value): + """ + Put some information on the sync state document. + + This method works in conjunction with the + _design/syncs/_update/state update handler couch backend. + + :param key: The key for the info to be put. + :type key: str + :param value: The value for the info to be put. + :type value: str + """ + ddoc_path = [ + '_design', 'syncs', '_update', 'state', + 'u1db_sync_state'] + res = self._db._database.resource(*ddoc_path) + with CouchDatabase.sync_info_lock[self._db.replica_uid]: + res.put_json( + body={ + 'sync_id': self._sync_id, + 'source_replica_uid': self._source_replica_uid, + key: value, + }, + headers={'content-type': 'application/json'}) + + def put_seen_id(self, seen_id, gen): + """ + Put one seen id on the sync state document. + + :param seen_id: The doc_id of a document seen during sync. + :type seen_id: str + :param gen: The corresponding db generation for that document. + :type gen: int + """ + self._put_info( + 'seen_id', + [seen_id, gen]) + + def seen_ids(self): + """ + Return all document ids seen during the sync. + + :return: A list with doc ids seen during the sync. + :rtype: list + """ + ddoc_path = ['_design', 'syncs', '_view', 'seen_ids'] + resource = self._db._database.resource(*ddoc_path) + response = resource.get_json( + key=self._key([self._source_replica_uid, self._sync_id])) + data = response[2] + if data['rows']: + entry = data['rows'].pop() + return entry['value']['seen_ids'] + return [] + + def put_changes_to_return(self, gen, trans_id, changes_to_return): + """ + Put the calculated changes to return in the backend sync state + document. + + :param gen: The target database generation that will be synced. + :type gen: int + :param trans_id: The target database transaction id that will be + synced. + :type trans_id: str + :param changes_to_return: A list of tuples with the changes to be + returned during the sync process. + :type changes_to_return: list + """ + self._put_info( + 'changes_to_return', + { + 'gen': gen, + 'trans_id': trans_id, + 'changes_to_return': changes_to_return, + } + ) + + def sync_info(self): + """ + Return information about the current sync state. + + :return: The generation and transaction id of the target database + which will be synced, and the number of documents to return, + or a tuple of Nones if those have not already been sent to + server. + :rtype: tuple + """ + ddoc_path = ['_design', 'syncs', '_view', 'state'] + resource = self._db._database.resource(*ddoc_path) + response = resource.get_json( + key=self._key([self._source_replica_uid, self._sync_id])) + data = response[2] + gen = None + trans_id = None + number_of_changes = None + if data['rows'] and data['rows'][0]['value'] is not None: + value = data['rows'][0]['value'] + gen = value['gen'] + trans_id = value['trans_id'] + number_of_changes = value['number_of_changes'] + return gen, trans_id, number_of_changes + + def next_change_to_return(self, received): + """ + Return the next change to be returned to the source syncing replica. + + :param received: How many documents the source replica has already + received during the current sync process. + :type received: int + """ + ddoc_path = ['_design', 'syncs', '_view', 'changes_to_return'] + resource = self._db._database.resource(*ddoc_path) + response = resource.get_json( + key=self._key( + [self._source_replica_uid, self._sync_id, received])) + data = response[2] + if not data['rows']: + return None, None, None + value = data['rows'][0]['value'] + gen = value['gen'] + trans_id = value['trans_id'] + next_change_to_return = value['next_change_to_return'] + return gen, trans_id, tuple(next_change_to_return) + + +class SyncExchange(sync.SyncExchange): + + def __init__(self, db, source_replica_uid, last_known_generation, sync_id): + """ + :param db: The target syncing database. + :type db: CouchDatabase + :param source_replica_uid: The uid of the source syncing replica. + :type source_replica_uid: str + :param last_known_generation: The last target replica generation the + source replica knows about. + :type last_known_generation: int + """ + self._db = db + self.source_replica_uid = source_replica_uid + self.source_last_known_generation = last_known_generation + self.sync_id = sync_id + self.new_gen = None + self.new_trans_id = None + self._trace_hook = None + # recover sync state + self._sync_state = ServerSyncState( + self._db, self.source_replica_uid, sync_id) + + + def find_changes_to_return(self, received): + """ + Find changes to return. + + Find changes since last_known_generation in db generation + order using whats_changed. It excludes documents ids that have + already been considered (superseded by the sender, etc). + + :param received: How many documents the source replica has already + received during the current sync process. + :type received: int + + :return: the generation of this database, which the caller can + consider themselves to be synchronized after processing + allreturned documents, and the amount of documents to be sent + to the source syncing replica. + :rtype: int + """ + # check if changes to return have already been calculated + new_gen, new_trans_id, number_of_changes = self._sync_state.sync_info() + if number_of_changes is None: + self._trace('before whats_changed') + new_gen, new_trans_id, changes = self._db.whats_changed( + self.source_last_known_generation) + self._trace('after whats_changed') + seen_ids = self._sync_state.seen_ids() + # changed docs that weren't superseded by or converged with + changes_to_return = [ + (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes + # there was a subsequent update + if doc_id not in seen_ids or seen_ids.get(doc_id) < gen] + self._sync_state.put_changes_to_return( + new_gen, new_trans_id, changes_to_return) + number_of_changes = len(changes_to_return) + # query server for stored changes + _, _, next_change_to_return = \ + self._sync_state.next_change_to_return(received) + self.new_gen = new_gen + self.new_trans_id = new_trans_id + # and append one change + self.change_to_return = next_change_to_return + return self.new_gen, number_of_changes + + def return_one_doc(self, return_doc_cb): + """ + Return one changed document and its last change generation to the + source syncing replica by invoking the callback return_doc_cb. + + This is called once for each document to be transferred from target to + source. + + :param return_doc_cb: is a callback used to return the documents with + their last change generation to the target + replica. + :type return_doc_cb: callable(doc, gen, trans_id) + """ + if self.change_to_return is not None: + changed_doc_id, gen, trans_id = self.change_to_return + doc = self._db.get_doc(changed_doc_id, include_deleted=True) + return_doc_cb(doc, gen, trans_id) + + def insert_doc_from_source(self, doc, source_gen, trans_id): + """Try to insert synced document from source. + + Conflicting documents are not inserted but will be sent over + to the sync source. + + It keeps track of progress by storing the document source + generation as well. + + The 1st step of a sync exchange is to call this repeatedly to + try insert all incoming documents from the source. + + :param doc: A Document object. + :type doc: Document + :param source_gen: The source generation of doc. + :type source_gen: int + :param trans_id: The transaction id of that document change. + :type trans_id: str + """ + state, at_gen = self._db._put_doc_if_newer( + doc, save_conflict=False, replica_uid=self.source_replica_uid, + replica_gen=source_gen, replica_trans_id=trans_id) + if state == 'inserted': + self._sync_state.put_seen_id(doc.doc_id, at_gen) + elif state == 'converged': + # magical convergence + self._sync_state.put_seen_id(doc.doc_id, at_gen) + elif state == 'superseded': + # we have something newer that we will return + pass + else: + # conflict that we will returne + assert state == 'conflicted' + + +class SyncResource(http_app.SyncResource): + + max_request_size = MAX_REQUEST_SIZE * 1024 * 1024 + max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024 + + sync_exchange_class = SyncExchange + + @http_app.http_method( + last_known_generation=int, last_known_trans_id=http_app.none_or_str, + sync_id=http_app.none_or_str, content_as_args=True) + def post_args(self, last_known_generation, last_known_trans_id=None, + sync_id=None, ensure=False): + """ + Handle the initial arguments for the sync POST request from client. + + :param last_known_generation: The last server replica generation the + client knows about. + :type last_known_generation: int + :param last_known_trans_id: The last server replica transaction_id the + client knows about. + :type last_known_trans_id: str + :param ensure: Whether the server replica should be created if it does + not already exist. + :type ensure: bool + """ + # create or open the database + if ensure: + db, self.replica_uid = self.state.ensure_database(self.dbname) + else: + db = self.state.open_database(self.dbname) + # validate the information the client has about server replica + db.validate_gen_and_trans_id( + last_known_generation, last_known_trans_id) + # get a sync exchange object + self.sync_exch = self.sync_exchange_class( + db, self.source_replica_uid, last_known_generation, sync_id) + + @http_app.http_method(content_as_args=True) + def post_put(self, id, rev, content, gen, trans_id): + """ + Put one incoming document into the server replica. + + :param id: The id of the incoming document. + :type id: str + :param rev: The revision of the incoming document. + :type rev: str + :param content: The content of the incoming document. + :type content: dict + :param gen: The source replica generation corresponding to the + revision of the incoming document. + :type gen: int + :param trans_id: The source replica transaction id corresponding to + the revision of the incoming document. + :type trans_id: str + """ + doc = Document(id, rev, content) + self.sync_exch.insert_doc_from_source(doc, gen, trans_id) + + @http_app.http_method(received=int, content_as_args=True) + def post_get(self, received): + """ + Return one syncing document to the client. + + :param received: How many documents have already been received by the + client on the current sync session. + :type received: int + """ + + def send_doc(doc, gen, trans_id): + entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(), + gen=gen, trans_id=trans_id) + self.responder.stream_entry(entry) + + new_gen, number_of_changes = \ + self.sync_exch.find_changes_to_return(received) + self.responder.content_type = 'application/x-u1db-sync-response' + self.responder.start_response(200) + self.responder.start_stream(), + header = { + "new_generation": new_gen, + "new_transaction_id": self.sync_exch.new_trans_id, + "number_of_changes": number_of_changes, + } + if self.replica_uid is not None: + header['replica_uid'] = self.replica_uid + self.responder.stream_entry(header) + self.sync_exch.return_one_doc(send_doc) + self.responder.end_stream() + self.responder.finish_response() + + def post_end(self): + """ + Return the current generation and transaction_id after inserting one + incoming document. + """ + self.responder.content_type = 'application/x-soledad-sync-response' + self.responder.start_response(200) + self.responder.start_stream(), + new_gen, new_trans_id = self.sync_exch._db._get_generation_info() + header = { + "new_generation": new_gen, + "new_transaction_id": new_trans_id, + } + if self.replica_uid is not None: + header['replica_uid'] = self.replica_uid + self.responder.stream_entry(header) + self.responder.end_stream() + self.responder.finish_response() |