Trying to init events server raises when given port is not free.
[leap_pycommon.git] / src / leap / common / events / server.py
1 # -*- coding: utf-8 -*-
2 # server.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 A server for the events mechanism.
19
20 A server can receive different kinds of requests from clients:
21
22   1. Registration request: store client port number to be notified when
23      a specific signal arrives.
24
25   2. Signal request: redistribute the signal to registered clients.
26 """
27 import logging
28 import socket
29
30
31 from protobuf.socketrpc import RpcService
32 from leap.common.events import (
33     events_pb2 as proto,
34     daemon,
35 )
36
37
38 logger = logging.getLogger(__name__)
39
40
41 SERVER_PORT = 8090
42
43 # the `registered_clients` dictionary below should have the following
44 # format:
45 #
46 #     { event_signal: [ port, ... ], ... }
47 #
48 registered_clients = {}
49
50
51 class PortAlreadyTaken(Exception):
52     """
53     Raised when trying to open a server in a port that is already taken.
54     """
55     pass
56
57
58 def ensure_server(port=SERVER_PORT):
59     """
60     Make sure the server is running on the given port.
61
62     Attempt to connect to given local port. Upon success, assume that the
63     events server has already been started. Upon failure, start events server.
64
65     :param port: the port in which server should be listening
66     :type port: int
67
68     :return: the daemon instance or nothing
69     :rtype: EventsServerDaemon or None
70
71     :raise PortAlreadyTaken: Raised if C{port} is already taken by something
72         that is not an events server.
73     """
74     try:
75         # check if port is available
76         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
77         s.connect(('localhost', port))
78         s.close()
79         # port is taken, check if there's a server running there
80         response = ping(port)
81         if response is not None and response.status == proto.EventResponse.OK:
82             logger.info('A server is already running on port %d.', port)
83             return None
84         # port is taken, and not by an events server
85         logger.info('Port %d is taken by something not an events server.', port)
86         raise PortAlreadyTaken(port)
87     except socket.error:
88         # port is available, run a server
89         logger.info('Launching server on port %d.', port)
90         return EventsServerDaemon.ensure(port)
91
92
93 def ping(port=SERVER_PORT, reqcbk=None, timeout=1000):
94     """
95     Ping the server.
96
97     :param port: the port in which server should be listening
98     :type port: int
99     :param reqcbk: a callback to be called when a response from server is
100         received
101     :type reqcbk: function
102         callback(leap.common.events.events_pb2.EventResponse)
103     :param timeout: the timeout for synch calls
104     :type timeout: int
105     """
106     request = proto.PingRequest()
107     service = RpcService(
108         proto.EventsServerService_Stub,
109         port,
110         'localhost')
111     logger.info("Pinging server in port %d..." % port)
112     return service.ping(request, callback=reqcbk, timeout=timeout)
113
114
115 class EventsServerService(proto.EventsServerService):
116     """
117     Service for receiving events in clients.
118     """
119
120     def register(self, controller, request, done):
121         """
122         Register a client port to be signaled when specific events come in.
123
124         :param controller: used to mediate a single method call
125         :type controller: protobuf.socketrpc.controller.SocketRpcController
126         :param request: the request received from the client
127         :type request: leap.common.events.events_pb2.RegisterRequest
128         :param done: callback to be called when done
129         :type done: protobuf.socketrpc.server.Callback
130         """
131         logger.info("Received registration request: %s..." % str(request)[:40])
132         # add client port to signal list
133         if request.event not in registered_clients:
134             registered_clients[request.event] = set([])
135         registered_clients[request.event].add(request.port)
136         # send response back to client
137
138         logger.debug('sending response back')
139         response = proto.EventResponse()
140         response.status = proto.EventResponse.OK
141         done.run(response)
142
143     def unregister(self, controller, request, done):
144         """
145         Unregister a client port so it will not be signaled when specific
146         events come in.
147
148         :param controller: used to mediate a single method call
149         :type controller: protobuf.socketrpc.controller.SocketRpcController
150         :param request: the request received from the client
151         :type request: leap.common.events.events_pb2.RegisterRequest
152         :param done: callback to be called when done
153         :type done: protobuf.socketrpc.server.Callback
154         """
155         logger.info(
156             "Received unregistration request: %s..." % str(request)[:40])
157         # remove client port from signal list
158         response = proto.EventResponse()
159         if request.event in registered_clients:
160             try:
161                 registered_clients[request.event].remove(request.port)
162                 response.status = proto.EventResponse.OK
163             except KeyError:
164                 response.status = proto.EventsResponse.ERROR
165                 response.result = 'Port %d not registered.' % request.port
166         # send response back to client
167         logger.debug('sending response back')
168         done.run(response)
169
170     def signal(self, controller, request, done):
171         """
172         Perform an RPC call to signal all clients registered to receive a
173         specific signal.
174
175         :param controller: used to mediate a single method call
176         :type controller: protobuf.socketrpc.controller.SocketRpcController
177         :param request: the request received from the client
178         :type request: leap.common.events.events_pb2.SignalRequest
179         :param done: callback to be called when done
180         :type done: protobuf.socketrpc.server.Callback
181         """
182         logger.info('Received signal from client: %s...', str(request)[:40])
183         # send signal to all registered clients
184         # TODO: verify signal auth
185         if request.event in registered_clients:
186             for port in registered_clients[request.event]:
187
188                 def callback(req, resp):
189                     logger.info("Signal received by " + str(port))
190
191                 service = RpcService(proto.EventsClientService_Stub,
192                                      port, 'localhost')
193                 service.signal(request, callback=callback)
194         # send response back to client
195         response = proto.EventResponse()
196         response.status = proto.EventResponse.OK
197         done.run(response)
198
199     def ping(self, controller, request, done):
200         """
201         Reply to a ping request.
202
203         :param controller: used to mediate a single method call
204         :type controller: protobuf.socketrpc.controller.SocketRpcController
205         :param request: the request received from the client
206         :type request: leap.common.events.events_pb2.RegisterRequest
207         :param done: callback to be called when done
208         :type done: protobuf.socketrpc.server.Callback
209         """
210         logger.info("Received ping request, sending response.")
211         response = proto.EventResponse()
212         response.status = proto.EventResponse.OK
213         done.run(response)
214
215
216 class EventsServerDaemon(daemon.EventsSingletonDaemon):
217     """
218     Singleton class for starting an events server daemon.
219     """
220
221     @classmethod
222     def ensure(cls, port):
223         """
224         Make sure the daemon is running on the given port.
225
226         :param port: the port in which the daemon should listen
227         :type port: int
228
229         :return: a daemon instance
230         :rtype: EventsServerDaemon
231         """
232         return cls.ensure_service(port, EventsServerService())