0706fe3786e2d46b4bbc84493ccd4406785721ee
[leap_pycommon.git] / src / leap / common / events / client.py
1 # -*- coding: utf-8 -*-
2 # client.py
3 # Copyright (C) 2013, 2014, 2015 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 The client end point of the events mechanism.
21
22 Clients are the communicating parties of the events mechanism. They
23 communicate by sending messages to a server, which in turn redistributes
24 messages to other clients.
25
26 When a client registers a callback for a given event, it also tells the
27 server that it wants to be notified whenever events of that type are sent by
28 some other client.
29 """
30
31
32 import logging
33 import collections
34 import uuid
35 import threading
36 import time
37 import pickle
38 import os
39
40 from abc import ABCMeta
41 from abc import abstractmethod
42
43 import zmq
44 from zmq.eventloop import zmqstream
45 from zmq.eventloop import ioloop
46
47 # XXX some distros don't package libsodium, so we have to be prepared for
48 #     absence of zmq.auth
49 try:
50     import zmq.auth
51 except ImportError:
52     pass
53
54 from leap.common.config import get_path_prefix
55 from leap.common.zmq_utils import zmq_has_curve
56 from leap.common.zmq_utils import maybe_create_and_get_certificates
57 from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
58
59 from leap.common.events.errors import CallbackAlreadyRegisteredError
60 from leap.common.events.server import EMIT_ADDR
61 from leap.common.events.server import REG_ADDR
62 from leap.common.events import catalog
63
64
65 logger = logging.getLogger(__name__)
66
67
68 _emit_addr = EMIT_ADDR
69 _reg_addr = REG_ADDR
70
71
72 def configure_client(emit_addr, reg_addr):
73     global _emit_addr, _reg_addr
74     logger.debug("Configuring client with addresses: (%s, %s)" %
75                  (emit_addr, reg_addr))
76     _emit_addr = emit_addr
77     _reg_addr = reg_addr
78
79
80 class EventsClient(object):
81     """
82     A singleton client for the events mechanism.
83     """
84
85     __metaclass__ = ABCMeta
86
87     _instance = None
88     _instance_lock = threading.Lock()
89
90     def __init__(self, emit_addr, reg_addr):
91         """
92         Initialize the events client.
93         """
94         logger.debug("Creating client instance.")
95         self._callbacks = collections.defaultdict(dict)
96         self._emit_addr = emit_addr
97         self._reg_addr = reg_addr
98
99     @property
100     def callbacks(self):
101         return self._callbacks
102
103     @classmethod
104     def instance(cls):
105         """
106         Return a singleton EventsClient instance.
107         """
108         with cls._instance_lock:
109             if cls._instance is None:
110                 cls._instance = cls(_emit_addr, _reg_addr)
111         return cls._instance
112
113     def register(self, event, callback, uid=None, replace=False):
114         """
115         Register a callback to be executed when an event is received.
116
117         :param event: The event that triggers the callback.
118         :type event: Event
119         :param callback: The callback to be executed.
120         :type callback: callable(event, *content)
121         :param uid: The callback uid.
122         :type uid: str
123         :param replace: Wether an eventual callback with same ID should be
124                         replaced.
125         :type replace: bool
126
127         :return: The callback uid.
128         :rtype: str
129
130         :raises CallbackAlreadyRegisteredError: when there's already a callback
131                 identified by the given uid and replace is False.
132         """
133         logger.debug("Subscribing to event: %s" % event)
134         if not uid:
135             uid = uuid.uuid4()
136         elif uid in self._callbacks[event] and not replace:
137             raise CallbackAlreadyRegisteredError()
138         self._callbacks[event][uid] = callback
139         self._subscribe(str(event))
140         return uid
141
142     def unregister(self, event, uid=None):
143         """
144         Unregister callbacks for an event.
145
146         If uid is not None, then only the callback identified by the given uid
147         is removed. Otherwise, all callbacks for the event are removed.
148
149         :param event: The event that triggers the callback.
150         :type event: Event
151         :param uid: The callback uid.
152         :type uid: str
153         """
154         if not uid:
155             logger.debug(
156                 "Unregistering all callbacks from event %s." % event)
157             self._callbacks[event] = {}
158         else:
159             logger.debug(
160                 "Unregistering callback %s from event %s." % (uid, event))
161             if uid in self._callbacks[event]:
162                 del self._callbacks[event][uid]
163         if not self._callbacks[event]:
164             del self._callbacks[event]
165             self._unsubscribe(str(event))
166
167     def emit(self, event, *content):
168         """
169         Send an event.
170
171         :param event: The event to be sent.
172         :type event: Event
173         :param content: The content of the event.
174         :type content: list
175         """
176         logger.debug("Sending event: (%s, %s)" % (event, content))
177         self._send(str(event) + b'\0' + pickle.dumps(content))
178
179     def _handle_event(self, event, content):
180         """
181         Handle an incoming event.
182
183         :param msg: The incoming message.
184         :type msg: list(str)
185         """
186         logger.debug("Handling event %s..." % event)
187         for uid in self._callbacks[event].keys():
188             callback = self._callbacks[event][uid]
189             logger.debug("Executing callback %s." % uid)
190             callback(event, *content)
191
192     @abstractmethod
193     def _subscribe(self, tag):
194         """
195         Subscribe to a tag on the zmq SUB socket.
196
197         :param tag: The tag to be subscribed.
198         :type tag: str
199         """
200         pass
201
202     @abstractmethod
203     def _unsubscribe(self, tag):
204         """
205         Unsubscribe from a tag on the zmq SUB socket.
206
207         :param tag: The tag to be unsubscribed.
208         :type tag: str
209         """
210         pass
211
212     @abstractmethod
213     def _send(self, data):
214         """
215         Send data through PUSH socket.
216
217         :param data: The data to be sent.
218         :type event: str
219         """
220         pass
221
222     def shutdown(self):
223         self.__class__.reset()
224
225     @classmethod
226     def reset(cls):
227         with cls._instance_lock:
228             cls._instance = None
229
230
231 class EventsIOLoop(ioloop.ZMQIOLoop):
232     """
233     An extension of zmq's ioloop that can wait until there are no callbacks
234     in the queue before stopping.
235     """
236
237     def stop(self, wait=False):
238         """
239         Stop the I/O loop.
240
241         :param wait: Whether we should wait for callbacks in queue to finish
242                      before stopping.
243         :type wait: bool
244         """
245         if wait:
246             # prevent new callbacks from being added
247             with self._callback_lock:
248                 self._closing = True
249             # wait until all callbacks have been executed
250             while self._callbacks:
251                 time.sleep(0.1)
252         ioloop.ZMQIOLoop.stop(self)
253
254
255 class EventsClientThread(threading.Thread, EventsClient):
256     """
257     A threaded version of the events client.
258     """
259
260     def __init__(self, emit_addr, reg_addr):
261         """
262         Initialize the events client.
263         """
264         threading.Thread.__init__(self)
265         EventsClient.__init__(self, emit_addr, reg_addr)
266         self._lock = threading.Lock()
267         self._initialized = threading.Event()
268         self._config_prefix = os.path.join(
269             get_path_prefix(), "leap", "events")
270         self._loop = None
271         self._context = None
272         self._push = None
273         self._sub = None
274
275     def _init_zmq(self):
276         """
277         Initialize ZMQ connections.
278         """
279         self._loop = EventsIOLoop()
280         self._context = zmq.Context()
281         # connect SUB first, otherwise we might miss some event sent from this
282         # same client
283         self._sub = self._zmq_connect_sub()
284         self._push = self._zmq_connect_push()
285
286     def _zmq_connect(self, socktype, address):
287         """
288         Connect to an address using with a zmq socktype.
289
290         :param socktype: The ZMQ socket type.
291         :type socktype: int
292         :param address: The address to connect to.
293         :type address: str
294
295         :return: A ZMQ connection stream.
296         :rtype: ZMQStream
297         """
298         logger.debug("Connecting %s to %s." % (socktype, address))
299         socket = self._context.socket(socktype)
300         # configure curve authentication
301         if zmq_has_curve():
302             public, private = maybe_create_and_get_certificates(
303                 self._config_prefix, "client")
304             server_public_file = os.path.join(
305                 self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")
306             server_public, _ = zmq.auth.load_certificate(server_public_file)
307             socket.curve_publickey = public
308             socket.curve_secretkey = private
309             socket.curve_serverkey = server_public
310         stream = zmqstream.ZMQStream(socket, self._loop)
311         socket.connect(address)
312         return stream
313
314     def _zmq_connect_push(self):
315         """
316         Initialize the client's PUSH connection.
317
318         :return: A ZMQ connection stream.
319         :rtype: ZMQStream
320         """
321         return self._zmq_connect(zmq.PUSH, self._emit_addr)
322
323     def _zmq_connect_sub(self):
324         """
325         Initialize the client's SUB connection.
326
327         :return: A ZMQ connection stream.
328         :rtype: ZMQStream
329         """
330         stream = self._zmq_connect(zmq.SUB, self._reg_addr)
331         stream.on_recv(self._on_recv)
332         return stream
333
334     def _on_recv(self, msg):
335         """
336         Handle an incoming message in the SUB socket.
337
338         :param msg: The received message.
339         :type msg: str
340         """
341         ev_str, content_pickle = msg[0].split(b'\0', 1)  # undo txzmq tagging
342         event = getattr(catalog, ev_str)
343         content = pickle.loads(content_pickle)
344         self._handle_event(event, content)
345
346     def _subscribe(self, tag):
347         """
348         Subscribe from a tag on the zmq SUB socket.
349
350         :param tag: The tag to be subscribed.
351         :type tag: str
352         """
353         self._sub.socket.setsockopt(zmq.SUBSCRIBE, tag)
354
355     def _unsubscribe(self, tag):
356         """
357         Unsubscribe from a tag on the zmq SUB socket.
358
359         :param tag: The tag to be unsubscribed.
360         :type tag: str
361         """
362         self._sub.socket.setsockopt(zmq.UNSUBSCRIBE, tag)
363
364     def _send(self, data):
365         """
366         Send data through PUSH socket.
367
368         :param data: The data to be sent.
369         :type event: str
370         """
371         logger.debug("Sending data: %s" % data)
372         # add send() as a callback for ioloop so it works between threads
373         self._loop.add_callback(lambda: self._push.send(data))
374
375     def register(self, event, callback, uid=None, replace=False):
376         """
377         Register a callback to be executed when an event is received.
378
379         :param event: The event that triggers the callback.
380         :type event: Event
381         :param callback: The callback to be executed.
382         :type callback: callable(event, *content)
383         :param uid: The callback uid.
384         :type uid: str
385         :param replace: Wether an eventual callback with same ID should be
386                         replaced.
387         :type replace: bool
388
389         :return: The callback uid.
390         :rtype: str
391
392         :raises CallbackAlreadyRegisteredError: when there's already a
393                 callback identified by the given uid and replace is False.
394         """
395         self.ensure_client()
396         return EventsClient.register(self, event, callback, uid=uid, replace=replace)
397
398     def unregister(self, event, uid=None):
399         """
400         Unregister callbacks for an event.
401
402         If uid is not None, then only the callback identified by the given uid
403         is removed. Otherwise, all callbacks for the event are removed.
404
405         :param event: The event that triggers the callback.
406         :type event: Event
407         :param uid: The callback uid.
408         :type uid: str
409         """
410         self.ensure_client()
411         EventsClient.unregister(self, event, uid=uid)
412
413     def emit(self, event, *content):
414         """
415         Send an event.
416
417         :param event: The event to be sent.
418         :type event: Event
419         :param content: The content of the event.
420         :type content: list
421         """
422         self.ensure_client()
423         EventsClient.emit(self, event, *content)
424
425     def run(self):
426         """
427         Run the events client.
428         """
429         logger.debug("Starting ioloop.")
430         self._init_zmq()
431         self._initialized.set()
432         self._loop.start()
433         self._loop.close()
434         logger.debug("Ioloop finished.")
435
436     def ensure_client(self):
437         """
438         Make sure the events client thread is started.
439         """
440         with self._lock:
441             if not self.is_alive():
442                 self.daemon = True
443                 self.start()
444                 self._initialized.wait()
445
446     def shutdown(self):
447         """
448         Shutdown the events client thread.
449         """
450         logger.debug("Shutting down client...")
451         with self._lock:
452             if self.is_alive():
453                 self._loop.stop(wait=True)
454         EventsClient.shutdown(self)
455
456
457 def shutdown():
458     """
459     Shutdown the events client thread.
460     """
461     EventsClientThread.instance().shutdown()
462
463
464 def register(event, callback, uid=None, replace=False):
465     """
466     Register a callback to be executed when an event is received.
467
468     :param event: The event that triggers the callback.
469     :type event: str
470     :param callback: The callback to be executed.
471     :type callback: callable(event, content)
472     :param uid: The callback uid.
473     :type uid: str
474     :param replace: Wether an eventual callback with same ID should be
475                     replaced.
476     :type replace: bool
477
478     :return: The callback uid.
479     :rtype: str
480
481     :raises CallbackAlreadyRegisteredError: when there's already a callback
482             identified by the given uid and replace is False.
483     """
484     return EventsClientThread.instance().register(
485         event, callback, uid=uid, replace=replace)
486
487
488 def unregister(event, uid=None):
489     """
490     Unregister callbacks for an event.
491
492     If uid is not None, then only the callback identified by the given uid is
493     removed. Otherwise, all callbacks for the event are removed.
494
495     :param event: The event that triggers the callback.
496     :type event: str
497     :param uid: The callback uid.
498     :type uid: str
499     """
500     return EventsClientThread.instance().unregister(event, uid=uid)
501
502
503 def emit(event, *content):
504     """
505     Send an event.
506
507     :param event: The event to be sent.
508     :type event: str
509     :param content: The content of the event.
510     :type content: list
511     """
512     return EventsClientThread.instance().emit(event, *content)
513
514
515 def instance():
516     """
517     Return an instance of the events client.
518
519     :return: An instance of the events client.
520     :rtype: EventsClientThread
521     """
522     return EventsClientThread.instance()