From 1f1412f3c31dfba10135ceae4641313ee48318c8 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 24 Jul 2013 14:01:30 -0300 Subject: Refactor events so components become clients. --- src/leap/common/events/server.py | 54 ++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 27 deletions(-) (limited to 'src/leap/common/events/server.py') diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py index d53c218..8a0d4e5 100644 --- a/src/leap/common/events/server.py +++ b/src/leap/common/events/server.py @@ -17,12 +17,12 @@ """ A server for the events mechanism. -A server can receive different kinds of requests from components: +A server can receive different kinds of requests from clients: - 1. Registration request: store component port number to be notified when + 1. Registration request: store client port number to be notified when a specific signal arrives. - 2. Signal request: redistribute the signal to registered components. + 2. Signal request: redistribute the signal to registered clients. """ import logging import socket @@ -40,12 +40,12 @@ logger = logging.getLogger(__name__) SERVER_PORT = 8090 -# the `registered_components` dictionary below should have the following +# the `registered_clients` dictionary below should have the following # format: # # { event_signal: [ port, ... ], ... } # -registered_components = {} +registered_clients = {} def ensure_server(port=SERVER_PORT): @@ -74,26 +74,26 @@ def ensure_server(port=SERVER_PORT): class EventsServerService(proto.EventsServerService): """ - Service for receiving events in components. + Service for receiving events in clients. """ def register(self, controller, request, done): """ - Register a component port to be signaled when specific events come in. + Register a client port to be signaled when specific events come in. :param controller: used to mediate a single method call :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the component + :param request: the request received from the client :type request: leap.common.events.events_pb2.RegisterRequest :param done: callback to be called when done :type done: protobuf.socketrpc.server.Callback """ logger.info("Received registration request: %s..." % str(request)[:40]) - # add component port to signal list - if request.event not in registered_components: - registered_components[request.event] = set([]) - registered_components[request.event].add(request.port) - # send response back to component + # add client port to signal list + if request.event not in registered_clients: + registered_clients[request.event] = set([]) + registered_clients[request.event].add(request.port) + # send response back to client logger.debug('sending response back') response = proto.EventResponse() @@ -102,56 +102,56 @@ class EventsServerService(proto.EventsServerService): def unregister(self, controller, request, done): """ - Unregister a component port so it will not be signaled when specific + Unregister a client port so it will not be signaled when specific events come in. :param controller: used to mediate a single method call :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the component + :param request: the request received from the client :type request: leap.common.events.events_pb2.RegisterRequest :param done: callback to be called when done :type done: protobuf.socketrpc.server.Callback """ logger.info( "Received unregistration request: %s..." % str(request)[:40]) - # remove component port from signal list + # remove client port from signal list response = proto.EventResponse() - if request.event in registered_components: + if request.event in registered_clients: try: - registered_components[request.event].remove(request.port) + registered_clients[request.event].remove(request.port) response.status = proto.EventResponse.OK except KeyError: response.status = proto.EventsResponse.ERROR response.result = 'Port %d not registered.' % request.port - # send response back to component + # send response back to client logger.debug('sending response back') done.run(response) def signal(self, controller, request, done): """ - Perform an RPC call to signal all components registered to receive a + Perform an RPC call to signal all clients registered to receive a specific signal. :param controller: used to mediate a single method call :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the component + :param request: the request received from the client :type request: leap.common.events.events_pb2.SignalRequest :param done: callback to be called when done :type done: protobuf.socketrpc.server.Callback """ - logger.info('Received signal from component: %s...', str(request)[:40]) - # send signal to all registered components + logger.info('Received signal from client: %s...', str(request)[:40]) + # send signal to all registered clients # TODO: verify signal auth - if request.event in registered_components: - for port in registered_components[request.event]: + if request.event in registered_clients: + for port in registered_clients[request.event]: def callback(req, resp): logger.info("Signal received by " + str(port)) - service = RpcService(proto.EventsComponentService_Stub, + service = RpcService(proto.EventsClientService_Stub, port, 'localhost') service.signal(request, callback=callback) - # send response back to component + # send response back to client response = proto.EventResponse() response.status = proto.EventResponse.OK done.run(response) -- cgit v1.2.3 From b7c74e4f293d0e611ea038e04022fbe700a8cb42 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 24 Jul 2013 16:26:52 -0300 Subject: Trying to init events server raises when given port is not free. * Also fix and improve some tests. --- src/leap/common/events/server.py | 60 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 58 insertions(+), 2 deletions(-) (limited to 'src/leap/common/events/server.py') diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py index 8a0d4e5..daccc61 100644 --- a/src/leap/common/events/server.py +++ b/src/leap/common/events/server.py @@ -48,6 +48,13 @@ SERVER_PORT = 8090 registered_clients = {} +class PortAlreadyTaken(Exception): + """ + Raised when trying to open a server in a port that is already taken. + """ + pass + + def ensure_server(port=SERVER_PORT): """ Make sure the server is running on the given port. @@ -60,18 +67,51 @@ def ensure_server(port=SERVER_PORT): :return: the daemon instance or nothing :rtype: EventsServerDaemon or None + + :raise PortAlreadyTaken: Raised if C{port} is already taken by something + that is not an events server. """ try: + # check if port is available s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(('localhost', port)) s.close() - logger.info('Server is already running on port %d.', port) - return None + # port is taken, check if there's a server running there + response = ping(port) + if response is not None and response.status == proto.EventResponse.OK: + logger.info('A server is already running on port %d.', port) + return None + # port is taken, and not by an events server + logger.info('Port %d is taken by something not an events server.', port) + raise PortAlreadyTaken(port) except socket.error: + # port is available, run a server logger.info('Launching server on port %d.', port) return EventsServerDaemon.ensure(port) +def ping(port=SERVER_PORT, reqcbk=None, timeout=1000): + """ + Ping the server. + + :param port: the port in which server should be listening + :type port: int + :param reqcbk: a callback to be called when a response from server is + received + :type reqcbk: function + callback(leap.common.events.events_pb2.EventResponse) + :param timeout: the timeout for synch calls + :type timeout: int + """ + request = proto.PingRequest() + service = RpcService( + proto.EventsServerService_Stub, + port, + 'localhost') + logger.info("Pinging server in port %d..." % port) + return service.ping(request, callback=reqcbk, timeout=timeout) + + class EventsServerService(proto.EventsServerService): """ Service for receiving events in clients. @@ -156,6 +196,22 @@ class EventsServerService(proto.EventsServerService): response.status = proto.EventResponse.OK done.run(response) + def ping(self, controller, request, done): + """ + Reply to a ping request. + + :param controller: used to mediate a single method call + :type controller: protobuf.socketrpc.controller.SocketRpcController + :param request: the request received from the client + :type request: leap.common.events.events_pb2.RegisterRequest + :param done: callback to be called when done + :type done: protobuf.socketrpc.server.Callback + """ + logger.info("Received ping request, sending response.") + response = proto.EventResponse() + response.status = proto.EventResponse.OK + done.run(response) + class EventsServerDaemon(daemon.EventsSingletonDaemon): """ -- cgit v1.2.3 From e7fa419f13e0afbb3f2653a4b0d8330dd45bbfd0 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 24 Jul 2013 17:51:03 -0300 Subject: Add doc and tests about events sync/async behaviour. * Also fix docstrings identation so sphynx doesn't complain. --- src/leap/common/events/server.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'src/leap/common/events/server.py') diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py index daccc61..a7d4da9 100644 --- a/src/leap/common/events/server.py +++ b/src/leap/common/events/server.py @@ -69,7 +69,7 @@ def ensure_server(port=SERVER_PORT): :rtype: EventsServerDaemon or None :raise PortAlreadyTaken: Raised if C{port} is already taken by something - that is not an events server. + that is not an events server. """ try: # check if port is available @@ -97,11 +97,14 @@ def ping(port=SERVER_PORT, reqcbk=None, timeout=1000): :param port: the port in which server should be listening :type port: int :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function - callback(leap.common.events.events_pb2.EventResponse) + received + :type reqcbk: function(leap.common.events.events_pb2.EventResponse) :param timeout: the timeout for synch calls :type timeout: int + + :return: the response from server for synch calls or nothing for asynch + calls. + :rtype: leap.common.events.events_pb2.EventsResponse or None """ request = proto.PingRequest() service = RpcService( -- cgit v1.2.3