1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2013 LEAP
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 from twisted.internet.reactor import callFromThread
24 from twisted.trial import unittest
25 from twisted.internet import defer
27 from leap.common.events import server
28 from leap.common.events import client
29 from leap.common.events import flags
30 from leap.common.events import txclient
31 from leap.common.events import catalog
32 from leap.common.events.errors import CallbackAlreadyRegisteredError
35 if 'DEBUG' in os.environ:
36 logging.basicConfig(level=logging.DEBUG)
39 class EventsGenericClientTestCase(object):
42 flags.set_events_enabled(True)
43 self._server = server.ensure_server(
44 emit_addr="tcp://127.0.0.1:0",
45 reg_addr="tcp://127.0.0.1:0")
46 self._client.configure_client(
47 emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port,
48 reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port)
51 self._client.shutdown()
52 self._server.shutdown()
53 flags.set_events_enabled(False)
54 # wait a bit for sockets to close properly
57 def test_client_register(self):
59 Ensure clients can register callbacks.
61 callbacks = self._client.instance().callbacks
62 self.assertTrue(len(callbacks) == 0,
63 'There should be no callback for this event.')
65 event1 = catalog.CLIENT_UID
70 uid1 = self._client.register(event1, cbk1)
71 # assert for correct registration
72 self.assertTrue(len(callbacks) == 1)
73 self.assertTrue(callbacks[event1][uid1] == cbk1,
74 'Could not register event in local client.')
75 # register another event
76 event2 = catalog.CLIENT_SESSION_ID
81 uid2 = self._client.register(event2, cbk2)
82 # assert for correct registration
83 self.assertTrue(len(callbacks) == 2)
84 self.assertTrue(callbacks[event2][uid2] == cbk2,
85 'Could not register event in local client.')
87 def test_register_signal_replace(self):
89 Make sure clients can replace already registered callbacks.
91 event = catalog.CLIENT_UID
94 def cbk_fail(event, _):
95 return callFromThread(d.errback, event)
97 def cbk_succeed(event, _):
98 return callFromThread(d.callback, event)
100 self._client.register(event, cbk_fail, uid=1)
101 self._client.register(event, cbk_succeed, uid=1, replace=True)
102 self._client.emit(event, None)
105 def test_register_signal_replace_fails_when_replace_is_false(self):
107 Make sure clients trying to replace already registered callbacks fail
110 event = catalog.CLIENT_UID
111 self._client.register(event, lambda event, _: None, uid=1)
113 CallbackAlreadyRegisteredError,
114 self._client.register,
115 event, lambda event, _: None, uid=1, replace=False)
117 def test_register_more_than_one_callback_works(self):
119 Make sure clients can replace already registered callbacks.
121 event = catalog.CLIENT_UID
122 d1 = defer.Deferred()
125 return callFromThread(d1.callback, event)
127 d2 = defer.Deferred()
130 return d2.callback(event)
132 self._client.register(event, cbk1)
133 self._client.register(event, cbk2)
134 self._client.emit(event, None)
135 d = defer.gatherResults([d1, d2])
138 def test_client_receives_signal(self):
140 Ensure clients can receive signals.
142 event = catalog.CLIENT_UID
146 callFromThread(d.callback, event)
148 self._client.register(event, cbk)
149 self._client.emit(event, None)
152 def test_client_unregister_all(self):
154 Test that the client can unregister all events for one signal.
156 event1 = catalog.CLIENT_UID
158 # register more than one callback for the same event
159 self._client.register(
160 event1, lambda ev, _: callFromThread(d.errback, None))
161 self._client.register(
162 event1, lambda ev, _: callFromThread(d.errback, None))
163 # unregister and emit the event
164 self._client.unregister(event1)
165 self._client.emit(event1, None)
166 # register and emit another event so the deferred can succeed
167 event2 = catalog.CLIENT_SESSION_ID
168 self._client.register(
169 event2, lambda ev, _: callFromThread(d.callback, None))
170 self._client.emit(event2, None)
173 def test_client_unregister_by_uid(self):
175 Test that the client can unregister an event by uid.
177 event = catalog.CLIENT_UID
179 # register one callback that would fail
180 uid = self._client.register(
181 event, lambda ev, _: callFromThread(d.errback, None))
182 # register one callback that will succeed
183 self._client.register(
184 event, lambda ev, _: callFromThread(d.callback, None))
185 # unregister by uid and emit the event
186 self._client.unregister(event, uid=uid)
187 self._client.emit(event, None)
191 class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
196 class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase):