summaryrefslogtreecommitdiff
path: root/server/src
diff options
context:
space:
mode:
Diffstat (limited to 'server/src')
-rw-r--r--server/src/leap/soledad/server/__init__.py17
-rw-r--r--server/src/leap/soledad/server/config.py2
-rw-r--r--server/src/leap/soledad/server/resource.py53
-rw-r--r--server/src/leap/soledad/server/sync.py98
4 files changed, 118 insertions, 52 deletions
diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py
index d8243c19..039bef75 100644
--- a/server/src/leap/soledad/server/__init__.py
+++ b/server/src/leap/soledad/server/__init__.py
@@ -193,7 +193,8 @@ class HTTPInvocationByMethodWithBody(
try:
content_length = int(self.environ['CONTENT_LENGTH'])
except (ValueError, KeyError):
- raise http_app.BadRequest
+ # raise http_app.BadRequest
+ content_length = self.max_request_size
if content_length <= 0:
raise http_app.BadRequest
if content_length > self.max_request_size:
@@ -219,27 +220,23 @@ class HTTPInvocationByMethodWithBody(
if content_type == 'application/x-soledad-sync-put':
meth_put = self._lookup('%s_put' % method)
meth_end = self._lookup('%s_end' % method)
- entries = []
while True:
- line = body_getline()
- entry = line.strip()
+ entry = body_getline().strip()
if entry == ']': # end of incoming document stream
break
if not entry or not comma: # empty or no prec comma
raise http_app.BadRequest
entry, comma = utils.check_and_strip_comma(entry)
- entries.append(entry)
+ content = body_getline().strip()
+ content, comma = utils.check_and_strip_comma(content)
+ meth_put({'content': content or None}, entry)
if comma or body_getline(): # extra comma or data
raise http_app.BadRequest
- for entry in entries:
- meth_put({}, entry)
return meth_end()
# handle outgoing documents
elif content_type == 'application/x-soledad-sync-get':
- line = body_getline()
- entry = line.strip()
meth_get = self._lookup('%s_get' % method)
- return meth_get({}, line)
+ return meth_get()
else:
raise http_app.BadRequest()
else:
diff --git a/server/src/leap/soledad/server/config.py b/server/src/leap/soledad/server/config.py
index 4a791cbe..3c17ec19 100644
--- a/server/src/leap/soledad/server/config.py
+++ b/server/src/leap/soledad/server/config.py
@@ -24,7 +24,7 @@ CONFIG_DEFAULTS = {
'couch_url': 'http://localhost:5984',
'create_cmd': None,
'admin_netrc': '/etc/couchdb/couchdb-admin.netrc',
- 'batching': False
+ 'batching': True
},
'database-security': {
'members': ['soledad'],
diff --git a/server/src/leap/soledad/server/resource.py b/server/src/leap/soledad/server/resource.py
new file mode 100644
index 00000000..dbb91b0a
--- /dev/null
+++ b/server/src/leap/soledad/server/resource.py
@@ -0,0 +1,53 @@
+# -*- coding: utf-8 -*-
+# resource.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+A twisted resource that serves the Soledad Server.
+"""
+
+from twisted.web.resource import Resource
+from twisted.web.wsgi import WSGIResource
+from twisted.internet import reactor
+from twisted.python import threadpool
+
+from leap.soledad.server.application import wsgi_application
+
+
+__all__ = ['SoledadResource']
+
+
+# setup a wsgi resource with its own threadpool
+pool = threadpool.ThreadPool()
+reactor.callWhenRunning(pool.start)
+reactor.addSystemEventTrigger('after', 'shutdown', pool.stop)
+wsgi_resource = WSGIResource(reactor, pool, wsgi_application)
+
+
+class SoledadResource(Resource):
+ """
+ This is a dummy twisted resource, used only to allow different entry points
+ for the Soledad Server.
+ """
+
+ def __init__(self):
+ self.children = {'': wsgi_resource}
+
+ def getChild(self, path, request):
+ # for now, just "rewind" the path and serve the wsgi resource for all
+ # requests. In the future, we might look into the request path to
+ # decide which child resources should serve each request.
+ request.postpath.insert(0, request.prepath.pop())
+ return self.children['']
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
index 3f5c4aba..b553a056 100644
--- a/server/src/leap/soledad/server/sync.py
+++ b/server/src/leap/soledad/server/sync.py
@@ -17,14 +17,19 @@
"""
Server side synchronization infrastructure.
"""
-from leap.soledad.common.l2db import sync, Document
+import time
+from itertools import izip
+
+from leap.soledad.common.l2db import sync
from leap.soledad.common.l2db.remote import http_app
from leap.soledad.server.caching import get_cache_for
from leap.soledad.server.state import ServerSyncState
+from leap.soledad.common.document import ServerDocument
-MAX_REQUEST_SIZE = 200 # in Mb
+MAX_REQUEST_SIZE = float('inf') # It's a stream.
MAX_ENTRY_SIZE = 200 # in Mb
+ENTRY_CACHE_SIZE = 8192 * 1024
class SyncExchange(sync.SyncExchange):
@@ -51,7 +56,7 @@ class SyncExchange(sync.SyncExchange):
# recover sync state
self._sync_state = ServerSyncState(self.source_replica_uid, sync_id)
- def find_changes_to_return(self, received):
+ def find_changes_to_return(self):
"""
Find changes to return.
@@ -59,10 +64,6 @@ class SyncExchange(sync.SyncExchange):
order using whats_changed. It excludes documents ids that have
already been considered (superseded by the sender, etc).
- :param received: How many documents the source replica has already
- received during the current sync process.
- :type received: int
-
:return: the generation of this database, which the caller can
consider themselves to be synchronized after processing
allreturned documents, and the amount of documents to be sent
@@ -78,41 +79,45 @@ class SyncExchange(sync.SyncExchange):
self._trace('after whats_changed')
seen_ids = self._sync_state.seen_ids()
# changed docs that weren't superseded by or converged with
- changes_to_return = [
+ self.changes_to_return = [
(doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes
# there was a subsequent update
if doc_id not in seen_ids or seen_ids.get(doc_id) < gen]
self._sync_state.put_changes_to_return(
- new_gen, new_trans_id, changes_to_return)
- number_of_changes = len(changes_to_return)
- # query server for stored changes
- _, _, next_change_to_return = \
- self._sync_state.next_change_to_return(received)
+ new_gen, new_trans_id, self.changes_to_return)
+ number_of_changes = len(self.changes_to_return)
self.new_gen = new_gen
self.new_trans_id = new_trans_id
- # and append one change
- self.change_to_return = next_change_to_return
return self.new_gen, number_of_changes
- def return_one_doc(self, return_doc_cb):
- """
- Return one changed document and its last change generation to the
- source syncing replica by invoking the callback return_doc_cb.
+ def return_docs(self, return_doc_cb):
+ """Return the changed documents and their last change generation
+ repeatedly invoking the callback return_doc_cb.
- This is called once for each document to be transferred from target to
- source.
+ The final step of a sync exchange.
- :param return_doc_cb: is a callback used to return the documents with
- their last change generation to the target
- replica.
- :type return_doc_cb: callable(doc, gen, trans_id)
+ :param: return_doc_cb(doc, gen, trans_id): is a callback
+ used to return the documents with their last change generation
+ to the target replica.
+ :return: None
"""
- if self.change_to_return is not None:
- changed_doc_id, gen, trans_id = self.change_to_return
- doc = self._db.get_doc(changed_doc_id, include_deleted=True)
+ changes_to_return = self.changes_to_return
+ # return docs, including conflicts.
+ # content as a file-object (will be read when writing)
+ changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return]
+ docs = self._db.get_docs(
+ changed_doc_ids, check_for_conflicts=False,
+ include_deleted=True, read_content=False)
+
+ docs_by_gen = izip(
+ docs, (gen for _, gen, _ in changes_to_return),
+ (trans_id for _, _, trans_id in changes_to_return))
+ for doc, gen, trans_id in docs_by_gen:
return_doc_cb(doc, gen, trans_id)
def batched_insert_from_source(self, entries, sync_id):
+ if not entries:
+ return
self._db.batch_start()
for entry in entries:
doc, gen, trans_id, number_of_docs, doc_idx = entry
@@ -207,6 +212,7 @@ class SyncResource(http_app.SyncResource):
db, self.source_replica_uid, last_known_generation, sync_id)
self._sync_id = sync_id
self._staging = []
+ self._staging_size = 0
@http_app.http_method(content_as_args=True)
def post_put(
@@ -233,26 +239,37 @@ class SyncResource(http_app.SyncResource):
:param doc_idx: The index of the current document.
:type doc_idx: int
"""
- doc = Document(id, rev, content)
+ doc = ServerDocument(id, rev, json=content)
+ self._staging_size += len(content or '')
self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx))
+ if self._staging_size > ENTRY_CACHE_SIZE or doc_idx == number_of_docs:
+ self.sync_exch.batched_insert_from_source(self._staging,
+ self._sync_id)
+ self._staging = []
+ self._staging_size = 0
- @http_app.http_method(received=int, content_as_args=True)
- def post_get(self, received):
+ def post_get(self):
"""
- Return one syncing document to the client.
-
- :param received: How many documents have already been received by the
- client on the current sync session.
- :type received: int
+ Return syncing documents to the client.
"""
-
def send_doc(doc, gen, trans_id):
- entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(),
+ entry = dict(id=doc.doc_id, rev=doc.rev,
gen=gen, trans_id=trans_id)
self.responder.stream_entry(entry)
+ content_reader = doc.get_json()
+ if content_reader:
+ content = content_reader.read()
+ self.responder.stream_entry(content)
+ content_reader.close()
+ # throttle at 5mb/s
+ # FIXME: twistd cant control througput
+ # we need to either use gunicorn or go async
+ time.sleep(len(content) / (5.0 * 1024 * 1024))
+ else:
+ self.responder.stream_entry('')
new_gen, number_of_changes = \
- self.sync_exch.find_changes_to_return(received)
+ self.sync_exch.find_changes_to_return()
self.responder.content_type = 'application/x-u1db-sync-response'
self.responder.start_response(200)
self.responder.start_stream(),
@@ -264,7 +281,7 @@ class SyncResource(http_app.SyncResource):
if self.replica_uid is not None:
header['replica_uid'] = self.replica_uid
self.responder.stream_entry(header)
- self.sync_exch.return_one_doc(send_doc)
+ self.sync_exch.return_docs(send_doc)
self.responder.end_stream()
self.responder.finish_response()
@@ -273,7 +290,6 @@ class SyncResource(http_app.SyncResource):
Return the current generation and transaction_id after inserting one
incoming document.
"""
- self.sync_exch.batched_insert_from_source(self._staging, self._sync_id)
self.responder.content_type = 'application/x-soledad-sync-response'
self.responder.start_response(200)
self.responder.start_stream(),