summaryrefslogtreecommitdiff
path: root/src/leap/common/events/client.py
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2015-02-04 15:04:10 -0200
committerdrebs <drebs@leap.se>2015-05-27 14:37:27 -0300
commit514c1434a016b09d93e8dfc5578b14825d14005a (patch)
treec4bacce1df24a81b2de3d1343dac26eb56e30ac7 /src/leap/common/events/client.py
parent71c750ef9c3e53ef416d1de6e85458f16ca48d74 (diff)
[feat] refactor events to use ZMQ
Before this commit, protobuf and protobuf.socketrpc were used to serialize and transmit messages between events clients. This change implements a simpler ZMQ client/server events mechanism that uses ZMQ sockets for transmitting messages from clients to server and to redistribute such messages to subscribed clients. Closes: #6359
Diffstat (limited to 'src/leap/common/events/client.py')
-rw-r--r--src/leap/common/events/client.py712
1 files changed, 453 insertions, 259 deletions
diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py
index 83f18e0..6b234a1 100644
--- a/src/leap/common/events/client.py
+++ b/src/leap/common/events/client.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# client.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013, 2014, 2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -14,6 +14,8 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
"""
The client end point of the events mechanism.
@@ -21,309 +23,501 @@ Clients are the communicating parties of the events mechanism. They
communicate by sending messages to a server, which in turn redistributes
messages to other clients.
-When a client registers a callback for a given signal, it also tells the
-server that it wants to be notified whenever signals of that type are sent by
+When a client registers a callback for a given event, it also tells the
+server that it wants to be notified whenever events of that type are sent by
some other client.
"""
-import logging
+import logging
+import collections
+import uuid
+import threading
+import time
+import pickle
+import os
+
+from abc import ABCMeta
+from abc import abstractmethod
+
+import zmq
+from zmq.eventloop import zmqstream
+from zmq.eventloop import ioloop
+
+# XXX some distros don't package libsodium, so we have to be prepared for
+# absence of zmq.auth
+try:
+ import zmq.auth
+except ImportError:
+ pass
-from protobuf.socketrpc import RpcService
+from leap.common.config import get_path_prefix
+from leap.common.zmq_utils import zmq_has_curve
+from leap.common.zmq_utils import maybe_create_and_get_certificates
+from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
-from leap.common.events import events_pb2 as proto
-from leap.common.events import server
-from leap.common.events import daemon
-from leap.common.events import mac_auth
+from leap.common.events.errors import CallbackAlreadyRegisteredError
+from leap.common.events.server import EMIT_ADDR
+from leap.common.events.server import REG_ADDR
+from leap.common.events import catalog
logger = logging.getLogger(__name__)
-# the `registered_callbacks` dictionary below should have the following
-# format:
-#
-# { event_signal: [ (uid, callback), ... ], ... }
-#
-registered_callbacks = {}
+_emit_addr = EMIT_ADDR
+_reg_addr = REG_ADDR
+
+def configure_client(emit_addr, reg_addr):
+ global _emit_addr, _reg_addr
+ logger.debug("Configuring client with addresses: (%s, %s)" %
+ (emit_addr, reg_addr))
+ _emit_addr = emit_addr
+ _reg_addr = reg_addr
-class CallbackAlreadyRegistered(Exception):
+
+class EventsClient(object):
"""
- Raised when trying to register an already registered callback.
+ A singleton client for the events mechanism.
"""
- pass
+ __metaclass__ = ABCMeta
-def ensure_client_daemon():
- """
- Ensure the client daemon is running and listening for incoming
- messages.
+ _instance = None
+ _instance_lock = threading.Lock()
- :return: the daemon instance
- :rtype: EventsClientDaemon
- """
- import time
- daemon = EventsClientDaemon.ensure(0)
- logger.debug('ensure client daemon')
+ def __init__(self, emit_addr, reg_addr):
+ """
+ Initialize the events client.
+ """
+ logger.debug("Creating client instance.")
+ self._callbacks = collections.defaultdict(dict)
+ self._emit_addr = emit_addr
+ self._reg_addr = reg_addr
+
+ @property
+ def callbacks(self):
+ return self._callbacks
- # Because we use a random port we want to wait until a port is assigned to
- # local client daemon.
+ @classmethod
+ def instance(cls):
+ """
+ Return a singleton EventsClient instance.
+ """
+ with cls._instance_lock:
+ if cls._instance is None:
+ cls._instance = cls(_emit_addr, _reg_addr)
+ return cls._instance
- while not (EventsClientDaemon.get_instance() and
- EventsClientDaemon.get_instance().get_port()):
- time.sleep(0.1)
- return daemon
+ def register(self, event, callback, uid=None, replace=False):
+ """
+ Register a callback to be executed when an event is received.
+
+ :param event: The event that triggers the callback.
+ :type event: Event
+ :param callback: The callback to be executed.
+ :type callback: callable(event, *content)
+ :param uid: The callback uid.
+ :type uid: str
+ :param replace: Wether an eventual callback with same ID should be
+ replaced.
+ :type replace: bool
+
+ :return: The callback uid.
+ :rtype: str
+
+ :raises CallbackAlreadyRegisteredError: when there's already a callback
+ identified by the given uid and replace is False.
+ """
+ logger.debug("Subscribing to event: %s" % event)
+ if not uid:
+ uid = uuid.uuid4()
+ elif uid in self._callbacks[event] and not replace:
+ raise CallbackAlreadyRegisteredError()
+ self._callbacks[event][uid] = callback
+ self._subscribe(str(event))
+ return uid
+
+ def unregister(self, event, uid=None):
+ """
+ Unregister callbacks for an event.
+ If uid is not None, then only the callback identified by the given uid
+ is removed. Otherwise, all callbacks for the event are removed.
-def register(signal, callback, uid=None, replace=False, reqcbk=None,
- timeout=1000):
- """
- Registers a callback to be called when a specific signal event is
- received.
-
- Will timeout after timeout ms if response has not been received. The
- timeout arg is only used for asynch requests. If a reqcbk callback has
- been supplied the timeout arg is not used. The response value will be
- returned for a synch request but nothing will be returned for an asynch
- request.
-
- :param signal: the signal that causes the callback to be launched
- :type signal: int (see the `events.proto` file)
- :param callback: the callback to be called when the signal is received
- :type callback: function(leap.common.events.events_pb2.SignalRequest)
- :param uid: a unique id for the callback
- :type uid: int
- :param replace: should an existent callback with same uid be replaced?
- :type replace: bool
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.RegisterRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- Might raise a CallbackAlreadyRegistered exception if there's already a
- callback identified by the given uid and replace is False.
-
- :return: the response from server for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
- """
- ensure_client_daemon() # so we can receive registered signals
- # register callback locally
- if signal not in registered_callbacks:
- registered_callbacks[signal] = []
- cbklist = registered_callbacks[signal]
-
- # TODO should check that the callback has the right
- # number of arguments.
-
- if uid and filter(lambda (x, y): x == uid, cbklist):
- if not replace:
- raise CallbackAlreadyRegistered()
+ :param event: The event that triggers the callback.
+ :type event: Event
+ :param uid: The callback uid.
+ :type uid: str
+ """
+ if not uid:
+ logger.debug(
+ "Unregistering all callbacks from event %s." % event)
+ self._callbacks[event] = {}
else:
- registered_callbacks[signal] = filter(lambda(x, y): x != uid,
- cbklist)
- registered_callbacks[signal].append((uid, callback))
- # register callback on server
- request = proto.RegisterRequest()
- request.event = signal
- request.port = EventsClientDaemon.get_instance().get_port()
- request.mac_method = mac_auth.MacMethod.MAC_NONE
- request.mac = ""
- service = RpcService(proto.EventsServerService_Stub,
- server.SERVER_PORT, 'localhost')
- logger.debug(
- "Sending registration request to server on port %s: %s",
- server.SERVER_PORT,
- str(request)[:40])
- return service.register(request, callback=reqcbk, timeout=timeout)
-
-
-def unregister(signal, uid=None, reqcbk=None, timeout=1000):
- """
- Unregister a callback.
-
- If C{uid} is specified, unregisters only the callback identified by that
- unique id. Otherwise, unregisters all callbacks
-
- :param signal: the signal that causes the callback to be launched
- :type signal: int (see the `events.proto` file)
- :param uid: a unique id for the callback
- :type uid: int
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.UnregisterRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from server for synch calls or nothing for asynch
- calls or None if no callback is registered for that signal or
- uid.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
- """
- if signal not in registered_callbacks or not registered_callbacks[signal]:
- logger.warning("No callback registered for signal %d." % signal)
- return None
- # unregister callback locally
- cbklist = registered_callbacks[signal]
- if uid is not None:
- if filter(lambda (cbkuid, _): cbkuid == uid, cbklist) == []:
- logger.warning("No callback registered for uid %d." % st)
- return None
- registered_callbacks[signal] = filter(lambda(x, y): x != uid, cbklist)
- else:
- # exclude all callbacks for given signal
- registered_callbacks[signal] = []
- # unregister port in server if there are no more callbacks for this signal
- if not registered_callbacks[signal]:
- request = proto.UnregisterRequest()
- request.event = signal
- request.port = EventsClientDaemon.get_instance().get_port()
- request.mac_method = mac_auth.MacMethod.MAC_NONE
- request.mac = ""
- service = RpcService(proto.EventsServerService_Stub,
- server.SERVER_PORT, 'localhost')
- logger.info(
- "Sending unregistration request to server on port %s: %s",
- server.SERVER_PORT,
- str(request)[:40])
- return service.unregister(request, callback=reqcbk, timeout=timeout)
-
-
-def signal(signal, content="", mac_method="", mac="", reqcbk=None,
- timeout=1000):
- """
- Send `signal` event to events server.
-
- Will timeout after timeout ms if response has not been received. The
- timeout arg is only used for asynch requests. If a reqcbk callback has
- been supplied the timeout arg is not used. The response value will be
- returned for a synch request but nothing will be returned for an asynch
- request.
-
- :param signal: the signal that causes the callback to be launched
- :type signal: int (see the `events.proto` file)
- :param content: the contents of the event signal
- :type content: str
- :param mac_method: the method used for auth mac
- :type mac_method: str
- :param mac: the content of the auth mac
- :type mac: str
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.SignalRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from server for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
- """
- request = proto.SignalRequest()
- request.event = signal
- request.content = content
- request.mac_method = mac_method
- request.mac = mac
- service = RpcService(proto.EventsServerService_Stub, server.SERVER_PORT,
- 'localhost')
- logger.debug("Sending signal to server: %s", str(request)[:40])
- return service.signal(request, callback=reqcbk, timeout=timeout)
-
-
-def ping(port, reqcbk=None, timeout=1000):
+ logger.debug(
+ "Unregistering callback %s from event %s." % (uid, event))
+ if uid in self._callbacks[event]:
+ del self._callbacks[event][uid]
+ if not self._callbacks[event]:
+ del self._callbacks[event]
+ self._unsubscribe(str(event))
+
+ def emit(self, event, *content):
+ """
+ Send an event.
+
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
+ """
+ logger.debug("Sending event: (%s, %s)" % (event, content))
+ self._send(str(event) + b'\0' + pickle.dumps(content))
+
+ def _handle_event(self, event, content):
+ """
+ Handle an incoming event.
+
+ :param msg: The incoming message.
+ :type msg: list(str)
+ """
+ logger.debug("Handling event %s..." % event)
+ for uid in self._callbacks[event]:
+ callback = self._callbacks[event][uid]
+ logger.debug("Executing callback %s." % uid)
+ callback(event, *content)
+
+ @abstractmethod
+ def _subscribe(self, tag):
+ """
+ Subscribe to a tag on the zmq SUB socket.
+
+ :param tag: The tag to be subscribed.
+ :type tag: str
+ """
+ pass
+
+ @abstractmethod
+ def _unsubscribe(self, tag):
+ """
+ Unsubscribe from a tag on the zmq SUB socket.
+
+ :param tag: The tag to be unsubscribed.
+ :type tag: str
+ """
+ pass
+
+ @abstractmethod
+ def _send(self, data):
+ """
+ Send data through PUSH socket.
+
+ :param data: The data to be sent.
+ :type event: str
+ """
+ pass
+
+ def shutdown(self):
+ self.__class__.reset()
+
+ @classmethod
+ def reset(cls):
+ with cls._instance_lock:
+ cls._instance = None
+
+
+class EventsIOLoop(ioloop.ZMQIOLoop):
"""
- Ping a client running in C{port}.
-
- :param port: the port in which the client should be listening
- :type port: int
- :param reqcbk: a callback to be called when a response from client is
- received
- :type reqcbk: function(proto.PingRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from client for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
+ An extension of zmq's ioloop that can wait until there are no callbacks
+ in the queue before stopping.
"""
- request = proto.PingRequest()
- service = RpcService(
- proto.EventsClientService_Stub,
- port,
- 'localhost')
- logger.debug("Pinging a client in port %d..." % port)
- return service.ping(request, callback=reqcbk, timeout=timeout)
+ def stop(self, wait=False):
+ """
+ Stop the I/O loop.
-class EventsClientService(proto.EventsClientService):
+ :param wait: Whether we should wait for callbacks in queue to finish
+ before stopping.
+ :type wait: bool
+ """
+ if wait:
+ # prevent new callbacks from being added
+ with self._callback_lock:
+ self._closing = True
+ # wait until all callbacks have been executed
+ while self._callbacks:
+ time.sleep(0.1)
+ ioloop.ZMQIOLoop.stop(self)
+
+
+class EventsClientThread(threading.Thread, EventsClient):
"""
- Service for receiving signal events in clients.
+ A threaded version of the events client.
"""
- def __init__(self):
- proto.EventsClientService.__init__(self)
+ def __init__(self, emit_addr, reg_addr):
+ """
+ Initialize the events client.
+ """
+ threading.Thread.__init__(self)
+ EventsClient.__init__(self, emit_addr, reg_addr)
+ self._lock = threading.Lock()
+ self._initialized = threading.Event()
+ self._config_prefix = os.path.join(
+ get_path_prefix(), "leap", "events")
+ self._loop = None
+ self._context = None
+ self._push = None
+ self._sub = None
+
+ def _init_zmq(self):
+ """
+ Initialize ZMQ connections.
+ """
+ self._loop = EventsIOLoop()
+ self._context = zmq.Context()
+ # connect SUB first, otherwise we might miss some event sent from this
+ # same client
+ self._sub = self._zmq_connect_sub()
+ self._push = self._zmq_connect_push()
+
+ def _zmq_connect(self, socktype, address):
+ """
+ Connect to an address using with a zmq socktype.
+
+ :param socktype: The ZMQ socket type.
+ :type socktype: int
+ :param address: The address to connect to.
+ :type address: str
- def signal(self, controller, request, done):
+ :return: A ZMQ connection stream.
+ :rtype: ZMQStream
"""
- Receive a signal and run callbacks registered for that signal.
+ logger.debug("Connecting %s to %s." % (socktype, address))
+ socket = self._context.socket(socktype)
+ # configure curve authentication
+ if zmq_has_curve():
+ public, private = maybe_create_and_get_certificates(
+ self._config_prefix, "client")
+ server_public_file = os.path.join(
+ self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")
+ server_public, _ = zmq.auth.load_certificate(server_public_file)
+ socket.curve_publickey = public
+ socket.curve_secretkey = private
+ socket.curve_serverkey = server_public
+ stream = zmqstream.ZMQStream(socket, self._loop)
+ socket.connect(address)
+ return stream
+
+ def _zmq_connect_push(self):
+ """
+ Initialize the client's PUSH connection.
- This method is called whenever a signal request is received from
- server.
+ :return: A ZMQ connection stream.
+ :rtype: ZMQStream
+ """
+ return self._zmq_connect(zmq.PUSH, self._emit_addr)
- :param controller: used to mediate a single method call
- :type controller: protobuf.socketrpc.controller.SocketRpcController
- :param request: the request received from the client
- :type request: leap.common.events.events_pb2.SignalRequest
- :param done: callback to be called when done
- :type done: protobuf.socketrpc.server.Callback
+ def _zmq_connect_sub(self):
"""
- logger.debug('Received signal from server: %s...' % str(request)[:40])
+ Initialize the client's SUB connection.
- # run registered callbacks
- # TODO: verify authentication using mac in incoming message
- if request.event in registered_callbacks:
- for (_, cbk) in registered_callbacks[request.event]:
- # callbacks should be prepared to receive a
- # events_pb2.SignalRequest.
- cbk(request)
+ :return: A ZMQ connection stream.
+ :rtype: ZMQStream
+ """
+ stream = self._zmq_connect(zmq.SUB, self._reg_addr)
+ stream.on_recv(self._on_recv)
+ return stream
- # send response back to server
- response = proto.EventResponse()
- response.status = proto.EventResponse.OK
- done.run(response)
+ def _on_recv(self, msg):
+ """
+ Handle an incoming message in the SUB socket.
- def ping(self, controller, request, done):
+ :param msg: The received message.
+ :type msg: str
"""
- Reply to a ping request.
+ ev_str, content_pickle = msg[0].split(b'\0', 1) # undo txzmq tagging
+ event = getattr(catalog, ev_str)
+ content = pickle.loads(content_pickle)
+ self._handle_event(event, content)
- :param controller: used to mediate a single method call
- :type controller: protobuf.socketrpc.controller.SocketRpcController
- :param request: the request received from the client
- :type request: leap.common.events.events_pb2.RegisterRequest
- :param done: callback to be called when done
- :type done: protobuf.socketrpc.server.Callback
+ def _subscribe(self, tag):
"""
- logger.debug("Received ping request, sending response.")
- response = proto.EventResponse()
- response.status = proto.EventResponse.OK
- done.run(response)
+ Subscribe from a tag on the zmq SUB socket.
+ :param tag: The tag to be subscribed.
+ :type tag: str
+ """
+ self._sub.socket.setsockopt(zmq.SUBSCRIBE, tag)
-class EventsClientDaemon(daemon.EventsSingletonDaemon):
- """
- A daemon that listens for incoming events from server.
- """
- @classmethod
- def ensure(cls, port):
+ def _unsubscribe(self, tag):
+ """
+ Unsubscribe from a tag on the zmq SUB socket.
+
+ :param tag: The tag to be unsubscribed.
+ :type tag: str
+ """
+ self._sub.socket.setsockopt(zmq.UNSUBSCRIBE, tag)
+
+ def _send(self, data):
+ """
+ Send data through PUSH socket.
+
+ :param data: The data to be sent.
+ :type event: str
+ """
+ logger.debug("Sending data: %s" % data)
+ # add send() as a callback for ioloop so it works between threads
+ self._loop.add_callback(lambda: self._push.send(data))
+
+ def register(self, event, callback, uid=None, replace=False):
+ """
+ Register a callback to be executed when an event is received.
+
+ :param event: The event that triggers the callback.
+ :type event: Event
+ :param callback: The callback to be executed.
+ :type callback: callable(event, *content)
+ :param uid: The callback uid.
+ :type uid: str
+ :param replace: Wether an eventual callback with same ID should be
+ replaced.
+ :type replace: bool
+
+ :return: The callback uid.
+ :rtype: str
+
+ :raises CallbackAlreadyRegisteredError: when there's already a
+ callback identified by the given uid and replace is False.
"""
- Make sure the daemon is running on the given port.
+ self.ensure_client()
+ return EventsClient.register(self, event, callback, uid=uid, replace=replace)
- :param port: the port in which the daemon should listen
- :type port: int
+ def unregister(self, event, uid=None):
+ """
+ Unregister callbacks for an event.
+
+ If uid is not None, then only the callback identified by the given uid
+ is removed. Otherwise, all callbacks for the event are removed.
+
+ :param event: The event that triggers the callback.
+ :type event: Event
+ :param uid: The callback uid.
+ :type uid: str
+ """
+ self.ensure_client()
+ EventsClient.unregister(self, event, uid=uid)
+
+ def emit(self, event, *content):
+ """
+ Send an event.
+
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
+ """
+ self.ensure_client()
+ EventsClient.emit(self, event, *content)
+
+ def run(self):
+ """
+ Run the events client.
+ """
+ logger.debug("Starting ioloop.")
+ self._init_zmq()
+ self._initialized.set()
+ self._loop.start()
+ self._loop.close()
+ logger.debug("Ioloop finished.")
+
+ def ensure_client(self):
+ """
+ Make sure the events client thread is started.
+ """
+ with self._lock:
+ if not self.is_alive():
+ self.daemon = True
+ self.start()
+ self._initialized.wait()
- :return: a daemon instance
- :rtype: EventsClientDaemon
+ def shutdown(self):
"""
- return cls.ensure_service(port, EventsClientService())
+ Shutdown the events client thread.
+ """
+ logger.debug("Shutting down client...")
+ with self._lock:
+ if self.is_alive():
+ self._loop.stop(wait=True)
+ EventsClient.shutdown(self)
+
+
+def shutdown():
+ """
+ Shutdown the events client thread.
+ """
+ EventsClientThread.instance().shutdown()
+
+
+def register(event, callback, uid=None, replace=False):
+ """
+ Register a callback to be executed when an event is received.
+
+ :param event: The event that triggers the callback.
+ :type event: str
+ :param callback: The callback to be executed.
+ :type callback: callable(event, content)
+ :param uid: The callback uid.
+ :type uid: str
+ :param replace: Wether an eventual callback with same ID should be
+ replaced.
+ :type replace: bool
+
+ :return: The callback uid.
+ :rtype: str
+
+ :raises CallbackAlreadyRegisteredError: when there's already a callback
+ identified by the given uid and replace is False.
+ """
+ return EventsClientThread.instance().register(
+ event, callback, uid=uid, replace=replace)
+
+
+def unregister(event, uid=None):
+ """
+ Unregister callbacks for an event.
+
+ If uid is not None, then only the callback identified by the given uid is
+ removed. Otherwise, all callbacks for the event are removed.
+
+ :param event: The event that triggers the callback.
+ :type event: str
+ :param uid: The callback uid.
+ :type uid: str
+ """
+ return EventsClientThread.instance().unregister(event, uid=uid)
+
+
+def emit(event, *content):
+ """
+ Send an event.
+
+ :param event: The event to be sent.
+ :type event: str
+ :param content: The content of the event.
+ :type content: list
+ """
+ return EventsClientThread.instance().emit(event, *content)
+
+
+def instance():
+ """
+ Return an instance of the events client.
+
+ :return: An instance of the events client.
+ :rtype: EventsClientThread
+ """
+ return EventsClientThread.instance()