1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2013, 2014, 2015 LEAP
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
The client end point of the events mechanism, implemented using txzmq.

Clients are the communicating parties of the events mechanism. They
communicate by sending messages to a server, which in turn redistributes
messages to other clients.

When a client registers a callback for a given event, it also tells the
server that it wants to be notified whenever events of that type are sent by
some other client.
"""
33 from leap.common.events.zmq_components import TxZmqClientComponent
34 from leap.common.events.client import EventsClient
35 from leap.common.events.client import configure_client
36 from leap.common.events.server import EMIT_ADDR
37 from leap.common.events.server import REG_ADDR
38 from leap.common.events import catalog
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class EventsTxClient(TxZmqClientComponent, EventsClient):
    """
    A twisted events client that listens for events in one address and
    publishes those events to another address.
    """

    def __init__(self, emit_addr=EMIT_ADDR, reg_addr=REG_ADDR,
                 path_prefix=None):
        """
        Initialize the events client.

        :param emit_addr: The address the PUSH socket connects to; data given
                          to ``_send`` goes out through that socket.
        :param reg_addr: The address the SUB socket connects to; incoming
                         events arrive through that socket.
        :param path_prefix: Forwarded unchanged to
                            ``TxZmqClientComponent.__init__``.
                            NOTE(review): the default of ``None`` is assumed
                            -- confirm.
        """
        TxZmqClientComponent.__init__(self, path_prefix=path_prefix)
        EventsClient.__init__(self, emit_addr, reg_addr)
        # Connect SUB first, otherwise we might miss some event sent from
        # this same client.
        self._sub = self._zmq_connect(txzmq.ZmqSubConnection, reg_addr)
        # Route every message received on the SUB socket to our handler.
        self._sub.gotMessage = self._gotMessage
        self._push = self._zmq_connect(txzmq.ZmqPushConnection, emit_addr)

    def _gotMessage(self, msg, tag):
        """
        Handle an incoming event.

        :param msg: The incoming message; it is unpickled to obtain the
                    content of the event.
        :param tag: The tag of the incoming message; it is the name under
                    which the event is looked up in the events catalog.
        """
        event = getattr(catalog, tag)
        # SECURITY NOTE(review): pickle.loads can execute arbitrary code while
        # deserializing. This is only safe if everything published on the
        # address the SUB socket is connected to is trusted -- confirm.
        content = pickle.loads(msg)
        self._handle_event(event, content)

    def _subscribe(self, tag):
        """
        Subscribe to a tag on the zmq SUB socket.

        :param tag: The tag to be subscribed.
        """
        self._sub.subscribe(tag)

    def _unsubscribe(self, tag):
        """
        Unsubscribe from a tag on the zmq SUB socket.

        :param tag: The tag to be unsubscribed.
        """
        self._sub.unsubscribe(tag)

    def _send(self, data):
        """
        Send data through PUSH socket.

        :param data: The data to be sent.
        """
        self._push.send(data)

    def _run_callback(self, callback, event, content):
        """
        Run a callback.

        :param callback: The callback to be run.
        :type callback: callable(event, *content)
        :param event: The event, passed to the callback as its first
                      argument.
        :param content: The content of the event; it is unpacked into the
                        remaining positional arguments of the callback.
        """
        callback(event, *content)

    def shutdown(self):
        """
        Shut down this client by shutting down both of its parent classes.
        """
        TxZmqClientComponent.shutdown(self)
        EventsClient.shutdown(self)
def register(event, callback, uid=None, replace=False):
    """
    Register a callback to be executed when an event is received.

    The call is delegated to the client returned by
    ``EventsTxClient.instance()``.

    :param event: The event that triggers the callback.
    :param callback: The callback to be executed.
    :type callback: callable(event, content)
    :param uid: The callback uid.
    :param replace: Whether an eventual callback with same ID should be
                    replaced.

    :return: The callback uid.

    :raises CallbackAlreadyRegisteredError: when there's already a callback
            identified by the given uid and replace is False.
    """
    client = EventsTxClient.instance()
    return client.register(event, callback, uid=uid, replace=replace)
def unregister(event, uid=None):
    """
    Unregister callbacks for an event.

    When a uid is given, only the callback identified by that uid is
    removed; with ``uid=None`` all callbacks for the event are removed.
    The call is delegated to the client returned by
    ``EventsTxClient.instance()``.

    :param event: The event that triggers the callback.
    :param uid: The callback uid.

    :return: Whatever the client's ``unregister`` method returns.
    """
    client = EventsTxClient.instance()
    return client.unregister(event, uid=uid)
def emit(event, *content):
    """
    Send an event.

    The call is delegated to the client returned by
    ``EventsTxClient.instance()``.

    :param event: The event to be sent.
    :param content: The content of the event.

    :return: Whatever the client's ``emit`` method returns.
    """
    client = EventsTxClient.instance()
    return client.emit(event, *content)
def shutdown():
    """
    Shutdown the events client.

    The call is delegated to the client returned by
    ``EventsTxClient.instance()``.
    """
    EventsTxClient.instance().shutdown()
def instance():
    """
    Return an instance of the events client.

    :return: An instance of the events client, as given by
             ``EventsTxClient.instance()``.
    :rtype: EventsTxClient
    """
    return EventsTxClient.instance()