diff options
| -rw-r--r-- | src/leap/common/events/__init__.py | 82 | ||||
| -rw-r--r-- | src/leap/common/events/client.py | 34 | ||||
| -rw-r--r-- | src/leap/common/events/server.py | 11 | ||||
| -rw-r--r-- | src/leap/common/tests/test_events.py | 89 | 
4 files changed, 177 insertions, 39 deletions
| diff --git a/src/leap/common/events/__init__.py b/src/leap/common/events/__init__.py index 388ee17..a6fe7c3 100644 --- a/src/leap/common/events/__init__.py +++ b/src/leap/common/events/__init__.py @@ -16,7 +16,58 @@  # along with this program. If not, see <http://www.gnu.org/licenses/>.  """ -An events mechanism that allows for signaling of events between clients. +This is an events mechanism that uses a server to allow for signaling of +events between clients. + +Application components should use the interface available in this file to +register callbacks to be executed upon receiving specific signals, and to send +signals to other components. + +To register a callback to be executed when a specific event is signaled, use +leap.common.events.register(): + +>>> from leap.common.events import register +>>> from leap.common.events.proto import CLIENT_UID +>>> register(CLIENT_UID, lambda req: do_something(req)) + +To signal an event, use leap.common.events.signal(): + +>>> from leap.common.events import signal +>>> from leap.common.events.proto import CLIENT_UID +>>> signal(CLIENT_UID) + + +NOTE ABOUT SYNC/ASYNC REQUESTS: + +Clients always communicate with the server, and never between themselves. +When a client registers a callback for an event, the callback is saved locally +in the client and the server stores the client socket port in a list +associated with that event. When a client signals an event, the server +forwards the signal to all registered client ports, and then each client +executes its callbacks associated with that event locally. + +Each RPC call from a client to the server is followed by a response from the +server to the client. Calls to register() and signal() (and all other RPC +calls) can be synchronous or asynchronous meaning if they will or not wait for +the server's response before returning. + +This mechanism was built on top of protobuf.socketrpc, and because of this RPC +calls are made synchronous or asynchronous in the following way: + +  * If RPC calls receive a parameter called `reqcbk`, then the call is made +    asynchronous. That means that: + +        - an eventual `timeout` parameter is not used, +        - the call returns immediatelly with value None, and +        - the `reqcbk` callback is executed asynchronously upon the arrival of +          a response from the server. + +  * Otherwise, if the `reqcbk` parameter is None, then the call is made in a +    synchronous manner: + +        - if a response from server arrives within `timeout` milliseconds, the +          RPC call returns it; +        - otherwise, the call returns None.  """  import logging @@ -54,9 +105,8 @@ def register(signal, callback, uid=None, replace=False, reqcbk=None,      :param replace: should an existent callback with same uid be replaced?      :type replace: bool      :param reqcbk: a callback to be called when a response from server is -        received -    :type reqcbk: function -        callback(leap.common.events.events_pb2.EventResponse) +                   received +    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)      :param timeout: the timeout for synch calls      :type timeout: int @@ -79,14 +129,13 @@ def unregister(signal, uid=None, reqcbk=None, timeout=1000):      :param uid: a unique id for the callback      :type uid: int      :param reqcbk: a callback to be called when a response from server is -        received -    :type reqcbk: function -        callback(leap.common.events.events_pb2.EventResponse) +                   received +    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)      :param timeout: the timeout for synch calls      :type timeout: int      :return: the response from server for synch calls or nothing for asynch -        calls. +             calls.      :rtype: leap.common.events.events_pb2.EventsResponse or None      """      return client.unregister(signal, uid, reqcbk, timeout) @@ -112,14 +161,13 @@ def signal(signal, content="", mac_method="", mac="", reqcbk=None,      :param mac: the content of the auth mac      :type mac: str      :param reqcbk: a callback to be called when a response from server is -        received -    :type reqcbk: function -        callback(leap.common.events.events_pb2.EventResponse) +                   received +    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)      :param timeout: the timeout for synch calls      :type timeout: int      :return: the response from server for synch calls or nothing for asynch -        calls. +             calls.      :rtype: leap.common.events.events_pb2.EventsResponse or None      """      return client.signal(signal, content, mac_method, mac, reqcbk, timeout) @@ -131,9 +179,8 @@ def ping_client(port, reqcbk=None, timeout=1000):      :param port: the port in which the client should be listening      :type port: int      :param reqcbk: a callback to be called when a response from client is -        received -    :type reqcbk: function -        callback(leap.common.events.events_pb2.EventResponse) +                   received +    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)      :param timeout: the timeout for synch calls      :type timeout: int      """ @@ -147,9 +194,8 @@ def ping_server(port=server.SERVER_PORT, reqcbk=None, timeout=1000):      :param port: the port in which server should be listening      :type port: int      :param reqcbk: a callback to be called when a response from server is -        received -    :type reqcbk: function -        callback(leap.common.events.events_pb2.EventResponse) +                   received +    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)      :param timeout: the timeout for synch calls      :type timeout: int      """ diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py index 587b02a..55f14ab 100644 --- a/src/leap/common/events/client.py +++ b/src/leap/common/events/client.py @@ -95,16 +95,14 @@ def register(signal, callback, uid=None, replace=False, reqcbk=None,      :param signal: the signal that causes the callback to be launched      :type signal: int (see the `events.proto` file)      :param callback: the callback to be called when the signal is received -    :type callback: function -        callback(leap.common.events.events_pb2.SignalRequest) +    :type callback: function(leap.common.events.events_pb2.SignalRequest)      :param uid: a unique id for the callback      :type uid: int      :param replace: should an existent callback with same uid be replaced?      :type replace: bool      :param reqcbk: a callback to be called when a response from server is -        received -    :type reqcbk: function -        callback(leap.common.events.events_pb2.EventResponse) +                   received +    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)      :param timeout: the timeout for synch calls      :type timeout: int @@ -112,7 +110,7 @@ def register(signal, callback, uid=None, replace=False, reqcbk=None,      callback identified by the given uid and replace is False.      :return: the response from server for synch calls or nothing for asynch -        calls. +             calls.      :rtype: leap.common.events.events_pb2.EventsResponse or None      """      ensure_client_daemon()  # so we can receive registered signals @@ -153,14 +151,14 @@ def unregister(signal, uid=None, reqcbk=None, timeout=1000):      :param uid: a unique id for the callback      :type uid: int      :param reqcbk: a callback to be called when a response from server is -        received -    :type reqcbk: function -        callback(leap.common.events.events_pb2.EventResponse) +                   received +    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)      :param timeout: the timeout for synch calls      :type timeout: int      :return: the response from server for synch calls or nothing for asynch -        calls or None if no callback is registered for that signal or uid. +             calls or None if no callback is registered for that signal or +             uid.      :rtype: leap.common.events.events_pb2.EventsResponse or None      """      if signal not in registered_callbacks or not registered_callbacks[signal]: @@ -212,14 +210,13 @@ def signal(signal, content="", mac_method="", mac="", reqcbk=None,      :param mac: the content of the auth mac      :type mac: str      :param reqcbk: a callback to be called when a response from server is -        received -    :type reqcbk: function -        callback(leap.common.events.events_pb2.EventResponse) +                   received +    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)      :param timeout: the timeout for synch calls      :type timeout: int      :return: the response from server for synch calls or nothing for asynch -        calls. +             calls.      :rtype: leap.common.events.events_pb2.EventsResponse or None      """      request = proto.SignalRequest() @@ -240,11 +237,14 @@ def ping(port, reqcbk=None, timeout=1000):      :param port: the port in which the client should be listening      :type port: int      :param reqcbk: a callback to be called when a response from client is -        received -    :type reqcbk: function -        callback(leap.common.events.events_pb2.EventResponse) +                   received +    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)      :param timeout: the timeout for synch calls      :type timeout: int + +    :return: the response from client for synch calls or nothing for asynch +             calls. +    :rtype: leap.common.events.events_pb2.EventsResponse or None      """      request = proto.PingRequest()      service = RpcService( diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py index daccc61..a7d4da9 100644 --- a/src/leap/common/events/server.py +++ b/src/leap/common/events/server.py @@ -69,7 +69,7 @@ def ensure_server(port=SERVER_PORT):      :rtype: EventsServerDaemon or None      :raise PortAlreadyTaken: Raised if C{port} is already taken by something -        that is not an events server. +                             that is not an events server.      """      try:          # check if port is available @@ -97,11 +97,14 @@ def ping(port=SERVER_PORT, reqcbk=None, timeout=1000):      :param port: the port in which server should be listening      :type port: int      :param reqcbk: a callback to be called when a response from server is -        received -    :type reqcbk: function -        callback(leap.common.events.events_pb2.EventResponse) +                   received +    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)      :param timeout: the timeout for synch calls      :type timeout: int + +    :return: the response from server for synch calls or nothing for asynch +             calls. +    :rtype: leap.common.events.events_pb2.EventsResponse or None      """      request = proto.PingRequest()      service = RpcService( diff --git a/src/leap/common/tests/test_events.py b/src/leap/common/tests/test_events.py index 90124b4..bc04dd6 100644 --- a/src/leap/common/tests/test_events.py +++ b/src/leap/common/tests/test_events.py @@ -337,3 +337,92 @@ class EventsTestCase(unittest.TestCase):          time.sleep(1)  # wait for thread to start.          self.assertRaises(              server.PortAlreadyTaken, server.ensure_server, port) + +    def test_async_register(self): +        """ +        Test asynchronous registering of callbacks. +        """ +        flag = Mock() + +        # executed after async register, when response is received from server +        def reqcbk(request, response): +            flag(request.event) + +        # callback registered by application +        def callback(request): +            pass + +        # passing a callback as reqcbk param makes the call asynchronous +        result = events.register(CLIENT_UID, callback, reqcbk=reqcbk) +        self.assertIsNone(result) +        events.signal(CLIENT_UID) +        time.sleep(1)  # wait for signal to arrive from server +        flag.assert_called_once_with(CLIENT_UID) + +    def test_async_signal(self): +        """ +        Test asynchronous registering of callbacks. +        """ +        flag = Mock() + +        # executed after async signal, when response is received from server +        def reqcbk(request, response): +            flag(request.event) + +        # passing a callback as reqcbk param makes the call asynchronous +        result = events.signal(CLIENT_UID, reqcbk=reqcbk) +        self.assertIsNone(result) +        time.sleep(1)  # wait for signal to arrive from server +        flag.assert_called_once_with(CLIENT_UID) + +    def test_async_unregister(self): +        """ +        Test asynchronous unregistering of callbacks. +        """ +        flag = Mock() + +        # executed after async signal, when response is received from server +        def reqcbk(request, response): +            flag(request.event) + +        # callback registered by application +        def callback(request): +            pass + +        # passing a callback as reqcbk param makes the call asynchronous +        events.register(CLIENT_UID, callback) +        result = events.unregister(CLIENT_UID, reqcbk=reqcbk) +        self.assertIsNone(result) +        time.sleep(1)  # wait for signal to arrive from server +        flag.assert_called_once_with(CLIENT_UID) + +    def test_async_ping_server(self): +        """ +        Test asynchronous pinging of server. +        """ +        flag = Mock() + +        # executed after async signal, when response is received from server +        def reqcbk(request, response): +            flag() + +        result = events.ping_server(reqcbk=reqcbk) +        self.assertIsNone(result) +        time.sleep(1)  # wait for response to arrive from server. +        flag.assert_called_once_with() + +    def test_async_ping_client(self): +        """ +        Test asynchronous pinging of client. +        """ +        flag = Mock() + +        # executed after async signal, when response is received from server +        def reqcbk(request, response): +            flag() + +        daemon = client.ensure_client_daemon() +        result = events.ping_client(daemon.get_port(), reqcbk=reqcbk) +        self.assertIsNone(result) +        time.sleep(1)  # wait for response to arrive from server. +        flag.assert_called_once_with() | 
