summaryrefslogtreecommitdiff
path: root/src/leap/common/events/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/common/events/__init__.py')
-rw-r--r--src/leap/common/events/__init__.py315
1 files changed, 163 insertions, 152 deletions
diff --git a/src/leap/common/events/__init__.py b/src/leap/common/events/__init__.py
index 0cc6573..9269b9a 100644
--- a/src/leap/common/events/__init__.py
+++ b/src/leap/common/events/__init__.py
@@ -15,188 +15,199 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
"""
-This is an events mechanism that uses a server to allow for signaling of
-events between clients.
+This is an events mechanism that uses a server to allow for emitting events
+between clients.
Application components should use the interface available in this file to
-register callbacks to be executed upon receiving specific signals, and to send
-signals to other components.
+register callbacks to be executed upon receiving specific events, and to send
+events to other components.
-To register a callback to be executed when a specific event is signaled, use
+To register a callback to be executed when a specific event is emitted, use
leap.common.events.register():
>>> from leap.common.events import register
->>> from leap.common.events import events_pb2 as proto
->>> register(proto.CLIENT_UID, lambda req: do_something(req))
-
-To signal an event, use leap.common.events.signal():
-
->>> from leap.common.events import signal
->>> from leap.common.events import events_pb2 as proto
->>> signal(proto.CLIENT_UID)
-
-
-NOTE ABOUT SYNC/ASYNC REQUESTS:
-
-Clients always communicate with the server, and never between themselves.
-When a client registers a callback for an event, the callback is saved locally
-in the client and the server stores the client socket port in a list
-associated with that event. When a client signals an event, the server
-forwards the signal to all registered client ports, and then each client
-executes its callbacks associated with that event locally.
-
-Each RPC call from a client to the server is followed by a response from the
-server to the client. Calls to register() and signal() (and all other RPC
-calls) can be synchronous or asynchronous meaning if they will or not wait for
-the server's response before returning.
-
-This mechanism was built on top of protobuf.socketrpc, and because of this RPC
-calls are made synchronous or asynchronous in the following way:
-
- * If RPC calls receive a parameter called `reqcbk`, then the call is made
- asynchronous. That means that:
+>>> from leap.common.events import catalog
+>>> register(catalog.CLIENT_UID, lambda sig, content: do_something(content))
- - an eventual `timeout` parameter is not used,
- - the call returns immediatelly with value None, and
- - the `reqcbk` callback is executed asynchronously upon the arrival of
- a response from the server.
+To emit an event, use leap.common.events.emit():
- * Otherwise, if the `reqcbk` parameter is None, then the call is made in a
- synchronous manner:
-
- - if a response from server arrives within `timeout` milliseconds, the
- RPC call returns it;
- - otherwise, the call returns None.
+>>> from leap.common.events import emit
+>>> from leap.common.events import catalog
+>>> emit(catalog.CLIENT_UID)
"""
+
import logging
-import socket
+import argparse
+
+from leap.common.events import client
+from leap.common.events import server
+from leap.common.events import catalog
-from leap.common.events import (
- events_pb2 as proto,
- server,
- client,
- daemon,
-)
+__all__ = [
+ "register",
+ "unregister",
+ "emit",
+ "catalog",
+]
logger = logging.getLogger(__name__)
-def register(signal, callback, uid=None, replace=False, reqcbk=None,
- timeout=1000):
+def register(event, callback, uid=None, replace=False):
"""
- Register a callback to be called when the given signal is received.
-
- Will timeout after timeout ms if response has not been received. The
- timeout arg is only used for asynch requests. If a reqcbk callback has
- been supplied the timeout arg is not used. The response value will be
- returned for a synch request but nothing will be returned for an asynch
- request.
-
- :param signal: the signal that causes the callback to be launched
- :type signal: int (see the `events.proto` file)
- :param callback: the callback to be called when the signal is received
- :type callback: function
- :param uid: a unique id for the callback
- :type uid: int
- :param replace: should an existent callback with same uid be replaced?
+ Register a callback to be executed when an event is received.
+
+ :param event: The event that triggers the callback.
+ :type event: str
+ :param callback: The callback to be executed.
+ :type callback: callable(event, content)
+ :param uid: The callback uid.
+ :type uid: str
+ :param replace: Wether an eventual callback with same ID should be
+ replaced.
:type replace: bool
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(leap.common.events.events_pb2.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from server for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
- """
- return client.register(signal, callback, uid, replace, reqcbk, timeout)
+ :return: The callback uid.
+ :rtype: str
-def unregister(signal, uid=None, reqcbk=None, timeout=1000):
- """
- Unregister a callback.
-
- If C{uid} is specified, unregisters only the callback identified by that
- unique id. Otherwise, unregisters all callbacks registered for C{signal}.
-
- :param signal: the signal that causes the callback to be launched
- :type signal: int (see the `events.proto` file)
- :param uid: a unique id for the callback
- :type uid: int
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.UnregisterRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from server for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
+ :raises CallbackAlreadyRegistered: when there's already a callback
+ identified by the given uid and replace is False.
"""
- return client.unregister(signal, uid, reqcbk, timeout)
+ return client.register(event, callback, uid, replace)
-def signal(signal, content="", mac_method="", mac="", reqcbk=None,
- timeout=1000):
+def unregister(event, uid=None):
"""
- Send `signal` event to events server.
-
- Will timeout after timeout ms if response has not been received. The
- timeout arg is only used for asynch requests. If a reqcbk callback has
- been supplied the timeout arg is not used. The response value will be
- returned for a synch request but nothing will be returned for an asynch
- request.
-
- :param signal: the signal that causes the callback to be launched
- :type signal: int (see the `events.proto` file)
- :param content: the contents of the event signal
- :type content: str
- :param mac_method: the method used to auth mac
- :type mac_method: str
- :param mac: the content of the auth mac
- :type mac: str
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.SignalRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from server for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
- """
- return client.signal(signal, content, mac_method, mac, reqcbk, timeout)
+ Unregister callbacks for an event.
-def ping_client(port, reqcbk=None, timeout=1000):
- """
- Ping a client running in C{port}.
-
- :param port: the port in which the client should be listening
- :type port: int
- :param reqcbk: a callback to be called when a response from client is
- received
- :type reqcbk: function(proto.PingRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
+ If uid is not None, then only the callback identified by the given uid is
+ removed. Otherwise, all callbacks for the event are removed.
+
+ :param event: The event that triggers the callback.
+ :type event: Event
+ :param uid: The callback uid.
+ :type uid: str
"""
- return client.ping(port, reqcbk=reqcbk, timeout=timeout)
+ return client.unregister(event, uid)
-def ping_server(port=server.SERVER_PORT, reqcbk=None, timeout=1000):
+def emit(event, *content):
"""
- Ping the server.
-
- :param port: the port in which server should be listening
- :type port: int
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.PingRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
+ Send an event.
+
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
"""
- return server.ping(port, reqcbk=reqcbk, timeout=timeout)
+ return client.emit(event, *content)
+
+
+if __name__ == "__main__":
+
+ def _echo(event, *content):
+ print "Received event: (%s, %s)" % (event, content)
+
+ def _parse_args():
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--debug", "-d", action="store_true",
+ help="print debug information")
+
+ subparsers = parser.add_subparsers(dest="command")
+
+ # server options
+ server_parser = subparsers.add_parser(
+ "server", help="Run an events server.")
+ server_parser.add_argument(
+ "--emit-addr",
+ help="The address in which to listen for events",
+ default=server.EMIT_ADDR)
+ server_parser.add_argument(
+ "--reg-addr",
+ help="The address in which to listen for registration for events.",
+ default=server.REG_ADDR)
+
+ # client options
+ client_parser = subparsers.add_parser(
+ "client", help="Run an events client.")
+ client_parser.add_argument(
+ "--emit-addr",
+ help="The address in which to emit events.",
+ default=server.EMIT_ADDR)
+ client_parser.add_argument(
+ "--reg-addr",
+ help="The address in which to register for events.",
+ default=server.REG_ADDR)
+ group = client_parser.add_mutually_exclusive_group(required=True)
+ group.add_argument('--reg', help="register an event")
+ group.add_argument('--emit', help="send an event")
+ client_parser.add_argument(
+ '--content', help="the content of the event", default=None)
+
+ # txclient options
+ txclient_parser = subparsers.add_parser(
+ "txclient", help="Run an events twisted client.")
+ txclient_parser.add_argument(
+ "--emit-addr",
+ help="The address in which to emit events.",
+ default=server.EMIT_ADDR)
+ txclient_parser.add_argument(
+ "--reg-addr",
+ help="The address in which to register for events.",
+ default=server.REG_ADDR)
+ group = txclient_parser.add_mutually_exclusive_group(required=True)
+ group.add_argument('--reg', help="register an event")
+ group.add_argument('--emit', help="send an event")
+ txclient_parser.add_argument(
+ '--content', help="the content of the event", default=None)
+
+ return parser.parse_args()
+
+ args = _parse_args()
+
+ if args.debug:
+ logging.basicConfig(level=logging.DEBUG)
+
+ if args.command == "server":
+ # run server
+ server.ensure_server(emit_addr=args.emit_addr, reg_addr=args.reg_addr)
+ from twisted.internet import reactor
+ reactor.run()
+ elif args.command == "client":
+ if args.reg:
+ event = getattr(catalog, args.reg)
+ # run client and register to a signal
+ register(event, _echo)
+ # make sure we stop on CTRL+C
+ import signal
+ signal.signal(
+ signal.SIGINT, lambda sig, frame: client.shutdown())
+ # wait until client thread dies
+ import time
+ while client.EventsClientThread.instance().is_alive():
+ time.sleep(0.1)
+ if args.emit:
+ # run client and emit a signal
+ event = getattr(catalog, args.emit)
+ emit(event, args.content)
+ client.shutdown()
+ elif args.command == "txclient":
+ from leap.common.events import txclient
+ register = txclient.register
+ emit = txclient.emit
+ if args.reg:
+ event = getattr(catalog, args.reg)
+ # run client and register to a signal
+ register(event, _echo)
+ from twisted.internet import reactor
+ reactor.run()
+ if args.emit:
+ # run client and emit a signal
+ event = getattr(catalog, args.emit)
+ emit(event, args.content)