From b8bdbcda11bccaf2110c441b9d4000d0fe73048a Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 26 Sep 2017 17:46:15 -0300 Subject: [feature] notify, retry and fail from invalid tag Notify, log something meaninful and retry at most 3 times before marking the download as unusable (FAILED_DOWNLOAD). -- Related: #8825 --- src/leap/soledad/client/_db/blobs.py | 44 ++++++++++++++++++++++++++++++------ tests/server/test_blobs_server.py | 44 ++++++++++++++++++++++++++++-------- 2 files changed, 71 insertions(+), 17 deletions(-) diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index d25a5ca6..bbb21b7c 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -41,6 +41,7 @@ from leap.common.files import mkdir_p from .._document import BlobDoc from .._crypto import DocInfo +from .._crypto import InvalidBlob from .._crypto import BlobEncryptor from .._crypto import BlobDecryptor from .._crypto import EncryptionSchemeNotImplementedException @@ -146,22 +147,31 @@ def check_http_status(code, blob_id=None, flags=None): raise SoledadError("Server Error: %s" % code) +class RetriableTransferException(Exception): + pass + + def sleep(seconds): d = defer.Deferred() reactor.callLater(seconds, d.callback, None) return d +MAX_WAIT = 60 # In seconds. Max time between retries + + @defer.inlineCallbacks def with_retry(func, *args, **kwargs): - retry_wait, max_wait = 1, 60 + retry_wait = 1 + retriable_errors = (error.ConnectError, error.ConnectionClosed, + RetriableTransferException,) while True: try: yield func(*args, **kwargs) break - except(error.ConnectError, error.ConnectionClosed): + except retriable_errors: yield sleep(retry_wait) - retry_wait = min(retry_wait + 10, max_wait) + retry_wait = min(retry_wait + 10, MAX_WAIT) class DecrypterBuffer(object): @@ -365,9 +375,12 @@ class BlobManager(object): @defer.inlineCallbacks def sync(self, namespace=''): - yield self.refresh_sync_status_from_server(namespace) - yield self.fetch_missing(namespace) - yield self.send_missing(namespace) + try: + yield self.refresh_sync_status_from_server(namespace) + yield self.fetch_missing(namespace) + yield self.send_missing(namespace) + except defer.FirstError, e: + e.subFailure.raiseException() @defer.inlineCallbacks def put(self, doc, size, namespace=''): @@ -515,7 +528,24 @@ class BlobManager(object): # incrementally collect the body of the response yield treq.collect(response, buf.write) - fd, size = buf.close() + try: + fd, size = buf.close() + except InvalidBlob, e: + _, retries = yield self.local.get_sync_status(blob_id) + message = "Corrupted blob received from server! ID: %s\n" + message += "Retries: %s - Attempts left: %s\n" + message += "There is a chance of tampering. If this problem " + message += "persists, please check your connection then report to " + message += "your provider sysadmin and submit a bug report." + message = message % (blob_id, retries, 3 - retries) + logger.error(message) + yield self.local.increment_retries(blob_id) + if (retries + 1) >= 3: + failed_download = SyncStatus.FAILED_DOWNLOAD + yield self.local.update_sync_status(blob_id, failed_download) + raise e + else: + raise RetriableTransferException() logger.info("Finished download: (%s, %d)" % (blob_id, size)) defer.returnValue((fd, size)) diff --git a/tests/server/test_blobs_server.py b/tests/server/test_blobs_server.py index e3bc761d..892ae5ff 100644 --- a/tests/server/test_blobs_server.py +++ b/tests/server/test_blobs_server.py @@ -33,11 +33,21 @@ from leap.soledad.client._db.blobs import BlobManager from leap.soledad.client._db.blobs import BlobAlreadyExistsError from leap.soledad.client._db.blobs import InvalidFlagsError from leap.soledad.client._db.blobs import SoledadError +from leap.soledad.client._db.blobs import SyncStatus +from leap.soledad.client._db import blobs as client_blobs +from leap.soledad.client._crypto import InvalidBlob + + +def sleep(x): + d = defer.Deferred() + reactor.callLater(x, d.callback, None) + return d class BlobServerTestCase(unittest.TestCase): def setUp(self): + client_blobs.MAX_WAIT = 0.1 root = server_blobs.BlobsResource("filesystem", self.tempdir) self.site = Site(root) self.port = reactor.listenTCP(0, self.site, interface='127.0.0.1') @@ -265,12 +275,8 @@ class BlobServerTestCase(unittest.TestCase): yield manager.local.put(blob_id, BytesIO("X"), size=1) yield self.port.stopListening() - def sleep(x): - d = defer.Deferred() - reactor.callLater(x, d.callback, None) - return d d = manager.send_missing() - yield sleep(1) + yield sleep(0.1) self.port = reactor.listenTCP( self.host.port, self.site, interface='127.0.0.1') yield d @@ -302,12 +308,8 @@ class BlobServerTestCase(unittest.TestCase): yield manager.refresh_sync_status_from_server() yield self.port.stopListening() - def sleep(x): - d = defer.Deferred() - reactor.callLater(x, d.callback, None) - return d d = manager.fetch_missing() - yield sleep(1) + yield sleep(0.1) self.port = reactor.listenTCP( self.host.port, self.site, interface='127.0.0.1') yield d @@ -315,6 +317,28 @@ class BlobServerTestCase(unittest.TestCase): self.assertIsNotNone(result) self.assertEquals(result.getvalue(), "X") + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_download_corrupted_tag_marks_blob_as_failed(self): + user_id = uuid4().hex + manager = BlobManager(self.tempdir, self.uri, self.secret, + self.secret, user_id) + self.addCleanup(manager.close) + blob_id = 'corrupted' + yield manager._encrypt_and_upload(blob_id, BytesIO("corrupted")) + parts = ['default'] + [blob_id[0], blob_id[0:3], blob_id[0:6]] + parts += [blob_id] + corrupted_blob_path = os.path.join(self.tempdir, user_id, *parts) + with open(corrupted_blob_path, 'r+b') as corrupted_blob: + # Corrupt the tag (last 16 bytes) + corrupted_blob.seek(-16, 2) + corrupted_blob.write('x' * 16) + with pytest.raises(InvalidBlob): + yield manager.sync() + status, retries = yield manager.local.get_sync_status(blob_id) + self.assertEquals(status, SyncStatus.FAILED_DOWNLOAD) + self.assertEquals(retries, 3) + @defer.inlineCallbacks @pytest.mark.usefixtures("method_tmpdir") def test_upload_then_delete_updates_list(self): -- cgit v1.2.3