summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/leap/soledad/client/_db/blobs.py44
-rw-r--r--tests/server/test_blobs_server.py44
2 files changed, 71 insertions, 17 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py
index d25a5ca6..bbb21b7c 100644
--- a/src/leap/soledad/client/_db/blobs.py
+++ b/src/leap/soledad/client/_db/blobs.py
@@ -41,6 +41,7 @@ from leap.common.files import mkdir_p
from .._document import BlobDoc
from .._crypto import DocInfo
+from .._crypto import InvalidBlob
from .._crypto import BlobEncryptor
from .._crypto import BlobDecryptor
from .._crypto import EncryptionSchemeNotImplementedException
@@ -146,22 +147,31 @@ def check_http_status(code, blob_id=None, flags=None):
raise SoledadError("Server Error: %s" % code)
+class RetriableTransferException(Exception):
+ pass
+
+
def sleep(seconds):
d = defer.Deferred()
reactor.callLater(seconds, d.callback, None)
return d
+MAX_WAIT = 60 # In seconds. Max time between retries
+
+
@defer.inlineCallbacks
def with_retry(func, *args, **kwargs):
- retry_wait, max_wait = 1, 60
+ retry_wait = 1
+ retriable_errors = (error.ConnectError, error.ConnectionClosed,
+ RetriableTransferException,)
while True:
try:
yield func(*args, **kwargs)
break
- except(error.ConnectError, error.ConnectionClosed):
+ except retriable_errors:
yield sleep(retry_wait)
- retry_wait = min(retry_wait + 10, max_wait)
+ retry_wait = min(retry_wait + 10, MAX_WAIT)
class DecrypterBuffer(object):
@@ -365,9 +375,12 @@ class BlobManager(object):
@defer.inlineCallbacks
def sync(self, namespace=''):
- yield self.refresh_sync_status_from_server(namespace)
- yield self.fetch_missing(namespace)
- yield self.send_missing(namespace)
+ try:
+ yield self.refresh_sync_status_from_server(namespace)
+ yield self.fetch_missing(namespace)
+ yield self.send_missing(namespace)
+ except defer.FirstError, e:
+ e.subFailure.raiseException()
@defer.inlineCallbacks
def put(self, doc, size, namespace=''):
@@ -515,7 +528,24 @@ class BlobManager(object):
# incrementally collect the body of the response
yield treq.collect(response, buf.write)
- fd, size = buf.close()
+ try:
+ fd, size = buf.close()
+ except InvalidBlob, e:
+ _, retries = yield self.local.get_sync_status(blob_id)
+ message = "Corrupted blob received from server! ID: %s\n"
+ message += "Retries: %s - Attempts left: %s\n"
+ message += "There is a chance of tampering. If this problem "
+ message += "persists, please check your connection then report to "
+ message += "your provider sysadmin and submit a bug report."
+ message = message % (blob_id, retries, 3 - retries)
+ logger.error(message)
+ yield self.local.increment_retries(blob_id)
+ if (retries + 1) >= 3:
+ failed_download = SyncStatus.FAILED_DOWNLOAD
+ yield self.local.update_sync_status(blob_id, failed_download)
+ raise e
+ else:
+ raise RetriableTransferException()
logger.info("Finished download: (%s, %d)" % (blob_id, size))
defer.returnValue((fd, size))
diff --git a/tests/server/test_blobs_server.py b/tests/server/test_blobs_server.py
index e3bc761d..892ae5ff 100644
--- a/tests/server/test_blobs_server.py
+++ b/tests/server/test_blobs_server.py
@@ -33,11 +33,21 @@ from leap.soledad.client._db.blobs import BlobManager
from leap.soledad.client._db.blobs import BlobAlreadyExistsError
from leap.soledad.client._db.blobs import InvalidFlagsError
from leap.soledad.client._db.blobs import SoledadError
+from leap.soledad.client._db.blobs import SyncStatus
+from leap.soledad.client._db import blobs as client_blobs
+from leap.soledad.client._crypto import InvalidBlob
+
+
+def sleep(x):
+ d = defer.Deferred()
+ reactor.callLater(x, d.callback, None)
+ return d
class BlobServerTestCase(unittest.TestCase):
def setUp(self):
+ client_blobs.MAX_WAIT = 0.1
root = server_blobs.BlobsResource("filesystem", self.tempdir)
self.site = Site(root)
self.port = reactor.listenTCP(0, self.site, interface='127.0.0.1')
@@ -265,12 +275,8 @@ class BlobServerTestCase(unittest.TestCase):
yield manager.local.put(blob_id, BytesIO("X"), size=1)
yield self.port.stopListening()
- def sleep(x):
- d = defer.Deferred()
- reactor.callLater(x, d.callback, None)
- return d
d = manager.send_missing()
- yield sleep(1)
+ yield sleep(0.1)
self.port = reactor.listenTCP(
self.host.port, self.site, interface='127.0.0.1')
yield d
@@ -302,12 +308,8 @@ class BlobServerTestCase(unittest.TestCase):
yield manager.refresh_sync_status_from_server()
yield self.port.stopListening()
- def sleep(x):
- d = defer.Deferred()
- reactor.callLater(x, d.callback, None)
- return d
d = manager.fetch_missing()
- yield sleep(1)
+ yield sleep(0.1)
self.port = reactor.listenTCP(
self.host.port, self.site, interface='127.0.0.1')
yield d
@@ -317,6 +319,28 @@ class BlobServerTestCase(unittest.TestCase):
@defer.inlineCallbacks
@pytest.mark.usefixtures("method_tmpdir")
+ def test_download_corrupted_tag_marks_blob_as_failed(self):
+ user_id = uuid4().hex
+ manager = BlobManager(self.tempdir, self.uri, self.secret,
+ self.secret, user_id)
+ self.addCleanup(manager.close)
+ blob_id = 'corrupted'
+ yield manager._encrypt_and_upload(blob_id, BytesIO("corrupted"))
+ parts = ['default'] + [blob_id[0], blob_id[0:3], blob_id[0:6]]
+ parts += [blob_id]
+ corrupted_blob_path = os.path.join(self.tempdir, user_id, *parts)
+ with open(corrupted_blob_path, 'r+b') as corrupted_blob:
+ # Corrupt the tag (last 16 bytes)
+ corrupted_blob.seek(-16, 2)
+ corrupted_blob.write('x' * 16)
+ with pytest.raises(InvalidBlob):
+ yield manager.sync()
+ status, retries = yield manager.local.get_sync_status(blob_id)
+ self.assertEquals(status, SyncStatus.FAILED_DOWNLOAD)
+ self.assertEquals(retries, 3)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
def test_upload_then_delete_updates_list(self):
manager = BlobManager('', self.uri, self.secret,
self.secret, uuid4().hex)