summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Shyba <victor1984@riseup.net>2016-11-26 18:11:11 -0300
committerVictor Shyba <victor1984@riseup.net>2016-11-28 13:47:30 -0300
commitd374223c38862961c205139006639ed5b8878465 (patch)
tree5fe4babbb20bf27111fb19b16456180c4fc236ec
parent7006b88db2a5f6bca5b401800d8e14821371216a (diff)
[bug] make the semaphore cover all parsing
Unfortunately, if a doc finishes decryption before the previous one we will still have an issue while inserting. This commits solves it by adding the parse and decrypt inside of the semaphore.
-rw-r--r--client/src/leap/soledad/client/http_target/fetch.py9
-rw-r--r--client/src/leap/soledad/client/http_target/fetch_protocol.py10
-rw-r--r--testing/tests/sync/test_sync_target.py2
3 files changed, 17 insertions, 4 deletions
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py
index 7d27c06d..85e2967d 100644
--- a/client/src/leap/soledad/client/http_target/fetch.py
+++ b/client/src/leap/soledad/client/http_target/fetch.py
@@ -108,6 +108,11 @@ class HTTPDocFetcher(object):
:param total: The total number of operations.
:type total: int
"""
+ yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content,
+ total)
+
+ @defer.inlineCallbacks
+ def __atomic_doc_parse(self, doc_info, content, total):
doc = SoledadDocument(doc_info['id'], doc_info['rev'], content)
if is_symmetrically_encrypted(doc):
content = yield self._crypto.decrypt_doc(doc)
@@ -120,8 +125,8 @@ class HTTPDocFetcher(object):
# from multiple threads is dangerous. We should bring the dbpool here
# or find an alternative. Deferring to a thread only helps releasing
# the reactor for other tasks as this is an IO intensive call.
- yield self.semaphore.run(threads.deferToThread, self._insert_doc_cb,
- doc, doc_info['gen'], doc_info['trans_id'])
+ yield threads.deferToThread(self._insert_doc_cb,
+ doc, doc_info['gen'], doc_info['trans_id'])
self._received_docs += 1
user_data = {'uuid': self.uuid, 'userid': self.userid}
_emit_receive_status(user_data, self._received_docs, total=total)
diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py
index dd83c4f7..3322ec70 100644
--- a/client/src/leap/soledad/client/http_target/fetch_protocol.py
+++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py
@@ -20,9 +20,12 @@ from cStringIO import StringIO
from twisted.web._newclient import ResponseDone
from leap.soledad.common.l2db import errors
from leap.soledad.common.l2db.remote import utils
+from leap.soledad.common.log import getLogger
from .support import ReadBodyProtocol
from .support import readBody
+logger = getLogger(__name__)
+
class DocStreamReceiver(ReadBodyProtocol):
"""
@@ -120,8 +123,13 @@ class DocStreamReceiver(ReadBodyProtocol):
if 'error' in self.current_doc:
raise errors.BrokenSyncStream("Error from server: %s" % line)
else:
- self._doc_reader(
+ d = self._doc_reader(
self.current_doc, line.strip() or None, self.total)
+ d.addErrback(self._error)
+
+ def _error(self, reason):
+ logger.error(reason)
+ self.transport.loseConnection()
def finish(self):
"""
diff --git a/testing/tests/sync/test_sync_target.py b/testing/tests/sync/test_sync_target.py
index d02aba68..6ce9a5c5 100644
--- a/testing/tests/sync/test_sync_target.py
+++ b/testing/tests/sync/test_sync_target.py
@@ -63,7 +63,7 @@ class TestSoledadParseReceivedDocResponse(unittest.TestCase):
"""
def parse(self, stream):
- parser = DocStreamReceiver(None, None, lambda *_: 42)
+ parser = DocStreamReceiver(None, None, lambda *_: defer.succeed(42))
parser.dataReceived(stream)
parser.finish()