diff options
Diffstat (limited to 'src/leap/common/tests')
| -rw-r--r-- | src/leap/common/tests/test_certs.py | 99 | ||||
| -rw-r--r-- | src/leap/common/tests/test_events.py | 492 | ||||
| -rw-r--r-- | src/leap/common/tests/test_http.py | 75 | 
3 files changed, 305 insertions, 361 deletions
diff --git a/src/leap/common/tests/test_certs.py b/src/leap/common/tests/test_certs.py new file mode 100644 index 0000000..8ebc0f4 --- /dev/null +++ b/src/leap/common/tests/test_certs.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +# test_certs.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +Tests for: +    * leap/common/certs.py +""" +import os +import time + +try: +    import unittest2 as unittest +except ImportError: +    import unittest + +from leap.common import certs +from leap.common.testing.basetest import BaseLeapTest + +TEST_CERT_PEM = os.path.join( +    os.path.split(__file__)[0], +    '..', 'testing', "leaptest_combined_keycert.pem") + +# Values from the test cert file: +# Not Before: Sep  3 17:52:16 2013 GMT +# Not After : Sep  1 17:52:16 2023 GMT +CERT_NOT_BEFORE = (2013, 9, 3, 17, 52, 16, 1, 246, 0) +CERT_NOT_AFTER = (2023, 9, 1, 17, 52, 16, 4, 244, 0) + + +class CertsTest(BaseLeapTest): + +    def setUp(self): +        self.setUpEnv() + +    def tearDown(self): +        self.tearDownEnv() + +    def test_should_redownload_if_no_cert(self): +        self.assertTrue(certs.should_redownload(certfile="")) + +    def test_should_redownload_if_invalid_pem(self): +        cert_path = self.get_tempfile('test_pem_file.pem') + +        with open(cert_path, 'w') as f: +            f.write('this is some invalid data for the pem file') + +        self.assertTrue(certs.should_redownload(cert_path)) + +    def test_should_redownload_if_before(self): +        def new_now(): +            time.struct_time(CERT_NOT_BEFORE) +        self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now)) + +    def test_should_redownload_if_after(self): +        def new_now(): +            time.struct_time(CERT_NOT_AFTER) +        self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now)) + +    def test_not_should_redownload(self): +        self.assertFalse(certs.should_redownload(TEST_CERT_PEM)) + +    def test_is_valid_pemfile(self): +        with open(TEST_CERT_PEM) as f: +            cert_data = f.read() + +        self.assertTrue(certs.is_valid_pemfile(cert_data)) + +    def test_not_is_valid_pemfile(self): +        cert_data = 'this is some invalid data for the pem file' + +        self.assertFalse(certs.is_valid_pemfile(cert_data)) + +    def test_get_cert_time_boundaries(self): +        """ +        This test ensures us that the returned values are returned in UTC/GMT. +        """ +        with open(TEST_CERT_PEM) as f: +            cert_data = f.read() + +        valid_from, valid_to = certs.get_cert_time_boundaries(cert_data) +        self.assertEqual(tuple(valid_from), CERT_NOT_BEFORE) +        self.assertEqual(tuple(valid_to), CERT_NOT_AFTER) + + +if __name__ == "__main__": +    unittest.main() diff --git a/src/leap/common/tests/test_events.py b/src/leap/common/tests/test_events.py index 0779b2e..2ad097e 100644 --- a/src/leap/common/tests/test_events.py +++ b/src/leap/common/tests/test_events.py @@ -1,4 +1,4 @@ -## -*- coding: utf-8 -*- +# -*- coding: utf-8 -*-  # test_events.py  # Copyright (C) 2013 LEAP  # @@ -15,414 +15,184 @@  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. -import unittest -import sets -import time -import socket -import threading -import random - - -from mock import Mock -from protobuf.socketrpc import RpcService -from leap.common import events -from leap.common.events import ( -    server, -    client, -    mac_auth, -) -from leap.common.events.events_pb2 import ( -    EventsServerService, -    EventsServerService_Stub, -    EventsClientService_Stub, -    EventResponse, -    SignalRequest, -    RegisterRequest, -    PingRequest, -    SOLEDAD_CREATING_KEYS, -    CLIENT_UID, -) +import os +import logging +import time -port = 8090 +from twisted.internet.reactor import callFromThread +from twisted.trial import unittest +from twisted.internet import defer -received = False +from leap.common.events import server +from leap.common.events import client +from leap.common.events import flags +from leap.common.events import txclient +from leap.common.events import catalog +from leap.common.events.errors import CallbackAlreadyRegisteredError -class EventsTestCase(unittest.TestCase): +if 'DEBUG' in os.environ: +    logging.basicConfig(level=logging.DEBUG) -    @classmethod -    def setUpClass(cls): -        server.EventsServerDaemon.ensure(8090) -        cls.callbacks = events.client.registered_callbacks -    @classmethod -    def tearDownClass(cls): -        # give some time for requests to be processed. -        time.sleep(1) +class EventsGenericClientTestCase(object):      def setUp(self): -        super(EventsTestCase, self).setUp() +        flags.set_events_enabled(True) +        self._server = server.ensure_server( +            emit_addr="tcp://127.0.0.1:0", +            reg_addr="tcp://127.0.0.1:0") +        self._client.configure_client( +            emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port, +            reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port)      def tearDown(self): -        #events.client.registered_callbacks = {} -        server.registered_clients = {} -        super(EventsTestCase, self).tearDown() - -    def test_service_singleton(self): -        """ -        Ensure that there's always just one instance of the server daemon -        running. -        """ -        service1 = server.EventsServerDaemon.ensure(8090) -        service2 = server.EventsServerDaemon.ensure(8090) -        self.assertEqual(service1, service2, -                         "Can't get singleton class for service.") +        self._client.shutdown() +        self._server.shutdown() +        flags.set_events_enabled(False) +        # wait a bit for sockets to close properly +        time.sleep(0.1)      def test_client_register(self):          """          Ensure clients can register callbacks.          """ -        self.assertTrue(1 not in self.callbacks, -                        'There should should be no callback for this signal.') -        events.register(1, lambda x: True) -        self.assertTrue(1 in self.callbacks, -                        'Could not register signal in local client.') -        events.register(2, lambda x: True) -        self.assertTrue(1 in self.callbacks, -                        'Could not register signal in local client.') -        self.assertTrue(2 in self.callbacks, -                        'Could not register signal in local client.') +        callbacks = self._client.instance().callbacks +        self.assertTrue(len(callbacks) == 0, +                        'There should be no callback for this event.') +        # register one event +        event1 = catalog.CLIENT_UID -    def test_register_signal_replace(self): -        """ -        Make sure clients can replace already registered callbacks. -        """ -        sig = 3 -        cbk = lambda x: True -        events.register(sig, cbk, uid=1) -        self.assertRaises(Exception, events.register, sig, lambda x: True, -                          uid=1) -        events.register(sig, lambda x: True, uid=1, replace=True) -        self.assertTrue(sig in self.callbacks, 'Could not register signal.') -        self.assertEqual(1, len(self.callbacks[sig]), -                         'Wrong number of registered callbacks.') - -    def test_signal_response_status(self): -        """ -        Ensure there's an appropriate response from server when signaling. -        """ -        sig = 4 -        request = SignalRequest() -        request.event = sig -        request.content = 'my signal contents' -        request.mac_method = mac_auth.MacMethod.MAC_NONE -        request.mac = "" -        service = RpcService(EventsServerService_Stub, port, 'localhost') -        # test synch -        response = service.signal(request, timeout=1000) -        self.assertEqual(EventResponse.OK, response.status, -                         'Wrong response status.') - -    def test_signal_executes_callback(self): -        """ -        Ensure callback is executed upon receiving signal. -        """ -        sig = CLIENT_UID -        request = SignalRequest() -        request.event = sig -        request.content = 'my signal contents' -        request.mac_method = mac_auth.MacMethod.MAC_NONE -        request.mac = "" -        service = RpcService(EventsServerService_Stub, port, 'localhost') - -        # register a callback -        flag = Mock() -        events.register(sig, lambda req: flag(req.event)) -        # signal -        response = service.signal(request) -        self.assertEqual(EventResponse.OK, response.status, -                         'Wrong response status.') -        time.sleep(1)  # wait for signal to arrive -        flag.assert_called_once_with(sig) - -    def test_events_server_service_register(self): -        """ -        Ensure the server can register clients to be signaled. -        """ -        sig = 5 -        request = RegisterRequest() -        request.event = sig -        request.port = 8091 -        request.mac_method = mac_auth.MacMethod.MAC_NONE -        request.mac = "" -        service = RpcService(EventsServerService_Stub, port, 'localhost') -        complist = server.registered_clients -        self.assertEqual({}, complist, -                         'There should be no registered_ports when ' -                         'server has just been created.') -        response = service.register(request, timeout=1000) -        self.assertTrue(sig in complist, "Signal not registered succesfully.") -        self.assertTrue(8091 in complist[sig], -                        'Failed registering client port.') - -    def test_client_request_register(self): -        """ -        Ensure clients can register themselves with server. -        """ -        sig = 6 -        complist = server.registered_clients -        self.assertTrue(sig not in complist, -                        'There should be no registered clients for this ' -                        'signal.') -        events.register(sig, lambda x: True) -        time.sleep(0.1) -        port = client.EventsClientDaemon.get_instance().get_port() -        self.assertTrue(sig in complist, 'Failed registering client.') -        self.assertTrue(port in complist[sig], -                        'Failed registering client port.') +        def cbk1(event, _): +            return True -    def test_client_receives_signal(self): -        """ -        Ensure clients can receive signals. -        """ -        sig = 7 -        flag = Mock() - -        events.register(sig, lambda req: flag(req.event)) -        request = SignalRequest() -        request.event = sig -        request.content = "" -        request.mac_method = mac_auth.MacMethod.MAC_NONE -        request.mac = "" -        service = RpcService(EventsServerService_Stub, port, 'localhost') -        response = service.signal(request, timeout=1000) -        self.assertTrue(response is not None, 'Did not receive response.') -        time.sleep(0.5) -        flag.assert_called_once_with(sig) - -    def test_client_send_signal(self): -        """ -        Ensure clients can send signals. -        """ -        sig = 8 -        response = events.signal(sig) -        self.assertTrue(response.status == response.OK, -                        'Received wrong response status when signaling.') +        uid1 = self._client.register(event1, cbk1) +        # assert for correct registration +        self.assertTrue(len(callbacks) == 1) +        self.assertTrue(callbacks[event1][uid1] == cbk1, +                        'Could not register event in local client.') +        # register another event +        event2 = catalog.CLIENT_SESSION_ID -    def test_client_unregister_all(self): -        """ -        Test that the client can unregister all events for one signal. -        """ -        sig = CLIENT_UID -        complist = server.registered_clients -        events.register(sig, lambda x: True) -        events.register(sig, lambda x: True) -        time.sleep(0.1) -        events.unregister(sig) -        time.sleep(0.1) -        port = client.EventsClientDaemon.get_instance().get_port() -        self.assertFalse(bool(complist[sig])) -        self.assertTrue(port not in complist[sig]) +        def cbk2(event, _): +            return True -    def test_client_unregister_by_uid(self): -        """ -        Test that the client can unregister an event by uid. -        """ -        sig = CLIENT_UID -        complist = server.registered_clients -        events.register(sig, lambda x: True, uid='cbkuid') -        events.register(sig, lambda x: True, uid='cbkuid2') -        time.sleep(0.1) -        events.unregister(sig, uid='cbkuid') -        time.sleep(0.1) -        port = client.EventsClientDaemon.get_instance().get_port() -        self.assertTrue(sig in complist) -        self.assertTrue(len(complist[sig]) == 1) -        self.assertTrue( -            client.registered_callbacks[sig].pop()[0] == 'cbkuid2') -        self.assertTrue(port in complist[sig]) - -    def test_server_replies_ping(self): -        """ -        Ensure server replies to a ping. -        """ -        request = PingRequest() -        service = RpcService(EventsServerService_Stub, port, 'localhost') -        response = service.ping(request, timeout=1000) -        self.assertIsNotNone(response) -        self.assertEqual(EventResponse.OK, response.status, -                         'Wrong response status.') - -    def test_client_replies_ping(self): -        """ -        Ensure clients reply to a ping. -        """ -        daemon = client.ensure_client_daemon() -        port = daemon.get_port() -        request = PingRequest() -        service = RpcService(EventsClientService_Stub, port, 'localhost') -        response = service.ping(request, timeout=1000) -        self.assertEqual(EventResponse.OK, response.status, -                         'Wrong response status.') - -    def test_server_ping(self): -        """ -        Ensure the function from server module pings correctly. -        """ -        response = server.ping() -        self.assertIsNotNone(response) -        self.assertEqual(EventResponse.OK, response.status, -                         'Wrong response status.') +        uid2 = self._client.register(event2, cbk2) +        # assert for correct registration +        self.assertTrue(len(callbacks) == 2) +        self.assertTrue(callbacks[event2][uid2] == cbk2, +                        'Could not register event in local client.') -    def test_client_ping(self): +    def test_register_signal_replace(self):          """ -        Ensure the function from client module pings correctly. +        Make sure clients can replace already registered callbacks.          """ -        daemon = client.ensure_client_daemon() -        response = client.ping(daemon.get_port()) -        self.assertIsNotNone(response) -        self.assertEqual(EventResponse.OK, response.status, -                         'Wrong response status.') +        event = catalog.CLIENT_UID +        d = defer.Deferred() -    def test_module_ping_server(self): -        """ -        Ensure the function from main module pings server correctly. -        """ -        response = events.ping_server() -        self.assertIsNotNone(response) -        self.assertEqual(EventResponse.OK, response.status, -                         'Wrong response status.') +        def cbk_fail(event, _): +            return callFromThread(d.errback, event) -    def test_module_ping_client(self): -        """ -        Ensure the function from main module pings clients correctly. -        """ -        daemon = client.ensure_client_daemon() -        response = events.ping_client(daemon.get_port()) -        self.assertIsNotNone(response) -        self.assertEqual(EventResponse.OK, response.status, -                         'Wrong response status.') +        def cbk_succeed(event, _): +            return callFromThread(d.callback, event) + +        self._client.register(event, cbk_fail, uid=1) +        self._client.register(event, cbk_succeed, uid=1, replace=True) +        self._client.emit(event, None) +        return d -    def test_ensure_server_raises_if_port_taken(self): +    def test_register_signal_replace_fails_when_replace_is_false(self):          """ -        Verify that server raises an exception if port is already taken. +        Make sure clients trying to replace already registered callbacks fail +        when replace=False          """ -        # get a random free port -        while True: -            port = random.randint(1024, 65535) -            try: -                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -                s.connect(('localhost', port)) -                s.close() -            except: -                break - -        class PortBlocker(threading.Thread): - -            def run(self): -                conns = 0 -                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -                s.bind(('localhost', port)) -                s.setblocking(1) -                s.listen(1) -                while conns < 2:  # blocks until rece -                    conns += 1 -                    s.accept() -                s.close() - -        # block the port -        taker = PortBlocker() -        taker.start() -        time.sleep(1)  # wait for thread to start. +        event = catalog.CLIENT_UID +        self._client.register(event, lambda event, _: None, uid=1)          self.assertRaises( -            server.PortAlreadyTaken, server.ensure_server, port) +            CallbackAlreadyRegisteredError, +            self._client.register, +            event, lambda event, _: None, uid=1, replace=False) -    def test_async_register(self): +    def test_register_more_than_one_callback_works(self):          """ -        Test asynchronous registering of callbacks. +        Make sure clients can replace already registered callbacks.          """ -        flag = Mock() +        event = catalog.CLIENT_UID +        d1 = defer.Deferred() + +        def cbk1(event, _): +            return callFromThread(d1.callback, event) -        # executed after async register, when response is received from server -        def reqcbk(request, response): -            flag(request.event, response.status) +        d2 = defer.Deferred() -        # callback registered by application -        def callback(request): -            pass +        def cbk2(event, _): +            return d2.callback(event) -        # passing a callback as reqcbk param makes the call asynchronous -        result = events.register(CLIENT_UID, callback, reqcbk=reqcbk) -        self.assertIsNone(result) -        events.signal(CLIENT_UID) -        time.sleep(1)  # wait for signal to arrive from server -        flag.assert_called_once_with(CLIENT_UID, EventResponse.OK) +        self._client.register(event, cbk1) +        self._client.register(event, cbk2) +        self._client.emit(event, None) +        d = defer.gatherResults([d1, d2]) +        return d -    def test_async_signal(self): +    def test_client_receives_signal(self):          """ -        Test asynchronous registering of callbacks. +        Ensure clients can receive signals.          """ -        flag = Mock() +        event = catalog.CLIENT_UID +        d = defer.Deferred() -        # executed after async signal, when response is received from server -        def reqcbk(request, response): -            flag(request.event, response.status) +        def cbk(events, _): +            callFromThread(d.callback, event) -        # passing a callback as reqcbk param makes the call asynchronous -        result = events.signal(CLIENT_UID, reqcbk=reqcbk) -        self.assertIsNone(result) -        time.sleep(1)  # wait for signal to arrive from server -        flag.assert_called_once_with(CLIENT_UID, EventResponse.OK) +        self._client.register(event, cbk) +        self._client.emit(event, None) +        return d -    def test_async_unregister(self): +    def test_client_unregister_all(self):          """ -        Test asynchronous unregistering of callbacks. +        Test that the client can unregister all events for one signal.          """ -        flag = Mock() +        event1 = catalog.CLIENT_UID +        d = defer.Deferred() +        # register more than one callback for the same event +        self._client.register( +            event1, lambda ev, _: callFromThread(d.errback, None)) +        self._client.register( +            event1, lambda ev, _: callFromThread(d.errback, None)) +        # unregister and emit the event +        self._client.unregister(event1) +        self._client.emit(event1, None) +        # register and emit another event so the deferred can succeed +        event2 = catalog.CLIENT_SESSION_ID +        self._client.register( +            event2, lambda ev, _: callFromThread(d.callback, None)) +        self._client.emit(event2, None) +        return d -        # executed after async signal, when response is received from server -        def reqcbk(request, response): -            flag(request.event, response.status) - -        # callback registered by application -        def callback(request): -            pass - -        # passing a callback as reqcbk param makes the call asynchronous -        events.register(CLIENT_UID, callback) -        result = events.unregister(CLIENT_UID, reqcbk=reqcbk) -        self.assertIsNone(result) -        time.sleep(1)  # wait for signal to arrive from server -        flag.assert_called_once_with(CLIENT_UID, EventResponse.OK) - -    def test_async_ping_server(self): +    def test_client_unregister_by_uid(self):          """ -        Test asynchronous pinging of server. +        Test that the client can unregister an event by uid.          """ -        flag = Mock() +        event = catalog.CLIENT_UID +        d = defer.Deferred() +        # register one callback that would fail +        uid = self._client.register( +            event, lambda ev, _: callFromThread(d.errback, None)) +        # register one callback that will succeed +        self._client.register( +            event, lambda ev, _: callFromThread(d.callback, None)) +        # unregister by uid and emit the event +        self._client.unregister(event, uid=uid) +        self._client.emit(event, None) +        return d -        # executed after async signal, when response is received from server -        def reqcbk(request, response): -            flag(response.status) -        result = events.ping_server(reqcbk=reqcbk) -        self.assertIsNone(result) -        time.sleep(1)  # wait for response to arrive from server. -        flag.assert_called_once_with(EventResponse.OK) +class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase): + +    _client = txclient -    def test_async_ping_client(self): -        """ -        Test asynchronous pinging of client. -        """ -        flag = Mock() -        # executed after async signal, when response is received from server -        def reqcbk(request, response): -            flag(response.status) +class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase): -        daemon = client.ensure_client_daemon() -        result = events.ping_client(daemon.get_port(), reqcbk=reqcbk) -        self.assertIsNone(result) -        time.sleep(1)  # wait for response to arrive from server. -        flag.assert_called_once_with(EventResponse.OK) +    _client = client diff --git a/src/leap/common/tests/test_http.py b/src/leap/common/tests/test_http.py new file mode 100644 index 0000000..f44550f --- /dev/null +++ b/src/leap/common/tests/test_http.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +# test_http.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +Tests for: +    * leap/common/http.py +""" +import os +try: +    import unittest2 as unittest +except ImportError: +    import unittest + +from leap.common import http +from leap.common.testing.basetest import BaseLeapTest + +TEST_CERT_PEM = os.path.join( +    os.path.split(__file__)[0], +    '..', 'testing', "leaptest_combined_keycert.pem") + + +class HTTPClientTest(BaseLeapTest): + +    def setUp(self): +        pass + +    def tearDown(self): +        pass + +    def test_agents_sharing_pool_by_default(self): +        client = http.HTTPClient() +        client2 = http.HTTPClient(TEST_CERT_PEM) +        self.assertNotEquals( +            client._agent, client2._agent, "Expected dedicated agents") +        self.assertEquals( +            client._agent._pool, client2._agent._pool, +            "Pool was not reused by default") + +    def test_agent_can_have_dedicated_custom_pool(self): +        custom_pool = http._HTTPConnectionPool( +            None, +            timeout=10, +            maxPersistentPerHost=42, +            persistent=False +        ) +        self.assertEquals(custom_pool.maxPersistentPerHost, 42, +                          "Custom persistent connections " +                          "limit is not being respected") +        self.assertFalse(custom_pool.persistent, +                         "Custom persistence is not being respected") +        default_client = http.HTTPClient() +        custom_client = http.HTTPClient(pool=custom_pool) + +        self.assertNotEquals( +            default_client._agent, custom_client._agent, +            "No agent reuse is expected") +        self.assertEquals( +            custom_pool, custom_client._agent._pool, +            "Custom pool usage was not respected") + +if __name__ == "__main__": +    unittest.main()  | 
