diff options
-rw-r--r-- | server/src/leap/soledad/server/_blobs.py | 32 | ||||
-rw-r--r-- | testing/tests/blobs/test_fs_backend.py | 23 |
2 files changed, 36 insertions, 19 deletions
diff --git a/server/src/leap/soledad/server/_blobs.py b/server/src/leap/soledad/server/_blobs.py index 72125d5a..1595a549 100644 --- a/server/src/leap/soledad/server/_blobs.py +++ b/server/src/leap/soledad/server/_blobs.py @@ -114,9 +114,7 @@ class FilesystemBlobsBackend(object): def list_blobs(self, user, request): blob_ids = [] - assert VALID_STRINGS.match(user) - base_path = os.path.join(self.path, user) - assert self._valid_subdir(base_path) + base_path = self._get_path(user) for _, _, filenames in os.walk(base_path): blob_ids += filenames return json.dumps(blob_ids) @@ -157,8 +155,7 @@ class FilesystemBlobsBackend(object): yield fbp.startProducing(open(path, 'wb')) def get_total_storage(self, user): - assert VALID_STRINGS.match(user) - return self._get_disk_usage(os.path.join(self.path, user)) + return self._get_disk_usage(self._get_path(user)) def delete_blob(user, blob_id): raise NotImplementedError @@ -170,26 +167,31 @@ class FilesystemBlobsBackend(object): def _get_disk_usage(self, start_path): if not os.path.isdir(start_path): defer.returnValue(0) - assert self._valid_subdir(start_path) cmd = ['/usr/bin/du', '-s', '-c', start_path] output = yield utils.getProcessOutput(cmd[0], cmd[1:]) size = output.split()[0] defer.returnValue(int(size)) - def _valid_subdir(self, desired_path): + def _validate_path(self, desired_path, user, blob_id): + if not VALID_STRINGS.match(user): + raise Exception("Invalid characters on user: %s" % user) + if blob_id and not VALID_STRINGS.match(blob_id): + raise Exception("Invalid characters on blob_id: %s" % blob_id) desired_path = os.path.realpath(desired_path) # expand path references root = os.path.realpath(self.path) - return desired_path.startswith(root + os.sep) + if not desired_path.startswith(root + os.sep + user): + err = "User %s tried accessing a invalid path: %s" % (user, + desired_path) + raise Exception(err) + return desired_path - def _get_path(self, user, blob_id): - assert VALID_STRINGS.match(user) - assert VALID_STRINGS.match(blob_id) + def _get_path(self, user, blob_id=False): parts = [user] - parts += [blob_id[0], blob_id[0:3], blob_id[0:6]] - parts += [blob_id] + if blob_id: + parts += [blob_id[0], blob_id[0:3], blob_id[0:6]] + parts += [blob_id] path = os.path.join(self.path, *parts) - assert self._valid_subdir(path) - return path + return self._validate_path(path, user, blob_id) class BlobsResource(resource.Resource): diff --git a/testing/tests/blobs/test_fs_backend.py b/testing/tests/blobs/test_fs_backend.py index 0d7e9789..f6f2db2d 100644 --- a/testing/tests/blobs/test_fs_backend.py +++ b/testing/tests/blobs/test_fs_backend.py @@ -99,9 +99,24 @@ class FilesystemBackendTestCase(unittest.TestCase): self.assertEquals(result, ['blob_0', 'blob_1']) @pytest.mark.usefixtures("method_tmpdir") - def test_path_validation_for_subdirectories(self): + def test_path_validation_on_read_blob(self): blobs_path = self.tempdir backend = _blobs.FilesystemBlobsBackend(blobs_path) - self.assertFalse(backend._valid_subdir('/')) - self.assertFalse(backend._valid_subdir(blobs_path + '../../../../../')) - self.assertTrue(backend._valid_subdir(os.path.join(blobs_path, 'x'))) + with pytest.raises(Exception): + backend.read_blob('..', '..', DummyRequest([''])) + with pytest.raises(Exception): + backend.read_blob('valid', '../../../', DummyRequest([''])) + with pytest.raises(Exception): + backend.read_blob('../../../', 'valid', DummyRequest([''])) + + @pytest.mark.usefixtures("method_tmpdir") + @defer.inlineCallbacks + def test_path_validation_on_write_blob(self): + blobs_path = self.tempdir + backend = _blobs.FilesystemBlobsBackend(blobs_path) + with pytest.raises(Exception): + yield backend.write_blob('..', '..', DummyRequest([''])) + with pytest.raises(Exception): + yield backend.write_blob('valid', '../../../', DummyRequest([''])) + with pytest.raises(Exception): + yield backend.write_blob('../../../', 'valid', DummyRequest([''])) |