# -*- coding: utf-8 -*-
# Copyright (C) 2013, 2014, 2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
The client end point of the events mechanism.

Clients are the communicating parties of the events mechanism. They
communicate by sending messages to a server, which in turn redistributes
messages to other clients.

When a client registers a callback for a given event, it also tells the
server that it wants to be notified whenever events of that type are sent by
some other client.
"""
import collections
import logging
import os
import pickle
import threading
import time
import uuid

from abc import ABCMeta
from abc import abstractmethod

import zmq
from zmq.eventloop import zmqstream
from zmq.eventloop import ioloop

# XXX some distros don't package libsodium, so we have to be prepared for
#     zmq.auth being unavailable
try:
    import zmq.auth
except ImportError:
    pass

from leap.common.config import get_path_prefix
from leap.common.zmq_utils import zmq_has_curve
from leap.common.zmq_utils import maybe_create_and_get_certificates
from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX

from leap.common.events.errors import CallbackAlreadyRegisteredError
from leap.common.events.server import EMIT_ADDR
from leap.common.events.server import REG_ADDR
from leap.common.events import catalog
logger = logging.getLogger(__name__)


# Default addresses handed to the singleton client when it is first created.
# They can be changed with configure_client() before that happens.
_emit_addr = EMIT_ADDR
_reg_addr = REG_ADDR
def configure_client(emit_addr, reg_addr):
    """
    Set the addresses used when the singleton events client is created.

    Only the module-level defaults are changed: a client instance that
    already exists keeps the addresses it was built with.

    :param emit_addr: The address events are sent to.
    :param reg_addr: The address events are received from.
    """
    global _emit_addr, _reg_addr
    logger.debug("Configuring client with addresses: (%s, %s)",
                 emit_addr, reg_addr)
    _emit_addr = emit_addr
    # the registration address has to be stored too, otherwise the global
    # declared above is never updated
    _reg_addr = reg_addr
class EventsClient(object):
    """
    A singleton client for the events mechanism.

    This base class keeps the registry of callbacks and implements the
    registration, unregistration, emission and dispatch logic. How data is
    sent, how tags are (un)subscribed and how callbacks are run is delegated
    to subclasses through the abstract methods declared below.
    """

    __metaclass__ = ABCMeta

    _instance = None
    _instance_lock = threading.Lock()

    def __init__(self, emit_addr, reg_addr):
        """
        Initialize the events client.

        :param emit_addr: The address events are sent to.
        :param reg_addr: The address events are received from.
        """
        logger.debug("Creating client instance.")
        # maps each event to a {uid: callback} dictionary
        self._callbacks = collections.defaultdict(dict)
        self._emit_addr = emit_addr
        self._reg_addr = reg_addr

    # NOTE(review): the header of this accessor is not visible in the
    # extracted source; its name and the @property decorator are
    # reconstructed -- confirm against callers.
    @property
    def callbacks(self):
        """
        Return the registry of callbacks, indexed by event and then by uid.
        """
        return self._callbacks

    @classmethod
    def instance(cls):
        """
        Return a singleton EventsClient instance.

        The instance is created on first use with the module-level
        addresses, under a lock so concurrent callers share one instance.
        """
        with cls._instance_lock:
            if cls._instance is None:
                cls._instance = cls(_emit_addr, _reg_addr)
        return cls._instance

    def register(self, event, callback, uid=None, replace=False):
        """
        Register a callback to be executed when an event is received.

        :param event: The event that triggers the callback.
        :param callback: The callback to be executed.
        :type callback: callable(event, *content)
        :param uid: The callback uid. A new one is generated when omitted.
        :param replace: Whether an eventual callback with same ID should be
                        replaced.
        :type replace: bool

        :return: The callback uid.

        :raises CallbackAlreadyRegisteredError: when there's already a
                callback identified by the given uid and replace is False.
        """
        logger.debug("Subscribing to event: %s", event)
        # NOTE(review): the generation of a default uid is reconstructed
        # (those lines are missing from the extracted source) -- confirm the
        # exact condition and that uuid4 is the intended source.
        if uid is None:
            uid = uuid.uuid4()
        elif uid in self._callbacks[event] and not replace:
            raise CallbackAlreadyRegisteredError()
        self._callbacks[event][uid] = callback
        self._subscribe(str(event))
        return uid

    def unregister(self, event, uid=None):
        """
        Unregister callbacks for an event.

        If uid is not None, then only the callback identified by the given uid
        is removed. Otherwise, all callbacks for the event are removed.

        :param event: The event that triggers the callback.
        :param uid: The callback uid.
        """
        if uid is None:
            logger.debug(
                "Unregistering all callbacks from event %s.", event)
            self._callbacks[event] = {}
        else:
            logger.debug(
                "Unregistering callback %s from event %s.", uid, event)
            if uid in self._callbacks[event]:
                del self._callbacks[event][uid]
        # once no callback is left there is no reason to keep receiving
        # this event
        if not self._callbacks[event]:
            del self._callbacks[event]
            self._unsubscribe(str(event))

    def emit(self, event, *content):
        """
        Send an event.

        The message is the event name and the pickled content, separated by
        a null byte.

        :param event: The event to be sent.
        :param content: The content of the event.
        """
        logger.debug("Emitting event: (%s, %s)", event, content)
        payload = str(event) + b'\0' + pickle.dumps(content)
        self._send(payload)

    def _handle_event(self, event, content):
        """
        Handle an incoming event.

        :param event: The event that was received.
        :param content: The content of the event.
        """
        logger.debug("Handling event %s...", event)
        # iterate over a snapshot, so callbacks that are registered or
        # removed while the event is being dispatched do not break the loop
        for uid, callback in list(self._callbacks[event].items()):
            logger.debug("Executing callback %s.", uid)
            self._run_callback(callback, event, content)

    @abstractmethod
    def _run_callback(self, callback, event, content):
        """
        Run a callback.

        :param callback: The callback to be run.
        :type callback: callable(event, *content)
        :param event: The event that was received.
        :param content: The content of the event.
        """
        pass

    @abstractmethod
    def _subscribe(self, tag):
        """
        Subscribe to a tag on the zmq SUB socket.

        :param tag: The tag to be subscribed.
        :type tag: str
        """
        pass

    @abstractmethod
    def _unsubscribe(self, tag):
        """
        Unsubscribe from a tag on the zmq SUB socket.

        :param tag: The tag to be unsubscribed.
        :type tag: str
        """
        pass

    @abstractmethod
    def _send(self, data):
        """
        Send data through PUSH socket.

        :param data: The data to be sent.
        :type data: str
        """
        pass

    def shutdown(self):
        """
        Shut the client down by discarding the singleton instance.
        """
        self.__class__.reset()

    @classmethod
    def reset(cls):
        """
        Forget the singleton instance, so the next call to instance()
        creates a new one.
        """
        with cls._instance_lock:
            cls._instance = None
class EventsIOLoop(ioloop.ZMQIOLoop):
    """
    An extension of zmq's ioloop that can wait until there are no callbacks
    in the queue before stopping.
    """

    def stop(self, wait=False):
        """
        Stop the I/O loop.

        :param wait: Whether we should wait for callbacks in queue to finish
                     before stopping.
        :type wait: bool
        """
        if wait:
            # prevent new callbacks from being added
            # NOTE(review): the statement guarded by the lock is missing
            # from the extracted source; flagging the loop as closing is
            # reconstructed -- confirm against the ioloop implementation
            # in use.
            with self._callback_lock:
                self._closing = True
            # wait until all callbacks have been executed
            # NOTE(review): polling interval reconstructed -- confirm.
            while self._callbacks:
                time.sleep(0.1)
        ioloop.ZMQIOLoop.stop(self)
class EventsClientThread(threading.Thread, EventsClient):
    """
    A threaded version of the events client.

    The thread runs an ioloop that owns the zmq sockets. Sending data and
    running callbacks are scheduled on that loop, so the public methods can
    be called from other threads.
    """

    def __init__(self, emit_addr, reg_addr):
        """
        Initialize the events client.

        :param emit_addr: The address events are sent to.
        :param reg_addr: The address events are received from.
        """
        threading.Thread.__init__(self)
        EventsClient.__init__(self, emit_addr, reg_addr)
        self._lock = threading.Lock()
        # set by run() once the zmq sockets are ready to be used
        self._initialized = threading.Event()
        self._config_prefix = os.path.join(
            get_path_prefix(), "leap", "events")
        # created by the thread itself when it starts running
        self._loop = None
        self._context = None
        self._push = None
        self._sub = None

    # NOTE(review): the header of this method is missing from the extracted
    # source; the name is reconstructed from its docstring -- confirm.
    def _init_zmq(self):
        """
        Initialize ZMQ connections.
        """
        self._loop = EventsIOLoop()
        self._context = zmq.Context()
        # connect SUB first, otherwise we might miss some event sent from this
        # same client
        self._sub = self._zmq_connect_sub()
        self._push = self._zmq_connect_push()

    def _zmq_connect(self, socktype, address):
        """
        Connect to an address using a zmq socktype.

        :param socktype: The ZMQ socket type.
        :param address: The address to connect to.

        :return: A ZMQ connection stream.
        :rtype: ZMQStream
        """
        logger.debug("Connecting %s to %s.", socktype, address)
        socket = self._context.socket(socktype)
        # configure curve authentication
        if zmq_has_curve():
            public, private = maybe_create_and_get_certificates(
                self._config_prefix, "client")
            server_public_file = os.path.join(
                self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")
            server_public, _ = zmq.auth.load_certificate(server_public_file)
            socket.curve_publickey = public
            socket.curve_secretkey = private
            socket.curve_serverkey = server_public
        stream = zmqstream.ZMQStream(socket, self._loop)
        socket.connect(address)
        return stream

    def _zmq_connect_push(self):
        """
        Initialize the client's PUSH connection.

        :return: A ZMQ connection stream.
        :rtype: ZMQStream
        """
        return self._zmq_connect(zmq.PUSH, self._emit_addr)

    def _zmq_connect_sub(self):
        """
        Initialize the client's SUB connection.

        :return: A ZMQ connection stream.
        :rtype: ZMQStream
        """
        stream = self._zmq_connect(zmq.SUB, self._reg_addr)
        stream.on_recv(self._on_recv)
        return stream

    def _on_recv(self, msg):
        """
        Handle an incoming message in the SUB socket.

        :param msg: The received message.
        :type msg: list(str)
        """
        ev_str, content_pickle = msg[0].split(b'\0', 1)  # undo txzmq tagging
        event = getattr(catalog, ev_str)
        # SECURITY: unpickling executes arbitrary code if the sender is not
        # trusted; this relies on the events server being trusted.
        content = pickle.loads(content_pickle)
        self._handle_event(event, content)

    def _subscribe(self, tag):
        """
        Subscribe to a tag on the zmq SUB socket.

        :param tag: The tag to be subscribed.
        :type tag: str
        """
        self._sub.socket.setsockopt(zmq.SUBSCRIBE, tag)

    def _unsubscribe(self, tag):
        """
        Unsubscribe from a tag on the zmq SUB socket.

        :param tag: The tag to be unsubscribed.
        :type tag: str
        """
        self._sub.socket.setsockopt(zmq.UNSUBSCRIBE, tag)

    def _send(self, data):
        """
        Send data through PUSH socket.

        :param data: The data to be sent.
        :type data: str
        """
        # add send() as a callback for ioloop so it works between threads
        self._loop.add_callback(lambda: self._push.send(data))

    def _run_callback(self, callback, event, content):
        """
        Run a callback.

        The callback is not called right away: it is scheduled on the
        ioloop.

        :param callback: The callback to be run.
        :type callback: callable(event, *content)
        :param event: The event that was received.
        :param content: The content of the event.
        """
        self._loop.add_callback(lambda: callback(event, *content))

    def register(self, event, callback, uid=None, replace=False):
        """
        Register a callback to be executed when an event is received.

        :param event: The event that triggers the callback.
        :param callback: The callback to be executed.
        :type callback: callable(event, *content)
        :param uid: The callback uid.
        :param replace: Whether an eventual callback with same ID should be
                        replaced.
        :type replace: bool

        :return: The callback uid.

        :raises CallbackAlreadyRegisteredError: when there's already a
                callback identified by the given uid and replace is False.
        """
        self.ensure_client()
        return EventsClient.register(
            self, event, callback, uid=uid, replace=replace)

    def unregister(self, event, uid=None):
        """
        Unregister callbacks for an event.

        If uid is not None, then only the callback identified by the given uid
        is removed. Otherwise, all callbacks for the event are removed.

        :param event: The event that triggers the callback.
        :param uid: The callback uid.
        """
        self.ensure_client()
        EventsClient.unregister(self, event, uid=uid)

    def emit(self, event, *content):
        """
        Send an event.

        :param event: The event to be sent.
        :param content: The content of the event.
        """
        self.ensure_client()
        EventsClient.emit(self, event, *content)

    def run(self):
        """
        Run the events client.
        """
        logger.debug("Starting ioloop.")
        self._init_zmq()
        # from now on the sockets can be used by other threads
        self._initialized.set()
        self._loop.start()
        # NOTE(review): closing the loop after it stops is reconstructed
        # (line missing from the extracted source) -- confirm.
        self._loop.close()
        logger.debug("Ioloop finished.")

    def ensure_client(self):
        """
        Make sure the events client thread is started.

        Blocks until the thread has initialized its zmq connections.
        """
        with self._lock:
            if not self.is_alive():
                # NOTE(review): the daemon flag is reconstructed (line
                # missing from the extracted source) -- confirm.
                self.daemon = True
                self.start()
                self._initialized.wait()

    def shutdown(self):
        """
        Shutdown the events client thread.
        """
        logger.debug("Shutting down client...")
        with self._lock:
            # there is no loop to stop if the thread never started
            if self.is_alive():
                self._loop.stop(wait=True)
        EventsClient.shutdown(self)
# NOTE(review): the `def` line of this function is missing from the
# extracted source; the name follows the sibling module-level helpers, which
# are named after the client method they call -- confirm.
def shutdown():
    """
    Shutdown the events client thread.
    """
    EventsClientThread.instance().shutdown()
def register(event, callback, uid=None, replace=False):
    """
    Register a callback to be executed when an event is received.

    This is a shortcut that acts on the singleton events client thread.

    :param event: The event that triggers the callback.
    :param callback: The callback to be executed.
    :type callback: callable(event, *content)
    :param uid: The callback uid.
    :param replace: Whether an eventual callback with same ID should be
                    replaced.
    :type replace: bool

    :return: The callback uid.

    :raises CallbackAlreadyRegisteredError: when there's already a callback
            identified by the given uid and replace is False.
    """
    return EventsClientThread.instance().register(
        event, callback, uid=uid, replace=replace)
def unregister(event, uid=None):
    """
    Unregister callbacks for an event.

    If uid is not None, then only the callback identified by the given uid is
    removed. Otherwise, all callbacks for the event are removed.

    This is a shortcut that acts on the singleton events client thread.

    :param event: The event that triggers the callback.
    :param uid: The callback uid.
    """
    return EventsClientThread.instance().unregister(event, uid=uid)
def emit(event, *content):
    """
    Send an event.

    This is a shortcut that acts on the singleton events client thread.

    :param event: The event to be sent.
    :param content: The content of the event.
    """
    return EventsClientThread.instance().emit(event, *content)
# NOTE(review): the `def` line of this function is missing from the
# extracted source; the name follows the sibling module-level helpers, which
# are named after the client method they call -- confirm.
def instance():
    """
    Return an instance of the events client.

    :return: An instance of the events client.
    :rtype: EventsClientThread
    """
    return EventsClientThread.instance()