From 1d8b32abc821116da4f3b4f192bc06a931d30076 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 29 Nov 2017 11:42:29 -0200 Subject: [feature] add priorities for blob transfers Closes: #8691 --- src/leap/soledad/client/_db/blobs/sql.py | 68 +++++++++++++++++++++++--------- 1 file changed, 50 insertions(+), 18 deletions(-) (limited to 'src/leap/soledad/client/_db/blobs/sql.py') diff --git a/src/leap/soledad/client/_db/blobs/sql.py b/src/leap/soledad/client/_db/blobs/sql.py index ebd6c095..a89802d8 100644 --- a/src/leap/soledad/client/_db/blobs/sql.py +++ b/src/leap/soledad/client/_db/blobs/sql.py @@ -46,6 +46,14 @@ class SyncStatus: UNAVAILABLE_STATUSES = (3, 5) +class Priority: + LOW = 1 + MEDIUM = 2 + HIGH = 3 + URGENT = 4 + DEFAULT = 2 + + class SQLiteBlobBackend(object): concurrency_limit = 10 @@ -130,7 +138,7 @@ class SQLiteBlobBackend(object): @defer.inlineCallbacks def list_status(self, sync_status, namespace=''): query = 'select blob_id from sync_state where sync_status = ?' - query += 'AND namespace = ?' + query += 'AND namespace = ? ORDER BY priority DESC' values = (sync_status, namespace,) result = yield self.dbpool.runQuery(query, values) if result: @@ -139,21 +147,42 @@ class SQLiteBlobBackend(object): defer.returnValue([]) @defer.inlineCallbacks - def update_sync_status(self, blob_id, sync_status, namespace=""): - query = 'SELECT sync_status FROM sync_state WHERE blob_id = ?' - result = yield self.dbpool.runQuery(query, (blob_id,)) + def update_sync_status(self, blob_id, sync_status, namespace="", + priority=None): + retries = '(SELECT retries from sync_state' \ + ' WHERE blob_id="%s" and namespace="%s")' \ + % (blob_id, namespace) + if not priority: + priority = '(SELECT priority FROM sync_state' \ + ' WHERE blob_id="%s" AND namespace="%s")' \ + % (blob_id, namespace) + fields = 'blob_id, namespace, sync_status, retries, priority' + markers = '?, ?, ?, %s, %s' % (retries, priority) + values = [blob_id, namespace, sync_status] + insert = 'INSERT or REPLACE INTO sync_state (%s) VALUES (%s)' \ + % (fields, markers) + yield self.dbpool.runOperation(insert, tuple(values)) + @defer.inlineCallbacks + def get_priority(self, blob_id, namespace=""): + query = 'SELECT priority FROM sync_state WHERE blob_id = ?' + result = yield self.dbpool.runQuery(query, (blob_id,)) if not result: - insert = 'INSERT INTO sync_state' - insert += ' (blob_id, namespace, sync_status)' - insert += ' VALUES (?, ?, ?)' - values = (blob_id, namespace, sync_status) - yield self.dbpool.runOperation(insert, values) - return + defer.returnValue(None) + priority = result.pop()[0] + defer.returnValue(priority) - update = 'UPDATE sync_state SET sync_status = ? WHERE blob_id = ?' - values = (sync_status, blob_id,) - result = yield self.dbpool.runOperation(update, values) + @defer.inlineCallbacks + def update_priority(self, blob_id, priority, namespace=""): + old_priority = self.get_priority(blob_id, namespace=namespace) + if not old_priority: + logger.error("Can't update priority of %s: no sync status entry.") + return + if old_priority == priority: + return + update = 'UPDATE sync_state SET priority = ? WHERE blob_id = ?' + values = (priority, blob_id,) + yield self.dbpool.runOperation(update, values) def update_batch_sync_status(self, blob_id_list, sync_status, namespace=''): @@ -161,11 +190,12 @@ class SQLiteBlobBackend(object): return insert = 'INSERT or REPLACE INTO sync_state' first_blob_id, blob_id_list = blob_id_list[0], blob_id_list[1:] - insert += ' (blob_id, namespace, sync_status) VALUES (?, ?, ?)' - values = (first_blob_id, namespace, sync_status) + insert += ' (blob_id, namespace, sync_status, priority)' + insert += ' VALUES (?, ?, ?, ?)' + values = (first_blob_id, namespace, sync_status, Priority.DEFAULT) for blob_id in blob_id_list: - insert += ', (?, ?, ?)' - values += (blob_id, namespace, sync_status) + insert += ', (?, ?, ?, ?)' + values += (blob_id, namespace, sync_status, Priority.DEFAULT) return self.dbpool.runQuery(insert, values) def increment_retries(self, blob_id): @@ -212,9 +242,11 @@ def _init_sync_table(conn): blob_id PRIMARY KEY, namespace TEXT, sync_status INT default %s, + priority INT default %d, retries INT default 0)""" default_status = SyncStatus.PENDING_UPLOAD - maybe_create %= default_status + default_priority = Priority.DEFAULT + maybe_create %= (default_status, default_priority) conn.execute(maybe_create) -- cgit v1.2.3