diff options
| author | drebs <drebs@leap.se> | 2015-02-04 15:04:10 -0200 | 
|---|---|---|
| committer | drebs <drebs@leap.se> | 2015-05-27 14:37:27 -0300 | 
| commit | 514c1434a016b09d93e8dfc5578b14825d14005a (patch) | |
| tree | c4bacce1df24a81b2de3d1343dac26eb56e30ac7 /src/leap/common/tests | |
| parent | 71c750ef9c3e53ef416d1de6e85458f16ca48d74 (diff) | |
[feat] refactor events to use ZMQ
Before this commit, protobuf and protobuf.socketrpc were used to serialize and
transmit messages between events clients. This change implements a simpler ZMQ
client/server events mechanism that uses ZMQ sockets for transmitting messages
from clients to server and to redistribute such messages to subscribed
clients.
Closes: #6359
Diffstat (limited to 'src/leap/common/tests')
| -rw-r--r-- | src/leap/common/tests/test_events.py | 468 | 
1 files changed, 105 insertions, 363 deletions
| diff --git a/src/leap/common/tests/test_events.py b/src/leap/common/tests/test_events.py index 0779b2e..7ef3e1b 100644 --- a/src/leap/common/tests/test_events.py +++ b/src/leap/common/tests/test_events.py @@ -15,414 +15,156 @@  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. -import unittest -import sets -import time -import socket -import threading -import random - - -from mock import Mock -from protobuf.socketrpc import RpcService -from leap.common import events -from leap.common.events import ( -    server, -    client, -    mac_auth, -) -from leap.common.events.events_pb2 import ( -    EventsServerService, -    EventsServerService_Stub, -    EventsClientService_Stub, -    EventResponse, -    SignalRequest, -    RegisterRequest, -    PingRequest, -    SOLEDAD_CREATING_KEYS, -    CLIENT_UID, -) +import os +import logging +import time -port = 8090 +from twisted.trial import unittest +from twisted.internet import defer -received = False +from leap.common.events import server +from leap.common.events import client +from leap.common.events import txclient +from leap.common.events import catalog +from leap.common.events.errors import CallbackAlreadyRegisteredError -class EventsTestCase(unittest.TestCase): +if 'DEBUG' in os.environ: +    logging.basicConfig(level=logging.DEBUG) -    @classmethod -    def setUpClass(cls): -        server.EventsServerDaemon.ensure(8090) -        cls.callbacks = events.client.registered_callbacks -    @classmethod -    def tearDownClass(cls): -        # give some time for requests to be processed. -        time.sleep(1) +class EventsGenericClientTestCase(object):      def setUp(self): -        super(EventsTestCase, self).setUp() +        self._server = server.ensure_server( +            emit_addr="tcp://127.0.0.1:0", +            reg_addr="tcp://127.0.0.1:0") +        self._client.configure_client( +            emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port, +            reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port)      def tearDown(self): -        #events.client.registered_callbacks = {} -        server.registered_clients = {} -        super(EventsTestCase, self).tearDown() - -    def test_service_singleton(self): -        """ -        Ensure that there's always just one instance of the server daemon -        running. -        """ -        service1 = server.EventsServerDaemon.ensure(8090) -        service2 = server.EventsServerDaemon.ensure(8090) -        self.assertEqual(service1, service2, -                         "Can't get singleton class for service.") +        self._client.shutdown() +        self._server.shutdown() +        # wait a bit for sockets to close properly +        time.sleep(0.1)      def test_client_register(self):          """          Ensure clients can register callbacks.          """ -        self.assertTrue(1 not in self.callbacks, -                        'There should should be no callback for this signal.') -        events.register(1, lambda x: True) -        self.assertTrue(1 in self.callbacks, -                        'Could not register signal in local client.') -        events.register(2, lambda x: True) -        self.assertTrue(1 in self.callbacks, -                        'Could not register signal in local client.') -        self.assertTrue(2 in self.callbacks, -                        'Could not register signal in local client.') +        callbacks = self._client.instance().callbacks +        self.assertTrue(len(callbacks) == 0, +                        'There should be no callback for this event.') +        # register one event +        event1 = catalog.CLIENT_UID +        cbk1 = lambda event, _: True +        uid1 = self._client.register(event1, cbk1) +        # assert for correct registration +        self.assertTrue(len(callbacks) == 1) +        self.assertTrue(callbacks[event1][uid1] == cbk1, +                        'Could not register event in local client.') +        # register another event +        event2 = catalog.CLIENT_SESSION_ID +        cbk2 = lambda event, _: True +        uid2 = self._client.register(event2, cbk2) +        # assert for correct registration +        self.assertTrue(len(callbacks) == 2) +        self.assertTrue(callbacks[event2][uid2] == cbk2, +                        'Could not register event in local client.')      def test_register_signal_replace(self):          """          Make sure clients can replace already registered callbacks.          """ -        sig = 3 -        cbk = lambda x: True -        events.register(sig, cbk, uid=1) -        self.assertRaises(Exception, events.register, sig, lambda x: True, -                          uid=1) -        events.register(sig, lambda x: True, uid=1, replace=True) -        self.assertTrue(sig in self.callbacks, 'Could not register signal.') -        self.assertEqual(1, len(self.callbacks[sig]), -                         'Wrong number of registered callbacks.') - -    def test_signal_response_status(self): -        """ -        Ensure there's an appropriate response from server when signaling. -        """ -        sig = 4 -        request = SignalRequest() -        request.event = sig -        request.content = 'my signal contents' -        request.mac_method = mac_auth.MacMethod.MAC_NONE -        request.mac = "" -        service = RpcService(EventsServerService_Stub, port, 'localhost') -        # test synch -        response = service.signal(request, timeout=1000) -        self.assertEqual(EventResponse.OK, response.status, -                         'Wrong response status.') +        event = catalog.CLIENT_UID +        d = defer.Deferred() +        cbk_fail = lambda event, _: d.errback(event) +        cbk_succeed = lambda event, _: d.callback(event) +        self._client.register(event, cbk_fail, uid=1) +        self._client.register(event, cbk_succeed, uid=1, replace=True) +        self._client.emit(event, None) +        return d -    def test_signal_executes_callback(self): -        """ -        Ensure callback is executed upon receiving signal. +    def test_register_signal_replace_fails_when_replace_is_false(self):          """ -        sig = CLIENT_UID -        request = SignalRequest() -        request.event = sig -        request.content = 'my signal contents' -        request.mac_method = mac_auth.MacMethod.MAC_NONE -        request.mac = "" -        service = RpcService(EventsServerService_Stub, port, 'localhost') - -        # register a callback -        flag = Mock() -        events.register(sig, lambda req: flag(req.event)) -        # signal -        response = service.signal(request) -        self.assertEqual(EventResponse.OK, response.status, -                         'Wrong response status.') -        time.sleep(1)  # wait for signal to arrive -        flag.assert_called_once_with(sig) - -    def test_events_server_service_register(self): +        Make sure clients trying to replace already registered callbacks fail +        when replace=False          """ -        Ensure the server can register clients to be signaled. -        """ -        sig = 5 -        request = RegisterRequest() -        request.event = sig -        request.port = 8091 -        request.mac_method = mac_auth.MacMethod.MAC_NONE -        request.mac = "" -        service = RpcService(EventsServerService_Stub, port, 'localhost') -        complist = server.registered_clients -        self.assertEqual({}, complist, -                         'There should be no registered_ports when ' -                         'server has just been created.') -        response = service.register(request, timeout=1000) -        self.assertTrue(sig in complist, "Signal not registered succesfully.") -        self.assertTrue(8091 in complist[sig], -                        'Failed registering client port.') +        event = catalog.CLIENT_UID +        self._client.register(event, lambda event, _: None, uid=1) +        self.assertRaises( +            CallbackAlreadyRegisteredError, +            self._client.register, +            event, lambda event, _: None, uid=1, replace=False) -    def test_client_request_register(self): +    def test_register_more_than_one_callback_works(self):          """ -        Ensure clients can register themselves with server. +        Make sure clients can replace already registered callbacks.          """ -        sig = 6 -        complist = server.registered_clients -        self.assertTrue(sig not in complist, -                        'There should be no registered clients for this ' -                        'signal.') -        events.register(sig, lambda x: True) -        time.sleep(0.1) -        port = client.EventsClientDaemon.get_instance().get_port() -        self.assertTrue(sig in complist, 'Failed registering client.') -        self.assertTrue(port in complist[sig], -                        'Failed registering client port.') +        event = catalog.CLIENT_UID +        d1 = defer.Deferred() +        cbk1 = lambda event, _: d1.callback(event) +        d2 = defer.Deferred() +        cbk2 = lambda event, _: d2.callback(event) +        self._client.register(event, cbk1) +        self._client.register(event, cbk2) +        self._client.emit(event, None) +        d = defer.gatherResults([d1, d2]) +        return d      def test_client_receives_signal(self):          """          Ensure clients can receive signals.          """ -        sig = 7 -        flag = Mock() - -        events.register(sig, lambda req: flag(req.event)) -        request = SignalRequest() -        request.event = sig -        request.content = "" -        request.mac_method = mac_auth.MacMethod.MAC_NONE -        request.mac = "" -        service = RpcService(EventsServerService_Stub, port, 'localhost') -        response = service.signal(request, timeout=1000) -        self.assertTrue(response is not None, 'Did not receive response.') -        time.sleep(0.5) -        flag.assert_called_once_with(sig) - -    def test_client_send_signal(self): -        """ -        Ensure clients can send signals. -        """ -        sig = 8 -        response = events.signal(sig) -        self.assertTrue(response.status == response.OK, -                        'Received wrong response status when signaling.') +        event = catalog.CLIENT_UID +        d = defer.Deferred() +        def cbk(events, _): +            d.callback(event) +        self._client.register(event, cbk) +        self._client.emit(event, None) +        return d      def test_client_unregister_all(self):          """          Test that the client can unregister all events for one signal.          """ -        sig = CLIENT_UID -        complist = server.registered_clients -        events.register(sig, lambda x: True) -        events.register(sig, lambda x: True) -        time.sleep(0.1) -        events.unregister(sig) -        time.sleep(0.1) -        port = client.EventsClientDaemon.get_instance().get_port() -        self.assertFalse(bool(complist[sig])) -        self.assertTrue(port not in complist[sig]) +        event1 = catalog.CLIENT_UID +        d = defer.Deferred() +        # register more than one callback for the same event +        self._client.register(event1, lambda ev, _: d.errback(None)) +        self._client.register(event1, lambda ev, _: d.errback(None)) +        # unregister and emit the event +        self._client.unregister(event1) +        self._client.emit(event1, None) +        # register and emit another event so the deferred can succeed +        event2 = catalog.CLIENT_SESSION_ID +        self._client.register(event2, lambda ev, _: d.callback(None)) +        self._client.emit(event2, None) +        return d      def test_client_unregister_by_uid(self):          """          Test that the client can unregister an event by uid.          """ -        sig = CLIENT_UID -        complist = server.registered_clients -        events.register(sig, lambda x: True, uid='cbkuid') -        events.register(sig, lambda x: True, uid='cbkuid2') -        time.sleep(0.1) -        events.unregister(sig, uid='cbkuid') -        time.sleep(0.1) -        port = client.EventsClientDaemon.get_instance().get_port() -        self.assertTrue(sig in complist) -        self.assertTrue(len(complist[sig]) == 1) -        self.assertTrue( -            client.registered_callbacks[sig].pop()[0] == 'cbkuid2') -        self.assertTrue(port in complist[sig]) - -    def test_server_replies_ping(self): -        """ -        Ensure server replies to a ping. -        """ -        request = PingRequest() -        service = RpcService(EventsServerService_Stub, port, 'localhost') -        response = service.ping(request, timeout=1000) -        self.assertIsNotNone(response) -        self.assertEqual(EventResponse.OK, response.status, -                         'Wrong response status.') - -    def test_client_replies_ping(self): -        """ -        Ensure clients reply to a ping. -        """ -        daemon = client.ensure_client_daemon() -        port = daemon.get_port() -        request = PingRequest() -        service = RpcService(EventsClientService_Stub, port, 'localhost') -        response = service.ping(request, timeout=1000) -        self.assertEqual(EventResponse.OK, response.status, -                         'Wrong response status.') - -    def test_server_ping(self): -        """ -        Ensure the function from server module pings correctly. -        """ -        response = server.ping() -        self.assertIsNotNone(response) -        self.assertEqual(EventResponse.OK, response.status, -                         'Wrong response status.') - -    def test_client_ping(self): -        """ -        Ensure the function from client module pings correctly. -        """ -        daemon = client.ensure_client_daemon() -        response = client.ping(daemon.get_port()) -        self.assertIsNotNone(response) -        self.assertEqual(EventResponse.OK, response.status, -                         'Wrong response status.') - -    def test_module_ping_server(self): -        """ -        Ensure the function from main module pings server correctly. -        """ -        response = events.ping_server() -        self.assertIsNotNone(response) -        self.assertEqual(EventResponse.OK, response.status, -                         'Wrong response status.') - -    def test_module_ping_client(self): -        """ -        Ensure the function from main module pings clients correctly. -        """ -        daemon = client.ensure_client_daemon() -        response = events.ping_client(daemon.get_port()) -        self.assertIsNotNone(response) -        self.assertEqual(EventResponse.OK, response.status, -                         'Wrong response status.') - -    def test_ensure_server_raises_if_port_taken(self): -        """ -        Verify that server raises an exception if port is already taken. -        """ -        # get a random free port -        while True: -            port = random.randint(1024, 65535) -            try: -                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -                s.connect(('localhost', port)) -                s.close() -            except: -                break - -        class PortBlocker(threading.Thread): - -            def run(self): -                conns = 0 -                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -                s.bind(('localhost', port)) -                s.setblocking(1) -                s.listen(1) -                while conns < 2:  # blocks until rece -                    conns += 1 -                    s.accept() -                s.close() - -        # block the port -        taker = PortBlocker() -        taker.start() -        time.sleep(1)  # wait for thread to start. -        self.assertRaises( -            server.PortAlreadyTaken, server.ensure_server, port) - -    def test_async_register(self): -        """ -        Test asynchronous registering of callbacks. -        """ -        flag = Mock() +        event = catalog.CLIENT_UID +        d = defer.Deferred() +        # register one callback that would fail +        uid = self._client.register(event, lambda ev, _: d.errback(None)) +        # register one callback that will succeed +        self._client.register(event, lambda ev, _: d.callback(None)) +        # unregister by uid and emit the event +        self._client.unregister(event, uid=uid) +        self._client.emit(event, None) +        return d -        # executed after async register, when response is received from server -        def reqcbk(request, response): -            flag(request.event, response.status) -        # callback registered by application -        def callback(request): -            pass +class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase): -        # passing a callback as reqcbk param makes the call asynchronous -        result = events.register(CLIENT_UID, callback, reqcbk=reqcbk) -        self.assertIsNone(result) -        events.signal(CLIENT_UID) -        time.sleep(1)  # wait for signal to arrive from server -        flag.assert_called_once_with(CLIENT_UID, EventResponse.OK) +    _client = txclient -    def test_async_signal(self): -        """ -        Test asynchronous registering of callbacks. -        """ -        flag = Mock() - -        # executed after async signal, when response is received from server -        def reqcbk(request, response): -            flag(request.event, response.status) - -        # passing a callback as reqcbk param makes the call asynchronous -        result = events.signal(CLIENT_UID, reqcbk=reqcbk) -        self.assertIsNone(result) -        time.sleep(1)  # wait for signal to arrive from server -        flag.assert_called_once_with(CLIENT_UID, EventResponse.OK) - -    def test_async_unregister(self): -        """ -        Test asynchronous unregistering of callbacks. -        """ -        flag = Mock() - -        # executed after async signal, when response is received from server -        def reqcbk(request, response): -            flag(request.event, response.status) - -        # callback registered by application -        def callback(request): -            pass - -        # passing a callback as reqcbk param makes the call asynchronous -        events.register(CLIENT_UID, callback) -        result = events.unregister(CLIENT_UID, reqcbk=reqcbk) -        self.assertIsNone(result) -        time.sleep(1)  # wait for signal to arrive from server -        flag.assert_called_once_with(CLIENT_UID, EventResponse.OK) - -    def test_async_ping_server(self): -        """ -        Test asynchronous pinging of server. -        """ -        flag = Mock() - -        # executed after async signal, when response is received from server -        def reqcbk(request, response): -            flag(response.status) - -        result = events.ping_server(reqcbk=reqcbk) -        self.assertIsNone(result) -        time.sleep(1)  # wait for response to arrive from server. -        flag.assert_called_once_with(EventResponse.OK) - -    def test_async_ping_client(self): -        """ -        Test asynchronous pinging of client. -        """ -        flag = Mock() -        # executed after async signal, when response is received from server -        def reqcbk(request, response): -            flag(response.status) +class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase): -        daemon = client.ensure_client_daemon() -        result = events.ping_client(daemon.get_port(), reqcbk=reqcbk) -        self.assertIsNone(result) -        time.sleep(1)  # wait for response to arrive from server. -        flag.assert_called_once_with(EventResponse.OK) +    _client = client | 
