diff options
| -rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 9 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/http_target/fetch_protocol.py | 10 | ||||
| -rw-r--r-- | testing/tests/sync/test_sync_target.py | 2 | 
3 files changed, 17 insertions, 4 deletions
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 7d27c06d..85e2967d 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -108,6 +108,11 @@ class HTTPDocFetcher(object):          :param total: The total number of operations.          :type total: int          """ +        yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content, +                                 total) + +    @defer.inlineCallbacks +    def __atomic_doc_parse(self, doc_info, content, total):          doc = SoledadDocument(doc_info['id'], doc_info['rev'], content)          if is_symmetrically_encrypted(doc):              content = yield self._crypto.decrypt_doc(doc) @@ -120,8 +125,8 @@ class HTTPDocFetcher(object):          # from multiple threads is dangerous. We should bring the dbpool here          # or find an alternative.  Deferring to a thread only helps releasing          # the reactor for other tasks as this is an IO intensive call. -        yield self.semaphore.run(threads.deferToThread, self._insert_doc_cb, -                                 doc, doc_info['gen'], doc_info['trans_id']) +        yield threads.deferToThread(self._insert_doc_cb, +                                    doc, doc_info['gen'], doc_info['trans_id'])          self._received_docs += 1          user_data = {'uuid': self.uuid, 'userid': self.userid}          _emit_receive_status(user_data, self._received_docs, total=total) diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py index dd83c4f7..3322ec70 100644 --- a/client/src/leap/soledad/client/http_target/fetch_protocol.py +++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py @@ -20,9 +20,12 @@ from cStringIO import StringIO  from twisted.web._newclient import ResponseDone  from leap.soledad.common.l2db import errors  from leap.soledad.common.l2db.remote import utils +from leap.soledad.common.log import getLogger  from .support import ReadBodyProtocol  from .support import readBody +logger = getLogger(__name__) +  class DocStreamReceiver(ReadBodyProtocol):      """ @@ -120,8 +123,13 @@ class DocStreamReceiver(ReadBodyProtocol):              if 'error' in self.current_doc:                  raise errors.BrokenSyncStream("Error from server: %s" % line)          else: -            self._doc_reader( +            d = self._doc_reader(                  self.current_doc, line.strip() or None, self.total) +            d.addErrback(self._error) + +    def _error(self, reason): +        logger.error(reason) +        self.transport.loseConnection()      def finish(self):          """ diff --git a/testing/tests/sync/test_sync_target.py b/testing/tests/sync/test_sync_target.py index d02aba68..6ce9a5c5 100644 --- a/testing/tests/sync/test_sync_target.py +++ b/testing/tests/sync/test_sync_target.py @@ -63,7 +63,7 @@ class TestSoledadParseReceivedDocResponse(unittest.TestCase):      """      def parse(self, stream): -        parser = DocStreamReceiver(None, None, lambda *_: 42) +        parser = DocStreamReceiver(None, None, lambda *_: defer.succeed(42))          parser.dataReceived(stream)          parser.finish()  | 
