From 0c22a7047553afdc1ed8a33bea17ccbe842e5e6e Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 1 May 2017 06:21:01 -0300 Subject: [feature] blobs path validation Check if user and blob_id are valid strings, then check if the resulting path is a subdirectory of blobs configured path. - Related: #8800 --- server/src/leap/soledad/server/_blobs.py | 19 +++++++++++++++++-- testing/tests/blobs/test_fs_backend.py | 8 ++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/server/src/leap/soledad/server/_blobs.py b/server/src/leap/soledad/server/_blobs.py index 84056c0a..72125d5a 100644 --- a/server/src/leap/soledad/server/_blobs.py +++ b/server/src/leap/soledad/server/_blobs.py @@ -43,6 +43,8 @@ __all__ = ['BlobsResource'] logger = Logger() +# Used for sanitizers, we accept only letters, numbers, '-' and '_' +VALID_STRINGS = re.compile('^[a-zA-Z0-9_-]+$') # TODO some error handling needed @@ -112,7 +114,9 @@ class FilesystemBlobsBackend(object): def list_blobs(self, user, request): blob_ids = [] + assert VALID_STRINGS.match(user) base_path = os.path.join(self.path, user) + assert self._valid_subdir(base_path) for _, _, filenames in os.walk(base_path): blob_ids += filenames return json.dumps(blob_ids) @@ -153,6 +157,7 @@ class FilesystemBlobsBackend(object): yield fbp.startProducing(open(path, 'wb')) def get_total_storage(self, user): + assert VALID_STRINGS.match(user) return self._get_disk_usage(os.path.join(self.path, user)) def delete_blob(user, blob_id): @@ -165,16 +170,26 @@ class FilesystemBlobsBackend(object): def _get_disk_usage(self, start_path): if not os.path.isdir(start_path): defer.returnValue(0) + assert self._valid_subdir(start_path) cmd = ['/usr/bin/du', '-s', '-c', start_path] output = yield utils.getProcessOutput(cmd[0], cmd[1:]) size = output.split()[0] defer.returnValue(int(size)) + def _valid_subdir(self, desired_path): + desired_path = os.path.realpath(desired_path) # expand path references + root = os.path.realpath(self.path) + return desired_path.startswith(root + os.sep) + def _get_path(self, user, blob_id): + assert VALID_STRINGS.match(user) + assert VALID_STRINGS.match(blob_id) parts = [user] parts += [blob_id[0], blob_id[0:3], blob_id[0:6]] parts += [blob_id] - return os.path.join(self.path, *parts) + path = os.path.join(self.path, *parts) + assert self._valid_subdir(path) + return path class BlobsResource(resource.Resource): @@ -217,7 +232,7 @@ class BlobsResource(resource.Resource): def _validate(self, request): for arg in request.postpath: - if arg and not re.match('^[a-zA-Z0-9_-]+$', arg): + if arg and not VALID_STRINGS.match(arg): raise Exception('Invalid blob resource argument: %s' % arg) return request.postpath diff --git a/testing/tests/blobs/test_fs_backend.py b/testing/tests/blobs/test_fs_backend.py index 39ee0028..0d7e9789 100644 --- a/testing/tests/blobs/test_fs_backend.py +++ b/testing/tests/blobs/test_fs_backend.py @@ -97,3 +97,11 @@ class FilesystemBackendTestCase(unittest.TestCase): walk_mock.return_value = [(_, _, ['blob_0']), (_, _, ['blob_1'])] result = json.loads(backend.list_blobs('user', DummyRequest(['']))) self.assertEquals(result, ['blob_0', 'blob_1']) + + @pytest.mark.usefixtures("method_tmpdir") + def test_path_validation_for_subdirectories(self): + blobs_path = self.tempdir + backend = _blobs.FilesystemBlobsBackend(blobs_path) + self.assertFalse(backend._valid_subdir('/')) + self.assertFalse(backend._valid_subdir(blobs_path + '../../../../../')) + self.assertTrue(backend._valid_subdir(os.path.join(blobs_path, 'x'))) -- cgit v1.2.3