summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Victor Shyba <victor.shyba@gmail.com> 2015-11-13 22:10:18 -0300
committer Victor Shyba <victor.shyba@gmail.com> 2015-12-03 17:30:48 -0300
commit 6fd543b9fd9679f4978aeedee32eeece5593acc3 (patch)
tree 43148b17680e55fbc97fc054bcb0cc92b80a456b
parent 129db70b5237ffbc7b38d9931598629f46ce4763 (diff)
[feat] Adds support for batching limited by size
u1db provides batching by default, but the current Soledad HTTPS Sync Target was stuck at 1 doc per request. This commit adds batching capability, limiting the size of each batch to a predefined value. Default size limit: 500 kB.
-rw-r--r-- client/src/leap/soledad/client/http_target/send.py 46
-rw-r--r-- client/src/leap/soledad/client/http_target/support.py 13
2 files changed, 41 insertions, 18 deletions
diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py
index 80483f0d..c1252c13 100644
--- a/client/src/leap/soledad/client/http_target/send.py
+++ b/client/src/leap/soledad/client/http_target/send.py
@@ -29,6 +29,8 @@ class HTTPDocSender(object):
They need to be encrypted and metadata prepared before sending.
"""
+ MAX_BATCH_SIZE = 500 * 1000 # 500kB by default
+
@defer.inlineCallbacks
def _send_docs(self, docs_by_generation, last_known_generation,
last_known_trans_id, sync_id):
@@ -43,25 +45,41 @@ class HTTPDocSender(object):
sync_id=sync_id,
ensure=self._ensure_callback is not None)
total = len(docs_by_generation)
- for idx, entry in enumerate(docs_by_generation, 1):
- yield self._prepare_one_doc(entry, body, idx, total)
- result = yield self._http_request(
- self._url,
- method='POST',
- body=body.pop(1),
- content_type='application/x-soledad-sync-put')
- if self._defer_encryption:
- self._delete_sent(idx, docs_by_generation)
- _emit_send_status(idx, total)
+ while body.consumed < total:
+ result = yield self._send_batch(total, body, docs_by_generation)
response_dict = json.loads(result)[0]
gen_after_send = response_dict['new_generation']
trans_id_after_send = response_dict['new_transaction_id']
defer.returnValue([gen_after_send, trans_id_after_send])
- def _delete_sent(self, idx, docs_by_generation):
- doc = docs_by_generation[idx - 1][0]
- self._sync_enc_pool.delete_encrypted_doc(
- doc.doc_id, doc.rev)
+ def _delete_sent(self, docs):
+ for doc, gen, trans_id in docs:
+ self._sync_enc_pool.delete_encrypted_doc(
+ doc.doc_id, doc.rev)
+
+ @defer.inlineCallbacks
+ def _send_batch(self, total, body, docs):
+ sent = []
+ missing = total - body.consumed
+ for i in xrange(1, missing + 1):
+ if body.pending_size > self.MAX_BATCH_SIZE:
+ break
+ idx = body.consumed + i
+ entry = docs[idx - 1]
+ sent.append(entry)
+ yield self._prepare_one_doc(entry, body, idx, total)
+ result = yield self._send_request(body.pop())
+ if self._defer_encryption:
+ self._delete_sent(sent)
+ _emit_send_status(body.consumed, total)
+ defer.returnValue(result)
+
+ def _send_request(self, body):
+ return self._http_request(
+ self._url,
+ method='POST',
+ body=body,
+ content_type='application/x-soledad-sync-put')
@defer.inlineCallbacks
def _prepare_one_doc(self, entry, body, idx, total):
diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py
index 44cd7089..2625744c 100644
--- a/client/src/leap/soledad/client/http_target/support.py
+++ b/client/src/leap/soledad/client/http_target/support.py
@@ -152,6 +152,8 @@ class RequestBody(object):
"""
self.headers = header_dict
self.entries = []
+ self.consumed = 0
+ self.pending_size = 0
def insert_info(self, **entry_dict):
"""
@@ -165,11 +167,11 @@ class RequestBody(object):
"""
entry = json.dumps(entry_dict)
self.entries.append(entry)
- return len(entry)
+ self.pending_size += len(entry)
- def pop(self, number=1):
+ def pop(self):
"""
- Removes an amount of entries and returns it formatted and ready
+ Removes all entries and returns it formatted and ready
to be sent.
:param number: number of entries to pop and format
@@ -178,7 +180,10 @@ class RequestBody(object):
:return: formatted body ready to be sent
:rtype: str
"""
- entries = [self.entries.pop(0) for i in xrange(number)]
+ entries = self.entries[:]
+ self.entries = []
+ self.pending_size = 0
+ self.consumed += len(entries)
return self.entries_to_str(entries)
def __str__(self):