summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@riseup.net>2017-10-05 15:24:46 -0300
committerdrebs <drebs@riseup.net>2017-10-05 16:25:51 -0300
commit2772d76d0d66d9e094a435e110fd9f945d9e105c (patch)
tree6ac5c9c1fe78a16c649ff3d8c3e89ef4b1a78fc4
parente98dd6b7b7366c9ae6ca18c6e94866fbf8641afe (diff)
[bug] fix retries for blobs download
- add a MaximumRetriesError exception to encapsulate other exceptions. - record the pending status before trying to download - modify update_sync_status to insert or update - modify retry tests to check number of retries - add a test for download retry limit
-rw-r--r--src/leap/soledad/client/_db/blobs.py63
-rw-r--r--tests/blobs/test_blob_manager.py49
-rw-r--r--tests/server/test_blobs_server.py7
3 files changed, 86 insertions, 33 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py
index 527a6418..4edb77f4 100644
--- a/src/leap/soledad/client/_db/blobs.py
+++ b/src/leap/soledad/client/_db/blobs.py
@@ -151,6 +151,10 @@ class RetriableTransferError(Exception):
pass
+class MaximumRetriesError(Exception):
+ pass
+
+
def sleep(seconds):
d = defer.Deferred()
reactor.callLater(seconds, d.callback, None)
@@ -346,19 +350,21 @@ class BlobManager(object):
@defer.inlineCallbacks
def __send_one(self, blob_id, namespace, i, total):
- logger.info("Sending blob to server (%d/%d): %s"
- % (i, total, blob_id))
- fd = yield self.local.get(blob_id, namespace=namespace)
- try:
- yield self._encrypt_and_upload(blob_id, fd)
- yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED)
- except Exception as e:
- yield self.local.increment_retries(blob_id)
- _, retries = yield self.local.get_sync_status(blob_id)
- if retries > self.max_retries:
- failed_upload = SyncStatus.FAILED_UPLOAD
- yield self.local.update_sync_status(blob_id, failed_upload)
- raise e
+ logger.info("Sending blob to server (%d/%d): %s"
+ % (i, total, blob_id))
+ fd = yield self.local.get(blob_id, namespace=namespace)
+ try:
+ yield self._encrypt_and_upload(blob_id, fd)
+ yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED)
+ except Exception as e:
+ yield self.local.increment_retries(blob_id)
+ res = yield self.local.get_sync_status(blob_id)
+ _, retries = res
+ if (retries + 1) > self.max_retries:
+ failed_upload = SyncStatus.FAILED_UPLOAD
+ yield self.local.update_sync_status(blob_id, failed_upload)
+ raise MaximumRetriesError(e)
+ raise e
@defer.inlineCallbacks
def fetch_missing(self, namespace=''):
@@ -489,6 +495,9 @@ class BlobManager(object):
logger.info("Found blob in local database: %s" % blob_id)
defer.returnValue(local_blob)
+ yield self.local.update_sync_status(
+ blob_id, SyncStatus.PENDING_DOWNLOAD, namespace=namespace)
+
try:
result = yield self._download_and_decrypt(blob_id, namespace)
except Exception as e:
@@ -505,12 +514,14 @@ class BlobManager(object):
logger.error(message)
yield self.local.increment_retries(blob_id)
+
if (retries + 1) >= self.max_retries:
failed_download = SyncStatus.FAILED_DOWNLOAD
- yield self.local.update_sync_status(blob_id, failed_download)
- raise e
- else:
- raise RetriableTransferError(e)
+ yield self.local.update_sync_status(
+ blob_id, failed_download, namespace=namespace)
+ raise MaximumRetriesError(e)
+
+ raise RetriableTransferError(e)
if not result:
defer.returnValue(None)
@@ -684,10 +695,22 @@ class SQLiteBlobBackend(object):
else:
defer.returnValue([])
- def update_sync_status(self, blob_id, sync_status):
- query = 'update blobs set sync_status = ? where blob_id = ?'
+ @defer.inlineCallbacks
+ def update_sync_status(self, blob_id, sync_status, namespace=""):
+ query = 'SELECT sync_status FROM blobs WHERE blob_id = ?'
+ result = yield self.dbpool.runQuery(query, (blob_id,))
+
+ if not result:
+ insert = 'INSERT INTO blobs'
+ insert += ' (blob_id, namespace, payload, sync_status)'
+ insert += ' VALUES (?, ?, zeroblob(0), ?)'
+ values = (blob_id, namespace, sync_status)
+ yield self.dbpool.runOperation(insert, values)
+ return
+
+ update = 'UPDATE blobs SET sync_status = ? WHERE blob_id = ?'
values = (sync_status, blob_id,)
- return self.dbpool.runQuery(query, values)
+ result = yield self.dbpool.runOperation(update, values)
def update_batch_sync_status(self, blob_id_list, sync_status,
namespace=''):
diff --git a/tests/blobs/test_blob_manager.py b/tests/blobs/test_blob_manager.py
index c6f84e29..c151d8c3 100644
--- a/tests/blobs/test_blob_manager.py
+++ b/tests/blobs/test_blob_manager.py
@@ -19,16 +19,20 @@ Tests for BlobManager.
"""
from twisted.trial import unittest
from twisted.internet import defer
-from twisted.web.error import SchemeNotSupported
from leap.soledad.client._db.blobs import BlobManager, BlobDoc, FIXED_REV
from leap.soledad.client._db.blobs import BlobAlreadyExistsError
from leap.soledad.client._db.blobs import SyncStatus
+from leap.soledad.client._db.blobs import RetriableTransferError
from io import BytesIO
from mock import Mock
from uuid import uuid4
import pytest
import os
+# monkey-patch the blobmanager MAX_WAIT time so tests run faster
+from leap.soledad.client._db import blobs
+blobs.MAX_WAIT = 1
+
class BlobManagerTestCase(unittest.TestCase):
@@ -155,8 +159,7 @@ class BlobManagerTestCase(unittest.TestCase):
self.manager._encrypt_and_upload = Mock(return_value=upload_failure)
content, blob_id = "Blob content", uuid4().hex
doc1 = BlobDoc(BytesIO(content), blob_id)
- with pytest.raises(SchemeNotSupported):
- # should fail because manager URL is invalid
+ with pytest.raises(Exception):
yield self.manager.put(doc1, len(content))
pending_upload = SyncStatus.PENDING_UPLOAD
local_list = yield self.manager.local_list(sync_status=pending_upload)
@@ -165,18 +168,44 @@ class BlobManagerTestCase(unittest.TestCase):
@defer.inlineCallbacks
@pytest.mark.usefixtures("method_tmpdir")
def test_upload_retry_limit(self):
+ # prepare the manager to fail accordingly
self.manager.remote_list = Mock(return_value=[])
+ self.manager._encrypt_and_upload = Mock(
+ side_effect=RetriableTransferError)
+ # put a blob in local storage
content, blob_id = "Blob content", uuid4().hex
- doc1 = BlobDoc(BytesIO(content), blob_id)
- with pytest.raises(SchemeNotSupported):
- # should fail because manager URL is invalid
- yield self.manager.put(doc1, len(content))
- for _ in range(self.manager.max_retries + 1):
- with pytest.raises(defer.FirstError):
- yield self.manager.send_missing()
+ yield self.manager.local.put(blob_id, BytesIO(content), len(content))
+ # try to send missing
+ with pytest.raises(defer.FirstError):
+ yield self.manager.send_missing()
+ # assert failed state and number of retries
failed_upload = SyncStatus.FAILED_UPLOAD
local_list = yield self.manager.local_list(sync_status=failed_upload)
self.assertIn(blob_id, local_list)
+ sync_status, retries = \
+ yield self.manager.local.get_sync_status(blob_id)
+ self.assertEqual(failed_upload, sync_status)
+ self.assertEqual(self.manager.max_retries, retries)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_download_retry_limit(self):
+ # prepare the manager to fail accordingly
+ blob_id = uuid4().hex
+ self.manager.local_list = Mock(return_value=[blob_id])
+ self.manager._download_and_decrypt = Mock(
+ side_effect=RetriableTransferError)
+ # try to fetch missing
+ with pytest.raises(defer.FirstError):
+ yield self.manager.fetch_missing()
+ # assert failed state and number of retries
+ failed_download = SyncStatus.FAILED_DOWNLOAD
+ local_list = yield self.manager.local.list(sync_status=failed_download)
+ self.assertIn(blob_id, local_list)
+ sync_status, retries = \
+ yield self.manager.local.get_sync_status(blob_id)
+ self.assertEqual(failed_download, sync_status)
+ self.assertEqual(self.manager.max_retries, retries)
@defer.inlineCallbacks
@pytest.mark.usefixtures("method_tmpdir")
diff --git a/tests/server/test_blobs_server.py b/tests/server/test_blobs_server.py
index 892ae5ff..c4a00ab5 100644
--- a/tests/server/test_blobs_server.py
+++ b/tests/server/test_blobs_server.py
@@ -34,8 +34,9 @@ from leap.soledad.client._db.blobs import BlobAlreadyExistsError
from leap.soledad.client._db.blobs import InvalidFlagsError
from leap.soledad.client._db.blobs import SoledadError
from leap.soledad.client._db.blobs import SyncStatus
+from leap.soledad.client._db.blobs import RetriableTransferError
+from leap.soledad.client._db.blobs import MaximumRetriesError
from leap.soledad.client._db import blobs as client_blobs
-from leap.soledad.client._crypto import InvalidBlob
def sleep(x):
@@ -333,7 +334,7 @@ class BlobServerTestCase(unittest.TestCase):
# Corrupt the tag (last 16 bytes)
corrupted_blob.seek(-16, 2)
corrupted_blob.write('x' * 16)
- with pytest.raises(InvalidBlob):
+ with pytest.raises(MaximumRetriesError):
yield manager.sync()
status, retries = yield manager.local.get_sync_status(blob_id)
self.assertEquals(status, SyncStatus.FAILED_DOWNLOAD)
@@ -370,7 +371,7 @@ class BlobServerTestCase(unittest.TestCase):
manager = BlobManager(self.tempdir, self.uri, self.secret,
self.secret, uuid4().hex)
self.addCleanup(manager.close)
- with pytest.raises(SoledadError):
+ with pytest.raises(RetriableTransferError):
yield manager.get('missing_id')
@defer.inlineCallbacks