From efea94cf241afb845aabba0870b33566aaaeafbc Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Fri, 30 Sep 2016 19:26:55 -0300
Subject: [bug] defer insertion to threads during download

Insertion is synchronous and blocks the reactor. Deferring it to threads
is a temporary solution, similar to what we used to have in decpool.
---
 client/src/leap/soledad/client/http_target/fetch.py          | 11 ++++++++++-
 client/src/leap/soledad/client/http_target/fetch_protocol.py |  5 +----
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py
index 5356f872..dc7bbd2c 100644
--- a/client/src/leap/soledad/client/http_target/fetch.py
+++ b/client/src/leap/soledad/client/http_target/fetch.py
@@ -15,6 +15,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 from twisted.internet import defer
+from twisted.internet import threads
 
 from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
 from leap.soledad.client.events import emit_async
@@ -51,6 +52,8 @@ class HTTPDocFetcher(object):
                       ensure_callback, sync_id):
         new_generation = last_known_generation
         new_transaction_id = last_known_trans_id
+        # Acts as a queue, ensuring line order on async processing
+        self.semaphore = defer.DeferredSemaphore(1)
 
         # we fetch the first document before fetching the rest because we need
         # to know the total number of documents to be received, and this
@@ -62,6 +65,7 @@ class HTTPDocFetcher(object):
             sync_id, self._received_docs)
         number_of_changes, ngen, ntrans =\
             self._parse_metadata(metadata)
+        yield self.semaphore.acquire()
 
         if ngen:
             new_generation = ngen
@@ -112,7 +116,12 @@ class HTTPDocFetcher(object):
         doc.set_json(content)
 
         # TODO insert blobs here on the blob backend
-        self._insert_doc_cb(doc, doc_info['gen'], doc_info['trans_id'])
+        # FIXME: This is wrong. Using a SQLite connection from multiple threads
+        # is dangerous. We should bring the dbpool here or find an alternative.
+        # Current fix only helps release the reactor for other tasks, as this
+        # is an I/O-intensive call.
+        yield self.semaphore.run(threads.deferToThread, self._insert_doc_cb,
+                                 doc, doc_info['gen'], doc_info['trans_id'])
         self._received_docs += 1
         user_data = {'uuid': self.uuid, 'userid': self.userid}
         _emit_receive_status(user_data, self._received_docs, total=1000000)
diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py
index 6ecba2b0..4d45c9d4 100644
--- a/client/src/leap/soledad/client/http_target/fetch_protocol.py
+++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py
@@ -107,6 +107,7 @@ class DocStreamReceiver(protocol.Protocol):
             line, _ = utils.check_and_strip_comma(lines.pop(0))
             try:
                 self.lineReceived(line)
+                self._line += 1
             except AssertionError, e:
                 raise errors.BrokenSyncStream(e)
 
@@ -116,17 +117,13 @@ class DocStreamReceiver(protocol.Protocol):
             self._properly_finished = True
         elif self._line == 0:
             assert line == '['
-            self._line += 1
         elif self._line == 1:
-            self._line += 1
             self.metadata = json.loads(line)
             assert 'error' not in self.metadata
         elif (self._line % 2) == 0:
-            self._line += 1
             self.current_doc = json.loads(line)
             assert 'error' not in self.current_doc
         else:
-            self._line += 1
             self._doc_reader(self.current_doc, line.strip() or None)
 
     def finish(self):
-- 
cgit v1.2.3