1a026ea5738e4a81c95972d4828beede10eace0e
[leap_pycommon.git] / src / leap / common / events / client.py
1 # -*- coding: utf-8 -*-
2 # client.py
3 # Copyright (C) 2013, 2014, 2015 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 The client end point of the events mechanism.
21
22 Clients are the communicating parties of the events mechanism. They
23 communicate by sending messages to a server, which in turn redistributes
24 messages to other clients.
25
26 When a client registers a callback for a given event, it also tells the
27 server that it wants to be notified whenever events of that type are sent by
28 some other client.
29 """
30
31
32 import logging
33 import collections
34 import uuid
35 import threading
36 import time
37 import pickle
38 import os
39
40 from abc import ABCMeta
41 from abc import abstractmethod
42
43 import zmq
44 from zmq.eventloop import zmqstream
45 from zmq.eventloop import ioloop
46
47 # XXX some distros don't package libsodium, so we have to be prepared for
48 #     absence of zmq.auth
49 try:
50     import zmq.auth
51 except ImportError:
52     pass
53
54 from leap.common.config import get_path_prefix
55 from leap.common.zmq_utils import zmq_has_curve
56 from leap.common.zmq_utils import maybe_create_and_get_certificates
57 from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
58
59 from leap.common.events.errors import CallbackAlreadyRegisteredError
60 from leap.common.events.server import EMIT_ADDR
61 from leap.common.events.server import REG_ADDR
62 from leap.common.events import catalog
63
64
65 logger = logging.getLogger(__name__)
66
67
68 _emit_addr = EMIT_ADDR
69 _reg_addr = REG_ADDR
70
71
72 def configure_client(emit_addr, reg_addr):
73     global _emit_addr, _reg_addr
74     logger.debug("Configuring client with addresses: (%s, %s)" %
75                  (emit_addr, reg_addr))
76     _emit_addr = emit_addr
77     _reg_addr = reg_addr
78
79
80 class EventsClient(object):
81     """
82     A singleton client for the events mechanism.
83     """
84
85     __metaclass__ = ABCMeta
86
87     _instance = None
88     _instance_lock = threading.Lock()
89
90     def __init__(self, emit_addr, reg_addr):
91         """
92         Initialize the events client.
93         """
94         logger.debug("Creating client instance.")
95         self._callbacks = collections.defaultdict(dict)
96         self._emit_addr = emit_addr
97         self._reg_addr = reg_addr
98
99     @property
100     def callbacks(self):
101         return self._callbacks
102
103     @classmethod
104     def instance(cls):
105         """
106         Return a singleton EventsClient instance.
107         """
108         with cls._instance_lock:
109             if cls._instance is None:
110                 cls._instance = cls(_emit_addr, _reg_addr)
111         return cls._instance
112
113     def register(self, event, callback, uid=None, replace=False):
114         """
115         Register a callback to be executed when an event is received.
116
117         :param event: The event that triggers the callback.
118         :type event: Event
119         :param callback: The callback to be executed.
120         :type callback: callable(event, *content)
121         :param uid: The callback uid.
122         :type uid: str
123         :param replace: Wether an eventual callback with same ID should be
124                         replaced.
125         :type replace: bool
126
127         :return: The callback uid.
128         :rtype: str
129
130         :raises CallbackAlreadyRegisteredError: when there's already a callback
131                 identified by the given uid and replace is False.
132         """
133         logger.debug("Subscribing to event: %s" % event)
134         if not uid:
135             uid = uuid.uuid4()
136         elif uid in self._callbacks[event] and not replace:
137             raise CallbackAlreadyRegisteredError()
138         self._callbacks[event][uid] = callback
139         self._subscribe(str(event))
140         return uid
141
142     def unregister(self, event, uid=None):
143         """
144         Unregister callbacks for an event.
145
146         If uid is not None, then only the callback identified by the given uid
147         is removed. Otherwise, all callbacks for the event are removed.
148
149         :param event: The event that triggers the callback.
150         :type event: Event
151         :param uid: The callback uid.
152         :type uid: str
153         """
154         if not uid:
155             logger.debug(
156                 "Unregistering all callbacks from event %s." % event)
157             self._callbacks[event] = {}
158         else:
159             logger.debug(
160                 "Unregistering callback %s from event %s." % (uid, event))
161             if uid in self._callbacks[event]:
162                 del self._callbacks[event][uid]
163         if not self._callbacks[event]:
164             del self._callbacks[event]
165             self._unsubscribe(str(event))
166
167     def emit(self, event, *content):
168         """
169         Send an event.
170
171         :param event: The event to be sent.
172         :type event: Event
173         :param content: The content of the event.
174         :type content: list
175         """
176         logger.debug("Emitting event: (%s, %s)" % (event, content))
177         self._send(str(event) + b'\0' + pickle.dumps(content))
178
179     def _handle_event(self, event, content):
180         """
181         Handle an incoming event.
182
183         :param event: The event to be sent.
184         :type event: Event
185         :param content: The content of the event.
186         :type content: list
187         """
188         logger.debug("Handling event %s..." % event)
189         for uid in self._callbacks[event].keys():
190             callback = self._callbacks[event][uid]
191             logger.debug("Executing callback %s." % uid)
192             self._run_callback(callback, event, content)
193
194     @abstractmethod
195     def _run_callback(self, callback, event, content):
196         """
197         Run a callback.
198
199         :param callback: The callback to be run.
200         :type callback: callable(event, *content)
201         :param event: The event to be sent.
202         :type event: Event
203         :param content: The content of the event.
204         :type content: list
205         """
206         pass
207
208     @abstractmethod
209     def _subscribe(self, tag):
210         """
211         Subscribe to a tag on the zmq SUB socket.
212
213         :param tag: The tag to be subscribed.
214         :type tag: str
215         """
216         pass
217
218     @abstractmethod
219     def _unsubscribe(self, tag):
220         """
221         Unsubscribe from a tag on the zmq SUB socket.
222
223         :param tag: The tag to be unsubscribed.
224         :type tag: str
225         """
226         pass
227
228     @abstractmethod
229     def _send(self, data):
230         """
231         Send data through PUSH socket.
232
233         :param data: The data to be sent.
234         :type event: str
235         """
236         pass
237
238     def shutdown(self):
239         self.__class__.reset()
240
241     @classmethod
242     def reset(cls):
243         with cls._instance_lock:
244             cls._instance = None
245
246
247 class EventsIOLoop(ioloop.ZMQIOLoop):
248     """
249     An extension of zmq's ioloop that can wait until there are no callbacks
250     in the queue before stopping.
251     """
252
253     def stop(self, wait=False):
254         """
255         Stop the I/O loop.
256
257         :param wait: Whether we should wait for callbacks in queue to finish
258                      before stopping.
259         :type wait: bool
260         """
261         if wait:
262             # prevent new callbacks from being added
263             with self._callback_lock:
264                 self._closing = True
265             # wait until all callbacks have been executed
266             while self._callbacks:
267                 time.sleep(0.1)
268         ioloop.ZMQIOLoop.stop(self)
269
270
271 class EventsClientThread(threading.Thread, EventsClient):
272     """
273     A threaded version of the events client.
274     """
275
276     def __init__(self, emit_addr, reg_addr):
277         """
278         Initialize the events client.
279         """
280         threading.Thread.__init__(self)
281         EventsClient.__init__(self, emit_addr, reg_addr)
282         self._lock = threading.Lock()
283         self._initialized = threading.Event()
284         self._config_prefix = os.path.join(
285             get_path_prefix(), "leap", "events")
286         self._loop = None
287         self._context = None
288         self._push = None
289         self._sub = None
290
291     def _init_zmq(self):
292         """
293         Initialize ZMQ connections.
294         """
295         self._loop = EventsIOLoop()
296         self._context = zmq.Context()
297         # connect SUB first, otherwise we might miss some event sent from this
298         # same client
299         self._sub = self._zmq_connect_sub()
300         self._push = self._zmq_connect_push()
301
302     def _zmq_connect(self, socktype, address):
303         """
304         Connect to an address using with a zmq socktype.
305
306         :param socktype: The ZMQ socket type.
307         :type socktype: int
308         :param address: The address to connect to.
309         :type address: str
310
311         :return: A ZMQ connection stream.
312         :rtype: ZMQStream
313         """
314         logger.debug("Connecting %s to %s." % (socktype, address))
315         socket = self._context.socket(socktype)
316         # configure curve authentication
317         if zmq_has_curve():
318             public, private = maybe_create_and_get_certificates(
319                 self._config_prefix, "client")
320             server_public_file = os.path.join(
321                 self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")
322             server_public, _ = zmq.auth.load_certificate(server_public_file)
323             socket.curve_publickey = public
324             socket.curve_secretkey = private
325             socket.curve_serverkey = server_public
326         stream = zmqstream.ZMQStream(socket, self._loop)
327         socket.connect(address)
328         return stream
329
330     def _zmq_connect_push(self):
331         """
332         Initialize the client's PUSH connection.
333
334         :return: A ZMQ connection stream.
335         :rtype: ZMQStream
336         """
337         return self._zmq_connect(zmq.PUSH, self._emit_addr)
338
339     def _zmq_connect_sub(self):
340         """
341         Initialize the client's SUB connection.
342
343         :return: A ZMQ connection stream.
344         :rtype: ZMQStream
345         """
346         stream = self._zmq_connect(zmq.SUB, self._reg_addr)
347         stream.on_recv(self._on_recv)
348         return stream
349
350     def _on_recv(self, msg):
351         """
352         Handle an incoming message in the SUB socket.
353
354         :param msg: The received message.
355         :type msg: str
356         """
357         ev_str, content_pickle = msg[0].split(b'\0', 1)  # undo txzmq tagging
358         event = getattr(catalog, ev_str)
359         content = pickle.loads(content_pickle)
360         self._handle_event(event, content)
361
362     def _subscribe(self, tag):
363         """
364         Subscribe from a tag on the zmq SUB socket.
365
366         :param tag: The tag to be subscribed.
367         :type tag: str
368         """
369         self._sub.socket.setsockopt(zmq.SUBSCRIBE, tag)
370
371     def _unsubscribe(self, tag):
372         """
373         Unsubscribe from a tag on the zmq SUB socket.
374
375         :param tag: The tag to be unsubscribed.
376         :type tag: str
377         """
378         self._sub.socket.setsockopt(zmq.UNSUBSCRIBE, tag)
379
380     def _send(self, data):
381         """
382         Send data through PUSH socket.
383
384         :param data: The data to be sent.
385         :type event: str
386         """
387         # add send() as a callback for ioloop so it works between threads
388         self._loop.add_callback(lambda: self._push.send(data))
389
390     def _run_callback(self, callback, event, content):
391         """
392         Run a callback.
393
394         :param callback: The callback to be run.
395         :type callback: callable(event, *content)
396         :param event: The event to be sent.
397         :type event: Event
398         :param content: The content of the event.
399         :type content: list
400         """
401         from twisted.internet import reactor
402         reactor.callFromThread(callback, event, *content)
403
404     def register(self, event, callback, uid=None, replace=False):
405         """
406         Register a callback to be executed when an event is received.
407
408         :param event: The event that triggers the callback.
409         :type event: Event
410         :param callback: The callback to be executed.
411         :type callback: callable(event, *content)
412         :param uid: The callback uid.
413         :type uid: str
414         :param replace: Wether an eventual callback with same ID should be
415                         replaced.
416         :type replace: bool
417
418         :return: The callback uid.
419         :rtype: str
420
421         :raises CallbackAlreadyRegisteredError: when there's already a
422                 callback identified by the given uid and replace is False.
423         """
424         self.ensure_client()
425         return EventsClient.register(self, event, callback, uid=uid, replace=replace)
426
427     def unregister(self, event, uid=None):
428         """
429         Unregister callbacks for an event.
430
431         If uid is not None, then only the callback identified by the given uid
432         is removed. Otherwise, all callbacks for the event are removed.
433
434         :param event: The event that triggers the callback.
435         :type event: Event
436         :param uid: The callback uid.
437         :type uid: str
438         """
439         self.ensure_client()
440         EventsClient.unregister(self, event, uid=uid)
441
442     def emit(self, event, *content):
443         """
444         Send an event.
445
446         :param event: The event to be sent.
447         :type event: Event
448         :param content: The content of the event.
449         :type content: list
450         """
451         self.ensure_client()
452         EventsClient.emit(self, event, *content)
453
454     def run(self):
455         """
456         Run the events client.
457         """
458         logger.debug("Starting ioloop.")
459         self._init_zmq()
460         self._initialized.set()
461         self._loop.start()
462         self._loop.close()
463         logger.debug("Ioloop finished.")
464
465     def ensure_client(self):
466         """
467         Make sure the events client thread is started.
468         """
469         with self._lock:
470             if not self.is_alive():
471                 self.daemon = True
472                 self.start()
473                 self._initialized.wait()
474
475     def shutdown(self):
476         """
477         Shutdown the events client thread.
478         """
479         logger.debug("Shutting down client...")
480         with self._lock:
481             if self.is_alive():
482                 self._loop.stop(wait=True)
483         EventsClient.shutdown(self)
484
485
486 def shutdown():
487     """
488     Shutdown the events client thread.
489     """
490     EventsClientThread.instance().shutdown()
491
492
493 def register(event, callback, uid=None, replace=False):
494     """
495     Register a callback to be executed when an event is received.
496
497     :param event: The event that triggers the callback.
498     :type event: str
499     :param callback: The callback to be executed.
500     :type callback: callable(event, content)
501     :param uid: The callback uid.
502     :type uid: str
503     :param replace: Wether an eventual callback with same ID should be
504                     replaced.
505     :type replace: bool
506
507     :return: The callback uid.
508     :rtype: str
509
510     :raises CallbackAlreadyRegisteredError: when there's already a callback
511             identified by the given uid and replace is False.
512     """
513     return EventsClientThread.instance().register(
514         event, callback, uid=uid, replace=replace)
515
516
517 def unregister(event, uid=None):
518     """
519     Unregister callbacks for an event.
520
521     If uid is not None, then only the callback identified by the given uid is
522     removed. Otherwise, all callbacks for the event are removed.
523
524     :param event: The event that triggers the callback.
525     :type event: str
526     :param uid: The callback uid.
527     :type uid: str
528     """
529     return EventsClientThread.instance().unregister(event, uid=uid)
530
531
532 def emit(event, *content):
533     """
534     Send an event.
535
536     :param event: The event to be sent.
537     :type event: str
538     :param content: The content of the event.
539     :type content: list
540     """
541     return EventsClientThread.instance().emit(event, *content)
542
543
544 def instance():
545     """
546     Return an instance of the events client.
547
548     :return: An instance of the events client.
549     :rtype: EventsClientThread
550     """
551     return EventsClientThread.instance()