summaryrefslogtreecommitdiff
path: root/client
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2015-12-04 15:02:42 -0400
committerKali Kaneko <kali@leap.se>2015-12-04 15:02:42 -0400
commitfb7045bbb15af70def876775eef9eb1a2f5c6ca7 (patch)
tree1cb0547cd42a0347e8c849e4d1e0343cf8626f9a /client
parentf8982e74768fd7039b543a97060701dec444a9f5 (diff)
parent27bda0ac201e236e3a2c9671462a337f2970e993 (diff)
Merge branch 'develop' into debian/platform-0.8
Diffstat (limited to 'client')
-rw-r--r--client/changes/feat_send_batch1
-rw-r--r--client/src/leap/soledad/client/encdecpool.py20
-rw-r--r--client/src/leap/soledad/client/http_target/send.py46
-rw-r--r--client/src/leap/soledad/client/http_target/support.py13
4 files changed, 50 insertions, 30 deletions
diff --git a/client/changes/feat_send_batch b/client/changes/feat_send_batch
new file mode 100644
index 00000000..fbfce519
--- /dev/null
+++ b/client/changes/feat_send_batch
@@ -0,0 +1 @@
+o Client will now send documents at a limited size batch due to changes on SyncTarget. The default limit is 500kB.
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 6d3c11b9..0954c1df 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -70,11 +70,13 @@ class SyncEncryptDecryptPool(object):
self._started = False
def start(self):
+ if self.running:
+ return
self._create_pool()
self._started = True
def stop(self):
- if not self._started:
+ if not self.running:
return
self._started = False
self._destroy_pool()
@@ -650,14 +652,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
last_idx = self._last_inserted_idx
for doc_id, rev, content, gen, trans_id, encrypted, idx in \
decrypted_docs:
- # XXX for some reason, a document might not have been deleted from
- # the database. This is a bug. In this point, already
- # processed documents should have been removed from the sync
- # database and we should not have to skip them here. We need
- # to find out why this is happening, fix, and remove the
- # skipping below.
- if (idx < last_idx + 1):
- continue
if (idx != last_idx + 1):
break
insertable.append((doc_id, rev, content, gen, trans_id, idx))
@@ -763,6 +757,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
return self._runOperation(query)
+ @defer.inlineCallbacks
def _collect_async_decryption_results(self):
"""
Collect the results of the asynchronous doc decryptions and re-raise
@@ -773,7 +768,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
async_results = self._async_results[:]
for res in async_results:
if res.ready():
- self._decrypt_doc_cb(res.get()) # might raise an exception!
+ # XXX: might raise an exception!
+ yield self._decrypt_doc_cb(res.get())
self._async_results.remove(res)
@defer.inlineCallbacks
@@ -796,7 +792,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
if processed < pending:
yield self._async_decrypt_received_docs()
- self._collect_async_decryption_results()
+ yield self._collect_async_decryption_results()
docs = yield self._process_decrypted_docs()
yield self._delete_processed_docs(docs)
# recurse
@@ -807,6 +803,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._finish()
def _finish(self):
- self._deferred.callback(None)
self._processed_docs = 0
self._last_inserted_idx = 0
+ self._deferred.callback(None)
diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py
index 80483f0d..e8abf35b 100644
--- a/client/src/leap/soledad/client/http_target/send.py
+++ b/client/src/leap/soledad/client/http_target/send.py
@@ -29,6 +29,8 @@ class HTTPDocSender(object):
They need to be encrypted and metadata prepared before sending.
"""
+ MAX_BATCH_SIZE = 0 # disabled by now, this is being tested yet
+
@defer.inlineCallbacks
def _send_docs(self, docs_by_generation, last_known_generation,
last_known_trans_id, sync_id):
@@ -43,25 +45,41 @@ class HTTPDocSender(object):
sync_id=sync_id,
ensure=self._ensure_callback is not None)
total = len(docs_by_generation)
- for idx, entry in enumerate(docs_by_generation, 1):
- yield self._prepare_one_doc(entry, body, idx, total)
- result = yield self._http_request(
- self._url,
- method='POST',
- body=body.pop(1),
- content_type='application/x-soledad-sync-put')
- if self._defer_encryption:
- self._delete_sent(idx, docs_by_generation)
- _emit_send_status(idx, total)
+ while body.consumed < total:
+ result = yield self._send_batch(total, body, docs_by_generation)
response_dict = json.loads(result)[0]
gen_after_send = response_dict['new_generation']
trans_id_after_send = response_dict['new_transaction_id']
defer.returnValue([gen_after_send, trans_id_after_send])
- def _delete_sent(self, idx, docs_by_generation):
- doc = docs_by_generation[idx - 1][0]
- self._sync_enc_pool.delete_encrypted_doc(
- doc.doc_id, doc.rev)
+ def _delete_sent(self, docs):
+ for doc, gen, trans_id in docs:
+ self._sync_enc_pool.delete_encrypted_doc(
+ doc.doc_id, doc.rev)
+
+ @defer.inlineCallbacks
+ def _send_batch(self, total, body, docs):
+ sent = []
+ missing = total - body.consumed
+ for i in xrange(1, missing + 1):
+ if body.pending_size > self.MAX_BATCH_SIZE:
+ break
+ idx = body.consumed + i
+ entry = docs[idx - 1]
+ sent.append(entry)
+ yield self._prepare_one_doc(entry, body, idx, total)
+ result = yield self._send_request(body.pop())
+ if self._defer_encryption:
+ self._delete_sent(sent)
+ _emit_send_status(body.consumed, total)
+ defer.returnValue(result)
+
+ def _send_request(self, body):
+ return self._http_request(
+ self._url,
+ method='POST',
+ body=body,
+ content_type='application/x-soledad-sync-put')
@defer.inlineCallbacks
def _prepare_one_doc(self, entry, body, idx, total):
diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py
index 44cd7089..2625744c 100644
--- a/client/src/leap/soledad/client/http_target/support.py
+++ b/client/src/leap/soledad/client/http_target/support.py
@@ -152,6 +152,8 @@ class RequestBody(object):
"""
self.headers = header_dict
self.entries = []
+ self.consumed = 0
+ self.pending_size = 0
def insert_info(self, **entry_dict):
"""
@@ -165,11 +167,11 @@ class RequestBody(object):
"""
entry = json.dumps(entry_dict)
self.entries.append(entry)
- return len(entry)
+ self.pending_size += len(entry)
- def pop(self, number=1):
+ def pop(self):
"""
- Removes an amount of entries and returns it formatted and ready
+ Removes all entries and returns it formatted and ready
to be sent.
:param number: number of entries to pop and format
@@ -178,7 +180,10 @@ class RequestBody(object):
:return: formatted body ready to be sent
:rtype: str
"""
- entries = [self.entries.pop(0) for i in xrange(number)]
+ entries = self.entries[:]
+ self.entries = []
+ self.pending_size = 0
+ self.consumed += len(entries)
return self.entries_to_str(entries)
def __str__(self):