summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/http_target/fetch.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/http_target/fetch.py')
-rw-r--r--client/src/leap/soledad/client/http_target/fetch.py51
1 files changed, 22 insertions, 29 deletions
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py
index 53650de4..7d27c06d 100644
--- a/client/src/leap/soledad/client/http_target/fetch.py
+++ b/client/src/leap/soledad/client/http_target/fetch.py
@@ -54,7 +54,6 @@ class HTTPDocFetcher(object):
ensure_callback, sync_id):
new_generation = last_known_generation
new_transaction_id = last_known_trans_id
- self._received_docs = 0
# Acts as a queue, ensuring line order on async processing
# as `self._insert_doc_cb` cant be run concurrently or out of order.
# DeferredSemaphore solves the concurrency and its implementation uses
@@ -64,9 +63,8 @@ class HTTPDocFetcher(object):
metadata = yield self._fetch_all(
last_known_generation, last_known_trans_id,
- sync_id, self._received_docs)
- metadata = self._parse_metadata(metadata)
- number_of_changes, ngen, ntrans = metadata
+ sync_id)
+ number_of_changes, ngen, ntrans = self._parse_metadata(metadata)
# wait for pending inserts
yield self.semaphore.acquire()
@@ -78,16 +76,15 @@ class HTTPDocFetcher(object):
defer.returnValue([new_generation, new_transaction_id])
def _fetch_all(self, last_known_generation,
- last_known_trans_id, sync_id, received):
+ last_known_trans_id, sync_id):
# add remote replica metadata to the request
body = RequestBody(
last_known_generation=last_known_generation,
last_known_trans_id=last_known_trans_id,
sync_id=sync_id,
ensure=self._ensure_callback is not None)
- # inform server of how many documents have already been received
- body.insert_info(received=received)
- # build a stream reader with doc parser callback
+ self._received_docs = 0
+ # build a stream reader with _doc_parser as a callback
body_reader = fetch_protocol.build_body_reader(self._doc_parser)
# start download stream
return self._http_request(
@@ -100,18 +97,17 @@ class HTTPDocFetcher(object):
@defer.inlineCallbacks
def _doc_parser(self, doc_info, content, total):
"""
- Insert a received document into the local replica.
-
- :param response: The body and headers of the response.
- :type response: tuple(str, dict)
- :param idx: The index count of the current operation.
- :type idx: int
+ Insert a received document into the local replica, decrypting
+ if necessary. The case where it's not decrypted is when a doc gets
+ inserted from Server side with a GPG encrypted content.
+
+ :param doc_info: Dictionary representing Document information.
+ :type doc_info: dict
+ :param content: The Document's content.
+ :type idx: str
:param total: The total number of operations.
:type total: int
"""
- # If arriving content was symmetrically encrypted, we decrypt incoming
- # document and insert into local database
-
doc = SoledadDocument(doc_info['id'], doc_info['rev'], content)
if is_symmetrically_encrypted(doc):
content = yield self._crypto.decrypt_doc(doc)
@@ -132,26 +128,23 @@ class HTTPDocFetcher(object):
def _parse_metadata(self, metadata):
"""
- Parse the response from the server containing the received document.
+ Parse the response from the server containing the sync metadata.
- :param response: The body and headers of the response.
- :type response: tuple(str, dict)
+ :param response: Metadata as string
+ :type response: str
- :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev,
- content, gen, trans_id)
+ :return: (number_of_changes, new_gen, new_trans_id)
:rtype: tuple
"""
try:
metadata = json.loads(metadata)
- new_generation = metadata['new_generation']
- new_transaction_id = metadata['new_transaction_id']
- number_of_changes = metadata['number_of_changes']
+ # make sure we have replica_uid from fresh new dbs
+ if self._ensure_callback and 'replica_uid' in metadata:
+ self._ensure_callback(metadata['replica_uid'])
+ return (metadata['number_of_changes'], metadata['new_generation'],
+ metadata['new_transaction_id'])
except (ValueError, KeyError):
raise errors.BrokenSyncStream
- # make sure we have replica_uid from fresh new dbs
- if self._ensure_callback and 'replica_uid' in metadata:
- self._ensure_callback(metadata['replica_uid'])
- return number_of_changes, new_generation, new_transaction_id
def _emit_receive_status(user_data, received_docs, total):