1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2013 LEAP
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import os
import time

from twisted.internet.reactor import callFromThread
from twisted.trial import unittest
from twisted.internet import defer

from leap.common.events import server
from leap.common.events import client
from leap.common.events import txclient
from leap.common.events import catalog
from leap.common.events.errors import CallbackAlreadyRegisteredError


# Opt-in verbose logging: the mere presence of the DEBUG environment variable
# (whatever its value, even empty) configures logging at DEBUG level.
if os.getenv('DEBUG') is not None:
    logging.basicConfig(level=logging.DEBUG)


class EventsGenericClientTestCase(object):
    """
    Client-agnostic tests for registering, emitting and unregistering events.

    This class is a mixin: it never assigns ``self._client`` itself, so a
    concrete test case has to combine it with ``unittest.TestCase`` and
    provide a ``_client`` attribute exposing ``configure_client``,
    ``register``, ``unregister``, ``emit``, ``instance`` and ``shutdown``.

    Tests that depend on a callback being delivered return a ``Deferred`` so
    that trial waits for it instead of finishing before the event arrives.
    """

    def setUp(self):
        """
        Start an events server and configure the client to talk to it.
        """
        # Port 0 is requested for both sockets; the ports that were actually
        # bound are read back from the server when configuring the client.
        self._server = server.ensure_server(
            emit_addr="tcp://127.0.0.1:0",
            reg_addr="tcp://127.0.0.1:0")
        self._client.configure_client(
            emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port,
            reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port)

    def tearDown(self):
        """
        Shut down the client and the server started in ``setUp``.
        """
        try:
            self._client.shutdown()
        finally:
            # Stop the server even if the client shutdown raised, so that it
            # does not leak into the following tests.
            self._server.shutdown()
        # wait a bit for sockets to close properly
        time.sleep(0.1)

    def test_client_register(self):
        """
        Ensure clients can register callbacks.
        """
        callbacks = self._client.instance().callbacks
        self.assertEqual(len(callbacks), 0,
                         'There should be no callback for this event.')

        # register one event
        event1 = catalog.CLIENT_UID

        def cbk1(event, _):
            return True

        uid1 = self._client.register(event1, cbk1)
        # assert for correct registration
        self.assertEqual(len(callbacks), 1)
        self.assertEqual(callbacks[event1][uid1], cbk1,
                         'Could not register event in local client.')

        # register another event
        event2 = catalog.CLIENT_SESSION_ID

        def cbk2(event, _):
            return True

        uid2 = self._client.register(event2, cbk2)
        # assert for correct registration
        self.assertEqual(len(callbacks), 2)
        self.assertEqual(callbacks[event2][uid2], cbk2,
                         'Could not register event in local client.')

    def test_register_signal_replace(self):
        """
        Make sure clients can replace already registered callbacks.
        """
        event = catalog.CLIENT_UID
        d = defer.Deferred()

        def cbk_fail(event, _):
            # Reaching this callback means the replacement did not happen.
            return callFromThread(
                d.errback,
                self.failureException('Replaced callback was called.'))

        def cbk_succeed(event, _):
            return callFromThread(d.callback, event)

        self._client.register(event, cbk_fail, uid=1)
        self._client.register(event, cbk_succeed, uid=1, replace=True)
        self._client.emit(event, None)
        return d

    def test_register_signal_replace_fails_when_replace_is_false(self):
        """
        Make sure clients trying to replace already registered callbacks fail
        when replace=False.
        """
        event = catalog.CLIENT_UID
        self._client.register(event, lambda event, _: None, uid=1)
        self.assertRaises(
            CallbackAlreadyRegisteredError,
            self._client.register,
            event, lambda event, _: None, uid=1, replace=False)

    def test_register_more_than_one_callback_works(self):
        """
        Make sure every callback registered for an event gets called.
        """
        event = catalog.CLIENT_UID
        d1 = defer.Deferred()

        def cbk1(event, _):
            return callFromThread(d1.callback, event)

        d2 = defer.Deferred()

        def cbk2(event, _):
            # Fire through the reactor exactly like cbk1 does: the callback
            # may be invoked outside the reactor thread, and Deferreds are
            # not thread-safe.
            return callFromThread(d2.callback, event)

        self._client.register(event, cbk1)
        self._client.register(event, cbk2)
        self._client.emit(event, None)
        # succeed only once both callbacks have been delivered
        d = defer.gatherResults([d1, d2])
        return d

    def test_client_receives_signal(self):
        """
        Ensure clients can receive signals.
        """
        event = catalog.CLIENT_UID
        d = defer.Deferred()

        def cbk(event, _):
            callFromThread(d.callback, event)

        self._client.register(event, cbk)
        self._client.emit(event, None)
        return d

    def test_client_unregister_all(self):
        """
        Test that the client can unregister all events for one signal.
        """
        event1 = catalog.CLIENT_UID
        d = defer.Deferred()
        # errback() needs a real exception here: with None and no exception
        # being handled it cannot build a Failure.
        error = self.failureException(
            'Callback was called after being unregistered.')
        # register more than one callback for the same event
        self._client.register(
            event1, lambda ev, _: callFromThread(d.errback, error))
        self._client.register(
            event1, lambda ev, _: callFromThread(d.errback, error))
        # unregister and emit the event
        self._client.unregister(event1)
        self._client.emit(event1, None)
        # register and emit another event so the deferred can succeed
        event2 = catalog.CLIENT_SESSION_ID
        self._client.register(
            event2, lambda ev, _: callFromThread(d.callback, None))
        self._client.emit(event2, None)
        return d

    def test_client_unregister_by_uid(self):
        """
        Test that the client can unregister an event by uid.
        """
        event = catalog.CLIENT_UID
        d = defer.Deferred()
        # errback() needs a real exception here: with None and no exception
        # being handled it cannot build a Failure.
        error = self.failureException(
            'Callback was called after being unregistered.')
        # register one callback that would fail
        uid = self._client.register(
            event, lambda ev, _: callFromThread(d.errback, error))
        # register one callback that will succeed
        self._client.register(
            event, lambda ev, _: callFromThread(d.callback, None))
        # unregister by uid and emit the event
        self._client.unregister(event, uid=uid)
        self._client.emit(event, None)
        return d


class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
    """
    Run the generic events client tests against the ``txclient`` module.
    """

    # The generic tests reach the client under test through ``self._client``.
    _client = txclient


193 class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase):