diff options
| -rw-r--r-- | src/leap/common/events/daemon.py | 208 | ||||
| -rw-r--r-- | src/leap/common/events/server.py | 150 | 
2 files changed, 358 insertions, 0 deletions
| diff --git a/src/leap/common/events/daemon.py b/src/leap/common/events/daemon.py new file mode 100644 index 0000000..40242c9 --- /dev/null +++ b/src/leap/common/events/daemon.py @@ -0,0 +1,208 @@ +# -*- coding: utf-8 -*- +# daemon.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +A singleton daemon for running RPC services using protobuf.socketrpc. +""" + + +import logging +import threading + + +from protobuf.socketrpc.server import ( +    SocketRpcServer, +    ThreadedTCPServer, +    SocketHandler, +) + + +logger = logging.getLogger(__name__) + + +class ServiceAlreadyRunningException(Exception): +    """ +    Raised whenever a service is already running in this process but someone +    attemped to start it in a different port. +    """ + + +class EventsRpcServer(SocketRpcServer): +    """ +    RPC server used in server and component interfaces to receive messages. +    """ + +    def __init__(self, port, host='localhost'): +        """ +        Initialize a RPC server. + +        @param port: the port in which to listen for incoming messages +        @type port: int +        @param host: the address to bind to +        @type host: str +        """ +        SocketRpcServer.__init__(self, port, host) +        self._server = None + +    def run(self): +        """ +        Run the server. +        """ +        logger.info('Running server on port %d.' % self.port) +        # parent implementation does not hold the server instance, so we do it +        # here. +        self._server = ThreadedTCPServer((self.host, self.port), +                                        SocketHandler, self) +        # if we chose to use a random port, fetch the port number info. +        if self.port is 0: +            self.port = self._server.socket.getsockname()[1] +        self._server.serve_forever() + +    def stop(self): +        """ +        Stop the server. +        """ +        self._server.shutdown() + + +class EventsSingletonDaemon(threading.Thread): +    """ +    Singleton class for for launching and terminating a daemon. + +    This class is used so every part of the mechanism that needs to listen for +    messages can launch its own daemon (thread) to do the job. +    """ + +    # Singleton instance +    __instance = None + +    def __new__(cls, *args, **kwargs): +        """ +        Return a singleton instance if it exists or create and initialize one. +        """ +        if len(args) is not 2: +            raise TypeError("__init__() takes exactly 2 arguments (%d given)" +                            % len(args)) +        if cls.__instance is None: +            cls.__instance = object.__new__( +                EventsSingletonDaemon, *args, **kwargs) +            cls.__initialize(cls.__instance, args[0], args[1]) +        return cls.__instance + +    @staticmethod +    def __initialize(self, port, service): +        """ +        Initialize a singleton daemon. + +        This is a static method disguised as instance method that actually +        does the initialization of the daemon instance. + +        @param port: the port in which to listen for incoming messages +        @type port: int +        @param service: the service to provide in this daemon +        @type service: google.protobuf.service.Service +        """ +        threading.Thread.__init__(self) +        self._port = port +        self._service = service +        self._server = EventsRpcServer(self._port) +        self._server.registerService(self._service) +        self.daemon = True + +    def __init__(self): +        """ +        Singleton placeholder initialization method. + +        Initialization is made in __new__ so we can always return the same +        instance upon object creation. +        """ +        pass + +    @classmethod +    def ensure(cls, port): +        """ +        Make sure the daemon instance is running. + +        Each implementation of this method should call `self.ensure_service` +        with the appropriate service from the `events.proto` definitions, and +        return the daemon instance. + +        @param port: the port in which the daemon should be listening +        @type port: int + +        @return: a daemon instance +        @rtype: EventsSingletonDaemon +        """ +        raise NotImplementedError(self.ensure) + +    @classmethod +    def ensure_service(cls, port, service): +        """ +        Start the singleton instance if not already running. + +        Might return ServiceAlreadyRunningException + +        @param port: the port in which the daemon should be listening +        @type port: int + +        @return: a daemon instance +        @rtype: EventsSingletonDaemon +        """ +        daemon = cls(port, service) +        if not daemon.is_alive(): +            daemon.start() +        elif port and port != cls.__instance._port: +            # service is running in this process but someone is trying to +            # start it in another port +            raise ServiceAlreadyRunningException( +                "Service is already running in this process on port %d." +                % self.__instance._port) +        return daemon + +    @classmethod +    def get_instance(cls): +        """ +        Retrieve singleton instance of this daemon. + +        @return: a daemon instance +        @rtype: EventsSingletonDaemon +        """ +        return cls.__instance + +    def run(self): +        """ +        Run the server. +        """ +        self._server.run() + +    def stop(self): +        """ +        Stop the daemon. +        """ +        self._server.stop() + +    def get_port(self): +        """ +        Retrieve the value of the port to which the service running in this +        daemon is binded to. + +        @return: the port to which the daemon is binded to +        @rtype: int +        """ +        if self._port is 0: +            self._port = self._server.port +        return self._port diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py new file mode 100644 index 0000000..5cc1add --- /dev/null +++ b/src/leap/common/events/server.py @@ -0,0 +1,150 @@ +# -*- coding: utf-8 -*- +# server.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +A server for the events mechanism. + +A server can receive different kinds of requests from components: + +  1. Registration request: store component port number to be notified when +     a specific signal arrives. + +  2. Signal request: redistribute the signal to registered components. +""" + + +import logging +import sets + + +from protobuf.socketrpc import RpcService +from leap.common.events import ( +    events_pb2 as proto, +    daemon, +) + + +logger = logging.getLogger(__name__) + + +SERVER_PORT = 8090 + +# the `registered_components` dictionary below should have the following +# format: +# +#     { event_signal: [ port, ... ], ... } +# +registered_components = {} + + +def ensure_server(port=SERVER_PORT): +    """ +    Make sure the server is running on the given port. + +    Attempt to connect to given local port. Upon success, assume that the +    events server has already been started. Upon failure, start events server. + +    @param port: the port in which server should be listening +    @type port: int + +    @return: the daemon instance or nothing +    @rtype: EventsServerDaemon or None +    """ +    try: +        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +        s.connect(('localhost', port)) +        s.close() +        logger.info('Server is already running on port %d.', port) +        return None +    except socket.error: +        logger.info('Launching server on port %d.', port) +        return EventsServerDaemon.ensure(port) + + +class EventsServerService(proto.EventsServerService): +    """ +    Service for receiving events in components. +    """ + +    def register(self, controller, request, done): +        """ +        Register a component port to be signaled when specific events come in. + +        @param controller: used to mediate a single method call +        @type controller: protobuf.socketrpc.controller.SocketRpcController +        @param request: the request received from the component +        @type request: leap.common.events.events_pb2.RegisterRequest +        @param done: callback to be called when done +        @type done: protobuf.socketrpc.server.Callback +        """ +        logger.info("Received registration request: %s" % str(request)) +        # add component port to signal list +        if request.event not in registered_components: +            registered_components[request.event] = sets.Set() +        registered_components[request.event].add(request.port) +        # send response back to component +        response = proto.EventResponse() +        response.status = proto.EventResponse.OK +        done.run(response) + +    def signal(self, controller, request, done): +        """ +        Perform an RPC call to signal all components registered to receive a +        specific signal. + +        @param controller: used to mediate a single method call +        @type controller: protobuf.socketrpc.controller.SocketRpcController +        @param request: the request received from the component +        @type request: leap.common.events.events_pb2.SignalRequest +        @param done: callback to be called when done +        @type done: protobuf.socketrpc.server.Callback +        """ +        logger.info('Received signal from component: %s', str(request)) +        # send signal to all registered components +        # TODO: verify signal auth +        if request.event in registered_components: +            for port in registered_components[request.event]: + +                def callback(req, resp): +                    logger.info("Signal received by " + str(port)) + +                service = RpcService(proto.EventsComponentService_Stub, +                                     port, 'localhost') +                service.signal(request, callback=callback) +        # send response back to component +        response = proto.EventResponse() +        response.status = proto.EventResponse.OK +        done.run(response) + + +class EventsServerDaemon(daemon.EventsSingletonDaemon): +    """ +    Singleton class for starting an events server daemon. +    """ + +    @classmethod +    def ensure(cls, port): +        """ +        Make sure the daemon is running on the given port. + +        @param port: the port in which the daemon should listen +        @type port: int + +        @return: a daemon instance +        @rtype: EventsServerDaemon +        """ +        return cls.ensure_service(port, EventsServerService()) | 
