Merge branch 'release-0.3.1'
[leap_pycommon.git] / src / leap / common / events / server.py
1 # -*- coding: utf-8 -*-
2 # server.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 A server for the events mechanism.
19
20 A server can receive different kinds of requests from clients:
21
22   1. Registration request: store client port number to be notified when
23      a specific signal arrives.
24
25   2. Signal request: redistribute the signal to registered clients.
26 """
27 import logging
28 import socket
29
30
31 from protobuf.socketrpc import RpcService
32 from leap.common.events import (
33     events_pb2 as proto,
34     daemon,
35 )
36
37
38 logger = logging.getLogger(__name__)
39
40
41 SERVER_PORT = 8090
42
43 # the `registered_clients` dictionary below should have the following
44 # format:
45 #
46 #     { event_signal: [ port, ... ], ... }
47 #
48 registered_clients = {}
49
50
51 class PortAlreadyTaken(Exception):
52     """
53     Raised when trying to open a server in a port that is already taken.
54     """
55     pass
56
57
58 def ensure_server(port=SERVER_PORT):
59     """
60     Make sure the server is running on the given port.
61
62     Attempt to connect to given local port. Upon success, assume that the
63     events server has already been started. Upon failure, start events server.
64
65     :param port: the port in which server should be listening
66     :type port: int
67
68     :return: the daemon instance or nothing
69     :rtype: EventsServerDaemon or None
70
71     :raise PortAlreadyTaken: Raised if C{port} is already taken by something
72                              that is not an events server.
73     """
74     try:
75         # check if port is available
76         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
77         s.connect(('localhost', port))
78         s.close()
79         # port is taken, check if there's a server running there
80         ping(port,
81              reqcbk=lambda req, res: process_ping(port, req, res),
82              timeout=10)
83     except socket.error:
84         # port is available, run a server
85         logger.info('Launching server on port %d.', port)
86         return EventsServerDaemon.ensure(port)
87
88 def process_ping(port, request, response):
89     """
90     Response callback for the ping event.
91
92     :param port: Port that is trying to be used
93     :type port: int
94     :param request: Ping request made
95     :type request: proto.PingRequest
96     :param response: Response from the event
97     :type response: proto.EventResponse
98     """
99     if response is not None and response.status == proto.EventResponse.OK:
100         logger.info('A server is already running on port %d.', port)
101         return
102     # port is taken, and not by an events server
103     logger.info('Port %d is taken by something not an events server.', port)
104     raise PortAlreadyTaken(port)
105
106
107 def ping(port=SERVER_PORT, reqcbk=None, timeout=1000):
108     """
109     Ping the server.
110
111     :param port: the port in which server should be listening
112     :type port: int
113     :param reqcbk: a callback to be called when a response from server is
114                    received
115     :type reqcbk: function(proto.PingRequest, proto.EventResponse)
116     :param timeout: the timeout for synch calls
117     :type timeout: int
118
119     :return: the response from server for synch calls or nothing for asynch
120              calls.
121     :rtype: leap.common.events.events_pb2.EventsResponse or None
122     """
123     request = proto.PingRequest()
124     service = RpcService(
125         proto.EventsServerService_Stub,
126         port,
127         'localhost')
128     logger.info("Pinging server in port %d..." % port)
129     return service.ping(request, callback=reqcbk, timeout=timeout)
130
131
132 class EventsServerService(proto.EventsServerService):
133     """
134     Service for receiving events in clients.
135     """
136
137     def register(self, controller, request, done):
138         """
139         Register a client port to be signaled when specific events come in.
140
141         :param controller: used to mediate a single method call
142         :type controller: protobuf.socketrpc.controller.SocketRpcController
143         :param request: the request received from the client
144         :type request: leap.common.events.events_pb2.RegisterRequest
145         :param done: callback to be called when done
146         :type done: protobuf.socketrpc.server.Callback
147         """
148         logger.info("Received registration request: %s..." % str(request)[:40])
149         # add client port to signal list
150         if request.event not in registered_clients:
151             registered_clients[request.event] = set([])
152         registered_clients[request.event].add(request.port)
153         # send response back to client
154
155         logger.debug('sending response back')
156         response = proto.EventResponse()
157         response.status = proto.EventResponse.OK
158         done.run(response)
159
160     def unregister(self, controller, request, done):
161         """
162         Unregister a client port so it will not be signaled when specific
163         events come in.
164
165         :param controller: used to mediate a single method call
166         :type controller: protobuf.socketrpc.controller.SocketRpcController
167         :param request: the request received from the client
168         :type request: leap.common.events.events_pb2.RegisterRequest
169         :param done: callback to be called when done
170         :type done: protobuf.socketrpc.server.Callback
171         """
172         logger.info(
173             "Received unregistration request: %s..." % str(request)[:40])
174         # remove client port from signal list
175         response = proto.EventResponse()
176         if request.event in registered_clients:
177             try:
178                 registered_clients[request.event].remove(request.port)
179                 response.status = proto.EventResponse.OK
180             except KeyError:
181                 response.status = proto.EventsResponse.ERROR
182                 response.result = 'Port %d not registered.' % request.port
183         # send response back to client
184         logger.debug('sending response back')
185         done.run(response)
186
187     def signal(self, controller, request, done):
188         """
189         Perform an RPC call to signal all clients registered to receive a
190         specific signal.
191
192         :param controller: used to mediate a single method call
193         :type controller: protobuf.socketrpc.controller.SocketRpcController
194         :param request: the request received from the client
195         :type request: leap.common.events.events_pb2.SignalRequest
196         :param done: callback to be called when done
197         :type done: protobuf.socketrpc.server.Callback
198         """
199         logger.info('Received signal from client: %s...', str(request)[:40])
200         # send signal to all registered clients
201         # TODO: verify signal auth
202         if request.event in registered_clients:
203             for port in registered_clients[request.event]:
204
205                 def callback(req, resp):
206                     logger.info("Signal received by " + str(port))
207
208                 service = RpcService(proto.EventsClientService_Stub,
209                                      port, 'localhost')
210                 service.signal(request, callback=callback)
211         # send response back to client
212         response = proto.EventResponse()
213         response.status = proto.EventResponse.OK
214         done.run(response)
215
216     def ping(self, controller, request, done):
217         """
218         Reply to a ping request.
219
220         :param controller: used to mediate a single method call
221         :type controller: protobuf.socketrpc.controller.SocketRpcController
222         :param request: the request received from the client
223         :type request: leap.common.events.events_pb2.RegisterRequest
224         :param done: callback to be called when done
225         :type done: protobuf.socketrpc.server.Callback
226         """
227         logger.info("Received ping request, sending response.")
228         response = proto.EventResponse()
229         response.status = proto.EventResponse.OK
230         done.run(response)
231
232
233 class EventsServerDaemon(daemon.EventsSingletonDaemon):
234     """
235     Singleton class for starting an events server daemon.
236     """
237
238     @classmethod
239     def ensure(cls, port):
240         """
241         Make sure the daemon is running on the given port.
242
243         :param port: the port in which the daemon should listen
244         :type port: int
245
246         :return: a daemon instance
247         :rtype: EventsServerDaemon
248         """
249         return cls.ensure_service(port, EventsServerService())