Merge remote-tracking branch 'ivan/feature/add-leap-check' into develop
[leap_pycommon.git] / src / leap / common / events / server.py
1 # -*- coding: utf-8 -*-
2 # server.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 A server for the events mechanism.
19
20 A server can receive different kinds of requests from clients:
21
22   1. Registration request: store client port number to be notified when
23      a specific signal arrives.
24
25   2. Signal request: redistribute the signal to registered clients.
26 """
27 import logging
28 import socket
29
30
31 from protobuf.socketrpc import RpcService
32 from leap.common.events import (
33     events_pb2 as proto,
34     daemon,
35 )
36
37
38 logger = logging.getLogger(__name__)
39
40
41 SERVER_PORT = 8090
42
43 # the `registered_clients` dictionary below should have the following
44 # format:
45 #
46 #     { event_signal: [ port, ... ], ... }
47 #
48 registered_clients = {}
49
50
51 class PortAlreadyTaken(Exception):
52     """
53     Raised when trying to open a server in a port that is already taken.
54     """
55     pass
56
57
58 def ensure_server(port=SERVER_PORT):
59     """
60     Make sure the server is running on the given port.
61
62     Attempt to connect to given local port. Upon success, assume that the
63     events server has already been started. Upon failure, start events server.
64
65     :param port: the port in which server should be listening
66     :type port: int
67
68     :return: the daemon instance or nothing
69     :rtype: EventsServerDaemon or None
70
71     :raise PortAlreadyTaken: Raised if C{port} is already taken by something
72                              that is not an events server.
73     """
74     try:
75         # check if port is available
76         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
77         s.connect(('localhost', port))
78         s.close()
79         # port is taken, check if there's a server running there
80         response = ping(port)
81         if response is not None and response.status == proto.EventResponse.OK:
82             logger.info('A server is already running on port %d.', port)
83             return None
84         # port is taken, and not by an events server
85         logger.info('Port %d is taken by something not an events server.', port)
86         raise PortAlreadyTaken(port)
87     except socket.error:
88         # port is available, run a server
89         logger.info('Launching server on port %d.', port)
90         return EventsServerDaemon.ensure(port)
91
92
93 def ping(port=SERVER_PORT, reqcbk=None, timeout=1000):
94     """
95     Ping the server.
96
97     :param port: the port in which server should be listening
98     :type port: int
99     :param reqcbk: a callback to be called when a response from server is
100                    received
101     :type reqcbk: function(leap.common.events.events_pb2.EventResponse)
102     :param timeout: the timeout for synch calls
103     :type timeout: int
104
105     :return: the response from server for synch calls or nothing for asynch
106              calls.
107     :rtype: leap.common.events.events_pb2.EventsResponse or None
108     """
109     request = proto.PingRequest()
110     service = RpcService(
111         proto.EventsServerService_Stub,
112         port,
113         'localhost')
114     logger.info("Pinging server in port %d..." % port)
115     return service.ping(request, callback=reqcbk, timeout=timeout)
116
117
118 class EventsServerService(proto.EventsServerService):
119     """
120     Service for receiving events in clients.
121     """
122
123     def register(self, controller, request, done):
124         """
125         Register a client port to be signaled when specific events come in.
126
127         :param controller: used to mediate a single method call
128         :type controller: protobuf.socketrpc.controller.SocketRpcController
129         :param request: the request received from the client
130         :type request: leap.common.events.events_pb2.RegisterRequest
131         :param done: callback to be called when done
132         :type done: protobuf.socketrpc.server.Callback
133         """
134         logger.info("Received registration request: %s..." % str(request)[:40])
135         # add client port to signal list
136         if request.event not in registered_clients:
137             registered_clients[request.event] = set([])
138         registered_clients[request.event].add(request.port)
139         # send response back to client
140
141         logger.debug('sending response back')
142         response = proto.EventResponse()
143         response.status = proto.EventResponse.OK
144         done.run(response)
145
146     def unregister(self, controller, request, done):
147         """
148         Unregister a client port so it will not be signaled when specific
149         events come in.
150
151         :param controller: used to mediate a single method call
152         :type controller: protobuf.socketrpc.controller.SocketRpcController
153         :param request: the request received from the client
154         :type request: leap.common.events.events_pb2.RegisterRequest
155         :param done: callback to be called when done
156         :type done: protobuf.socketrpc.server.Callback
157         """
158         logger.info(
159             "Received unregistration request: %s..." % str(request)[:40])
160         # remove client port from signal list
161         response = proto.EventResponse()
162         if request.event in registered_clients:
163             try:
164                 registered_clients[request.event].remove(request.port)
165                 response.status = proto.EventResponse.OK
166             except KeyError:
167                 response.status = proto.EventsResponse.ERROR
168                 response.result = 'Port %d not registered.' % request.port
169         # send response back to client
170         logger.debug('sending response back')
171         done.run(response)
172
173     def signal(self, controller, request, done):
174         """
175         Perform an RPC call to signal all clients registered to receive a
176         specific signal.
177
178         :param controller: used to mediate a single method call
179         :type controller: protobuf.socketrpc.controller.SocketRpcController
180         :param request: the request received from the client
181         :type request: leap.common.events.events_pb2.SignalRequest
182         :param done: callback to be called when done
183         :type done: protobuf.socketrpc.server.Callback
184         """
185         logger.info('Received signal from client: %s...', str(request)[:40])
186         # send signal to all registered clients
187         # TODO: verify signal auth
188         if request.event in registered_clients:
189             for port in registered_clients[request.event]:
190
191                 def callback(req, resp):
192                     logger.info("Signal received by " + str(port))
193
194                 service = RpcService(proto.EventsClientService_Stub,
195                                      port, 'localhost')
196                 service.signal(request, callback=callback)
197         # send response back to client
198         response = proto.EventResponse()
199         response.status = proto.EventResponse.OK
200         done.run(response)
201
202     def ping(self, controller, request, done):
203         """
204         Reply to a ping request.
205
206         :param controller: used to mediate a single method call
207         :type controller: protobuf.socketrpc.controller.SocketRpcController
208         :param request: the request received from the client
209         :type request: leap.common.events.events_pb2.RegisterRequest
210         :param done: callback to be called when done
211         :type done: protobuf.socketrpc.server.Callback
212         """
213         logger.info("Received ping request, sending response.")
214         response = proto.EventResponse()
215         response.status = proto.EventResponse.OK
216         done.run(response)
217
218
219 class EventsServerDaemon(daemon.EventsSingletonDaemon):
220     """
221     Singleton class for starting an events server daemon.
222     """
223
224     @classmethod
225     def ensure(cls, port):
226         """
227         Make sure the daemon is running on the given port.
228
229         :param port: the port in which the daemon should listen
230         :type port: int
231
232         :return: a daemon instance
233         :rtype: EventsServerDaemon
234         """
235         return cls.ensure_service(port, EventsServerService())