summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Victor Shyba <victor.shyba@gmail.com> 2016-09-30 19:26:55 -0300
committer drebs <drebs@leap.se> 2016-12-12 09:12:00 -0200
commit efea94cf241afb845aabba0870b33566aaaeafbc (patch)
tree 3d58dd286875d1d369cf7c9afa04e2cf860b4338
parent 3a93b3d33e4e1c44397e3ad377b04d0b140a65bf (diff)
[bug] defer insertion to threads during download
Document insertion is synchronous and blocks the reactor. Deferring it to a thread is a temporary solution, like the one we used to have on decpool.
-rw-r--r-- client/src/leap/soledad/client/http_target/fetch.py 11
-rw-r--r-- client/src/leap/soledad/client/http_target/fetch_protocol.py 5
2 files changed, 11 insertions, 5 deletions
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py
index 5356f872..dc7bbd2c 100644
--- a/client/src/leap/soledad/client/http_target/fetch.py
+++ b/client/src/leap/soledad/client/http_target/fetch.py
@@ -15,6 +15,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from twisted.internet import defer
+from twisted.internet import threads
from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
from leap.soledad.client.events import emit_async
@@ -51,6 +52,8 @@ class HTTPDocFetcher(object):
ensure_callback, sync_id):
new_generation = last_known_generation
new_transaction_id = last_known_trans_id
+ # Acts as a queue, ensuring line order on async processing
+ self.semaphore = defer.DeferredSemaphore(1)
# we fetch the first document before fetching the rest because we need
# to know the total number of documents to be received, and this
@@ -62,6 +65,7 @@ class HTTPDocFetcher(object):
sync_id, self._received_docs)
number_of_changes, ngen, ntrans =\
self._parse_metadata(metadata)
+ yield self.semaphore.acquire()
if ngen:
new_generation = ngen
@@ -112,7 +116,12 @@ class HTTPDocFetcher(object):
doc.set_json(content)
# TODO insert blobs here on the blob backend
- self._insert_doc_cb(doc, doc_info['gen'], doc_info['trans_id'])
+ # FIXME: This is wrong. Using a SQLite connection from multiple threads
+ # is dangerous. We should bring the dbpool here or find an alternative.
+ # Current fix only helps releasing the reactor for other tasks as this
+ # is an IO intensive call.
+ yield self.semaphore.run(threads.deferToThread, self._insert_doc_cb,
+ doc, doc_info['gen'], doc_info['trans_id'])
self._received_docs += 1
user_data = {'uuid': self.uuid, 'userid': self.userid}
_emit_receive_status(user_data, self._received_docs, total=1000000)
diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py
index 6ecba2b0..4d45c9d4 100644
--- a/client/src/leap/soledad/client/http_target/fetch_protocol.py
+++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py
@@ -107,6 +107,7 @@ class DocStreamReceiver(protocol.Protocol):
line, _ = utils.check_and_strip_comma(lines.pop(0))
try:
self.lineReceived(line)
+ self._line += 1
except AssertionError, e:
raise errors.BrokenSyncStream(e)
@@ -116,17 +117,13 @@ class DocStreamReceiver(protocol.Protocol):
self._properly_finished = True
elif self._line == 0:
assert line == '['
- self._line += 1
elif self._line == 1:
- self._line += 1
self.metadata = json.loads(line)
assert 'error' not in self.metadata
elif (self._line % 2) == 0:
- self._line += 1
self.current_doc = json.loads(line)
assert 'error' not in self.current_doc
else:
- self._line += 1
self._doc_reader(self.current_doc, line.strip() or None)
def finish(self):