diff options
| -rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 11 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/http_target/fetch_protocol.py | 5 | 
2 files changed, 11 insertions, 5 deletions
| diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 5356f872..dc7bbd2c 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -15,6 +15,7 @@  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>.  from twisted.internet import defer +from twisted.internet import threads  from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS  from leap.soledad.client.events import emit_async @@ -51,6 +52,8 @@ class HTTPDocFetcher(object):                        ensure_callback, sync_id):          new_generation = last_known_generation          new_transaction_id = last_known_trans_id +        # Acts as a queue, ensuring line order on async processing +        self.semaphore = defer.DeferredSemaphore(1)          # we fetch the first document before fetching the rest because we need          # to know the total number of documents to be received, and this @@ -62,6 +65,7 @@ class HTTPDocFetcher(object):              sync_id, self._received_docs)          number_of_changes, ngen, ntrans =\              self._parse_metadata(metadata) +        yield self.semaphore.acquire()          if ngen:              new_generation = ngen @@ -112,7 +116,12 @@ class HTTPDocFetcher(object):          doc.set_json(content)          # TODO insert blobs here on the blob backend -        self._insert_doc_cb(doc, doc_info['gen'], doc_info['trans_id']) +        # FIXME: This is wrong. Using a SQLite connection from multiple threads +        # is dangerous. We should bring the dbpool here or find an alternative. +        # Current fix only helps releasing the reactor for other tasks as this +        # is an IO intensive call. +        yield self.semaphore.run(threads.deferToThread, self._insert_doc_cb, +                                 doc, doc_info['gen'], doc_info['trans_id'])          self._received_docs += 1          user_data = {'uuid': self.uuid, 'userid': self.userid}          _emit_receive_status(user_data, self._received_docs, total=1000000) diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py index 6ecba2b0..4d45c9d4 100644 --- a/client/src/leap/soledad/client/http_target/fetch_protocol.py +++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py @@ -107,6 +107,7 @@ class DocStreamReceiver(protocol.Protocol):              line, _ = utils.check_and_strip_comma(lines.pop(0))              try:                  self.lineReceived(line) +                self._line += 1              except AssertionError, e:                  raise errors.BrokenSyncStream(e) @@ -116,17 +117,13 @@ class DocStreamReceiver(protocol.Protocol):              self._properly_finished = True          elif self._line == 0:              assert line == '[' -            self._line += 1          elif self._line == 1: -            self._line += 1              self.metadata = json.loads(line)              assert 'error' not in self.metadata          elif (self._line % 2) == 0: -            self._line += 1              self.current_doc = json.loads(line)              assert 'error' not in self.current_doc          else: -            self._line += 1              self._doc_reader(self.current_doc, line.strip() or None)      def finish(self): | 
