1 ## -*- coding: utf-8 -*-
3 # Copyright (C) 2013 LEAP
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 from twisted.internet.reactor import callFromThread
24 from twisted.trial import unittest
25 from twisted.internet import defer
27 from leap.common.events import server
28 from leap.common.events import client
29 from leap.common.events import txclient
30 from leap.common.events import catalog
31 from leap.common.events.errors import CallbackAlreadyRegisteredError
34 if 'DEBUG' in os.environ:
35 logging.basicConfig(level=logging.DEBUG)
38 class EventsGenericClientTestCase(object):
41 self._server = server.ensure_server(
42 emit_addr="tcp://127.0.0.1:0",
43 reg_addr="tcp://127.0.0.1:0")
44 self._client.configure_client(
45 emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port,
46 reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port)
49 self._client.shutdown()
50 self._server.shutdown()
51 # wait a bit for sockets to close properly
54 def test_client_register(self):
56 Ensure clients can register callbacks.
58 callbacks = self._client.instance().callbacks
59 self.assertTrue(len(callbacks) == 0,
60 'There should be no callback for this event.')
62 event1 = catalog.CLIENT_UID
63 cbk1 = lambda event, _: True
64 uid1 = self._client.register(event1, cbk1)
65 # assert for correct registration
66 self.assertTrue(len(callbacks) == 1)
67 self.assertTrue(callbacks[event1][uid1] == cbk1,
68 'Could not register event in local client.')
69 # register another event
70 event2 = catalog.CLIENT_SESSION_ID
71 cbk2 = lambda event, _: True
72 uid2 = self._client.register(event2, cbk2)
73 # assert for correct registration
74 self.assertTrue(len(callbacks) == 2)
75 self.assertTrue(callbacks[event2][uid2] == cbk2,
76 'Could not register event in local client.')
78 def test_register_signal_replace(self):
80 Make sure clients can replace already registered callbacks.
82 event = catalog.CLIENT_UID
84 cbk_fail = lambda event, _: callFromThread(d.errback, event)
85 cbk_succeed = lambda event, _: callFromThread(d.callback, event)
86 self._client.register(event, cbk_fail, uid=1)
87 self._client.register(event, cbk_succeed, uid=1, replace=True)
88 self._client.emit(event, None)
91 def test_register_signal_replace_fails_when_replace_is_false(self):
93 Make sure clients trying to replace already registered callbacks fail
96 event = catalog.CLIENT_UID
97 self._client.register(event, lambda event, _: None, uid=1)
99 CallbackAlreadyRegisteredError,
100 self._client.register,
101 event, lambda event, _: None, uid=1, replace=False)
103 def test_register_more_than_one_callback_works(self):
105 Make sure clients can replace already registered callbacks.
107 event = catalog.CLIENT_UID
108 d1 = defer.Deferred()
109 cbk1 = lambda event, _: callFromThread(d1.callback, event)
110 d2 = defer.Deferred()
111 cbk2 = lambda event, _: callFromThread(d2.callback, event)
112 self._client.register(event, cbk1)
113 self._client.register(event, cbk2)
114 self._client.emit(event, None)
115 d = defer.gatherResults([d1, d2])
118 def test_client_receives_signal(self):
120 Ensure clients can receive signals.
122 event = catalog.CLIENT_UID
125 callFromThread(d.callback, event)
126 self._client.register(event, cbk)
127 self._client.emit(event, None)
130 def test_client_unregister_all(self):
132 Test that the client can unregister all events for one signal.
134 event1 = catalog.CLIENT_UID
136 # register more than one callback for the same event
137 self._client.register(
138 event1, lambda ev, _: callFromThread(d.errback, None))
139 self._client.register(
140 event1, lambda ev, _: callFromThread(d.errback, None))
141 # unregister and emit the event
142 self._client.unregister(event1)
143 self._client.emit(event1, None)
144 # register and emit another event so the deferred can succeed
145 event2 = catalog.CLIENT_SESSION_ID
146 self._client.register(
147 event2, lambda ev, _: callFromThread(d.callback, None))
148 self._client.emit(event2, None)
151 def test_client_unregister_by_uid(self):
153 Test that the client can unregister an event by uid.
155 event = catalog.CLIENT_UID
157 # register one callback that would fail
158 uid = self._client.register(
159 event, lambda ev, _: callFromThread(d.errback, None))
160 # register one callback that will succeed
161 self._client.register(
162 event, lambda ev, _: callFromThread(d.callback, None))
163 # unregister by uid and emit the event
164 self._client.unregister(event, uid=uid)
165 self._client.emit(event, None)
169 class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
174 class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase):