From 514c1434a016b09d93e8dfc5578b14825d14005a Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 4 Feb 2015 15:04:10 -0200 Subject: [feat] refactor events to use ZMQ Before this commit, protobuf and protobuf.socketrpc were used to serialize and transmit messages between events clients. This change implements a simpler ZMQ client/server events mechanism that uses ZMQ sockets for transmitting messages from clients to server and to redistribute such messages to subscribed clients. Closes: #6359 --- src/leap/common/events/__init__.py | 315 +++++++++++++++++++------------------ 1 file changed, 163 insertions(+), 152 deletions(-) (limited to 'src/leap/common/events/__init__.py') diff --git a/src/leap/common/events/__init__.py b/src/leap/common/events/__init__.py index 0cc6573..9269b9a 100644 --- a/src/leap/common/events/__init__.py +++ b/src/leap/common/events/__init__.py @@ -15,188 +15,199 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . + """ -This is an events mechanism that uses a server to allow for signaling of -events between clients. +This is an events mechanism that uses a server to allow for emitting events +between clients. Application components should use the interface available in this file to -register callbacks to be executed upon receiving specific signals, and to send -signals to other components. +register callbacks to be executed upon receiving specific events, and to send +events to other components. -To register a callback to be executed when a specific event is signaled, use +To register a callback to be executed when a specific event is emitted, use leap.common.events.register(): >>> from leap.common.events import register ->>> from leap.common.events import events_pb2 as proto ->>> register(proto.CLIENT_UID, lambda req: do_something(req)) - -To signal an event, use leap.common.events.signal(): - ->>> from leap.common.events import signal ->>> from leap.common.events import events_pb2 as proto ->>> signal(proto.CLIENT_UID) - - -NOTE ABOUT SYNC/ASYNC REQUESTS: - -Clients always communicate with the server, and never between themselves. -When a client registers a callback for an event, the callback is saved locally -in the client and the server stores the client socket port in a list -associated with that event. When a client signals an event, the server -forwards the signal to all registered client ports, and then each client -executes its callbacks associated with that event locally. - -Each RPC call from a client to the server is followed by a response from the -server to the client. Calls to register() and signal() (and all other RPC -calls) can be synchronous or asynchronous meaning if they will or not wait for -the server's response before returning. - -This mechanism was built on top of protobuf.socketrpc, and because of this RPC -calls are made synchronous or asynchronous in the following way: - - * If RPC calls receive a parameter called `reqcbk`, then the call is made - asynchronous. That means that: +>>> from leap.common.events import catalog +>>> register(catalog.CLIENT_UID, lambda sig, content: do_something(content)) - - an eventual `timeout` parameter is not used, - - the call returns immediatelly with value None, and - - the `reqcbk` callback is executed asynchronously upon the arrival of - a response from the server. +To emit an event, use leap.common.events.emit(): - * Otherwise, if the `reqcbk` parameter is None, then the call is made in a - synchronous manner: - - - if a response from server arrives within `timeout` milliseconds, the - RPC call returns it; - - otherwise, the call returns None. +>>> from leap.common.events import emit +>>> from leap.common.events import catalog +>>> emit(catalog.CLIENT_UID) """ + import logging -import socket +import argparse + +from leap.common.events import client +from leap.common.events import server +from leap.common.events import catalog -from leap.common.events import ( - events_pb2 as proto, - server, - client, - daemon, -) +__all__ = [ + "register", + "unregister", + "emit", + "catalog", +] logger = logging.getLogger(__name__) -def register(signal, callback, uid=None, replace=False, reqcbk=None, - timeout=1000): +def register(event, callback, uid=None, replace=False): """ - Register a callback to be called when the given signal is received. - - Will timeout after timeout ms if response has not been received. The - timeout arg is only used for asynch requests. If a reqcbk callback has - been supplied the timeout arg is not used. The response value will be - returned for a synch request but nothing will be returned for an asynch - request. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param callback: the callback to be called when the signal is received - :type callback: function - :param uid: a unique id for the callback - :type uid: int - :param replace: should an existent callback with same uid be replaced? + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: str + :param callback: The callback to be executed. + :type callback: callable(event, content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. :type replace: bool - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(leap.common.events.events_pb2.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - return client.register(signal, callback, uid, replace, reqcbk, timeout) + :return: The callback uid. + :rtype: str -def unregister(signal, uid=None, reqcbk=None, timeout=1000): - """ - Unregister a callback. - - If C{uid} is specified, unregisters only the callback identified by that - unique id. Otherwise, unregisters all callbacks registered for C{signal}. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param uid: a unique id for the callback - :type uid: int - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.UnregisterRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None + :raises CallbackAlreadyRegistered: when there's already a callback + identified by the given uid and replace is False. """ - return client.unregister(signal, uid, reqcbk, timeout) + return client.register(event, callback, uid, replace) -def signal(signal, content="", mac_method="", mac="", reqcbk=None, - timeout=1000): +def unregister(event, uid=None): """ - Send `signal` event to events server. - - Will timeout after timeout ms if response has not been received. The - timeout arg is only used for asynch requests. If a reqcbk callback has - been supplied the timeout arg is not used. The response value will be - returned for a synch request but nothing will be returned for an asynch - request. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param content: the contents of the event signal - :type content: str - :param mac_method: the method used to auth mac - :type mac_method: str - :param mac: the content of the auth mac - :type mac: str - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.SignalRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - return client.signal(signal, content, mac_method, mac, reqcbk, timeout) + Unregister callbacks for an event. -def ping_client(port, reqcbk=None, timeout=1000): - """ - Ping a client running in C{port}. - - :param port: the port in which the client should be listening - :type port: int - :param reqcbk: a callback to be called when a response from client is - received - :type reqcbk: function(proto.PingRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int + If uid is not None, then only the callback identified by the given uid is + removed. Otherwise, all callbacks for the event are removed. + + :param event: The event that triggers the callback. + :type event: Event + :param uid: The callback uid. + :type uid: str """ - return client.ping(port, reqcbk=reqcbk, timeout=timeout) + return client.unregister(event, uid) -def ping_server(port=server.SERVER_PORT, reqcbk=None, timeout=1000): +def emit(event, *content): """ - Ping the server. - - :param port: the port in which server should be listening - :type port: int - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.PingRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int + Send an event. + + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list """ - return server.ping(port, reqcbk=reqcbk, timeout=timeout) + return client.emit(event, *content) + + +if __name__ == "__main__": + + def _echo(event, *content): + print "Received event: (%s, %s)" % (event, content) + + def _parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--debug", "-d", action="store_true", + help="print debug information") + + subparsers = parser.add_subparsers(dest="command") + + # server options + server_parser = subparsers.add_parser( + "server", help="Run an events server.") + server_parser.add_argument( + "--emit-addr", + help="The address in which to listen for events", + default=server.EMIT_ADDR) + server_parser.add_argument( + "--reg-addr", + help="The address in which to listen for registration for events.", + default=server.REG_ADDR) + + # client options + client_parser = subparsers.add_parser( + "client", help="Run an events client.") + client_parser.add_argument( + "--emit-addr", + help="The address in which to emit events.", + default=server.EMIT_ADDR) + client_parser.add_argument( + "--reg-addr", + help="The address in which to register for events.", + default=server.REG_ADDR) + group = client_parser.add_mutually_exclusive_group(required=True) + group.add_argument('--reg', help="register an event") + group.add_argument('--emit', help="send an event") + client_parser.add_argument( + '--content', help="the content of the event", default=None) + + # txclient options + txclient_parser = subparsers.add_parser( + "txclient", help="Run an events twisted client.") + txclient_parser.add_argument( + "--emit-addr", + help="The address in which to emit events.", + default=server.EMIT_ADDR) + txclient_parser.add_argument( + "--reg-addr", + help="The address in which to register for events.", + default=server.REG_ADDR) + group = txclient_parser.add_mutually_exclusive_group(required=True) + group.add_argument('--reg', help="register an event") + group.add_argument('--emit', help="send an event") + txclient_parser.add_argument( + '--content', help="the content of the event", default=None) + + return parser.parse_args() + + args = _parse_args() + + if args.debug: + logging.basicConfig(level=logging.DEBUG) + + if args.command == "server": + # run server + server.ensure_server(emit_addr=args.emit_addr, reg_addr=args.reg_addr) + from twisted.internet import reactor + reactor.run() + elif args.command == "client": + if args.reg: + event = getattr(catalog, args.reg) + # run client and register to a signal + register(event, _echo) + # make sure we stop on CTRL+C + import signal + signal.signal( + signal.SIGINT, lambda sig, frame: client.shutdown()) + # wait until client thread dies + import time + while client.EventsClientThread.instance().is_alive(): + time.sleep(0.1) + if args.emit: + # run client and emit a signal + event = getattr(catalog, args.emit) + emit(event, args.content) + client.shutdown() + elif args.command == "txclient": + from leap.common.events import txclient + register = txclient.register + emit = txclient.emit + if args.reg: + event = getattr(catalog, args.reg) + # run client and register to a signal + register(event, _echo) + from twisted.internet import reactor + reactor.run() + if args.emit: + # run client and emit a signal + event = getattr(catalog, args.emit) + emit(event, args.content) -- cgit v1.2.3 From 467b14fa2e29ecd6f41d4834b00593d8c86cddc5 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 13 Jul 2015 11:48:47 -0400 Subject: [feature] add global flag for disabling the events framework this will be used to allow the unittests to disable the events framework. this way, emit() will become a passthrough. note that, until now, the basetest class is making use of the threaded version of the client, which launches a zmq tornado-based ioloop. this is wrong, and will have to be addressed in a future commit. we'll have to make use of the global EVENTS_ENABLED flag in the txclient version when those changes are made. Related: #7259 Relases: 0.4.2 --- src/leap/common/events/__init__.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'src/leap/common/events/__init__.py') diff --git a/src/leap/common/events/__init__.py b/src/leap/common/events/__init__.py index 9269b9a..87ed8ae 100644 --- a/src/leap/common/events/__init__.py +++ b/src/leap/common/events/__init__.py @@ -14,8 +14,6 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . - - """ This is an events mechanism that uses a server to allow for emitting events between clients. @@ -37,13 +35,13 @@ To emit an event, use leap.common.events.emit(): >>> from leap.common.events import catalog >>> emit(catalog.CLIENT_UID) """ - - import logging import argparse from leap.common.events import client from leap.common.events import server +from leap.common.events.flags import set_events_enabled + from leap.common.events import catalog @@ -52,6 +50,7 @@ __all__ = [ "unregister", "emit", "catalog", + "set_events_enabled" ] -- cgit v1.2.3