summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Shyba <victor1984@riseup.net>2017-05-01 20:11:19 -0300
committerVictor Shyba <victor1984@riseup.net>2017-05-01 20:11:19 -0300
commitaba7274ed5682335bd91d94d5b122db440872378 (patch)
treed9d644e0936a9fde3f8122547d93a8f7c04d6d55
parent5854faf9b6de1297d6ebdd95ef700228c7fe5fb6 (diff)
[refactor] unify path validation
-rw-r--r--server/src/leap/soledad/server/_blobs.py32
-rw-r--r--testing/tests/blobs/test_fs_backend.py23
2 files changed, 36 insertions, 19 deletions
diff --git a/server/src/leap/soledad/server/_blobs.py b/server/src/leap/soledad/server/_blobs.py
index 72125d5a..1595a549 100644
--- a/server/src/leap/soledad/server/_blobs.py
+++ b/server/src/leap/soledad/server/_blobs.py
@@ -114,9 +114,7 @@ class FilesystemBlobsBackend(object):
def list_blobs(self, user, request):
blob_ids = []
- assert VALID_STRINGS.match(user)
- base_path = os.path.join(self.path, user)
- assert self._valid_subdir(base_path)
+ base_path = self._get_path(user)
for _, _, filenames in os.walk(base_path):
blob_ids += filenames
return json.dumps(blob_ids)
@@ -157,8 +155,7 @@ class FilesystemBlobsBackend(object):
yield fbp.startProducing(open(path, 'wb'))
def get_total_storage(self, user):
- assert VALID_STRINGS.match(user)
- return self._get_disk_usage(os.path.join(self.path, user))
+ return self._get_disk_usage(self._get_path(user))
def delete_blob(user, blob_id):
raise NotImplementedError
@@ -170,26 +167,31 @@ class FilesystemBlobsBackend(object):
def _get_disk_usage(self, start_path):
if not os.path.isdir(start_path):
defer.returnValue(0)
- assert self._valid_subdir(start_path)
cmd = ['/usr/bin/du', '-s', '-c', start_path]
output = yield utils.getProcessOutput(cmd[0], cmd[1:])
size = output.split()[0]
defer.returnValue(int(size))
- def _valid_subdir(self, desired_path):
+ def _validate_path(self, desired_path, user, blob_id):
+ if not VALID_STRINGS.match(user):
+ raise Exception("Invalid characters on user: %s" % user)
+ if blob_id and not VALID_STRINGS.match(blob_id):
+ raise Exception("Invalid characters on blob_id: %s" % blob_id)
desired_path = os.path.realpath(desired_path) # expand path references
root = os.path.realpath(self.path)
- return desired_path.startswith(root + os.sep)
+ if not desired_path.startswith(root + os.sep + user):
+ err = "User %s tried accessing a invalid path: %s" % (user,
+ desired_path)
+ raise Exception(err)
+ return desired_path
- def _get_path(self, user, blob_id):
- assert VALID_STRINGS.match(user)
- assert VALID_STRINGS.match(blob_id)
+ def _get_path(self, user, blob_id=False):
parts = [user]
- parts += [blob_id[0], blob_id[0:3], blob_id[0:6]]
- parts += [blob_id]
+ if blob_id:
+ parts += [blob_id[0], blob_id[0:3], blob_id[0:6]]
+ parts += [blob_id]
path = os.path.join(self.path, *parts)
- assert self._valid_subdir(path)
- return path
+ return self._validate_path(path, user, blob_id)
class BlobsResource(resource.Resource):
diff --git a/testing/tests/blobs/test_fs_backend.py b/testing/tests/blobs/test_fs_backend.py
index 0d7e9789..f6f2db2d 100644
--- a/testing/tests/blobs/test_fs_backend.py
+++ b/testing/tests/blobs/test_fs_backend.py
@@ -99,9 +99,24 @@ class FilesystemBackendTestCase(unittest.TestCase):
self.assertEquals(result, ['blob_0', 'blob_1'])
@pytest.mark.usefixtures("method_tmpdir")
- def test_path_validation_for_subdirectories(self):
+ def test_path_validation_on_read_blob(self):
blobs_path = self.tempdir
backend = _blobs.FilesystemBlobsBackend(blobs_path)
- self.assertFalse(backend._valid_subdir('/'))
- self.assertFalse(backend._valid_subdir(blobs_path + '../../../../../'))
- self.assertTrue(backend._valid_subdir(os.path.join(blobs_path, 'x')))
+ with pytest.raises(Exception):
+ backend.read_blob('..', '..', DummyRequest(['']))
+ with pytest.raises(Exception):
+ backend.read_blob('valid', '../../../', DummyRequest(['']))
+ with pytest.raises(Exception):
+ backend.read_blob('../../../', 'valid', DummyRequest(['']))
+
+ @pytest.mark.usefixtures("method_tmpdir")
+ @defer.inlineCallbacks
+ def test_path_validation_on_write_blob(self):
+ blobs_path = self.tempdir
+ backend = _blobs.FilesystemBlobsBackend(blobs_path)
+ with pytest.raises(Exception):
+ yield backend.write_blob('..', '..', DummyRequest(['']))
+ with pytest.raises(Exception):
+ yield backend.write_blob('valid', '../../../', DummyRequest(['']))
+ with pytest.raises(Exception):
+ yield backend.write_blob('../../../', 'valid', DummyRequest(['']))