summaryrefslogtreecommitdiff
path: root/src/leap/common/events/txclient.py
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2015-02-04 15:04:10 -0200
committerdrebs <drebs@leap.se>2015-05-27 14:37:27 -0300
commit514c1434a016b09d93e8dfc5578b14825d14005a (patch)
treec4bacce1df24a81b2de3d1343dac26eb56e30ac7 /src/leap/common/events/txclient.py
parent71c750ef9c3e53ef416d1de6e85458f16ca48d74 (diff)
[feat] refactor events to use ZMQ
Before this commit, protobuf and protobuf.socketrpc were used to serialize and transmit messages between events clients. This change implements a simpler ZMQ client/server events mechanism that uses ZMQ sockets for transmitting messages from clients to server and to redistribute such messages to subscribed clients. Closes: #6359
Diffstat (limited to 'src/leap/common/events/txclient.py')
-rw-r--r--src/leap/common/events/txclient.py185
1 files changed, 185 insertions, 0 deletions
diff --git a/src/leap/common/events/txclient.py b/src/leap/common/events/txclient.py
new file mode 100644
index 0000000..8206ed5
--- /dev/null
+++ b/src/leap/common/events/txclient.py
@@ -0,0 +1,185 @@
+# -*- coding: utf-8 -*-
+# txclient.py
+# Copyright (C) 2013, 2014, 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+The client end point of the events mechanism, implemented using txzmq.
+
+Clients are the communicating parties of the events mechanism. They
+communicate by sending messages to a server, which in turn redistributes
+messages to other clients.
+
+When a client registers a callback for a given event, it also tells the
+server that it wants to be notified whenever events of that type are sent by
+some other client.
+"""
+
+
+import logging
+import pickle
+
+import txzmq
+
+from leap.common.events.zmq_components import TxZmqClientComponent
+from leap.common.events.client import EventsClient
+from leap.common.events.client import configure_client
+from leap.common.events.server import EMIT_ADDR
+from leap.common.events.server import REG_ADDR
+from leap.common.events import catalog
+
+
+logger = logging.getLogger(__name__)
+
+
+__all__ = [
+ "configure_client",
+ "EventsTxClient",
+ "register",
+ "unregister",
+ "emit",
+ "shutdown",
+]
+
+
+class EventsTxClient(TxZmqClientComponent, EventsClient):
+ """
+ A twisted events client that listens for events in one address and
+ publishes those events to another address.
+ """
+
+ def __init__(self, emit_addr=EMIT_ADDR, reg_addr=REG_ADDR,
+ path_prefix=None):
+ """
+ Initialize the events server.
+ """
+ TxZmqClientComponent.__init__(self, path_prefix=path_prefix)
+ EventsClient.__init__(self, emit_addr, reg_addr)
+ # connect SUB first, otherwise we might miss some event sent from this
+ # same client
+ self._sub = self._zmq_connect(txzmq.ZmqSubConnection, reg_addr)
+ self._sub.gotMessage = self._gotMessage
+ self._push = self._zmq_connect(txzmq.ZmqPushConnection, emit_addr)
+
+ def _gotMessage(self, msg, tag):
+ """
+ Handle an incoming event.
+
+ :param msg: The incoming message.
+ :type msg: list(str)
+ """
+ event = getattr(catalog, tag)
+ content = pickle.loads(msg)
+ self._handle_event(event, content)
+
+ def _subscribe(self, tag):
+ """
+ Subscribe to a tag on the zmq SUB socket.
+
+ :param tag: The tag to be subscribed.
+ :type tag: str
+ """
+ self._sub.subscribe(tag)
+
+ def _unsubscribe(self, tag):
+ """
+ Unsubscribe from a tag on the zmq SUB socket.
+
+ :param tag: The tag to be unsubscribed.
+ :type tag: str
+ """
+ self._sub.unsubscribe(tag)
+
+ def _send(self, data):
+ """
+ Send data through PUSH socket.
+
+ :param data: The data to be sent.
+ :type event: str
+ """
+ self._push.send(data)
+
+ def shutdown(self):
+ TxZmqClientComponent.shutdown(self)
+ EventsClient.shutdown(self)
+
+
+def register(event, callback, uid=None, replace=False):
+ """
+ Register a callback to be executed when an event is received.
+
+ :param event: The event that triggers the callback.
+ :type event: str
+ :param callback: The callback to be executed.
+ :type callback: callable(event, content)
+ :param uid: The callback uid.
+ :type uid: str
+ :param replace: Wether an eventual callback with same ID should be
+ replaced.
+ :type replace: bool
+
+ :return: The callback uid.
+ :rtype: str
+
+ :raises CallbackAlreadyRegisteredError: when there's already a callback
+ identified by the given uid and replace is False.
+ """
+ return EventsTxClient.instance().register(
+ event, callback, uid=uid, replace=replace)
+
+
+def unregister(event, uid=None):
+ """
+ Unregister callbacks for an event.
+
+ If uid is not None, then only the callback identified by the given uid is
+ removed. Otherwise, all callbacks for the event are removed.
+
+ :param event: The event that triggers the callback.
+ :type event: str
+ :param uid: The callback uid.
+ :type uid: str
+ """
+ return EventsTxClient.instance().unregister(event, uid=uid)
+
+
+def emit(event, *content):
+ """
+ Send an event.
+
+ :param event: The event to be sent.
+ :type event: str
+ :param content: The content of the event.
+ :type content: list
+ """
+ return EventsTxClient.instance().emit(event, *content)
+
+
+def shutdown():
+ """
+ Shutdown the events client.
+ """
+ EventsTxClient.instance().shutdown()
+
+
+def instance():
+ """
+ Return an instance of the events client.
+
+ :return: An instance of the events client.
+ :rtype: EventsClientThread
+ """
+ return EventsTxClient.instance()