Add doc and tests about events sync/async behaviour.
authordrebs <drebs@leap.se>
Wed, 24 Jul 2013 20:51:03 +0000 (17:51 -0300)
committerdrebs <drebs@leap.se>
Wed, 24 Jul 2013 21:08:36 +0000 (18:08 -0300)
* Also fix docstrings identation so sphynx doesn't complain.

src/leap/common/events/__init__.py
src/leap/common/events/client.py
src/leap/common/events/server.py
src/leap/common/tests/test_events.py

index 388ee17..a6fe7c3 100644 (file)
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 """
-An events mechanism that allows for signaling of events between clients.
+This is an events mechanism that uses a server to allow for signaling of
+events between clients.
+
+Application components should use the interface available in this file to
+register callbacks to be executed upon receiving specific signals, and to send
+signals to other components.
+
+To register a callback to be executed when a specific event is signaled, use
+leap.common.events.register():
+
+>>> from leap.common.events import register
+>>> from leap.common.events.proto import CLIENT_UID
+>>> register(CLIENT_UID, lambda req: do_something(req))
+
+To signal an event, use leap.common.events.signal():
+
+>>> from leap.common.events import signal
+>>> from leap.common.events.proto import CLIENT_UID
+>>> signal(CLIENT_UID)
+
+
+NOTE ABOUT SYNC/ASYNC REQUESTS:
+
+Clients always communicate with the server, and never between themselves.
+When a client registers a callback for an event, the callback is saved locally
+in the client and the server stores the client socket port in a list
+associated with that event. When a client signals an event, the server
+forwards the signal to all registered client ports, and then each client
+executes its callbacks associated with that event locally.
+
+Each RPC call from a client to the server is followed by a response from the
+server to the client. Calls to register() and signal() (and all other RPC
+calls) can be synchronous or asynchronous meaning if they will or not wait for
+the server's response before returning.
+
+This mechanism was built on top of protobuf.socketrpc, and because of this RPC
+calls are made synchronous or asynchronous in the following way:
+
+  * If RPC calls receive a parameter called `reqcbk`, then the call is made
+    asynchronous. That means that:
+
+        - an eventual `timeout` parameter is not used,
+        - the call returns immediatelly with value None, and
+        - the `reqcbk` callback is executed asynchronously upon the arrival of
+          a response from the server.
+
+  * Otherwise, if the `reqcbk` parameter is None, then the call is made in a
+    synchronous manner:
+
+        - if a response from server arrives within `timeout` milliseconds, the
+          RPC call returns it;
+        - otherwise, the call returns None.
 """
 
 import logging
@@ -54,9 +105,8 @@ def register(signal, callback, uid=None, replace=False, reqcbk=None,
     :param replace: should an existent callback with same uid be replaced?
     :type replace: bool
     :param reqcbk: a callback to be called when a response from server is
-        received
-    :type reqcbk: function
-        callback(leap.common.events.events_pb2.EventResponse)
+                   received
+    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)
     :param timeout: the timeout for synch calls
     :type timeout: int
 
@@ -79,14 +129,13 @@ def unregister(signal, uid=None, reqcbk=None, timeout=1000):
     :param uid: a unique id for the callback
     :type uid: int
     :param reqcbk: a callback to be called when a response from server is
-        received
-    :type reqcbk: function
-        callback(leap.common.events.events_pb2.EventResponse)
+                   received
+    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)
     :param timeout: the timeout for synch calls
     :type timeout: int
 
     :return: the response from server for synch calls or nothing for asynch
-        calls.
+             calls.
     :rtype: leap.common.events.events_pb2.EventsResponse or None
     """
     return client.unregister(signal, uid, reqcbk, timeout)
@@ -112,14 +161,13 @@ def signal(signal, content="", mac_method="", mac="", reqcbk=None,
     :param mac: the content of the auth mac
     :type mac: str
     :param reqcbk: a callback to be called when a response from server is
-        received
-    :type reqcbk: function
-        callback(leap.common.events.events_pb2.EventResponse)
+                   received
+    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)
     :param timeout: the timeout for synch calls
     :type timeout: int
 
     :return: the response from server for synch calls or nothing for asynch
-        calls.
+             calls.
     :rtype: leap.common.events.events_pb2.EventsResponse or None
     """
     return client.signal(signal, content, mac_method, mac, reqcbk, timeout)
@@ -131,9 +179,8 @@ def ping_client(port, reqcbk=None, timeout=1000):
     :param port: the port in which the client should be listening
     :type port: int
     :param reqcbk: a callback to be called when a response from client is
-        received
-    :type reqcbk: function
-        callback(leap.common.events.events_pb2.EventResponse)
+                   received
+    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)
     :param timeout: the timeout for synch calls
     :type timeout: int
     """
@@ -147,9 +194,8 @@ def ping_server(port=server.SERVER_PORT, reqcbk=None, timeout=1000):
     :param port: the port in which server should be listening
     :type port: int
     :param reqcbk: a callback to be called when a response from server is
-        received
-    :type reqcbk: function
-        callback(leap.common.events.events_pb2.EventResponse)
+                   received
+    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)
     :param timeout: the timeout for synch calls
     :type timeout: int
     """
index 587b02a..55f14ab 100644 (file)
@@ -95,16 +95,14 @@ def register(signal, callback, uid=None, replace=False, reqcbk=None,
     :param signal: the signal that causes the callback to be launched
     :type signal: int (see the `events.proto` file)
     :param callback: the callback to be called when the signal is received
-    :type callback: function
-        callback(leap.common.events.events_pb2.SignalRequest)
+    :type callback: function(leap.common.events.events_pb2.SignalRequest)
     :param uid: a unique id for the callback
     :type uid: int
     :param replace: should an existent callback with same uid be replaced?
     :type replace: bool
     :param reqcbk: a callback to be called when a response from server is
-        received
-    :type reqcbk: function
-        callback(leap.common.events.events_pb2.EventResponse)
+                   received
+    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)
     :param timeout: the timeout for synch calls
     :type timeout: int
 
@@ -112,7 +110,7 @@ def register(signal, callback, uid=None, replace=False, reqcbk=None,
     callback identified by the given uid and replace is False.
 
     :return: the response from server for synch calls or nothing for asynch
-        calls.
+             calls.
     :rtype: leap.common.events.events_pb2.EventsResponse or None
     """
     ensure_client_daemon()  # so we can receive registered signals
@@ -153,14 +151,14 @@ def unregister(signal, uid=None, reqcbk=None, timeout=1000):
     :param uid: a unique id for the callback
     :type uid: int
     :param reqcbk: a callback to be called when a response from server is
-        received
-    :type reqcbk: function
-        callback(leap.common.events.events_pb2.EventResponse)
+                   received
+    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)
     :param timeout: the timeout for synch calls
     :type timeout: int
 
     :return: the response from server for synch calls or nothing for asynch
-        calls or None if no callback is registered for that signal or uid.
+             calls or None if no callback is registered for that signal or
+             uid.
     :rtype: leap.common.events.events_pb2.EventsResponse or None
     """
     if signal not in registered_callbacks or not registered_callbacks[signal]:
@@ -212,14 +210,13 @@ def signal(signal, content="", mac_method="", mac="", reqcbk=None,
     :param mac: the content of the auth mac
     :type mac: str
     :param reqcbk: a callback to be called when a response from server is
-        received
-    :type reqcbk: function
-        callback(leap.common.events.events_pb2.EventResponse)
+                   received
+    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)
     :param timeout: the timeout for synch calls
     :type timeout: int
 
     :return: the response from server for synch calls or nothing for asynch
-        calls.
+             calls.
     :rtype: leap.common.events.events_pb2.EventsResponse or None
     """
     request = proto.SignalRequest()
@@ -240,11 +237,14 @@ def ping(port, reqcbk=None, timeout=1000):
     :param port: the port in which the client should be listening
     :type port: int
     :param reqcbk: a callback to be called when a response from client is
-        received
-    :type reqcbk: function
-        callback(leap.common.events.events_pb2.EventResponse)
+                   received
+    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)
     :param timeout: the timeout for synch calls
     :type timeout: int
+
+    :return: the response from client for synch calls or nothing for asynch
+             calls.
+    :rtype: leap.common.events.events_pb2.EventsResponse or None
     """
     request = proto.PingRequest()
     service = RpcService(
index daccc61..a7d4da9 100644 (file)
@@ -69,7 +69,7 @@ def ensure_server(port=SERVER_PORT):
     :rtype: EventsServerDaemon or None
 
     :raise PortAlreadyTaken: Raised if C{port} is already taken by something
-        that is not an events server.
+                             that is not an events server.
     """
     try:
         # check if port is available
@@ -97,11 +97,14 @@ def ping(port=SERVER_PORT, reqcbk=None, timeout=1000):
     :param port: the port in which server should be listening
     :type port: int
     :param reqcbk: a callback to be called when a response from server is
-        received
-    :type reqcbk: function
-        callback(leap.common.events.events_pb2.EventResponse)
+                   received
+    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)
     :param timeout: the timeout for synch calls
     :type timeout: int
+
+    :return: the response from server for synch calls or nothing for asynch
+             calls.
+    :rtype: leap.common.events.events_pb2.EventsResponse or None
     """
     request = proto.PingRequest()
     service = RpcService(
index 90124b4..bc04dd6 100644 (file)
@@ -337,3 +337,92 @@ class EventsTestCase(unittest.TestCase):
         time.sleep(1)  # wait for thread to start.
         self.assertRaises(
             server.PortAlreadyTaken, server.ensure_server, port)
+
+    def test_async_register(self):
+        """
+        Test asynchronous registering of callbacks.
+        """
+        flag = Mock()
+
+        # executed after async register, when response is received from server
+        def reqcbk(request, response):
+            flag(request.event)
+
+        # callback registered by application
+        def callback(request):
+            pass
+
+        # passing a callback as reqcbk param makes the call asynchronous
+        result = events.register(CLIENT_UID, callback, reqcbk=reqcbk)
+        self.assertIsNone(result)
+        events.signal(CLIENT_UID)
+        time.sleep(1)  # wait for signal to arrive from server
+        flag.assert_called_once_with(CLIENT_UID)
+
+    def test_async_signal(self):
+        """
+        Test asynchronous registering of callbacks.
+        """
+        flag = Mock()
+
+        # executed after async signal, when response is received from server
+        def reqcbk(request, response):
+            flag(request.event)
+
+        # passing a callback as reqcbk param makes the call asynchronous
+        result = events.signal(CLIENT_UID, reqcbk=reqcbk)
+        self.assertIsNone(result)
+        time.sleep(1)  # wait for signal to arrive from server
+        flag.assert_called_once_with(CLIENT_UID)
+
+    def test_async_unregister(self):
+        """
+        Test asynchronous unregistering of callbacks.
+        """
+        flag = Mock()
+
+        # executed after async signal, when response is received from server
+        def reqcbk(request, response):
+            flag(request.event)
+
+        # callback registered by application
+        def callback(request):
+            pass
+
+        # passing a callback as reqcbk param makes the call asynchronous
+        events.register(CLIENT_UID, callback)
+        result = events.unregister(CLIENT_UID, reqcbk=reqcbk)
+        self.assertIsNone(result)
+        time.sleep(1)  # wait for signal to arrive from server
+        flag.assert_called_once_with(CLIENT_UID)
+
+    def test_async_ping_server(self):
+        """
+        Test asynchronous pinging of server.
+        """
+        flag = Mock()
+
+        # executed after async signal, when response is received from server
+        def reqcbk(request, response):
+            flag()
+
+        result = events.ping_server(reqcbk=reqcbk)
+        self.assertIsNone(result)
+        time.sleep(1)  # wait for response to arrive from server.
+        flag.assert_called_once_with()
+
+    def test_async_ping_client(self):
+        """
+        Test asynchronous pinging of client.
+        """
+        flag = Mock()
+
+        # executed after async signal, when response is received from server
+        def reqcbk(request, response):
+            flag()
+
+        daemon = client.ensure_client_daemon()
+        result = events.ping_client(daemon.get_port(), reqcbk=reqcbk)
+        self.assertIsNone(result)
+        time.sleep(1)  # wait for response to arrive from server.
+        flag.assert_called_once_with()