diff options
Diffstat (limited to 'client/src/leap/soledad/client/http_target/send.py')
-rw-r--r-- | client/src/leap/soledad/client/http_target/send.py | 64 |
1 files changed, 24 insertions, 40 deletions
diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py index c7bd057e..fcda9bd7 100644 --- a/client/src/leap/soledad/client/http_target/send.py +++ b/client/src/leap/soledad/client/http_target/send.py @@ -22,6 +22,7 @@ from leap.soledad.common.log import getLogger from leap.soledad.client.events import emit_async from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS from leap.soledad.client.http_target.support import RequestBody +from .send_protocol import DocStreamProducer logger = getLogger(__name__) @@ -54,73 +55,56 @@ class HTTPDocSender(object): last_known_trans_id=last_known_trans_id, sync_id=sync_id, ensure=self._ensure_callback is not None) - total = len(docs_by_generation) - while body.consumed < total: - result = yield self._send_batch(total, body, docs_by_generation) + result = yield self._send_batch(body, docs_by_generation) response_dict = json.loads(result)[0] gen_after_send = response_dict['new_generation'] trans_id_after_send = response_dict['new_transaction_id'] defer.returnValue([gen_after_send, trans_id_after_send]) - def _delete_sent(self, docs): - for doc, gen, trans_id in docs: - self._sync_enc_pool.delete_encrypted_doc( - doc.doc_id, doc.rev) - @defer.inlineCallbacks - def _send_batch(self, total, body, docs): - sent = [] + def _send_batch(self, body, docs): + total = len(docs) missing = total - body.consumed + calls = [] for i in xrange(1, missing + 1): - if body.pending_size > self.MAX_BATCH_SIZE: - break idx = body.consumed + i entry = docs[idx - 1] - sent.append(entry) - yield self._prepare_one_doc(entry, body, idx, total) - result = yield self._send_request(body.pop()) - if self._defer_encryption: - self._delete_sent(sent) + calls.append((self._prepare_one_doc, + entry, body, idx, total)) + result = yield self._send_request(body, calls) _emit_send_status(self.uuid, body.consumed, total) defer.returnValue(result) - def _send_request(self, body): + def _send_request(self, body, calls): return self._http_request( self._url, method='POST', - body=body, - content_type='application/x-soledad-sync-put') + body=(body, calls), + content_type='application/x-soledad-sync-put', + body_producer=DocStreamProducer) @defer.inlineCallbacks def _prepare_one_doc(self, entry, body, idx, total): - doc, gen, trans_id = entry - content = yield self._encrypt_doc(doc) + get_doc, gen, trans_id = entry + doc, content = yield self._encrypt_doc(get_doc) body.insert_info( id=doc.doc_id, rev=doc.rev, content=content, gen=gen, trans_id=trans_id, number_of_docs=total, doc_idx=idx) - def _encrypt_doc(self, doc): - d = None + @defer.inlineCallbacks + def _encrypt_doc(self, get_doc): + if type(get_doc) == tuple: + f, args = get_doc + doc = yield f(args) + else: + # tests + doc = get_doc if doc.is_tombstone(): - d = defer.succeed(None) - elif not self._defer_encryption: - # fallback case, for tests - d = defer.succeed(self._crypto.encrypt_doc(doc)) + defer.returnValue((doc, None)) else: - - def _maybe_encrypt_doc_inline(doc_json): - if doc_json is None: - # the document is not marked as tombstone, but we got - # nothing from the sync db. As it is not encrypted - # yet, we force inline encryption. - return self._crypto.encrypt_doc(doc) - return doc_json - - d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev) - d.addCallback(_maybe_encrypt_doc_inline) - return d + defer.returnValue((doc, self._crypto.encrypt_doc(doc))) def _emit_send_status(user_data, idx, total): |