From efea94cf241afb845aabba0870b33566aaaeafbc Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 30 Sep 2016 19:26:55 -0300 Subject: [bug] defer insertion to threads during download Insertion is synchronous and blocks the reactor. That's a temporary solution as we used to have on decpool. --- client/src/leap/soledad/client/http_target/fetch.py | 11 ++++++++++- client/src/leap/soledad/client/http_target/fetch_protocol.py | 5 +---- 2 files changed, 11 insertions(+), 5 deletions(-) (limited to 'client') diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 5356f872..dc7bbd2c 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -15,6 +15,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . from twisted.internet import defer +from twisted.internet import threads from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS from leap.soledad.client.events import emit_async @@ -51,6 +52,8 @@ class HTTPDocFetcher(object): ensure_callback, sync_id): new_generation = last_known_generation new_transaction_id = last_known_trans_id + # Acts as a queue, ensuring line order on async processing + self.semaphore = defer.DeferredSemaphore(1) # we fetch the first document before fetching the rest because we need # to know the total number of documents to be received, and this @@ -62,6 +65,7 @@ class HTTPDocFetcher(object): sync_id, self._received_docs) number_of_changes, ngen, ntrans =\ self._parse_metadata(metadata) + yield self.semaphore.acquire() if ngen: new_generation = ngen @@ -112,7 +116,12 @@ class HTTPDocFetcher(object): doc.set_json(content) # TODO insert blobs here on the blob backend - self._insert_doc_cb(doc, doc_info['gen'], doc_info['trans_id']) + # FIXME: This is wrong. Using a SQLite connection from multiple threads + # is dangerous. We should bring the dbpool here or find an alternative. + # Current fix only helps releasing the reactor for other tasks as this + # is an IO intensive call. + yield self.semaphore.run(threads.deferToThread, self._insert_doc_cb, + doc, doc_info['gen'], doc_info['trans_id']) self._received_docs += 1 user_data = {'uuid': self.uuid, 'userid': self.userid} _emit_receive_status(user_data, self._received_docs, total=1000000) diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py index 6ecba2b0..4d45c9d4 100644 --- a/client/src/leap/soledad/client/http_target/fetch_protocol.py +++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py @@ -107,6 +107,7 @@ class DocStreamReceiver(protocol.Protocol): line, _ = utils.check_and_strip_comma(lines.pop(0)) try: self.lineReceived(line) + self._line += 1 except AssertionError, e: raise errors.BrokenSyncStream(e) @@ -116,17 +117,13 @@ class DocStreamReceiver(protocol.Protocol): self._properly_finished = True elif self._line == 0: assert line == '[' - self._line += 1 elif self._line == 1: - self._line += 1 self.metadata = json.loads(line) assert 'error' not in self.metadata elif (self._line % 2) == 0: - self._line += 1 self.current_doc = json.loads(line) assert 'error' not in self.current_doc else: - self._line += 1 self._doc_reader(self.current_doc, line.strip() or None) def finish(self): -- cgit v1.2.3