1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2013, 2014, 2015 LEAP
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 The client end point of the events mechanism.
20 Clients are the communicating parties of the events mechanism. They
21 communicate by sending messages to a server, which in turn redistributes
22 messages to other clients.
24 When a client registers a callback for a given event, it also tells the
25 server that it wants to be notified whenever events of that type are sent by
36 from abc import ABCMeta
37 from abc import abstractmethod
40 from zmq.eventloop import zmqstream
41 from zmq.eventloop import ioloop
43 # XXX some distros don't package libsodium, so we have to be prepared for
50 from leap.common.config import flags, get_path_prefix
51 from leap.common.zmq_utils import zmq_has_curve
52 from leap.common.zmq_utils import maybe_create_and_get_certificates
53 from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
55 from leap.common.events.errors import CallbackAlreadyRegisteredError
56 from leap.common.events.server import EMIT_ADDR
57 from leap.common.events.server import REG_ADDR
58 from leap.common.events import catalog
61 logger = logging.getLogger(__name__)
64 _emit_addr = EMIT_ADDR
68 def configure_client(emit_addr, reg_addr):
69 global _emit_addr, _reg_addr
70 logger.debug("Configuring client with addresses: (%s, %s)" %
71 (emit_addr, reg_addr))
72 _emit_addr = emit_addr
76 class EventsClient(object):
78 A singleton client for the events mechanism.
81 __metaclass__ = ABCMeta
84 _instance_lock = threading.Lock()
86 def __init__(self, emit_addr, reg_addr):
88 Initialize the events client.
90 logger.debug("Creating client instance.")
91 self._callbacks = collections.defaultdict(dict)
92 self._emit_addr = emit_addr
93 self._reg_addr = reg_addr
97 return self._callbacks
102 Return a singleton EventsClient instance.
104 with cls._instance_lock:
105 if cls._instance is None:
106 cls._instance = cls(_emit_addr, _reg_addr)
109 def register(self, event, callback, uid=None, replace=False):
111 Register a callback to be executed when an event is received.
113 :param event: The event that triggers the callback.
115 :param callback: The callback to be executed.
116 :type callback: callable(event, *content)
117 :param uid: The callback uid.
119 :param replace: Wether an eventual callback with same ID should be
123 :return: The callback uid.
126 :raises CallbackAlreadyRegisteredError: when there's already a callback
127 identified by the given uid and replace is False.
129 logger.debug("Subscribing to event: %s" % event)
132 elif uid in self._callbacks[event] and not replace:
133 raise CallbackAlreadyRegisteredError()
134 self._callbacks[event][uid] = callback
135 self._subscribe(str(event))
138 def unregister(self, event, uid=None):
140 Unregister callbacks for an event.
142 If uid is not None, then only the callback identified by the given uid
143 is removed. Otherwise, all callbacks for the event are removed.
145 :param event: The event that triggers the callback.
147 :param uid: The callback uid.
152 "Unregistering all callbacks from event %s." % event)
153 self._callbacks[event] = {}
156 "Unregistering callback %s from event %s." % (uid, event))
157 if uid in self._callbacks[event]:
158 del self._callbacks[event][uid]
159 if not self._callbacks[event]:
160 del self._callbacks[event]
161 self._unsubscribe(str(event))
163 def emit(self, event, *content):
167 :param event: The event to be sent.
169 :param content: The content of the event.
172 logger.debug("Emitting event: (%s, %s)" % (event, content))
173 payload = str(event) + b'\0' + pickle.dumps(content)
176 def _handle_event(self, event, content):
178 Handle an incoming event.
180 :param event: The event to be sent.
182 :param content: The content of the event.
185 logger.debug("Handling event %s..." % event)
186 for uid in self._callbacks[event]:
187 callback = self._callbacks[event][uid]
188 logger.debug("Executing callback %s." % uid)
189 self._run_callback(callback, event, content)
192 def _run_callback(self, callback, event, content):
196 :param callback: The callback to be run.
197 :type callback: callable(event, *content)
198 :param event: The event to be sent.
200 :param content: The content of the event.
206 def _subscribe(self, tag):
208 Subscribe to a tag on the zmq SUB socket.
210 :param tag: The tag to be subscribed.
216 def _unsubscribe(self, tag):
218 Unsubscribe from a tag on the zmq SUB socket.
220 :param tag: The tag to be unsubscribed.
226 def _send(self, data):
228 Send data through PUSH socket.
230 :param data: The data to be sent.
236 self.__class__.reset()
240 with cls._instance_lock:
244 class EventsIOLoop(ioloop.ZMQIOLoop):
246 An extension of zmq's ioloop that can wait until there are no callbacks
247 in the queue before stopping.
250 def stop(self, wait=False):
254 :param wait: Whether we should wait for callbacks in queue to finish
259 # prevent new callbacks from being added
260 with self._callback_lock:
262 # wait until all callbacks have been executed
263 while self._callbacks:
265 ioloop.ZMQIOLoop.stop(self)
268 class EventsClientThread(threading.Thread, EventsClient):
270 A threaded version of the events client.
273 def __init__(self, emit_addr, reg_addr):
275 Initialize the events client.
277 threading.Thread.__init__(self)
278 EventsClient.__init__(self, emit_addr, reg_addr)
279 self._lock = threading.Lock()
280 self._initialized = threading.Event()
281 self._config_prefix = os.path.join(
282 get_path_prefix(flags.STANDALONE), "leap", "events")
290 Initialize ZMQ connections.
292 self._loop = EventsIOLoop()
293 self._context = zmq.Context()
294 # connect SUB first, otherwise we might miss some event sent from this
296 self._sub = self._zmq_connect_sub()
297 self._push = self._zmq_connect_push()
299 def _zmq_connect(self, socktype, address):
301 Connect to an address using with a zmq socktype.
303 :param socktype: The ZMQ socket type.
305 :param address: The address to connect to.
308 :return: A ZMQ connection stream.
311 logger.debug("Connecting %s to %s." % (socktype, address))
312 socket = self._context.socket(socktype)
313 # configure curve authentication
315 public, private = maybe_create_and_get_certificates(
316 self._config_prefix, "client")
317 server_public_file = os.path.join(
318 self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")
319 server_public, _ = zmq.auth.load_certificate(server_public_file)
320 socket.curve_publickey = public
321 socket.curve_secretkey = private
322 socket.curve_serverkey = server_public
323 stream = zmqstream.ZMQStream(socket, self._loop)
324 socket.connect(address)
327 def _zmq_connect_push(self):
329 Initialize the client's PUSH connection.
331 :return: A ZMQ connection stream.
334 return self._zmq_connect(zmq.PUSH, self._emit_addr)
336 def _zmq_connect_sub(self):
338 Initialize the client's SUB connection.
340 :return: A ZMQ connection stream.
343 stream = self._zmq_connect(zmq.SUB, self._reg_addr)
344 stream.on_recv(self._on_recv)
347 def _on_recv(self, msg):
349 Handle an incoming message in the SUB socket.
351 :param msg: The received message.
354 ev_str, content_pickle = msg[0].split(b'\0', 1) # undo txzmq tagging
355 event = getattr(catalog, ev_str)
356 content = pickle.loads(content_pickle)
357 self._handle_event(event, content)
359 def _subscribe(self, tag):
361 Subscribe from a tag on the zmq SUB socket.
363 :param tag: The tag to be subscribed.
366 self._sub.socket.setsockopt(zmq.SUBSCRIBE, tag)
368 def _unsubscribe(self, tag):
370 Unsubscribe from a tag on the zmq SUB socket.
372 :param tag: The tag to be unsubscribed.
375 self._sub.socket.setsockopt(zmq.UNSUBSCRIBE, tag)
377 def _send(self, data):
379 Send data through PUSH socket.
381 :param data: The data to be sent.
384 # add send() as a callback for ioloop so it works between threads
385 self._loop.add_callback(lambda: self._push.send(data))
387 def _run_callback(self, callback, event, content):
391 :param callback: The callback to be run.
392 :type callback: callable(event, *content)
393 :param event: The event to be sent.
395 :param content: The content of the event.
398 self._loop.add_callback(lambda: callback(event, *content))
400 def register(self, event, callback, uid=None, replace=False):
402 Register a callback to be executed when an event is received.
404 :param event: The event that triggers the callback.
406 :param callback: The callback to be executed.
407 :type callback: callable(event, *content)
408 :param uid: The callback uid.
410 :param replace: Wether an eventual callback with same ID should be
414 :return: The callback uid.
417 :raises CallbackAlreadyRegisteredError: when there's already a
418 callback identified by the given uid and replace is False.
421 return EventsClient.register(
422 self, event, callback, uid=uid, replace=replace)
424 def unregister(self, event, uid=None):
426 Unregister callbacks for an event.
428 If uid is not None, then only the callback identified by the given uid
429 is removed. Otherwise, all callbacks for the event are removed.
431 :param event: The event that triggers the callback.
433 :param uid: The callback uid.
437 EventsClient.unregister(self, event, uid=uid)
439 def emit(self, event, *content):
443 :param event: The event to be sent.
445 :param content: The content of the event.
449 EventsClient.emit(self, event, *content)
453 Run the events client.
455 logger.debug("Starting ioloop.")
457 self._initialized.set()
460 logger.debug("Ioloop finished.")
462 def ensure_client(self):
464 Make sure the events client thread is started.
467 if not self.is_alive():
470 self._initialized.wait()
474 Shutdown the events client thread.
476 logger.debug("Shutting down client...")
479 self._loop.stop(wait=True)
480 EventsClient.shutdown(self)
485 Shutdown the events client thread.
487 EventsClientThread.instance().shutdown()
490 def register(event, callback, uid=None, replace=False):
492 Register a callback to be executed when an event is received.
494 :param event: The event that triggers the callback.
496 :param callback: The callback to be executed.
497 :type callback: callable(event, content)
498 :param uid: The callback uid.
500 :param replace: Wether an eventual callback with same ID should be
504 :return: The callback uid.
507 :raises CallbackAlreadyRegisteredError: when there's already a callback
508 identified by the given uid and replace is False.
510 return EventsClientThread.instance().register(
511 event, callback, uid=uid, replace=replace)
514 def unregister(event, uid=None):
516 Unregister callbacks for an event.
518 If uid is not None, then only the callback identified by the given uid is
519 removed. Otherwise, all callbacks for the event are removed.
521 :param event: The event that triggers the callback.
523 :param uid: The callback uid.
526 return EventsClientThread.instance().unregister(event, uid=uid)
529 def emit(event, *content):
533 :param event: The event to be sent.
535 :param content: The content of the event.
538 return EventsClientThread.instance().emit(event, *content)
543 Return an instance of the events client.
545 :return: An instance of the events client.
546 :rtype: EventsClientThread
548 return EventsClientThread.instance()