[tests] adapt events tests to recent changes
[leap_pycommon.git] / src / leap / common / events / txclient.py
1 # -*- coding: utf-8 -*-
2 # txclient.py
3 # Copyright (C) 2013, 2014, 2015 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 The client end point of the events mechanism, implemented using txzmq.
19
20 Clients are the communicating parties of the events mechanism. They
21 communicate by sending messages to a server, which in turn redistributes
22 messages to other clients.
23
24 When a client registers a callback for a given event, it also tells the
25 server that it wants to be notified whenever events of that type are sent by
26 some other client.
27 """
28 import logging
29 import pickle
30
31 import txzmq
32
33 from leap.common.events.zmq_components import TxZmqClientComponent
34 from leap.common.events.client import EventsClient
35 from leap.common.events.client import configure_client
36 from leap.common.events.server import EMIT_ADDR
37 from leap.common.events.server import REG_ADDR
38 from leap.common.events import catalog
39
40
41 logger = logging.getLogger(__name__)
42
43
44 __all__ = [
45     "configure_client",
46     "EventsTxClient",
47     "register",
48     "unregister",
49     "emit",
50     "shutdown",
51 ]
52
53
54 class EventsTxClient(TxZmqClientComponent, EventsClient):
55     """
56     A twisted events client that listens for events in one address and
57     publishes those events to another address.
58     """
59
60     def __init__(self, emit_addr=EMIT_ADDR, reg_addr=REG_ADDR,
61                  path_prefix=None, factory=None, enable_curve=True):
62         """
63         Initialize the events client.
64         """
65         TxZmqClientComponent.__init__(
66             self, path_prefix=path_prefix, factory=factory,
67             enable_curve=enable_curve)
68         EventsClient.__init__(self, emit_addr, reg_addr)
69         # connect SUB first, otherwise we might miss some event sent from this
70         # same client
71         self._sub = self._zmq_connect(txzmq.ZmqSubConnection, reg_addr)
72         self._sub.gotMessage = self._gotMessage
73
74         self._push = self._zmq_connect(txzmq.ZmqPushConnection, emit_addr)
75
76     def _gotMessage(self, msg, tag):
77         """
78         Handle an incoming event.
79
80         :param msg: The incoming message.
81         :type msg: list(str)
82         """
83         event = getattr(catalog, tag)
84         content = pickle.loads(msg)
85         self._handle_event(event, content)
86
87     def _subscribe(self, tag):
88         """
89         Subscribe to a tag on the zmq SUB socket.
90
91         :param tag: The tag to be subscribed.
92         :type tag: str
93         """
94         self._sub.subscribe(tag)
95
96     def _unsubscribe(self, tag):
97         """
98         Unsubscribe from a tag on the zmq SUB socket.
99
100         :param tag: The tag to be unsubscribed.
101         :type tag: str
102         """
103         self._sub.unsubscribe(tag)
104
105     def _send(self, data):
106         """
107         Send data through PUSH socket.
108
109         :param data: The data to be sent.
110         :type event: str
111         """
112         self._push.send(data)
113
114     def _run_callback(self, callback, event, content):
115         """
116         Run a callback.
117
118         :param callback: The callback to be run.
119         :type callback: callable(event, *content)
120         :param event: The event to be sent.
121         :type event: Event
122         :param content: The content of the event.
123         :type content: list
124         """
125         callback(event, *content)
126
127     def shutdown(self):
128         EventsClient.shutdown(self)
129
130
131 def register(event, callback, uid=None, replace=False):
132     """
133     Register a callback to be executed when an event is received.
134
135     :param event: The event that triggers the callback.
136     :type event: str
137     :param callback: The callback to be executed.
138     :type callback: callable(event, content)
139     :param uid: The callback uid.
140     :type uid: str
141     :param replace: Wether an eventual callback with same ID should be
142                     replaced.
143     :type replace: bool
144
145     :return: The callback uid.
146     :rtype: str
147
148     :raises CallbackAlreadyRegisteredError: when there's already a callback
149             identified by the given uid and replace is False.
150     """
151     return EventsTxClient.instance().register(
152         event, callback, uid=uid, replace=replace)
153
154
155 def unregister(event, uid=None):
156     """
157     Unregister callbacks for an event.
158
159     If uid is not None, then only the callback identified by the given uid is
160     removed. Otherwise, all callbacks for the event are removed.
161
162     :param event: The event that triggers the callback.
163     :type event: str
164     :param uid: The callback uid.
165     :type uid: str
166     """
167     return EventsTxClient.instance().unregister(event, uid=uid)
168
169
170 def emit(event, *content):
171     """
172     Send an event.
173
174     :param event: The event to be sent.
175     :type event: str
176     :param content: The content of the event.
177     :type content: list
178     """
179     return EventsTxClient.instance().emit(event, *content)
180
181
182 def shutdown():
183     """
184     Shutdown the events client.
185     """
186     EventsTxClient.instance().shutdown()
187
188
189 def instance():
190     """
191     Return an instance of the events client.
192
193     :return: An instance of the events client.
194     :rtype: EventsClientThread
195     """
196     return EventsTxClient.instance()