summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Shyba <victor.shyba@gmail.com>2016-09-22 01:00:23 -0300
committerdrebs <drebs@leap.se>2016-12-12 09:11:59 -0200
commit7680ec18f26ce6bab48c8a57a05e08cba7c6ba5e (patch)
tree12b8478e6ae9c23a1dc8c68269f66be9a4df8f6b
parenta302322e53878a6212532d33ac0a0f9e0c34b176 (diff)
[feature] stream content in a separate line
This allow different paths for raw data and metadata, avoiding unnecessary json parsing.
-rw-r--r--client/src/leap/soledad/client/http_target/send_protocol.py2
-rw-r--r--client/src/leap/soledad/client/http_target/support.py8
-rw-r--r--server/src/leap/soledad/server/__init__.py7
-rw-r--r--server/src/leap/soledad/server/sync.py11
4 files changed, 20 insertions, 8 deletions
diff --git a/client/src/leap/soledad/client/http_target/send_protocol.py b/client/src/leap/soledad/client/http_target/send_protocol.py
index c72c6d13..b93a4284 100644
--- a/client/src/leap/soledad/client/http_target/send_protocol.py
+++ b/client/src/leap/soledad/client/http_target/send_protocol.py
@@ -39,7 +39,7 @@ class DocStreamProducer(object):
yield call[0](*call[1:])
while self.producer and not self.stop:
if self.pause:
- yield self.sleep(0.01)
+ yield self.sleep(0.001)
continue
call = self.producer.pop(0)
yield call[0](*call[1:])
diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py
index fe91c5b1..c066331c 100644
--- a/client/src/leap/soledad/client/http_target/support.py
+++ b/client/src/leap/soledad/client/http_target/support.py
@@ -169,7 +169,13 @@ class RequestBody(object):
:return: length of the entry after JSON dumps
:rtype: int
"""
- entry = json.dumps(entry_dict)
+ if 'content' in entry_dict:
+ content = entry_dict['content'] or ''
+ del entry_dict['content']
+ entry = json.dumps(entry_dict)
+ entry = entry + ',\r\n' + content
+ else:
+ entry = json.dumps(entry_dict)
self.entries.append(entry)
def pop(self, amount=10):
diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py
index 889bf48f..7ba95543 100644
--- a/server/src/leap/soledad/server/__init__.py
+++ b/server/src/leap/soledad/server/__init__.py
@@ -221,14 +221,15 @@ class HTTPInvocationByMethodWithBody(
meth_put = self._lookup('%s_put' % method)
meth_end = self._lookup('%s_end' % method)
while True:
- line = body_getline()
- entry = line.strip()
+ entry = body_getline().strip()
if entry == ']': # end of incoming document stream
break
if not entry or not comma: # empty or no prec comma
raise http_app.BadRequest
entry, comma = utils.check_and_strip_comma(entry)
- meth_put({}, entry)
+ content = body_getline().strip()
+ content, comma = utils.check_and_strip_comma(content)
+ meth_put({'content': content or None}, entry)
if comma or body_getline(): # extra comma or data
raise http_app.BadRequest
return meth_end()
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
index bc977210..6fcfe240 100644
--- a/server/src/leap/soledad/server/sync.py
+++ b/server/src/leap/soledad/server/sync.py
@@ -212,6 +212,7 @@ class SyncResource(http_app.SyncResource):
db, self.source_replica_uid, last_known_generation, sync_id)
self._sync_id = sync_id
self._staging = []
+ self._staging_size = 0
@http_app.http_method(content_as_args=True)
def post_put(
@@ -240,9 +241,13 @@ class SyncResource(http_app.SyncResource):
"""
doc = ServerDocument(id, rev)
doc._json = content
- self.sync_exch.insert_doc_from_source(
- doc, gen, trans_id, number_of_docs=None,
- doc_idx=None, sync_id=None)
+ self._staging_size += len(content or '')
+ self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx))
+ if self._staging_size > 8192 * 1024 or doc_idx == number_of_docs:
+ self.sync_exch.batched_insert_from_source(self._staging,
+ self._sync_id)
+ self._staging = []
+ self._staging_size = 0
@http_app.http_method(received=int, content_as_args=True)
def post_get(self, received):