From 514c1434a016b09d93e8dfc5578b14825d14005a Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 4 Feb 2015 15:04:10 -0200 Subject: [feat] refactor events to use ZMQ Before this commit, protobuf and protobuf.socketrpc were used to serialize and transmit messages between events clients. This change implements a simpler ZMQ client/server events mechanism that uses ZMQ sockets for transmitting messages from clients to server and to redistribute such messages to subscribed clients. Closes: #6359 --- src/leap/common/events/README.rst | 117 ++++++++++++++++++++++++++++---------- 1 file changed, 88 insertions(+), 29 deletions(-) (limited to 'src/leap/common/events/README.rst') diff --git a/src/leap/common/events/README.rst b/src/leap/common/events/README.rst index 2e7f254..f455cc8 100644 --- a/src/leap/common/events/README.rst +++ b/src/leap/common/events/README.rst @@ -1,19 +1,83 @@ Events mechanism ================ -The events mechanism allows for clients to send signal events to each -other by means of a centralized server. Clients can register with the -server to receive signals of certain types, and they can also send signals to -the server that will then redistribute these signals to registered clients. +The events mechanism allows for clients to send events to each other by means +of a centralized server. Clients can register with the server to receive +events of certain types, and they can also send events to the server that will +then redistribute these events to registered clients. -Listening daemons ------------------ +ZMQ connections and events redistribution +----------------------------------------- -Both clients and the server listen for incoming messages by using a -listening daemon that runs in its own thread. The server daemon has to be -started explicitly, while clients daemon will be started whenever a -client registers with the server to receive messages. +Clients and server use ZMQ connection patterns to communicate. Clients can +push events to the server, and may subscribe to events published by the +server. The server in turn pulls events from clients and publishes them to +subscribed clients. + +Clients connect to the server's zmq pub socket, and register to specific +events indicating which callbacks should be executed when that event is +received: + + + EventsServer + .------------. + |PULL PUB| + '------------' + ^^ + || + reg(1, cbk1) |'--------------. reg(2, cbk2) + | | + | | + .------------. .------------. .------------. + |PUSH SUB| |PUSH SUB| |PUSH SUB| + '------------' '------------' '------------' + EventsClient EventsClient EventsClient + + +A client that wants to send an event connects to the server's zmq pull socket +and pushes the event to the server. The server then redistributes it to all +clients subscribed to that event. + + + EventsServer + .------------. + |PULL---->PUB| + '------------' + ^ |. + | |. +sig(1, 'foo') .----------------' |'............... + | | . + | v . + .------------. .------------. .------------. + |PUSH SUB| |PUSH SUB| |PUSH SUB| + '------------' '------------' '------------' + EventsClient EventsClient EventsClient + | + v + cbk1(1, 'foo') + + +Any client may emit or subscribe to an event. ZMQ will manage sockets and +reuse the connection whenever it can. + + + EventsServer + .------------. + |PULL---->PUB| + '------------' + ^ .| + | .| +sig(2, 'bar') .-----------------' .'--------------. + | . | + | . v + .------------. .------------. .------------. + |PUSH SUB| |PUSH SUB| |PUSH SUB| + '------------' '------------' '------------' + EventsClient EventsClient EventsClient + | + v + cbk2(2, 'bar') How to use it @@ -22,32 +86,27 @@ How to use it To start the events server: >>> from leap.common.events import server ->>> server.ensure_server(port=8090) +>>> server.ensure_server( + emit_addr="tcp://127.0.0.1:9000", + reg_addr="tcp://127.0.0.1:9001") -To register a callback to be called when a given signal is raised: +To register a callback to be called when a given event is raised: ->>> from leap.common.events import ( ->>> register, ->>> events_pb2 as proto, ->>> ) +>>> from leap.common.events import register +>>> from leap.common.events import catalog >>> ->>> def mycallback(sigreq): ->>> print str(sigreq) +>>> def mycbk(event, *content): +>>> print "%s, %s" (str(event), str(content)) >>> ->>> events.register(signal=proto.CLIENT_UID, callback=mycallback) +>>> register(catalog.CLIENT_UID, callback=mycbk) -To signal an event: +To emit an event: ->>> from leap.common.events import ( ->>> signal, ->>> events_pb2 as proto, ->>> ) ->>> signal(proto.CLIENT_UID) +>>> from leap.common.events import emit +>>> from leap.common.events import catalog +>>> emit(catalog.CLIENT_UID) Adding events ------------- -* Add the new event under enum ``Event`` in ``events.proto`` -* Compile the new protocolbuffers file:: - - make +To add a new event, just add it to ``catalog.py``. -- cgit v1.2.3