diff options
| author | Victor Shyba <victor1984@riseup.net> | 2016-11-26 18:11:11 -0300 | 
|---|---|---|
| committer | drebs <drebs@leap.se> | 2016-12-12 09:17:51 -0200 | 
| commit | d72e3763538d1156bcf72b643626c2111a5a02cf (patch) | |
| tree | 72142e881d6a4a3162944d507d7adb27fd27296c /client/src | |
| parent | 42082cfa648ec10612823086e72dc2a70a0e773c (diff) | |
[bug] make the semaphore cover all parsing
Unfortunately, if a doc finishes decryption before the previous one we
will still have an issue while inserting. This commits solves it by
adding the parse and decrypt inside of the semaphore.
Diffstat (limited to 'client/src')
| -rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 9 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/http_target/fetch_protocol.py | 10 | 
2 files changed, 16 insertions, 3 deletions
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 7d27c06d..85e2967d 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -108,6 +108,11 @@ class HTTPDocFetcher(object):          :param total: The total number of operations.          :type total: int          """ +        yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content, +                                 total) + +    @defer.inlineCallbacks +    def __atomic_doc_parse(self, doc_info, content, total):          doc = SoledadDocument(doc_info['id'], doc_info['rev'], content)          if is_symmetrically_encrypted(doc):              content = yield self._crypto.decrypt_doc(doc) @@ -120,8 +125,8 @@ class HTTPDocFetcher(object):          # from multiple threads is dangerous. We should bring the dbpool here          # or find an alternative.  Deferring to a thread only helps releasing          # the reactor for other tasks as this is an IO intensive call. -        yield self.semaphore.run(threads.deferToThread, self._insert_doc_cb, -                                 doc, doc_info['gen'], doc_info['trans_id']) +        yield threads.deferToThread(self._insert_doc_cb, +                                    doc, doc_info['gen'], doc_info['trans_id'])          self._received_docs += 1          user_data = {'uuid': self.uuid, 'userid': self.userid}          _emit_receive_status(user_data, self._received_docs, total=total) diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py index dd83c4f7..3322ec70 100644 --- a/client/src/leap/soledad/client/http_target/fetch_protocol.py +++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py @@ -20,9 +20,12 @@ from cStringIO import StringIO  from twisted.web._newclient import ResponseDone  from leap.soledad.common.l2db import errors  from leap.soledad.common.l2db.remote import utils +from leap.soledad.common.log import getLogger  from .support import ReadBodyProtocol  from .support import readBody +logger = getLogger(__name__) +  class DocStreamReceiver(ReadBodyProtocol):      """ @@ -120,8 +123,13 @@ class DocStreamReceiver(ReadBodyProtocol):              if 'error' in self.current_doc:                  raise errors.BrokenSyncStream("Error from server: %s" % line)          else: -            self._doc_reader( +            d = self._doc_reader(                  self.current_doc, line.strip() or None, self.total) +            d.addErrback(self._error) + +    def _error(self, reason): +        logger.error(reason) +        self.transport.loseConnection()      def finish(self):          """  | 
