summaryrefslogtreecommitdiff
path: root/client/src/leap
diff options
context:
space:
mode:
authorVictor Shyba <victor1984@riseup.net>2016-11-26 18:11:11 -0300
committerdrebs <drebs@leap.se>2016-12-12 09:17:51 -0200
commitd72e3763538d1156bcf72b643626c2111a5a02cf (patch)
tree72142e881d6a4a3162944d507d7adb27fd27296c /client/src/leap
parent42082cfa648ec10612823086e72dc2a70a0e773c (diff)
[bug] make the semaphore cover all parsing
Unfortunately, if a doc finishes decryption before the previous one we will still have an issue while inserting. This commits solves it by adding the parse and decrypt inside of the semaphore.
Diffstat (limited to 'client/src/leap')
-rw-r--r--client/src/leap/soledad/client/http_target/fetch.py9
-rw-r--r--client/src/leap/soledad/client/http_target/fetch_protocol.py10
2 files changed, 16 insertions, 3 deletions
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py
index 7d27c06d..85e2967d 100644
--- a/client/src/leap/soledad/client/http_target/fetch.py
+++ b/client/src/leap/soledad/client/http_target/fetch.py
@@ -108,6 +108,11 @@ class HTTPDocFetcher(object):
:param total: The total number of operations.
:type total: int
"""
+ yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content,
+ total)
+
+ @defer.inlineCallbacks
+ def __atomic_doc_parse(self, doc_info, content, total):
doc = SoledadDocument(doc_info['id'], doc_info['rev'], content)
if is_symmetrically_encrypted(doc):
content = yield self._crypto.decrypt_doc(doc)
@@ -120,8 +125,8 @@ class HTTPDocFetcher(object):
# from multiple threads is dangerous. We should bring the dbpool here
# or find an alternative. Deferring to a thread only helps releasing
# the reactor for other tasks as this is an IO intensive call.
- yield self.semaphore.run(threads.deferToThread, self._insert_doc_cb,
- doc, doc_info['gen'], doc_info['trans_id'])
+ yield threads.deferToThread(self._insert_doc_cb,
+ doc, doc_info['gen'], doc_info['trans_id'])
self._received_docs += 1
user_data = {'uuid': self.uuid, 'userid': self.userid}
_emit_receive_status(user_data, self._received_docs, total=total)
diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py
index dd83c4f7..3322ec70 100644
--- a/client/src/leap/soledad/client/http_target/fetch_protocol.py
+++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py
@@ -20,9 +20,12 @@ from cStringIO import StringIO
from twisted.web._newclient import ResponseDone
from leap.soledad.common.l2db import errors
from leap.soledad.common.l2db.remote import utils
+from leap.soledad.common.log import getLogger
from .support import ReadBodyProtocol
from .support import readBody
+logger = getLogger(__name__)
+
class DocStreamReceiver(ReadBodyProtocol):
"""
@@ -120,8 +123,13 @@ class DocStreamReceiver(ReadBodyProtocol):
if 'error' in self.current_doc:
raise errors.BrokenSyncStream("Error from server: %s" % line)
else:
- self._doc_reader(
+ d = self._doc_reader(
self.current_doc, line.strip() or None, self.total)
+ d.addErrback(self._error)
+
+ def _error(self, reason):
+ logger.error(reason)
+ self.transport.loseConnection()
def finish(self):
"""