Trying to init events server raises when given port is not free.
[leap_pycommon.git] / src / leap / common / events / client.py
1 # -*- coding: utf-8 -*-
2 # client.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 """
19 The client end point of the events mechanism.
20
21 Clients are the communicating parties of the events mechanism. They
22 communicate by sending messages to a server, which in turn redistributes
23 messages to other clients.
24
25 When a client registers a callback for a given signal, it also tells the
26 server that it wants to be notified whenever signals of that type are sent by
27 some other client.
28 """
29
30
31 import logging
32 import threading
33
34
35 from protobuf.socketrpc import RpcService
36 from leap.common.events import (
37     events_pb2 as proto,
38     server,
39     daemon,
40     mac_auth,
41 )
42
43
44 logger = logging.getLogger(__name__)
45
46
47 # the `registered_callbacks` dictionary below should have the following
48 # format:
49 #
50 #     { event_signal: [ (uid, callback), ... ], ... }
51 #
52 registered_callbacks = {}
53
54
55 class CallbackAlreadyRegistered(Exception):
56     """
57     Raised when trying to register an already registered callback.
58     """
59     pass
60
61
62 def ensure_client_daemon():
63     """
64     Ensure the client daemon is running and listening for incoming
65     messages.
66
67     :return: the daemon instance
68     :rtype: EventsClientDaemon
69     """
70     import time
71     daemon = EventsClientDaemon.ensure(0)
72     logger.debug('ensure client daemon')
73
74     # Because we use a random port we want to wait until a port is assigned to
75     # local client daemon.
76
77     while not (EventsClientDaemon.get_instance() and
78                EventsClientDaemon.get_instance().get_port()):
79         time.sleep(0.1)
80     return daemon
81
82
83 def register(signal, callback, uid=None, replace=False, reqcbk=None,
84              timeout=1000):
85     """
86     Registers a callback to be called when a specific signal event is
87     received.
88
89     Will timeout after timeout ms if response has not been received. The
90     timeout arg is only used for asynch requests. If a reqcbk callback has
91     been supplied the timeout arg is not used. The response value will be
92     returned for a synch request but nothing will be returned for an asynch
93     request.
94
95     :param signal: the signal that causes the callback to be launched
96     :type signal: int (see the `events.proto` file)
97     :param callback: the callback to be called when the signal is received
98     :type callback: function
99         callback(leap.common.events.events_pb2.SignalRequest)
100     :param uid: a unique id for the callback
101     :type uid: int
102     :param replace: should an existent callback with same uid be replaced?
103     :type replace: bool
104     :param reqcbk: a callback to be called when a response from server is
105         received
106     :type reqcbk: function
107         callback(leap.common.events.events_pb2.EventResponse)
108     :param timeout: the timeout for synch calls
109     :type timeout: int
110
111     Might raise a CallbackAlreadyRegistered exception if there's already a
112     callback identified by the given uid and replace is False.
113
114     :return: the response from server for synch calls or nothing for asynch
115         calls.
116     :rtype: leap.common.events.events_pb2.EventsResponse or None
117     """
118     ensure_client_daemon()  # so we can receive registered signals
119     # register callback locally
120     if signal not in registered_callbacks:
121         registered_callbacks[signal] = []
122     cbklist = registered_callbacks[signal]
123     if uid and filter(lambda (x, y): x == uid, cbklist):
124         if not replace:
125             raise CallbackAlreadyRegisteredException()
126         else:
127             registered_callbacks[signal] = filter(lambda(x, y): x != uid,
128                                                   cbklist)
129     registered_callbacks[signal].append((uid, callback))
130     # register callback on server
131     request = proto.RegisterRequest()
132     request.event = signal
133     request.port = EventsClientDaemon.get_instance().get_port()
134     request.mac_method = mac_auth.MacMethod.MAC_NONE
135     request.mac = ""
136     service = RpcService(proto.EventsServerService_Stub,
137                          server.SERVER_PORT, 'localhost')
138     logger.info(
139         "Sending registration request to server on port %s: %s",
140         server.SERVER_PORT,
141         str(request)[:40])
142     return service.register(request, callback=reqcbk, timeout=timeout)
143
144 def unregister(signal, uid=None, reqcbk=None, timeout=1000):
145     """
146     Unregister a callback.
147
148     If C{uid} is specified, unregisters only the callback identified by that
149     unique id. Otherwise, unregisters all callbacks
150
151     :param signal: the signal that causes the callback to be launched
152     :type signal: int (see the `events.proto` file)
153     :param uid: a unique id for the callback
154     :type uid: int
155     :param reqcbk: a callback to be called when a response from server is
156         received
157     :type reqcbk: function
158         callback(leap.common.events.events_pb2.EventResponse)
159     :param timeout: the timeout for synch calls
160     :type timeout: int
161
162     :return: the response from server for synch calls or nothing for asynch
163         calls or None if no callback is registered for that signal or uid.
164     :rtype: leap.common.events.events_pb2.EventsResponse or None
165     """
166     if signal not in registered_callbacks or not registered_callbacks[signal]:
167         logger.warning("No callback registered for signal %d." % signal)
168         return None
169     # unregister callback locally
170     cbklist = registered_callbacks[signal]
171     if uid is not None:
172         if filter(lambda (cbkuid, _): cbkuid == uid, cbklist) == []:
173             logger.warning("No callback registered for uid %d." % st)
174             return None
175         registered_callbacks[signal] = filter(lambda(x, y): x != uid, cbklist)
176     else:
177         # exclude all callbacks for given signal
178         registered_callbacks[signal] = []
179     # unregister port in server if there are no more callbacks for this signal
180     if not registered_callbacks[signal]:
181         request = proto.UnregisterRequest()
182         request.event = signal
183         request.port = EventsClientDaemon.get_instance().get_port()
184         request.mac_method = mac_auth.MacMethod.MAC_NONE
185         request.mac = ""
186         service = RpcService(proto.EventsServerService_Stub,
187                              server.SERVER_PORT, 'localhost')
188         logger.info(
189             "Sending unregistration request to server on port %s: %s",
190             server.SERVER_PORT,
191             str(request)[:40])
192         return service.unregister(request, callback=reqcbk, timeout=timeout)
193
194
195 def signal(signal, content="", mac_method="", mac="", reqcbk=None,
196            timeout=1000):
197     """
198     Send `signal` event to events server.
199
200     Will timeout after timeout ms if response has not been received. The
201     timeout arg is only used for asynch requests.  If a reqcbk callback has
202     been supplied the timeout arg is not used. The response value will be
203     returned for a synch request but nothing will be returned for an asynch
204     request.
205
206     :param signal: the signal that causes the callback to be launched
207     :type signal: int (see the `events.proto` file)
208     :param content: the contents of the event signal
209     :type content: str
210     :param mac_method: the method used for auth mac
211     :type mac_method: str
212     :param mac: the content of the auth mac
213     :type mac: str
214     :param reqcbk: a callback to be called when a response from server is
215         received
216     :type reqcbk: function
217         callback(leap.common.events.events_pb2.EventResponse)
218     :param timeout: the timeout for synch calls
219     :type timeout: int
220
221     :return: the response from server for synch calls or nothing for asynch
222         calls.
223     :rtype: leap.common.events.events_pb2.EventsResponse or None
224     """
225     request = proto.SignalRequest()
226     request.event = signal
227     request.content = content
228     request.mac_method = mac_method
229     request.mac = mac
230     service = RpcService(proto.EventsServerService_Stub, server.SERVER_PORT,
231                          'localhost')
232     logger.info("Sending signal to server: %s", str(request)[:40])
233     return service.signal(request, callback=reqcbk, timeout=timeout)
234
235
236 def ping(port, reqcbk=None, timeout=1000):
237     """
238     Ping a client running in C{port}.
239
240     :param port: the port in which the client should be listening
241     :type port: int
242     :param reqcbk: a callback to be called when a response from client is
243         received
244     :type reqcbk: function
245         callback(leap.common.events.events_pb2.EventResponse)
246     :param timeout: the timeout for synch calls
247     :type timeout: int
248     """
249     request = proto.PingRequest()
250     service = RpcService(
251         proto.EventsClientService_Stub,
252         port,
253         'localhost')
254     logger.info("Pinging a client in port %d..." % port)
255     return service.ping(request, callback=reqcbk, timeout=timeout)
256
257
258 class EventsClientService(proto.EventsClientService):
259     """
260     Service for receiving signal events in clients.
261     """
262
263     def __init__(self):
264         proto.EventsClientService.__init__(self)
265
266     def signal(self, controller, request, done):
267         """
268         Receive a signal and run callbacks registered for that signal.
269
270         This method is called whenever a signal request is received from
271         server.
272
273         :param controller: used to mediate a single method call
274         :type controller: protobuf.socketrpc.controller.SocketRpcController
275         :param request: the request received from the client
276         :type request: leap.common.events.events_pb2.SignalRequest
277         :param done: callback to be called when done
278         :type done: protobuf.socketrpc.server.Callback
279         """
280         logger.info('Received signal from server: %s...' % str(request)[:40])
281
282         # run registered callbacks
283         # TODO: verify authentication using mac in incoming message
284         if request.event in registered_callbacks:
285             for (_, cbk) in registered_callbacks[request.event]:
286                 # callbacks should be prepared to receive a
287                 # events_pb2.SignalRequest.
288                 cbk(request)
289
290         # send response back to server
291         response = proto.EventResponse()
292         response.status = proto.EventResponse.OK
293         done.run(response)
294
295     def ping(self, controller, request, done):
296         """
297         Reply to a ping request.
298
299         :param controller: used to mediate a single method call
300         :type controller: protobuf.socketrpc.controller.SocketRpcController
301         :param request: the request received from the client
302         :type request: leap.common.events.events_pb2.RegisterRequest
303         :param done: callback to be called when done
304         :type done: protobuf.socketrpc.server.Callback
305         """
306         logger.info("Received ping request, sending response.")
307         response = proto.EventResponse()
308         response.status = proto.EventResponse.OK
309         done.run(response)
310
311
312 class EventsClientDaemon(daemon.EventsSingletonDaemon):
313     """
314     A daemon that listens for incoming events from server.
315     """
316
317     @classmethod
318     def ensure(cls, port):
319         """
320         Make sure the daemon is running on the given port.
321
322         :param port: the port in which the daemon should listen
323         :type port: int
324
325         :return: a daemon instance
326         :rtype: EventsClientDaemon
327         """
328         return cls.ensure_service(port, EventsClientService())