summaryrefslogtreecommitdiff
path: root/src/leap/common/events/component.py
blob: 029d1ac18c33091e35e71b1cd778538cf9adfe06 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# -*- coding: utf-8 -*-
# component.py
# Copyright (C) 2013 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""
The component end point of the events mechanism.

Components are the communicating parties of the events mechanism. They
communicate by sending messages to a server, which in turn redistributes
messages to other components.

When a component registers a callback for a given signal, it also tells the
server that it wants to be notified whenever signals of that type are sent by
some other component.
"""


import logging
import threading


from protobuf.socketrpc import RpcService
from leap.common.events import (
    events_pb2 as proto,
    server,
    daemon,
    mac_auth,
)


logger = logging.getLogger(__name__)


# the `registered_callbacks` dictionary below should have the following
# format:
#
#     { event_signal: [ (uid, callback), ... ], ... }
#
registered_callbacks = {}


class CallbackAlreadyRegistered(Exception):
    """
    Raised when trying to register an already registered callback.
    """
    pass


def ensure_component_daemon():
    """
    Ensure the component daemon is running and listening for incoming
    messages.

    :return: the daemon instance
    :rtype: EventsComponentDaemon
    """
    import time
    daemon = EventsComponentDaemon.ensure(0)
    logger.debug('ensure component daemon')

    # Because we use a random port we want to wait until a port is assigned to
    # local component daemon.

    while not (EventsComponentDaemon.get_instance() and
               EventsComponentDaemon.get_instance().get_port()):
        time.sleep(0.1)
    return daemon


def register(signal, callback, uid=None, replace=False, reqcbk=None,
             timeout=1000):
    """
    Registers a callback to be called when a specific signal event is
    received.

    Will timeout after timeout ms if response has not been received. The
    timeout arg is only used for asynch requests. If a reqcbk callback has
    been supplied the timeout arg is not used. The response value will be
    returned for a synch request but nothing will be returned for an asynch
    request.

    :param signal: the signal that causes the callback to be launched
    :type signal: int (see the `events.proto` file)
    :param callback: the callback to be called when the signal is received
    :type callback: function
        callback(leap.common.events.events_pb2.SignalRequest)
    :param uid: a unique id for the callback
    :type uid: int
    :param replace: should an existent callback with same uid be replaced?
    :type replace: bool
    :param reqcbk: a callback to be called when a response from server is
        received
    :type reqcbk: function
        callback(leap.common.events.events_pb2.EventResponse)
    :param timeout: the timeout for synch calls
    :type timeout: int

    Might raise a CallbackAlreadyRegistered exception if there's already a
    callback identified by the given uid and replace is False.

    :return: the response from server for synch calls or nothing for asynch
        calls.
    :rtype: leap.common.events.events_pb2.EventsResponse or None
    """
    ensure_component_daemon()  # so we can receive registered signals
    # register callback locally
    if signal not in registered_callbacks:
        registered_callbacks[signal] = []
    cbklist = registered_callbacks[signal]
    if uid and filter(lambda (x, y): x == uid, cbklist):
        if not replace:
            raise CallbackAlreadyRegisteredException()
        else:
            registered_callbacks[signal] = filter(lambda(x, y): x != uid,
                                                  cbklist)
    registered_callbacks[signal].append((uid, callback))
    # register callback on server
    request = proto.RegisterRequest()
    request.event = signal
    request.port = EventsComponentDaemon.get_instance().get_port()
    request.mac_method = mac_auth.MacMethod.MAC_NONE
    request.mac = ""
    service = RpcService(proto.EventsServerService_Stub,
                         server.SERVER_PORT, 'localhost')
    logger.info(
        "Sending registration request to server on port %s: %s",
        server.SERVER_PORT,
        str(request)[:40])
    return service.register(request, callback=reqcbk, timeout=timeout)

def unregister(signal, uid=None, reqcbk=None, timeout=1000):
    """
    Unregister a callback.

    If C{uid} is specified, unregisters only the callback identified by that
    unique id. Otherwise, unregisters all callbacks

    :param signal: the signal that causes the callback to be launched
    :type signal: int (see the `events.proto` file)
    :param uid: a unique id for the callback
    :type uid: int
    :param reqcbk: a callback to be called when a response from server is
        received
    :type reqcbk: function
        callback(leap.common.events.events_pb2.EventResponse)
    :param timeout: the timeout for synch calls
    :type timeout: int

    :return: the response from server for synch calls or nothing for asynch
        calls or None if no callback is registered for that signal or uid.
    :rtype: leap.common.events.events_pb2.EventsResponse or None
    """
    if signal not in registered_callbacks or not registered_callbacks[signal]:
        logger.warning("No callback registered for signal %d." % signal)
        return None
    # unregister callback locally
    cbklist = registered_callbacks[signal]
    if uid is not None:
        if filter(lambda (cbkuid, _): cbkuid == uid, cbklist) == []:
            logger.warning("No callback registered for uid %d." % st)
            return None
        registered_callbacks[signal] = filter(lambda(x, y): x != uid, cbklist)
    else:
        # exclude all callbacks for given signal
        registered_callbacks[signal] = []
    # unregister port in server if there are no more callbacks for this signal
    if not registered_callbacks[signal]:
        request = proto.UnregisterRequest()
        request.event = signal
        request.port = EventsComponentDaemon.get_instance().get_port()
        request.mac_method = mac_auth.MacMethod.MAC_NONE
        request.mac = ""
        service = RpcService(proto.EventsServerService_Stub,
                             server.SERVER_PORT, 'localhost')
        logger.info(
            "Sending unregistration request to server on port %s: %s",
            server.SERVER_PORT,
            str(request)[:40])
        return service.unregister(request, callback=reqcbk, timeout=timeout)


def signal(signal, content="", mac_method="", mac="", reqcbk=None,
           timeout=1000):
    """
    Send `signal` event to events server.

    Will timeout after timeout ms if response has not been received. The
    timeout arg is only used for asynch requests.  If a reqcbk callback has
    been supplied the timeout arg is not used. The response value will be
    returned for a synch request but nothing will be returned for an asynch
    request.

    :param signal: the signal that causes the callback to be launched
    :type signal: int (see the `events.proto` file)
    :param content: the contents of the event signal
    :type content: str
    :param mac_method: the method used for auth mac
    :type mac_method: str
    :param mac: the content of the auth mac
    :type mac: str
    :param reqcbk: a callback to be called when a response from server is
        received
    :type reqcbk: function
        callback(leap.common.events.events_pb2.EventResponse)
    :param timeout: the timeout for synch calls
    :type timeout: int

    :return: the response from server for synch calls or nothing for asynch
        calls.
    :rtype: leap.common.events.events_pb2.EventsResponse or None
    """
    request = proto.SignalRequest()
    request.event = signal
    request.content = content
    request.mac_method = mac_method
    request.mac = mac
    service = RpcService(proto.EventsServerService_Stub, server.SERVER_PORT,
                         'localhost')
    logger.info("Sending signal to server: %s", str(request)[:40])
    return service.signal(request, callback=reqcbk, timeout=timeout)


class EventsComponentService(proto.EventsComponentService):
    """
    Service for receiving signal events in components.
    """

    def __init__(self):
        proto.EventsComponentService.__init__(self)

    def signal(self, controller, request, done):
        """
        Receive a signal and run callbacks registered for that signal.

        This method is called whenever a signal request is received from
        server.

        :param controller: used to mediate a single method call
        :type controller: protobuf.socketrpc.controller.SocketRpcController
        :param request: the request received from the component
        :type request: leap.common.events.events_pb2.SignalRequest
        :param done: callback to be called when done
        :type done: protobuf.socketrpc.server.Callback
        """
        logger.info('Received signal from server: %s...' % str(request)[:40])

        # run registered callbacks
        # TODO: verify authentication using mac in incoming message
        if request.event in registered_callbacks:
            for (_, cbk) in registered_callbacks[request.event]:
                # callbacks should be prepared to receive a
                # events_pb2.SignalRequest.
                cbk(request)

        # send response back to server
        response = proto.EventResponse()
        response.status = proto.EventResponse.OK
        done.run(response)


class EventsComponentDaemon(daemon.EventsSingletonDaemon):
    """
    A daemon that listens for incoming events from server.
    """

    @classmethod
    def ensure(cls, port):
        """
        Make sure the daemon is running on the given port.

        :param port: the port in which the daemon should listen
        :type port: int

        :return: a daemon instance
        :rtype: EventsComponentDaemon
        """
        return cls.ensure_service(port, EventsComponentService())