Fix Exception use
[leap_pycommon.git] / src / leap / common / events / client.py
1 # -*- coding: utf-8 -*-
2 # client.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 """
19 The client end point of the events mechanism.
20
21 Clients are the communicating parties of the events mechanism. They
22 communicate by sending messages to a server, which in turn redistributes
23 messages to other clients.
24
25 When a client registers a callback for a given signal, it also tells the
26 server that it wants to be notified whenever signals of that type are sent by
27 some other client.
28 """
29
30
31 import logging
32 import threading
33
34
35 from protobuf.socketrpc import RpcService
36 from leap.common.events import (
37     events_pb2 as proto,
38     server,
39     daemon,
40     mac_auth,
41 )
42
43
44 logger = logging.getLogger(__name__)
45
46
47 # the `registered_callbacks` dictionary below should have the following
48 # format:
49 #
50 #     { event_signal: [ (uid, callback), ... ], ... }
51 #
52 registered_callbacks = {}
53
54
55 class CallbackAlreadyRegistered(Exception):
56     """
57     Raised when trying to register an already registered callback.
58     """
59     pass
60
61
62 def ensure_client_daemon():
63     """
64     Ensure the client daemon is running and listening for incoming
65     messages.
66
67     :return: the daemon instance
68     :rtype: EventsClientDaemon
69     """
70     import time
71     daemon = EventsClientDaemon.ensure(0)
72     logger.debug('ensure client daemon')
73
74     # Because we use a random port we want to wait until a port is assigned to
75     # local client daemon.
76
77     while not (EventsClientDaemon.get_instance() and
78                EventsClientDaemon.get_instance().get_port()):
79         time.sleep(0.1)
80     return daemon
81
82
83 def register(signal, callback, uid=None, replace=False, reqcbk=None,
84              timeout=1000):
85     """
86     Registers a callback to be called when a specific signal event is
87     received.
88
89     Will timeout after timeout ms if response has not been received. The
90     timeout arg is only used for asynch requests. If a reqcbk callback has
91     been supplied the timeout arg is not used. The response value will be
92     returned for a synch request but nothing will be returned for an asynch
93     request.
94
95     :param signal: the signal that causes the callback to be launched
96     :type signal: int (see the `events.proto` file)
97     :param callback: the callback to be called when the signal is received
98     :type callback: function(leap.common.events.events_pb2.SignalRequest)
99     :param uid: a unique id for the callback
100     :type uid: int
101     :param replace: should an existent callback with same uid be replaced?
102     :type replace: bool
103     :param reqcbk: a callback to be called when a response from server is
104                    received
105     :type reqcbk: function(proto.RegisterRequest, proto.EventResponse)
106     :param timeout: the timeout for synch calls
107     :type timeout: int
108
109     Might raise a CallbackAlreadyRegistered exception if there's already a
110     callback identified by the given uid and replace is False.
111
112     :return: the response from server for synch calls or nothing for asynch
113              calls.
114     :rtype: leap.common.events.events_pb2.EventsResponse or None
115     """
116     ensure_client_daemon()  # so we can receive registered signals
117     # register callback locally
118     if signal not in registered_callbacks:
119         registered_callbacks[signal] = []
120     cbklist = registered_callbacks[signal]
121     if uid and filter(lambda (x, y): x == uid, cbklist):
122         if not replace:
123             raise CallbackAlreadyRegistered()
124         else:
125             registered_callbacks[signal] = filter(lambda(x, y): x != uid,
126                                                   cbklist)
127     registered_callbacks[signal].append((uid, callback))
128     # register callback on server
129     request = proto.RegisterRequest()
130     request.event = signal
131     request.port = EventsClientDaemon.get_instance().get_port()
132     request.mac_method = mac_auth.MacMethod.MAC_NONE
133     request.mac = ""
134     service = RpcService(proto.EventsServerService_Stub,
135                          server.SERVER_PORT, 'localhost')
136     logger.info(
137         "Sending registration request to server on port %s: %s",
138         server.SERVER_PORT,
139         str(request)[:40])
140     return service.register(request, callback=reqcbk, timeout=timeout)
141
142 def unregister(signal, uid=None, reqcbk=None, timeout=1000):
143     """
144     Unregister a callback.
145
146     If C{uid} is specified, unregisters only the callback identified by that
147     unique id. Otherwise, unregisters all callbacks
148
149     :param signal: the signal that causes the callback to be launched
150     :type signal: int (see the `events.proto` file)
151     :param uid: a unique id for the callback
152     :type uid: int
153     :param reqcbk: a callback to be called when a response from server is
154                    received
155     :type reqcbk: function(proto.UnregisterRequest, proto.EventResponse)
156     :param timeout: the timeout for synch calls
157     :type timeout: int
158
159     :return: the response from server for synch calls or nothing for asynch
160              calls or None if no callback is registered for that signal or
161              uid.
162     :rtype: leap.common.events.events_pb2.EventsResponse or None
163     """
164     if signal not in registered_callbacks or not registered_callbacks[signal]:
165         logger.warning("No callback registered for signal %d." % signal)
166         return None
167     # unregister callback locally
168     cbklist = registered_callbacks[signal]
169     if uid is not None:
170         if filter(lambda (cbkuid, _): cbkuid == uid, cbklist) == []:
171             logger.warning("No callback registered for uid %d." % st)
172             return None
173         registered_callbacks[signal] = filter(lambda(x, y): x != uid, cbklist)
174     else:
175         # exclude all callbacks for given signal
176         registered_callbacks[signal] = []
177     # unregister port in server if there are no more callbacks for this signal
178     if not registered_callbacks[signal]:
179         request = proto.UnregisterRequest()
180         request.event = signal
181         request.port = EventsClientDaemon.get_instance().get_port()
182         request.mac_method = mac_auth.MacMethod.MAC_NONE
183         request.mac = ""
184         service = RpcService(proto.EventsServerService_Stub,
185                              server.SERVER_PORT, 'localhost')
186         logger.info(
187             "Sending unregistration request to server on port %s: %s",
188             server.SERVER_PORT,
189             str(request)[:40])
190         return service.unregister(request, callback=reqcbk, timeout=timeout)
191
192
193 def signal(signal, content="", mac_method="", mac="", reqcbk=None,
194            timeout=1000):
195     """
196     Send `signal` event to events server.
197
198     Will timeout after timeout ms if response has not been received. The
199     timeout arg is only used for asynch requests.  If a reqcbk callback has
200     been supplied the timeout arg is not used. The response value will be
201     returned for a synch request but nothing will be returned for an asynch
202     request.
203
204     :param signal: the signal that causes the callback to be launched
205     :type signal: int (see the `events.proto` file)
206     :param content: the contents of the event signal
207     :type content: str
208     :param mac_method: the method used for auth mac
209     :type mac_method: str
210     :param mac: the content of the auth mac
211     :type mac: str
212     :param reqcbk: a callback to be called when a response from server is
213                    received
214     :type reqcbk: function(proto.SignalRequest, proto.EventResponse)
215     :param timeout: the timeout for synch calls
216     :type timeout: int
217
218     :return: the response from server for synch calls or nothing for asynch
219              calls.
220     :rtype: leap.common.events.events_pb2.EventsResponse or None
221     """
222     request = proto.SignalRequest()
223     request.event = signal
224     request.content = content
225     request.mac_method = mac_method
226     request.mac = mac
227     service = RpcService(proto.EventsServerService_Stub, server.SERVER_PORT,
228                          'localhost')
229     logger.info("Sending signal to server: %s", str(request)[:40])
230     return service.signal(request, callback=reqcbk, timeout=timeout)
231
232
233 def ping(port, reqcbk=None, timeout=1000):
234     """
235     Ping a client running in C{port}.
236
237     :param port: the port in which the client should be listening
238     :type port: int
239     :param reqcbk: a callback to be called when a response from client is
240                    received
241     :type reqcbk: function(proto.PingRequest, proto.EventResponse)
242     :param timeout: the timeout for synch calls
243     :type timeout: int
244
245     :return: the response from client for synch calls or nothing for asynch
246              calls.
247     :rtype: leap.common.events.events_pb2.EventsResponse or None
248     """
249     request = proto.PingRequest()
250     service = RpcService(
251         proto.EventsClientService_Stub,
252         port,
253         'localhost')
254     logger.info("Pinging a client in port %d..." % port)
255     return service.ping(request, callback=reqcbk, timeout=timeout)
256
257
258 class EventsClientService(proto.EventsClientService):
259     """
260     Service for receiving signal events in clients.
261     """
262
263     def __init__(self):
264         proto.EventsClientService.__init__(self)
265
266     def signal(self, controller, request, done):
267         """
268         Receive a signal and run callbacks registered for that signal.
269
270         This method is called whenever a signal request is received from
271         server.
272
273         :param controller: used to mediate a single method call
274         :type controller: protobuf.socketrpc.controller.SocketRpcController
275         :param request: the request received from the client
276         :type request: leap.common.events.events_pb2.SignalRequest
277         :param done: callback to be called when done
278         :type done: protobuf.socketrpc.server.Callback
279         """
280         logger.info('Received signal from server: %s...' % str(request)[:40])
281
282         # run registered callbacks
283         # TODO: verify authentication using mac in incoming message
284         if request.event in registered_callbacks:
285             for (_, cbk) in registered_callbacks[request.event]:
286                 # callbacks should be prepared to receive a
287                 # events_pb2.SignalRequest.
288                 cbk(request)
289
290         # send response back to server
291         response = proto.EventResponse()
292         response.status = proto.EventResponse.OK
293         done.run(response)
294
295     def ping(self, controller, request, done):
296         """
297         Reply to a ping request.
298
299         :param controller: used to mediate a single method call
300         :type controller: protobuf.socketrpc.controller.SocketRpcController
301         :param request: the request received from the client
302         :type request: leap.common.events.events_pb2.RegisterRequest
303         :param done: callback to be called when done
304         :type done: protobuf.socketrpc.server.Callback
305         """
306         logger.info("Received ping request, sending response.")
307         response = proto.EventResponse()
308         response.status = proto.EventResponse.OK
309         done.run(response)
310
311
312 class EventsClientDaemon(daemon.EventsSingletonDaemon):
313     """
314     A daemon that listens for incoming events from server.
315     """
316
317     @classmethod
318     def ensure(cls, port):
319         """
320         Make sure the daemon is running on the given port.
321
322         :param port: the port in which the daemon should listen
323         :type port: int
324
325         :return: a daemon instance
326         :rtype: EventsClientDaemon
327         """
328         return cls.ensure_service(port, EventsClientService())