summaryrefslogtreecommitdiff
path: root/server/src
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2014-04-17 16:15:04 -0300
committerdrebs <drebs@leap.se>2014-05-22 18:43:48 -0300
commit24465b7b2cd77b66f637e22453dd24a2d67c4ce6 (patch)
treef1e3ed62ed39a700979f8222020fd0bb5b70014a /server/src
parentf3abf619ddd6be9dee7ed5807b967e06a6d7ef93 (diff)
Split sync in multiple POST requests in server (#5571).
Diffstat (limited to 'server/src')
-rw-r--r--server/src/leap/soledad/server/__init__.py137
-rw-r--r--server/src/leap/soledad/server/sync.py462
2 files changed, 585 insertions, 14 deletions
diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py
index c170f230..573afdd6 100644
--- a/server/src/leap/soledad/server/__init__.py
+++ b/server/src/leap/soledad/server/__init__.py
@@ -87,8 +87,10 @@ and lock documents on the shared database is handled by
"""
import configparser
+import urlparse
+import sys
-from u1db.remote import http_app
+from u1db.remote import http_app, utils
# Keep OpenSSL's tsafe before importing Twisted submodules so we can put
# it back if Twisted==12.0.0 messes with it.
@@ -99,24 +101,24 @@ from twisted import version
if version.base() == "12.0.0":
# Put OpenSSL's tsafe back into place. This can probably be removed if we
# come to use Twisted>=12.3.0.
- import sys
sys.modules['OpenSSL.tsafe'] = old_tsafe
from leap.soledad.server.auth import SoledadTokenAuthMiddleware
from leap.soledad.server.gzip_middleware import GzipMiddleware
from leap.soledad.server.lock_resource import LockResource
+from leap.soledad.server.sync import (
+ SyncResource,
+ MAX_REQUEST_SIZE,
+ MAX_ENTRY_SIZE,
+)
from leap.soledad.common import SHARED_DB_NAME
from leap.soledad.common.couch import CouchServerState
-#-----------------------------------------------------------------------------
+# ----------------------------------------------------------------------------
# Soledad WSGI application
-#-----------------------------------------------------------------------------
-
-MAX_REQUEST_SIZE = 200 # in Mb
-MAX_ENTRY_SIZE = 200 # in Mb
-
+# ----------------------------------------------------------------------------
class SoledadApp(http_app.HTTPApp):
"""
@@ -147,14 +149,121 @@ class SoledadApp(http_app.HTTPApp):
return http_app.HTTPApp.__call__(self, environ, start_response)
+# ----------------------------------------------------------------------------
+# WSGI resources registration
+# ----------------------------------------------------------------------------
+
+# monkey patch u1db with a new resource map
+http_app.url_to_resource = http_app.URLToResource()
+
+# register u1db unmodified resources
+http_app.url_to_resource.register(http_app.GlobalResource)
+http_app.url_to_resource.register(http_app.DatabaseResource)
+http_app.url_to_resource.register(http_app.DocsResource)
+http_app.url_to_resource.register(http_app.DocResource)
+
+# register Soledad's new or modified resources
http_app.url_to_resource.register(LockResource)
-http_app.SyncResource.max_request_size = MAX_REQUEST_SIZE * 1024 * 1024
-http_app.SyncResource.max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024
+http_app.url_to_resource.register(SyncResource)
+
+# ----------------------------------------------------------------------------
+# Modified HTTP method invocation (to account for splitted sync)
+# ----------------------------------------------------------------------------
-#-----------------------------------------------------------------------------
+class HTTPInvocationByMethodWithBody(
+ http_app.HTTPInvocationByMethodWithBody):
+ """
+ Invoke methods on a resource.
+ """
+
+ def __call__(self):
+ """
+ Call an HTTP method of a resource.
+
+ This method was rewritten to allow for a sync flow which uses one POST
+ request for each transferred document (back and forth).
+
+ Usual U1DB sync process transfers all documents from client to server
+ and back in only one POST request. This is inconvenient for some
+ reasons, as lack of possibility of gracefully interrupting the sync
+ process, and possible timeouts for when dealing with large documents
+ that have to be retrieved and encrypted/decrypted. Because of those,
+ we split the sync process into many POST requests.
+ """
+ args = urlparse.parse_qsl(self.environ['QUERY_STRING'],
+ strict_parsing=False)
+ try:
+ args = dict(
+ (k.decode('utf-8'), v.decode('utf-8')) for k, v in args)
+ except ValueError:
+ raise http_app.BadRequest()
+ method = self.environ['REQUEST_METHOD'].lower()
+ if method in ('get', 'delete'):
+ meth = self._lookup(method)
+ return meth(args, None)
+ else:
+ # we expect content-length > 0, reconsider if we move
+ # to support chunked enconding
+ try:
+ content_length = int(self.environ['CONTENT_LENGTH'])
+ except (ValueError, KeyError):
+ raise http_app.BadRequest
+ if content_length <= 0:
+ raise http_app.BadRequest
+ if content_length > self.max_request_size:
+ raise http_app.BadRequest
+ reader = http_app._FencedReader(
+ self.environ['wsgi.input'], content_length,
+ self.max_entry_size)
+ content_type = self.environ.get('CONTENT_TYPE')
+ if content_type == 'application/json':
+ meth = self._lookup(method)
+ body = reader.read_chunk(sys.maxint)
+ return meth(args, body)
+ elif content_type.startswith('application/x-soledad-sync'):
+ # read one line and validate it
+ body_getline = reader.getline
+ if body_getline().strip() != '[':
+ raise http_app.BadRequest()
+ line = body_getline()
+ line, comma = utils.check_and_strip_comma(line.strip())
+ meth_args = self._lookup('%s_args' % method)
+ meth_args(args, line)
+ # handle incoming documents
+ if content_type == 'application/x-soledad-sync-put':
+ meth_put = self._lookup('%s_put' % method)
+ meth_end = self._lookup('%s_end' % method)
+ while True:
+ line = body_getline()
+ entry = line.strip()
+ if entry == ']': # end of incoming document stream
+ break
+ if not entry or not comma: # empty or no prec comma
+ raise http_app.BadRequest
+ entry, comma = utils.check_and_strip_comma(entry)
+ meth_put({}, entry)
+ if comma or body_getline(): # extra comma or data
+ raise http_app.BadRequest
+ return meth_end()
+ # handle outgoing documents
+ elif content_type == 'application/x-soledad-sync-get':
+ line = body_getline()
+ entry = line.strip()
+ meth_get = self._lookup('%s_get' % method)
+ return meth_get({}, line)
+ else:
+ raise http_app.BadRequest()
+ else:
+ raise http_app.BadRequest()
+
+
+http_app.HTTPInvocationByMethodWithBody = HTTPInvocationByMethodWithBody
+
+
+# ----------------------------------------------------------------------------
# Auxiliary functions
-#-----------------------------------------------------------------------------
+# ----------------------------------------------------------------------------
def load_configuration(file_path):
"""
@@ -180,9 +289,9 @@ def load_configuration(file_path):
return conf
-#-----------------------------------------------------------------------------
+# ----------------------------------------------------------------------------
# Run as Twisted WSGI Resource
-#-----------------------------------------------------------------------------
+# ----------------------------------------------------------------------------
def application(environ, start_response):
conf = load_configuration('/etc/leap/soledad-server.conf')
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
new file mode 100644
index 00000000..3b8b69fb
--- /dev/null
+++ b/server/src/leap/soledad/server/sync.py
@@ -0,0 +1,462 @@
+# -*- coding: utf-8 -*-
+# sync.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Server side synchronization infrastructure.
+"""
+
+import json
+
+
+from leap.soledad.common.couch import CouchDatabase
+from itertools import izip
+from u1db import sync, Document
+from u1db.remote import http_app
+
+
+MAX_REQUEST_SIZE = 200 # in Mb
+MAX_ENTRY_SIZE = 200 # in Mb
+
+
+class ServerSyncState(object):
+ """
+ The state of one sync session, as stored on backend server.
+
+ This object performes queries to distinct design documents:
+
+ _design/syncs/_update/state
+ _design/syncs/_view/state
+ _design/syncs/_view/seen_ids
+ _design/syncs/_view/changes_to_return
+
+ On server side, the ongoing syncs metadata is maintained in a document
+ called 'u1db_sync_state'.
+ """
+
+ def __init__(self, db, source_replica_uid):
+ """
+ Initialize the sync state object.
+
+ :param db: The target syncing database.
+ :type db: CouchDatabase.
+ :param source_replica_uid: CouchDatabase
+ :type source_replica_uid: str
+ """
+ self._db = db
+ self._source_replica_uid = source_replica_uid
+
+ def _key(self, key):
+ """
+ Format a key to be used on couch views.
+
+ :param key: The lookup key.
+ :type key: json serializable object
+
+ :return: The properly formatted key.
+ :rtype: str
+ """
+ return json.dumps(key, separators=(',', ':'))
+
+ def _put_info(self, key, value):
+ """
+ Put some information on the sync state document.
+
+ This method works in conjunction with the
+ _design/syncs/_update/state update handler couch backend.
+
+ :param key: The key for the info to be put.
+ :type key: str
+ :param value: The value for the info to be put.
+ :type value: str
+ """
+ ddoc_path = [
+ '_design', 'syncs', '_update', 'state',
+ 'u1db_sync_state']
+ res = self._db._database.resource(*ddoc_path)
+ with CouchDatabase.sync_info_lock[self._db.replica_uid]:
+ res.put_json(
+ body={
+ 'source_replica_uid': self._source_replica_uid,
+ key: value,
+ },
+ headers={'content-type': 'application/json'})
+
+ def put_seen_id(self, seen_id, gen):
+ """
+ Put one seen id on the sync state document.
+
+ :param seen_id: The doc_id of a document seen during sync.
+ :type seen_id: str
+ :param gen: The corresponding db generation for that document.
+ :type gen: int
+ """
+ self._put_info(
+ 'seen_id',
+ [seen_id, gen])
+
+ def seen_ids(self):
+ """
+ Return all document ids seen during the sync.
+
+ :return: A list with doc ids seen during the sync.
+ :rtype: list
+ """
+ ddoc_path = ['_design', 'syncs', '_view', 'seen_ids']
+ resource = self._db._database.resource(*ddoc_path)
+ response = resource.get_json(key=self._key(self._source_replica_uid))
+ data = response[2]
+ if len(data['rows']) > 0:
+ entry = data['rows'].pop()
+ return entry['value']['seen_ids']
+ return []
+
+ def put_changes_to_return(self, gen, trans_id, changes_to_return):
+ """
+ Put the calculated changes to return in the backend sync state
+ document.
+
+ :param gen: The target database generation that will be synced.
+ :type gen: int
+ :param trans_id: The target database transaction id that will be
+ synced.
+ :type trans_id: str
+ :param changes_to_return: A list of tuples with the changes to be
+ returned during the sync process.
+ :type changes_to_return: list
+ """
+ self._put_info(
+ 'changes_to_return',
+ {
+ 'gen': gen,
+ 'trans_id': trans_id,
+ 'changes_to_return': changes_to_return,
+ }
+ )
+
+ def sync_info(self):
+ """
+ Return information about the current sync state.
+
+ :return: The generation and transaction id of the target database
+ which will be synced, and the number of documents do return,
+ or a tuple of Nones if those have not already been sent to
+ server.
+ :rtype: tuple
+ """
+ ddoc_path = ['_design', 'syncs', '_view', 'state']
+ resource = self._db._database.resource(*ddoc_path)
+ response = resource.get_json(key=self._key(self._source_replica_uid))
+ data = response[2]
+ gen = None
+ trans_id = None
+ number_of_changes = None
+ if len(data['rows']) > 0 and data['rows'][0]['value'] is not None:
+ value = data['rows'][0]['value']
+ gen = value['gen']
+ trans_id = value['trans_id']
+ number_of_changes = value['number_of_changes']
+ return gen, trans_id, number_of_changes
+
+ def next_change_to_return(self, received):
+ """
+ Return the next change to be returned to the source syncing replica.
+
+ :param received: How many documents the source replica has already
+ received during the current sync process.
+ :type received: int
+ """
+ ddoc_path = ['_design', 'syncs', '_view', 'changes_to_return']
+ resource = self._db._database.resource(*ddoc_path)
+ response = resource.get_json(
+ key=self._key(
+ [self._source_replica_uid, received]))
+ data = response[2]
+ if len(data['rows']) == 0:
+ return None, None, None
+ value = data['rows'][0]['value']
+ gen = value['gen']
+ trans_id = value['trans_id']
+ next_change_to_return = value['next_change_to_return']
+ return gen, trans_id, tuple(next_change_to_return)
+
+
+class SyncExchange(sync.SyncExchange):
+
+ def __init__(self, db, source_replica_uid, last_known_generation):
+ """
+ :param db: The target syncing database.
+ :type db: CouchDatabase
+ :param source_replica_uid: The uid of the source syncing replica.
+ :type source_replica_uid: str
+ :param last_known_generation: The last target replica generation the
+ source replica knows about.
+ :type last_known_generation: int
+ """
+ self._db = db
+ self.source_replica_uid = source_replica_uid
+ self.source_last_known_generation = last_known_generation
+ self.new_gen = None
+ self.new_trans_id = None
+ self._trace_hook = None
+ # recover sync state
+ self._sync_state = ServerSyncState(self._db, self.source_replica_uid)
+ # for tests
+ #self._incoming_trace = []
+ #if hasattr(self._db, '_incoming_trace'):
+ # self._incoming_trace = self._db._incoming_trace
+ #self._db._last_exchange_log = {
+ # 'receive': {'docs': self._incoming_trace},
+ # 'return': None
+ # }
+
+
+ def find_changes_to_return(self, received):
+ """
+ Find changes to return.
+
+ Find changes since last_known_generation in db generation
+ order using whats_changed. It excludes documents ids that have
+ already been considered (superseded by the sender, etc).
+
+ :param received: How many documents the source replica has already
+ received during the current sync process.
+ :type received: int
+
+ :return: the generation of this database, which the caller can
+ consider themselves to be synchronized after processing
+ allreturned documents, and the amount of documents to be sent
+ to the source syncing replica.
+ :rtype: int
+ """
+ if hasattr(self._db, '_last_exchange_log'):
+ self._db._last_exchange_log['receive'].update({ # for tests
+ 'last_known_gen': self.source_last_known_generation
+ })
+ # check if changes to return have already been calculated
+ new_gen, new_trans_id, number_of_changes = self._sync_state.sync_info()
+ if number_of_changes is None:
+ self._trace('before whats_changed')
+ new_gen, new_trans_id, changes = self._db.whats_changed(
+ self.source_last_known_generation)
+ self._trace('after whats_changed')
+ seen_ids = self._sync_state.seen_ids()
+ # changed docs that weren't superseded by or converged with
+ changes_to_return = [
+ (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes
+ # there was a subsequent update
+ if doc_id not in seen_ids or seen_ids.get(doc_id) < gen]
+ self._sync_state.put_changes_to_return(
+ new_gen, new_trans_id, changes_to_return)
+ number_of_changes = len(changes_to_return)
+ # query server for stored changes
+ _, _, next_change_to_return = \
+ self._sync_state.next_change_to_return(received)
+ self.new_gen = new_gen
+ self.new_trans_id = new_trans_id
+ # and append one change
+ self.changes_to_return = []
+ if next_change_to_return is not None:
+ self.changes_to_return.append(next_change_to_return)
+ return self.new_gen, number_of_changes
+
+ def return_one_doc(self, return_doc_cb):
+ """
+ Return one changed document and its last change generation to the
+ source syncing replica by invoking the callback return_doc_cb.
+
+ This is called once for each document to be transferred from target to
+ source.
+
+ :param return_doc_cb: is a callback used to return the documents with
+ their last change generation to the target
+ replica.
+ :type return_doc_cb: callable(doc, gen, trans_id)
+ """
+ changes_to_return = self.changes_to_return
+ # return docs, including conflicts
+ changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return]
+ self._trace('before get_docs')
+ docs = self._db.get_docs(
+ changed_doc_ids, check_for_conflicts=False, include_deleted=True)
+
+ docs_by_gen = izip(
+ docs, (gen for _, gen, _ in changes_to_return),
+ (trans_id for _, _, trans_id in changes_to_return))
+ for doc, gen, trans_id in docs_by_gen:
+ return_doc_cb(doc, gen, trans_id)
+ # for tests
+ if hasattr(self._db, '_outgoing_trace'):
+ self._db._outgoing_trace.append((doc.doc_id, doc.rev))
+ # for tests
+ if hasattr(self._db, '_outgoing_trace'):
+ self._db._last_exchange_log['return'] = {
+ 'docs': self._db._outgoing_trace,
+ 'last_gen': self.new_gen
+ }
+
+ def insert_doc_from_source(self, doc, source_gen, trans_id):
+ """Try to insert synced document from source.
+
+ Conflicting documents are not inserted but will be sent over
+ to the sync source.
+
+ It keeps track of progress by storing the document source
+ generation as well.
+
+ The 1st step of a sync exchange is to call this repeatedly to
+ try insert all incoming documents from the source.
+
+ :param doc: A Document object.
+ :type doc: Document
+ :param source_gen: The source generation of doc.
+ :type source_gen: int
+ :param trans_id: The transaction id of that document change.
+ :type trans_id: str
+ """
+ state, at_gen = self._db._put_doc_if_newer(
+ doc, save_conflict=False, replica_uid=self.source_replica_uid,
+ replica_gen=source_gen, replica_trans_id=trans_id)
+ if state == 'inserted':
+ self._sync_state.put_seen_id(doc.doc_id, at_gen)
+ elif state == 'converged':
+ # magical convergence
+ self._sync_state.put_seen_id(doc.doc_id, at_gen)
+ elif state == 'superseded':
+ # we have something newer that we will return
+ pass
+ else:
+ # conflict that we will returne
+ assert state == 'conflicted'
+ # for tests
+ if hasattr(self._db, '_incoming_trace') \
+ and hasattr(self._db, '_last_exchange_log'):
+ self._db._incoming_trace.append((doc.doc_id, doc.rev))
+ self._db._last_exchange_log['receive'].update({
+ 'source_uid': self.source_replica_uid,
+ 'source_gen': source_gen
+ })
+
+
+class SyncResource(http_app.SyncResource):
+
+ max_request_size = MAX_REQUEST_SIZE * 1024 * 1024
+ max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024
+
+ sync_exchange_class = SyncExchange
+
+ @http_app.http_method(
+ last_known_generation=int, last_known_trans_id=http_app.none_or_str,
+ content_as_args=True)
+ def post_args(self, last_known_generation, last_known_trans_id=None,
+ ensure=False):
+ """
+ Handle the initial arguments for the sync POST request from client.
+
+ :param last_known_generation: The last server replica generation the
+ client knows about.
+ :type last_known_generation: int
+ :param last_known_trans_id: The last server replica transaction_id the
+ client knows about.
+ :type last_known_trans_id: str
+ :param ensure: Wether the server replica should be created if it does
+ not already exist.
+ :type ensure: bool
+ """
+ # create or open the database
+ if ensure:
+ db, self.replica_uid = self.state.ensure_database(self.dbname)
+ else:
+ db = self.state.open_database(self.dbname)
+ # validate the information the client has about server replica
+ db.validate_gen_and_trans_id(
+ last_known_generation, last_known_trans_id)
+ # get a sync exchange object
+ self.sync_exch = self.sync_exchange_class(
+ db, self.source_replica_uid, last_known_generation)
+
+ @http_app.http_method(content_as_args=True)
+ def post_put(self, id, rev, content, gen, trans_id):
+ """
+ Put one incoming document into the server replica.
+
+ :param id: The id of the incoming document.
+ :type id: str
+ :param rev: The revision of the incoming document.
+ :type rev: str
+ :param content: The content of the incoming document.
+ :type content: dict
+ :param gen: The source replica generation corresponding to the
+ revision of the incoming document.
+ :type gen: int
+ :param trans_id: The source replica transaction id corresponding to
+ the revision of the incoming document.
+ :type trans_id: str
+ """
+ doc = Document(id, rev, content)
+ self.sync_exch.insert_doc_from_source(doc, gen, trans_id)
+
+ @http_app.http_method(received=int, content_as_args=True)
+ def post_get(self, received):
+ """
+ Return one syncing document to the client.
+
+ :param received: How many documents have already been received by the
+ client on the current sync session.
+ :type received: int
+ """
+
+ def send_doc(doc, gen, trans_id):
+ entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(),
+ gen=gen, trans_id=trans_id)
+ self.responder.stream_entry(entry)
+
+ new_gen, number_of_changes = \
+ self.sync_exch.find_changes_to_return(received)
+ self.responder.content_type = 'application/x-u1db-sync-response'
+ self.responder.start_response(200)
+ self.responder.start_stream(),
+ header = {
+ "new_generation": new_gen,
+ "new_transaction_id": self.sync_exch.new_trans_id,
+ "number_of_changes": number_of_changes,
+ }
+ if self.replica_uid is not None:
+ header['replica_uid'] = self.replica_uid
+ self.responder.stream_entry(header)
+ self.sync_exch.return_one_doc(send_doc)
+ self.responder.end_stream()
+ self.responder.finish_response()
+
+ def post_end(self):
+ """
+ Return the current generation and transaction_id after inserting a
+ series of incoming documents.
+ """
+ self.responder.content_type = 'application/x-soledad-sync-response'
+ self.responder.start_response(200)
+ self.responder.start_stream(),
+ new_gen, new_trans_id = self.sync_exch._db._get_generation_info()
+ header = {
+ "new_generation": new_gen,
+ "new_transaction_id": new_trans_id,
+ }
+ if self.replica_uid is not None:
+ header['replica_uid'] = self.replica_uid
+ self.responder.stream_entry(header)
+ self.responder.end_stream()
+ self.responder.finish_response()