[tests] adapt events tests to recent changes
[leap_pycommon.git] / src / leap / common / events / tests / test_events.py
1 # -*- coding: utf-8 -*-
2 # test_events.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 Tests for the events framework
19 """
20 import os
21 import logging
22
23 from twisted.internet.reactor import callFromThread
24 from twisted.trial import unittest
25 from twisted.internet import defer
26
27 from txzmq import ZmqFactory
28
29 from leap.common.events import server
30 from leap.common.events import client
31 from leap.common.events import flags
32 from leap.common.events import txclient
33 from leap.common.events import catalog
34 from leap.common.events.errors import CallbackAlreadyRegisteredError
35
36
37 if 'DEBUG' in os.environ:
38     logging.basicConfig(level=logging.DEBUG)
39
40
class EventsGenericClientTestCase(object):
    """
    Client tests shared by the different events client implementations.

    This class is a mixin: concrete test cases must also inherit from
    ``unittest.TestCase`` and set the ``_client`` class attribute to the
    client module under test.
    """

    def setUp(self):
        """
        Enable events, ensure a server exists and configure the client.
        """
        flags.set_events_enabled(True)
        self.factory = ZmqFactory()
        # Port 0 presumably lets the OS pick free ports; the ports actually
        # bound are read back from the server below -- TODO confirm.
        self._server = server.ensure_server(
            emit_addr="tcp://127.0.0.1:0",
            reg_addr="tcp://127.0.0.1:0",
            factory=self.factory,
            enable_curve=False)

        self._client.configure_client(
            emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port,
            reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port,
            factory=self.factory, enable_curve=False)

    def tearDown(self):
        """
        Disable events, shut down the zmq factory and reset the client.
        """
        flags.set_events_enabled(False)
        self.factory.shutdown()
        self._client.instance().reset()

    def test_client_register(self):
        """
        Ensure clients can register callbacks.
        """
        callbacks = self._client.instance().callbacks
        self.assertEqual(len(callbacks), 0,
                         'There should be no callback for this event.')
        # register one event
        event1 = catalog.CLIENT_UID

        def cbk1(event, _):
            return True

        uid1 = self._client.register(event1, cbk1)
        # assert for correct registration
        self.assertEqual(len(callbacks), 1)
        self.assertEqual(callbacks[event1][uid1], cbk1,
                         'Could not register event in local client.')
        # register another event
        event2 = catalog.CLIENT_SESSION_ID

        def cbk2(event, _):
            return True

        uid2 = self._client.register(event2, cbk2)
        # assert for correct registration
        self.assertEqual(len(callbacks), 2)
        self.assertEqual(callbacks[event2][uid2], cbk2,
                         'Could not register event in local client.')

    def test_register_signal_replace(self):
        """
        Make sure clients can replace already registered callbacks.
        """
        event = catalog.CLIENT_UID
        d = defer.Deferred()

        def cbk_fail(event, _):
            return callFromThread(d.errback, event)

        def cbk_succeed(event, _):
            return callFromThread(d.callback, event)

        # both registrations share uid=1, so only the second one may fire
        self._client.register(event, cbk_fail, uid=1)
        self._client.register(event, cbk_succeed, uid=1, replace=True)
        self._client.emit(event, None)
        return d

    def test_register_signal_replace_fails_when_replace_is_false(self):
        """
        Make sure clients trying to replace already registered callbacks fail
        when replace=False
        """
        event = catalog.CLIENT_UID
        self._client.register(event, lambda event, _: None, uid=1)
        self.assertRaises(
            CallbackAlreadyRegisteredError,
            self._client.register,
            event, lambda event, _: None, uid=1, replace=False)

    def test_register_more_than_one_callback_works(self):
        """
        Make sure clients can register more than one callback for the same
        event, and that all of them are called when the event is emitted.
        """
        event = catalog.CLIENT_UID
        d1 = defer.Deferred()

        def cbk1(event, _):
            return callFromThread(d1.callback, event)

        d2 = defer.Deferred()

        def cbk2(event, _):
            # fire the deferred through the reactor, like every other
            # callback in these tests, since the client may run callbacks
            # outside of the reactor thread
            return callFromThread(d2.callback, event)

        self._client.register(event, cbk1)
        self._client.register(event, cbk2)
        self._client.emit(event, None)
        d = defer.gatherResults([d1, d2])
        return d

    def test_client_receives_signal(self):
        """
        Ensure clients can receive signals.
        """
        event = catalog.CLIENT_UID
        d = defer.Deferred()

        def cbk(event, _):
            callFromThread(d.callback, event)

        self._client.register(event, cbk)
        self._client.emit(event, None)
        return d

    def test_client_unregister_all(self):
        """
        Test that the client can unregister all events for one signal.
        """
        event1 = catalog.CLIENT_UID
        d = defer.Deferred()
        # errback needs an explicit error: with None it would try to wrap
        # the exception currently being handled, and there is none
        unexpected = AssertionError('Unregistered callback was called.')
        # register more than one callback for the same event
        self._client.register(
            event1, lambda ev, _: callFromThread(d.errback, unexpected))
        self._client.register(
            event1, lambda ev, _: callFromThread(d.errback, unexpected))
        # unregister and emit the event
        self._client.unregister(event1)
        self._client.emit(event1, None)
        # register and emit another event so the deferred can succeed
        event2 = catalog.CLIENT_SESSION_ID
        self._client.register(
            event2, lambda ev, _: callFromThread(d.callback, None))
        self._client.emit(event2, None)
        return d

    def test_client_unregister_by_uid(self):
        """
        Test that the client can unregister an event by uid.
        """
        event = catalog.CLIENT_UID
        d = defer.Deferred()
        # errback needs an explicit error: with None it would try to wrap
        # the exception currently being handled, and there is none
        unexpected = AssertionError('Unregistered callback was called.')
        # register one callback that would fail
        uid = self._client.register(
            event, lambda ev, _: callFromThread(d.errback, unexpected))
        # register one callback that will succeed
        self._client.register(
            event, lambda ev, _: callFromThread(d.callback, None))
        # unregister by uid and emit the event
        self._client.unregister(event, uid=uid)
        self._client.emit(event, None)
        return d
195
196
class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
    """
    Run the generic events client tests using the ``txclient`` module.
    """

    _client = txclient
200
201
class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
    """
    Run the generic events client tests using the ``client`` module.
    """

    _client = client