diff options
-rw-r--r-- | server/src/leap/soledad/server/_blobs.py | 11 | ||||
-rw-r--r-- | testing/tests/server/test_blobs_resource_validation.py | 63 |
2 files changed, 72 insertions, 2 deletions
diff --git a/server/src/leap/soledad/server/_blobs.py b/server/src/leap/soledad/server/_blobs.py index 3dd4ccb4..9dc4b9e7 100644 --- a/server/src/leap/soledad/server/_blobs.py +++ b/server/src/leap/soledad/server/_blobs.py @@ -27,6 +27,7 @@ environments. import os import base64 import json +import re from twisted.logger import Logger from twisted.web import static @@ -195,7 +196,7 @@ class BlobsResource(resource.Resource): def render_GET(self, request): logger.info("http get: %s" % request.path) - user, blob_id = request.postpath + user, blob_id = self._validate(request) if not blob_id: return self._handler.list_blobs(user, request) self._handler.tag_header(user, blob_id, request) @@ -203,7 +204,7 @@ class BlobsResource(resource.Resource): def render_PUT(self, request): logger.info("http put: %s" % request.path) - user, blob_id = request.postpath + user, blob_id = self._validate(request) d = self._handler.write_blob(user, blob_id, request) d.addCallback(lambda _: request.finish()) d.addErrback(self._error, request) @@ -214,6 +215,12 @@ class BlobsResource(resource.Resource): request.setResponseCode(500) request.finish() + def _validate(self, request): + for arg in request.postpath: + if arg and not re.match('^[a-zA-Z0-9_-]+$', arg): + raise Exception('Invalid blob resource argument: %s' % arg) + return request.postpath + if __name__ == '__main__': # A dummy blob server diff --git a/testing/tests/server/test_blobs_resource_validation.py b/testing/tests/server/test_blobs_resource_validation.py new file mode 100644 index 00000000..9fbb1b2e --- /dev/null +++ b/testing/tests/server/test_blobs_resource_validation.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +# test_blobs_resource_validation.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for invalid user or blob_id on blobs resource +""" +import pytest +from twisted.trial import unittest +from twisted.web.test.test_web import DummyRequest +from leap.soledad.server import _blobs as server_blobs + + +class BlobServerTestCase(unittest.TestCase): + + @pytest.mark.usefixtures("method_tmpdir") + def setUp(self): + self.resource = server_blobs.BlobsResource(self.tempdir) + + @pytest.mark.usefixtures("method_tmpdir") + def test_valid_arguments(self): + request = DummyRequest(['v4l1d-us3r', 'v4l1d-bl0b-1d']) + self.assertTrue(self.resource._validate(request)) + + @pytest.mark.usefixtures("method_tmpdir") + def test_invalid_user_get(self): + request = DummyRequest(['invalid user', 'valid-blob-id']) + request.path = '/blobs/' + with pytest.raises(Exception): + self.resource.render_GET(request) + + @pytest.mark.usefixtures("method_tmpdir") + def test_invalid_user_put(self): + request = DummyRequest(['invalid user', 'valid-blob-id']) + request.path = '/blobs/' + with pytest.raises(Exception): + self.resource.render_PUT(request) + + @pytest.mark.usefixtures("method_tmpdir") + def test_invalid_blob_id_get(self): + request = DummyRequest(['valid-user', 'invalid blob id']) + request.path = '/blobs/' + with pytest.raises(Exception): + self.resource.render_GET(request) + + @pytest.mark.usefixtures("method_tmpdir") + def test_invalid_blob_id_put(self): + request = DummyRequest(['valid-user', 'invalid blob id']) + request.path = '/blobs/' + with pytest.raises(Exception): + self.resource.render_PUT(request) |