Add possibility of unregistering in events mechanism.
[leap_pycommon.git] / src / leap / common / events / server.py
1 # -*- coding: utf-8 -*-
2 # server.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 A server for the events mechanism.
19
20 A server can receive different kinds of requests from components:
21
22   1. Registration request: store component port number to be notified when
23      a specific signal arrives.
24
25   2. Signal request: redistribute the signal to registered components.
26 """
27 import logging
28 import socket
29
30
31 from protobuf.socketrpc import RpcService
32 from leap.common.events import (
33     events_pb2 as proto,
34     daemon,
35 )
36
37
38 logger = logging.getLogger(__name__)
39
40
41 SERVER_PORT = 8090
42
43 # the `registered_components` dictionary below should have the following
44 # format:
45 #
46 #     { event_signal: [ port, ... ], ... }
47 #
48 registered_components = {}
49
50
51 def ensure_server(port=SERVER_PORT):
52     """
53     Make sure the server is running on the given port.
54
55     Attempt to connect to given local port. Upon success, assume that the
56     events server has already been started. Upon failure, start events server.
57
58     :param port: the port in which server should be listening
59     :type port: int
60
61     :return: the daemon instance or nothing
62     :rtype: EventsServerDaemon or None
63     """
64     try:
65         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
66         s.connect(('localhost', port))
67         s.close()
68         logger.info('Server is already running on port %d.', port)
69         return None
70     except socket.error:
71         logger.info('Launching server on port %d.', port)
72         return EventsServerDaemon.ensure(port)
73
74
75 class EventsServerService(proto.EventsServerService):
76     """
77     Service for receiving events in components.
78     """
79
80     def register(self, controller, request, done):
81         """
82         Register a component port to be signaled when specific events come in.
83
84         :param controller: used to mediate a single method call
85         :type controller: protobuf.socketrpc.controller.SocketRpcController
86         :param request: the request received from the component
87         :type request: leap.common.events.events_pb2.RegisterRequest
88         :param done: callback to be called when done
89         :type done: protobuf.socketrpc.server.Callback
90         """
91         logger.info("Received registration request: %s..." % str(request)[:40])
92         # add component port to signal list
93         if request.event not in registered_components:
94             registered_components[request.event] = set([])
95         registered_components[request.event].add(request.port)
96         # send response back to component
97
98         logger.debug('sending response back')
99         response = proto.EventResponse()
100         response.status = proto.EventResponse.OK
101         done.run(response)
102
103     def unregister(self, controller, request, done):
104         """
105         Unregister a component port so it will not be signaled when specific
106         events come in.
107
108         :param controller: used to mediate a single method call
109         :type controller: protobuf.socketrpc.controller.SocketRpcController
110         :param request: the request received from the component
111         :type request: leap.common.events.events_pb2.RegisterRequest
112         :param done: callback to be called when done
113         :type done: protobuf.socketrpc.server.Callback
114         """
115         logger.info(
116             "Received unregistration request: %s..." % str(request)[:40])
117         # remove component port from signal list
118         response = proto.EventResponse()
119         if request.event in registered_components:
120             try:
121                 registered_components[request.event].remove(request.port)
122                 response.status = proto.EventResponse.OK
123             except KeyError:
124                 response.status = proto.EventsResponse.ERROR
125                 response.result = 'Port %d not registered.' % request.port
126         # send response back to component
127         logger.debug('sending response back')
128         done.run(response)
129
130     def signal(self, controller, request, done):
131         """
132         Perform an RPC call to signal all components registered to receive a
133         specific signal.
134
135         :param controller: used to mediate a single method call
136         :type controller: protobuf.socketrpc.controller.SocketRpcController
137         :param request: the request received from the component
138         :type request: leap.common.events.events_pb2.SignalRequest
139         :param done: callback to be called when done
140         :type done: protobuf.socketrpc.server.Callback
141         """
142         logger.info('Received signal from component: %s...', str(request)[:40])
143         # send signal to all registered components
144         # TODO: verify signal auth
145         if request.event in registered_components:
146             for port in registered_components[request.event]:
147
148                 def callback(req, resp):
149                     logger.info("Signal received by " + str(port))
150
151                 service = RpcService(proto.EventsComponentService_Stub,
152                                      port, 'localhost')
153                 service.signal(request, callback=callback)
154         # send response back to component
155         response = proto.EventResponse()
156         response.status = proto.EventResponse.OK
157         done.run(response)
158
159
160 class EventsServerDaemon(daemon.EventsSingletonDaemon):
161     """
162     Singleton class for starting an events server daemon.
163     """
164
165     @classmethod
166     def ensure(cls, port):
167         """
168         Make sure the daemon is running on the given port.
169
170         :param port: the port in which the daemon should listen
171         :type port: int
172
173         :return: a daemon instance
174         :rtype: EventsServerDaemon
175         """
176         return cls.ensure_service(port, EventsServerService())