summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/http_target/send.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/http_target/send.py')
-rw-r--r--client/src/leap/soledad/client/http_target/send.py64
1 files changed, 24 insertions, 40 deletions
diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py
index c7bd057e..fcda9bd7 100644
--- a/client/src/leap/soledad/client/http_target/send.py
+++ b/client/src/leap/soledad/client/http_target/send.py
@@ -22,6 +22,7 @@ from leap.soledad.common.log import getLogger
from leap.soledad.client.events import emit_async
from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
from leap.soledad.client.http_target.support import RequestBody
+from .send_protocol import DocStreamProducer
logger = getLogger(__name__)
@@ -54,73 +55,56 @@ class HTTPDocSender(object):
last_known_trans_id=last_known_trans_id,
sync_id=sync_id,
ensure=self._ensure_callback is not None)
- total = len(docs_by_generation)
- while body.consumed < total:
- result = yield self._send_batch(total, body, docs_by_generation)
+ result = yield self._send_batch(body, docs_by_generation)
response_dict = json.loads(result)[0]
gen_after_send = response_dict['new_generation']
trans_id_after_send = response_dict['new_transaction_id']
defer.returnValue([gen_after_send, trans_id_after_send])
- def _delete_sent(self, docs):
- for doc, gen, trans_id in docs:
- self._sync_enc_pool.delete_encrypted_doc(
- doc.doc_id, doc.rev)
-
@defer.inlineCallbacks
- def _send_batch(self, total, body, docs):
- sent = []
+ def _send_batch(self, body, docs):
+ total = len(docs)
missing = total - body.consumed
+ calls = []
for i in xrange(1, missing + 1):
- if body.pending_size > self.MAX_BATCH_SIZE:
- break
idx = body.consumed + i
entry = docs[idx - 1]
- sent.append(entry)
- yield self._prepare_one_doc(entry, body, idx, total)
- result = yield self._send_request(body.pop())
- if self._defer_encryption:
- self._delete_sent(sent)
+ calls.append((self._prepare_one_doc,
+ entry, body, idx, total))
+ result = yield self._send_request(body, calls)
_emit_send_status(self.uuid, body.consumed, total)
defer.returnValue(result)
- def _send_request(self, body):
+ def _send_request(self, body, calls):
return self._http_request(
self._url,
method='POST',
- body=body,
- content_type='application/x-soledad-sync-put')
+ body=(body, calls),
+ content_type='application/x-soledad-sync-put',
+ body_producer=DocStreamProducer)
@defer.inlineCallbacks
def _prepare_one_doc(self, entry, body, idx, total):
- doc, gen, trans_id = entry
- content = yield self._encrypt_doc(doc)
+ get_doc, gen, trans_id = entry
+ doc, content = yield self._encrypt_doc(get_doc)
body.insert_info(
id=doc.doc_id, rev=doc.rev, content=content, gen=gen,
trans_id=trans_id, number_of_docs=total,
doc_idx=idx)
- def _encrypt_doc(self, doc):
- d = None
+ @defer.inlineCallbacks
+ def _encrypt_doc(self, get_doc):
+ if type(get_doc) == tuple:
+ f, args = get_doc
+ doc = yield f(args)
+ else:
+ # tests
+ doc = get_doc
if doc.is_tombstone():
- d = defer.succeed(None)
- elif not self._defer_encryption:
- # fallback case, for tests
- d = defer.succeed(self._crypto.encrypt_doc(doc))
+ defer.returnValue((doc, None))
else:
-
- def _maybe_encrypt_doc_inline(doc_json):
- if doc_json is None:
- # the document is not marked as tombstone, but we got
- # nothing from the sync db. As it is not encrypted
- # yet, we force inline encryption.
- return self._crypto.encrypt_doc(doc)
- return doc_json
-
- d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev)
- d.addCallback(_maybe_encrypt_doc_inline)
- return d
+ defer.returnValue((doc, self._crypto.encrypt_doc(doc)))
def _emit_send_status(user_data, idx, total):