[refactor] remove use of reactor in threaded version of events client
[leap_pycommon.git] / src / leap / common / events / client.py
1 # -*- coding: utf-8 -*-
2 # client.py
3 # Copyright (C) 2013, 2014, 2015 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 The client end point of the events mechanism.
21
22 Clients are the communicating parties of the events mechanism. They
23 communicate by sending messages to a server, which in turn redistributes
24 messages to other clients.
25
26 When a client registers a callback for a given event, it also tells the
27 server that it wants to be notified whenever events of that type are sent by
28 some other client.
29 """
30
31
32 import logging
33 import collections
34 import uuid
35 import threading
36 import time
37 import pickle
38 import os
39
40 from abc import ABCMeta
41 from abc import abstractmethod
42
43 import zmq
44 from zmq.eventloop import zmqstream
45 from zmq.eventloop import ioloop
46
47 # XXX some distros don't package libsodium, so we have to be prepared for
48 #     absence of zmq.auth
49 try:
50     import zmq.auth
51 except ImportError:
52     pass
53
54 from leap.common.config import get_path_prefix
55 from leap.common.zmq_utils import zmq_has_curve
56 from leap.common.zmq_utils import maybe_create_and_get_certificates
57 from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
58
59 from leap.common.events.errors import CallbackAlreadyRegisteredError
60 from leap.common.events.server import EMIT_ADDR
61 from leap.common.events.server import REG_ADDR
62 from leap.common.events import catalog
63
64
65 logger = logging.getLogger(__name__)
66
67
68 _emit_addr = EMIT_ADDR
69 _reg_addr = REG_ADDR
70
71
72 def configure_client(emit_addr, reg_addr):
73     global _emit_addr, _reg_addr
74     logger.debug("Configuring client with addresses: (%s, %s)" %
75                  (emit_addr, reg_addr))
76     _emit_addr = emit_addr
77     _reg_addr = reg_addr
78
79
80 class EventsClient(object):
81     """
82     A singleton client for the events mechanism.
83     """
84
85     __metaclass__ = ABCMeta
86
87     _instance = None
88     _instance_lock = threading.Lock()
89
90     def __init__(self, emit_addr, reg_addr):
91         """
92         Initialize the events client.
93         """
94         logger.debug("Creating client instance.")
95         self._callbacks = collections.defaultdict(dict)
96         self._emit_addr = emit_addr
97         self._reg_addr = reg_addr
98
99     @property
100     def callbacks(self):
101         return self._callbacks
102
103     @classmethod
104     def instance(cls):
105         """
106         Return a singleton EventsClient instance.
107         """
108         with cls._instance_lock:
109             if cls._instance is None:
110                 cls._instance = cls(_emit_addr, _reg_addr)
111         return cls._instance
112
113     def register(self, event, callback, uid=None, replace=False):
114         """
115         Register a callback to be executed when an event is received.
116
117         :param event: The event that triggers the callback.
118         :type event: Event
119         :param callback: The callback to be executed.
120         :type callback: callable(event, *content)
121         :param uid: The callback uid.
122         :type uid: str
123         :param replace: Wether an eventual callback with same ID should be
124                         replaced.
125         :type replace: bool
126
127         :return: The callback uid.
128         :rtype: str
129
130         :raises CallbackAlreadyRegisteredError: when there's already a callback
131                 identified by the given uid and replace is False.
132         """
133         logger.debug("Subscribing to event: %s" % event)
134         if not uid:
135             uid = uuid.uuid4()
136         elif uid in self._callbacks[event] and not replace:
137             raise CallbackAlreadyRegisteredError()
138         self._callbacks[event][uid] = callback
139         self._subscribe(str(event))
140         return uid
141
142     def unregister(self, event, uid=None):
143         """
144         Unregister callbacks for an event.
145
146         If uid is not None, then only the callback identified by the given uid
147         is removed. Otherwise, all callbacks for the event are removed.
148
149         :param event: The event that triggers the callback.
150         :type event: Event
151         :param uid: The callback uid.
152         :type uid: str
153         """
154         if not uid:
155             logger.debug(
156                 "Unregistering all callbacks from event %s." % event)
157             self._callbacks[event] = {}
158         else:
159             logger.debug(
160                 "Unregistering callback %s from event %s." % (uid, event))
161             if uid in self._callbacks[event]:
162                 del self._callbacks[event][uid]
163         if not self._callbacks[event]:
164             del self._callbacks[event]
165             self._unsubscribe(str(event))
166
167     def emit(self, event, *content):
168         """
169         Send an event.
170
171         :param event: The event to be sent.
172         :type event: Event
173         :param content: The content of the event.
174         :type content: list
175         """
176         logger.debug("Emitting event: (%s, %s)" % (event, content))
177         payload = str(event) + b'\0' + pickle.dumps(content)
178         self._send(payload)
179
180     def _handle_event(self, event, content):
181         """
182         Handle an incoming event.
183
184         :param event: The event to be sent.
185         :type event: Event
186         :param content: The content of the event.
187         :type content: list
188         """
189         logger.debug("Handling event %s..." % event)
190         for uid in self._callbacks[event]:
191             callback = self._callbacks[event][uid]
192             logger.debug("Executing callback %s." % uid)
193             self._run_callback(callback, event, content)
194
195     @abstractmethod
196     def _run_callback(self, callback, event, content):
197         """
198         Run a callback.
199
200         :param callback: The callback to be run.
201         :type callback: callable(event, *content)
202         :param event: The event to be sent.
203         :type event: Event
204         :param content: The content of the event.
205         :type content: list
206         """
207         pass
208
209     @abstractmethod
210     def _subscribe(self, tag):
211         """
212         Subscribe to a tag on the zmq SUB socket.
213
214         :param tag: The tag to be subscribed.
215         :type tag: str
216         """
217         pass
218
219     @abstractmethod
220     def _unsubscribe(self, tag):
221         """
222         Unsubscribe from a tag on the zmq SUB socket.
223
224         :param tag: The tag to be unsubscribed.
225         :type tag: str
226         """
227         pass
228
229     @abstractmethod
230     def _send(self, data):
231         """
232         Send data through PUSH socket.
233
234         :param data: The data to be sent.
235         :type event: str
236         """
237         pass
238
239     def shutdown(self):
240         self.__class__.reset()
241
242     @classmethod
243     def reset(cls):
244         with cls._instance_lock:
245             cls._instance = None
246
247
248 class EventsIOLoop(ioloop.ZMQIOLoop):
249     """
250     An extension of zmq's ioloop that can wait until there are no callbacks
251     in the queue before stopping.
252     """
253
254     def stop(self, wait=False):
255         """
256         Stop the I/O loop.
257
258         :param wait: Whether we should wait for callbacks in queue to finish
259                      before stopping.
260         :type wait: bool
261         """
262         if wait:
263             # prevent new callbacks from being added
264             with self._callback_lock:
265                 self._closing = True
266             # wait until all callbacks have been executed
267             while self._callbacks:
268                 time.sleep(0.1)
269         ioloop.ZMQIOLoop.stop(self)
270
271
272 class EventsClientThread(threading.Thread, EventsClient):
273     """
274     A threaded version of the events client.
275     """
276
277     def __init__(self, emit_addr, reg_addr):
278         """
279         Initialize the events client.
280         """
281         threading.Thread.__init__(self)
282         EventsClient.__init__(self, emit_addr, reg_addr)
283         self._lock = threading.Lock()
284         self._initialized = threading.Event()
285         self._config_prefix = os.path.join(
286             get_path_prefix(), "leap", "events")
287         self._loop = None
288         self._context = None
289         self._push = None
290         self._sub = None
291
292     def _init_zmq(self):
293         """
294         Initialize ZMQ connections.
295         """
296         self._loop = EventsIOLoop()
297         self._context = zmq.Context()
298         # connect SUB first, otherwise we might miss some event sent from this
299         # same client
300         self._sub = self._zmq_connect_sub()
301         self._push = self._zmq_connect_push()
302
303     def _zmq_connect(self, socktype, address):
304         """
305         Connect to an address using with a zmq socktype.
306
307         :param socktype: The ZMQ socket type.
308         :type socktype: int
309         :param address: The address to connect to.
310         :type address: str
311
312         :return: A ZMQ connection stream.
313         :rtype: ZMQStream
314         """
315         logger.debug("Connecting %s to %s." % (socktype, address))
316         socket = self._context.socket(socktype)
317         # configure curve authentication
318         if zmq_has_curve():
319             public, private = maybe_create_and_get_certificates(
320                 self._config_prefix, "client")
321             server_public_file = os.path.join(
322                 self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")
323             server_public, _ = zmq.auth.load_certificate(server_public_file)
324             socket.curve_publickey = public
325             socket.curve_secretkey = private
326             socket.curve_serverkey = server_public
327         stream = zmqstream.ZMQStream(socket, self._loop)
328         socket.connect(address)
329         return stream
330
331     def _zmq_connect_push(self):
332         """
333         Initialize the client's PUSH connection.
334
335         :return: A ZMQ connection stream.
336         :rtype: ZMQStream
337         """
338         return self._zmq_connect(zmq.PUSH, self._emit_addr)
339
340     def _zmq_connect_sub(self):
341         """
342         Initialize the client's SUB connection.
343
344         :return: A ZMQ connection stream.
345         :rtype: ZMQStream
346         """
347         stream = self._zmq_connect(zmq.SUB, self._reg_addr)
348         stream.on_recv(self._on_recv)
349         return stream
350
351     def _on_recv(self, msg):
352         """
353         Handle an incoming message in the SUB socket.
354
355         :param msg: The received message.
356         :type msg: str
357         """
358         ev_str, content_pickle = msg[0].split(b'\0', 1)  # undo txzmq tagging
359         event = getattr(catalog, ev_str)
360         content = pickle.loads(content_pickle)
361         self._handle_event(event, content)
362
363     def _subscribe(self, tag):
364         """
365         Subscribe from a tag on the zmq SUB socket.
366
367         :param tag: The tag to be subscribed.
368         :type tag: str
369         """
370         self._sub.socket.setsockopt(zmq.SUBSCRIBE, tag)
371
372     def _unsubscribe(self, tag):
373         """
374         Unsubscribe from a tag on the zmq SUB socket.
375
376         :param tag: The tag to be unsubscribed.
377         :type tag: str
378         """
379         self._sub.socket.setsockopt(zmq.UNSUBSCRIBE, tag)
380
381     def _send(self, data):
382         """
383         Send data through PUSH socket.
384
385         :param data: The data to be sent.
386         :type event: str
387         """
388         # add send() as a callback for ioloop so it works between threads
389         self._loop.add_callback(lambda: self._push.send(data))
390
391     def _run_callback(self, callback, event, content):
392         """
393         Run a callback.
394
395         :param callback: The callback to be run.
396         :type callback: callable(event, *content)
397         :param event: The event to be sent.
398         :type event: Event
399         :param content: The content of the event.
400         :type content: list
401         """
402         self._loop.add_callback(lambda: callback(event, *content))
403
404     def register(self, event, callback, uid=None, replace=False):
405         """
406         Register a callback to be executed when an event is received.
407
408         :param event: The event that triggers the callback.
409         :type event: Event
410         :param callback: The callback to be executed.
411         :type callback: callable(event, *content)
412         :param uid: The callback uid.
413         :type uid: str
414         :param replace: Wether an eventual callback with same ID should be
415                         replaced.
416         :type replace: bool
417
418         :return: The callback uid.
419         :rtype: str
420
421         :raises CallbackAlreadyRegisteredError: when there's already a
422                 callback identified by the given uid and replace is False.
423         """
424         self.ensure_client()
425         return EventsClient.register(
426             self, event, callback, uid=uid, replace=replace)
427
428     def unregister(self, event, uid=None):
429         """
430         Unregister callbacks for an event.
431
432         If uid is not None, then only the callback identified by the given uid
433         is removed. Otherwise, all callbacks for the event are removed.
434
435         :param event: The event that triggers the callback.
436         :type event: Event
437         :param uid: The callback uid.
438         :type uid: str
439         """
440         self.ensure_client()
441         EventsClient.unregister(self, event, uid=uid)
442
443     def emit(self, event, *content):
444         """
445         Send an event.
446
447         :param event: The event to be sent.
448         :type event: Event
449         :param content: The content of the event.
450         :type content: list
451         """
452         self.ensure_client()
453         EventsClient.emit(self, event, *content)
454
455     def run(self):
456         """
457         Run the events client.
458         """
459         logger.debug("Starting ioloop.")
460         self._init_zmq()
461         self._initialized.set()
462         self._loop.start()
463         self._loop.close()
464         logger.debug("Ioloop finished.")
465
466     def ensure_client(self):
467         """
468         Make sure the events client thread is started.
469         """
470         with self._lock:
471             if not self.is_alive():
472                 self.daemon = True
473                 self.start()
474                 self._initialized.wait()
475
476     def shutdown(self):
477         """
478         Shutdown the events client thread.
479         """
480         logger.debug("Shutting down client...")
481         with self._lock:
482             if self.is_alive():
483                 self._loop.stop(wait=True)
484         EventsClient.shutdown(self)
485
486
487 def shutdown():
488     """
489     Shutdown the events client thread.
490     """
491     EventsClientThread.instance().shutdown()
492
493
494 def register(event, callback, uid=None, replace=False):
495     """
496     Register a callback to be executed when an event is received.
497
498     :param event: The event that triggers the callback.
499     :type event: str
500     :param callback: The callback to be executed.
501     :type callback: callable(event, content)
502     :param uid: The callback uid.
503     :type uid: str
504     :param replace: Wether an eventual callback with same ID should be
505                     replaced.
506     :type replace: bool
507
508     :return: The callback uid.
509     :rtype: str
510
511     :raises CallbackAlreadyRegisteredError: when there's already a callback
512             identified by the given uid and replace is False.
513     """
514     return EventsClientThread.instance().register(
515         event, callback, uid=uid, replace=replace)
516
517
518 def unregister(event, uid=None):
519     """
520     Unregister callbacks for an event.
521
522     If uid is not None, then only the callback identified by the given uid is
523     removed. Otherwise, all callbacks for the event are removed.
524
525     :param event: The event that triggers the callback.
526     :type event: str
527     :param uid: The callback uid.
528     :type uid: str
529     """
530     return EventsClientThread.instance().unregister(event, uid=uid)
531
532
533 def emit(event, *content):
534     """
535     Send an event.
536
537     :param event: The event to be sent.
538     :type event: str
539     :param content: The content of the event.
540     :type content: list
541     """
542     return EventsClientThread.instance().emit(event, *content)
543
544
545 def instance():
546     """
547     Return an instance of the events client.
548
549     :return: An instance of the events client.
550     :rtype: EventsClientThread
551     """
552     return EventsClientThread.instance()