diff options
Diffstat (limited to 'client/src/leap/soledad/client/http_target/send.py')
-rw-r--r-- | client/src/leap/soledad/client/http_target/send.py | 59 |
1 files changed, 43 insertions, 16 deletions
diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py index 80483f0d..89288779 100644 --- a/client/src/leap/soledad/client/http_target/send.py +++ b/client/src/leap/soledad/client/http_target/send.py @@ -29,6 +29,15 @@ class HTTPDocSender(object): They need to be encrypted and metadata prepared before sending. """ + MAX_BATCH_SIZE = 0 # disabled by now, this is being tested yet + + # The uuid of the local replica. + # Any class inheriting from this one should provide a meaningful attribute + # if the sync status event is meant to be used somewhere else. + + uuid = 'undefined' + userid = 'undefined' + @defer.inlineCallbacks def _send_docs(self, docs_by_generation, last_known_generation, last_known_trans_id, sync_id): @@ -43,25 +52,43 @@ class HTTPDocSender(object): sync_id=sync_id, ensure=self._ensure_callback is not None) total = len(docs_by_generation) - for idx, entry in enumerate(docs_by_generation, 1): - yield self._prepare_one_doc(entry, body, idx, total) - result = yield self._http_request( - self._url, - method='POST', - body=body.pop(1), - content_type='application/x-soledad-sync-put') - if self._defer_encryption: - self._delete_sent(idx, docs_by_generation) - _emit_send_status(idx, total) + while body.consumed < total: + result = yield self._send_batch(total, body, docs_by_generation) response_dict = json.loads(result)[0] gen_after_send = response_dict['new_generation'] trans_id_after_send = response_dict['new_transaction_id'] defer.returnValue([gen_after_send, trans_id_after_send]) - def _delete_sent(self, idx, docs_by_generation): - doc = docs_by_generation[idx - 1][0] - self._sync_enc_pool.delete_encrypted_doc( - doc.doc_id, doc.rev) + def _delete_sent(self, docs): + for doc, gen, trans_id in docs: + self._sync_enc_pool.delete_encrypted_doc( + doc.doc_id, doc.rev) + + @defer.inlineCallbacks + def _send_batch(self, total, body, docs): + sent = [] + missing = total - body.consumed + for i in xrange(1, missing + 1): + if body.pending_size > self.MAX_BATCH_SIZE: + break + idx = body.consumed + i + entry = docs[idx - 1] + sent.append(entry) + yield self._prepare_one_doc(entry, body, idx, total) + result = yield self._send_request(body.pop()) + if self._defer_encryption: + self._delete_sent(sent) + + user_data = {'uuid': self.uuid, 'userid': self.userid} + _emit_send_status(self.uuid, body.consumed, total) + defer.returnValue(result) + + def _send_request(self, body): + return self._http_request( + self._url, + method='POST', + body=body, + content_type='application/x-soledad-sync-put') @defer.inlineCallbacks def _prepare_one_doc(self, entry, body, idx, total): @@ -94,9 +121,9 @@ class HTTPDocSender(object): return d -def _emit_send_status(idx, total): +def _emit_send_status(user_data, idx, total): content = {'sent': idx, 'total': total} - emit_async(SOLEDAD_SYNC_SEND_STATUS, content) + emit_async(SOLEDAD_SYNC_SEND_STATUS, user_data, content) msg = "%d/%d" % (idx, total) logger.debug("Sync send status: %s" % msg) |