summaryrefslogtreecommitdiff
path: root/server/src/leap/soledad/server/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/leap/soledad/server/sync.py')
-rw-r--r--server/src/leap/soledad/server/sync.py98
1 files changed, 57 insertions, 41 deletions
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
index 3f5c4aba..b553a056 100644
--- a/server/src/leap/soledad/server/sync.py
+++ b/server/src/leap/soledad/server/sync.py
@@ -17,14 +17,19 @@
"""
Server side synchronization infrastructure.
"""
-from leap.soledad.common.l2db import sync, Document
+import time
+from itertools import izip
+
+from leap.soledad.common.l2db import sync
from leap.soledad.common.l2db.remote import http_app
from leap.soledad.server.caching import get_cache_for
from leap.soledad.server.state import ServerSyncState
+from leap.soledad.common.document import ServerDocument
-MAX_REQUEST_SIZE = 200 # in Mb
+MAX_REQUEST_SIZE = float('inf') # It's a stream.
MAX_ENTRY_SIZE = 200 # in Mb
+ENTRY_CACHE_SIZE = 8192 * 1024
class SyncExchange(sync.SyncExchange):
@@ -51,7 +56,7 @@ class SyncExchange(sync.SyncExchange):
# recover sync state
self._sync_state = ServerSyncState(self.source_replica_uid, sync_id)
- def find_changes_to_return(self, received):
+ def find_changes_to_return(self):
"""
Find changes to return.
@@ -59,10 +64,6 @@ class SyncExchange(sync.SyncExchange):
order using whats_changed. It excludes documents ids that have
already been considered (superseded by the sender, etc).
- :param received: How many documents the source replica has already
- received during the current sync process.
- :type received: int
-
:return: the generation of this database, which the caller can
consider themselves to be synchronized after processing
allreturned documents, and the amount of documents to be sent
@@ -78,41 +79,45 @@ class SyncExchange(sync.SyncExchange):
self._trace('after whats_changed')
seen_ids = self._sync_state.seen_ids()
# changed docs that weren't superseded by or converged with
- changes_to_return = [
+ self.changes_to_return = [
(doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes
# there was a subsequent update
if doc_id not in seen_ids or seen_ids.get(doc_id) < gen]
self._sync_state.put_changes_to_return(
- new_gen, new_trans_id, changes_to_return)
- number_of_changes = len(changes_to_return)
- # query server for stored changes
- _, _, next_change_to_return = \
- self._sync_state.next_change_to_return(received)
+ new_gen, new_trans_id, self.changes_to_return)
+ number_of_changes = len(self.changes_to_return)
self.new_gen = new_gen
self.new_trans_id = new_trans_id
- # and append one change
- self.change_to_return = next_change_to_return
return self.new_gen, number_of_changes
- def return_one_doc(self, return_doc_cb):
- """
- Return one changed document and its last change generation to the
- source syncing replica by invoking the callback return_doc_cb.
+ def return_docs(self, return_doc_cb):
+ """Return the changed documents and their last change generation
+ repeatedly invoking the callback return_doc_cb.
- This is called once for each document to be transferred from target to
- source.
+ The final step of a sync exchange.
- :param return_doc_cb: is a callback used to return the documents with
- their last change generation to the target
- replica.
- :type return_doc_cb: callable(doc, gen, trans_id)
+ :param: return_doc_cb(doc, gen, trans_id): is a callback
+ used to return the documents with their last change generation
+ to the target replica.
+ :return: None
"""
- if self.change_to_return is not None:
- changed_doc_id, gen, trans_id = self.change_to_return
- doc = self._db.get_doc(changed_doc_id, include_deleted=True)
+ changes_to_return = self.changes_to_return
+ # return docs, including conflicts.
+ # content as a file-object (will be read when writing)
+ changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return]
+ docs = self._db.get_docs(
+ changed_doc_ids, check_for_conflicts=False,
+ include_deleted=True, read_content=False)
+
+ docs_by_gen = izip(
+ docs, (gen for _, gen, _ in changes_to_return),
+ (trans_id for _, _, trans_id in changes_to_return))
+ for doc, gen, trans_id in docs_by_gen:
return_doc_cb(doc, gen, trans_id)
def batched_insert_from_source(self, entries, sync_id):
+ if not entries:
+ return
self._db.batch_start()
for entry in entries:
doc, gen, trans_id, number_of_docs, doc_idx = entry
@@ -207,6 +212,7 @@ class SyncResource(http_app.SyncResource):
db, self.source_replica_uid, last_known_generation, sync_id)
self._sync_id = sync_id
self._staging = []
+ self._staging_size = 0
@http_app.http_method(content_as_args=True)
def post_put(
@@ -233,26 +239,37 @@ class SyncResource(http_app.SyncResource):
:param doc_idx: The index of the current document.
:type doc_idx: int
"""
- doc = Document(id, rev, content)
+ doc = ServerDocument(id, rev, json=content)
+ self._staging_size += len(content or '')
self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx))
+ if self._staging_size > ENTRY_CACHE_SIZE or doc_idx == number_of_docs:
+ self.sync_exch.batched_insert_from_source(self._staging,
+ self._sync_id)
+ self._staging = []
+ self._staging_size = 0
- @http_app.http_method(received=int, content_as_args=True)
- def post_get(self, received):
+ def post_get(self):
"""
- Return one syncing document to the client.
-
- :param received: How many documents have already been received by the
- client on the current sync session.
- :type received: int
+ Return syncing documents to the client.
"""
-
def send_doc(doc, gen, trans_id):
- entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(),
+ entry = dict(id=doc.doc_id, rev=doc.rev,
gen=gen, trans_id=trans_id)
self.responder.stream_entry(entry)
+ content_reader = doc.get_json()
+ if content_reader:
+ content = content_reader.read()
+ self.responder.stream_entry(content)
+ content_reader.close()
+ # throttle at 5mb/s
+ # FIXME: twistd cant control througput
+ # we need to either use gunicorn or go async
+ time.sleep(len(content) / (5.0 * 1024 * 1024))
+ else:
+ self.responder.stream_entry('')
new_gen, number_of_changes = \
- self.sync_exch.find_changes_to_return(received)
+ self.sync_exch.find_changes_to_return()
self.responder.content_type = 'application/x-u1db-sync-response'
self.responder.start_response(200)
self.responder.start_stream(),
@@ -264,7 +281,7 @@ class SyncResource(http_app.SyncResource):
if self.replica_uid is not None:
header['replica_uid'] = self.replica_uid
self.responder.stream_entry(header)
- self.sync_exch.return_one_doc(send_doc)
+ self.sync_exch.return_docs(send_doc)
self.responder.end_stream()
self.responder.finish_response()
@@ -273,7 +290,6 @@ class SyncResource(http_app.SyncResource):
Return the current generation and transaction_id after inserting one
incoming document.
"""
- self.sync_exch.batched_insert_from_source(self._staging, self._sync_id)
self.responder.content_type = 'application/x-soledad-sync-response'
self.responder.start_response(200)
self.responder.start_stream(),