Merge remote-tracking branch 'kali/bug/renew-testing-certificates' into develop
[leap_pycommon.git] / src / leap / common / events / server.py
1 # -*- coding: utf-8 -*-
2 # server.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 A server for the events mechanism.
19
20 A server can receive different kinds of requests from clients:
21
22   1. Registration request: store client port number to be notified when
23      a specific signal arrives.
24
25   2. Signal request: redistribute the signal to registered clients.
26 """
27 import logging
28 import socket
29
30
31 from protobuf.socketrpc import RpcService
32 from leap.common.events import (
33     events_pb2 as proto,
34     daemon,
35 )
36
37
38 logger = logging.getLogger(__name__)
39
40
41 SERVER_PORT = 8090
42
43 # the `registered_clients` dictionary below should have the following
44 # format:
45 #
46 #     { event_signal: [ port, ... ], ... }
47 #
48 registered_clients = {}
49
50
51 class PortAlreadyTaken(Exception):
52     """
53     Raised when trying to open a server in a port that is already taken.
54     """
55     pass
56
57
58 def ensure_server(port=SERVER_PORT):
59     """
60     Make sure the server is running on the given port.
61
62     Attempt to connect to given local port. Upon success, assume that the
63     events server has already been started. Upon failure, start events server.
64
65     :param port: the port in which server should be listening
66     :type port: int
67
68     :return: the daemon instance or nothing
69     :rtype: EventsServerDaemon or None
70
71     :raise PortAlreadyTaken: Raised if C{port} is already taken by something
72                              that is not an events server.
73     """
74     try:
75         # check if port is available
76         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
77         s.connect(('localhost', port))
78         s.close()
79         # port is taken, check if there's a server running there
80         ping(port,
81              reqcbk=lambda req, res: process_ping(port, req, res),
82              timeout=10)
83     except socket.error:
84         # port is available, run a server
85         logger.info('Launching server on port %d.', port)
86         return EventsServerDaemon.ensure(port)
87
88
89 def process_ping(port, request, response):
90     """
91     Response callback for the ping event.
92
93     :param port: Port that is trying to be used
94     :type port: int
95     :param request: Ping request made
96     :type request: proto.PingRequest
97     :param response: Response from the event
98     :type response: proto.EventResponse
99     """
100     if response is not None and response.status == proto.EventResponse.OK:
101         logger.info('A server is already running on port %d.', port)
102         return
103     # port is taken, and not by an events server
104     logger.warning('Port %d is taken by something not an events server.', port)
105     raise PortAlreadyTaken(port)
106
107
108 def ping(port=SERVER_PORT, reqcbk=None, timeout=1000):
109     """
110     Ping the server.
111
112     :param port: the port in which server should be listening
113     :type port: int
114     :param reqcbk: a callback to be called when a response from server is
115                    received
116     :type reqcbk: function(proto.PingRequest, proto.EventResponse)
117     :param timeout: the timeout for synch calls
118     :type timeout: int
119
120     :return: the response from server for synch calls or nothing for asynch
121              calls.
122     :rtype: leap.common.events.events_pb2.EventsResponse or None
123     """
124     request = proto.PingRequest()
125     service = RpcService(
126         proto.EventsServerService_Stub,
127         port,
128         'localhost')
129     logger.debug("Pinging server in port %d..." % port)
130     return service.ping(request, callback=reqcbk, timeout=timeout)
131
132
133 class EventsServerService(proto.EventsServerService):
134     """
135     Service for receiving events in clients.
136     """
137
138     def register(self, controller, request, done):
139         """
140         Register a client port to be signaled when specific events come in.
141
142         :param controller: used to mediate a single method call
143         :type controller: protobuf.socketrpc.controller.SocketRpcController
144         :param request: the request received from the client
145         :type request: leap.common.events.events_pb2.RegisterRequest
146         :param done: callback to be called when done
147         :type done: protobuf.socketrpc.server.Callback
148         """
149         logger.info("Received registration request: %s..." % str(request)[:40])
150         # add client port to signal list
151         if request.event not in registered_clients:
152             registered_clients[request.event] = set([])
153         registered_clients[request.event].add(request.port)
154         # send response back to client
155
156         logger.debug('sending response back')
157         response = proto.EventResponse()
158         response.status = proto.EventResponse.OK
159         done.run(response)
160
161     def unregister(self, controller, request, done):
162         """
163         Unregister a client port so it will not be signaled when specific
164         events come in.
165
166         :param controller: used to mediate a single method call
167         :type controller: protobuf.socketrpc.controller.SocketRpcController
168         :param request: the request received from the client
169         :type request: leap.common.events.events_pb2.RegisterRequest
170         :param done: callback to be called when done
171         :type done: protobuf.socketrpc.server.Callback
172         """
173         logger.info(
174             "Received unregistration request: %s..." % str(request)[:40])
175         # remove client port from signal list
176         response = proto.EventResponse()
177         if request.event in registered_clients:
178             try:
179                 registered_clients[request.event].remove(request.port)
180                 response.status = proto.EventResponse.OK
181             except KeyError:
182                 response.status = proto.EventsResponse.ERROR
183                 response.result = 'Port %d not registered.' % request.port
184         # send response back to client
185         logger.debug('sending response back')
186         done.run(response)
187
188     def signal(self, controller, request, done):
189         """
190         Perform an RPC call to signal all clients registered to receive a
191         specific signal.
192
193         :param controller: used to mediate a single method call
194         :type controller: protobuf.socketrpc.controller.SocketRpcController
195         :param request: the request received from the client
196         :type request: leap.common.events.events_pb2.SignalRequest
197         :param done: callback to be called when done
198         :type done: protobuf.socketrpc.server.Callback
199         """
200         logger.debug('Received signal from client: %s...', str(request)[:40])
201         # send signal to all registered clients
202         # TODO: verify signal auth
203         if request.event in registered_clients:
204             for port in registered_clients[request.event]:
205
206                 def callback(req, resp):
207                     logger.debug("Signal received by " + str(port))
208
209                 service = RpcService(proto.EventsClientService_Stub,
210                                      port, 'localhost')
211                 service.signal(request, callback=callback)
212         # send response back to client
213         response = proto.EventResponse()
214         response.status = proto.EventResponse.OK
215         done.run(response)
216
217     def ping(self, controller, request, done):
218         """
219         Reply to a ping request.
220
221         :param controller: used to mediate a single method call
222         :type controller: protobuf.socketrpc.controller.SocketRpcController
223         :param request: the request received from the client
224         :type request: leap.common.events.events_pb2.RegisterRequest
225         :param done: callback to be called when done
226         :type done: protobuf.socketrpc.server.Callback
227         """
228         logger.debug("Received ping request, sending response.")
229         response = proto.EventResponse()
230         response.status = proto.EventResponse.OK
231         done.run(response)
232
233
234 class EventsServerDaemon(daemon.EventsSingletonDaemon):
235     """
236     Singleton class for starting an events server daemon.
237     """
238
239     @classmethod
240     def ensure(cls, port):
241         """
242         Make sure the daemon is running on the given port.
243
244         :param port: the port in which the daemon should listen
245         :type port: int
246
247         :return: a daemon instance
248         :rtype: EventsServerDaemon
249         """
250         return cls.ensure_service(port, EventsServerService())