diff options
Diffstat (limited to 'client/src/leap/soledad/client/http_target/fetch.py')
-rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 28 |
1 files changed, 15 insertions, 13 deletions
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 1b4351ea..036b5b21 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -53,19 +53,21 @@ class HTTPDocFetcher(object): ensure_callback, sync_id): new_generation = last_known_generation new_transaction_id = last_known_trans_id + self._received_docs = 0 # Acts as a queue, ensuring line order on async processing + # as `self._insert_doc_cb` cant be run concurrently or out of order. + # DeferredSemaphore solves the concurrency and its implementation uses + # a queue, solving the ordering. + # FIXME: Find a proper solution to avoid surprises on Twisted changes self.semaphore = defer.DeferredSemaphore(1) - # we fetch the first document before fetching the rest because we need - # to know the total number of documents to be received, and this - # information comes as metadata to each request. - - self._received_docs = 0 metadata = yield self._fetch_all( last_known_generation, last_known_trans_id, sync_id, self._received_docs) - number_of_changes, ngen, ntrans =\ - self._parse_metadata(metadata) + metadata = self._parse_metadata(metadata) + number_of_changes, ngen, ntrans = metadata + + # wait for pending inserts yield self.semaphore.acquire() if ngen: @@ -106,8 +108,8 @@ class HTTPDocFetcher(object): :param total: The total number of operations. :type total: int """ - # If arriving content was symmetrically encrypted, we decrypt - # decrypt incoming document and insert into local database + # If arriving content was symmetrically encrypted, we decrypt incoming + # document and insert into local database doc = SoledadDocument(doc_info['id'], doc_info['rev'], content) @@ -117,10 +119,10 @@ class HTTPDocFetcher(object): doc.set_json(content) # TODO insert blobs here on the blob backend - # FIXME: This is wrong. Using a SQLite connection from multiple threads - # is dangerous. We should bring the dbpool here or find an alternative. - # Current fix only helps releasing the reactor for other tasks as this - # is an IO intensive call. + # FIXME: This is wrong. Using the very same SQLite connection object + # from multiple threads is dangerous. We should bring the dbpool here + # or find an alternative. Deferring to a thread only helps releasing + # the reactor for other tasks as this is an IO intensive call. yield self.semaphore.run(threads.deferToThread, self._insert_doc_cb, doc, doc_info['gen'], doc_info['trans_id']) self._received_docs += 1 |