1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2013, 2014, 2015 LEAP
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 The client end point of the events mechanism, implemented using txzmq.
20 Clients are the communicating parties of the events mechanism. They
21 communicate by sending messages to a server, which in turn redistributes
22 messages to other clients.
24 When a client registers a callback for a given event, it also tells the
25 server that it wants to be notified whenever events of that type are sent by
31 from leap.common.events.zmq_components import TxZmqClientComponent
35 from leap.common.events.client import EventsClient
36 from leap.common.events.client import configure_client
37 from leap.common.events.server import EMIT_ADDR
38 from leap.common.events.server import REG_ADDR
39 from leap.common.events import catalog
42 logger = logging.getLogger(__name__)
55 class EventsTxClient(TxZmqClientComponent, EventsClient):
57 A twisted events client that listens for events in one address and
58 publishes those events to another address.
61 def __init__(self, emit_addr=EMIT_ADDR, reg_addr=REG_ADDR,
64 Initialize the events server.
66 TxZmqClientComponent.__init__(self, path_prefix=path_prefix)
67 EventsClient.__init__(self, emit_addr, reg_addr)
68 # connect SUB first, otherwise we might miss some event sent from this
70 self._sub = self._zmq_connect(txzmq.ZmqSubConnection, reg_addr)
71 self._sub.gotMessage = self._gotMessage
73 self._push = self._zmq_connect(txzmq.ZmqPushConnection, emit_addr)
75 def _gotMessage(self, msg, tag):
77 Handle an incoming event.
79 :param msg: The incoming message.
82 event = getattr(catalog, tag)
83 content = pickle.loads(msg)
84 self._handle_event(event, content)
86 def _subscribe(self, tag):
88 Subscribe to a tag on the zmq SUB socket.
90 :param tag: The tag to be subscribed.
93 self._sub.subscribe(tag)
95 def _unsubscribe(self, tag):
97 Unsubscribe from a tag on the zmq SUB socket.
99 :param tag: The tag to be unsubscribed.
102 self._sub.unsubscribe(tag)
104 def _send(self, data):
106 Send data through PUSH socket.
108 :param data: The data to be sent.
111 self._push.send(data)
113 def _run_callback(self, callback, event, content):
117 :param callback: The callback to be run.
118 :type callback: callable(event, *content)
119 :param event: The event to be sent.
121 :param content: The content of the event.
124 callback(event, *content)
127 EventsClient.shutdown(self)
130 def register(event, callback, uid=None, replace=False):
132 Register a callback to be executed when an event is received.
134 :param event: The event that triggers the callback.
136 :param callback: The callback to be executed.
137 :type callback: callable(event, content)
138 :param uid: The callback uid.
140 :param replace: Wether an eventual callback with same ID should be
144 :return: The callback uid.
147 :raises CallbackAlreadyRegisteredError: when there's already a callback
148 identified by the given uid and replace is False.
150 return EventsTxClient.instance().register(
151 event, callback, uid=uid, replace=replace)
154 def unregister(event, uid=None):
156 Unregister callbacks for an event.
158 If uid is not None, then only the callback identified by the given uid is
159 removed. Otherwise, all callbacks for the event are removed.
161 :param event: The event that triggers the callback.
163 :param uid: The callback uid.
166 return EventsTxClient.instance().unregister(event, uid=uid)
169 def emit(event, *content):
173 :param event: The event to be sent.
175 :param content: The content of the event.
178 return EventsTxClient.instance().emit(event, *content)
183 Shutdown the events client.
185 EventsTxClient.instance().shutdown()
190 Return an instance of the events client.
192 :return: An instance of the events client.
193 :rtype: EventsClientThread
195 return EventsTxClient.instance()