dfd05333ea5a2bf567e654aa549c107cfbad4fff
[leap_pycommon.git] / src / leap / common / events / txclient.py
1 # -*- coding: utf-8 -*-
2 # txclient.py
3 # Copyright (C) 2013, 2014, 2015 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 The client end point of the events mechanism, implemented using txzmq.
19
20 Clients are the communicating parties of the events mechanism. They
21 communicate by sending messages to a server, which in turn redistributes
22 messages to other clients.
23
24 When a client registers a callback for a given event, it also tells the
25 server that it wants to be notified whenever events of that type are sent by
26 some other client.
27 """
28 import logging
29 import pickle
30
31 import txzmq
32
33 from leap.common.events.zmq_components import TxZmqClientComponent
34 from leap.common.events.client import EventsClient
35 from leap.common.events.client import configure_client
36 from leap.common.events.server import EMIT_ADDR
37 from leap.common.events.server import REG_ADDR
38 from leap.common.events import catalog
39
40
41 logger = logging.getLogger(__name__)
42
43
44 __all__ = [
45     "configure_client",
46     "EventsTxClient",
47     "register",
48     "unregister",
49     "emit",
50     "shutdown",
51 ]
52
53
54 class EventsTxClient(TxZmqClientComponent, EventsClient):
55     """
56     A twisted events client that listens for events in one address and
57     publishes those events to another address.
58     """
59
60     def __init__(self, emit_addr=EMIT_ADDR, reg_addr=REG_ADDR,
61                  path_prefix=None):
62         """
63         Initialize the events server.
64         """
65         TxZmqClientComponent.__init__(self, path_prefix=path_prefix)
66         EventsClient.__init__(self, emit_addr, reg_addr)
67         # connect SUB first, otherwise we might miss some event sent from this
68         # same client
69         self._sub = self._zmq_connect(txzmq.ZmqSubConnection, reg_addr)
70         self._sub.gotMessage = self._gotMessage
71         self._push = self._zmq_connect(txzmq.ZmqPushConnection, emit_addr)
72
73     def _gotMessage(self, msg, tag):
74         """
75         Handle an incoming event.
76
77         :param msg: The incoming message.
78         :type msg: list(str)
79         """
80         event = getattr(catalog, tag)
81         content = pickle.loads(msg)
82         self._handle_event(event, content)
83
84     def _subscribe(self, tag):
85         """
86         Subscribe to a tag on the zmq SUB socket.
87
88         :param tag: The tag to be subscribed.
89         :type tag: str
90         """
91         self._sub.subscribe(tag)
92
93     def _unsubscribe(self, tag):
94         """
95         Unsubscribe from a tag on the zmq SUB socket.
96
97         :param tag: The tag to be unsubscribed.
98         :type tag: str
99         """
100         self._sub.unsubscribe(tag)
101
102     def _send(self, data):
103         """
104         Send data through PUSH socket.
105
106         :param data: The data to be sent.
107         :type event: str
108         """
109         self._push.send(data)
110
111     def _run_callback(self, callback, event, content):
112         """
113         Run a callback.
114
115         :param callback: The callback to be run.
116         :type callback: callable(event, *content)
117         :param event: The event to be sent.
118         :type event: Event
119         :param content: The content of the event.
120         :type content: list
121         """
122         callback(event, *content)
123
124     def shutdown(self):
125         TxZmqClientComponent.shutdown(self)
126         EventsClient.shutdown(self)
127
128
129 def register(event, callback, uid=None, replace=False):
130     """
131     Register a callback to be executed when an event is received.
132
133     :param event: The event that triggers the callback.
134     :type event: str
135     :param callback: The callback to be executed.
136     :type callback: callable(event, content)
137     :param uid: The callback uid.
138     :type uid: str
139     :param replace: Wether an eventual callback with same ID should be
140                     replaced.
141     :type replace: bool
142
143     :return: The callback uid.
144     :rtype: str
145
146     :raises CallbackAlreadyRegisteredError: when there's already a callback
147             identified by the given uid and replace is False.
148     """
149     return EventsTxClient.instance().register(
150         event, callback, uid=uid, replace=replace)
151
152
153 def unregister(event, uid=None):
154     """
155     Unregister callbacks for an event.
156
157     If uid is not None, then only the callback identified by the given uid is
158     removed. Otherwise, all callbacks for the event are removed.
159
160     :param event: The event that triggers the callback.
161     :type event: str
162     :param uid: The callback uid.
163     :type uid: str
164     """
165     return EventsTxClient.instance().unregister(event, uid=uid)
166
167
168 def emit(event, *content):
169     """
170     Send an event.
171
172     :param event: The event to be sent.
173     :type event: str
174     :param content: The content of the event.
175     :type content: list
176     """
177     return EventsTxClient.instance().emit(event, *content)
178
179
180 def shutdown():
181     """
182     Shutdown the events client.
183     """
184     EventsTxClient.instance().shutdown()
185
186
187 def instance():
188     """
189     Return an instance of the events client.
190
191     :return: An instance of the events client.
192     :rtype: EventsClientThread
193     """
194     return EventsTxClient.instance()