From 6fd543b9fd9679f4978aeedee32eeece5593acc3 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Fri, 13 Nov 2015 22:10:18 -0300
Subject: [feat] Adds support for batching limited by size

u1db provides batching by default, but the current Soledad HTTPS sync
target was stuck at one document per request. This commit adds batching
capability, limiting the size of each batch to a predefined value.
Default size limit: 500 kB.
---
 client/src/leap/soledad/client/http_target/send.py | 46 +++++++++++++++-------
 .../src/leap/soledad/client/http_target/support.py | 13 ++++--
 2 files changed, 41 insertions(+), 18 deletions(-)

(limited to 'client/src')

diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py
index 80483f0d..c1252c13 100644
--- a/client/src/leap/soledad/client/http_target/send.py
+++ b/client/src/leap/soledad/client/http_target/send.py
@@ -29,6 +29,8 @@ class HTTPDocSender(object):
     They need to be encrypted and metadata prepared before sending.
     """
 
+    MAX_BATCH_SIZE = 500 * 1000  # 500kB by default
+
     @defer.inlineCallbacks
     def _send_docs(self, docs_by_generation, last_known_generation,
                    last_known_trans_id, sync_id):
@@ -43,25 +45,41 @@ class HTTPDocSender(object):
             sync_id=sync_id,
             ensure=self._ensure_callback is not None)
         total = len(docs_by_generation)
-        for idx, entry in enumerate(docs_by_generation, 1):
-            yield self._prepare_one_doc(entry, body, idx, total)
-            result = yield self._http_request(
-                self._url,
-                method='POST',
-                body=body.pop(1),
-                content_type='application/x-soledad-sync-put')
-            if self._defer_encryption:
-                self._delete_sent(idx, docs_by_generation)
-            _emit_send_status(idx, total)
+        while body.consumed < total:
+            result = yield self._send_batch(total, body, docs_by_generation)
         response_dict = json.loads(result)[0]
         gen_after_send = response_dict['new_generation']
         trans_id_after_send = response_dict['new_transaction_id']
         defer.returnValue([gen_after_send, trans_id_after_send])
 
-    def _delete_sent(self, idx, docs_by_generation):
-        doc = docs_by_generation[idx - 1][0]
-        self._sync_enc_pool.delete_encrypted_doc(
-            doc.doc_id, doc.rev)
+    def _delete_sent(self, docs):
+        for doc, gen, trans_id in docs:
+            self._sync_enc_pool.delete_encrypted_doc(
+                doc.doc_id, doc.rev)
+
+    @defer.inlineCallbacks
+    def _send_batch(self, total, body, docs):
+        sent = []
+        missing = total - body.consumed
+        for i in xrange(1, missing + 1):
+            if body.pending_size > self.MAX_BATCH_SIZE:
+                break
+            idx = body.consumed + i
+            entry = docs[idx - 1]
+            sent.append(entry)
+            yield self._prepare_one_doc(entry, body, idx, total)
+        result = yield self._send_request(body.pop())
+        if self._defer_encryption:
+            self._delete_sent(sent)
+        _emit_send_status(body.consumed, total)
+        defer.returnValue(result)
+
+    def _send_request(self, body):
+        return self._http_request(
+            self._url,
+            method='POST',
+            body=body,
+            content_type='application/x-soledad-sync-put')
 
     @defer.inlineCallbacks
     def _prepare_one_doc(self, entry, body, idx, total):
diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py
index 44cd7089..2625744c 100644
--- a/client/src/leap/soledad/client/http_target/support.py
+++ b/client/src/leap/soledad/client/http_target/support.py
@@ -152,6 +152,8 @@ class RequestBody(object):
         """
         self.headers = header_dict
         self.entries = []
+        self.consumed = 0
+        self.pending_size = 0
 
     def insert_info(self, **entry_dict):
         """
@@ -165,11 +167,11 @@ class RequestBody(object):
         """
         entry = json.dumps(entry_dict)
         self.entries.append(entry)
-        return len(entry)
+        self.pending_size += len(entry)
 
-    def pop(self, number=1):
+    def pop(self):
         """
-        Removes an amount of entries and returns it formatted and ready
+        Removes all entries and returns them formatted and ready
         to be sent.
 
         :param number: number of entries to pop and format
@@ -178,7 +180,10 @@ class RequestBody(object):
         :return: formatted body ready to be sent
         :rtype: str
         """
-        entries = [self.entries.pop(0) for i in xrange(number)]
+        entries = self.entries[:]
+        self.entries = []
+        self.pending_size = 0
+        self.consumed += len(entries)
         return self.entries_to_str(entries)
 
     def __str__(self):
-- 
cgit v1.2.3