diff options
author | Victor Shyba <victor1984@riseup.net> | 2017-10-25 22:35:32 -0300 |
---|---|---|
committer | Victor Shyba <victor1984@riseup.net> | 2017-10-27 16:01:48 -0300 |
commit | 5faf22e4603d8130d11890f43f2f002821e8a976 (patch) | |
tree | 8e0a849b96d1540e4d2c695abb000eca3534a480 | |
parent | 3230c9388729ae784c05575a2021c9d0995faa13 (diff) |
[refactor] add a table for sync_status
As defined in #8970, this table and the new module will ease adding sync
features such as priority queues and streaming.
-- Resolves: #8970
-rw-r--r-- | src/leap/soledad/client/_db/blobs/__init__.py | 9 | ||||
-rw-r--r-- | src/leap/soledad/client/_db/blobs/sql.py | 73 | ||||
-rw-r--r-- | src/leap/soledad/client/_db/blobs/sync.py | 6 | ||||
-rw-r--r-- | tests/blobs/test_blob_manager.py | 43 | ||||
-rw-r--r-- | tests/server/test_blobs_server.py | 6 |
5 files changed, 79 insertions, 58 deletions
diff --git a/src/leap/soledad/client/_db/blobs/__init__.py b/src/leap/soledad/client/_db/blobs/__init__.py index 0120b222..3c8facba 100644 --- a/src/leap/soledad/client/_db/blobs/__init__.py +++ b/src/leap/soledad/client/_db/blobs/__init__.py @@ -196,8 +196,11 @@ class BlobManager(BlobsSynchronizer): check_http_status(response.code) defer.returnValue((yield response.json())) - def local_list(self, namespace='', sync_status=None): - return self.local.list(namespace, sync_status) + def local_list(self, namespace=''): + return self.local.list(namespace) + + def local_list_status(self, status, namespace=''): + return self.local.list_status(status, namespace) def put(self, doc, size, namespace=''): """ @@ -222,6 +225,8 @@ class BlobManager(BlobsSynchronizer): # TODO this is a tee really, but ok... could do db and upload # concurrently. not sure if we'd gain something. yield self.local.put(doc.blob_id, fd, size=size, namespace=namespace) + yield self.local.update_sync_status( + doc.blob_id, SyncStatus.PENDING_UPLOAD) # In fact, some kind of pipe is needed here, where each write on db # handle gets forwarded into a write on the connection handle fd = yield self.local.get(doc.blob_id, namespace=namespace) diff --git a/src/leap/soledad/client/_db/blobs/sql.py b/src/leap/soledad/client/_db/blobs/sql.py index dcca1671..72fc250b 100644 --- a/src/leap/soledad/client/_db/blobs/sql.py +++ b/src/leap/soledad/client/_db/blobs/sql.py @@ -55,7 +55,7 @@ class SQLiteBlobBackend(object): '/tmp/ignored', binascii.b2a_hex(key), is_raw_key=True, create=True) openfun = partial(pragmas.set_init_pragmas, opts=opts, - schema_func=_init_blob_table) + schema_func=_init_tables) self.dbpool = ConnectionPool( backend, self.path, check_same_thread=False, timeout=5, @@ -70,19 +70,22 @@ class SQLiteBlobBackend(object): @defer.inlineCallbacks def put(self, blob_id, blob_fd, size=None, - namespace='', status=SyncStatus.PENDING_UPLOAD): - previous_state = yield self.get_sync_status(blob_id) - unavailable 
= SyncStatus.UNAVAILABLE_STATUSES - if previous_state and previous_state[0] in unavailable: - yield self.delete(blob_id, namespace=namespace) - status = SyncStatus.SYNCED + namespace=''): logger.info("Saving blob in local database...") - insert = 'INSERT INTO blobs (blob_id, namespace, payload, sync_status)' - insert += ' VALUES (?, ?, zeroblob(?), ?)' - values = (blob_id, namespace, size, status) + insert = 'INSERT INTO blobs (blob_id, namespace, payload)' + insert += ' VALUES (?, ?, zeroblob(?))' + values = (blob_id, namespace, size) irow = yield self.dbpool.insertAndGetLastRowid(insert, values) yield self.dbpool.write_blob('blobs', 'payload', irow, blob_fd) logger.info("Finished saving blob in local database.") + """ + # set as synced if it was pending + previous_state = yield self.get_sync_status(blob_id) + unavailable = SyncStatus.UNAVAILABLE_STATUSES + if previous_state and previous_state[0] in unavailable: + status = SyncStatus.SYNCED + yield self.update_sync_status(blob_id, status, namespace) + """ @defer.inlineCallbacks def get(self, blob_id, namespace=''): @@ -90,33 +93,32 @@ class SQLiteBlobBackend(object): # incremental interface for blobs - and just return the raw fd instead select = 'SELECT payload FROM blobs WHERE blob_id = ? AND namespace= ?' values = (blob_id, namespace,) - avoid_values = SyncStatus.UNAVAILABLE_STATUSES - select += ' AND sync_status NOT IN (%s)' - select %= ','.join(['?' for _ in avoid_values]) - values += avoid_values result = yield self.dbpool.runQuery(select, values) if result: defer.returnValue(BytesIO(str(result[0][0]))) @defer.inlineCallbacks def get_sync_status(self, blob_id): - select = 'SELECT sync_status, retries FROM blobs WHERE blob_id = ?' + select = 'SELECT sync_status, retries FROM sync_state WHERE blob_id= ?' 
result = yield self.dbpool.runQuery(select, (blob_id,)) if result: defer.returnValue((result[0][0], result[0][1])) @defer.inlineCallbacks - def list(self, namespace='', sync_status=False): + def list(self, namespace=''): query = 'select blob_id from blobs where namespace = ?' values = (namespace,) - if sync_status: - query += ' and sync_status = ?' - values += (sync_status,) + result = yield self.dbpool.runQuery(query, values) + if result: + defer.returnValue([b_id[0] for b_id in result]) else: - avoid_values = SyncStatus.UNAVAILABLE_STATUSES - query += ' AND sync_status NOT IN (%s)' - query %= ','.join(['?' for _ in avoid_values]) - values += avoid_values + defer.returnValue([]) + + @defer.inlineCallbacks + def list_status(self, sync_status, namespace=''): + query = 'select blob_id from sync_state where sync_status = ?' + query += 'AND namespace = ?' + values = (sync_status, namespace,) result = yield self.dbpool.runQuery(query, values) if result: defer.returnValue([b_id[0] for b_id in result]) @@ -125,34 +127,34 @@ class SQLiteBlobBackend(object): @defer.inlineCallbacks def update_sync_status(self, blob_id, sync_status, namespace=""): - query = 'SELECT sync_status FROM blobs WHERE blob_id = ?' + query = 'SELECT sync_status FROM sync_state WHERE blob_id = ?' result = yield self.dbpool.runQuery(query, (blob_id,)) if not result: - insert = 'INSERT INTO blobs' - insert += ' (blob_id, namespace, payload, sync_status)' - insert += ' VALUES (?, ?, zeroblob(0), ?)' + insert = 'INSERT INTO sync_state' + insert += ' (blob_id, namespace, sync_status)' + insert += ' VALUES (?, ?, ?)' values = (blob_id, namespace, sync_status) yield self.dbpool.runOperation(insert, values) return - update = 'UPDATE blobs SET sync_status = ? WHERE blob_id = ?' + update = 'UPDATE sync_state SET sync_status = ? WHERE blob_id = ?' 
values = (sync_status, blob_id,) result = yield self.dbpool.runOperation(update, values) def update_batch_sync_status(self, blob_id_list, sync_status, namespace=''): - insert = 'INSERT INTO blobs (blob_id, namespace, payload, sync_status)' + insert = 'INSERT INTO sync_state (blob_id, namespace, sync_status)' first_blob_id, blob_id_list = blob_id_list[0], blob_id_list[1:] - insert += ' VALUES (?, ?, zeroblob(0), ?)' + insert += ' VALUES (?, ?, ?)' values = (first_blob_id, namespace, sync_status) for blob_id in blob_id_list: - insert += ', (?, ?, zeroblob(0), ?)' + insert += ', (?, ?, ?)' values += (blob_id, namespace, sync_status) return self.dbpool.runQuery(insert, values) def increment_retries(self, blob_id): - query = 'update blobs set retries = retries + 1 where blob_id = ?' + query = 'update sync_state set retries = retries + 1 where blob_id = ?' return self.dbpool.runQuery(query, (blob_id,)) @defer.inlineCallbacks @@ -175,11 +177,18 @@ class SQLiteBlobBackend(object): return self.dbpool.runQuery(query, (blob_id, namespace,)) +def _init_tables(conn): + # unified init for running under the same lock + _init_blob_table(conn) + _init_sync_table(conn) + + def _init_sync_table(conn): maybe_create = """ CREATE TABLE IF NOT EXISTS sync_state ( blob_id PRIMARY KEY, + namespace TEXT, sync_status INT default %s, retries INT default 0)""" default_status = SyncStatus.PENDING_UPLOAD diff --git a/src/leap/soledad/client/_db/blobs/sync.py b/src/leap/soledad/client/_db/blobs/sync.py index 67df1d7f..d2b7bed6 100644 --- a/src/leap/soledad/client/_db/blobs/sync.py +++ b/src/leap/soledad/client/_db/blobs/sync.py @@ -71,7 +71,8 @@ class BlobsSynchronizer(object): Optional parameter to restrict operation to a given namespace. :type namespace: str """ - missing = yield self.local.list(namespace, SyncStatus.PENDING_UPLOAD) + missing = yield self.local.list_status( + SyncStatus.PENDING_UPLOAD, namespace) total = len(missing) logger.info("Will send %d blobs to server." 
% total) deferreds = [] @@ -119,8 +120,7 @@ class BlobsSynchronizer(object): :type namespace: str """ # TODO: Use something to prioritize user requests over general new docs - d = self.local_list(namespace=namespace, - sync_status=SyncStatus.PENDING_DOWNLOAD) + d = self.local_list_status(SyncStatus.PENDING_DOWNLOAD, namespace) docs_we_want = yield d total = len(docs_we_want) logger.info("Will fetch %d blobs from server." % total) diff --git a/tests/blobs/test_blob_manager.py b/tests/blobs/test_blob_manager.py index 26c6506e..4b2b1135 100644 --- a/tests/blobs/test_blob_manager.py +++ b/tests/blobs/test_blob_manager.py @@ -31,8 +31,8 @@ import pytest import os # monkey-patch the blobmanager MAX_WAIT time so tests run faster -from leap.soledad.client._db import blobs -blobs.MAX_WAIT = 1 +from leap.soledad.client._db.blobs import sync +sync.MAX_WAIT = 1 class BlobManagerTestCase(unittest.TestCase): @@ -117,7 +117,8 @@ class BlobManagerTestCase(unittest.TestCase): fd, missing_id = BytesIO('test'), uuid4().hex self.manager._encrypt_and_upload = Mock(return_value=None) self.manager.remote_list = Mock(return_value=[]) - yield self.manager.local.put(missing_id, fd, 4) + doc1 = BlobDoc(fd, missing_id) + yield self.manager.put(doc1, 4) yield self.manager.send_missing() call_list = self.manager._encrypt_and_upload.call_args_list @@ -163,7 +164,7 @@ class BlobManagerTestCase(unittest.TestCase): with pytest.raises(Exception): yield self.manager.put(doc1, len(content)) pending_upload = SyncStatus.PENDING_UPLOAD - local_list = yield self.manager.local_list(sync_status=pending_upload) + local_list = yield self.manager.local_list_status(pending_upload) self.assertIn(blob_id, local_list) @defer.inlineCallbacks @@ -176,12 +177,14 @@ class BlobManagerTestCase(unittest.TestCase): # put a blob in local storage content, blob_id = "Blob content", uuid4().hex yield self.manager.local.put(blob_id, BytesIO(content), len(content)) + pending = SyncStatus.PENDING_UPLOAD + yield 
self.manager.local.update_sync_status(blob_id, pending) # try to send missing with pytest.raises(defer.FirstError): yield self.manager.send_missing() # assert failed state and number of retries failed_upload = SyncStatus.FAILED_UPLOAD - local_list = yield self.manager.local_list(sync_status=failed_upload) + local_list = yield self.manager.local_list_status(failed_upload) self.assertIn(blob_id, local_list) sync_status, retries = \ yield self.manager.local.get_sync_status(blob_id) @@ -193,7 +196,7 @@ class BlobManagerTestCase(unittest.TestCase): def test_download_retry_limit(self): # prepare the manager to fail accordingly blob_id = uuid4().hex - self.manager.local_list = Mock(return_value=[blob_id]) + self.manager.local_list_status = Mock(return_value=[blob_id]) self.manager._download_and_decrypt = Mock( side_effect=RetriableTransferError) # try to fetch missing @@ -201,7 +204,7 @@ class BlobManagerTestCase(unittest.TestCase): yield self.manager.fetch_missing() # assert failed state and number of retries failed_download = SyncStatus.FAILED_DOWNLOAD - local_list = yield self.manager.local.list(sync_status=failed_download) + local_list = yield self.manager.local.list_status(failed_download) self.assertIn(blob_id, local_list) sync_status, retries = \ yield self.manager.local.get_sync_status(blob_id) @@ -213,11 +216,10 @@ class BlobManagerTestCase(unittest.TestCase): def test_local_list_doesnt_include_unavailable_blobs(self): local = self.manager.local unavailable_ids, deferreds = [], [] - for unavailable_status in SyncStatus.UNAVAILABLE_STATUSES: - current_blob_id = uuid4().hex - deferreds.append(local.put(current_blob_id, BytesIO(''), 0, - status=unavailable_status)) - unavailable_ids.append(current_blob_id) + for status in SyncStatus.UNAVAILABLE_STATUSES: + blob_id = uuid4().hex + deferreds.append(local.update_sync_status(blob_id, status)) + unavailable_ids.append(blob_id) available_blob_id = uuid4().hex content, length = self.cleartext, 
len(self.cleartext.getvalue()) deferreds.append(local.put(available_blob_id, content, length)) @@ -232,11 +234,10 @@ class BlobManagerTestCase(unittest.TestCase): def test_get_doesnt_include_unavailable_blobs(self): local = self.manager.local unavailable_ids, deferreds = [], [] - for unavailable_status in SyncStatus.UNAVAILABLE_STATUSES: - current_blob_id = uuid4().hex - deferreds.append(local.put(current_blob_id, BytesIO(''), 0, - status=unavailable_status)) - unavailable_ids.append(current_blob_id) + for status in SyncStatus.UNAVAILABLE_STATUSES: + blob_id = uuid4().hex + deferreds.append(local.update_sync_status(blob_id, status)) + unavailable_ids.append(blob_id) available_blob_id = uuid4().hex content, length = self.cleartext, len(self.cleartext.getvalue()) deferreds.append(local.put(available_blob_id, content, length)) @@ -256,13 +257,15 @@ class BlobManagerTestCase(unittest.TestCase): content, pending = self.cleartext, SyncStatus.PENDING_UPLOAD length, deferreds = len(content.getvalue()), [] for blob_id in local_ids: - d = local.put(blob_id, content, length, status=pending) + d = local.put(blob_id, content, length) + deferreds.append(d) + d = local.update_sync_status(blob_id, pending) deferreds.append(d) yield defer.gatherResults(deferreds) yield self.manager.refresh_sync_status_from_server() - d = self.manager.local_list(sync_status=SyncStatus.PENDING_UPLOAD) + d = self.manager.local_list_status(SyncStatus.PENDING_UPLOAD) pending_upload_list = yield d - d = self.manager.local_list(sync_status=SyncStatus.PENDING_DOWNLOAD) + d = self.manager.local_list_status(SyncStatus.PENDING_DOWNLOAD) pending_download_list = yield d self.assertEquals(set(pending_upload_list), set(local_ids)) self.assertEquals(set(pending_download_list), set(remote_ids)) diff --git a/tests/server/test_blobs_server.py b/tests/server/test_blobs_server.py index c4a00ab5..1d66d0ef 100644 --- a/tests/server/test_blobs_server.py +++ b/tests/server/test_blobs_server.py @@ -48,7 +48,7 @@ def 
sleep(x): class BlobServerTestCase(unittest.TestCase): def setUp(self): - client_blobs.MAX_WAIT = 0.1 + client_blobs.sync.MAX_WAIT = 0.1 root = server_blobs.BlobsResource("filesystem", self.tempdir) self.site = Site(root) self.port = reactor.listenTCP(0, self.site, interface='127.0.0.1') @@ -261,6 +261,8 @@ class BlobServerTestCase(unittest.TestCase): self.addCleanup(manager.close) blob_id = 'local_only_blob_id' yield manager.local.put(blob_id, BytesIO("X"), size=1) + pending = SyncStatus.PENDING_UPLOAD + yield manager.local.update_sync_status(blob_id, pending) yield manager.send_missing() result = yield manager._download_and_decrypt(blob_id) self.assertIsNotNone(result) @@ -274,6 +276,8 @@ class BlobServerTestCase(unittest.TestCase): self.addCleanup(manager.close) blob_id = 'remote_only_blob_id' yield manager.local.put(blob_id, BytesIO("X"), size=1) + pending = SyncStatus.PENDING_UPLOAD + yield manager.local.update_sync_status(blob_id, pending) yield self.port.stopListening() d = manager.send_missing() |