summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkali <kali@leap.se>2013-05-01 04:11:26 +0900
committerkali <kali@leap.se>2013-05-11 21:59:58 +0900
commit21b57bfd059ff32201c3403bd5ecc00d4b7d3aed (patch)
treee6e5789971c8e715d5ab72f377b3ebe99599d8e3 /src
parent3d883f79cfe5f8efecd8cbab512eae65101a8c5a (diff)
whitelist openvpn cipher parameters
Diffstat (limited to 'src')
-rw-r--r--src/leap/services/eip/eipconfig.py27
-rw-r--r--src/leap/services/eip/tests/__init__.py0
-rw-r--r--src/leap/services/eip/tests/test_eipconfig.py174
3 files changed, 199 insertions, 2 deletions
diff --git a/src/leap/services/eip/eipconfig.py b/src/leap/services/eip/eipconfig.py
index 4e74687a..baf26bca 100644
--- a/src/leap/services/eip/eipconfig.py
+++ b/src/leap/services/eip/eipconfig.py
@@ -18,8 +18,9 @@
"""
Provider configuration
"""
-import os
import logging
+import os
+import re
from leap.common.check import leap_assert, leap_assert_type
from leap.common.config.baseconfig import BaseConfig
@@ -33,6 +34,8 @@ class EIPConfig(BaseConfig):
"""
Provider configuration abstraction class
"""
+ OPENVPN_ALLOWED_KEYS = ("auth", "cipher", "tls-cipher")
+ OPENVPN_CIPHERS_REGEX = re.compile("[A-Z0-9\-]+")
def __init__(self):
BaseConfig.__init__(self)
@@ -52,7 +55,24 @@ class EIPConfig(BaseConfig):
return self._safe_get_value("gateways")
def get_openvpn_configuration(self):
- return self._safe_get_value("openvpn_configuration")
+ """
+ Returns a dictionary containing the openvpn configuration
+ parameters.
+
+ These are sanitized with alphanumeric whitelist.
+
+ @returns: openvpn configuration dict
+ @rtype: C{dict}
+ """
+ ovpncfg = self._safe_get_value("openvpn_configuration")
+ config = {}
+ for key, value in ovpncfg.items():
+ if key in self.OPENVPN_ALLOWED_KEYS and value is not None:
+ sanitized_val = self.OPENVPN_CIPHERS_REGEX.findall(value)
+ if len(sanitized_val) != 0:
+ _val = sanitized_val[0]
+ config[str(key)] = str(_val)
+ return config
def get_serial(self):
return self._safe_get_value("serial")
@@ -61,6 +81,9 @@ class EIPConfig(BaseConfig):
return self._safe_get_value("version")
def get_gateway_ip(self, index=0):
+ """
+ Returns the ip of the gateway
+ """
gateways = self.get_gateways()
leap_assert(len(gateways) > 0, "We don't have any gateway!")
if index > len(gateways):
diff --git a/src/leap/services/eip/tests/__init__.py b/src/leap/services/eip/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/leap/services/eip/tests/__init__.py
diff --git a/src/leap/services/eip/tests/test_eipconfig.py b/src/leap/services/eip/tests/test_eipconfig.py
new file mode 100644
index 00000000..1675472f
--- /dev/null
+++ b/src/leap/services/eip/tests/test_eipconfig.py
@@ -0,0 +1,174 @@
+# -*- coding: utf-8 -*-
+# test_eipconfig.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+tests for eipconfig
+"""
+import copy
+import json
+import os
+import unittest
+
+from leap.common.testing.basetest import BaseLeapTest
+from leap.services.eip.eipconfig import EIPConfig
+
+
+sample_config = {
+ "gateways": [
+ {
+ "capabilities": {
+ "adblock": False,
+ "filter_dns": True,
+ "limited": True,
+ "ports": [
+ "1194",
+ "443",
+ "53",
+ "80"
+ ],
+ "protocols": [
+ "tcp",
+ "udp"],
+ "transport": [
+ "openvpn"],
+ "user_ips": False},
+ "host": "host.dev.example.org",
+ "ip_address": "11.22.33.44",
+ "location": "cyberspace"
+ }],
+ "locations": {
+ "ankara": {
+ "country_code": "XX",
+ "hemisphere": "S",
+ "name": "Antarctica",
+ "timezone": "+2"
+ }
+ },
+ "openvpn_configuration": {
+ "auth": "SHA1",
+ "cipher": "AES-128-CBC",
+ "tls-cipher": "DHE-RSA-AES128-SHA"
+ },
+ "serial": 1,
+ "version": 1
+}
+
+
+class EIPConfigTest(BaseLeapTest):
+
+ __name__ = "eip_config_tests"
+ #provider = "testprovider.example.org"
+
+ maxDiff = None
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ #
+ # helpers
+ #
+
+ def write_config(self, data):
+ self.configfile = os.path.join(
+ self.tempdir, "eipconfig.json")
+ conf = open(self.configfile, "w")
+ conf.write(json.dumps(data))
+ conf.close()
+
+ def test_load_valid_config(self):
+ """
+ load a sample config
+ """
+ self.write_config(sample_config)
+ config = EIPConfig()
+ self.assertRaises(
+ AssertionError,
+ config.get_clusters)
+ self.assertTrue(config.load(self.configfile))
+ self.assertEqual(
+ config.get_openvpn_configuration(),
+ sample_config["openvpn_configuration"])
+ self.assertEqual(
+ config.get_gateway_ip(),
+ "11.22.33.44")
+ self.assertEqual(config.get_version(), 1)
+ self.assertEqual(config.get_serial(), 1)
+ self.assertEqual(config.get_gateways(),
+ sample_config["gateways"])
+ self.assertEqual(
+ config.get_clusters(), None)
+
+ def test_openvpnoptions(self):
+ """
+ check the sanitization of openvpn options
+ """
+ # extra parameters
+ data = copy.deepcopy(sample_config)
+ data['openvpn_configuration']["extra_param"] = "FOO"
+ self.write_config(data)
+ config = EIPConfig()
+ config.load(self.configfile)
+ self.assertEqual(
+ config.get_openvpn_configuration(),
+ sample_config["openvpn_configuration"])
+
+ # non allowed chars
+ data = copy.deepcopy(sample_config)
+ data['openvpn_configuration']["auth"] = "SHA1;"
+ self.write_config(data)
+ config = EIPConfig()
+ config.load(self.configfile)
+ self.assertEqual(
+ config.get_openvpn_configuration(),
+ sample_config["openvpn_configuration"])
+
+ # non allowed chars
+ data = copy.deepcopy(sample_config)
+ data['openvpn_configuration']["auth"] = "SHA1>`&|"
+ self.write_config(data)
+ config = EIPConfig()
+ config.load(self.configfile)
+ self.assertEqual(
+ config.get_openvpn_configuration(),
+ sample_config["openvpn_configuration"])
+
+ # lowercase
+ data = copy.deepcopy(sample_config)
+ data['openvpn_configuration']["auth"] = "shaSHA1"
+ self.write_config(data)
+ config = EIPConfig()
+ config.load(self.configfile)
+ self.assertEqual(
+ config.get_openvpn_configuration(),
+ sample_config["openvpn_configuration"])
+
+ # all characters invalid -> null value
+ data = copy.deepcopy(sample_config)
+ data['openvpn_configuration']["auth"] = "sha&*!@#;"
+ self.write_config(data)
+ config = EIPConfig()
+ config.load(self.configfile)
+ self.assertEqual(
+ config.get_openvpn_configuration(),
+ {'cipher': 'AES-128-CBC',
+ 'tls-cipher': 'DHE-RSA-AES128-SHA'})
+
+
+if __name__ == "__main__":
+ unittest.main()