summaryrefslogtreecommitdiff
path: root/src/leap/common/tests
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/common/tests')
-rw-r--r--src/leap/common/tests/test_certs.py99
-rw-r--r--src/leap/common/tests/test_events.py492
-rw-r--r--src/leap/common/tests/test_http.py75
3 files changed, 305 insertions, 361 deletions
diff --git a/src/leap/common/tests/test_certs.py b/src/leap/common/tests/test_certs.py
new file mode 100644
index 0000000..8ebc0f4
--- /dev/null
+++ b/src/leap/common/tests/test_certs.py
@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+# test_certs.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for:
+ * leap/common/certs.py
+"""
+import os
+import time
+
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+from leap.common import certs
+from leap.common.testing.basetest import BaseLeapTest
+
+TEST_CERT_PEM = os.path.join(
+ os.path.split(__file__)[0],
+ '..', 'testing', "leaptest_combined_keycert.pem")
+
+# Values from the test cert file:
+# Not Before: Sep 3 17:52:16 2013 GMT
+# Not After : Sep 1 17:52:16 2023 GMT
+CERT_NOT_BEFORE = (2013, 9, 3, 17, 52, 16, 1, 246, 0)
+CERT_NOT_AFTER = (2023, 9, 1, 17, 52, 16, 4, 244, 0)
+
+
+class CertsTest(BaseLeapTest):
+
+ def setUp(self):
+ self.setUpEnv()
+
+ def tearDown(self):
+ self.tearDownEnv()
+
+ def test_should_redownload_if_no_cert(self):
+ self.assertTrue(certs.should_redownload(certfile=""))
+
+ def test_should_redownload_if_invalid_pem(self):
+ cert_path = self.get_tempfile('test_pem_file.pem')
+
+ with open(cert_path, 'w') as f:
+ f.write('this is some invalid data for the pem file')
+
+ self.assertTrue(certs.should_redownload(cert_path))
+
+ def test_should_redownload_if_before(self):
+ def new_now():
+ time.struct_time(CERT_NOT_BEFORE)
+ self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now))
+
+ def test_should_redownload_if_after(self):
+ def new_now():
+ time.struct_time(CERT_NOT_AFTER)
+ self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now))
+
+ def test_not_should_redownload(self):
+ self.assertFalse(certs.should_redownload(TEST_CERT_PEM))
+
+ def test_is_valid_pemfile(self):
+ with open(TEST_CERT_PEM) as f:
+ cert_data = f.read()
+
+ self.assertTrue(certs.is_valid_pemfile(cert_data))
+
+ def test_not_is_valid_pemfile(self):
+ cert_data = 'this is some invalid data for the pem file'
+
+ self.assertFalse(certs.is_valid_pemfile(cert_data))
+
+ def test_get_cert_time_boundaries(self):
+ """
+ This test ensures us that the returned values are returned in UTC/GMT.
+ """
+ with open(TEST_CERT_PEM) as f:
+ cert_data = f.read()
+
+ valid_from, valid_to = certs.get_cert_time_boundaries(cert_data)
+ self.assertEqual(tuple(valid_from), CERT_NOT_BEFORE)
+ self.assertEqual(tuple(valid_to), CERT_NOT_AFTER)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/common/tests/test_events.py b/src/leap/common/tests/test_events.py
index 0779b2e..2ad097e 100644
--- a/src/leap/common/tests/test_events.py
+++ b/src/leap/common/tests/test_events.py
@@ -1,4 +1,4 @@
-## -*- coding: utf-8 -*-
+# -*- coding: utf-8 -*-
# test_events.py
# Copyright (C) 2013 LEAP
#
@@ -15,414 +15,184 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import unittest
-import sets
-import time
-import socket
-import threading
-import random
-
-
-from mock import Mock
-from protobuf.socketrpc import RpcService
-from leap.common import events
-from leap.common.events import (
- server,
- client,
- mac_auth,
-)
-from leap.common.events.events_pb2 import (
- EventsServerService,
- EventsServerService_Stub,
- EventsClientService_Stub,
- EventResponse,
- SignalRequest,
- RegisterRequest,
- PingRequest,
- SOLEDAD_CREATING_KEYS,
- CLIENT_UID,
-)
+import os
+import logging
+import time
-port = 8090
+from twisted.internet.reactor import callFromThread
+from twisted.trial import unittest
+from twisted.internet import defer
-received = False
+from leap.common.events import server
+from leap.common.events import client
+from leap.common.events import flags
+from leap.common.events import txclient
+from leap.common.events import catalog
+from leap.common.events.errors import CallbackAlreadyRegisteredError
-class EventsTestCase(unittest.TestCase):
+if 'DEBUG' in os.environ:
+ logging.basicConfig(level=logging.DEBUG)
- @classmethod
- def setUpClass(cls):
- server.EventsServerDaemon.ensure(8090)
- cls.callbacks = events.client.registered_callbacks
- @classmethod
- def tearDownClass(cls):
- # give some time for requests to be processed.
- time.sleep(1)
+class EventsGenericClientTestCase(object):
def setUp(self):
- super(EventsTestCase, self).setUp()
+ flags.set_events_enabled(True)
+ self._server = server.ensure_server(
+ emit_addr="tcp://127.0.0.1:0",
+ reg_addr="tcp://127.0.0.1:0")
+ self._client.configure_client(
+ emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port,
+ reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port)
def tearDown(self):
- #events.client.registered_callbacks = {}
- server.registered_clients = {}
- super(EventsTestCase, self).tearDown()
-
- def test_service_singleton(self):
- """
- Ensure that there's always just one instance of the server daemon
- running.
- """
- service1 = server.EventsServerDaemon.ensure(8090)
- service2 = server.EventsServerDaemon.ensure(8090)
- self.assertEqual(service1, service2,
- "Can't get singleton class for service.")
+ self._client.shutdown()
+ self._server.shutdown()
+ flags.set_events_enabled(False)
+ # wait a bit for sockets to close properly
+ time.sleep(0.1)
def test_client_register(self):
"""
Ensure clients can register callbacks.
"""
- self.assertTrue(1 not in self.callbacks,
- 'There should should be no callback for this signal.')
- events.register(1, lambda x: True)
- self.assertTrue(1 in self.callbacks,
- 'Could not register signal in local client.')
- events.register(2, lambda x: True)
- self.assertTrue(1 in self.callbacks,
- 'Could not register signal in local client.')
- self.assertTrue(2 in self.callbacks,
- 'Could not register signal in local client.')
+ callbacks = self._client.instance().callbacks
+ self.assertTrue(len(callbacks) == 0,
+ 'There should be no callback for this event.')
+ # register one event
+ event1 = catalog.CLIENT_UID
- def test_register_signal_replace(self):
- """
- Make sure clients can replace already registered callbacks.
- """
- sig = 3
- cbk = lambda x: True
- events.register(sig, cbk, uid=1)
- self.assertRaises(Exception, events.register, sig, lambda x: True,
- uid=1)
- events.register(sig, lambda x: True, uid=1, replace=True)
- self.assertTrue(sig in self.callbacks, 'Could not register signal.')
- self.assertEqual(1, len(self.callbacks[sig]),
- 'Wrong number of registered callbacks.')
-
- def test_signal_response_status(self):
- """
- Ensure there's an appropriate response from server when signaling.
- """
- sig = 4
- request = SignalRequest()
- request.event = sig
- request.content = 'my signal contents'
- request.mac_method = mac_auth.MacMethod.MAC_NONE
- request.mac = ""
- service = RpcService(EventsServerService_Stub, port, 'localhost')
- # test synch
- response = service.signal(request, timeout=1000)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
-
- def test_signal_executes_callback(self):
- """
- Ensure callback is executed upon receiving signal.
- """
- sig = CLIENT_UID
- request = SignalRequest()
- request.event = sig
- request.content = 'my signal contents'
- request.mac_method = mac_auth.MacMethod.MAC_NONE
- request.mac = ""
- service = RpcService(EventsServerService_Stub, port, 'localhost')
-
- # register a callback
- flag = Mock()
- events.register(sig, lambda req: flag(req.event))
- # signal
- response = service.signal(request)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
- time.sleep(1) # wait for signal to arrive
- flag.assert_called_once_with(sig)
-
- def test_events_server_service_register(self):
- """
- Ensure the server can register clients to be signaled.
- """
- sig = 5
- request = RegisterRequest()
- request.event = sig
- request.port = 8091
- request.mac_method = mac_auth.MacMethod.MAC_NONE
- request.mac = ""
- service = RpcService(EventsServerService_Stub, port, 'localhost')
- complist = server.registered_clients
- self.assertEqual({}, complist,
- 'There should be no registered_ports when '
- 'server has just been created.')
- response = service.register(request, timeout=1000)
- self.assertTrue(sig in complist, "Signal not registered succesfully.")
- self.assertTrue(8091 in complist[sig],
- 'Failed registering client port.')
-
- def test_client_request_register(self):
- """
- Ensure clients can register themselves with server.
- """
- sig = 6
- complist = server.registered_clients
- self.assertTrue(sig not in complist,
- 'There should be no registered clients for this '
- 'signal.')
- events.register(sig, lambda x: True)
- time.sleep(0.1)
- port = client.EventsClientDaemon.get_instance().get_port()
- self.assertTrue(sig in complist, 'Failed registering client.')
- self.assertTrue(port in complist[sig],
- 'Failed registering client port.')
+ def cbk1(event, _):
+ return True
- def test_client_receives_signal(self):
- """
- Ensure clients can receive signals.
- """
- sig = 7
- flag = Mock()
-
- events.register(sig, lambda req: flag(req.event))
- request = SignalRequest()
- request.event = sig
- request.content = ""
- request.mac_method = mac_auth.MacMethod.MAC_NONE
- request.mac = ""
- service = RpcService(EventsServerService_Stub, port, 'localhost')
- response = service.signal(request, timeout=1000)
- self.assertTrue(response is not None, 'Did not receive response.')
- time.sleep(0.5)
- flag.assert_called_once_with(sig)
-
- def test_client_send_signal(self):
- """
- Ensure clients can send signals.
- """
- sig = 8
- response = events.signal(sig)
- self.assertTrue(response.status == response.OK,
- 'Received wrong response status when signaling.')
+ uid1 = self._client.register(event1, cbk1)
+ # assert for correct registration
+ self.assertTrue(len(callbacks) == 1)
+ self.assertTrue(callbacks[event1][uid1] == cbk1,
+ 'Could not register event in local client.')
+ # register another event
+ event2 = catalog.CLIENT_SESSION_ID
- def test_client_unregister_all(self):
- """
- Test that the client can unregister all events for one signal.
- """
- sig = CLIENT_UID
- complist = server.registered_clients
- events.register(sig, lambda x: True)
- events.register(sig, lambda x: True)
- time.sleep(0.1)
- events.unregister(sig)
- time.sleep(0.1)
- port = client.EventsClientDaemon.get_instance().get_port()
- self.assertFalse(bool(complist[sig]))
- self.assertTrue(port not in complist[sig])
+ def cbk2(event, _):
+ return True
- def test_client_unregister_by_uid(self):
- """
- Test that the client can unregister an event by uid.
- """
- sig = CLIENT_UID
- complist = server.registered_clients
- events.register(sig, lambda x: True, uid='cbkuid')
- events.register(sig, lambda x: True, uid='cbkuid2')
- time.sleep(0.1)
- events.unregister(sig, uid='cbkuid')
- time.sleep(0.1)
- port = client.EventsClientDaemon.get_instance().get_port()
- self.assertTrue(sig in complist)
- self.assertTrue(len(complist[sig]) == 1)
- self.assertTrue(
- client.registered_callbacks[sig].pop()[0] == 'cbkuid2')
- self.assertTrue(port in complist[sig])
-
- def test_server_replies_ping(self):
- """
- Ensure server replies to a ping.
- """
- request = PingRequest()
- service = RpcService(EventsServerService_Stub, port, 'localhost')
- response = service.ping(request, timeout=1000)
- self.assertIsNotNone(response)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
-
- def test_client_replies_ping(self):
- """
- Ensure clients reply to a ping.
- """
- daemon = client.ensure_client_daemon()
- port = daemon.get_port()
- request = PingRequest()
- service = RpcService(EventsClientService_Stub, port, 'localhost')
- response = service.ping(request, timeout=1000)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
-
- def test_server_ping(self):
- """
- Ensure the function from server module pings correctly.
- """
- response = server.ping()
- self.assertIsNotNone(response)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
+ uid2 = self._client.register(event2, cbk2)
+ # assert for correct registration
+ self.assertTrue(len(callbacks) == 2)
+ self.assertTrue(callbacks[event2][uid2] == cbk2,
+ 'Could not register event in local client.')
- def test_client_ping(self):
+ def test_register_signal_replace(self):
"""
- Ensure the function from client module pings correctly.
+ Make sure clients can replace already registered callbacks.
"""
- daemon = client.ensure_client_daemon()
- response = client.ping(daemon.get_port())
- self.assertIsNotNone(response)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
+ event = catalog.CLIENT_UID
+ d = defer.Deferred()
- def test_module_ping_server(self):
- """
- Ensure the function from main module pings server correctly.
- """
- response = events.ping_server()
- self.assertIsNotNone(response)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
+ def cbk_fail(event, _):
+ return callFromThread(d.errback, event)
- def test_module_ping_client(self):
- """
- Ensure the function from main module pings clients correctly.
- """
- daemon = client.ensure_client_daemon()
- response = events.ping_client(daemon.get_port())
- self.assertIsNotNone(response)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
+ def cbk_succeed(event, _):
+ return callFromThread(d.callback, event)
+
+ self._client.register(event, cbk_fail, uid=1)
+ self._client.register(event, cbk_succeed, uid=1, replace=True)
+ self._client.emit(event, None)
+ return d
- def test_ensure_server_raises_if_port_taken(self):
+ def test_register_signal_replace_fails_when_replace_is_false(self):
"""
- Verify that server raises an exception if port is already taken.
+ Make sure clients trying to replace already registered callbacks fail
+ when replace=False
"""
- # get a random free port
- while True:
- port = random.randint(1024, 65535)
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect(('localhost', port))
- s.close()
- except:
- break
-
- class PortBlocker(threading.Thread):
-
- def run(self):
- conns = 0
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.bind(('localhost', port))
- s.setblocking(1)
- s.listen(1)
- while conns < 2: # blocks until rece
- conns += 1
- s.accept()
- s.close()
-
- # block the port
- taker = PortBlocker()
- taker.start()
- time.sleep(1) # wait for thread to start.
+ event = catalog.CLIENT_UID
+ self._client.register(event, lambda event, _: None, uid=1)
self.assertRaises(
- server.PortAlreadyTaken, server.ensure_server, port)
+ CallbackAlreadyRegisteredError,
+ self._client.register,
+ event, lambda event, _: None, uid=1, replace=False)
- def test_async_register(self):
+ def test_register_more_than_one_callback_works(self):
"""
- Test asynchronous registering of callbacks.
+ Make sure clients can replace already registered callbacks.
"""
- flag = Mock()
+ event = catalog.CLIENT_UID
+ d1 = defer.Deferred()
+
+ def cbk1(event, _):
+ return callFromThread(d1.callback, event)
- # executed after async register, when response is received from server
- def reqcbk(request, response):
- flag(request.event, response.status)
+ d2 = defer.Deferred()
- # callback registered by application
- def callback(request):
- pass
+ def cbk2(event, _):
+ return d2.callback(event)
- # passing a callback as reqcbk param makes the call asynchronous
- result = events.register(CLIENT_UID, callback, reqcbk=reqcbk)
- self.assertIsNone(result)
- events.signal(CLIENT_UID)
- time.sleep(1) # wait for signal to arrive from server
- flag.assert_called_once_with(CLIENT_UID, EventResponse.OK)
+ self._client.register(event, cbk1)
+ self._client.register(event, cbk2)
+ self._client.emit(event, None)
+ d = defer.gatherResults([d1, d2])
+ return d
- def test_async_signal(self):
+ def test_client_receives_signal(self):
"""
- Test asynchronous registering of callbacks.
+ Ensure clients can receive signals.
"""
- flag = Mock()
+ event = catalog.CLIENT_UID
+ d = defer.Deferred()
- # executed after async signal, when response is received from server
- def reqcbk(request, response):
- flag(request.event, response.status)
+ def cbk(events, _):
+ callFromThread(d.callback, event)
- # passing a callback as reqcbk param makes the call asynchronous
- result = events.signal(CLIENT_UID, reqcbk=reqcbk)
- self.assertIsNone(result)
- time.sleep(1) # wait for signal to arrive from server
- flag.assert_called_once_with(CLIENT_UID, EventResponse.OK)
+ self._client.register(event, cbk)
+ self._client.emit(event, None)
+ return d
- def test_async_unregister(self):
+ def test_client_unregister_all(self):
"""
- Test asynchronous unregistering of callbacks.
+ Test that the client can unregister all events for one signal.
"""
- flag = Mock()
+ event1 = catalog.CLIENT_UID
+ d = defer.Deferred()
+ # register more than one callback for the same event
+ self._client.register(
+ event1, lambda ev, _: callFromThread(d.errback, None))
+ self._client.register(
+ event1, lambda ev, _: callFromThread(d.errback, None))
+ # unregister and emit the event
+ self._client.unregister(event1)
+ self._client.emit(event1, None)
+ # register and emit another event so the deferred can succeed
+ event2 = catalog.CLIENT_SESSION_ID
+ self._client.register(
+ event2, lambda ev, _: callFromThread(d.callback, None))
+ self._client.emit(event2, None)
+ return d
- # executed after async signal, when response is received from server
- def reqcbk(request, response):
- flag(request.event, response.status)
-
- # callback registered by application
- def callback(request):
- pass
-
- # passing a callback as reqcbk param makes the call asynchronous
- events.register(CLIENT_UID, callback)
- result = events.unregister(CLIENT_UID, reqcbk=reqcbk)
- self.assertIsNone(result)
- time.sleep(1) # wait for signal to arrive from server
- flag.assert_called_once_with(CLIENT_UID, EventResponse.OK)
-
- def test_async_ping_server(self):
+ def test_client_unregister_by_uid(self):
"""
- Test asynchronous pinging of server.
+ Test that the client can unregister an event by uid.
"""
- flag = Mock()
+ event = catalog.CLIENT_UID
+ d = defer.Deferred()
+ # register one callback that would fail
+ uid = self._client.register(
+ event, lambda ev, _: callFromThread(d.errback, None))
+ # register one callback that will succeed
+ self._client.register(
+ event, lambda ev, _: callFromThread(d.callback, None))
+ # unregister by uid and emit the event
+ self._client.unregister(event, uid=uid)
+ self._client.emit(event, None)
+ return d
- # executed after async signal, when response is received from server
- def reqcbk(request, response):
- flag(response.status)
- result = events.ping_server(reqcbk=reqcbk)
- self.assertIsNone(result)
- time.sleep(1) # wait for response to arrive from server.
- flag.assert_called_once_with(EventResponse.OK)
+class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
+
+ _client = txclient
- def test_async_ping_client(self):
- """
- Test asynchronous pinging of client.
- """
- flag = Mock()
- # executed after async signal, when response is received from server
- def reqcbk(request, response):
- flag(response.status)
+class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
- daemon = client.ensure_client_daemon()
- result = events.ping_client(daemon.get_port(), reqcbk=reqcbk)
- self.assertIsNone(result)
- time.sleep(1) # wait for response to arrive from server.
- flag.assert_called_once_with(EventResponse.OK)
+ _client = client
diff --git a/src/leap/common/tests/test_http.py b/src/leap/common/tests/test_http.py
new file mode 100644
index 0000000..f44550f
--- /dev/null
+++ b/src/leap/common/tests/test_http.py
@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+# test_http.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for:
+ * leap/common/http.py
+"""
+import os
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+from leap.common import http
+from leap.common.testing.basetest import BaseLeapTest
+
+TEST_CERT_PEM = os.path.join(
+ os.path.split(__file__)[0],
+ '..', 'testing', "leaptest_combined_keycert.pem")
+
+
+class HTTPClientTest(BaseLeapTest):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_agents_sharing_pool_by_default(self):
+ client = http.HTTPClient()
+ client2 = http.HTTPClient(TEST_CERT_PEM)
+ self.assertNotEquals(
+ client._agent, client2._agent, "Expected dedicated agents")
+ self.assertEquals(
+ client._agent._pool, client2._agent._pool,
+ "Pool was not reused by default")
+
+ def test_agent_can_have_dedicated_custom_pool(self):
+ custom_pool = http._HTTPConnectionPool(
+ None,
+ timeout=10,
+ maxPersistentPerHost=42,
+ persistent=False
+ )
+ self.assertEquals(custom_pool.maxPersistentPerHost, 42,
+ "Custom persistent connections "
+ "limit is not being respected")
+ self.assertFalse(custom_pool.persistent,
+ "Custom persistence is not being respected")
+ default_client = http.HTTPClient()
+ custom_client = http.HTTPClient(pool=custom_pool)
+
+ self.assertNotEquals(
+ default_client._agent, custom_client._agent,
+ "No agent reuse is expected")
+ self.assertEquals(
+ custom_pool, custom_client._agent._pool,
+ "Custom pool usage was not respected")
+
+if __name__ == "__main__":
+ unittest.main()