diff options
| -rw-r--r-- | server/src/leap/soledad/server/_blobs.py | 19 | ||||
| -rw-r--r-- | testing/tests/blobs/test_fs_backend.py | 8 | 
2 files changed, 25 insertions, 2 deletions
| diff --git a/server/src/leap/soledad/server/_blobs.py b/server/src/leap/soledad/server/_blobs.py index 84056c0a..72125d5a 100644 --- a/server/src/leap/soledad/server/_blobs.py +++ b/server/src/leap/soledad/server/_blobs.py @@ -43,6 +43,8 @@ __all__ = ['BlobsResource']  logger = Logger() +# Used for sanitizers, we accept only letters, numbers, '-' and '_' +VALID_STRINGS = re.compile('^[a-zA-Z0-9_-]+$')  # TODO some error handling needed @@ -112,7 +114,9 @@ class FilesystemBlobsBackend(object):      def list_blobs(self, user, request):          blob_ids = [] +        assert VALID_STRINGS.match(user)          base_path = os.path.join(self.path, user) +        assert self._valid_subdir(base_path)          for _, _, filenames in os.walk(base_path):              blob_ids += filenames          return json.dumps(blob_ids) @@ -153,6 +157,7 @@ class FilesystemBlobsBackend(object):          yield fbp.startProducing(open(path, 'wb'))      def get_total_storage(self, user): +        assert VALID_STRINGS.match(user)          return self._get_disk_usage(os.path.join(self.path, user))      def delete_blob(user, blob_id): @@ -165,16 +170,26 @@ class FilesystemBlobsBackend(object):      def _get_disk_usage(self, start_path):          if not os.path.isdir(start_path):              defer.returnValue(0) +        assert self._valid_subdir(start_path)          cmd = ['/usr/bin/du', '-s', '-c', start_path]          output = yield utils.getProcessOutput(cmd[0], cmd[1:])          size = output.split()[0]          defer.returnValue(int(size)) +    def _valid_subdir(self, desired_path): +        desired_path = os.path.realpath(desired_path)  # expand path references +        root = os.path.realpath(self.path) +        return desired_path.startswith(root + os.sep) +      def _get_path(self, user, blob_id): +        assert VALID_STRINGS.match(user) +        assert VALID_STRINGS.match(blob_id)          parts = [user]          parts += [blob_id[0], blob_id[0:3], blob_id[0:6]]          parts += [blob_id] -        return os.path.join(self.path, *parts) +        path = os.path.join(self.path, *parts) +        assert self._valid_subdir(path) +        return path  class BlobsResource(resource.Resource): @@ -217,7 +232,7 @@ class BlobsResource(resource.Resource):      def _validate(self, request):          for arg in request.postpath: -            if arg and not re.match('^[a-zA-Z0-9_-]+$', arg): +            if arg and not VALID_STRINGS.match(arg):                  raise Exception('Invalid blob resource argument: %s' % arg)          return request.postpath diff --git a/testing/tests/blobs/test_fs_backend.py b/testing/tests/blobs/test_fs_backend.py index 39ee0028..0d7e9789 100644 --- a/testing/tests/blobs/test_fs_backend.py +++ b/testing/tests/blobs/test_fs_backend.py @@ -97,3 +97,11 @@ class FilesystemBackendTestCase(unittest.TestCase):          walk_mock.return_value = [(_, _, ['blob_0']), (_, _, ['blob_1'])]          result = json.loads(backend.list_blobs('user', DummyRequest([''])))          self.assertEquals(result, ['blob_0', 'blob_1']) + +    @pytest.mark.usefixtures("method_tmpdir") +    def test_path_validation_for_subdirectories(self): +        blobs_path = self.tempdir +        backend = _blobs.FilesystemBlobsBackend(blobs_path) +        self.assertFalse(backend._valid_subdir('/')) +        self.assertFalse(backend._valid_subdir(blobs_path + '../../../../../')) +        self.assertTrue(backend._valid_subdir(os.path.join(blobs_path, 'x'))) | 
