[tests] adapt events tests to recent changes
[leap_pycommon.git] / src / leap / common / events / tests / test_auth.py
1 # -*- coding: utf-8 -*-
2 # test_zmq_components.py
3 # Copyright (C) 2014 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 """
18 Tests for the auth module.
19 """
20 import os
21
22 from twisted.trial import unittest
23 from txzmq import ZmqFactory
24
25 from leap.common.events import auth
26 from leap.common.testing.basetest import BaseLeapTest
27 from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
28 from leap.common.zmq_utils import maybe_create_and_get_certificates
29
30 from txzmq.test import _wait
31
32
33 class ZmqAuthTestCase(unittest.TestCase, BaseLeapTest):
34
35     def setUp(self):
36         self.setUpEnv(launch_events_server=False)
37
38         self.factory = ZmqFactory()
39         self._config_prefix = os.path.join(self.tempdir, "leap", "events")
40
41         self.public, self.secret = maybe_create_and_get_certificates(
42             self._config_prefix, 'server')
43
44         self.authenticator = auth.TxAuthenticator(self.factory)
45         self.authenticator.start()
46         self.auth_req = auth.TxAuthenticationRequest(self.factory)
47
48     def tearDown(self):
49         self.factory.shutdown()
50         self.tearDownEnv()
51
52     def test_curve_auth(self):
53         self.auth_req.start()
54         self.auth_req.allow('127.0.0.1')
55         public_keys_dir = os.path.join(self._config_prefix, PUBLIC_KEYS_PREFIX)
56         self.auth_req.configure_curve(domain="*", location=public_keys_dir)
57
58         def check(ignored):
59             authenticator = self.authenticator.authenticator
60             certs = authenticator.certs['*']
61             self.failUnlessEqual(authenticator.whitelist, set([u'127.0.0.1']))
62             self.failUnlessEqual(certs[certs.keys()[0]], True)
63
64         return _wait(0.1).addCallback(check)