1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2013 LEAP
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
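"""
A server for the events mechanism.

A server can receive different kinds of requests from clients:

  1. Registration request: store client port number to be notified when
     a specific signal arrives.

  2. Unregistration request: stop notifying a client port about a
     specific signal.

  3. Signal request: redistribute the signal to registered clients.

  4. Ping request: reply so that clients can check the server is alive.
"""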
31 from protobuf.socketrpc import RpcService
32 from leap.common.events import (
logger = logging.getLogger(__name__)

# The `registered_clients` dictionary below maps each event signal to the
# set of client ports that registered for it, in the following format:
#
#     { event_signal: set([ port, ... ]), ... }
#
registered_clients = {}
class PortAlreadyTaken(Exception):
    """
    Raised when trying to open a server in a port that is already taken.

    The offending port number is passed as the only exception argument.
    """
def ensure_server(port=SERVER_PORT):
    """
    Make sure the server is running on the given port.

    Attempt to connect to given local port. Upon failure, the port is free
    and an events server is started on it. Upon success, the port is pinged
    to check whether what is listening there is an events server.

    :param port: the port in which server should be listening
    :type port: int

    :return: the daemon instance if a server had to be launched, or nothing
             if an events server was already answering on the port
    :rtype: EventsServerDaemon or None

    :raise PortAlreadyTaken: Raised if C{port} is already taken by something
                             that is not an events server.
    """
    # check if port is available
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect(('localhost', port))
    except socket.error:
        # port is available, run a server
        logger.info('Launching server on port %d.', port)
        return EventsServerDaemon.ensure(port)
    finally:
        # the probe socket is only needed for the connection attempt, so
        # release it whether or not the connection succeeded
        s.close()
    # port is taken, check if there's a server running there
    response = ping(port)
    if response is not None and response.status == proto.EventResponse.OK:
        logger.info('A server is already running on port %d.', port)
        return None
    # port is taken, and not by an events server
    logger.info('Port %d is taken by something not an events server.', port)
    raise PortAlreadyTaken(port)
def ping(port=SERVER_PORT, reqcbk=None, timeout=1000):
    """
    Ping the server.

    :param port: the port in which server should be listening
    :type port: int
    :param reqcbk: a callback to be called when a response from server is
                   received
    :type reqcbk: function
                  callback(leap.common.events.events_pb2.EventResponse)
    :param timeout: the timeout for synch calls
    :type timeout: int

    :return: whatever the RPC stub's ping call returns; C{ensure_server}
             treats it as an EventResponse or None
    """
    request = proto.PingRequest()
    service = RpcService(
        proto.EventsServerService_Stub,
        port,
        'localhost')
    # let the logging framework do the interpolation, as the rest of the
    # module-level functions do
    logger.info("Pinging server in port %d...", port)
    return service.ping(request, callback=reqcbk, timeout=timeout)
class EventsServerService(proto.EventsServerService):
    """
    Server-side service that handles requests coming from clients.

    It keeps track of which client ports want which events in the
    module-level C{registered_clients} dictionary.
    """

    def register(self, controller, request, done):
        """
        Register a client port to be signaled when specific events come in.

        :param controller: used to mediate a single method call
        :type controller: protobuf.socketrpc.controller.SocketRpcController
        :param request: the request received from the client
        :type request: leap.common.events.events_pb2.RegisterRequest
        :param done: callback to be called when done
        :type done: protobuf.socketrpc.server.Callback
        """
        logger.info("Received registration request: %s...",
                    str(request)[:40])
        # add client port to signal list
        registered_clients.setdefault(request.event, set()).add(request.port)
        # send response back to client
        logger.debug('sending response back')
        response = proto.EventResponse()
        response.status = proto.EventResponse.OK
        done.run(response)

    def unregister(self, controller, request, done):
        """
        Unregister a client port so it will not be signaled when specific
        events come in.

        The response status is ERROR when the port was not registered for
        the given event.

        :param controller: used to mediate a single method call
        :type controller: protobuf.socketrpc.controller.SocketRpcController
        :param request: the request received from the client
        :type request: leap.common.events.events_pb2.RegisterRequest
        :param done: callback to be called when done
        :type done: protobuf.socketrpc.server.Callback
        """
        logger.info(
            "Received unregistration request: %s...", str(request)[:40])
        # remove client port from signal list
        response = proto.EventResponse()
        ports = registered_clients.get(request.event)
        if ports is not None and request.port in ports:
            ports.remove(request.port)
            response.status = proto.EventResponse.OK
        else:
            # Covers both an unknown port and an event nobody registered
            # for, so the response never goes out without a status.
            response.status = proto.EventResponse.ERROR
            response.result = 'Port %d not registered.' % request.port
        # send response back to client
        logger.debug('sending response back')
        done.run(response)

    def signal(self, controller, request, done):
        """
        Perform an RPC call to signal all clients registered to receive a
        specific signal.

        :param controller: used to mediate a single method call
        :type controller: protobuf.socketrpc.controller.SocketRpcController
        :param request: the request received from the client
        :type request: leap.common.events.events_pb2.SignalRequest
        :param done: callback to be called when done
        :type done: protobuf.socketrpc.server.Callback
        """
        logger.info('Received signal from client: %s...', str(request)[:40])
        # send signal to all registered clients
        # TODO: verify signal auth
        for port in registered_clients.get(request.event, ()):

            # Bind the current port as a default argument: the callback may
            # run after the loop has moved on to another port.
            def callback(req, resp, port=port):
                logger.info("Signal received by %s", port)

            service = RpcService(proto.EventsClientService_Stub,
                                 port, 'localhost')
            service.signal(request, callback=callback)
        # send response back to client
        response = proto.EventResponse()
        response.status = proto.EventResponse.OK
        done.run(response)

    def ping(self, controller, request, done):
        """
        Reply to a ping request.

        :param controller: used to mediate a single method call
        :type controller: protobuf.socketrpc.controller.SocketRpcController
        :param request: the request received from the client
        :type request: leap.common.events.events_pb2.PingRequest
        :param done: callback to be called when done
        :type done: protobuf.socketrpc.server.Callback
        """
        logger.info("Received ping request, sending response.")
        response = proto.EventResponse()
        response.status = proto.EventResponse.OK
        done.run(response)
class EventsServerDaemon(daemon.EventsSingletonDaemon):
    """
    Singleton class for starting an events server daemon.
    """

    @classmethod
    def ensure(cls, port):
        """
        Make sure the daemon is running on the given port.

        Delegates to C{ensure_service}, handing it a fresh
        EventsServerService to serve on that port.

        :param port: the port in which the daemon should listen
        :type port: int

        :return: a daemon instance
        :rtype: EventsServerDaemon
        """
        return cls.ensure_service(port, EventsServerService())