summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorVictor Shyba <victor1984@riseup.net>2017-10-25 22:35:32 -0300
committerVictor Shyba <victor1984@riseup.net>2017-10-27 16:01:48 -0300
commit5faf22e4603d8130d11890f43f2f002821e8a976 (patch)
tree8e0a849b96d1540e4d2c695abb000eca3534a480 /src
parent3230c9388729ae784c05575a2021c9d0995faa13 (diff)
[refactor] add a table for sync_status
As defined in #8970, this table and the new module will ease adding sync features such as priority queues and streaming. --Resolves: #8970
Diffstat (limited to 'src')
-rw-r--r--src/leap/soledad/client/_db/blobs/__init__.py9
-rw-r--r--src/leap/soledad/client/_db/blobs/sql.py73
-rw-r--r--src/leap/soledad/client/_db/blobs/sync.py6
3 files changed, 51 insertions, 37 deletions
diff --git a/src/leap/soledad/client/_db/blobs/__init__.py b/src/leap/soledad/client/_db/blobs/__init__.py
index 0120b222..3c8facba 100644
--- a/src/leap/soledad/client/_db/blobs/__init__.py
+++ b/src/leap/soledad/client/_db/blobs/__init__.py
@@ -196,8 +196,11 @@ class BlobManager(BlobsSynchronizer):
check_http_status(response.code)
defer.returnValue((yield response.json()))
- def local_list(self, namespace='', sync_status=None):
- return self.local.list(namespace, sync_status)
+ def local_list(self, namespace=''):
+ return self.local.list(namespace)
+
+ def local_list_status(self, status, namespace=''):
+ return self.local.list_status(status, namespace)
def put(self, doc, size, namespace=''):
"""
@@ -222,6 +225,8 @@ class BlobManager(BlobsSynchronizer):
# TODO this is a tee really, but ok... could do db and upload
# concurrently. not sure if we'd gain something.
yield self.local.put(doc.blob_id, fd, size=size, namespace=namespace)
+ yield self.local.update_sync_status(
+ doc.blob_id, SyncStatus.PENDING_UPLOAD)
# In fact, some kind of pipe is needed here, where each write on db
# handle gets forwarded into a write on the connection handle
fd = yield self.local.get(doc.blob_id, namespace=namespace)
diff --git a/src/leap/soledad/client/_db/blobs/sql.py b/src/leap/soledad/client/_db/blobs/sql.py
index dcca1671..72fc250b 100644
--- a/src/leap/soledad/client/_db/blobs/sql.py
+++ b/src/leap/soledad/client/_db/blobs/sql.py
@@ -55,7 +55,7 @@ class SQLiteBlobBackend(object):
'/tmp/ignored', binascii.b2a_hex(key),
is_raw_key=True, create=True)
openfun = partial(pragmas.set_init_pragmas, opts=opts,
- schema_func=_init_blob_table)
+ schema_func=_init_tables)
self.dbpool = ConnectionPool(
backend, self.path, check_same_thread=False, timeout=5,
@@ -70,19 +70,22 @@ class SQLiteBlobBackend(object):
@defer.inlineCallbacks
def put(self, blob_id, blob_fd, size=None,
- namespace='', status=SyncStatus.PENDING_UPLOAD):
- previous_state = yield self.get_sync_status(blob_id)
- unavailable = SyncStatus.UNAVAILABLE_STATUSES
- if previous_state and previous_state[0] in unavailable:
- yield self.delete(blob_id, namespace=namespace)
- status = SyncStatus.SYNCED
+ namespace=''):
logger.info("Saving blob in local database...")
- insert = 'INSERT INTO blobs (blob_id, namespace, payload, sync_status)'
- insert += ' VALUES (?, ?, zeroblob(?), ?)'
- values = (blob_id, namespace, size, status)
+ insert = 'INSERT INTO blobs (blob_id, namespace, payload)'
+ insert += ' VALUES (?, ?, zeroblob(?))'
+ values = (blob_id, namespace, size)
irow = yield self.dbpool.insertAndGetLastRowid(insert, values)
yield self.dbpool.write_blob('blobs', 'payload', irow, blob_fd)
logger.info("Finished saving blob in local database.")
+ """
+ # set as synced if it was pending
+ previous_state = yield self.get_sync_status(blob_id)
+ unavailable = SyncStatus.UNAVAILABLE_STATUSES
+ if previous_state and previous_state[0] in unavailable:
+ status = SyncStatus.SYNCED
+ yield self.update_sync_status(blob_id, status, namespace)
+ """
@defer.inlineCallbacks
def get(self, blob_id, namespace=''):
@@ -90,33 +93,32 @@ class SQLiteBlobBackend(object):
# incremental interface for blobs - and just return the raw fd instead
select = 'SELECT payload FROM blobs WHERE blob_id = ? AND namespace= ?'
values = (blob_id, namespace,)
- avoid_values = SyncStatus.UNAVAILABLE_STATUSES
- select += ' AND sync_status NOT IN (%s)'
- select %= ','.join(['?' for _ in avoid_values])
- values += avoid_values
result = yield self.dbpool.runQuery(select, values)
if result:
defer.returnValue(BytesIO(str(result[0][0])))
@defer.inlineCallbacks
def get_sync_status(self, blob_id):
- select = 'SELECT sync_status, retries FROM blobs WHERE blob_id = ?'
+ select = 'SELECT sync_status, retries FROM sync_state WHERE blob_id= ?'
result = yield self.dbpool.runQuery(select, (blob_id,))
if result:
defer.returnValue((result[0][0], result[0][1]))
@defer.inlineCallbacks
- def list(self, namespace='', sync_status=False):
+ def list(self, namespace=''):
query = 'select blob_id from blobs where namespace = ?'
values = (namespace,)
- if sync_status:
- query += ' and sync_status = ?'
- values += (sync_status,)
+ result = yield self.dbpool.runQuery(query, values)
+ if result:
+ defer.returnValue([b_id[0] for b_id in result])
else:
- avoid_values = SyncStatus.UNAVAILABLE_STATUSES
- query += ' AND sync_status NOT IN (%s)'
- query %= ','.join(['?' for _ in avoid_values])
- values += avoid_values
+ defer.returnValue([])
+
+ @defer.inlineCallbacks
+ def list_status(self, sync_status, namespace=''):
+ query = 'select blob_id from sync_state where sync_status = ?'
+ query += 'AND namespace = ?'
+ values = (sync_status, namespace,)
result = yield self.dbpool.runQuery(query, values)
if result:
defer.returnValue([b_id[0] for b_id in result])
@@ -125,34 +127,34 @@ class SQLiteBlobBackend(object):
@defer.inlineCallbacks
def update_sync_status(self, blob_id, sync_status, namespace=""):
- query = 'SELECT sync_status FROM blobs WHERE blob_id = ?'
+ query = 'SELECT sync_status FROM sync_state WHERE blob_id = ?'
result = yield self.dbpool.runQuery(query, (blob_id,))
if not result:
- insert = 'INSERT INTO blobs'
- insert += ' (blob_id, namespace, payload, sync_status)'
- insert += ' VALUES (?, ?, zeroblob(0), ?)'
+ insert = 'INSERT INTO sync_state'
+ insert += ' (blob_id, namespace, sync_status)'
+ insert += ' VALUES (?, ?, ?)'
values = (blob_id, namespace, sync_status)
yield self.dbpool.runOperation(insert, values)
return
- update = 'UPDATE blobs SET sync_status = ? WHERE blob_id = ?'
+ update = 'UPDATE sync_state SET sync_status = ? WHERE blob_id = ?'
values = (sync_status, blob_id,)
result = yield self.dbpool.runOperation(update, values)
def update_batch_sync_status(self, blob_id_list, sync_status,
namespace=''):
- insert = 'INSERT INTO blobs (blob_id, namespace, payload, sync_status)'
+ insert = 'INSERT INTO sync_state (blob_id, namespace, sync_status)'
first_blob_id, blob_id_list = blob_id_list[0], blob_id_list[1:]
- insert += ' VALUES (?, ?, zeroblob(0), ?)'
+ insert += ' VALUES (?, ?, ?)'
values = (first_blob_id, namespace, sync_status)
for blob_id in blob_id_list:
- insert += ', (?, ?, zeroblob(0), ?)'
+ insert += ', (?, ?, ?)'
values += (blob_id, namespace, sync_status)
return self.dbpool.runQuery(insert, values)
def increment_retries(self, blob_id):
- query = 'update blobs set retries = retries + 1 where blob_id = ?'
+ query = 'update sync_state set retries = retries + 1 where blob_id = ?'
return self.dbpool.runQuery(query, (blob_id,))
@defer.inlineCallbacks
@@ -175,11 +177,18 @@ class SQLiteBlobBackend(object):
return self.dbpool.runQuery(query, (blob_id, namespace,))
+def _init_tables(conn):
+ # unified init for running under the same lock
+ _init_blob_table(conn)
+ _init_sync_table(conn)
+
+
def _init_sync_table(conn):
maybe_create = """
CREATE TABLE IF NOT EXISTS
sync_state (
blob_id PRIMARY KEY,
+ namespace TEXT,
sync_status INT default %s,
retries INT default 0)"""
default_status = SyncStatus.PENDING_UPLOAD
diff --git a/src/leap/soledad/client/_db/blobs/sync.py b/src/leap/soledad/client/_db/blobs/sync.py
index 67df1d7f..d2b7bed6 100644
--- a/src/leap/soledad/client/_db/blobs/sync.py
+++ b/src/leap/soledad/client/_db/blobs/sync.py
@@ -71,7 +71,8 @@ class BlobsSynchronizer(object):
Optional parameter to restrict operation to a given namespace.
:type namespace: str
"""
- missing = yield self.local.list(namespace, SyncStatus.PENDING_UPLOAD)
+ missing = yield self.local.list_status(
+ SyncStatus.PENDING_UPLOAD, namespace)
total = len(missing)
logger.info("Will send %d blobs to server." % total)
deferreds = []
@@ -119,8 +120,7 @@ class BlobsSynchronizer(object):
:type namespace: str
"""
# TODO: Use something to prioritize user requests over general new docs
- d = self.local_list(namespace=namespace,
- sync_status=SyncStatus.PENDING_DOWNLOAD)
+ d = self.local_list_status(SyncStatus.PENDING_DOWNLOAD, namespace)
docs_we_want = yield d
total = len(docs_we_want)
logger.info("Will fetch %d blobs from server." % total)