1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2013, 2014, 2015 LEAP
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 The client end point of the events mechanism.
22 Clients are the communicating parties of the events mechanism. They
23 communicate by sending messages to a server, which in turn redistributes
24 messages to other clients.
26 When a client registers a callback for a given event, it also tells the
27 server that it wants to be notified whenever events of that type are sent by
40 from abc import ABCMeta
41 from abc import abstractmethod
44 from zmq.eventloop import zmqstream
45 from zmq.eventloop import ioloop
47 # XXX some distros don't package libsodium, so we have to be prepared for
54 from leap.common.config import get_path_prefix
55 from leap.common.zmq_utils import zmq_has_curve
56 from leap.common.zmq_utils import maybe_create_and_get_certificates
57 from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
59 from leap.common.events.errors import CallbackAlreadyRegisteredError
60 from leap.common.events.server import EMIT_ADDR
61 from leap.common.events.server import REG_ADDR
62 from leap.common.events import catalog
65 logger = logging.getLogger(__name__)
68 _emit_addr = EMIT_ADDR
72 def configure_client(emit_addr, reg_addr):
73 global _emit_addr, _reg_addr
74 logger.debug("Configuring client with addresses: (%s, %s)" %
75 (emit_addr, reg_addr))
76 _emit_addr = emit_addr
80 class EventsClient(object):
82 A singleton client for the events mechanism.
85 __metaclass__ = ABCMeta
88 _instance_lock = threading.Lock()
90 def __init__(self, emit_addr, reg_addr):
92 Initialize the events client.
94 logger.debug("Creating client instance.")
95 self._callbacks = collections.defaultdict(dict)
96 self._emit_addr = emit_addr
97 self._reg_addr = reg_addr
101 return self._callbacks
106 Return a singleton EventsClient instance.
108 with cls._instance_lock:
109 if cls._instance is None:
110 cls._instance = cls(_emit_addr, _reg_addr)
113 def register(self, event, callback, uid=None, replace=False):
115 Register a callback to be executed when an event is received.
117 :param event: The event that triggers the callback.
119 :param callback: The callback to be executed.
120 :type callback: callable(event, *content)
121 :param uid: The callback uid.
123 :param replace: Wether an eventual callback with same ID should be
127 :return: The callback uid.
130 :raises CallbackAlreadyRegisteredError: when there's already a callback
131 identified by the given uid and replace is False.
133 logger.debug("Subscribing to event: %s" % event)
136 elif uid in self._callbacks[event] and not replace:
137 raise CallbackAlreadyRegisteredError()
138 self._callbacks[event][uid] = callback
139 self._subscribe(str(event))
142 def unregister(self, event, uid=None):
144 Unregister callbacks for an event.
146 If uid is not None, then only the callback identified by the given uid
147 is removed. Otherwise, all callbacks for the event are removed.
149 :param event: The event that triggers the callback.
151 :param uid: The callback uid.
156 "Unregistering all callbacks from event %s." % event)
157 self._callbacks[event] = {}
160 "Unregistering callback %s from event %s." % (uid, event))
161 if uid in self._callbacks[event]:
162 del self._callbacks[event][uid]
163 if not self._callbacks[event]:
164 del self._callbacks[event]
165 self._unsubscribe(str(event))
167 def emit(self, event, *content):
171 :param event: The event to be sent.
173 :param content: The content of the event.
176 logger.debug("Emitting event: (%s, %s)" % (event, content))
177 self._send(str(event) + b'\0' + pickle.dumps(content))
179 def _handle_event(self, event, content):
181 Handle an incoming event.
183 :param msg: The incoming message.
186 logger.debug("Handling event %s..." % event)
187 for uid in self._callbacks[event].keys():
188 callback = self._callbacks[event][uid]
189 logger.debug("Executing callback %s." % uid)
190 callback(event, *content)
193 def _subscribe(self, tag):
195 Subscribe to a tag on the zmq SUB socket.
197 :param tag: The tag to be subscribed.
203 def _unsubscribe(self, tag):
205 Unsubscribe from a tag on the zmq SUB socket.
207 :param tag: The tag to be unsubscribed.
213 def _send(self, data):
215 Send data through PUSH socket.
217 :param data: The data to be sent.
223 self.__class__.reset()
227 with cls._instance_lock:
231 class EventsIOLoop(ioloop.ZMQIOLoop):
233 An extension of zmq's ioloop that can wait until there are no callbacks
234 in the queue before stopping.
237 def stop(self, wait=False):
241 :param wait: Whether we should wait for callbacks in queue to finish
246 # prevent new callbacks from being added
247 with self._callback_lock:
249 # wait until all callbacks have been executed
250 while self._callbacks:
252 ioloop.ZMQIOLoop.stop(self)
255 class EventsClientThread(threading.Thread, EventsClient):
257 A threaded version of the events client.
260 def __init__(self, emit_addr, reg_addr):
262 Initialize the events client.
264 threading.Thread.__init__(self)
265 EventsClient.__init__(self, emit_addr, reg_addr)
266 self._lock = threading.Lock()
267 self._initialized = threading.Event()
268 self._config_prefix = os.path.join(
269 get_path_prefix(), "leap", "events")
277 Initialize ZMQ connections.
279 self._loop = EventsIOLoop()
280 self._context = zmq.Context()
281 # connect SUB first, otherwise we might miss some event sent from this
283 self._sub = self._zmq_connect_sub()
284 self._push = self._zmq_connect_push()
286 def _zmq_connect(self, socktype, address):
288 Connect to an address using with a zmq socktype.
290 :param socktype: The ZMQ socket type.
292 :param address: The address to connect to.
295 :return: A ZMQ connection stream.
298 logger.debug("Connecting %s to %s." % (socktype, address))
299 socket = self._context.socket(socktype)
300 # configure curve authentication
302 public, private = maybe_create_and_get_certificates(
303 self._config_prefix, "client")
304 server_public_file = os.path.join(
305 self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")
306 server_public, _ = zmq.auth.load_certificate(server_public_file)
307 socket.curve_publickey = public
308 socket.curve_secretkey = private
309 socket.curve_serverkey = server_public
310 stream = zmqstream.ZMQStream(socket, self._loop)
311 socket.connect(address)
314 def _zmq_connect_push(self):
316 Initialize the client's PUSH connection.
318 :return: A ZMQ connection stream.
321 return self._zmq_connect(zmq.PUSH, self._emit_addr)
323 def _zmq_connect_sub(self):
325 Initialize the client's SUB connection.
327 :return: A ZMQ connection stream.
330 stream = self._zmq_connect(zmq.SUB, self._reg_addr)
331 stream.on_recv(self._on_recv)
334 def _on_recv(self, msg):
336 Handle an incoming message in the SUB socket.
338 :param msg: The received message.
341 ev_str, content_pickle = msg[0].split(b'\0', 1) # undo txzmq tagging
342 event = getattr(catalog, ev_str)
343 content = pickle.loads(content_pickle)
344 self._handle_event(event, content)
346 def _subscribe(self, tag):
348 Subscribe from a tag on the zmq SUB socket.
350 :param tag: The tag to be subscribed.
353 self._sub.socket.setsockopt(zmq.SUBSCRIBE, tag)
355 def _unsubscribe(self, tag):
357 Unsubscribe from a tag on the zmq SUB socket.
359 :param tag: The tag to be unsubscribed.
362 self._sub.socket.setsockopt(zmq.UNSUBSCRIBE, tag)
364 def _send(self, data):
366 Send data through PUSH socket.
368 :param data: The data to be sent.
371 # add send() as a callback for ioloop so it works between threads
372 self._loop.add_callback(lambda: self._push.send(data))
374 def register(self, event, callback, uid=None, replace=False):
376 Register a callback to be executed when an event is received.
378 :param event: The event that triggers the callback.
380 :param callback: The callback to be executed.
381 :type callback: callable(event, *content)
382 :param uid: The callback uid.
384 :param replace: Wether an eventual callback with same ID should be
388 :return: The callback uid.
391 :raises CallbackAlreadyRegisteredError: when there's already a
392 callback identified by the given uid and replace is False.
395 return EventsClient.register(self, event, callback, uid=uid, replace=replace)
397 def unregister(self, event, uid=None):
399 Unregister callbacks for an event.
401 If uid is not None, then only the callback identified by the given uid
402 is removed. Otherwise, all callbacks for the event are removed.
404 :param event: The event that triggers the callback.
406 :param uid: The callback uid.
410 EventsClient.unregister(self, event, uid=uid)
412 def emit(self, event, *content):
416 :param event: The event to be sent.
418 :param content: The content of the event.
422 EventsClient.emit(self, event, *content)
426 Run the events client.
428 logger.debug("Starting ioloop.")
430 self._initialized.set()
433 logger.debug("Ioloop finished.")
435 def ensure_client(self):
437 Make sure the events client thread is started.
440 if not self.is_alive():
443 self._initialized.wait()
447 Shutdown the events client thread.
449 logger.debug("Shutting down client...")
452 self._loop.stop(wait=True)
453 EventsClient.shutdown(self)
458 Shutdown the events client thread.
460 EventsClientThread.instance().shutdown()
463 def register(event, callback, uid=None, replace=False):
465 Register a callback to be executed when an event is received.
467 :param event: The event that triggers the callback.
469 :param callback: The callback to be executed.
470 :type callback: callable(event, content)
471 :param uid: The callback uid.
473 :param replace: Wether an eventual callback with same ID should be
477 :return: The callback uid.
480 :raises CallbackAlreadyRegisteredError: when there's already a callback
481 identified by the given uid and replace is False.
483 return EventsClientThread.instance().register(
484 event, callback, uid=uid, replace=replace)
487 def unregister(event, uid=None):
489 Unregister callbacks for an event.
491 If uid is not None, then only the callback identified by the given uid is
492 removed. Otherwise, all callbacks for the event are removed.
494 :param event: The event that triggers the callback.
496 :param uid: The callback uid.
499 return EventsClientThread.instance().unregister(event, uid=uid)
502 def emit(event, *content):
506 :param event: The event to be sent.
508 :param content: The content of the event.
511 return EventsClientThread.instance().emit(event, *content)
516 Return an instance of the events client.
518 :return: An instance of the events client.
519 :rtype: EventsClientThread
521 return EventsClientThread.instance()