From 514c1434a016b09d93e8dfc5578b14825d14005a Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 4 Feb 2015 15:04:10 -0200 Subject: [feat] refactor events to use ZMQ Before this commit, protobuf and protobuf.socketrpc were used to serialize and transmit messages between events clients. This change implements a simpler ZMQ client/server events mechanism that uses ZMQ sockets for transmitting messages from clients to server and to redistribute such messages to subscribed clients. Closes: #6359 --- src/leap/common/events/server.py | 242 ++++++++------------------------------- 1 file changed, 49 insertions(+), 193 deletions(-) (limited to 'src/leap/common/events/server.py') diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py index 41aede3..a69202e 100644 --- a/src/leap/common/events/server.py +++ b/src/leap/common/events/server.py @@ -14,223 +14,79 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . -""" -A server for the events mechanism. - -A server can receive different kinds of requests from clients: - 1. Registration request: store client port number to be notified when - a specific signal arrives. - 2. Signal request: redistribute the signal to registered clients. """ -import logging -import socket +The server for the events mechanism. +""" -from protobuf.socketrpc import RpcService -from leap.common.events import ( - events_pb2 as proto, - daemon, -) +import logging +import txzmq +from leap.common.zmq_utils import zmq_has_curve -logger = logging.getLogger(__name__) +from leap.common.events.zmq_components import TxZmqServerComponent -SERVER_PORT = 8090 +if zmq_has_curve(): + EMIT_ADDR = "tcp://127.0.0.1:9000" + REG_ADDR = "tcp://127.0.0.1:9001" +else: + EMIT_ADDR = "ipc:///tmp/leap.common.events.socket.0" + REG_ADDR = "ipc:///tmp/leap.common.events.socket.1" -# the `registered_clients` dictionary below should have the following -# format: -# -# { event_signal: [ port, ... ], ... } -# -registered_clients = {} - -class PortAlreadyTaken(Exception): - """ - Raised when trying to open a server in a port that is already taken. - """ - pass +logger = logging.getLogger(__name__) -def ensure_server(port=SERVER_PORT): +def ensure_server(emit_addr=EMIT_ADDR, reg_addr=REG_ADDR): """ - Make sure the server is running on the given port. + Make sure the server is running in the given addresses. - Attempt to connect to given local port. Upon success, assume that the - events server has already been started. Upon failure, start events server. + :param emit_addr: The address in which to receive events from clients. + :type emit_addr: str + :param reg_addr: The address to which publish events to clients. + :type reg_addr: str - :param port: the port in which server should be listening - :type port: int - - :return: the daemon instance or nothing - :rtype: EventsServerDaemon or None - - :raise PortAlreadyTaken: Raised if C{port} is already taken by something - that is not an events server. - """ - try: - # check if port is available - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect(('localhost', port)) - s.close() - # port is taken, check if there's a server running there - response = ping(port=port, timeout=1000) - if response is not None and response.status == proto.EventResponse.OK: - logger.info('A server is already running on port %d.', port) - return - # port is taken, and not by an events server - logger.warning( - 'Port %d is taken by something not an events server.', port) - raise PortAlreadyTaken(port) - except socket.error: - # port is available, run a server - logger.info('Launching server on port %d.', port) - return EventsServerDaemon.ensure(port) - - -def ping(port=SERVER_PORT, reqcbk=None, timeout=1000): + :return: an events server instance + :rtype: EventsServer """ - Ping the server. - - :param port: the port in which server should be listening - :type port: int - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.PingRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - request = proto.PingRequest() - service = RpcService( - proto.EventsServerService_Stub, - port, - 'localhost') - logger.debug("Pinging server in port %d..." % port) - return service.ping(request, callback=reqcbk, timeout=timeout) + _server = EventsServer(emit_addr, reg_addr) + return _server -class EventsServerService(proto.EventsServerService): +class EventsServer(TxZmqServerComponent): """ - Service for receiving events in clients. + An events server that listens for events in one address and publishes those + events in another address. """ - def register(self, controller, request, done): - """ - Register a client port to be signaled when specific events come in. - - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.RegisterRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback - """ - logger.info("Received registration request: %s..." % str(request)[:40]) - # add client port to signal list - if request.event not in registered_clients: - registered_clients[request.event] = set([]) - registered_clients[request.event].add(request.port) - # send response back to client - - logger.debug('sending response back') - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) - - def unregister(self, controller, request, done): - """ - Unregister a client port so it will not be signaled when specific - events come in. - - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.RegisterRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback - """ - logger.info( - "Received unregistration request: %s..." % str(request)[:40]) - # remove client port from signal list - response = proto.EventResponse() - if request.event in registered_clients: - try: - registered_clients[request.event].remove(request.port) - response.status = proto.EventResponse.OK - except KeyError: - response.status = proto.EventsResponse.ERROR - response.result = 'Port %d not registered.' % request.port - # send response back to client - logger.debug('sending response back') - done.run(response) - - def signal(self, controller, request, done): - """ - Perform an RPC call to signal all clients registered to receive a - specific signal. - - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.SignalRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback - """ - logger.debug('Received signal from client: %s...', str(request)[:40]) - # send signal to all registered clients - # TODO: verify signal auth - if request.event in registered_clients: - for port in registered_clients[request.event]: - - def callback(req, resp): - logger.debug("Signal received by " + str(port)) - - service = RpcService(proto.EventsClientService_Stub, - port, 'localhost') - service.signal(request, callback=callback) - # send response back to client - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) - - def ping(self, controller, request, done): + def __init__(self, emit_addr, reg_addr): """ - Reply to a ping request. - - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.RegisterRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback - """ - logger.debug("Received ping request, sending response.") - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) - + Initialize the events server. -class EventsServerDaemon(daemon.EventsSingletonDaemon): - """ - Singleton class for starting an events server daemon. - """ - - @classmethod - def ensure(cls, port): + :param emit_addr: The address in which to receive events from clients. + :type emit_addr: str + :param reg_addr: The address to which publish events to clients. + :type reg_addr: str """ - Make sure the daemon is running on the given port. - - :param port: the port in which the daemon should listen - :type port: int + TxZmqServerComponent.__init__(self) + # bind PULL and PUB sockets + self._pull, self.pull_port = self._zmq_bind( + txzmq.ZmqPullConnection, emit_addr) + self._pub, self.pub_port = self._zmq_bind( + txzmq.ZmqPubConnection, reg_addr) + # set a handler for arriving messages + self._pull.onPull = self._onPull + + def _onPull(self, message): + """ + Callback executed when a message is pulled from a client. - :return: a daemon instance - :rtype: EventsServerDaemon + :param message: The message sent by the client. + :type message: str """ - return cls.ensure_service(port, EventsServerService()) + event, content = message[0].split(b"\0", 1) + logger.debug("Publishing event: %s" % event) + self._pub.publish(content, tag=event) -- cgit v1.2.3