From 514c1434a016b09d93e8dfc5578b14825d14005a Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 4 Feb 2015 15:04:10 -0200 Subject: [feat] refactor events to use ZMQ Before this commit, protobuf and protobuf.socketrpc were used to serialize and transmit messages between events clients. This change implements a simpler ZMQ client/server events mechanism that uses ZMQ sockets for transmitting messages from clients to server and to redistribute such messages to subscribed clients. Closes: #6359 --- src/leap/common/tests/test_events.py | 468 ++++++++--------------------------- 1 file changed, 105 insertions(+), 363 deletions(-) (limited to 'src/leap/common/tests') diff --git a/src/leap/common/tests/test_events.py b/src/leap/common/tests/test_events.py index 0779b2e..7ef3e1b 100644 --- a/src/leap/common/tests/test_events.py +++ b/src/leap/common/tests/test_events.py @@ -15,414 +15,156 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import unittest -import sets -import time -import socket -import threading -import random - - -from mock import Mock -from protobuf.socketrpc import RpcService -from leap.common import events -from leap.common.events import ( - server, - client, - mac_auth, -) -from leap.common.events.events_pb2 import ( - EventsServerService, - EventsServerService_Stub, - EventsClientService_Stub, - EventResponse, - SignalRequest, - RegisterRequest, - PingRequest, - SOLEDAD_CREATING_KEYS, - CLIENT_UID, -) +import os +import logging +import time -port = 8090 +from twisted.trial import unittest +from twisted.internet import defer -received = False +from leap.common.events import server +from leap.common.events import client +from leap.common.events import txclient +from leap.common.events import catalog +from leap.common.events.errors import CallbackAlreadyRegisteredError -class EventsTestCase(unittest.TestCase): +if 'DEBUG' in os.environ: + logging.basicConfig(level=logging.DEBUG) - @classmethod - def setUpClass(cls): - server.EventsServerDaemon.ensure(8090) - cls.callbacks = events.client.registered_callbacks - @classmethod - def tearDownClass(cls): - # give some time for requests to be processed. - time.sleep(1) +class EventsGenericClientTestCase(object): def setUp(self): - super(EventsTestCase, self).setUp() + self._server = server.ensure_server( + emit_addr="tcp://127.0.0.1:0", + reg_addr="tcp://127.0.0.1:0") + self._client.configure_client( + emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port, + reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port) def tearDown(self): - #events.client.registered_callbacks = {} - server.registered_clients = {} - super(EventsTestCase, self).tearDown() - - def test_service_singleton(self): - """ - Ensure that there's always just one instance of the server daemon - running. - """ - service1 = server.EventsServerDaemon.ensure(8090) - service2 = server.EventsServerDaemon.ensure(8090) - self.assertEqual(service1, service2, - "Can't get singleton class for service.") + self._client.shutdown() + self._server.shutdown() + # wait a bit for sockets to close properly + time.sleep(0.1) def test_client_register(self): """ Ensure clients can register callbacks. """ - self.assertTrue(1 not in self.callbacks, - 'There should should be no callback for this signal.') - events.register(1, lambda x: True) - self.assertTrue(1 in self.callbacks, - 'Could not register signal in local client.') - events.register(2, lambda x: True) - self.assertTrue(1 in self.callbacks, - 'Could not register signal in local client.') - self.assertTrue(2 in self.callbacks, - 'Could not register signal in local client.') + callbacks = self._client.instance().callbacks + self.assertTrue(len(callbacks) == 0, + 'There should be no callback for this event.') + # register one event + event1 = catalog.CLIENT_UID + cbk1 = lambda event, _: True + uid1 = self._client.register(event1, cbk1) + # assert for correct registration + self.assertTrue(len(callbacks) == 1) + self.assertTrue(callbacks[event1][uid1] == cbk1, + 'Could not register event in local client.') + # register another event + event2 = catalog.CLIENT_SESSION_ID + cbk2 = lambda event, _: True + uid2 = self._client.register(event2, cbk2) + # assert for correct registration + self.assertTrue(len(callbacks) == 2) + self.assertTrue(callbacks[event2][uid2] == cbk2, + 'Could not register event in local client.') def test_register_signal_replace(self): """ Make sure clients can replace already registered callbacks. """ - sig = 3 - cbk = lambda x: True - events.register(sig, cbk, uid=1) - self.assertRaises(Exception, events.register, sig, lambda x: True, - uid=1) - events.register(sig, lambda x: True, uid=1, replace=True) - self.assertTrue(sig in self.callbacks, 'Could not register signal.') - self.assertEqual(1, len(self.callbacks[sig]), - 'Wrong number of registered callbacks.') - - def test_signal_response_status(self): - """ - Ensure there's an appropriate response from server when signaling. - """ - sig = 4 - request = SignalRequest() - request.event = sig - request.content = 'my signal contents' - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(EventsServerService_Stub, port, 'localhost') - # test synch - response = service.signal(request, timeout=1000) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') + event = catalog.CLIENT_UID + d = defer.Deferred() + cbk_fail = lambda event, _: d.errback(event) + cbk_succeed = lambda event, _: d.callback(event) + self._client.register(event, cbk_fail, uid=1) + self._client.register(event, cbk_succeed, uid=1, replace=True) + self._client.emit(event, None) + return d - def test_signal_executes_callback(self): - """ - Ensure callback is executed upon receiving signal. + def test_register_signal_replace_fails_when_replace_is_false(self): """ - sig = CLIENT_UID - request = SignalRequest() - request.event = sig - request.content = 'my signal contents' - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(EventsServerService_Stub, port, 'localhost') - - # register a callback - flag = Mock() - events.register(sig, lambda req: flag(req.event)) - # signal - response = service.signal(request) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - time.sleep(1) # wait for signal to arrive - flag.assert_called_once_with(sig) - - def test_events_server_service_register(self): + Make sure clients trying to replace already registered callbacks fail + when replace=False """ - Ensure the server can register clients to be signaled. - """ - sig = 5 - request = RegisterRequest() - request.event = sig - request.port = 8091 - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(EventsServerService_Stub, port, 'localhost') - complist = server.registered_clients - self.assertEqual({}, complist, - 'There should be no registered_ports when ' - 'server has just been created.') - response = service.register(request, timeout=1000) - self.assertTrue(sig in complist, "Signal not registered succesfully.") - self.assertTrue(8091 in complist[sig], - 'Failed registering client port.') + event = catalog.CLIENT_UID + self._client.register(event, lambda event, _: None, uid=1) + self.assertRaises( + CallbackAlreadyRegisteredError, + self._client.register, + event, lambda event, _: None, uid=1, replace=False) - def test_client_request_register(self): + def test_register_more_than_one_callback_works(self): """ - Ensure clients can register themselves with server. + Make sure clients can replace already registered callbacks. """ - sig = 6 - complist = server.registered_clients - self.assertTrue(sig not in complist, - 'There should be no registered clients for this ' - 'signal.') - events.register(sig, lambda x: True) - time.sleep(0.1) - port = client.EventsClientDaemon.get_instance().get_port() - self.assertTrue(sig in complist, 'Failed registering client.') - self.assertTrue(port in complist[sig], - 'Failed registering client port.') + event = catalog.CLIENT_UID + d1 = defer.Deferred() + cbk1 = lambda event, _: d1.callback(event) + d2 = defer.Deferred() + cbk2 = lambda event, _: d2.callback(event) + self._client.register(event, cbk1) + self._client.register(event, cbk2) + self._client.emit(event, None) + d = defer.gatherResults([d1, d2]) + return d def test_client_receives_signal(self): """ Ensure clients can receive signals. """ - sig = 7 - flag = Mock() - - events.register(sig, lambda req: flag(req.event)) - request = SignalRequest() - request.event = sig - request.content = "" - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(EventsServerService_Stub, port, 'localhost') - response = service.signal(request, timeout=1000) - self.assertTrue(response is not None, 'Did not receive response.') - time.sleep(0.5) - flag.assert_called_once_with(sig) - - def test_client_send_signal(self): - """ - Ensure clients can send signals. - """ - sig = 8 - response = events.signal(sig) - self.assertTrue(response.status == response.OK, - 'Received wrong response status when signaling.') + event = catalog.CLIENT_UID + d = defer.Deferred() + def cbk(events, _): + d.callback(event) + self._client.register(event, cbk) + self._client.emit(event, None) + return d def test_client_unregister_all(self): """ Test that the client can unregister all events for one signal. """ - sig = CLIENT_UID - complist = server.registered_clients - events.register(sig, lambda x: True) - events.register(sig, lambda x: True) - time.sleep(0.1) - events.unregister(sig) - time.sleep(0.1) - port = client.EventsClientDaemon.get_instance().get_port() - self.assertFalse(bool(complist[sig])) - self.assertTrue(port not in complist[sig]) + event1 = catalog.CLIENT_UID + d = defer.Deferred() + # register more than one callback for the same event + self._client.register(event1, lambda ev, _: d.errback(None)) + self._client.register(event1, lambda ev, _: d.errback(None)) + # unregister and emit the event + self._client.unregister(event1) + self._client.emit(event1, None) + # register and emit another event so the deferred can succeed + event2 = catalog.CLIENT_SESSION_ID + self._client.register(event2, lambda ev, _: d.callback(None)) + self._client.emit(event2, None) + return d def test_client_unregister_by_uid(self): """ Test that the client can unregister an event by uid. """ - sig = CLIENT_UID - complist = server.registered_clients - events.register(sig, lambda x: True, uid='cbkuid') - events.register(sig, lambda x: True, uid='cbkuid2') - time.sleep(0.1) - events.unregister(sig, uid='cbkuid') - time.sleep(0.1) - port = client.EventsClientDaemon.get_instance().get_port() - self.assertTrue(sig in complist) - self.assertTrue(len(complist[sig]) == 1) - self.assertTrue( - client.registered_callbacks[sig].pop()[0] == 'cbkuid2') - self.assertTrue(port in complist[sig]) - - def test_server_replies_ping(self): - """ - Ensure server replies to a ping. - """ - request = PingRequest() - service = RpcService(EventsServerService_Stub, port, 'localhost') - response = service.ping(request, timeout=1000) - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_client_replies_ping(self): - """ - Ensure clients reply to a ping. - """ - daemon = client.ensure_client_daemon() - port = daemon.get_port() - request = PingRequest() - service = RpcService(EventsClientService_Stub, port, 'localhost') - response = service.ping(request, timeout=1000) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_server_ping(self): - """ - Ensure the function from server module pings correctly. - """ - response = server.ping() - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_client_ping(self): - """ - Ensure the function from client module pings correctly. - """ - daemon = client.ensure_client_daemon() - response = client.ping(daemon.get_port()) - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_module_ping_server(self): - """ - Ensure the function from main module pings server correctly. - """ - response = events.ping_server() - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_module_ping_client(self): - """ - Ensure the function from main module pings clients correctly. - """ - daemon = client.ensure_client_daemon() - response = events.ping_client(daemon.get_port()) - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_ensure_server_raises_if_port_taken(self): - """ - Verify that server raises an exception if port is already taken. - """ - # get a random free port - while True: - port = random.randint(1024, 65535) - try: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect(('localhost', port)) - s.close() - except: - break - - class PortBlocker(threading.Thread): - - def run(self): - conns = 0 - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.bind(('localhost', port)) - s.setblocking(1) - s.listen(1) - while conns < 2: # blocks until rece - conns += 1 - s.accept() - s.close() - - # block the port - taker = PortBlocker() - taker.start() - time.sleep(1) # wait for thread to start. - self.assertRaises( - server.PortAlreadyTaken, server.ensure_server, port) - - def test_async_register(self): - """ - Test asynchronous registering of callbacks. - """ - flag = Mock() + event = catalog.CLIENT_UID + d = defer.Deferred() + # register one callback that would fail + uid = self._client.register(event, lambda ev, _: d.errback(None)) + # register one callback that will succeed + self._client.register(event, lambda ev, _: d.callback(None)) + # unregister by uid and emit the event + self._client.unregister(event, uid=uid) + self._client.emit(event, None) + return d - # executed after async register, when response is received from server - def reqcbk(request, response): - flag(request.event, response.status) - # callback registered by application - def callback(request): - pass +class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase): - # passing a callback as reqcbk param makes the call asynchronous - result = events.register(CLIENT_UID, callback, reqcbk=reqcbk) - self.assertIsNone(result) - events.signal(CLIENT_UID) - time.sleep(1) # wait for signal to arrive from server - flag.assert_called_once_with(CLIENT_UID, EventResponse.OK) + _client = txclient - def test_async_signal(self): - """ - Test asynchronous registering of callbacks. - """ - flag = Mock() - - # executed after async signal, when response is received from server - def reqcbk(request, response): - flag(request.event, response.status) - - # passing a callback as reqcbk param makes the call asynchronous - result = events.signal(CLIENT_UID, reqcbk=reqcbk) - self.assertIsNone(result) - time.sleep(1) # wait for signal to arrive from server - flag.assert_called_once_with(CLIENT_UID, EventResponse.OK) - - def test_async_unregister(self): - """ - Test asynchronous unregistering of callbacks. - """ - flag = Mock() - - # executed after async signal, when response is received from server - def reqcbk(request, response): - flag(request.event, response.status) - - # callback registered by application - def callback(request): - pass - - # passing a callback as reqcbk param makes the call asynchronous - events.register(CLIENT_UID, callback) - result = events.unregister(CLIENT_UID, reqcbk=reqcbk) - self.assertIsNone(result) - time.sleep(1) # wait for signal to arrive from server - flag.assert_called_once_with(CLIENT_UID, EventResponse.OK) - - def test_async_ping_server(self): - """ - Test asynchronous pinging of server. - """ - flag = Mock() - - # executed after async signal, when response is received from server - def reqcbk(request, response): - flag(response.status) - - result = events.ping_server(reqcbk=reqcbk) - self.assertIsNone(result) - time.sleep(1) # wait for response to arrive from server. - flag.assert_called_once_with(EventResponse.OK) - - def test_async_ping_client(self): - """ - Test asynchronous pinging of client. - """ - flag = Mock() - # executed after async signal, when response is received from server - def reqcbk(request, response): - flag(response.status) +class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase): - daemon = client.ensure_client_daemon() - result = events.ping_client(daemon.get_port(), reqcbk=reqcbk) - self.assertIsNone(result) - time.sleep(1) # wait for response to arrive from server. - flag.assert_called_once_with(EventResponse.OK) + _client = client -- cgit v1.2.3