diff options
author | Victor Shyba <victor1984@riseup.net> | 2016-11-26 18:11:11 -0300 |
---|---|---|
committer | drebs <drebs@leap.se> | 2016-12-12 09:17:51 -0200 |
commit | d72e3763538d1156bcf72b643626c2111a5a02cf (patch) | |
tree | 72142e881d6a4a3162944d507d7adb27fd27296c /client/src/leap | |
parent | 42082cfa648ec10612823086e72dc2a70a0e773c (diff) |
[bug] make the semaphore cover all parsing
Unfortunately, if a doc finishes decryption before the previous one we
will still have an issue while inserting. This commits solves it by
adding the parse and decrypt inside of the semaphore.
Diffstat (limited to 'client/src/leap')
-rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 9 | ||||
-rw-r--r-- | client/src/leap/soledad/client/http_target/fetch_protocol.py | 10 |
2 files changed, 16 insertions, 3 deletions
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 7d27c06d..85e2967d 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -108,6 +108,11 @@ class HTTPDocFetcher(object): :param total: The total number of operations. :type total: int """ + yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content, + total) + + @defer.inlineCallbacks + def __atomic_doc_parse(self, doc_info, content, total): doc = SoledadDocument(doc_info['id'], doc_info['rev'], content) if is_symmetrically_encrypted(doc): content = yield self._crypto.decrypt_doc(doc) @@ -120,8 +125,8 @@ class HTTPDocFetcher(object): # from multiple threads is dangerous. We should bring the dbpool here # or find an alternative. Deferring to a thread only helps releasing # the reactor for other tasks as this is an IO intensive call. - yield self.semaphore.run(threads.deferToThread, self._insert_doc_cb, - doc, doc_info['gen'], doc_info['trans_id']) + yield threads.deferToThread(self._insert_doc_cb, + doc, doc_info['gen'], doc_info['trans_id']) self._received_docs += 1 user_data = {'uuid': self.uuid, 'userid': self.userid} _emit_receive_status(user_data, self._received_docs, total=total) diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py index dd83c4f7..3322ec70 100644 --- a/client/src/leap/soledad/client/http_target/fetch_protocol.py +++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py @@ -20,9 +20,12 @@ from cStringIO import StringIO from twisted.web._newclient import ResponseDone from leap.soledad.common.l2db import errors from leap.soledad.common.l2db.remote import utils +from leap.soledad.common.log import getLogger from .support import ReadBodyProtocol from .support import readBody +logger = getLogger(__name__) + class DocStreamReceiver(ReadBodyProtocol): """ @@ -120,8 +123,13 @@ class DocStreamReceiver(ReadBodyProtocol): if 'error' in self.current_doc: raise errors.BrokenSyncStream("Error from server: %s" % line) else: - self._doc_reader( + d = self._doc_reader( self.current_doc, line.strip() or None, self.total) + d.addErrback(self._error) + + def _error(self, reason): + logger.error(reason) + self.transport.loseConnection() def finish(self): """ |