diff options
author | drebs <drebs@riseup.net> | 2017-10-05 15:24:46 -0300 |
---|---|---|
committer | drebs <drebs@riseup.net> | 2017-10-05 16:25:51 -0300 |
commit | 2772d76d0d66d9e094a435e110fd9f945d9e105c (patch) | |
tree | 6ac5c9c1fe78a16c649ff3d8c3e89ef4b1a78fc4 | |
parent | e98dd6b7b7366c9ae6ca18c6e94866fbf8641afe (diff) |
[bug] fix retries for blobs download
- add a MaximumRetriesError exception to encapsulate other exceptions.
- record the pending status before trying to download
- modify update_sync_status to insert or update
- modify retry tests to check number of retries
- add a test for download retry limit
-rw-r--r-- | src/leap/soledad/client/_db/blobs.py | 63 | ||||
-rw-r--r-- | tests/blobs/test_blob_manager.py | 49 | ||||
-rw-r--r-- | tests/server/test_blobs_server.py | 7 |
3 files changed, 86 insertions, 33 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index 527a6418..4edb77f4 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -151,6 +151,10 @@ class RetriableTransferError(Exception): pass +class MaximumRetriesError(Exception): + pass + + def sleep(seconds): d = defer.Deferred() reactor.callLater(seconds, d.callback, None) @@ -346,19 +350,21 @@ class BlobManager(object): @defer.inlineCallbacks def __send_one(self, blob_id, namespace, i, total): - logger.info("Sending blob to server (%d/%d): %s" - % (i, total, blob_id)) - fd = yield self.local.get(blob_id, namespace=namespace) - try: - yield self._encrypt_and_upload(blob_id, fd) - yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED) - except Exception as e: - yield self.local.increment_retries(blob_id) - _, retries = yield self.local.get_sync_status(blob_id) - if retries > self.max_retries: - failed_upload = SyncStatus.FAILED_UPLOAD - yield self.local.update_sync_status(blob_id, failed_upload) - raise e + logger.info("Sending blob to server (%d/%d): %s" + % (i, total, blob_id)) + fd = yield self.local.get(blob_id, namespace=namespace) + try: + yield self._encrypt_and_upload(blob_id, fd) + yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED) + except Exception as e: + yield self.local.increment_retries(blob_id) + res = yield self.local.get_sync_status(blob_id) + _, retries = res + if (retries + 1) > self.max_retries: + failed_upload = SyncStatus.FAILED_UPLOAD + yield self.local.update_sync_status(blob_id, failed_upload) + raise MaximumRetriesError(e) + raise e @defer.inlineCallbacks def fetch_missing(self, namespace=''): @@ -489,6 +495,9 @@ class BlobManager(object): logger.info("Found blob in local database: %s" % blob_id) defer.returnValue(local_blob) + yield self.local.update_sync_status( + blob_id, SyncStatus.PENDING_DOWNLOAD, namespace=namespace) + try: result = yield self._download_and_decrypt(blob_id, namespace) except Exception as e: @@ -505,12 +514,14 @@ class BlobManager(object): logger.error(message) yield self.local.increment_retries(blob_id) + if (retries + 1) >= self.max_retries: failed_download = SyncStatus.FAILED_DOWNLOAD - yield self.local.update_sync_status(blob_id, failed_download) - raise e - else: - raise RetriableTransferError(e) + yield self.local.update_sync_status( + blob_id, failed_download, namespace=namespace) + raise MaximumRetriesError(e) + + raise RetriableTransferError(e) if not result: defer.returnValue(None) @@ -684,10 +695,22 @@ class SQLiteBlobBackend(object): else: defer.returnValue([]) - def update_sync_status(self, blob_id, sync_status): - query = 'update blobs set sync_status = ? where blob_id = ?' + @defer.inlineCallbacks + def update_sync_status(self, blob_id, sync_status, namespace=""): + query = 'SELECT sync_status FROM blobs WHERE blob_id = ?' + result = yield self.dbpool.runQuery(query, (blob_id,)) + + if not result: + insert = 'INSERT INTO blobs' + insert += ' (blob_id, namespace, payload, sync_status)' + insert += ' VALUES (?, ?, zeroblob(0), ?)' + values = (blob_id, namespace, sync_status) + yield self.dbpool.runOperation(insert, values) + return + + update = 'UPDATE blobs SET sync_status = ? WHERE blob_id = ?' values = (sync_status, blob_id,) - return self.dbpool.runQuery(query, values) + result = yield self.dbpool.runOperation(update, values) def update_batch_sync_status(self, blob_id_list, sync_status, namespace=''): diff --git a/tests/blobs/test_blob_manager.py b/tests/blobs/test_blob_manager.py index c6f84e29..c151d8c3 100644 --- a/tests/blobs/test_blob_manager.py +++ b/tests/blobs/test_blob_manager.py @@ -19,16 +19,20 @@ Tests for BlobManager. """ from twisted.trial import unittest from twisted.internet import defer -from twisted.web.error import SchemeNotSupported from leap.soledad.client._db.blobs import BlobManager, BlobDoc, FIXED_REV from leap.soledad.client._db.blobs import BlobAlreadyExistsError from leap.soledad.client._db.blobs import SyncStatus +from leap.soledad.client._db.blobs import RetriableTransferError from io import BytesIO from mock import Mock from uuid import uuid4 import pytest import os +# monkey-patch the blobmanager MAX_WAIT time so tests run faster +from leap.soledad.client._db import blobs +blobs.MAX_WAIT = 1 + class BlobManagerTestCase(unittest.TestCase): @@ -155,8 +159,7 @@ class BlobManagerTestCase(unittest.TestCase): self.manager._encrypt_and_upload = Mock(return_value=upload_failure) content, blob_id = "Blob content", uuid4().hex doc1 = BlobDoc(BytesIO(content), blob_id) - with pytest.raises(SchemeNotSupported): - # should fail because manager URL is invalid + with pytest.raises(Exception): yield self.manager.put(doc1, len(content)) pending_upload = SyncStatus.PENDING_UPLOAD local_list = yield self.manager.local_list(sync_status=pending_upload) @@ -165,18 +168,44 @@ class BlobManagerTestCase(unittest.TestCase): @defer.inlineCallbacks @pytest.mark.usefixtures("method_tmpdir") def test_upload_retry_limit(self): + # prepare the manager to fail accordingly self.manager.remote_list = Mock(return_value=[]) + self.manager._encrypt_and_upload = Mock( + side_effect=RetriableTransferError) + # put a blob in local storage content, blob_id = "Blob content", uuid4().hex - doc1 = BlobDoc(BytesIO(content), blob_id) - with pytest.raises(SchemeNotSupported): - # should fail because manager URL is invalid - yield self.manager.put(doc1, len(content)) - for _ in range(self.manager.max_retries + 1): - with pytest.raises(defer.FirstError): - yield self.manager.send_missing() + yield self.manager.local.put(blob_id, BytesIO(content), len(content)) + # try to send missing + with pytest.raises(defer.FirstError): + yield self.manager.send_missing() + # assert failed state and number of retries failed_upload = SyncStatus.FAILED_UPLOAD local_list = yield self.manager.local_list(sync_status=failed_upload) self.assertIn(blob_id, local_list) + sync_status, retries = \ + yield self.manager.local.get_sync_status(blob_id) + self.assertEqual(failed_upload, sync_status) + self.assertEqual(self.manager.max_retries, retries) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_download_retry_limit(self): + # prepare the manager to fail accordingly + blob_id = uuid4().hex + self.manager.local_list = Mock(return_value=[blob_id]) + self.manager._download_and_decrypt = Mock( + side_effect=RetriableTransferError) + # try to fetch missing + with pytest.raises(defer.FirstError): + yield self.manager.fetch_missing() + # assert failed state and number of retries + failed_download = SyncStatus.FAILED_DOWNLOAD + local_list = yield self.manager.local.list(sync_status=failed_download) + self.assertIn(blob_id, local_list) + sync_status, retries = \ + yield self.manager.local.get_sync_status(blob_id) + self.assertEqual(failed_download, sync_status) + self.assertEqual(self.manager.max_retries, retries) @defer.inlineCallbacks @pytest.mark.usefixtures("method_tmpdir") diff --git a/tests/server/test_blobs_server.py b/tests/server/test_blobs_server.py index 892ae5ff..c4a00ab5 100644 --- a/tests/server/test_blobs_server.py +++ b/tests/server/test_blobs_server.py @@ -34,8 +34,9 @@ from leap.soledad.client._db.blobs import BlobAlreadyExistsError from leap.soledad.client._db.blobs import InvalidFlagsError from leap.soledad.client._db.blobs import SoledadError from leap.soledad.client._db.blobs import SyncStatus +from leap.soledad.client._db.blobs import RetriableTransferError +from leap.soledad.client._db.blobs import MaximumRetriesError from leap.soledad.client._db import blobs as client_blobs -from leap.soledad.client._crypto import InvalidBlob def sleep(x): @@ -333,7 +334,7 @@ class BlobServerTestCase(unittest.TestCase): # Corrupt the tag (last 16 bytes) corrupted_blob.seek(-16, 2) corrupted_blob.write('x' * 16) - with pytest.raises(InvalidBlob): + with pytest.raises(MaximumRetriesError): yield manager.sync() status, retries = yield manager.local.get_sync_status(blob_id) self.assertEquals(status, SyncStatus.FAILED_DOWNLOAD) @@ -370,7 +371,7 @@ class BlobServerTestCase(unittest.TestCase): manager = BlobManager(self.tempdir, self.uri, self.secret, self.secret, uuid4().hex) self.addCleanup(manager.close) - with pytest.raises(SoledadError): + with pytest.raises(RetriableTransferError): yield manager.get('missing_id') @defer.inlineCallbacks |