summaryrefslogtreecommitdiff
path: root/testing/tests/blobs/test_blob_manager.py
diff options
context:
space:
mode:
Diffstat (limited to 'testing/tests/blobs/test_blob_manager.py')
-rw-r--r--testing/tests/blobs/test_blob_manager.py30
1 files changed, 30 insertions, 0 deletions
diff --git a/testing/tests/blobs/test_blob_manager.py b/testing/tests/blobs/test_blob_manager.py
index 087c17e6..dd57047d 100644
--- a/testing/tests/blobs/test_blob_manager.py
+++ b/testing/tests/blobs/test_blob_manager.py
@@ -19,8 +19,10 @@ Tests for BlobManager.
"""
from twisted.trial import unittest
from twisted.internet import defer
+from twisted.web.error import SchemeNotSupported
from leap.soledad.client._db.blobs import BlobManager, BlobDoc, FIXED_REV
from leap.soledad.client._db.blobs import BlobAlreadyExistsError
+from leap.soledad.client._db.blobs import SyncStatus
from io import BytesIO
from mock import Mock
from uuid import uuid4
@@ -145,3 +147,31 @@ class BlobManagerTestCase(unittest.TestCase):
self.assertEquals(0, len(local_list))
params = {'namespace': ''}
self.manager._delete_from_remote.assert_called_with(blob_id, **params)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_local_sync_status_pending_upload(self):
+ upload_failure = defer.fail(Exception())
+ self.manager._encrypt_and_upload = Mock(return_value=upload_failure)
+ content, blob_id = "Blob content", uuid4().hex
+ doc1 = BlobDoc(BytesIO(content), blob_id)
+ with pytest.raises(Exception):
+ yield self.manager.put(doc1, len(content))
+ pending_upload = SyncStatus.PENDING_UPLOAD
+ local_list = yield self.manager.local_list(sync_status=pending_upload)
+ self.assertIn(blob_id, local_list)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_upload_retry_limit(self):
+ self.manager.remote_list = Mock(return_value=[])
+ content, blob_id = "Blob content", uuid4().hex
+ doc1 = BlobDoc(BytesIO(content), blob_id)
+ with pytest.raises(Exception):
+ yield self.manager.put(doc1, len(content))
+ for _ in range(self.manager.max_retries + 1):
+ with pytest.raises(SchemeNotSupported):
+ yield self.manager.send_missing()
+ failed_upload = SyncStatus.FAILED_UPLOAD
+ local_list = yield self.manager.local_list(sync_status=failed_upload)
+ self.assertIn(blob_id, local_list)