summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Shyba <victor1984@riseup.net>2017-10-25 22:35:32 -0300
committerVictor Shyba <victor1984@riseup.net>2017-10-27 16:01:48 -0300
commit5faf22e4603d8130d11890f43f2f002821e8a976 (patch)
tree8e0a849b96d1540e4d2c695abb000eca3534a480
parent3230c9388729ae784c05575a2021c9d0995faa13 (diff)
[refactor] add a table for sync_status
As defined in #8970, this table and the new module will ease adding sync features such as priority queues and streaming. --Resolves: #8970
-rw-r--r--src/leap/soledad/client/_db/blobs/__init__.py9
-rw-r--r--src/leap/soledad/client/_db/blobs/sql.py73
-rw-r--r--src/leap/soledad/client/_db/blobs/sync.py6
-rw-r--r--tests/blobs/test_blob_manager.py43
-rw-r--r--tests/server/test_blobs_server.py6
5 files changed, 79 insertions, 58 deletions
diff --git a/src/leap/soledad/client/_db/blobs/__init__.py b/src/leap/soledad/client/_db/blobs/__init__.py
index 0120b222..3c8facba 100644
--- a/src/leap/soledad/client/_db/blobs/__init__.py
+++ b/src/leap/soledad/client/_db/blobs/__init__.py
@@ -196,8 +196,11 @@ class BlobManager(BlobsSynchronizer):
check_http_status(response.code)
defer.returnValue((yield response.json()))
- def local_list(self, namespace='', sync_status=None):
- return self.local.list(namespace, sync_status)
+ def local_list(self, namespace=''):
+ return self.local.list(namespace)
+
+ def local_list_status(self, status, namespace=''):
+ return self.local.list_status(status, namespace)
def put(self, doc, size, namespace=''):
"""
@@ -222,6 +225,8 @@ class BlobManager(BlobsSynchronizer):
# TODO this is a tee really, but ok... could do db and upload
# concurrently. not sure if we'd gain something.
yield self.local.put(doc.blob_id, fd, size=size, namespace=namespace)
+ yield self.local.update_sync_status(
+ doc.blob_id, SyncStatus.PENDING_UPLOAD)
# In fact, some kind of pipe is needed here, where each write on db
# handle gets forwarded into a write on the connection handle
fd = yield self.local.get(doc.blob_id, namespace=namespace)
diff --git a/src/leap/soledad/client/_db/blobs/sql.py b/src/leap/soledad/client/_db/blobs/sql.py
index dcca1671..72fc250b 100644
--- a/src/leap/soledad/client/_db/blobs/sql.py
+++ b/src/leap/soledad/client/_db/blobs/sql.py
@@ -55,7 +55,7 @@ class SQLiteBlobBackend(object):
'/tmp/ignored', binascii.b2a_hex(key),
is_raw_key=True, create=True)
openfun = partial(pragmas.set_init_pragmas, opts=opts,
- schema_func=_init_blob_table)
+ schema_func=_init_tables)
self.dbpool = ConnectionPool(
backend, self.path, check_same_thread=False, timeout=5,
@@ -70,19 +70,22 @@ class SQLiteBlobBackend(object):
@defer.inlineCallbacks
def put(self, blob_id, blob_fd, size=None,
- namespace='', status=SyncStatus.PENDING_UPLOAD):
- previous_state = yield self.get_sync_status(blob_id)
- unavailable = SyncStatus.UNAVAILABLE_STATUSES
- if previous_state and previous_state[0] in unavailable:
- yield self.delete(blob_id, namespace=namespace)
- status = SyncStatus.SYNCED
+ namespace=''):
logger.info("Saving blob in local database...")
- insert = 'INSERT INTO blobs (blob_id, namespace, payload, sync_status)'
- insert += ' VALUES (?, ?, zeroblob(?), ?)'
- values = (blob_id, namespace, size, status)
+ insert = 'INSERT INTO blobs (blob_id, namespace, payload)'
+ insert += ' VALUES (?, ?, zeroblob(?))'
+ values = (blob_id, namespace, size)
irow = yield self.dbpool.insertAndGetLastRowid(insert, values)
yield self.dbpool.write_blob('blobs', 'payload', irow, blob_fd)
logger.info("Finished saving blob in local database.")
+ """
+ # set as synced if it was pending
+ previous_state = yield self.get_sync_status(blob_id)
+ unavailable = SyncStatus.UNAVAILABLE_STATUSES
+ if previous_state and previous_state[0] in unavailable:
+ status = SyncStatus.SYNCED
+ yield self.update_sync_status(blob_id, status, namespace)
+ """
@defer.inlineCallbacks
def get(self, blob_id, namespace=''):
@@ -90,33 +93,32 @@ class SQLiteBlobBackend(object):
# incremental interface for blobs - and just return the raw fd instead
select = 'SELECT payload FROM blobs WHERE blob_id = ? AND namespace= ?'
values = (blob_id, namespace,)
- avoid_values = SyncStatus.UNAVAILABLE_STATUSES
- select += ' AND sync_status NOT IN (%s)'
- select %= ','.join(['?' for _ in avoid_values])
- values += avoid_values
result = yield self.dbpool.runQuery(select, values)
if result:
defer.returnValue(BytesIO(str(result[0][0])))
@defer.inlineCallbacks
def get_sync_status(self, blob_id):
- select = 'SELECT sync_status, retries FROM blobs WHERE blob_id = ?'
+ select = 'SELECT sync_status, retries FROM sync_state WHERE blob_id= ?'
result = yield self.dbpool.runQuery(select, (blob_id,))
if result:
defer.returnValue((result[0][0], result[0][1]))
@defer.inlineCallbacks
- def list(self, namespace='', sync_status=False):
+ def list(self, namespace=''):
query = 'select blob_id from blobs where namespace = ?'
values = (namespace,)
- if sync_status:
- query += ' and sync_status = ?'
- values += (sync_status,)
+ result = yield self.dbpool.runQuery(query, values)
+ if result:
+ defer.returnValue([b_id[0] for b_id in result])
else:
- avoid_values = SyncStatus.UNAVAILABLE_STATUSES
- query += ' AND sync_status NOT IN (%s)'
- query %= ','.join(['?' for _ in avoid_values])
- values += avoid_values
+ defer.returnValue([])
+
+ @defer.inlineCallbacks
+ def list_status(self, sync_status, namespace=''):
query = 'select blob_id from sync_state where sync_status = ?'
query += ' AND namespace = ?'
+ values = (sync_status, namespace,)
result = yield self.dbpool.runQuery(query, values)
if result:
defer.returnValue([b_id[0] for b_id in result])
@@ -125,34 +127,34 @@ class SQLiteBlobBackend(object):
@defer.inlineCallbacks
def update_sync_status(self, blob_id, sync_status, namespace=""):
- query = 'SELECT sync_status FROM blobs WHERE blob_id = ?'
+ query = 'SELECT sync_status FROM sync_state WHERE blob_id = ?'
result = yield self.dbpool.runQuery(query, (blob_id,))
if not result:
- insert = 'INSERT INTO blobs'
- insert += ' (blob_id, namespace, payload, sync_status)'
- insert += ' VALUES (?, ?, zeroblob(0), ?)'
+ insert = 'INSERT INTO sync_state'
+ insert += ' (blob_id, namespace, sync_status)'
+ insert += ' VALUES (?, ?, ?)'
values = (blob_id, namespace, sync_status)
yield self.dbpool.runOperation(insert, values)
return
- update = 'UPDATE blobs SET sync_status = ? WHERE blob_id = ?'
+ update = 'UPDATE sync_state SET sync_status = ? WHERE blob_id = ?'
values = (sync_status, blob_id,)
result = yield self.dbpool.runOperation(update, values)
def update_batch_sync_status(self, blob_id_list, sync_status,
namespace=''):
- insert = 'INSERT INTO blobs (blob_id, namespace, payload, sync_status)'
+ insert = 'INSERT INTO sync_state (blob_id, namespace, sync_status)'
first_blob_id, blob_id_list = blob_id_list[0], blob_id_list[1:]
- insert += ' VALUES (?, ?, zeroblob(0), ?)'
+ insert += ' VALUES (?, ?, ?)'
values = (first_blob_id, namespace, sync_status)
for blob_id in blob_id_list:
- insert += ', (?, ?, zeroblob(0), ?)'
+ insert += ', (?, ?, ?)'
values += (blob_id, namespace, sync_status)
return self.dbpool.runQuery(insert, values)
def increment_retries(self, blob_id):
- query = 'update blobs set retries = retries + 1 where blob_id = ?'
+ query = 'update sync_state set retries = retries + 1 where blob_id = ?'
return self.dbpool.runQuery(query, (blob_id,))
@defer.inlineCallbacks
@@ -175,11 +177,18 @@ class SQLiteBlobBackend(object):
return self.dbpool.runQuery(query, (blob_id, namespace,))
+def _init_tables(conn):
+ # unified init for running under the same lock
+ _init_blob_table(conn)
+ _init_sync_table(conn)
+
+
def _init_sync_table(conn):
maybe_create = """
CREATE TABLE IF NOT EXISTS
sync_state (
blob_id PRIMARY KEY,
+ namespace TEXT,
sync_status INT default %s,
retries INT default 0)"""
default_status = SyncStatus.PENDING_UPLOAD
diff --git a/src/leap/soledad/client/_db/blobs/sync.py b/src/leap/soledad/client/_db/blobs/sync.py
index 67df1d7f..d2b7bed6 100644
--- a/src/leap/soledad/client/_db/blobs/sync.py
+++ b/src/leap/soledad/client/_db/blobs/sync.py
@@ -71,7 +71,8 @@ class BlobsSynchronizer(object):
Optional parameter to restrict operation to a given namespace.
:type namespace: str
"""
- missing = yield self.local.list(namespace, SyncStatus.PENDING_UPLOAD)
+ missing = yield self.local.list_status(
+ SyncStatus.PENDING_UPLOAD, namespace)
total = len(missing)
logger.info("Will send %d blobs to server." % total)
deferreds = []
@@ -119,8 +120,7 @@ class BlobsSynchronizer(object):
:type namespace: str
"""
# TODO: Use something to prioritize user requests over general new docs
- d = self.local_list(namespace=namespace,
- sync_status=SyncStatus.PENDING_DOWNLOAD)
+ d = self.local_list_status(SyncStatus.PENDING_DOWNLOAD, namespace)
docs_we_want = yield d
total = len(docs_we_want)
logger.info("Will fetch %d blobs from server." % total)
diff --git a/tests/blobs/test_blob_manager.py b/tests/blobs/test_blob_manager.py
index 26c6506e..4b2b1135 100644
--- a/tests/blobs/test_blob_manager.py
+++ b/tests/blobs/test_blob_manager.py
@@ -31,8 +31,8 @@ import pytest
import os
# monkey-patch the blobmanager MAX_WAIT time so tests run faster
-from leap.soledad.client._db import blobs
-blobs.MAX_WAIT = 1
+from leap.soledad.client._db.blobs import sync
+sync.MAX_WAIT = 1
class BlobManagerTestCase(unittest.TestCase):
@@ -117,7 +117,8 @@ class BlobManagerTestCase(unittest.TestCase):
fd, missing_id = BytesIO('test'), uuid4().hex
self.manager._encrypt_and_upload = Mock(return_value=None)
self.manager.remote_list = Mock(return_value=[])
- yield self.manager.local.put(missing_id, fd, 4)
+ doc1 = BlobDoc(fd, missing_id)
+ yield self.manager.put(doc1, 4)
yield self.manager.send_missing()
call_list = self.manager._encrypt_and_upload.call_args_list
@@ -163,7 +164,7 @@ class BlobManagerTestCase(unittest.TestCase):
with pytest.raises(Exception):
yield self.manager.put(doc1, len(content))
pending_upload = SyncStatus.PENDING_UPLOAD
- local_list = yield self.manager.local_list(sync_status=pending_upload)
+ local_list = yield self.manager.local_list_status(pending_upload)
self.assertIn(blob_id, local_list)
@defer.inlineCallbacks
@@ -176,12 +177,14 @@ class BlobManagerTestCase(unittest.TestCase):
# put a blob in local storage
content, blob_id = "Blob content", uuid4().hex
yield self.manager.local.put(blob_id, BytesIO(content), len(content))
+ pending = SyncStatus.PENDING_UPLOAD
+ yield self.manager.local.update_sync_status(blob_id, pending)
# try to send missing
with pytest.raises(defer.FirstError):
yield self.manager.send_missing()
# assert failed state and number of retries
failed_upload = SyncStatus.FAILED_UPLOAD
- local_list = yield self.manager.local_list(sync_status=failed_upload)
+ local_list = yield self.manager.local_list_status(failed_upload)
self.assertIn(blob_id, local_list)
sync_status, retries = \
yield self.manager.local.get_sync_status(blob_id)
@@ -193,7 +196,7 @@ class BlobManagerTestCase(unittest.TestCase):
def test_download_retry_limit(self):
# prepare the manager to fail accordingly
blob_id = uuid4().hex
- self.manager.local_list = Mock(return_value=[blob_id])
+ self.manager.local_list_status = Mock(return_value=[blob_id])
self.manager._download_and_decrypt = Mock(
side_effect=RetriableTransferError)
# try to fetch missing
@@ -201,7 +204,7 @@ class BlobManagerTestCase(unittest.TestCase):
yield self.manager.fetch_missing()
# assert failed state and number of retries
failed_download = SyncStatus.FAILED_DOWNLOAD
- local_list = yield self.manager.local.list(sync_status=failed_download)
+ local_list = yield self.manager.local.list_status(failed_download)
self.assertIn(blob_id, local_list)
sync_status, retries = \
yield self.manager.local.get_sync_status(blob_id)
@@ -213,11 +216,10 @@ class BlobManagerTestCase(unittest.TestCase):
def test_local_list_doesnt_include_unavailable_blobs(self):
local = self.manager.local
unavailable_ids, deferreds = [], []
- for unavailable_status in SyncStatus.UNAVAILABLE_STATUSES:
- current_blob_id = uuid4().hex
- deferreds.append(local.put(current_blob_id, BytesIO(''), 0,
- status=unavailable_status))
- unavailable_ids.append(current_blob_id)
+ for status in SyncStatus.UNAVAILABLE_STATUSES:
+ blob_id = uuid4().hex
+ deferreds.append(local.update_sync_status(blob_id, status))
+ unavailable_ids.append(blob_id)
available_blob_id = uuid4().hex
content, length = self.cleartext, len(self.cleartext.getvalue())
deferreds.append(local.put(available_blob_id, content, length))
@@ -232,11 +234,10 @@ class BlobManagerTestCase(unittest.TestCase):
def test_get_doesnt_include_unavailable_blobs(self):
local = self.manager.local
unavailable_ids, deferreds = [], []
- for unavailable_status in SyncStatus.UNAVAILABLE_STATUSES:
- current_blob_id = uuid4().hex
- deferreds.append(local.put(current_blob_id, BytesIO(''), 0,
- status=unavailable_status))
- unavailable_ids.append(current_blob_id)
+ for status in SyncStatus.UNAVAILABLE_STATUSES:
+ blob_id = uuid4().hex
+ deferreds.append(local.update_sync_status(blob_id, status))
+ unavailable_ids.append(blob_id)
available_blob_id = uuid4().hex
content, length = self.cleartext, len(self.cleartext.getvalue())
deferreds.append(local.put(available_blob_id, content, length))
@@ -256,13 +257,15 @@ class BlobManagerTestCase(unittest.TestCase):
content, pending = self.cleartext, SyncStatus.PENDING_UPLOAD
length, deferreds = len(content.getvalue()), []
for blob_id in local_ids:
- d = local.put(blob_id, content, length, status=pending)
+ d = local.put(blob_id, content, length)
+ deferreds.append(d)
+ d = local.update_sync_status(blob_id, pending)
deferreds.append(d)
yield defer.gatherResults(deferreds)
yield self.manager.refresh_sync_status_from_server()
- d = self.manager.local_list(sync_status=SyncStatus.PENDING_UPLOAD)
+ d = self.manager.local_list_status(SyncStatus.PENDING_UPLOAD)
pending_upload_list = yield d
- d = self.manager.local_list(sync_status=SyncStatus.PENDING_DOWNLOAD)
+ d = self.manager.local_list_status(SyncStatus.PENDING_DOWNLOAD)
pending_download_list = yield d
self.assertEquals(set(pending_upload_list), set(local_ids))
self.assertEquals(set(pending_download_list), set(remote_ids))
diff --git a/tests/server/test_blobs_server.py b/tests/server/test_blobs_server.py
index c4a00ab5..1d66d0ef 100644
--- a/tests/server/test_blobs_server.py
+++ b/tests/server/test_blobs_server.py
@@ -48,7 +48,7 @@ def sleep(x):
class BlobServerTestCase(unittest.TestCase):
def setUp(self):
- client_blobs.MAX_WAIT = 0.1
+ client_blobs.sync.MAX_WAIT = 0.1
root = server_blobs.BlobsResource("filesystem", self.tempdir)
self.site = Site(root)
self.port = reactor.listenTCP(0, self.site, interface='127.0.0.1')
@@ -261,6 +261,8 @@ class BlobServerTestCase(unittest.TestCase):
self.addCleanup(manager.close)
blob_id = 'local_only_blob_id'
yield manager.local.put(blob_id, BytesIO("X"), size=1)
+ pending = SyncStatus.PENDING_UPLOAD
+ yield manager.local.update_sync_status(blob_id, pending)
yield manager.send_missing()
result = yield manager._download_and_decrypt(blob_id)
self.assertIsNotNone(result)
@@ -274,6 +276,8 @@ class BlobServerTestCase(unittest.TestCase):
self.addCleanup(manager.close)
blob_id = 'remote_only_blob_id'
yield manager.local.put(blob_id, BytesIO("X"), size=1)
+ pending = SyncStatus.PENDING_UPLOAD
+ yield manager.local.update_sync_status(blob_id, pending)
yield self.port.stopListening()
d = manager.send_missing()