summaryrefslogtreecommitdiff
path: root/src/leap/common/events/txclient.py
blob: f3c183e2e4c8722bccc05cc890f394484818446f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# -*- coding: utf-8 -*-
# txclient.py
# Copyright (C) 2013, 2014, 2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
The client end point of the events mechanism, implemented using txzmq.

Clients are the communicating parties of the events mechanism. They
communicate by sending messages to a server, which in turn redistributes
messages to other clients.

When a client registers a callback for a given event, it also tells the
server that it wants to be notified whenever events of that type are sent by
some other client.
"""
import logging
import pickle

import txzmq

from leap.common.events.zmq_components import TxZmqClientComponent
from leap.common.events.client import EventsClient
from leap.common.events.client import configure_client
from leap.common.events.server import EMIT_ADDR
from leap.common.events.server import REG_ADDR
from leap.common.events import catalog, flags


logger = logging.getLogger(__name__)


__all__ = [
    "configure_client",
    "EventsTxClient",
    "register",
    "unregister",
    "emit",
    "shutdown",
]


class EventsTxClient(TxZmqClientComponent, EventsClient):
    """
    A twisted events client that listens for events in one address and
    publishes those events to another address.
    """

    def __init__(self, emit_addr=EMIT_ADDR, reg_addr=REG_ADDR,
                 path_prefix=None):
        """
        Initialize the events server.
        """
        TxZmqClientComponent.__init__(self, path_prefix=path_prefix)
        EventsClient.__init__(self, emit_addr, reg_addr)
        # connect SUB first, otherwise we might miss some event sent from this
        # same client
        self._sub = self._zmq_connect(txzmq.ZmqSubConnection, reg_addr)
        self._sub.gotMessage = self._gotMessage
        self._push = self._zmq_connect(txzmq.ZmqPushConnection, emit_addr)

    def _gotMessage(self, msg, tag):
        """
        Handle an incoming event.

        :param msg: The incoming message.
        :type msg: list(str)
        """
        event = getattr(catalog, tag)
        content = pickle.loads(msg)
        self._handle_event(event, content)

    def _subscribe(self, tag):
        """
        Subscribe to a tag on the zmq SUB socket.

        :param tag: The tag to be subscribed.
        :type tag: str
        """
        self._sub.subscribe(tag)

    def _unsubscribe(self, tag):
        """
        Unsubscribe from a tag on the zmq SUB socket.

        :param tag: The tag to be unsubscribed.
        :type tag: str
        """
        self._sub.unsubscribe(tag)

    def _send(self, data):
        """
        Send data through PUSH socket.

        :param data: The data to be sent.
        :type event: str
        """
        self._push.send(data)

    def _run_callback(self, callback, event, content):
        """
        Run a callback.

        :param callback: The callback to be run.
        :type callback: callable(event, *content)
        :param event: The event to be sent.
        :type event: Event
        :param content: The content of the event.
        :type content: list
        """
        callback(event, *content)

    def shutdown(self):
        TxZmqClientComponent.shutdown(self)
        EventsClient.shutdown(self)


def register(event, callback, uid=None, replace=False):
    """
    Register a callback to be executed when an event is received.

    :param event: The event that triggers the callback.
    :type event: str
    :param callback: The callback to be executed.
    :type callback: callable(event, content)
    :param uid: The callback uid.
    :type uid: str
    :param replace: Wether an eventual callback with same ID should be
                    replaced.
    :type replace: bool

    :return: The callback uid.
    :rtype: str

    :raises CallbackAlreadyRegisteredError: when there's already a callback
            identified by the given uid and replace is False.
    """
    return EventsTxClient.instance().register(
        event, callback, uid=uid, replace=replace)


def unregister(event, uid=None):
    """
    Unregister callbacks for an event.

    If uid is not None, then only the callback identified by the given uid is
    removed. Otherwise, all callbacks for the event are removed.

    :param event: The event that triggers the callback.
    :type event: str
    :param uid: The callback uid.
    :type uid: str
    """
    return EventsTxClient.instance().unregister(event, uid=uid)


def emit(event, *content):
    """
    Send an event.

    :param event: The event to be sent.
    :type event: str
    :param content: The content of the event.
    :type content: list
    """
    return EventsTxClient.instance().emit(event, *content)


def shutdown():
    """
    Shutdown the events client.
    """
    EventsTxClient.instance().shutdown()


def instance():
    """
    Return an instance of the events client.

    :return: An instance of the events client.
    :rtype: EventsClientThread
    """
    return EventsTxClient.instance()