diff options
author | Victor Shyba <victor1984@riseup.net> | 2016-11-26 18:11:11 -0300 |
---|---|---|
committer | Victor Shyba <victor1984@riseup.net> | 2016-11-28 13:47:30 -0300 |
commit | d374223c38862961c205139006639ed5b8878465 (patch) | |
tree | 5fe4babbb20bf27111fb19b16456180c4fc236ec | |
parent | 7006b88db2a5f6bca5b401800d8e14821371216a (diff) |
[bug] make the semaphore cover all parsing
Unfortunately, if a doc finishes decryption before the previous one we
will still have an issue while inserting. This commits solves it by
adding the parse and decrypt inside of the semaphore.
-rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 9 | ||||
-rw-r--r-- | client/src/leap/soledad/client/http_target/fetch_protocol.py | 10 | ||||
-rw-r--r-- | testing/tests/sync/test_sync_target.py | 2 |
3 files changed, 17 insertions, 4 deletions
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 7d27c06d..85e2967d 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -108,6 +108,11 @@ class HTTPDocFetcher(object): :param total: The total number of operations. :type total: int """ + yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content, + total) + + @defer.inlineCallbacks + def __atomic_doc_parse(self, doc_info, content, total): doc = SoledadDocument(doc_info['id'], doc_info['rev'], content) if is_symmetrically_encrypted(doc): content = yield self._crypto.decrypt_doc(doc) @@ -120,8 +125,8 @@ class HTTPDocFetcher(object): # from multiple threads is dangerous. We should bring the dbpool here # or find an alternative. Deferring to a thread only helps releasing # the reactor for other tasks as this is an IO intensive call. - yield self.semaphore.run(threads.deferToThread, self._insert_doc_cb, - doc, doc_info['gen'], doc_info['trans_id']) + yield threads.deferToThread(self._insert_doc_cb, + doc, doc_info['gen'], doc_info['trans_id']) self._received_docs += 1 user_data = {'uuid': self.uuid, 'userid': self.userid} _emit_receive_status(user_data, self._received_docs, total=total) diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py index dd83c4f7..3322ec70 100644 --- a/client/src/leap/soledad/client/http_target/fetch_protocol.py +++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py @@ -20,9 +20,12 @@ from cStringIO import StringIO from twisted.web._newclient import ResponseDone from leap.soledad.common.l2db import errors from leap.soledad.common.l2db.remote import utils +from leap.soledad.common.log import getLogger from .support import ReadBodyProtocol from .support import readBody +logger = getLogger(__name__) + class DocStreamReceiver(ReadBodyProtocol): """ @@ -120,8 +123,13 @@ class DocStreamReceiver(ReadBodyProtocol): if 'error' in self.current_doc: raise errors.BrokenSyncStream("Error from server: %s" % line) else: - self._doc_reader( + d = self._doc_reader( self.current_doc, line.strip() or None, self.total) + d.addErrback(self._error) + + def _error(self, reason): + logger.error(reason) + self.transport.loseConnection() def finish(self): """ diff --git a/testing/tests/sync/test_sync_target.py b/testing/tests/sync/test_sync_target.py index d02aba68..6ce9a5c5 100644 --- a/testing/tests/sync/test_sync_target.py +++ b/testing/tests/sync/test_sync_target.py @@ -63,7 +63,7 @@ class TestSoledadParseReceivedDocResponse(unittest.TestCase): """ def parse(self, stream): - parser = DocStreamReceiver(None, None, lambda *_: 42) + parser = DocStreamReceiver(None, None, lambda *_: defer.succeed(42)) parser.dataReceived(stream) parser.finish() |