From d374223c38862961c205139006639ed5b8878465 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 26 Nov 2016 18:11:11 -0300 Subject: [bug] make the semaphore cover all parsing Unfortunately, if a doc finishes decryption before the previous one we will still have an issue while inserting. This commits solves it by adding the parse and decrypt inside of the semaphore. --- client/src/leap/soledad/client/http_target/fetch.py | 9 +++++++-- client/src/leap/soledad/client/http_target/fetch_protocol.py | 10 +++++++++- testing/tests/sync/test_sync_target.py | 2 +- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 7d27c06d..85e2967d 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -108,6 +108,11 @@ class HTTPDocFetcher(object): :param total: The total number of operations. :type total: int """ + yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content, + total) + + @defer.inlineCallbacks + def __atomic_doc_parse(self, doc_info, content, total): doc = SoledadDocument(doc_info['id'], doc_info['rev'], content) if is_symmetrically_encrypted(doc): content = yield self._crypto.decrypt_doc(doc) @@ -120,8 +125,8 @@ class HTTPDocFetcher(object): # from multiple threads is dangerous. We should bring the dbpool here # or find an alternative. Deferring to a thread only helps releasing # the reactor for other tasks as this is an IO intensive call. - yield self.semaphore.run(threads.deferToThread, self._insert_doc_cb, - doc, doc_info['gen'], doc_info['trans_id']) + yield threads.deferToThread(self._insert_doc_cb, + doc, doc_info['gen'], doc_info['trans_id']) self._received_docs += 1 user_data = {'uuid': self.uuid, 'userid': self.userid} _emit_receive_status(user_data, self._received_docs, total=total) diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py index dd83c4f7..3322ec70 100644 --- a/client/src/leap/soledad/client/http_target/fetch_protocol.py +++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py @@ -20,9 +20,12 @@ from cStringIO import StringIO from twisted.web._newclient import ResponseDone from leap.soledad.common.l2db import errors from leap.soledad.common.l2db.remote import utils +from leap.soledad.common.log import getLogger from .support import ReadBodyProtocol from .support import readBody +logger = getLogger(__name__) + class DocStreamReceiver(ReadBodyProtocol): """ @@ -120,8 +123,13 @@ class DocStreamReceiver(ReadBodyProtocol): if 'error' in self.current_doc: raise errors.BrokenSyncStream("Error from server: %s" % line) else: - self._doc_reader( + d = self._doc_reader( self.current_doc, line.strip() or None, self.total) + d.addErrback(self._error) + + def _error(self, reason): + logger.error(reason) + self.transport.loseConnection() def finish(self): """ diff --git a/testing/tests/sync/test_sync_target.py b/testing/tests/sync/test_sync_target.py index d02aba68..6ce9a5c5 100644 --- a/testing/tests/sync/test_sync_target.py +++ b/testing/tests/sync/test_sync_target.py @@ -63,7 +63,7 @@ class TestSoledadParseReceivedDocResponse(unittest.TestCase): """ def parse(self, stream): - parser = DocStreamReceiver(None, None, lambda *_: 42) + parser = DocStreamReceiver(None, None, lambda *_: defer.succeed(42)) parser.dataReceived(stream) parser.finish() -- cgit v1.2.3