Fix events exception raising when ensuring server. Closes #3515.
[leap_pycommon.git] / src / leap / common / events / server.py
index 5cc1add..41aede3 100644 (file)
 #
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
-
 """
 A server for the events mechanism.
 
-A server can receive different kinds of requests from components:
+A server can receive different kinds of requests from clients:
 
-  1. Registration request: store component port number to be notified when
+  1. Registration request: store client port number to be notified when
      a specific signal arrives.
 
-  2. Signal request: redistribute the signal to registered components.
+  2. Signal request: redistribute the signal to registered clients.
 """
-
-
 import logging
-import sets
+import socket
 
 
 from protobuf.socketrpc import RpcService
@@ -43,12 +40,19 @@ logger = logging.getLogger(__name__)
 
 SERVER_PORT = 8090
 
-# the `registered_components` dictionary below should have the following
+# the `registered_clients` dictionary below should have the following
 # format:
 #
 #     { event_signal: [ port, ... ], ... }
 #
-registered_components = {}
+registered_clients = {}
+
+
+class PortAlreadyTaken(Exception):
+    """
+    Raised when trying to open a server in a port that is already taken.
+    """
+    pass
 
 
 def ensure_server(port=SERVER_PORT):
@@ -58,74 +62,156 @@ def ensure_server(port=SERVER_PORT):
     Attempt to connect to given local port. Upon success, assume that the
     events server has already been started. Upon failure, start events server.
 
-    @param port: the port in which server should be listening
-    @type port: int
+    :param port: the port in which server should be listening
+    :type port: int
+
+    :return: the daemon instance or nothing
+    :rtype: EventsServerDaemon or None
 
-    @return: the daemon instance or nothing
-    @rtype: EventsServerDaemon or None
+    :raise PortAlreadyTaken: Raised if C{port} is already taken by something
+                             that is not an events server.
     """
     try:
+        # check if port is available
         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         s.connect(('localhost', port))
         s.close()
-        logger.info('Server is already running on port %d.', port)
-        return None
+        # port is taken, check if there's a server running there
+        response = ping(port=port, timeout=1000)
+        if response is not None and response.status == proto.EventResponse.OK:
+            logger.info('A server is already running on port %d.', port)
+            return
+        # port is taken, and not by an events server
+        logger.warning(
+            'Port %d is taken by something not an events server.', port)
+        raise PortAlreadyTaken(port)
     except socket.error:
+        # port is available, run a server
         logger.info('Launching server on port %d.', port)
         return EventsServerDaemon.ensure(port)
 
 
+def ping(port=SERVER_PORT, reqcbk=None, timeout=1000):
+    """
+    Ping the server.
+
+    :param port: the port in which server should be listening
+    :type port: int
+    :param reqcbk: a callback to be called when a response from server is
+                   received
+    :type reqcbk: function(proto.PingRequest, proto.EventResponse)
+    :param timeout: the timeout for synch calls
+    :type timeout: int
+
+    :return: the response from server for synch calls or nothing for asynch
+             calls.
+    :rtype: leap.common.events.events_pb2.EventsResponse or None
+    """
+    request = proto.PingRequest()
+    service = RpcService(
+        proto.EventsServerService_Stub,
+        port,
+        'localhost')
+    logger.debug("Pinging server in port %d..." % port)
+    return service.ping(request, callback=reqcbk, timeout=timeout)
+
+
 class EventsServerService(proto.EventsServerService):
     """
-    Service for receiving events in components.
+    Service for receiving events in clients.
     """
 
     def register(self, controller, request, done):
         """
-        Register a component port to be signaled when specific events come in.
-
-        @param controller: used to mediate a single method call
-        @type controller: protobuf.socketrpc.controller.SocketRpcController
-        @param request: the request received from the component
-        @type request: leap.common.events.events_pb2.RegisterRequest
-        @param done: callback to be called when done
-        @type done: protobuf.socketrpc.server.Callback
+        Register a client port to be signaled when specific events come in.
+
+        :param controller: used to mediate a single method call
+        :type controller: protobuf.socketrpc.controller.SocketRpcController
+        :param request: the request received from the client
+        :type request: leap.common.events.events_pb2.RegisterRequest
+        :param done: callback to be called when done
+        :type done: protobuf.socketrpc.server.Callback
         """
-        logger.info("Received registration request: %s" % str(request))
-        # add component port to signal list
-        if request.event not in registered_components:
-            registered_components[request.event] = sets.Set()
-        registered_components[request.event].add(request.port)
-        # send response back to component
+        logger.info("Received registration request: %s..." % str(request)[:40])
+        # add client port to signal list
+        if request.event not in registered_clients:
+            registered_clients[request.event] = set([])
+        registered_clients[request.event].add(request.port)
+        # send response back to client
+
+        logger.debug('sending response back')
         response = proto.EventResponse()
         response.status = proto.EventResponse.OK
         done.run(response)
 
+    def unregister(self, controller, request, done):
+        """
+        Unregister a client port so it will not be signaled when specific
+        events come in.
+
+        :param controller: used to mediate a single method call
+        :type controller: protobuf.socketrpc.controller.SocketRpcController
+        :param request: the request received from the client
+        :type request: leap.common.events.events_pb2.RegisterRequest
+        :param done: callback to be called when done
+        :type done: protobuf.socketrpc.server.Callback
+        """
+        logger.info(
+            "Received unregistration request: %s..." % str(request)[:40])
+        # remove client port from signal list
+        response = proto.EventResponse()
+        if request.event in registered_clients:
+            try:
+                registered_clients[request.event].remove(request.port)
+                response.status = proto.EventResponse.OK
+            except KeyError:
+                response.status = proto.EventsResponse.ERROR
+                response.result = 'Port %d not registered.' % request.port
+        # send response back to client
+        logger.debug('sending response back')
+        done.run(response)
+
     def signal(self, controller, request, done):
         """
-        Perform an RPC call to signal all components registered to receive a
+        Perform an RPC call to signal all clients registered to receive a
         specific signal.
 
-        @param controller: used to mediate a single method call
-        @type controller: protobuf.socketrpc.controller.SocketRpcController
-        @param request: the request received from the component
-        @type request: leap.common.events.events_pb2.SignalRequest
-        @param done: callback to be called when done
-        @type done: protobuf.socketrpc.server.Callback
+        :param controller: used to mediate a single method call
+        :type controller: protobuf.socketrpc.controller.SocketRpcController
+        :param request: the request received from the client
+        :type request: leap.common.events.events_pb2.SignalRequest
+        :param done: callback to be called when done
+        :type done: protobuf.socketrpc.server.Callback
         """
-        logger.info('Received signal from component: %s', str(request))
-        # send signal to all registered components
+        logger.debug('Received signal from client: %s...', str(request)[:40])
+        # send signal to all registered clients
         # TODO: verify signal auth
-        if request.event in registered_components:
-            for port in registered_components[request.event]:
+        if request.event in registered_clients:
+            for port in registered_clients[request.event]:
 
                 def callback(req, resp):
-                    logger.info("Signal received by " + str(port))
+                    logger.debug("Signal received by " + str(port))
 
-                service = RpcService(proto.EventsComponentService_Stub,
+                service = RpcService(proto.EventsClientService_Stub,
                                      port, 'localhost')
                 service.signal(request, callback=callback)
-        # send response back to component
+        # send response back to client
+        response = proto.EventResponse()
+        response.status = proto.EventResponse.OK
+        done.run(response)
+
+    def ping(self, controller, request, done):
+        """
+        Reply to a ping request.
+
+        :param controller: used to mediate a single method call
+        :type controller: protobuf.socketrpc.controller.SocketRpcController
+        :param request: the request received from the client
+        :type request: leap.common.events.events_pb2.RegisterRequest
+        :param done: callback to be called when done
+        :type done: protobuf.socketrpc.server.Callback
+        """
+        logger.debug("Received ping request, sending response.")
         response = proto.EventResponse()
         response.status = proto.EventResponse.OK
         done.run(response)
@@ -141,10 +227,10 @@ class EventsServerDaemon(daemon.EventsSingletonDaemon):
         """
         Make sure the daemon is running on the given port.
 
-        @param port: the port in which the daemon should listen
-        @type port: int
+        :param port: the port in which the daemon should listen
+        :type port: int
 
-        @return: a daemon instance
-        @rtype: EventsServerDaemon
+        :return: a daemon instance
+        :rtype: EventsServerDaemon
         """
         return cls.ensure_service(port, EventsServerService())