From 5faf22e4603d8130d11890f43f2f002821e8a976 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 25 Oct 2017 22:35:32 -0300 Subject: [refactor] add a table for sync_status As defined in #8970, this table and the new module will ease adding sync features such as priority queues and streaming. --Resolves: #8970 --- src/leap/soledad/client/_db/blobs/__init__.py | 9 +++- src/leap/soledad/client/_db/blobs/sql.py | 73 +++++++++++++++------------ src/leap/soledad/client/_db/blobs/sync.py | 6 +-- 3 files changed, 51 insertions(+), 37 deletions(-) (limited to 'src/leap/soledad/client/_db/blobs') diff --git a/src/leap/soledad/client/_db/blobs/__init__.py b/src/leap/soledad/client/_db/blobs/__init__.py index 0120b222..3c8facba 100644 --- a/src/leap/soledad/client/_db/blobs/__init__.py +++ b/src/leap/soledad/client/_db/blobs/__init__.py @@ -196,8 +196,11 @@ class BlobManager(BlobsSynchronizer): check_http_status(response.code) defer.returnValue((yield response.json())) - def local_list(self, namespace='', sync_status=None): - return self.local.list(namespace, sync_status) + def local_list(self, namespace=''): + return self.local.list(namespace) + + def local_list_status(self, status, namespace=''): + return self.local.list_status(status, namespace) def put(self, doc, size, namespace=''): """ @@ -222,6 +225,8 @@ class BlobManager(BlobsSynchronizer): # TODO this is a tee really, but ok... could do db and upload # concurrently. not sure if we'd gain something. yield self.local.put(doc.blob_id, fd, size=size, namespace=namespace) + yield self.local.update_sync_status( + doc.blob_id, SyncStatus.PENDING_UPLOAD) # In fact, some kind of pipe is needed here, where each write on db # handle gets forwarded into a write on the connection handle fd = yield self.local.get(doc.blob_id, namespace=namespace) diff --git a/src/leap/soledad/client/_db/blobs/sql.py b/src/leap/soledad/client/_db/blobs/sql.py index dcca1671..72fc250b 100644 --- a/src/leap/soledad/client/_db/blobs/sql.py +++ b/src/leap/soledad/client/_db/blobs/sql.py @@ -55,7 +55,7 @@ class SQLiteBlobBackend(object): '/tmp/ignored', binascii.b2a_hex(key), is_raw_key=True, create=True) openfun = partial(pragmas.set_init_pragmas, opts=opts, - schema_func=_init_blob_table) + schema_func=_init_tables) self.dbpool = ConnectionPool( backend, self.path, check_same_thread=False, timeout=5, @@ -70,19 +70,22 @@ class SQLiteBlobBackend(object): @defer.inlineCallbacks def put(self, blob_id, blob_fd, size=None, - namespace='', status=SyncStatus.PENDING_UPLOAD): - previous_state = yield self.get_sync_status(blob_id) - unavailable = SyncStatus.UNAVAILABLE_STATUSES - if previous_state and previous_state[0] in unavailable: - yield self.delete(blob_id, namespace=namespace) - status = SyncStatus.SYNCED + namespace=''): logger.info("Saving blob in local database...") - insert = 'INSERT INTO blobs (blob_id, namespace, payload, sync_status)' - insert += ' VALUES (?, ?, zeroblob(?), ?)' - values = (blob_id, namespace, size, status) + insert = 'INSERT INTO blobs (blob_id, namespace, payload)' + insert += ' VALUES (?, ?, zeroblob(?))' + values = (blob_id, namespace, size) irow = yield self.dbpool.insertAndGetLastRowid(insert, values) yield self.dbpool.write_blob('blobs', 'payload', irow, blob_fd) logger.info("Finished saving blob in local database.") + """ + # set as synced if it was pending + previous_state = yield self.get_sync_status(blob_id) + unavailable = SyncStatus.UNAVAILABLE_STATUSES + if previous_state and previous_state[0] in unavailable: + status = SyncStatus.SYNCED + yield self.update_sync_status(blob_id, status, namespace) + """ @defer.inlineCallbacks def get(self, blob_id, namespace=''): @@ -90,33 +93,32 @@ class SQLiteBlobBackend(object): # incremental interface for blobs - and just return the raw fd instead select = 'SELECT payload FROM blobs WHERE blob_id = ? AND namespace= ?' values = (blob_id, namespace,) - avoid_values = SyncStatus.UNAVAILABLE_STATUSES - select += ' AND sync_status NOT IN (%s)' - select %= ','.join(['?' for _ in avoid_values]) - values += avoid_values result = yield self.dbpool.runQuery(select, values) if result: defer.returnValue(BytesIO(str(result[0][0]))) @defer.inlineCallbacks def get_sync_status(self, blob_id): - select = 'SELECT sync_status, retries FROM blobs WHERE blob_id = ?' + select = 'SELECT sync_status, retries FROM sync_state WHERE blob_id= ?' result = yield self.dbpool.runQuery(select, (blob_id,)) if result: defer.returnValue((result[0][0], result[0][1])) @defer.inlineCallbacks - def list(self, namespace='', sync_status=False): + def list(self, namespace=''): query = 'select blob_id from blobs where namespace = ?' values = (namespace,) - if sync_status: - query += ' and sync_status = ?' - values += (sync_status,) + result = yield self.dbpool.runQuery(query, values) + if result: + defer.returnValue([b_id[0] for b_id in result]) else: - avoid_values = SyncStatus.UNAVAILABLE_STATUSES - query += ' AND sync_status NOT IN (%s)' - query %= ','.join(['?' for _ in avoid_values]) - values += avoid_values + defer.returnValue([]) + + @defer.inlineCallbacks + def list_status(self, sync_status, namespace=''): + query = 'select blob_id from sync_state where sync_status = ?' + query += 'AND namespace = ?' + values = (sync_status, namespace,) result = yield self.dbpool.runQuery(query, values) if result: defer.returnValue([b_id[0] for b_id in result]) @@ -125,34 +127,34 @@ class SQLiteBlobBackend(object): @defer.inlineCallbacks def update_sync_status(self, blob_id, sync_status, namespace=""): - query = 'SELECT sync_status FROM blobs WHERE blob_id = ?' + query = 'SELECT sync_status FROM sync_state WHERE blob_id = ?' result = yield self.dbpool.runQuery(query, (blob_id,)) if not result: - insert = 'INSERT INTO blobs' - insert += ' (blob_id, namespace, payload, sync_status)' - insert += ' VALUES (?, ?, zeroblob(0), ?)' + insert = 'INSERT INTO sync_state' + insert += ' (blob_id, namespace, sync_status)' + insert += ' VALUES (?, ?, ?)' values = (blob_id, namespace, sync_status) yield self.dbpool.runOperation(insert, values) return - update = 'UPDATE blobs SET sync_status = ? WHERE blob_id = ?' + update = 'UPDATE sync_state SET sync_status = ? WHERE blob_id = ?' values = (sync_status, blob_id,) result = yield self.dbpool.runOperation(update, values) def update_batch_sync_status(self, blob_id_list, sync_status, namespace=''): - insert = 'INSERT INTO blobs (blob_id, namespace, payload, sync_status)' + insert = 'INSERT INTO sync_state (blob_id, namespace, sync_status)' first_blob_id, blob_id_list = blob_id_list[0], blob_id_list[1:] - insert += ' VALUES (?, ?, zeroblob(0), ?)' + insert += ' VALUES (?, ?, ?)' values = (first_blob_id, namespace, sync_status) for blob_id in blob_id_list: - insert += ', (?, ?, zeroblob(0), ?)' + insert += ', (?, ?, ?)' values += (blob_id, namespace, sync_status) return self.dbpool.runQuery(insert, values) def increment_retries(self, blob_id): - query = 'update blobs set retries = retries + 1 where blob_id = ?' + query = 'update sync_state set retries = retries + 1 where blob_id = ?' return self.dbpool.runQuery(query, (blob_id,)) @defer.inlineCallbacks @@ -175,11 +177,18 @@ class SQLiteBlobBackend(object): return self.dbpool.runQuery(query, (blob_id, namespace,)) +def _init_tables(conn): + # unified init for running under the same lock + _init_blob_table(conn) + _init_sync_table(conn) + + def _init_sync_table(conn): maybe_create = """ CREATE TABLE IF NOT EXISTS sync_state ( blob_id PRIMARY KEY, + namespace TEXT, sync_status INT default %s, retries INT default 0)""" default_status = SyncStatus.PENDING_UPLOAD diff --git a/src/leap/soledad/client/_db/blobs/sync.py b/src/leap/soledad/client/_db/blobs/sync.py index 67df1d7f..d2b7bed6 100644 --- a/src/leap/soledad/client/_db/blobs/sync.py +++ b/src/leap/soledad/client/_db/blobs/sync.py @@ -71,7 +71,8 @@ class BlobsSynchronizer(object): Optional parameter to restrict operation to a given namespace. :type namespace: str """ - missing = yield self.local.list(namespace, SyncStatus.PENDING_UPLOAD) + missing = yield self.local.list_status( + SyncStatus.PENDING_UPLOAD, namespace) total = len(missing) logger.info("Will send %d blobs to server." % total) deferreds = [] @@ -119,8 +120,7 @@ class BlobsSynchronizer(object): :type namespace: str """ # TODO: Use something to prioritize user requests over general new docs - d = self.local_list(namespace=namespace, - sync_status=SyncStatus.PENDING_DOWNLOAD) + d = self.local_list_status(SyncStatus.PENDING_DOWNLOAD, namespace) docs_we_want = yield d total = len(docs_we_want) logger.info("Will fetch %d blobs from server." % total) -- cgit v1.2.3