summaryrefslogtreecommitdiff
path: root/src/leap/common/events/component.py
diff options
context:
space:
mode:
authorTomás Touceda <chiiph@leap.se>2013-06-28 14:55:33 -0300
committerTomás Touceda <chiiph@leap.se>2013-06-28 14:55:33 -0300
commitd071efe20a285e579111f568e2ff994a6171249f (patch)
tree824ea959c02c8ed3af36c0cec8421e0748d311cc /src/leap/common/events/component.py
parentb966eec96fd3621908baab8697de8e2ed61ba136 (diff)
parenta0f78f9d708cc6fe686ee5860cdc34909391b14b (diff)
Merge branch 'release-0.2.5'
Diffstat (limited to 'src/leap/common/events/component.py')
-rw-r--r--src/leap/common/events/component.py290
1 files changed, 290 insertions, 0 deletions
diff --git a/src/leap/common/events/component.py b/src/leap/common/events/component.py
new file mode 100644
index 0000000..029d1ac
--- /dev/null
+++ b/src/leap/common/events/component.py
@@ -0,0 +1,290 @@
+# -*- coding: utf-8 -*-
+# component.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+The component end point of the events mechanism.
+
+Components are the communicating parties of the events mechanism. They
+communicate by sending messages to a server, which in turn redistributes
+messages to other components.
+
+When a component registers a callback for a given signal, it also tells the
+server that it wants to be notified whenever signals of that type are sent by
+some other component.
+"""
+
+
+import logging
+import threading
+
+
+from protobuf.socketrpc import RpcService
+from leap.common.events import (
+ events_pb2 as proto,
+ server,
+ daemon,
+ mac_auth,
+)
+
+
+logger = logging.getLogger(__name__)
+
+
+# the `registered_callbacks` dictionary below should have the following
+# format:
+#
+# { event_signal: [ (uid, callback), ... ], ... }
+#
+registered_callbacks = {}
+
+
+class CallbackAlreadyRegistered(Exception):
+ """
+ Raised when trying to register an already registered callback.
+ """
+ pass
+
+
+def ensure_component_daemon():
+ """
+ Ensure the component daemon is running and listening for incoming
+ messages.
+
+ :return: the daemon instance
+ :rtype: EventsComponentDaemon
+ """
+ import time
+ daemon = EventsComponentDaemon.ensure(0)
+ logger.debug('ensure component daemon')
+
+ # Because we use a random port we want to wait until a port is assigned to
+ # local component daemon.
+
+ while not (EventsComponentDaemon.get_instance() and
+ EventsComponentDaemon.get_instance().get_port()):
+ time.sleep(0.1)
+ return daemon
+
+
+def register(signal, callback, uid=None, replace=False, reqcbk=None,
+ timeout=1000):
+ """
+ Registers a callback to be called when a specific signal event is
+ received.
+
+ Will timeout after timeout ms if response has not been received. The
+ timeout arg is only used for asynch requests. If a reqcbk callback has
+ been supplied the timeout arg is not used. The response value will be
+ returned for a synch request but nothing will be returned for an asynch
+ request.
+
+ :param signal: the signal that causes the callback to be launched
+ :type signal: int (see the `events.proto` file)
+ :param callback: the callback to be called when the signal is received
+ :type callback: function
+ callback(leap.common.events.events_pb2.SignalRequest)
+ :param uid: a unique id for the callback
+ :type uid: int
+ :param replace: should an existent callback with same uid be replaced?
+ :type replace: bool
+ :param reqcbk: a callback to be called when a response from server is
+ received
+ :type reqcbk: function
+ callback(leap.common.events.events_pb2.EventResponse)
+ :param timeout: the timeout for synch calls
+ :type timeout: int
+
+ Might raise a CallbackAlreadyRegistered exception if there's already a
+ callback identified by the given uid and replace is False.
+
+ :return: the response from server for synch calls or nothing for asynch
+ calls.
+ :rtype: leap.common.events.events_pb2.EventsResponse or None
+ """
+ ensure_component_daemon() # so we can receive registered signals
+ # register callback locally
+ if signal not in registered_callbacks:
+ registered_callbacks[signal] = []
+ cbklist = registered_callbacks[signal]
+ if uid and filter(lambda (x, y): x == uid, cbklist):
+ if not replace:
+ raise CallbackAlreadyRegisteredException()
+ else:
+ registered_callbacks[signal] = filter(lambda(x, y): x != uid,
+ cbklist)
+ registered_callbacks[signal].append((uid, callback))
+ # register callback on server
+ request = proto.RegisterRequest()
+ request.event = signal
+ request.port = EventsComponentDaemon.get_instance().get_port()
+ request.mac_method = mac_auth.MacMethod.MAC_NONE
+ request.mac = ""
+ service = RpcService(proto.EventsServerService_Stub,
+ server.SERVER_PORT, 'localhost')
+ logger.info(
+ "Sending registration request to server on port %s: %s",
+ server.SERVER_PORT,
+ str(request)[:40])
+ return service.register(request, callback=reqcbk, timeout=timeout)
+
+def unregister(signal, uid=None, reqcbk=None, timeout=1000):
+ """
+ Unregister a callback.
+
+ If C{uid} is specified, unregisters only the callback identified by that
+ unique id. Otherwise, unregisters all callbacks
+
+ :param signal: the signal that causes the callback to be launched
+ :type signal: int (see the `events.proto` file)
+ :param uid: a unique id for the callback
+ :type uid: int
+ :param reqcbk: a callback to be called when a response from server is
+ received
+ :type reqcbk: function
+ callback(leap.common.events.events_pb2.EventResponse)
+ :param timeout: the timeout for synch calls
+ :type timeout: int
+
+ :return: the response from server for synch calls or nothing for asynch
+ calls or None if no callback is registered for that signal or uid.
+ :rtype: leap.common.events.events_pb2.EventsResponse or None
+ """
+ if signal not in registered_callbacks or not registered_callbacks[signal]:
+ logger.warning("No callback registered for signal %d." % signal)
+ return None
+ # unregister callback locally
+ cbklist = registered_callbacks[signal]
+ if uid is not None:
+ if filter(lambda (cbkuid, _): cbkuid == uid, cbklist) == []:
+ logger.warning("No callback registered for uid %d." % st)
+ return None
+ registered_callbacks[signal] = filter(lambda(x, y): x != uid, cbklist)
+ else:
+ # exclude all callbacks for given signal
+ registered_callbacks[signal] = []
+ # unregister port in server if there are no more callbacks for this signal
+ if not registered_callbacks[signal]:
+ request = proto.UnregisterRequest()
+ request.event = signal
+ request.port = EventsComponentDaemon.get_instance().get_port()
+ request.mac_method = mac_auth.MacMethod.MAC_NONE
+ request.mac = ""
+ service = RpcService(proto.EventsServerService_Stub,
+ server.SERVER_PORT, 'localhost')
+ logger.info(
+ "Sending unregistration request to server on port %s: %s",
+ server.SERVER_PORT,
+ str(request)[:40])
+ return service.unregister(request, callback=reqcbk, timeout=timeout)
+
+
+def signal(signal, content="", mac_method="", mac="", reqcbk=None,
+ timeout=1000):
+ """
+ Send `signal` event to events server.
+
+ Will timeout after timeout ms if response has not been received. The
+ timeout arg is only used for asynch requests. If a reqcbk callback has
+ been supplied the timeout arg is not used. The response value will be
+ returned for a synch request but nothing will be returned for an asynch
+ request.
+
+ :param signal: the signal that causes the callback to be launched
+ :type signal: int (see the `events.proto` file)
+ :param content: the contents of the event signal
+ :type content: str
+ :param mac_method: the method used for auth mac
+ :type mac_method: str
+ :param mac: the content of the auth mac
+ :type mac: str
+ :param reqcbk: a callback to be called when a response from server is
+ received
+ :type reqcbk: function
+ callback(leap.common.events.events_pb2.EventResponse)
+ :param timeout: the timeout for synch calls
+ :type timeout: int
+
+ :return: the response from server for synch calls or nothing for asynch
+ calls.
+ :rtype: leap.common.events.events_pb2.EventsResponse or None
+ """
+ request = proto.SignalRequest()
+ request.event = signal
+ request.content = content
+ request.mac_method = mac_method
+ request.mac = mac
+ service = RpcService(proto.EventsServerService_Stub, server.SERVER_PORT,
+ 'localhost')
+ logger.info("Sending signal to server: %s", str(request)[:40])
+ return service.signal(request, callback=reqcbk, timeout=timeout)
+
+
+class EventsComponentService(proto.EventsComponentService):
+ """
+ Service for receiving signal events in components.
+ """
+
+ def __init__(self):
+ proto.EventsComponentService.__init__(self)
+
+ def signal(self, controller, request, done):
+ """
+ Receive a signal and run callbacks registered for that signal.
+
+ This method is called whenever a signal request is received from
+ server.
+
+ :param controller: used to mediate a single method call
+ :type controller: protobuf.socketrpc.controller.SocketRpcController
+ :param request: the request received from the component
+ :type request: leap.common.events.events_pb2.SignalRequest
+ :param done: callback to be called when done
+ :type done: protobuf.socketrpc.server.Callback
+ """
+ logger.info('Received signal from server: %s...' % str(request)[:40])
+
+ # run registered callbacks
+ # TODO: verify authentication using mac in incoming message
+ if request.event in registered_callbacks:
+ for (_, cbk) in registered_callbacks[request.event]:
+ # callbacks should be prepared to receive a
+ # events_pb2.SignalRequest.
+ cbk(request)
+
+ # send response back to server
+ response = proto.EventResponse()
+ response.status = proto.EventResponse.OK
+ done.run(response)
+
+
+class EventsComponentDaemon(daemon.EventsSingletonDaemon):
+ """
+ A daemon that listens for incoming events from server.
+ """
+
+ @classmethod
+ def ensure(cls, port):
+ """
+ Make sure the daemon is running on the given port.
+
+ :param port: the port in which the daemon should listen
+ :type port: int
+
+ :return: a daemon instance
+ :rtype: EventsComponentDaemon
+ """
+ return cls.ensure_service(port, EventsComponentService())