From 058d15265134db653987a6fff052cf81299bab51 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 11 Sep 2017 22:18:38 -0300 Subject: [bug] use sql file handler from adbapi threadpool This commit makes all write calls happen inside the same thread that opened the blob handle. Doing it outside using FileBodyProducer will yield and run the writes across random reactor threads. This is an attempt to fix #8945 -- Resolves: #8945 --- src/leap/soledad/client/_db/blobs.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) (limited to 'src/leap/soledad/client/_db/blobs.py') diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index 1447e3bb..5e61ad36 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -31,7 +31,6 @@ from functools import partial from twisted.logger import Logger from twisted.enterprise import adbapi from twisted.internet import defer -from twisted.web.client import FileBodyProducer import treq @@ -125,6 +124,23 @@ class ConnectionPool(adbapi.ConnectionPool): """ return self.runInteraction(self._blob, table, column, irow, flags) + def write(self, table, column, irow, blob_fd): + return self.runInteraction(self._write_blob, table, column, irow, + blob_fd) + + def _write_blob(self, trans, table, column, irow, blob_fd): + blob_fd.seek(0) + # XXX I have to copy the buffer here so that I'm able to + # return a non-closed file to the caller (blobmanager.get) + # FIXME should remove this duplication! + # have a look at how treq does cope with closing the handle + # for uploading a file + with trans._connection.blob(table, column, irow, 1) as handle: + data = blob_fd.read(2**12) + while data: + handle.write(data) + data = blob_fd.read(2**12) + def _blob(self, trans, table, column, irow, flags): # TODO: should not use transaction private variable here handle = trans._connection.blob(table, column, irow, flags) @@ -475,18 +491,8 @@ class SQLiteBlobBackend(object): insert += ' VALUES (?, ?, zeroblob(?), ?)' values = (blob_id, namespace, size, status) irow = yield self.dbpool.insertAndGetLastRowid(insert, values) - handle = yield self.dbpool.blob('blobs', 'payload', irow, 1) - blob_fd.seek(0) - # XXX I have to copy the buffer here so that I'm able to - # return a non-closed file to the caller (blobmanager.get) - # FIXME should remove this duplication! - # have a look at how treq does cope with closing the handle - # for uploading a file - producer = FileBodyProducer(blob_fd) - with handle: - done = yield producer.startProducing(handle) + yield self.dbpool.write('blobs', 'payload', irow, blob_fd) logger.info("Finished saving blob in local database.") - defer.returnValue(done) @defer.inlineCallbacks def get(self, blob_id, namespace=''): -- cgit v1.2.3