From 1e3de25ce10156655bcb1bc879f5340baa889ead Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 16 Sep 2016 04:37:20 -0300 Subject: [bug] disable decpool Temporary fix for server streaming --- .../src/leap/soledad/client/http_target/fetch.py | 128 ++++++++------------- 1 file changed, 49 insertions(+), 79 deletions(-) (limited to 'client/src/leap/soledad') diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 184c5883..0fb5040f 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -51,6 +51,7 @@ class HTTPDocFetcher(object): @defer.inlineCallbacks def _receive_docs(self, last_known_generation, last_known_trans_id, ensure_callback, sync_id, defer_decryption): + defer_decryption = False self._queue_for_decrypt = defer_decryption \ and self._sync_db is not None @@ -73,42 +74,17 @@ class HTTPDocFetcher(object): # to know the total number of documents to be received, and this # information comes as metadata to each request. - doc = yield self._receive_one_doc( + docs = yield self._fetch_all( last_known_generation, last_known_trans_id, sync_id, 0) self._received_docs = 0 - number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) + number_of_changes, ngen, ntrans =\ + self._insert_received_docs(docs, 1, 1) if ngen: new_generation = ngen new_transaction_id = ntrans - # --------------------------------------------------------------------- - # maybe receive the rest of the documents - # --------------------------------------------------------------------- - - # launch many asynchronous fetches and inserts of received documents - # in the temporary sync db. Will wait for all results before - # continuing. - - received = 1 - deferreds = [] - while received < number_of_changes: - d = self._receive_one_doc( - last_known_generation, - last_known_trans_id, sync_id, received) - d.addCallback( - self._insert_received_doc, - received + 1, # the index of the current received doc - number_of_changes) - deferreds.append(d) - received += 1 - results = yield defer.gatherResults(deferreds) - - # get generation and transaction id of target after insertions - if deferreds: - _, new_generation, new_transaction_id = results.pop() - # --------------------------------------------------------------------- # wait for async decryption to finish # --------------------------------------------------------------------- @@ -119,8 +95,8 @@ class HTTPDocFetcher(object): defer.returnValue([new_generation, new_transaction_id]) - def _receive_one_doc(self, last_known_generation, - last_known_trans_id, sync_id, received): + def _fetch_all(self, last_known_generation, + last_known_trans_id, sync_id, received): # add remote replica metadata to the request body = RequestBody( last_known_generation=last_known_generation, @@ -136,7 +112,7 @@ class HTTPDocFetcher(object): body=str(body), content_type='application/x-soledad-sync-get') - def _insert_received_doc(self, response, idx, total): + def _insert_received_docs(self, response, idx, total): """ Insert a received document into the local replica. @@ -147,47 +123,47 @@ class HTTPDocFetcher(object): :param total: The total number of operations. :type total: int """ - new_generation, new_transaction_id, number_of_changes, doc_id, \ - rev, content, gen, trans_id = \ + new_generation, new_transaction_id, number_of_changes, entries =\ self._parse_received_doc_response(response) if self._sync_decr_pool and not self._sync_decr_pool.running: self._sync_decr_pool.start(number_of_changes) - if doc_id is not None: - # decrypt incoming document and insert into local database - # ------------------------------------------------------------- - # symmetric decryption of document's contents - # ------------------------------------------------------------- - # If arriving content was symmetrically encrypted, we decrypt it. - # We do it inline if defer_decryption flag is False or no sync_db - # was defined, otherwise we defer it writing it to the received - # docs table. - doc = SoledadDocument(doc_id, rev, content) - if is_symmetrically_encrypted(doc): - if self._queue_for_decrypt: - self._sync_decr_pool.insert_encrypted_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id, - idx) - else: - # defer_decryption is False or no-sync-db fallback - doc.set_json(self._crypto.decrypt_doc(doc)) - self._insert_doc_cb(doc, gen, trans_id) - else: - # not symmetrically encrypted doc, insert it directly - # or save it in the decrypted stage. - if self._queue_for_decrypt: - self._sync_decr_pool.insert_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id, - idx) + for doc_id, rev, content, gen, trans_id in entries: + if doc_id is not None: + # decrypt incoming document and insert into local database + # --------------------------------------------------------- + # symmetric decryption of document's contents + # --------------------------------------------------------- + # If arriving content was symmetrically encrypted, we decrypt + # it. We do it inline if defer_decryption flag is False or no + # sync_db was defined, otherwise we defer it writing it to the + # received docs table. + doc = SoledadDocument(doc_id, rev, content) + if is_symmetrically_encrypted(doc): + if self._queue_for_decrypt: + self._sync_decr_pool.insert_encrypted_received_doc( + doc.doc_id, doc.rev, doc.content, gen, trans_id, + idx) + else: + # defer_decryption is False or no-sync-db fallback + doc.set_json(self._crypto.decrypt_doc(doc)) + self._insert_doc_cb(doc, gen, trans_id) else: - self._insert_doc_cb(doc, gen, trans_id) - # ------------------------------------------------------------- - # end of symmetric decryption - # ------------------------------------------------------------- - self._received_docs += 1 - user_data = {'uuid': self.uuid, 'userid': self.userid} - _emit_receive_status(user_data, self._received_docs, total) + # not symmetrically encrypted doc, insert it directly + # or save it in the decrypted stage. + if self._queue_for_decrypt: + self._sync_decr_pool.insert_received_doc( + doc.doc_id, doc.rev, doc.content, gen, trans_id, + idx) + else: + self._insert_doc_cb(doc, gen, trans_id) + # ------------------------------------------------------------- + # end of symmetric decryption + # ------------------------------------------------------------- + self._received_docs += 1 + user_data = {'uuid': self.uuid, 'userid': self.userid} + _emit_receive_status(user_data, self._received_docs, total) return number_of_changes, new_generation, new_transaction_id def _parse_received_doc_response(self, response): @@ -223,23 +199,17 @@ class HTTPDocFetcher(object): if self._ensure_callback and 'replica_uid' in metadata: self._ensure_callback(metadata['replica_uid']) # parse incoming document info - doc_id = None - rev = None - content = None - gen = None - trans_id = None - if number_of_changes > 0: + entries = [] + for data in data[1:]: try: - entry = json.loads(data[1]) - doc_id = entry['id'] - rev = entry['rev'] - content = entry['content'] - gen = entry['gen'] - trans_id = entry['trans_id'] + line, comma = utils.check_and_strip_comma(data) + entry = json.loads(line) + entries.append((entry['id'], entry['rev'], entry['content'], + entry['gen'], entry['trans_id'])) except (IndexError, KeyError): raise errors.BrokenSyncStream return new_generation, new_transaction_id, number_of_changes, \ - doc_id, rev, content, gen, trans_id + entries def _setup_sync_decr_pool(self): """ -- cgit v1.2.3