summaryrefslogtreecommitdiff
path: root/src/leap/common/tests
diff options
context:
space:
mode:
authorMicah Anderson <micah@riseup.net>2013-08-13 14:25:57 -0400
committerMicah Anderson <micah@riseup.net>2013-08-13 14:27:01 -0400
commitddff9c2dda51fd28dcdc8e2db998d7254f69a41b (patch)
treeb000723d9c7e54f0ac163e54f8d0d6fcd829209a /src/leap/common/tests
parentcc8dd1e7f3d64ae6d7d69ed08a4346bfabfe7b22 (diff)
parent0e721b1b47c3b94f6d4d6709e34b6b816f9fd810 (diff)
Merge tag '0.3.0' into debian
Tag leap.common version 0.3.0
Diffstat (limited to 'src/leap/common/tests')
-rw-r--r--src/leap/common/tests/test_check.py52
-rw-r--r--src/leap/common/tests/test_crypto.py88
-rw-r--r--src/leap/common/tests/test_events.py316
3 files changed, 324 insertions, 132 deletions
diff --git a/src/leap/common/tests/test_check.py b/src/leap/common/tests/test_check.py
new file mode 100644
index 0000000..6ce8493
--- /dev/null
+++ b/src/leap/common/tests/test_check.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+# test_check.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for:
+ * leap/common/check.py
+"""
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+import mock
+
+from leap.common import check
+
+class CheckTests(unittest.TestCase):
+ def test_raises_on_false_condition(self):
+ with self.assertRaises(AssertionError):
+ check.leap_assert(False, "Condition")
+
+ def test_raises_on_none_condition(self):
+ with self.assertRaises(AssertionError):
+ check.leap_assert(None, "Condition")
+
+ def test_suceeds_with_good_condition(self):
+ check.leap_assert(True, "")
+
+ def test_raises_on_bad_type(self):
+ with self.assertRaises(AssertionError):
+ check.leap_assert_type(42, str)
+
+ def test_succeeds_on_good_type(self):
+ check.leap_assert_type(42, int)
+
+ @mock.patch("traceback.extract_stack", mock.MagicMock(return_value=None))
+ def test_does_not_raise_on_bug(self):
+ with self.assertRaises(AssertionError):
+ check.leap_assert(False, "")
diff --git a/src/leap/common/tests/test_crypto.py b/src/leap/common/tests/test_crypto.py
deleted file mode 100644
index ae7dc71..0000000
--- a/src/leap/common/tests/test_crypto.py
+++ /dev/null
@@ -1,88 +0,0 @@
-## -*- coding: utf-8 -*-
-# test_crypto.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
-"""
-Tests for the crypto submodule.
-"""
-
-
-import os
-import binascii
-
-
-from leap.common.testing.basetest import BaseLeapTest
-from leap.common import crypto
-from Crypto import Random
-
-
-class CryptoTestCase(BaseLeapTest):
-
- def setUp(self):
- pass
-
- def tearDown(self):
- pass
-
- def test_encrypt_decrypt_sym(self):
- # generate 256-bit key
- key = Random.new().read(32)
- iv, cyphertext = crypto.encrypt_sym(
- 'data', key,
- method=crypto.EncryptionMethods.AES_256_CTR)
- self.assertTrue(cyphertext is not None)
- self.assertTrue(cyphertext != '')
- self.assertTrue(cyphertext != 'data')
- plaintext = crypto.decrypt_sym(
- cyphertext, key, iv=iv,
- method=crypto.EncryptionMethods.AES_256_CTR)
- self.assertEqual('data', plaintext)
-
- def test_decrypt_with_wrong_iv_fails(self):
- key = Random.new().read(32)
- iv, cyphertext = crypto.encrypt_sym(
- 'data', key,
- method=crypto.EncryptionMethods.AES_256_CTR)
- self.assertTrue(cyphertext is not None)
- self.assertTrue(cyphertext != '')
- self.assertTrue(cyphertext != 'data')
- # get a different iv by changing the first byte
- rawiv = binascii.a2b_base64(iv)
- wrongiv = rawiv
- while wrongiv == rawiv:
- wrongiv = os.urandom(1) + rawiv[1:]
- plaintext = crypto.decrypt_sym(
- cyphertext, key, iv=binascii.b2a_base64(wrongiv),
- method=crypto.EncryptionMethods.AES_256_CTR)
- self.assertNotEqual('data', plaintext)
-
- def test_decrypt_with_wrong_key_fails(self):
- key = Random.new().read(32)
- iv, cyphertext = crypto.encrypt_sym(
- 'data', key,
- method=crypto.EncryptionMethods.AES_256_CTR)
- self.assertTrue(cyphertext is not None)
- self.assertTrue(cyphertext != '')
- self.assertTrue(cyphertext != 'data')
- wrongkey = Random.new().read(32) # 256-bits key
- # ensure keys are different in case we are extremely lucky
- while wrongkey == key:
- wrongkey = Random.new().read(32)
- plaintext = crypto.decrypt_sym(
- cyphertext, wrongkey, iv=iv,
- method=crypto.EncryptionMethods.AES_256_CTR)
- self.assertNotEqual('data', plaintext)
diff --git a/src/leap/common/tests/test_events.py b/src/leap/common/tests/test_events.py
index 8c0bd36..0779b2e 100644
--- a/src/leap/common/tests/test_events.py
+++ b/src/leap/common/tests/test_events.py
@@ -18,19 +18,27 @@
import unittest
import sets
import time
+import socket
+import threading
+import random
+
+
+from mock import Mock
from protobuf.socketrpc import RpcService
from leap.common import events
from leap.common.events import (
server,
- component,
+ client,
mac_auth,
)
from leap.common.events.events_pb2 import (
EventsServerService,
EventsServerService_Stub,
+ EventsClientService_Stub,
EventResponse,
SignalRequest,
RegisterRequest,
+ PingRequest,
SOLEDAD_CREATING_KEYS,
CLIENT_UID,
)
@@ -39,11 +47,6 @@ from leap.common.events.events_pb2 import (
port = 8090
received = False
-local_callback_executed = False
-
-
-def callback(request, reponse):
- return True
class EventsTestCase(unittest.TestCase):
@@ -51,7 +54,7 @@ class EventsTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
server.EventsServerDaemon.ensure(8090)
- cls.callbacks = events.component.registered_callbacks
+ cls.callbacks = events.client.registered_callbacks
@classmethod
def tearDownClass(cls):
@@ -62,8 +65,8 @@ class EventsTestCase(unittest.TestCase):
super(EventsTestCase, self).setUp()
def tearDown(self):
- #events.component.registered_callbacks = {}
- server.registered_components = {}
+ #events.client.registered_callbacks = {}
+ server.registered_clients = {}
super(EventsTestCase, self).tearDown()
def test_service_singleton(self):
@@ -76,24 +79,24 @@ class EventsTestCase(unittest.TestCase):
self.assertEqual(service1, service2,
"Can't get singleton class for service.")
- def test_component_register(self):
+ def test_client_register(self):
"""
- Ensure components can register callbacks.
+ Ensure clients can register callbacks.
"""
self.assertTrue(1 not in self.callbacks,
'There should should be no callback for this signal.')
events.register(1, lambda x: True)
self.assertTrue(1 in self.callbacks,
- 'Could not register signal in local component.')
+ 'Could not register signal in local client.')
events.register(2, lambda x: True)
self.assertTrue(1 in self.callbacks,
- 'Could not register signal in local component.')
+ 'Could not register signal in local client.')
self.assertTrue(2 in self.callbacks,
- 'Could not register signal in local component.')
+ 'Could not register signal in local client.')
def test_register_signal_replace(self):
"""
- Make sure components can replace already registered callbacks.
+ Make sure clients can replace already registered callbacks.
"""
sig = 3
cbk = lambda x: True
@@ -120,21 +123,32 @@ class EventsTestCase(unittest.TestCase):
response = service.signal(request, timeout=1000)
self.assertEqual(EventResponse.OK, response.status,
'Wrong response status.')
- # test asynch
- def local_callback(request, response):
- global local_callback_executed
- local_callback_executed = True
+ def test_signal_executes_callback(self):
+ """
+ Ensure callback is executed upon receiving signal.
+ """
+ sig = CLIENT_UID
+ request = SignalRequest()
+ request.event = sig
+ request.content = 'my signal contents'
+ request.mac_method = mac_auth.MacMethod.MAC_NONE
+ request.mac = ""
+ service = RpcService(EventsServerService_Stub, port, 'localhost')
- events.register(sig, local_callback)
- service.signal(request, callback=local_callback)
- time.sleep(0.1)
- self.assertTrue(local_callback_executed,
- 'Local callback did not execute.')
+ # register a callback
+ flag = Mock()
+ events.register(sig, lambda req: flag(req.event))
+ # signal
+ response = service.signal(request)
+ self.assertEqual(EventResponse.OK, response.status,
+ 'Wrong response status.')
+ time.sleep(1) # wait for signal to arrive
+ flag.assert_called_once_with(sig)
def test_events_server_service_register(self):
"""
- Ensure the server can register components to be signaled.
+ Ensure the server can register clients to be signaled.
"""
sig = 5
request = RegisterRequest()
@@ -143,42 +157,39 @@ class EventsTestCase(unittest.TestCase):
request.mac_method = mac_auth.MacMethod.MAC_NONE
request.mac = ""
service = RpcService(EventsServerService_Stub, port, 'localhost')
- complist = server.registered_components
+ complist = server.registered_clients
self.assertEqual({}, complist,
'There should be no registered_ports when '
'server has just been created.')
response = service.register(request, timeout=1000)
self.assertTrue(sig in complist, "Signal not registered succesfully.")
self.assertTrue(8091 in complist[sig],
- 'Failed registering component port.')
+ 'Failed registering client port.')
- def test_component_request_register(self):
+ def test_client_request_register(self):
"""
- Ensure components can register themselves with server.
+ Ensure clients can register themselves with server.
"""
sig = 6
- complist = server.registered_components
+ complist = server.registered_clients
self.assertTrue(sig not in complist,
- 'There should be no registered components for this '
+ 'There should be no registered clients for this '
'signal.')
events.register(sig, lambda x: True)
time.sleep(0.1)
- port = component.EventsComponentDaemon.get_instance().get_port()
- self.assertTrue(sig in complist, 'Failed registering component.')
+ port = client.EventsClientDaemon.get_instance().get_port()
+ self.assertTrue(sig in complist, 'Failed registering client.')
self.assertTrue(port in complist[sig],
- 'Failed registering component port.')
+ 'Failed registering client port.')
- def test_component_receives_signal(self):
+ def test_client_receives_signal(self):
"""
- Ensure components can receive signals.
+ Ensure clients can receive signals.
"""
sig = 7
+ flag = Mock()
- def getsig(param=None):
- global received
- received = True
-
- events.register(sig, getsig)
+ events.register(sig, lambda req: flag(req.event))
request = SignalRequest()
request.event = sig
request.content = ""
@@ -188,13 +199,230 @@ class EventsTestCase(unittest.TestCase):
response = service.signal(request, timeout=1000)
self.assertTrue(response is not None, 'Did not receive response.')
time.sleep(0.5)
- self.assertTrue(received, 'Did not receive signal back.')
+ flag.assert_called_once_with(sig)
- def test_component_send_signal(self):
+ def test_client_send_signal(self):
"""
- Ensure components can send signals.
+ Ensure clients can send signals.
"""
sig = 8
response = events.signal(sig)
self.assertTrue(response.status == response.OK,
'Received wrong response status when signaling.')
+
+ def test_client_unregister_all(self):
+ """
+ Test that the client can unregister all events for one signal.
+ """
+ sig = CLIENT_UID
+ complist = server.registered_clients
+ events.register(sig, lambda x: True)
+ events.register(sig, lambda x: True)
+ time.sleep(0.1)
+ events.unregister(sig)
+ time.sleep(0.1)
+ port = client.EventsClientDaemon.get_instance().get_port()
+ self.assertFalse(bool(complist[sig]))
+ self.assertTrue(port not in complist[sig])
+
+ def test_client_unregister_by_uid(self):
+ """
+ Test that the client can unregister an event by uid.
+ """
+ sig = CLIENT_UID
+ complist = server.registered_clients
+ events.register(sig, lambda x: True, uid='cbkuid')
+ events.register(sig, lambda x: True, uid='cbkuid2')
+ time.sleep(0.1)
+ events.unregister(sig, uid='cbkuid')
+ time.sleep(0.1)
+ port = client.EventsClientDaemon.get_instance().get_port()
+ self.assertTrue(sig in complist)
+ self.assertTrue(len(complist[sig]) == 1)
+ self.assertTrue(
+ client.registered_callbacks[sig].pop()[0] == 'cbkuid2')
+ self.assertTrue(port in complist[sig])
+
+ def test_server_replies_ping(self):
+ """
+ Ensure server replies to a ping.
+ """
+ request = PingRequest()
+ service = RpcService(EventsServerService_Stub, port, 'localhost')
+ response = service.ping(request, timeout=1000)
+ self.assertIsNotNone(response)
+ self.assertEqual(EventResponse.OK, response.status,
+ 'Wrong response status.')
+
+ def test_client_replies_ping(self):
+ """
+ Ensure clients reply to a ping.
+ """
+ daemon = client.ensure_client_daemon()
+ port = daemon.get_port()
+ request = PingRequest()
+ service = RpcService(EventsClientService_Stub, port, 'localhost')
+ response = service.ping(request, timeout=1000)
+ self.assertEqual(EventResponse.OK, response.status,
+ 'Wrong response status.')
+
+ def test_server_ping(self):
+ """
+ Ensure the function from server module pings correctly.
+ """
+ response = server.ping()
+ self.assertIsNotNone(response)
+ self.assertEqual(EventResponse.OK, response.status,
+ 'Wrong response status.')
+
+ def test_client_ping(self):
+ """
+ Ensure the function from client module pings correctly.
+ """
+ daemon = client.ensure_client_daemon()
+ response = client.ping(daemon.get_port())
+ self.assertIsNotNone(response)
+ self.assertEqual(EventResponse.OK, response.status,
+ 'Wrong response status.')
+
+ def test_module_ping_server(self):
+ """
+ Ensure the function from main module pings server correctly.
+ """
+ response = events.ping_server()
+ self.assertIsNotNone(response)
+ self.assertEqual(EventResponse.OK, response.status,
+ 'Wrong response status.')
+
+ def test_module_ping_client(self):
+ """
+ Ensure the function from main module pings clients correctly.
+ """
+ daemon = client.ensure_client_daemon()
+ response = events.ping_client(daemon.get_port())
+ self.assertIsNotNone(response)
+ self.assertEqual(EventResponse.OK, response.status,
+ 'Wrong response status.')
+
+ def test_ensure_server_raises_if_port_taken(self):
+ """
+ Verify that server raises an exception if port is already taken.
+ """
+ # get a random free port
+ while True:
+ port = random.randint(1024, 65535)
+ try:
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect(('localhost', port))
+ s.close()
+ except:
+ break
+
+ class PortBlocker(threading.Thread):
+
+ def run(self):
+ conns = 0
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.bind(('localhost', port))
+ s.setblocking(1)
+ s.listen(1)
+ while conns < 2: # blocks until rece
+ conns += 1
+ s.accept()
+ s.close()
+
+ # block the port
+ taker = PortBlocker()
+ taker.start()
+ time.sleep(1) # wait for thread to start.
+ self.assertRaises(
+ server.PortAlreadyTaken, server.ensure_server, port)
+
+ def test_async_register(self):
+ """
+ Test asynchronous registering of callbacks.
+ """
+ flag = Mock()
+
+ # executed after async register, when response is received from server
+ def reqcbk(request, response):
+ flag(request.event, response.status)
+
+ # callback registered by application
+ def callback(request):
+ pass
+
+ # passing a callback as reqcbk param makes the call asynchronous
+ result = events.register(CLIENT_UID, callback, reqcbk=reqcbk)
+ self.assertIsNone(result)
+ events.signal(CLIENT_UID)
+ time.sleep(1) # wait for signal to arrive from server
+ flag.assert_called_once_with(CLIENT_UID, EventResponse.OK)
+
+ def test_async_signal(self):
+ """
+ Test asynchronous registering of callbacks.
+ """
+ flag = Mock()
+
+ # executed after async signal, when response is received from server
+ def reqcbk(request, response):
+ flag(request.event, response.status)
+
+ # passing a callback as reqcbk param makes the call asynchronous
+ result = events.signal(CLIENT_UID, reqcbk=reqcbk)
+ self.assertIsNone(result)
+ time.sleep(1) # wait for signal to arrive from server
+ flag.assert_called_once_with(CLIENT_UID, EventResponse.OK)
+
+ def test_async_unregister(self):
+ """
+ Test asynchronous unregistering of callbacks.
+ """
+ flag = Mock()
+
+ # executed after async signal, when response is received from server
+ def reqcbk(request, response):
+ flag(request.event, response.status)
+
+ # callback registered by application
+ def callback(request):
+ pass
+
+ # passing a callback as reqcbk param makes the call asynchronous
+ events.register(CLIENT_UID, callback)
+ result = events.unregister(CLIENT_UID, reqcbk=reqcbk)
+ self.assertIsNone(result)
+ time.sleep(1) # wait for signal to arrive from server
+ flag.assert_called_once_with(CLIENT_UID, EventResponse.OK)
+
+ def test_async_ping_server(self):
+ """
+ Test asynchronous pinging of server.
+ """
+ flag = Mock()
+
+ # executed after async signal, when response is received from server
+ def reqcbk(request, response):
+ flag(response.status)
+
+ result = events.ping_server(reqcbk=reqcbk)
+ self.assertIsNone(result)
+ time.sleep(1) # wait for response to arrive from server.
+ flag.assert_called_once_with(EventResponse.OK)
+
+ def test_async_ping_client(self):
+ """
+ Test asynchronous pinging of client.
+ """
+ flag = Mock()
+
+ # executed after async signal, when response is received from server
+ def reqcbk(request, response):
+ flag(response.status)
+
+ daemon = client.ensure_client_daemon()
+ result = events.ping_client(daemon.get_port(), reqcbk=reqcbk)
+ self.assertIsNone(result)
+ time.sleep(1) # wait for response to arrive from server.
+ flag.assert_called_once_with(EventResponse.OK)