Fix events exception raising when ensuring server. Closes #3515.
[leap_pycommon.git] / src / leap / common / events / server.py
1 # -*- coding: utf-8 -*-
2 # server.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 A server for the events mechanism.
19
20 A server can receive different kinds of requests from clients:
21
22   1. Registration request: store client port number to be notified when
23      a specific signal arrives.
24
25   2. Signal request: redistribute the signal to registered clients.
26 """
27 import logging
28 import socket
29
30
31 from protobuf.socketrpc import RpcService
32 from leap.common.events import (
33     events_pb2 as proto,
34     daemon,
35 )
36
37
38 logger = logging.getLogger(__name__)
39
40
41 SERVER_PORT = 8090
42
43 # the `registered_clients` dictionary below should have the following
44 # format:
45 #
46 #     { event_signal: [ port, ... ], ... }
47 #
48 registered_clients = {}
49
50
51 class PortAlreadyTaken(Exception):
52     """
53     Raised when trying to open a server in a port that is already taken.
54     """
55     pass
56
57
58 def ensure_server(port=SERVER_PORT):
59     """
60     Make sure the server is running on the given port.
61
62     Attempt to connect to given local port. Upon success, assume that the
63     events server has already been started. Upon failure, start events server.
64
65     :param port: the port in which server should be listening
66     :type port: int
67
68     :return: the daemon instance or nothing
69     :rtype: EventsServerDaemon or None
70
71     :raise PortAlreadyTaken: Raised if C{port} is already taken by something
72                              that is not an events server.
73     """
74     try:
75         # check if port is available
76         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
77         s.connect(('localhost', port))
78         s.close()
79         # port is taken, check if there's a server running there
80         response = ping(port=port, timeout=1000)
81         if response is not None and response.status == proto.EventResponse.OK:
82             logger.info('A server is already running on port %d.', port)
83             return
84         # port is taken, and not by an events server
85         logger.warning(
86             'Port %d is taken by something not an events server.', port)
87         raise PortAlreadyTaken(port)
88     except socket.error:
89         # port is available, run a server
90         logger.info('Launching server on port %d.', port)
91         return EventsServerDaemon.ensure(port)
92
93
94 def ping(port=SERVER_PORT, reqcbk=None, timeout=1000):
95     """
96     Ping the server.
97
98     :param port: the port in which server should be listening
99     :type port: int
100     :param reqcbk: a callback to be called when a response from server is
101                    received
102     :type reqcbk: function(proto.PingRequest, proto.EventResponse)
103     :param timeout: the timeout for synch calls
104     :type timeout: int
105
106     :return: the response from server for synch calls or nothing for asynch
107              calls.
108     :rtype: leap.common.events.events_pb2.EventsResponse or None
109     """
110     request = proto.PingRequest()
111     service = RpcService(
112         proto.EventsServerService_Stub,
113         port,
114         'localhost')
115     logger.debug("Pinging server in port %d..." % port)
116     return service.ping(request, callback=reqcbk, timeout=timeout)
117
118
119 class EventsServerService(proto.EventsServerService):
120     """
121     Service for receiving events in clients.
122     """
123
124     def register(self, controller, request, done):
125         """
126         Register a client port to be signaled when specific events come in.
127
128         :param controller: used to mediate a single method call
129         :type controller: protobuf.socketrpc.controller.SocketRpcController
130         :param request: the request received from the client
131         :type request: leap.common.events.events_pb2.RegisterRequest
132         :param done: callback to be called when done
133         :type done: protobuf.socketrpc.server.Callback
134         """
135         logger.info("Received registration request: %s..." % str(request)[:40])
136         # add client port to signal list
137         if request.event not in registered_clients:
138             registered_clients[request.event] = set([])
139         registered_clients[request.event].add(request.port)
140         # send response back to client
141
142         logger.debug('sending response back')
143         response = proto.EventResponse()
144         response.status = proto.EventResponse.OK
145         done.run(response)
146
147     def unregister(self, controller, request, done):
148         """
149         Unregister a client port so it will not be signaled when specific
150         events come in.
151
152         :param controller: used to mediate a single method call
153         :type controller: protobuf.socketrpc.controller.SocketRpcController
154         :param request: the request received from the client
155         :type request: leap.common.events.events_pb2.RegisterRequest
156         :param done: callback to be called when done
157         :type done: protobuf.socketrpc.server.Callback
158         """
159         logger.info(
160             "Received unregistration request: %s..." % str(request)[:40])
161         # remove client port from signal list
162         response = proto.EventResponse()
163         if request.event in registered_clients:
164             try:
165                 registered_clients[request.event].remove(request.port)
166                 response.status = proto.EventResponse.OK
167             except KeyError:
168                 response.status = proto.EventsResponse.ERROR
169                 response.result = 'Port %d not registered.' % request.port
170         # send response back to client
171         logger.debug('sending response back')
172         done.run(response)
173
174     def signal(self, controller, request, done):
175         """
176         Perform an RPC call to signal all clients registered to receive a
177         specific signal.
178
179         :param controller: used to mediate a single method call
180         :type controller: protobuf.socketrpc.controller.SocketRpcController
181         :param request: the request received from the client
182         :type request: leap.common.events.events_pb2.SignalRequest
183         :param done: callback to be called when done
184         :type done: protobuf.socketrpc.server.Callback
185         """
186         logger.debug('Received signal from client: %s...', str(request)[:40])
187         # send signal to all registered clients
188         # TODO: verify signal auth
189         if request.event in registered_clients:
190             for port in registered_clients[request.event]:
191
192                 def callback(req, resp):
193                     logger.debug("Signal received by " + str(port))
194
195                 service = RpcService(proto.EventsClientService_Stub,
196                                      port, 'localhost')
197                 service.signal(request, callback=callback)
198         # send response back to client
199         response = proto.EventResponse()
200         response.status = proto.EventResponse.OK
201         done.run(response)
202
203     def ping(self, controller, request, done):
204         """
205         Reply to a ping request.
206
207         :param controller: used to mediate a single method call
208         :type controller: protobuf.socketrpc.controller.SocketRpcController
209         :param request: the request received from the client
210         :type request: leap.common.events.events_pb2.RegisterRequest
211         :param done: callback to be called when done
212         :type done: protobuf.socketrpc.server.Callback
213         """
214         logger.debug("Received ping request, sending response.")
215         response = proto.EventResponse()
216         response.status = proto.EventResponse.OK
217         done.run(response)
218
219
220 class EventsServerDaemon(daemon.EventsSingletonDaemon):
221     """
222     Singleton class for starting an events server daemon.
223     """
224
225     @classmethod
226     def ensure(cls, port):
227         """
228         Make sure the daemon is running on the given port.
229
230         :param port: the port in which the daemon should listen
231         :type port: int
232
233         :return: a daemon instance
234         :rtype: EventsServerDaemon
235         """
236         return cls.ensure_service(port, EventsServerService())