diff options
| -rw-r--r-- | src/leap/soledad/client/_db/blobs.py | 44 | ||||
| -rw-r--r-- | tests/server/test_blobs_server.py | 44 | 
2 files changed, 71 insertions, 17 deletions
| diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index d25a5ca6..bbb21b7c 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -41,6 +41,7 @@ from leap.common.files import mkdir_p  from .._document import BlobDoc  from .._crypto import DocInfo +from .._crypto import InvalidBlob  from .._crypto import BlobEncryptor  from .._crypto import BlobDecryptor  from .._crypto import EncryptionSchemeNotImplementedException @@ -146,22 +147,31 @@ def check_http_status(code, blob_id=None, flags=None):          raise SoledadError("Server Error: %s" % code) +class RetriableTransferException(Exception): +    pass + +  def sleep(seconds):      d = defer.Deferred()      reactor.callLater(seconds, d.callback, None)      return d +MAX_WAIT = 60  # In seconds. Max time between retries + +  @defer.inlineCallbacks  def with_retry(func, *args, **kwargs): -    retry_wait, max_wait = 1, 60 +    retry_wait = 1 +    retriable_errors = (error.ConnectError, error.ConnectionClosed, +                        RetriableTransferException,)      while True:          try:              yield func(*args, **kwargs)              break -        except(error.ConnectError, error.ConnectionClosed): +        except retriable_errors:              yield sleep(retry_wait) -            retry_wait = min(retry_wait + 10, max_wait) +            retry_wait = min(retry_wait + 10, MAX_WAIT)  class DecrypterBuffer(object): @@ -365,9 +375,12 @@ class BlobManager(object):      @defer.inlineCallbacks      def sync(self, namespace=''): -        yield self.refresh_sync_status_from_server(namespace) -        yield self.fetch_missing(namespace) -        yield self.send_missing(namespace) +        try: +            yield self.refresh_sync_status_from_server(namespace) +            yield self.fetch_missing(namespace) +            yield self.send_missing(namespace) +        except defer.FirstError, e: +            e.subFailure.raiseException()      @defer.inlineCallbacks      def put(self, doc, size, namespace=''): @@ -515,7 +528,24 @@ class BlobManager(object):          # incrementally collect the body of the response          yield treq.collect(response, buf.write) -        fd, size = buf.close() +        try: +            fd, size = buf.close() +        except InvalidBlob, e: +            _, retries = yield self.local.get_sync_status(blob_id) +            message = "Corrupted blob received from server! ID: %s\n" +            message += "Retries: %s - Attempts left: %s\n" +            message += "There is a chance of tampering. If this problem " +            message += "persists, please check your connection then report to " +            message += "your provider sysadmin and submit a bug report." +            message = message % (blob_id, retries, 3 - retries) +            logger.error(message) +            yield self.local.increment_retries(blob_id) +            if (retries + 1) >= 3: +                failed_download = SyncStatus.FAILED_DOWNLOAD +                yield self.local.update_sync_status(blob_id, failed_download) +                raise e +            else: +                raise RetriableTransferException()          logger.info("Finished download: (%s, %d)" % (blob_id, size))          defer.returnValue((fd, size)) diff --git a/tests/server/test_blobs_server.py b/tests/server/test_blobs_server.py index e3bc761d..892ae5ff 100644 --- a/tests/server/test_blobs_server.py +++ b/tests/server/test_blobs_server.py @@ -33,11 +33,21 @@ from leap.soledad.client._db.blobs import BlobManager  from leap.soledad.client._db.blobs import BlobAlreadyExistsError  from leap.soledad.client._db.blobs import InvalidFlagsError  from leap.soledad.client._db.blobs import SoledadError +from leap.soledad.client._db.blobs import SyncStatus +from leap.soledad.client._db import blobs as client_blobs +from leap.soledad.client._crypto import InvalidBlob + + +def sleep(x): +    d = defer.Deferred() +    reactor.callLater(x, d.callback, None) +    return d  class BlobServerTestCase(unittest.TestCase):      def setUp(self): +        client_blobs.MAX_WAIT = 0.1          root = server_blobs.BlobsResource("filesystem", self.tempdir)          self.site = Site(root)          self.port = reactor.listenTCP(0, self.site, interface='127.0.0.1') @@ -265,12 +275,8 @@ class BlobServerTestCase(unittest.TestCase):          yield manager.local.put(blob_id, BytesIO("X"), size=1)          yield self.port.stopListening() -        def sleep(x): -            d = defer.Deferred() -            reactor.callLater(x, d.callback, None) -            return d          d = manager.send_missing() -        yield sleep(1) +        yield sleep(0.1)          self.port = reactor.listenTCP(              self.host.port, self.site, interface='127.0.0.1')          yield d @@ -302,12 +308,8 @@ class BlobServerTestCase(unittest.TestCase):          yield manager.refresh_sync_status_from_server()          yield self.port.stopListening() -        def sleep(x): -            d = defer.Deferred() -            reactor.callLater(x, d.callback, None) -            return d          d = manager.fetch_missing() -        yield sleep(1) +        yield sleep(0.1)          self.port = reactor.listenTCP(              self.host.port, self.site, interface='127.0.0.1')          yield d @@ -317,6 +319,28 @@ class BlobServerTestCase(unittest.TestCase):      @defer.inlineCallbacks      @pytest.mark.usefixtures("method_tmpdir") +    def test_download_corrupted_tag_marks_blob_as_failed(self): +        user_id = uuid4().hex +        manager = BlobManager(self.tempdir, self.uri, self.secret, +                              self.secret, user_id) +        self.addCleanup(manager.close) +        blob_id = 'corrupted' +        yield manager._encrypt_and_upload(blob_id, BytesIO("corrupted")) +        parts = ['default'] + [blob_id[0], blob_id[0:3], blob_id[0:6]] +        parts += [blob_id] +        corrupted_blob_path = os.path.join(self.tempdir, user_id, *parts) +        with open(corrupted_blob_path, 'r+b') as corrupted_blob: +            # Corrupt the tag (last 16 bytes) +            corrupted_blob.seek(-16, 2) +            corrupted_blob.write('x' * 16) +        with pytest.raises(InvalidBlob): +            yield manager.sync() +        status, retries = yield manager.local.get_sync_status(blob_id) +        self.assertEquals(status, SyncStatus.FAILED_DOWNLOAD) +        self.assertEquals(retries, 3) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir")      def test_upload_then_delete_updates_list(self):          manager = BlobManager('', self.uri, self.secret,                                self.secret, uuid4().hex) | 
