Diffstat:

-rw-r--r--  common/src/leap/soledad/common/couch.py        8
-rw-r--r--  server/changes/feature_5571_split-sync-post    1
-rw-r--r--  server/src/leap/soledad/server/__init__.py   137
-rw-r--r--  server/src/leap/soledad/server/sync.py       462

4 files changed, 592 insertions, 16 deletions
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index 0aa84170..f4696cee 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -371,6 +371,7 @@ class CouchDatabase(CommonBackend):
     MAX_GET_DOCS_THREADS = 20
 
     update_handler_lock = defaultdict(threading.Lock)
+    sync_info_lock = defaultdict(threading.Lock)
 
     class _GetDocThread(threading.Thread):
         """
@@ -440,7 +441,8 @@ class CouchDatabase(CommonBackend):
                 if not create:
                     raise DatabaseDoesNotExist()
                 server.create(dbname)
-        return cls(url, dbname, replica_uid=replica_uid, ensure_ddocs=ensure_ddocs)
+        return cls(
+            url, dbname, replica_uid=replica_uid, ensure_ddocs=ensure_ddocs)
 
     def __init__(self, url, dbname, replica_uid=None, ensure_ddocs=True):
         """
@@ -576,6 +578,8 @@ class CouchDatabase(CommonBackend):
 
     _replica_uid = property(_get_replica_uid, _set_replica_uid)
 
+    replica_uid = property(_get_replica_uid)
+
     def _get_generation(self):
         """
         Return the current generation.
@@ -869,7 +873,7 @@ class CouchDatabase(CommonBackend):
             # Date.prototype.getTime() which was used before inside a couchdb
             # update handler.
             (int(time.time() * 1000),
-            self._allocate_transaction_id()))
+             self._allocate_transaction_id()))
         # build the couch document
         couch_doc = {
             '_id': doc.doc_id,
diff --git a/server/changes/feature_5571_split-sync-post b/server/changes/feature_5571_split-sync-post
new file mode 100644
index 00000000..ad269cd4
--- /dev/null
+++ b/server/changes/feature_5571_split-sync-post
@@ -0,0 +1 @@
+  o Split sync in multiple POST requests in server (#5571).
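
The new sync_info_lock above mirrors the existing update_handler_lock: a defaultdict that lazily creates one threading.Lock per replica uid, so that concurrent writes to the sync state document of one replica are serialized while syncs from different replicas proceed in parallel. A minimal sketch of that pattern (the names locks, update_state and do_write are illustrative, not from the diff):

    import threading
    from collections import defaultdict

    # one lock per key; the defaultdict creates it on first access
    locks = defaultdict(threading.Lock)

    def update_state(replica_uid, do_write):
        # writers for the same replica_uid run one at a time;
        # writers for different replicas do not block each other
        with locks[replica_uid]:
            do_write()
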
diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py
index c170f230..573afdd6 100644
--- a/server/src/leap/soledad/server/__init__.py
+++ b/server/src/leap/soledad/server/__init__.py
@@ -87,8 +87,10 @@ and lock documents on the shared database is handled by
 """
 
 import configparser
+import urlparse
+import sys
 
-from u1db.remote import http_app
+from u1db.remote import http_app, utils
 
 # Keep OpenSSL's tsafe before importing Twisted submodules so we can put
 # it back if Twisted==12.0.0 messes with it.
@@ -99,24 +101,24 @@ from twisted import version
 if version.base() == "12.0.0":
     # Put OpenSSL's tsafe back into place. This can probably be removed if we
     # come to use Twisted>=12.3.0.
-    import sys
     sys.modules['OpenSSL.tsafe'] = old_tsafe
 
 from leap.soledad.server.auth import SoledadTokenAuthMiddleware
 from leap.soledad.server.gzip_middleware import GzipMiddleware
 from leap.soledad.server.lock_resource import LockResource
+from leap.soledad.server.sync import (
+    SyncResource,
+    MAX_REQUEST_SIZE,
+    MAX_ENTRY_SIZE,
+)
 
 from leap.soledad.common import SHARED_DB_NAME
 from leap.soledad.common.couch import CouchServerState
 
 
-#-----------------------------------------------------------------------------
+# ----------------------------------------------------------------------------
 # Soledad WSGI application
-#-----------------------------------------------------------------------------
-
-MAX_REQUEST_SIZE = 200  # in Mb
-MAX_ENTRY_SIZE = 200  # in Mb
-
+# ----------------------------------------------------------------------------
 
 class SoledadApp(http_app.HTTPApp):
     """
@@ -147,14 +149,121 @@ class SoledadApp(http_app.HTTPApp):
 
         return http_app.HTTPApp.__call__(self, environ, start_response)
 
 
+# ----------------------------------------------------------------------------
+# WSGI resources registration
+# ----------------------------------------------------------------------------
+
+# monkey patch u1db with a new resource map
+http_app.url_to_resource = http_app.URLToResource()
+
+# register u1db unmodified resources
+http_app.url_to_resource.register(http_app.GlobalResource)
+http_app.url_to_resource.register(http_app.DatabaseResource)
+http_app.url_to_resource.register(http_app.DocsResource)
+http_app.url_to_resource.register(http_app.DocResource)
+
+# register Soledad's new or modified resources
 http_app.url_to_resource.register(LockResource)
-http_app.SyncResource.max_request_size = MAX_REQUEST_SIZE * 1024 * 1024
-http_app.SyncResource.max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024
+http_app.url_to_resource.register(SyncResource)
+
+
+# ----------------------------------------------------------------------------
+# Modified HTTP method invocation (to account for split sync)
+# ----------------------------------------------------------------------------
 
-#-----------------------------------------------------------------------------
+class HTTPInvocationByMethodWithBody(
+        http_app.HTTPInvocationByMethodWithBody):
+    """
+    Invoke methods on a resource.
+    """
+
+    def __call__(self):
+        """
+        Call an HTTP method of a resource.
+
+        This method was rewritten to allow for a sync flow which uses one POST
+        request for each transferred document (back and forth).
+
+        The usual U1DB sync process transfers all documents from client to
+        server and back in a single POST request. This is inconvenient for
+        several reasons: the sync process cannot be gracefully interrupted,
+        and large documents that have to be retrieved and encrypted/decrypted
+        may cause timeouts. Because of this, we split the sync process into
+        many POST requests.
+        """
+        args = urlparse.parse_qsl(self.environ['QUERY_STRING'],
+                                  strict_parsing=False)
+        try:
+            args = dict(
+                (k.decode('utf-8'), v.decode('utf-8')) for k, v in args)
+        except ValueError:
+            raise http_app.BadRequest()
+        method = self.environ['REQUEST_METHOD'].lower()
+        if method in ('get', 'delete'):
+            meth = self._lookup(method)
+            return meth(args, None)
+        else:
+            # we expect content-length > 0, reconsider if we move
+            # to support chunked encoding
+            try:
+                content_length = int(self.environ['CONTENT_LENGTH'])
+            except (ValueError, KeyError):
+                raise http_app.BadRequest()
+            if content_length <= 0:
+                raise http_app.BadRequest()
+            if content_length > self.max_request_size:
+                raise http_app.BadRequest()
+            reader = http_app._FencedReader(
+                self.environ['wsgi.input'], content_length,
+                self.max_entry_size)
+            content_type = self.environ.get('CONTENT_TYPE')
+            if content_type == 'application/json':
+                meth = self._lookup(method)
+                body = reader.read_chunk(sys.maxint)
+                return meth(args, body)
+            elif content_type.startswith('application/x-soledad-sync'):
+                # read one line and validate it
+                body_getline = reader.getline
+                if body_getline().strip() != '[':
+                    raise http_app.BadRequest()
+                line = body_getline()
+                line, comma = utils.check_and_strip_comma(line.strip())
+                meth_args = self._lookup('%s_args' % method)
+                meth_args(args, line)
+                # handle incoming documents
+                if content_type == 'application/x-soledad-sync-put':
+                    meth_put = self._lookup('%s_put' % method)
+                    meth_end = self._lookup('%s_end' % method)
+                    while True:
+                        line = body_getline()
+                        entry = line.strip()
+                        if entry == ']':  # end of incoming document stream
+                            break
+                        if not entry or not comma:  # empty or no preceding comma
+                            raise http_app.BadRequest()
+                        entry, comma = utils.check_and_strip_comma(entry)
+                        meth_put({}, entry)
+                    if comma or body_getline():  # extra comma or data
+                        raise http_app.BadRequest()
+                    return meth_end()
+                # handle outgoing documents
+                elif content_type == 'application/x-soledad-sync-get':
+                    line = body_getline()
+                    entry = line.strip()
+                    meth_get = self._lookup('%s_get' % method)
+                    return meth_get({}, line)
+                else:
+                    raise http_app.BadRequest()
+            else:
+                raise http_app.BadRequest()
+
+
+http_app.HTTPInvocationByMethodWithBody = HTTPInvocationByMethodWithBody
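
For reference, the request bodies accepted by the parser above are line-oriented JSON streams. A put request (content type application/x-soledad-sync-put) carries the sync arguments in the first entry, consumed by post_args, and one document per following line; every entry except the last must end with a comma, matching check_and_strip_comma. A get request (application/x-soledad-sync-get) carries the arguments entry plus a single entry telling how many documents were already received. Roughly, with all field values invented for illustration:

    [
    {"last_known_generation": 0, "last_known_trans_id": null, "ensure": true},
    {"id": "doc-1", "rev": "replica:1", "content": "{}", "gen": 1, "trans_id": "T-1"},
    {"id": "doc-2", "rev": "replica:1", "content": "{}", "gen": 2, "trans_id": "T-2"}
    ]

    [
    {"last_known_generation": 0, "last_known_trans_id": null},
    {"received": 0}
    ]
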
+
+
+# ----------------------------------------------------------------------------
 # Auxiliary functions
-#-----------------------------------------------------------------------------
+# ----------------------------------------------------------------------------
 
 def load_configuration(file_path):
     """
@@ -180,9 +289,9 @@ def load_configuration(file_path):
     return conf
 
 
-#-----------------------------------------------------------------------------
+# ----------------------------------------------------------------------------
 # Run as Twisted WSGI Resource
-#-----------------------------------------------------------------------------
+# ----------------------------------------------------------------------------
 
 def application(environ, start_response):
     conf = load_configuration('/etc/leap/soledad-server.conf')
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
new file mode 100644
index 00000000..3b8b69fb
--- /dev/null
+++ b/server/src/leap/soledad/server/sync.py
@@ -0,0 +1,462 @@
+# -*- coding: utf-8 -*-
+# sync.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Server side synchronization infrastructure.
+"""
+
+import json
+
+
+from leap.soledad.common.couch import CouchDatabase
+from itertools import izip
+from u1db import sync, Document
+from u1db.remote import http_app
+
+
+MAX_REQUEST_SIZE = 200  # in Mb
+MAX_ENTRY_SIZE = 200  # in Mb
+
+
+class ServerSyncState(object):
+    """
+    The state of one sync session, as stored on the backend server.
+
+    This object performs queries to distinct design documents:
+
+        _design/syncs/_update/state
+        _design/syncs/_view/state
+        _design/syncs/_view/seen_ids
+        _design/syncs/_view/changes_to_return
+
+    On the server side, the metadata of ongoing syncs is maintained in a
+    document called 'u1db_sync_state'.
+    """
+
+    def __init__(self, db, source_replica_uid):
+        """
+        Initialize the sync state object.
+
+        :param db: The target syncing database.
+        :type db: CouchDatabase
+        :param source_replica_uid: The uid of the source syncing replica.
+        :type source_replica_uid: str
+        """
+        self._db = db
+        self._source_replica_uid = source_replica_uid
+
+    def _key(self, key):
+        """
+        Format a key to be used on couch views.
+
+        :param key: The lookup key.
+        :type key: json serializable object
+
+        :return: The properly formatted key.
+        :rtype: str
+        """
+        return json.dumps(key, separators=(',', ':'))
+
+    def _put_info(self, key, value):
+        """
+        Put some information on the sync state document.
+
+        This method works in conjunction with the
+        _design/syncs/_update/state update handler on the couch backend.
+
+        :param key: The key for the info to be put.
+        :type key: str
+        :param value: The value for the info to be put.
+        :type value: str
+        """
+        ddoc_path = [
+            '_design', 'syncs', '_update', 'state',
+            'u1db_sync_state']
+        res = self._db._database.resource(*ddoc_path)
+        with CouchDatabase.sync_info_lock[self._db.replica_uid]:
+            res.put_json(
+                body={
+                    'source_replica_uid': self._source_replica_uid,
+                    key: value,
+                },
+                headers={'content-type': 'application/json'})
+
+    def put_seen_id(self, seen_id, gen):
+        """
+        Put one seen id on the sync state document.
+
+        :param seen_id: The doc_id of a document seen during sync.
+        :type seen_id: str
+        :param gen: The corresponding db generation for that document.
+        :type gen: int
+        """
+        self._put_info(
+            'seen_id',
+            [seen_id, gen])
+
+    def seen_ids(self):
+        """
+        Return all document ids seen during the sync.
+
+        :return: A dict mapping doc ids seen during the sync to the
+                 generation they were seen at.
+        :rtype: dict
+        """
+        ddoc_path = ['_design', 'syncs', '_view', 'seen_ids']
+        resource = self._db._database.resource(*ddoc_path)
+        response = resource.get_json(key=self._key(self._source_replica_uid))
+        data = response[2]
+        if len(data['rows']) > 0:
+            entry = data['rows'].pop()
+            return entry['value']['seen_ids']
+        return {}
+
+    def put_changes_to_return(self, gen, trans_id, changes_to_return):
+        """
+        Put the calculated changes to return in the backend sync state
+        document.
+
+        :param gen: The target database generation that will be synced.
+        :type gen: int
+        :param trans_id: The target database transaction id that will be
+                         synced.
+        :type trans_id: str
+        :param changes_to_return: A list of tuples with the changes to be
+                                  returned during the sync process.
+        :type changes_to_return: list
+        """
+        self._put_info(
+            'changes_to_return',
+            {
+                'gen': gen,
+                'trans_id': trans_id,
+                'changes_to_return': changes_to_return,
+            }
+        )
+
+    def sync_info(self):
+        """
+        Return information about the current sync state.
+
+        :return: The generation and transaction id of the target database
+                 which will be synced, and the number of documents to return,
+                 or a tuple of Nones if those have not yet been stored on the
+                 server.
+        :rtype: tuple
+        """
+        ddoc_path = ['_design', 'syncs', '_view', 'state']
+        resource = self._db._database.resource(*ddoc_path)
+        response = resource.get_json(key=self._key(self._source_replica_uid))
+        data = response[2]
+        gen = None
+        trans_id = None
+        number_of_changes = None
+        if len(data['rows']) > 0 and data['rows'][0]['value'] is not None:
+            value = data['rows'][0]['value']
+            gen = value['gen']
+            trans_id = value['trans_id']
+            number_of_changes = value['number_of_changes']
+        return gen, trans_id, number_of_changes
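
The three views queried by this class ('state', 'seen_ids' and 'changes_to_return') all read back per-source entries of the 'u1db_sync_state' document written through the update handler. The design documents themselves are not part of this diff, but the code above implies a document shaped roughly like the following sketch (the ongoing_syncs field name and nesting are assumptions made for illustration):

    {
        "_id": "u1db_sync_state",
        "ongoing_syncs": {
            "<source_replica_uid>": {
                "seen_ids": {"doc-1": 3, "doc-2": 5},
                "gen": 8,
                "trans_id": "T-8",
                "number_of_changes": 2,
                "changes_to_return": [["doc-7", 7, "T-7"], ["doc-9", 8, "T-8"]]
            }
        }
    }
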
+    def next_change_to_return(self, received):
+        """
+        Return the next change to be returned to the source syncing replica.
+
+        :param received: How many documents the source replica has already
+                         received during the current sync process.
+        :type received: int
+
+        :return: A tuple with the generation, the transaction id and the next
+                 change to be returned, or a tuple of Nones if there is no
+                 stored change for that position.
+        :rtype: tuple
+        """
+        ddoc_path = ['_design', 'syncs', '_view', 'changes_to_return']
+        resource = self._db._database.resource(*ddoc_path)
+        response = resource.get_json(
+            key=self._key(
+                [self._source_replica_uid, received]))
+        data = response[2]
+        if len(data['rows']) == 0:
+            return None, None, None
+        value = data['rows'][0]['value']
+        gen = value['gen']
+        trans_id = value['trans_id']
+        next_change_to_return = value['next_change_to_return']
+        return gen, trans_id, tuple(next_change_to_return)
+
+
+class SyncExchange(sync.SyncExchange):
+
+    def __init__(self, db, source_replica_uid, last_known_generation):
+        """
+        :param db: The target syncing database.
+        :type db: CouchDatabase
+        :param source_replica_uid: The uid of the source syncing replica.
+        :type source_replica_uid: str
+        :param last_known_generation: The last target replica generation the
+                                      source replica knows about.
+        :type last_known_generation: int
+        """
+        self._db = db
+        self.source_replica_uid = source_replica_uid
+        self.source_last_known_generation = last_known_generation
+        self.new_gen = None
+        self.new_trans_id = None
+        self._trace_hook = None
+        # recover sync state
+        self._sync_state = ServerSyncState(self._db, self.source_replica_uid)
+        # for tests
+        #self._incoming_trace = []
+        #if hasattr(self._db, '_incoming_trace'):
+        #    self._incoming_trace = self._db._incoming_trace
+        #self._db._last_exchange_log = {
+        #    'receive': {'docs': self._incoming_trace},
+        #    'return': None
+        #    }
+
+    def find_changes_to_return(self, received):
+        """
+        Find changes to return.
+
+        Find changes since last_known_generation in db generation order using
+        whats_changed. It excludes document ids that have already been
+        considered (superseded by the sender, etc).
+
+        :param received: How many documents the source replica has already
+                         received during the current sync process.
+        :type received: int
+
+        :return: The generation of this database, after which the caller can
+                 consider itself synchronized once all returned documents are
+                 processed, and the number of documents to be sent to the
+                 source syncing replica.
+        :rtype: tuple
+        """
+        if hasattr(self._db, '_last_exchange_log'):
+            self._db._last_exchange_log['receive'].update({  # for tests
+                'last_known_gen': self.source_last_known_generation
+            })
+        # check if changes to return have already been calculated
+        new_gen, new_trans_id, number_of_changes = self._sync_state.sync_info()
+        if number_of_changes is None:
+            self._trace('before whats_changed')
+            new_gen, new_trans_id, changes = self._db.whats_changed(
+                self.source_last_known_generation)
+            self._trace('after whats_changed')
+            seen_ids = self._sync_state.seen_ids()
+            # changed docs that weren't superseded by or converged with
+            changes_to_return = [
+                (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes
+                # there was a subsequent update
+                if doc_id not in seen_ids or seen_ids.get(doc_id) < gen]
+            self._sync_state.put_changes_to_return(
+                new_gen, new_trans_id, changes_to_return)
+            number_of_changes = len(changes_to_return)
+        # query server for stored changes
+        _, _, next_change_to_return = \
+            self._sync_state.next_change_to_return(received)
+        self.new_gen = new_gen
+        self.new_trans_id = new_trans_id
+        # and append one change
+        self.changes_to_return = []
+        if next_change_to_return is not None:
+            self.changes_to_return.append(next_change_to_return)
+        return self.new_gen, number_of_changes
+
+    def return_one_doc(self, return_doc_cb):
+        """
+        Return one changed document and its last change generation to the
+        source syncing replica by invoking the callback return_doc_cb.
+
+        This is called once for each document to be transferred from target to
+        source.
+
+        :param return_doc_cb: A callback used to return the documents with
+                              their last change generation to the source
+                              replica.
+        :type return_doc_cb: callable(doc, gen, trans_id)
+        """
+        changes_to_return = self.changes_to_return
+        # return docs, including conflicts
+        changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return]
+        self._trace('before get_docs')
+        docs = self._db.get_docs(
+            changed_doc_ids, check_for_conflicts=False, include_deleted=True)
+
+        docs_by_gen = izip(
+            docs, (gen for _, gen, _ in changes_to_return),
+            (trans_id for _, _, trans_id in changes_to_return))
+        for doc, gen, trans_id in docs_by_gen:
+            return_doc_cb(doc, gen, trans_id)
+            # for tests
+            if hasattr(self._db, '_outgoing_trace'):
+                self._db._outgoing_trace.append((doc.doc_id, doc.rev))
+        # for tests
+        if hasattr(self._db, '_outgoing_trace'):
+            self._db._last_exchange_log['return'] = {
+                'docs': self._db._outgoing_trace,
+                'last_gen': self.new_gen
+            }
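
The list comprehension in find_changes_to_return above is the core filtering rule: a change is returned only if the source replica has not seen the document during this sync, or has seen it at an older generation. A standalone rendering of that rule, with made-up data:

    seen_ids = {'doc-a': 2, 'doc-b': 5}   # doc_id -> generation already seen
    changes = [('doc-a', 4, 'T-4'),       # seen, but changed since -> return
               ('doc-b', 5, 'T-5'),       # already seen at this gen -> skip
               ('doc-c', 6, 'T-6')]       # never seen -> return

    changes_to_return = [
        (doc_id, gen, trans_id)
        for (doc_id, gen, trans_id) in changes
        if doc_id not in seen_ids or seen_ids.get(doc_id) < gen]

    assert changes_to_return == [('doc-a', 4, 'T-4'), ('doc-c', 6, 'T-6')]
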
+    def insert_doc_from_source(self, doc, source_gen, trans_id):
+        """
+        Try to insert a synced document from the source.
+
+        Conflicting documents are not inserted but will be sent over
+        to the sync source.
+
+        It keeps track of progress by storing the document source
+        generation as well.
+
+        The first step of a sync exchange is to call this repeatedly to
+        try to insert all incoming documents from the source.
+
+        :param doc: A Document object.
+        :type doc: Document
+        :param source_gen: The source generation of doc.
+        :type source_gen: int
+        :param trans_id: The transaction id of that document change.
+        :type trans_id: str
+        """
+        state, at_gen = self._db._put_doc_if_newer(
+            doc, save_conflict=False, replica_uid=self.source_replica_uid,
+            replica_gen=source_gen, replica_trans_id=trans_id)
+        if state == 'inserted':
+            self._sync_state.put_seen_id(doc.doc_id, at_gen)
+        elif state == 'converged':
+            # magical convergence
+            self._sync_state.put_seen_id(doc.doc_id, at_gen)
+        elif state == 'superseded':
+            # we have something newer that we will return
+            pass
+        else:
+            # conflict that we will return
+            assert state == 'conflicted'
+        # for tests
+        if hasattr(self._db, '_incoming_trace') \
+                and hasattr(self._db, '_last_exchange_log'):
+            self._db._incoming_trace.append((doc.doc_id, doc.rev))
+            self._db._last_exchange_log['receive'].update({
+                'source_uid': self.source_replica_uid,
+                'source_gen': source_gen
+            })
+
+
+class SyncResource(http_app.SyncResource):
+
+    max_request_size = MAX_REQUEST_SIZE * 1024 * 1024
+    max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024
+
+    sync_exchange_class = SyncExchange
+
+    @http_app.http_method(
+        last_known_generation=int, last_known_trans_id=http_app.none_or_str,
+        content_as_args=True)
+    def post_args(self, last_known_generation, last_known_trans_id=None,
+                  ensure=False):
+        """
+        Handle the initial arguments for the sync POST request from the
+        client.
+
+        :param last_known_generation: The last server replica generation the
+                                      client knows about.
+        :type last_known_generation: int
+        :param last_known_trans_id: The last server replica transaction_id the
+                                    client knows about.
+        :type last_known_trans_id: str
+        :param ensure: Whether the server replica should be created if it does
+                       not already exist.
+        :type ensure: bool
+        """
+        # create or open the database
+        if ensure:
+            db, self.replica_uid = self.state.ensure_database(self.dbname)
+        else:
+            db = self.state.open_database(self.dbname)
+        # validate the information the client has about the server replica
+        db.validate_gen_and_trans_id(
+            last_known_generation, last_known_trans_id)
+        # get a sync exchange object
+        self.sync_exch = self.sync_exchange_class(
+            db, self.source_replica_uid, last_known_generation)
+
+    @http_app.http_method(content_as_args=True)
+    def post_put(self, id, rev, content, gen, trans_id):
+        """
+        Put one incoming document into the server replica.
+
+        :param id: The id of the incoming document.
+        :type id: str
+        :param rev: The revision of the incoming document.
+        :type rev: str
+        :param content: The content of the incoming document.
+        :type content: dict
+        :param gen: The source replica generation corresponding to the
+                    revision of the incoming document.
+        :type gen: int
+        :param trans_id: The source replica transaction id corresponding to
+                         the revision of the incoming document.
+        :type trans_id: str
+        """
+        doc = Document(id, rev, content)
+        self.sync_exch.insert_doc_from_source(doc, gen, trans_id)
+
+    @http_app.http_method(received=int, content_as_args=True)
+    def post_get(self, received):
+        """
+        Return one syncing document to the client.
+
+        :param received: How many documents have already been received by the
+                         client on the current sync session.
+        :type received: int
+        """
+
+        def send_doc(doc, gen, trans_id):
+            entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(),
+                         gen=gen, trans_id=trans_id)
+            self.responder.stream_entry(entry)
+
+        new_gen, number_of_changes = \
+            self.sync_exch.find_changes_to_return(received)
+        self.responder.content_type = 'application/x-u1db-sync-response'
+        self.responder.start_response(200)
+        self.responder.start_stream()
+        header = {
+            "new_generation": new_gen,
+            "new_transaction_id": self.sync_exch.new_trans_id,
+            "number_of_changes": number_of_changes,
+        }
+        if self.replica_uid is not None:
+            header['replica_uid'] = self.replica_uid
+        self.responder.stream_entry(header)
+        self.sync_exch.return_one_doc(send_doc)
+        self.responder.end_stream()
+        self.responder.finish_response()
+
+    def post_end(self):
+        """
+        Return the current generation and transaction_id after inserting a
+        series of incoming documents.
+        """
+        self.responder.content_type = 'application/x-soledad-sync-response'
+        self.responder.start_response(200)
+        self.responder.start_stream()
+        new_gen, new_trans_id = self.sync_exch._db._get_generation_info()
+        header = {
+            "new_generation": new_gen,
+            "new_transaction_id": new_trans_id,
+        }
+        if self.replica_uid is not None:
+            header['replica_uid'] = self.replica_uid
+        self.responder.stream_entry(header)
+        self.responder.end_stream()
+        self.responder.finish_response()
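
To close the loop, the response streamed by post_get mirrors the request format: a JSON list whose first entry is the header assembled above and whose optional second entry is the single document produced by return_one_doc. With invented values, a response to one get request might read:

    [
    {"new_generation": 8, "new_transaction_id": "T-8", "number_of_changes": 2},
    {"id": "doc-7", "rev": "replica:2", "content": "{}", "gen": 7, "trans_id": "T-7"}
    ]

post_end answers the final put request with a header-only stream of the same shape, carrying the server generation and transaction id reached after all insertions.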
