From 0c22a7047553afdc1ed8a33bea17ccbe842e5e6e Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor1984@riseup.net>
Date: Mon, 1 May 2017 06:21:01 -0300
Subject: [feature] blobs path validation

Check if user and blob_id are valid strings, then check if the resulting
path is a subdirectory of blobs configured path.

- Related: #8800
---
 server/src/leap/soledad/server/_blobs.py | 19 +++++++++++++++++--
 testing/tests/blobs/test_fs_backend.py   |  8 ++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/server/src/leap/soledad/server/_blobs.py b/server/src/leap/soledad/server/_blobs.py
index 84056c0a..72125d5a 100644
--- a/server/src/leap/soledad/server/_blobs.py
+++ b/server/src/leap/soledad/server/_blobs.py
@@ -43,6 +43,8 @@ __all__ = ['BlobsResource']
 
 
 logger = Logger()
+# Used for sanitizers, we accept only letters, numbers, '-' and '_'
+VALID_STRINGS = re.compile('^[a-zA-Z0-9_-]+$')
 
 
 # TODO some error handling needed
@@ -112,7 +114,9 @@ class FilesystemBlobsBackend(object):
 
     def list_blobs(self, user, request):
         blob_ids = []
+        assert VALID_STRINGS.match(user)
         base_path = os.path.join(self.path, user)
+        assert self._valid_subdir(base_path)
         for _, _, filenames in os.walk(base_path):
             blob_ids += filenames
         return json.dumps(blob_ids)
@@ -153,6 +157,7 @@ class FilesystemBlobsBackend(object):
         yield fbp.startProducing(open(path, 'wb'))
 
     def get_total_storage(self, user):
+        assert VALID_STRINGS.match(user)
         return self._get_disk_usage(os.path.join(self.path, user))
 
     def delete_blob(user, blob_id):
@@ -165,16 +170,26 @@ class FilesystemBlobsBackend(object):
     def _get_disk_usage(self, start_path):
         if not os.path.isdir(start_path):
             defer.returnValue(0)
+        assert self._valid_subdir(start_path)
         cmd = ['/usr/bin/du', '-s', '-c', start_path]
         output = yield utils.getProcessOutput(cmd[0], cmd[1:])
         size = output.split()[0]
         defer.returnValue(int(size))
 
+    def _valid_subdir(self, desired_path):
+        desired_path = os.path.realpath(desired_path)  # expand path references
+        root = os.path.realpath(self.path)
+        return desired_path.startswith(root + os.sep)
+
     def _get_path(self, user, blob_id):
+        assert VALID_STRINGS.match(user)
+        assert VALID_STRINGS.match(blob_id)
         parts = [user]
         parts += [blob_id[0], blob_id[0:3], blob_id[0:6]]
         parts += [blob_id]
-        return os.path.join(self.path, *parts)
+        path = os.path.join(self.path, *parts)
+        assert self._valid_subdir(path)
+        return path
 
 
 class BlobsResource(resource.Resource):
@@ -217,7 +232,7 @@ class BlobsResource(resource.Resource):
 
     def _validate(self, request):
         for arg in request.postpath:
-            if arg and not re.match('^[a-zA-Z0-9_-]+$', arg):
+            if arg and not VALID_STRINGS.match(arg):
                 raise Exception('Invalid blob resource argument: %s' % arg)
         return request.postpath
 
diff --git a/testing/tests/blobs/test_fs_backend.py b/testing/tests/blobs/test_fs_backend.py
index 39ee0028..0d7e9789 100644
--- a/testing/tests/blobs/test_fs_backend.py
+++ b/testing/tests/blobs/test_fs_backend.py
@@ -97,3 +97,11 @@ class FilesystemBackendTestCase(unittest.TestCase):
         walk_mock.return_value = [(_, _, ['blob_0']), (_, _, ['blob_1'])]
         result = json.loads(backend.list_blobs('user', DummyRequest([''])))
         self.assertEquals(result, ['blob_0', 'blob_1'])
+
+    @pytest.mark.usefixtures("method_tmpdir")
+    def test_path_validation_for_subdirectories(self):
+        blobs_path = self.tempdir
+        backend = _blobs.FilesystemBlobsBackend(blobs_path)
+        self.assertFalse(backend._valid_subdir('/'))
+        self.assertFalse(backend._valid_subdir(blobs_path + '../../../../../'))
+        self.assertTrue(backend._valid_subdir(os.path.join(blobs_path, 'x')))
-- 
cgit v1.2.3