diff options
Diffstat (limited to 'server/src')
-rw-r--r-- | server/src/leap/soledad/server/__init__.py | 17 | ||||
-rw-r--r-- | server/src/leap/soledad/server/config.py | 2 | ||||
-rw-r--r-- | server/src/leap/soledad/server/resource.py | 53 | ||||
-rw-r--r-- | server/src/leap/soledad/server/sync.py | 98 |
4 files changed, 118 insertions, 52 deletions
diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index d8243c19..039bef75 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -193,7 +193,8 @@ class HTTPInvocationByMethodWithBody( try: content_length = int(self.environ['CONTENT_LENGTH']) except (ValueError, KeyError): - raise http_app.BadRequest + # raise http_app.BadRequest + content_length = self.max_request_size if content_length <= 0: raise http_app.BadRequest if content_length > self.max_request_size: @@ -219,27 +220,23 @@ class HTTPInvocationByMethodWithBody( if content_type == 'application/x-soledad-sync-put': meth_put = self._lookup('%s_put' % method) meth_end = self._lookup('%s_end' % method) - entries = [] while True: - line = body_getline() - entry = line.strip() + entry = body_getline().strip() if entry == ']': # end of incoming document stream break if not entry or not comma: # empty or no prec comma raise http_app.BadRequest entry, comma = utils.check_and_strip_comma(entry) - entries.append(entry) + content = body_getline().strip() + content, comma = utils.check_and_strip_comma(content) + meth_put({'content': content or None}, entry) if comma or body_getline(): # extra comma or data raise http_app.BadRequest - for entry in entries: - meth_put({}, entry) return meth_end() # handle outgoing documents elif content_type == 'application/x-soledad-sync-get': - line = body_getline() - entry = line.strip() meth_get = self._lookup('%s_get' % method) - return meth_get({}, line) + return meth_get() else: raise http_app.BadRequest() else: diff --git a/server/src/leap/soledad/server/config.py b/server/src/leap/soledad/server/config.py index 4a791cbe..3c17ec19 100644 --- a/server/src/leap/soledad/server/config.py +++ b/server/src/leap/soledad/server/config.py @@ -24,7 +24,7 @@ CONFIG_DEFAULTS = { 'couch_url': 'http://localhost:5984', 'create_cmd': None, 'admin_netrc': '/etc/couchdb/couchdb-admin.netrc', - 'batching': False + 'batching': True }, 'database-security': { 'members': ['soledad'], diff --git a/server/src/leap/soledad/server/resource.py b/server/src/leap/soledad/server/resource.py new file mode 100644 index 00000000..dbb91b0a --- /dev/null +++ b/server/src/leap/soledad/server/resource.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +# resource.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +A twisted resource that serves the Soledad Server. +""" + +from twisted.web.resource import Resource +from twisted.web.wsgi import WSGIResource +from twisted.internet import reactor +from twisted.python import threadpool + +from leap.soledad.server.application import wsgi_application + + +__all__ = ['SoledadResource'] + + +# setup a wsgi resource with its own threadpool +pool = threadpool.ThreadPool() +reactor.callWhenRunning(pool.start) +reactor.addSystemEventTrigger('after', 'shutdown', pool.stop) +wsgi_resource = WSGIResource(reactor, pool, wsgi_application) + + +class SoledadResource(Resource): + """ + This is a dummy twisted resource, used only to allow different entry points + for the Soledad Server. + """ + + def __init__(self): + self.children = {'': wsgi_resource} + + def getChild(self, path, request): + # for now, just "rewind" the path and serve the wsgi resource for all + # requests. In the future, we might look into the request path to + # decide which child resources should serve each request. + request.postpath.insert(0, request.prepath.pop()) + return self.children[''] diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 3f5c4aba..b553a056 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -17,14 +17,19 @@ """ Server side synchronization infrastructure. """ -from leap.soledad.common.l2db import sync, Document +import time +from itertools import izip + +from leap.soledad.common.l2db import sync from leap.soledad.common.l2db.remote import http_app from leap.soledad.server.caching import get_cache_for from leap.soledad.server.state import ServerSyncState +from leap.soledad.common.document import ServerDocument -MAX_REQUEST_SIZE = 200 # in Mb +MAX_REQUEST_SIZE = float('inf') # It's a stream. MAX_ENTRY_SIZE = 200 # in Mb +ENTRY_CACHE_SIZE = 8192 * 1024 class SyncExchange(sync.SyncExchange): @@ -51,7 +56,7 @@ class SyncExchange(sync.SyncExchange): # recover sync state self._sync_state = ServerSyncState(self.source_replica_uid, sync_id) - def find_changes_to_return(self, received): + def find_changes_to_return(self): """ Find changes to return. @@ -59,10 +64,6 @@ class SyncExchange(sync.SyncExchange): order using whats_changed. It excludes documents ids that have already been considered (superseded by the sender, etc). - :param received: How many documents the source replica has already - received during the current sync process. - :type received: int - :return: the generation of this database, which the caller can consider themselves to be synchronized after processing allreturned documents, and the amount of documents to be sent @@ -78,41 +79,45 @@ class SyncExchange(sync.SyncExchange): self._trace('after whats_changed') seen_ids = self._sync_state.seen_ids() # changed docs that weren't superseded by or converged with - changes_to_return = [ + self.changes_to_return = [ (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes # there was a subsequent update if doc_id not in seen_ids or seen_ids.get(doc_id) < gen] self._sync_state.put_changes_to_return( - new_gen, new_trans_id, changes_to_return) - number_of_changes = len(changes_to_return) - # query server for stored changes - _, _, next_change_to_return = \ - self._sync_state.next_change_to_return(received) + new_gen, new_trans_id, self.changes_to_return) + number_of_changes = len(self.changes_to_return) self.new_gen = new_gen self.new_trans_id = new_trans_id - # and append one change - self.change_to_return = next_change_to_return return self.new_gen, number_of_changes - def return_one_doc(self, return_doc_cb): - """ - Return one changed document and its last change generation to the - source syncing replica by invoking the callback return_doc_cb. + def return_docs(self, return_doc_cb): + """Return the changed documents and their last change generation + repeatedly invoking the callback return_doc_cb. - This is called once for each document to be transferred from target to - source. + The final step of a sync exchange. - :param return_doc_cb: is a callback used to return the documents with - their last change generation to the target - replica. - :type return_doc_cb: callable(doc, gen, trans_id) + :param: return_doc_cb(doc, gen, trans_id): is a callback + used to return the documents with their last change generation + to the target replica. + :return: None """ - if self.change_to_return is not None: - changed_doc_id, gen, trans_id = self.change_to_return - doc = self._db.get_doc(changed_doc_id, include_deleted=True) + changes_to_return = self.changes_to_return + # return docs, including conflicts. + # content as a file-object (will be read when writing) + changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return] + docs = self._db.get_docs( + changed_doc_ids, check_for_conflicts=False, + include_deleted=True, read_content=False) + + docs_by_gen = izip( + docs, (gen for _, gen, _ in changes_to_return), + (trans_id for _, _, trans_id in changes_to_return)) + for doc, gen, trans_id in docs_by_gen: return_doc_cb(doc, gen, trans_id) def batched_insert_from_source(self, entries, sync_id): + if not entries: + return self._db.batch_start() for entry in entries: doc, gen, trans_id, number_of_docs, doc_idx = entry @@ -207,6 +212,7 @@ class SyncResource(http_app.SyncResource): db, self.source_replica_uid, last_known_generation, sync_id) self._sync_id = sync_id self._staging = [] + self._staging_size = 0 @http_app.http_method(content_as_args=True) def post_put( @@ -233,26 +239,37 @@ class SyncResource(http_app.SyncResource): :param doc_idx: The index of the current document. :type doc_idx: int """ - doc = Document(id, rev, content) + doc = ServerDocument(id, rev, json=content) + self._staging_size += len(content or '') self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx)) + if self._staging_size > ENTRY_CACHE_SIZE or doc_idx == number_of_docs: + self.sync_exch.batched_insert_from_source(self._staging, + self._sync_id) + self._staging = [] + self._staging_size = 0 - @http_app.http_method(received=int, content_as_args=True) - def post_get(self, received): + def post_get(self): """ - Return one syncing document to the client. - - :param received: How many documents have already been received by the - client on the current sync session. - :type received: int + Return syncing documents to the client. """ - def send_doc(doc, gen, trans_id): - entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(), + entry = dict(id=doc.doc_id, rev=doc.rev, gen=gen, trans_id=trans_id) self.responder.stream_entry(entry) + content_reader = doc.get_json() + if content_reader: + content = content_reader.read() + self.responder.stream_entry(content) + content_reader.close() + # throttle at 5mb/s + # FIXME: twistd cant control througput + # we need to either use gunicorn or go async + time.sleep(len(content) / (5.0 * 1024 * 1024)) + else: + self.responder.stream_entry('') new_gen, number_of_changes = \ - self.sync_exch.find_changes_to_return(received) + self.sync_exch.find_changes_to_return() self.responder.content_type = 'application/x-u1db-sync-response' self.responder.start_response(200) self.responder.start_stream(), @@ -264,7 +281,7 @@ class SyncResource(http_app.SyncResource): if self.replica_uid is not None: header['replica_uid'] = self.replica_uid self.responder.stream_entry(header) - self.sync_exch.return_one_doc(send_doc) + self.sync_exch.return_docs(send_doc) self.responder.end_stream() self.responder.finish_response() @@ -273,7 +290,6 @@ class SyncResource(http_app.SyncResource): Return the current generation and transaction_id after inserting one incoming document. """ - self.sync_exch.batched_insert_from_source(self._staging, self._sync_id) self.responder.content_type = 'application/x-soledad-sync-response' self.responder.start_response(200) self.responder.start_stream(), |