summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad')
-rw-r--r--client/src/leap/soledad/client/http_target/fetch.py128
1 files changed, 49 insertions, 79 deletions
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py
index 184c5883..0fb5040f 100644
--- a/client/src/leap/soledad/client/http_target/fetch.py
+++ b/client/src/leap/soledad/client/http_target/fetch.py
@@ -51,6 +51,7 @@ class HTTPDocFetcher(object):
@defer.inlineCallbacks
def _receive_docs(self, last_known_generation, last_known_trans_id,
ensure_callback, sync_id, defer_decryption):
+ defer_decryption = False
self._queue_for_decrypt = defer_decryption \
and self._sync_db is not None
@@ -73,43 +74,18 @@ class HTTPDocFetcher(object):
# to know the total number of documents to be received, and this
# information comes as metadata to each request.
- doc = yield self._receive_one_doc(
+ docs = yield self._fetch_all(
last_known_generation, last_known_trans_id,
sync_id, 0)
self._received_docs = 0
- number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1)
+ number_of_changes, ngen, ntrans =\
+ self._insert_received_docs(docs, 1, 1)
if ngen:
new_generation = ngen
new_transaction_id = ntrans
# ---------------------------------------------------------------------
- # maybe receive the rest of the documents
- # ---------------------------------------------------------------------
-
- # launch many asynchronous fetches and inserts of received documents
- # in the temporary sync db. Will wait for all results before
- # continuing.
-
- received = 1
- deferreds = []
- while received < number_of_changes:
- d = self._receive_one_doc(
- last_known_generation,
- last_known_trans_id, sync_id, received)
- d.addCallback(
- self._insert_received_doc,
- received + 1, # the index of the current received doc
- number_of_changes)
- deferreds.append(d)
- received += 1
- results = yield defer.gatherResults(deferreds)
-
- # get generation and transaction id of target after insertions
- if deferreds:
- _, new_generation, new_transaction_id = results.pop()
-
- # ---------------------------------------------------------------------
# wait for async decryption to finish
# ---------------------------------------------------------------------
@@ -119,8 +95,8 @@ class HTTPDocFetcher(object):
defer.returnValue([new_generation, new_transaction_id])
- def _receive_one_doc(self, last_known_generation,
- last_known_trans_id, sync_id, received):
+ def _fetch_all(self, last_known_generation,
+ last_known_trans_id, sync_id, received):
# add remote replica metadata to the request
body = RequestBody(
last_known_generation=last_known_generation,
@@ -136,7 +112,7 @@ class HTTPDocFetcher(object):
body=str(body),
content_type='application/x-soledad-sync-get')
- def _insert_received_doc(self, response, idx, total):
+ def _insert_received_docs(self, response, idx, total):
"""
Insert a received document into the local replica.
@@ -147,47 +123,47 @@ class HTTPDocFetcher(object):
:param total: The total number of operations.
:type total: int
"""
- new_generation, new_transaction_id, number_of_changes, doc_id, \
- rev, content, gen, trans_id = \
+ new_generation, new_transaction_id, number_of_changes, entries =\
self._parse_received_doc_response(response)
if self._sync_decr_pool and not self._sync_decr_pool.running:
self._sync_decr_pool.start(number_of_changes)
- if doc_id is not None:
- # decrypt incoming document and insert into local database
- # -------------------------------------------------------------
- # symmetric decryption of document's contents
- # -------------------------------------------------------------
- # If arriving content was symmetrically encrypted, we decrypt it.
- # We do it inline if defer_decryption flag is False or no sync_db
- # was defined, otherwise we defer it writing it to the received
- # docs table.
- doc = SoledadDocument(doc_id, rev, content)
- if is_symmetrically_encrypted(doc):
- if self._queue_for_decrypt:
- self._sync_decr_pool.insert_encrypted_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id,
- idx)
- else:
- # defer_decryption is False or no-sync-db fallback
- doc.set_json(self._crypto.decrypt_doc(doc))
- self._insert_doc_cb(doc, gen, trans_id)
- else:
- # not symmetrically encrypted doc, insert it directly
- # or save it in the decrypted stage.
- if self._queue_for_decrypt:
- self._sync_decr_pool.insert_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id,
- idx)
+ for doc_id, rev, content, gen, trans_id in entries:
+ if doc_id is not None:
+ # decrypt incoming document and insert into local database
+ # ---------------------------------------------------------
+ # symmetric decryption of document's contents
+ # ---------------------------------------------------------
+ # If arriving content was symmetrically encrypted, we decrypt
+ # it. We do it inline if defer_decryption flag is False or no
+ # sync_db was defined, otherwise we defer it writing it to the
+ # received docs table.
+ doc = SoledadDocument(doc_id, rev, content)
+ if is_symmetrically_encrypted(doc):
+ if self._queue_for_decrypt:
+ self._sync_decr_pool.insert_encrypted_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id,
+ idx)
+ else:
+ # defer_decryption is False or no-sync-db fallback
+ doc.set_json(self._crypto.decrypt_doc(doc))
+ self._insert_doc_cb(doc, gen, trans_id)
else:
- self._insert_doc_cb(doc, gen, trans_id)
- # -------------------------------------------------------------
- # end of symmetric decryption
- # -------------------------------------------------------------
- self._received_docs += 1
- user_data = {'uuid': self.uuid, 'userid': self.userid}
- _emit_receive_status(user_data, self._received_docs, total)
+ # not symmetrically encrypted doc, insert it directly
+ # or save it in the decrypted stage.
+ if self._queue_for_decrypt:
+ self._sync_decr_pool.insert_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id,
+ idx)
+ else:
+ self._insert_doc_cb(doc, gen, trans_id)
+ # -------------------------------------------------------------
+ # end of symmetric decryption
+ # -------------------------------------------------------------
+ self._received_docs += 1
+ user_data = {'uuid': self.uuid, 'userid': self.userid}
+ _emit_receive_status(user_data, self._received_docs, total)
return number_of_changes, new_generation, new_transaction_id
def _parse_received_doc_response(self, response):
@@ -223,23 +199,17 @@ class HTTPDocFetcher(object):
if self._ensure_callback and 'replica_uid' in metadata:
self._ensure_callback(metadata['replica_uid'])
# parse incoming document info
- doc_id = None
- rev = None
- content = None
- gen = None
- trans_id = None
- if number_of_changes > 0:
+ entries = []
+ for data in data[1:]:
try:
- entry = json.loads(data[1])
- doc_id = entry['id']
- rev = entry['rev']
- content = entry['content']
- gen = entry['gen']
- trans_id = entry['trans_id']
+ line, comma = utils.check_and_strip_comma(data)
+ entry = json.loads(line)
+ entries.append((entry['id'], entry['rev'], entry['content'],
+ entry['gen'], entry['trans_id']))
except (IndexError, KeyError):
raise errors.BrokenSyncStream
return new_generation, new_transaction_id, number_of_changes, \
- doc_id, rev, content, gen, trans_id
+ entries
def _setup_sync_decr_pool(self):
"""