[bug] avoid the events server to block twistd daemon
[leap_pycommon.git] / src / leap / common / events / txclient.py
1 # -*- coding: utf-8 -*-
2 # txclient.py
3 # Copyright (C) 2013, 2014, 2015 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 The client end point of the events mechanism, implemented using txzmq.
19
20 Clients are the communicating parties of the events mechanism. They
21 communicate by sending messages to a server, which in turn redistributes
22 messages to other clients.
23
24 When a client registers a callback for a given event, it also tells the
25 server that it wants to be notified whenever events of that type are sent by
26 some other client.
27 """
28 import logging
29 import pickle
30
31 from leap.common.events.zmq_components import TxZmqClientComponent
32
33 import txzmq
34
35 from leap.common.events.client import EventsClient
36 from leap.common.events.client import configure_client
37 from leap.common.events.server import EMIT_ADDR
38 from leap.common.events.server import REG_ADDR
39 from leap.common.events import catalog
40
41
42 logger = logging.getLogger(__name__)
43
44
45 __all__ = [
46     "configure_client",
47     "EventsTxClient",
48     "register",
49     "unregister",
50     "emit",
51     "shutdown",
52 ]
53
54
55 class EventsTxClient(TxZmqClientComponent, EventsClient):
56     """
57     A twisted events client that listens for events in one address and
58     publishes those events to another address.
59     """
60
61     def __init__(self, emit_addr=EMIT_ADDR, reg_addr=REG_ADDR,
62                  path_prefix=None):
63         """
64         Initialize the events server.
65         """
66         TxZmqClientComponent.__init__(self, path_prefix=path_prefix)
67         EventsClient.__init__(self, emit_addr, reg_addr)
68         # connect SUB first, otherwise we might miss some event sent from this
69         # same client
70         self._sub = self._zmq_connect(txzmq.ZmqSubConnection, reg_addr)
71         self._sub.gotMessage = self._gotMessage
72
73         self._push = self._zmq_connect(txzmq.ZmqPushConnection, emit_addr)
74
75     def _gotMessage(self, msg, tag):
76         """
77         Handle an incoming event.
78
79         :param msg: The incoming message.
80         :type msg: list(str)
81         """
82         event = getattr(catalog, tag)
83         content = pickle.loads(msg)
84         self._handle_event(event, content)
85
86     def _subscribe(self, tag):
87         """
88         Subscribe to a tag on the zmq SUB socket.
89
90         :param tag: The tag to be subscribed.
91         :type tag: str
92         """
93         self._sub.subscribe(tag)
94
95     def _unsubscribe(self, tag):
96         """
97         Unsubscribe from a tag on the zmq SUB socket.
98
99         :param tag: The tag to be unsubscribed.
100         :type tag: str
101         """
102         self._sub.unsubscribe(tag)
103
104     def _send(self, data):
105         """
106         Send data through PUSH socket.
107
108         :param data: The data to be sent.
109         :type event: str
110         """
111         self._push.send(data)
112
113     def _run_callback(self, callback, event, content):
114         """
115         Run a callback.
116
117         :param callback: The callback to be run.
118         :type callback: callable(event, *content)
119         :param event: The event to be sent.
120         :type event: Event
121         :param content: The content of the event.
122         :type content: list
123         """
124         callback(event, *content)
125
126     def shutdown(self):
127         EventsClient.shutdown(self)
128
129
130 def register(event, callback, uid=None, replace=False):
131     """
132     Register a callback to be executed when an event is received.
133
134     :param event: The event that triggers the callback.
135     :type event: str
136     :param callback: The callback to be executed.
137     :type callback: callable(event, content)
138     :param uid: The callback uid.
139     :type uid: str
140     :param replace: Wether an eventual callback with same ID should be
141                     replaced.
142     :type replace: bool
143
144     :return: The callback uid.
145     :rtype: str
146
147     :raises CallbackAlreadyRegisteredError: when there's already a callback
148             identified by the given uid and replace is False.
149     """
150     return EventsTxClient.instance().register(
151         event, callback, uid=uid, replace=replace)
152
153
154 def unregister(event, uid=None):
155     """
156     Unregister callbacks for an event.
157
158     If uid is not None, then only the callback identified by the given uid is
159     removed. Otherwise, all callbacks for the event are removed.
160
161     :param event: The event that triggers the callback.
162     :type event: str
163     :param uid: The callback uid.
164     :type uid: str
165     """
166     return EventsTxClient.instance().unregister(event, uid=uid)
167
168
169 def emit(event, *content):
170     """
171     Send an event.
172
173     :param event: The event to be sent.
174     :type event: str
175     :param content: The content of the event.
176     :type content: list
177     """
178     return EventsTxClient.instance().emit(event, *content)
179
180
181 def shutdown():
182     """
183     Shutdown the events client.
184     """
185     EventsTxClient.instance().shutdown()
186
187
188 def instance():
189     """
190     Return an instance of the events client.
191
192     :return: An instance of the events client.
193     :rtype: EventsClientThread
194     """
195     return EventsTxClient.instance()