[feature] reactor-based authenticator
[leap_pycommon.git] / src / leap / common / events / client.py
1 # -*- coding: utf-8 -*-
2 # client.py
3 # Copyright (C) 2013, 2014, 2015 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 The client end point of the events mechanism.
19
20 Clients are the communicating parties of the events mechanism. They
21 communicate by sending messages to a server, which in turn redistributes
22 messages to other clients.
23
24 When a client registers a callback for a given event, it also tells the
25 server that it wants to be notified whenever events of that type are sent by
26 some other client.
27 """
28 import logging
29 import collections
30 import uuid
31 import threading
32 import time
33 import pickle
34 import os
35
36 from abc import ABCMeta
37 from abc import abstractmethod
38
39 import zmq
40 from zmq.eventloop import zmqstream
41 from zmq.eventloop import ioloop
42
43 # XXX some distros don't package libsodium, so we have to be prepared for
44 #     absence of zmq.auth
45 try:
46     import zmq.auth
47 except ImportError:
48     pass
49
50 from leap.common.config import flags, get_path_prefix
51 from leap.common.zmq_utils import zmq_has_curve
52 from leap.common.zmq_utils import maybe_create_and_get_certificates
53 from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
54
55 from leap.common.events.errors import CallbackAlreadyRegisteredError
56 from leap.common.events.server import EMIT_ADDR
57 from leap.common.events.server import REG_ADDR
58 from leap.common.events import catalog
59
60
61 logger = logging.getLogger(__name__)
62
63
64 _emit_addr = EMIT_ADDR
65 _reg_addr = REG_ADDR
66
67
68 def configure_client(emit_addr, reg_addr):
69     global _emit_addr, _reg_addr
70     logger.debug("Configuring client with addresses: (%s, %s)" %
71                  (emit_addr, reg_addr))
72     _emit_addr = emit_addr
73     _reg_addr = reg_addr
74
75
76 class EventsClient(object):
77     """
78     A singleton client for the events mechanism.
79     """
80
81     __metaclass__ = ABCMeta
82
83     _instance = None
84     _instance_lock = threading.Lock()
85
86     def __init__(self, emit_addr, reg_addr):
87         """
88         Initialize the events client.
89         """
90         logger.debug("Creating client instance.")
91         self._callbacks = collections.defaultdict(dict)
92         self._emit_addr = emit_addr
93         self._reg_addr = reg_addr
94
95     @property
96     def callbacks(self):
97         return self._callbacks
98
99     @classmethod
100     def instance(cls):
101         """
102         Return a singleton EventsClient instance.
103         """
104         with cls._instance_lock:
105             if cls._instance is None:
106                 cls._instance = cls(_emit_addr, _reg_addr)
107         return cls._instance
108
109     def register(self, event, callback, uid=None, replace=False):
110         """
111         Register a callback to be executed when an event is received.
112
113         :param event: The event that triggers the callback.
114         :type event: Event
115         :param callback: The callback to be executed.
116         :type callback: callable(event, *content)
117         :param uid: The callback uid.
118         :type uid: str
119         :param replace: Wether an eventual callback with same ID should be
120                         replaced.
121         :type replace: bool
122
123         :return: The callback uid.
124         :rtype: str
125
126         :raises CallbackAlreadyRegisteredError: when there's already a callback
127                 identified by the given uid and replace is False.
128         """
129         logger.debug("Subscribing to event: %s" % event)
130         if not uid:
131             uid = uuid.uuid4()
132         elif uid in self._callbacks[event] and not replace:
133             raise CallbackAlreadyRegisteredError()
134         self._callbacks[event][uid] = callback
135         self._subscribe(str(event))
136         return uid
137
138     def unregister(self, event, uid=None):
139         """
140         Unregister callbacks for an event.
141
142         If uid is not None, then only the callback identified by the given uid
143         is removed. Otherwise, all callbacks for the event are removed.
144
145         :param event: The event that triggers the callback.
146         :type event: Event
147         :param uid: The callback uid.
148         :type uid: str
149         """
150         if not uid:
151             logger.debug(
152                 "Unregistering all callbacks from event %s." % event)
153             self._callbacks[event] = {}
154         else:
155             logger.debug(
156                 "Unregistering callback %s from event %s." % (uid, event))
157             if uid in self._callbacks[event]:
158                 del self._callbacks[event][uid]
159         if not self._callbacks[event]:
160             del self._callbacks[event]
161             self._unsubscribe(str(event))
162
163     def emit(self, event, *content):
164         """
165         Send an event.
166
167         :param event: The event to be sent.
168         :type event: Event
169         :param content: The content of the event.
170         :type content: list
171         """
172         logger.debug("Emitting event: (%s, %s)" % (event, content))
173         payload = str(event) + b'\0' + pickle.dumps(content)
174         self._send(payload)
175
176     def _handle_event(self, event, content):
177         """
178         Handle an incoming event.
179
180         :param event: The event to be sent.
181         :type event: Event
182         :param content: The content of the event.
183         :type content: list
184         """
185         logger.debug("Handling event %s..." % event)
186         for uid in self._callbacks[event]:
187             callback = self._callbacks[event][uid]
188             logger.debug("Executing callback %s." % uid)
189             self._run_callback(callback, event, content)
190
191     @abstractmethod
192     def _run_callback(self, callback, event, content):
193         """
194         Run a callback.
195
196         :param callback: The callback to be run.
197         :type callback: callable(event, *content)
198         :param event: The event to be sent.
199         :type event: Event
200         :param content: The content of the event.
201         :type content: list
202         """
203         pass
204
205     @abstractmethod
206     def _subscribe(self, tag):
207         """
208         Subscribe to a tag on the zmq SUB socket.
209
210         :param tag: The tag to be subscribed.
211         :type tag: str
212         """
213         pass
214
215     @abstractmethod
216     def _unsubscribe(self, tag):
217         """
218         Unsubscribe from a tag on the zmq SUB socket.
219
220         :param tag: The tag to be unsubscribed.
221         :type tag: str
222         """
223         pass
224
225     @abstractmethod
226     def _send(self, data):
227         """
228         Send data through PUSH socket.
229
230         :param data: The data to be sent.
231         :type event: str
232         """
233         pass
234
235     def shutdown(self):
236         self.__class__.reset()
237
238     @classmethod
239     def reset(cls):
240         with cls._instance_lock:
241             cls._instance = None
242
243
244 class EventsIOLoop(ioloop.ZMQIOLoop):
245     """
246     An extension of zmq's ioloop that can wait until there are no callbacks
247     in the queue before stopping.
248     """
249
250     def stop(self, wait=False):
251         """
252         Stop the I/O loop.
253
254         :param wait: Whether we should wait for callbacks in queue to finish
255                      before stopping.
256         :type wait: bool
257         """
258         if wait:
259             # prevent new callbacks from being added
260             with self._callback_lock:
261                 self._closing = True
262             # wait until all callbacks have been executed
263             while self._callbacks:
264                 time.sleep(0.1)
265         ioloop.ZMQIOLoop.stop(self)
266
267
268 class EventsClientThread(threading.Thread, EventsClient):
269     """
270     A threaded version of the events client.
271     """
272
273     def __init__(self, emit_addr, reg_addr):
274         """
275         Initialize the events client.
276         """
277         threading.Thread.__init__(self)
278         EventsClient.__init__(self, emit_addr, reg_addr)
279         self._lock = threading.Lock()
280         self._initialized = threading.Event()
281         self._config_prefix = os.path.join(
282             get_path_prefix(flags.STANDALONE), "leap", "events")
283         self._loop = None
284         self._context = None
285         self._push = None
286         self._sub = None
287
288     def _init_zmq(self):
289         """
290         Initialize ZMQ connections.
291         """
292         self._loop = EventsIOLoop()
293         self._context = zmq.Context()
294         # connect SUB first, otherwise we might miss some event sent from this
295         # same client
296         self._sub = self._zmq_connect_sub()
297         self._push = self._zmq_connect_push()
298
299     def _zmq_connect(self, socktype, address):
300         """
301         Connect to an address using with a zmq socktype.
302
303         :param socktype: The ZMQ socket type.
304         :type socktype: int
305         :param address: The address to connect to.
306         :type address: str
307
308         :return: A ZMQ connection stream.
309         :rtype: ZMQStream
310         """
311         logger.debug("Connecting %s to %s." % (socktype, address))
312         socket = self._context.socket(socktype)
313         # configure curve authentication
314         if zmq_has_curve():
315             public, private = maybe_create_and_get_certificates(
316                 self._config_prefix, "client")
317             server_public_file = os.path.join(
318                 self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")
319             server_public, _ = zmq.auth.load_certificate(server_public_file)
320             socket.curve_publickey = public
321             socket.curve_secretkey = private
322             socket.curve_serverkey = server_public
323         stream = zmqstream.ZMQStream(socket, self._loop)
324         socket.connect(address)
325         return stream
326
327     def _zmq_connect_push(self):
328         """
329         Initialize the client's PUSH connection.
330
331         :return: A ZMQ connection stream.
332         :rtype: ZMQStream
333         """
334         return self._zmq_connect(zmq.PUSH, self._emit_addr)
335
336     def _zmq_connect_sub(self):
337         """
338         Initialize the client's SUB connection.
339
340         :return: A ZMQ connection stream.
341         :rtype: ZMQStream
342         """
343         stream = self._zmq_connect(zmq.SUB, self._reg_addr)
344         stream.on_recv(self._on_recv)
345         return stream
346
347     def _on_recv(self, msg):
348         """
349         Handle an incoming message in the SUB socket.
350
351         :param msg: The received message.
352         :type msg: str
353         """
354         ev_str, content_pickle = msg[0].split(b'\0', 1)  # undo txzmq tagging
355         event = getattr(catalog, ev_str)
356         content = pickle.loads(content_pickle)
357         self._handle_event(event, content)
358
359     def _subscribe(self, tag):
360         """
361         Subscribe from a tag on the zmq SUB socket.
362
363         :param tag: The tag to be subscribed.
364         :type tag: str
365         """
366         self._sub.socket.setsockopt(zmq.SUBSCRIBE, tag)
367
368     def _unsubscribe(self, tag):
369         """
370         Unsubscribe from a tag on the zmq SUB socket.
371
372         :param tag: The tag to be unsubscribed.
373         :type tag: str
374         """
375         self._sub.socket.setsockopt(zmq.UNSUBSCRIBE, tag)
376
377     def _send(self, data):
378         """
379         Send data through PUSH socket.
380
381         :param data: The data to be sent.
382         :type event: str
383         """
384         # add send() as a callback for ioloop so it works between threads
385         self._loop.add_callback(lambda: self._push.send(data))
386
387     def _run_callback(self, callback, event, content):
388         """
389         Run a callback.
390
391         :param callback: The callback to be run.
392         :type callback: callable(event, *content)
393         :param event: The event to be sent.
394         :type event: Event
395         :param content: The content of the event.
396         :type content: list
397         """
398         self._loop.add_callback(lambda: callback(event, *content))
399
400     def register(self, event, callback, uid=None, replace=False):
401         """
402         Register a callback to be executed when an event is received.
403
404         :param event: The event that triggers the callback.
405         :type event: Event
406         :param callback: The callback to be executed.
407         :type callback: callable(event, *content)
408         :param uid: The callback uid.
409         :type uid: str
410         :param replace: Wether an eventual callback with same ID should be
411                         replaced.
412         :type replace: bool
413
414         :return: The callback uid.
415         :rtype: str
416
417         :raises CallbackAlreadyRegisteredError: when there's already a
418                 callback identified by the given uid and replace is False.
419         """
420         self.ensure_client()
421         return EventsClient.register(
422             self, event, callback, uid=uid, replace=replace)
423
424     def unregister(self, event, uid=None):
425         """
426         Unregister callbacks for an event.
427
428         If uid is not None, then only the callback identified by the given uid
429         is removed. Otherwise, all callbacks for the event are removed.
430
431         :param event: The event that triggers the callback.
432         :type event: Event
433         :param uid: The callback uid.
434         :type uid: str
435         """
436         self.ensure_client()
437         EventsClient.unregister(self, event, uid=uid)
438
439     def emit(self, event, *content):
440         """
441         Send an event.
442
443         :param event: The event to be sent.
444         :type event: Event
445         :param content: The content of the event.
446         :type content: list
447         """
448         self.ensure_client()
449         EventsClient.emit(self, event, *content)
450
451     def run(self):
452         """
453         Run the events client.
454         """
455         logger.debug("Starting ioloop.")
456         self._init_zmq()
457         self._initialized.set()
458         self._loop.start()
459         self._loop.close()
460         logger.debug("Ioloop finished.")
461
462     def ensure_client(self):
463         """
464         Make sure the events client thread is started.
465         """
466         with self._lock:
467             if not self.is_alive():
468                 self.daemon = True
469                 self.start()
470                 self._initialized.wait()
471
472     def shutdown(self):
473         """
474         Shutdown the events client thread.
475         """
476         logger.debug("Shutting down client...")
477         with self._lock:
478             if self.is_alive():
479                 self._loop.stop(wait=True)
480         EventsClient.shutdown(self)
481
482
483 def shutdown():
484     """
485     Shutdown the events client thread.
486     """
487     EventsClientThread.instance().shutdown()
488
489
490 def register(event, callback, uid=None, replace=False):
491     """
492     Register a callback to be executed when an event is received.
493
494     :param event: The event that triggers the callback.
495     :type event: str
496     :param callback: The callback to be executed.
497     :type callback: callable(event, content)
498     :param uid: The callback uid.
499     :type uid: str
500     :param replace: Wether an eventual callback with same ID should be
501                     replaced.
502     :type replace: bool
503
504     :return: The callback uid.
505     :rtype: str
506
507     :raises CallbackAlreadyRegisteredError: when there's already a callback
508             identified by the given uid and replace is False.
509     """
510     return EventsClientThread.instance().register(
511         event, callback, uid=uid, replace=replace)
512
513
514 def unregister(event, uid=None):
515     """
516     Unregister callbacks for an event.
517
518     If uid is not None, then only the callback identified by the given uid is
519     removed. Otherwise, all callbacks for the event are removed.
520
521     :param event: The event that triggers the callback.
522     :type event: str
523     :param uid: The callback uid.
524     :type uid: str
525     """
526     return EventsClientThread.instance().unregister(event, uid=uid)
527
528
529 def emit(event, *content):
530     """
531     Send an event.
532
533     :param event: The event to be sent.
534     :type event: str
535     :param content: The content of the event.
536     :type content: list
537     """
538     return EventsClientThread.instance().emit(event, *content)
539
540
541 def instance():
542     """
543     Return an instance of the events client.
544
545     :return: An instance of the events client.
546     :rtype: EventsClientThread
547     """
548     return EventsClientThread.instance()