From 514c1434a016b09d93e8dfc5578b14825d14005a Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 4 Feb 2015 15:04:10 -0200 Subject: [feat] refactor events to use ZMQ Before this commit, protobuf and protobuf.socketrpc were used to serialize and transmit messages between events clients. This change implements a simpler ZMQ client/server events mechanism that uses ZMQ sockets for transmitting messages from clients to server and to redistribute such messages to subscribed clients. Closes: #6359 --- src/leap/common/events/client.py | 712 +++++++++++++++++++++++++-------------- 1 file changed, 453 insertions(+), 259 deletions(-) (limited to 'src/leap/common/events/client.py') diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py index 83f18e0..6b234a1 100644 --- a/src/leap/common/events/client.py +++ b/src/leap/common/events/client.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # client.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014, 2015 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -14,6 +14,8 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . + + """ The client end point of the events mechanism. @@ -21,309 +23,501 @@ Clients are the communicating parties of the events mechanism. They communicate by sending messages to a server, which in turn redistributes messages to other clients. -When a client registers a callback for a given signal, it also tells the -server that it wants to be notified whenever signals of that type are sent by +When a client registers a callback for a given event, it also tells the +server that it wants to be notified whenever events of that type are sent by some other client. """ -import logging +import logging +import collections +import uuid +import threading +import time +import pickle +import os + +from abc import ABCMeta +from abc import abstractmethod + +import zmq +from zmq.eventloop import zmqstream +from zmq.eventloop import ioloop + +# XXX some distros don't package libsodium, so we have to be prepared for +# absence of zmq.auth +try: + import zmq.auth +except ImportError: + pass -from protobuf.socketrpc import RpcService +from leap.common.config import get_path_prefix +from leap.common.zmq_utils import zmq_has_curve +from leap.common.zmq_utils import maybe_create_and_get_certificates +from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX -from leap.common.events import events_pb2 as proto -from leap.common.events import server -from leap.common.events import daemon -from leap.common.events import mac_auth +from leap.common.events.errors import CallbackAlreadyRegisteredError +from leap.common.events.server import EMIT_ADDR +from leap.common.events.server import REG_ADDR +from leap.common.events import catalog logger = logging.getLogger(__name__) -# the `registered_callbacks` dictionary below should have the following -# format: -# -# { event_signal: [ (uid, callback), ... ], ... } -# -registered_callbacks = {} +_emit_addr = EMIT_ADDR +_reg_addr = REG_ADDR + +def configure_client(emit_addr, reg_addr): + global _emit_addr, _reg_addr + logger.debug("Configuring client with addresses: (%s, %s)" % + (emit_addr, reg_addr)) + _emit_addr = emit_addr + _reg_addr = reg_addr -class CallbackAlreadyRegistered(Exception): + +class EventsClient(object): """ - Raised when trying to register an already registered callback. + A singleton client for the events mechanism. """ - pass + __metaclass__ = ABCMeta -def ensure_client_daemon(): - """ - Ensure the client daemon is running and listening for incoming - messages. + _instance = None + _instance_lock = threading.Lock() - :return: the daemon instance - :rtype: EventsClientDaemon - """ - import time - daemon = EventsClientDaemon.ensure(0) - logger.debug('ensure client daemon') + def __init__(self, emit_addr, reg_addr): + """ + Initialize the events client. + """ + logger.debug("Creating client instance.") + self._callbacks = collections.defaultdict(dict) + self._emit_addr = emit_addr + self._reg_addr = reg_addr + + @property + def callbacks(self): + return self._callbacks - # Because we use a random port we want to wait until a port is assigned to - # local client daemon. + @classmethod + def instance(cls): + """ + Return a singleton EventsClient instance. + """ + with cls._instance_lock: + if cls._instance is None: + cls._instance = cls(_emit_addr, _reg_addr) + return cls._instance - while not (EventsClientDaemon.get_instance() and - EventsClientDaemon.get_instance().get_port()): - time.sleep(0.1) - return daemon + def register(self, event, callback, uid=None, replace=False): + """ + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: Event + :param callback: The callback to be executed. + :type callback: callable(event, *content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. + :type replace: bool + + :return: The callback uid. + :rtype: str + + :raises CallbackAlreadyRegisteredError: when there's already a callback + identified by the given uid and replace is False. + """ + logger.debug("Subscribing to event: %s" % event) + if not uid: + uid = uuid.uuid4() + elif uid in self._callbacks[event] and not replace: + raise CallbackAlreadyRegisteredError() + self._callbacks[event][uid] = callback + self._subscribe(str(event)) + return uid + + def unregister(self, event, uid=None): + """ + Unregister callbacks for an event. + If uid is not None, then only the callback identified by the given uid + is removed. Otherwise, all callbacks for the event are removed. -def register(signal, callback, uid=None, replace=False, reqcbk=None, - timeout=1000): - """ - Registers a callback to be called when a specific signal event is - received. - - Will timeout after timeout ms if response has not been received. The - timeout arg is only used for asynch requests. If a reqcbk callback has - been supplied the timeout arg is not used. The response value will be - returned for a synch request but nothing will be returned for an asynch - request. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param callback: the callback to be called when the signal is received - :type callback: function(leap.common.events.events_pb2.SignalRequest) - :param uid: a unique id for the callback - :type uid: int - :param replace: should an existent callback with same uid be replaced? - :type replace: bool - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.RegisterRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - Might raise a CallbackAlreadyRegistered exception if there's already a - callback identified by the given uid and replace is False. - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - ensure_client_daemon() # so we can receive registered signals - # register callback locally - if signal not in registered_callbacks: - registered_callbacks[signal] = [] - cbklist = registered_callbacks[signal] - - # TODO should check that the callback has the right - # number of arguments. - - if uid and filter(lambda (x, y): x == uid, cbklist): - if not replace: - raise CallbackAlreadyRegistered() + :param event: The event that triggers the callback. + :type event: Event + :param uid: The callback uid. + :type uid: str + """ + if not uid: + logger.debug( + "Unregistering all callbacks from event %s." % event) + self._callbacks[event] = {} else: - registered_callbacks[signal] = filter(lambda(x, y): x != uid, - cbklist) - registered_callbacks[signal].append((uid, callback)) - # register callback on server - request = proto.RegisterRequest() - request.event = signal - request.port = EventsClientDaemon.get_instance().get_port() - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(proto.EventsServerService_Stub, - server.SERVER_PORT, 'localhost') - logger.debug( - "Sending registration request to server on port %s: %s", - server.SERVER_PORT, - str(request)[:40]) - return service.register(request, callback=reqcbk, timeout=timeout) - - -def unregister(signal, uid=None, reqcbk=None, timeout=1000): - """ - Unregister a callback. - - If C{uid} is specified, unregisters only the callback identified by that - unique id. Otherwise, unregisters all callbacks - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param uid: a unique id for the callback - :type uid: int - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.UnregisterRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls or None if no callback is registered for that signal or - uid. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - if signal not in registered_callbacks or not registered_callbacks[signal]: - logger.warning("No callback registered for signal %d." % signal) - return None - # unregister callback locally - cbklist = registered_callbacks[signal] - if uid is not None: - if filter(lambda (cbkuid, _): cbkuid == uid, cbklist) == []: - logger.warning("No callback registered for uid %d." % st) - return None - registered_callbacks[signal] = filter(lambda(x, y): x != uid, cbklist) - else: - # exclude all callbacks for given signal - registered_callbacks[signal] = [] - # unregister port in server if there are no more callbacks for this signal - if not registered_callbacks[signal]: - request = proto.UnregisterRequest() - request.event = signal - request.port = EventsClientDaemon.get_instance().get_port() - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(proto.EventsServerService_Stub, - server.SERVER_PORT, 'localhost') - logger.info( - "Sending unregistration request to server on port %s: %s", - server.SERVER_PORT, - str(request)[:40]) - return service.unregister(request, callback=reqcbk, timeout=timeout) - - -def signal(signal, content="", mac_method="", mac="", reqcbk=None, - timeout=1000): - """ - Send `signal` event to events server. - - Will timeout after timeout ms if response has not been received. The - timeout arg is only used for asynch requests. If a reqcbk callback has - been supplied the timeout arg is not used. The response value will be - returned for a synch request but nothing will be returned for an asynch - request. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param content: the contents of the event signal - :type content: str - :param mac_method: the method used for auth mac - :type mac_method: str - :param mac: the content of the auth mac - :type mac: str - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.SignalRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - request = proto.SignalRequest() - request.event = signal - request.content = content - request.mac_method = mac_method - request.mac = mac - service = RpcService(proto.EventsServerService_Stub, server.SERVER_PORT, - 'localhost') - logger.debug("Sending signal to server: %s", str(request)[:40]) - return service.signal(request, callback=reqcbk, timeout=timeout) - - -def ping(port, reqcbk=None, timeout=1000): + logger.debug( + "Unregistering callback %s from event %s." % (uid, event)) + if uid in self._callbacks[event]: + del self._callbacks[event][uid] + if not self._callbacks[event]: + del self._callbacks[event] + self._unsubscribe(str(event)) + + def emit(self, event, *content): + """ + Send an event. + + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list + """ + logger.debug("Sending event: (%s, %s)" % (event, content)) + self._send(str(event) + b'\0' + pickle.dumps(content)) + + def _handle_event(self, event, content): + """ + Handle an incoming event. + + :param msg: The incoming message. + :type msg: list(str) + """ + logger.debug("Handling event %s..." % event) + for uid in self._callbacks[event]: + callback = self._callbacks[event][uid] + logger.debug("Executing callback %s." % uid) + callback(event, *content) + + @abstractmethod + def _subscribe(self, tag): + """ + Subscribe to a tag on the zmq SUB socket. + + :param tag: The tag to be subscribed. + :type tag: str + """ + pass + + @abstractmethod + def _unsubscribe(self, tag): + """ + Unsubscribe from a tag on the zmq SUB socket. + + :param tag: The tag to be unsubscribed. + :type tag: str + """ + pass + + @abstractmethod + def _send(self, data): + """ + Send data through PUSH socket. + + :param data: The data to be sent. + :type event: str + """ + pass + + def shutdown(self): + self.__class__.reset() + + @classmethod + def reset(cls): + with cls._instance_lock: + cls._instance = None + + +class EventsIOLoop(ioloop.ZMQIOLoop): """ - Ping a client running in C{port}. - - :param port: the port in which the client should be listening - :type port: int - :param reqcbk: a callback to be called when a response from client is - received - :type reqcbk: function(proto.PingRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from client for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None + An extension of zmq's ioloop that can wait until there are no callbacks + in the queue before stopping. """ - request = proto.PingRequest() - service = RpcService( - proto.EventsClientService_Stub, - port, - 'localhost') - logger.debug("Pinging a client in port %d..." % port) - return service.ping(request, callback=reqcbk, timeout=timeout) + def stop(self, wait=False): + """ + Stop the I/O loop. -class EventsClientService(proto.EventsClientService): + :param wait: Whether we should wait for callbacks in queue to finish + before stopping. + :type wait: bool + """ + if wait: + # prevent new callbacks from being added + with self._callback_lock: + self._closing = True + # wait until all callbacks have been executed + while self._callbacks: + time.sleep(0.1) + ioloop.ZMQIOLoop.stop(self) + + +class EventsClientThread(threading.Thread, EventsClient): """ - Service for receiving signal events in clients. + A threaded version of the events client. """ - def __init__(self): - proto.EventsClientService.__init__(self) + def __init__(self, emit_addr, reg_addr): + """ + Initialize the events client. + """ + threading.Thread.__init__(self) + EventsClient.__init__(self, emit_addr, reg_addr) + self._lock = threading.Lock() + self._initialized = threading.Event() + self._config_prefix = os.path.join( + get_path_prefix(), "leap", "events") + self._loop = None + self._context = None + self._push = None + self._sub = None + + def _init_zmq(self): + """ + Initialize ZMQ connections. + """ + self._loop = EventsIOLoop() + self._context = zmq.Context() + # connect SUB first, otherwise we might miss some event sent from this + # same client + self._sub = self._zmq_connect_sub() + self._push = self._zmq_connect_push() + + def _zmq_connect(self, socktype, address): + """ + Connect to an address using with a zmq socktype. + + :param socktype: The ZMQ socket type. + :type socktype: int + :param address: The address to connect to. + :type address: str - def signal(self, controller, request, done): + :return: A ZMQ connection stream. + :rtype: ZMQStream """ - Receive a signal and run callbacks registered for that signal. + logger.debug("Connecting %s to %s." % (socktype, address)) + socket = self._context.socket(socktype) + # configure curve authentication + if zmq_has_curve(): + public, private = maybe_create_and_get_certificates( + self._config_prefix, "client") + server_public_file = os.path.join( + self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key") + server_public, _ = zmq.auth.load_certificate(server_public_file) + socket.curve_publickey = public + socket.curve_secretkey = private + socket.curve_serverkey = server_public + stream = zmqstream.ZMQStream(socket, self._loop) + socket.connect(address) + return stream + + def _zmq_connect_push(self): + """ + Initialize the client's PUSH connection. - This method is called whenever a signal request is received from - server. + :return: A ZMQ connection stream. + :rtype: ZMQStream + """ + return self._zmq_connect(zmq.PUSH, self._emit_addr) - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.SignalRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback + def _zmq_connect_sub(self): """ - logger.debug('Received signal from server: %s...' % str(request)[:40]) + Initialize the client's SUB connection. - # run registered callbacks - # TODO: verify authentication using mac in incoming message - if request.event in registered_callbacks: - for (_, cbk) in registered_callbacks[request.event]: - # callbacks should be prepared to receive a - # events_pb2.SignalRequest. - cbk(request) + :return: A ZMQ connection stream. + :rtype: ZMQStream + """ + stream = self._zmq_connect(zmq.SUB, self._reg_addr) + stream.on_recv(self._on_recv) + return stream - # send response back to server - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) + def _on_recv(self, msg): + """ + Handle an incoming message in the SUB socket. - def ping(self, controller, request, done): + :param msg: The received message. + :type msg: str """ - Reply to a ping request. + ev_str, content_pickle = msg[0].split(b'\0', 1) # undo txzmq tagging + event = getattr(catalog, ev_str) + content = pickle.loads(content_pickle) + self._handle_event(event, content) - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.RegisterRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback + def _subscribe(self, tag): """ - logger.debug("Received ping request, sending response.") - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) + Subscribe from a tag on the zmq SUB socket. + :param tag: The tag to be subscribed. + :type tag: str + """ + self._sub.socket.setsockopt(zmq.SUBSCRIBE, tag) -class EventsClientDaemon(daemon.EventsSingletonDaemon): - """ - A daemon that listens for incoming events from server. - """ - @classmethod - def ensure(cls, port): + def _unsubscribe(self, tag): + """ + Unsubscribe from a tag on the zmq SUB socket. + + :param tag: The tag to be unsubscribed. + :type tag: str + """ + self._sub.socket.setsockopt(zmq.UNSUBSCRIBE, tag) + + def _send(self, data): + """ + Send data through PUSH socket. + + :param data: The data to be sent. + :type event: str + """ + logger.debug("Sending data: %s" % data) + # add send() as a callback for ioloop so it works between threads + self._loop.add_callback(lambda: self._push.send(data)) + + def register(self, event, callback, uid=None, replace=False): + """ + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: Event + :param callback: The callback to be executed. + :type callback: callable(event, *content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. + :type replace: bool + + :return: The callback uid. + :rtype: str + + :raises CallbackAlreadyRegisteredError: when there's already a + callback identified by the given uid and replace is False. """ - Make sure the daemon is running on the given port. + self.ensure_client() + return EventsClient.register(self, event, callback, uid=uid, replace=replace) - :param port: the port in which the daemon should listen - :type port: int + def unregister(self, event, uid=None): + """ + Unregister callbacks for an event. + + If uid is not None, then only the callback identified by the given uid + is removed. Otherwise, all callbacks for the event are removed. + + :param event: The event that triggers the callback. + :type event: Event + :param uid: The callback uid. + :type uid: str + """ + self.ensure_client() + EventsClient.unregister(self, event, uid=uid) + + def emit(self, event, *content): + """ + Send an event. + + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list + """ + self.ensure_client() + EventsClient.emit(self, event, *content) + + def run(self): + """ + Run the events client. + """ + logger.debug("Starting ioloop.") + self._init_zmq() + self._initialized.set() + self._loop.start() + self._loop.close() + logger.debug("Ioloop finished.") + + def ensure_client(self): + """ + Make sure the events client thread is started. + """ + with self._lock: + if not self.is_alive(): + self.daemon = True + self.start() + self._initialized.wait() - :return: a daemon instance - :rtype: EventsClientDaemon + def shutdown(self): """ - return cls.ensure_service(port, EventsClientService()) + Shutdown the events client thread. + """ + logger.debug("Shutting down client...") + with self._lock: + if self.is_alive(): + self._loop.stop(wait=True) + EventsClient.shutdown(self) + + +def shutdown(): + """ + Shutdown the events client thread. + """ + EventsClientThread.instance().shutdown() + + +def register(event, callback, uid=None, replace=False): + """ + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: str + :param callback: The callback to be executed. + :type callback: callable(event, content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. + :type replace: bool + + :return: The callback uid. + :rtype: str + + :raises CallbackAlreadyRegisteredError: when there's already a callback + identified by the given uid and replace is False. + """ + return EventsClientThread.instance().register( + event, callback, uid=uid, replace=replace) + + +def unregister(event, uid=None): + """ + Unregister callbacks for an event. + + If uid is not None, then only the callback identified by the given uid is + removed. Otherwise, all callbacks for the event are removed. + + :param event: The event that triggers the callback. + :type event: str + :param uid: The callback uid. + :type uid: str + """ + return EventsClientThread.instance().unregister(event, uid=uid) + + +def emit(event, *content): + """ + Send an event. + + :param event: The event to be sent. + :type event: str + :param content: The content of the event. + :type content: list + """ + return EventsClientThread.instance().emit(event, *content) + + +def instance(): + """ + Return an instance of the events client. + + :return: An instance of the events client. + :rtype: EventsClientThread + """ + return EventsClientThread.instance() -- cgit v1.2.3