1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2013 LEAP
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
A server for the events mechanism.

A server can receive different kinds of requests from clients:

  1. Registration request: store client port number to be notified when
     a specific signal arrives.

  2. Signal request: redistribute the signal to registered clients.
"""
31 from protobuf.socketrpc import RpcService
32 from leap.common.events import (
38 logger = logging.getLogger(__name__)
# the `registered_clients` dictionary below should have the following
# format (ports are kept in a set, so each port appears once per event):
#
#     { event_signal: set([ port, ... ]), ... }
48 registered_clients = {}
class PortAlreadyTaken(Exception):
    """
    Signal that the requested server port is already in use.

    Raised when trying to open a server in a port that is already taken.
    """
def ensure_server(port=SERVER_PORT):
    """
    Make sure the server is running on the given port.

    Attempt to connect to given local port. Upon success, assume that the
    events server has already been started. Upon failure, start events server.

    :param port: the port in which server should be listening
    :type port: int

    :return: the daemon instance or nothing
    :rtype: EventsServerDaemon or None

    :raise PortAlreadyTaken: Raised if C{port} is already taken by something
                             that is not an events server.
    """
    # check if port is available: a successful TCP connect means that
    # something is already listening there.
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.connect(('localhost', port))
        port_taken = True
    except socket.error:
        port_taken = False
    finally:
        # always release the probe socket, also when the connect failed
        probe.close()

    if not port_taken:
        # port is available, run a server
        logger.info('Launching server on port %d.', port)
        return EventsServerDaemon.ensure(port)

    # port is taken, check if there's a server running there
    response = ping(port=port, timeout=1000)
    if response is not None and response.status == proto.EventResponse.OK:
        logger.info('A server is already running on port %d.', port)
        return None

    # port is taken, and not by an events server
    logger.warning(
        'Port %d is taken by something not an events server.', port)
    raise PortAlreadyTaken(port)
def ping(port=SERVER_PORT, reqcbk=None, timeout=1000):
    """
    Ping the server.

    :param port: the port in which server should be listening
    :type port: int
    :param reqcbk: a callback to be called when a response from server is
                   received
    :type reqcbk: function(proto.PingRequest, proto.EventResponse)
    :param timeout: the timeout for synch calls
    :type timeout: int

    :return: the response from server for synch calls or nothing for asynch
             calls
    :rtype: leap.common.events.events_pb2.EventResponse or None
    """
    request = proto.PingRequest()
    service = RpcService(
        proto.EventsServerService_Stub,
        port,
        'localhost')
    # lazy %-args: the message is only built if debug logging is enabled
    logger.debug("Pinging server in port %d...", port)
    return service.ping(request, callback=reqcbk, timeout=timeout)
class EventsServerService(proto.EventsServerService):
    """
    Server-side RPC service of the events mechanism.

    Handles registration, unregistration, signal and ping requests coming
    from clients. The ports interested in each event are kept in the
    module-level `registered_clients` dictionary.
    """

    def register(self, controller, request, done):
        """
        Register a client port to be signaled when specific events come in.

        :param controller: used to mediate a single method call
        :type controller: protobuf.socketrpc.controller.SocketRpcController
        :param request: the request received from the client
        :type request: leap.common.events.events_pb2.RegisterRequest
        :param done: callback to be called when done
        :type done: protobuf.socketrpc.server.Callback
        """
        logger.info(
            "Received registration request: %s...", str(request)[:40])
        # add client port to signal list
        registered_clients.setdefault(request.event, set()).add(request.port)
        # send response back to client
        logger.debug('sending response back')
        response = proto.EventResponse()
        response.status = proto.EventResponse.OK
        done.run(response)

    def unregister(self, controller, request, done):
        """
        Unregister a client port so it will not be signaled when specific
        events come in.

        An ERROR response is sent back when the port was not registered for
        the given event, including the case in which nobody is registered
        for that event at all.

        :param controller: used to mediate a single method call
        :type controller: protobuf.socketrpc.controller.SocketRpcController
        :param request: the request received from the client
        :type request: leap.common.events.events_pb2.RegisterRequest
        :param done: callback to be called when done
        :type done: protobuf.socketrpc.server.Callback
        """
        logger.info(
            "Received unregistration request: %s...", str(request)[:40])
        # remove client port from signal list
        response = proto.EventResponse()
        ports = registered_clients.get(request.event)
        if ports is not None and request.port in ports:
            ports.remove(request.port)
            response.status = proto.EventResponse.OK
        else:
            # always give the response a status, so the client never gets
            # back a half-initialized message
            response.status = proto.EventResponse.ERROR
            response.result = 'Port %d not registered.' % request.port
        # send response back to client
        logger.debug('sending response back')
        done.run(response)

    def signal(self, controller, request, done):
        """
        Perform an RPC call to signal all clients registered to receive a
        specific signal.

        :param controller: used to mediate a single method call
        :type controller: protobuf.socketrpc.controller.SocketRpcController
        :param request: the request received from the client
        :type request: leap.common.events.events_pb2.SignalRequest
        :param done: callback to be called when done
        :type done: protobuf.socketrpc.server.Callback
        """
        logger.debug('Received signal from client: %s...', str(request)[:40])
        # send signal to all registered clients
        # TODO: verify signal auth
        for port in registered_clients.get(request.event, ()):

            # `port=port` freezes the current port for this callback;
            # a plain closure would see the loop's last port by the time
            # the asynchronous response arrives.
            def callback(req, resp, port=port):
                logger.debug("Signal received by %s", port)

            service = RpcService(
                proto.EventsClientService_Stub, port, 'localhost')
            service.signal(request, callback=callback)
        # send response back to client
        response = proto.EventResponse()
        response.status = proto.EventResponse.OK
        done.run(response)

    def ping(self, controller, request, done):
        """
        Reply to a ping request.

        :param controller: used to mediate a single method call
        :type controller: protobuf.socketrpc.controller.SocketRpcController
        :param request: the request received from the client
        :type request: leap.common.events.events_pb2.PingRequest
        :param done: callback to be called when done
        :type done: protobuf.socketrpc.server.Callback
        """
        logger.debug("Received ping request, sending response.")
        response = proto.EventResponse()
        response.status = proto.EventResponse.OK
        done.run(response)
class EventsServerDaemon(daemon.EventsSingletonDaemon):
    """
    Singleton daemon that runs the events server.
    """

    @classmethod
    def ensure(cls, port):
        """
        Ensure that the daemon is up and listening on the given port.

        :param port: the port in which the daemon should listen
        :type port: int

        :return: a daemon instance
        :rtype: EventsServerDaemon
        """
        service = EventsServerService()
        return cls.ensure_service(port, service)