[style] Fixed pep8 warnings
[leap_pycommon.git] / src / leap / common / tests / test_events.py
1 # -*- coding: utf-8 -*-
2 # test_events.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 import os
20 import logging
21 import time
22
23 from twisted.internet.reactor import callFromThread
24 from twisted.trial import unittest
25 from twisted.internet import defer
26
27 from leap.common.events import server
28 from leap.common.events import client
29 from leap.common.events import txclient
30 from leap.common.events import catalog
31 from leap.common.events.errors import CallbackAlreadyRegisteredError
32
33
# Opt-in verbose output: when a DEBUG variable is present in the environment
# (whatever its value), logging is configured at DEBUG level.
if os.environ.get('DEBUG') is not None:
    logging.basicConfig(level=logging.DEBUG)
36
37
class EventsGenericClientTestCase(object):
    """
    Tests shared by the different events client implementations.

    This class is a mixin: a subclass has to combine it with a ``TestCase``
    and set the ``_client`` attribute to the client module under test.
    """

    def setUp(self):
        """
        Start an events server and configure the client to talk to it.
        """
        # the server is asked for port 0 on both sockets; the ports it
        # actually ended up with are read back from it to build the
        # addresses given to the client.
        self._server = server.ensure_server(
            emit_addr="tcp://127.0.0.1:0",
            reg_addr="tcp://127.0.0.1:0")
        self._client.configure_client(
            emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port,
            reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port)

    def tearDown(self):
        """
        Shut down the client first and the server afterwards.
        """
        self._client.shutdown()
        self._server.shutdown()
        # wait a bit for sockets to close properly
        time.sleep(0.1)

    def test_client_register(self):
        """
        Ensure clients can register callbacks.
        """
        callbacks = self._client.instance().callbacks
        self.assertEqual(
            len(callbacks), 0,
            'There should be no callback for this event.')
        # register one event
        event1 = catalog.CLIENT_UID

        def cbk1(event, _):
            return True

        uid1 = self._client.register(event1, cbk1)
        # assert for correct registration
        self.assertEqual(len(callbacks), 1)
        self.assertEqual(
            callbacks[event1][uid1], cbk1,
            'Could not register event in local client.')
        # register another event
        event2 = catalog.CLIENT_SESSION_ID

        def cbk2(event, _):
            return True

        uid2 = self._client.register(event2, cbk2)
        # assert for correct registration
        self.assertEqual(len(callbacks), 2)
        self.assertEqual(
            callbacks[event2][uid2], cbk2,
            'Could not register event in local client.')

    def test_register_signal_replace(self):
        """
        Make sure clients can replace already registered callbacks.

        The first callback would fail the returned deferred; it is replaced
        (same uid, ``replace=True``) by one that fires it successfully.
        """
        event = catalog.CLIENT_UID
        d = defer.Deferred()

        def cbk_fail(event, _):
            return callFromThread(d.errback, event)

        def cbk_succeed(event, _):
            return callFromThread(d.callback, event)

        self._client.register(event, cbk_fail, uid=1)
        self._client.register(event, cbk_succeed, uid=1, replace=True)
        self._client.emit(event, None)
        return d

    def test_register_signal_replace_fails_when_replace_is_false(self):
        """
        Make sure clients trying to replace already registered callbacks fail
        when replace=False
        """
        event = catalog.CLIENT_UID
        self._client.register(event, lambda event, _: None, uid=1)
        # registering again under the same uid must be refused
        self.assertRaises(
            CallbackAlreadyRegisteredError,
            self._client.register,
            event, lambda event, _: None, uid=1, replace=False)

    def test_register_more_than_one_callback_works(self):
        """
        Make sure more than one callback can be registered for the same
        event, and that all of them are run when the event is emitted.
        """
        event = catalog.CLIENT_UID
        d1 = defer.Deferred()

        def cbk1(event, _):
            return callFromThread(d1.callback, event)

        d2 = defer.Deferred()

        def cbk2(event, _):
            # deferreds are not thread-safe, so fire this one through the
            # reactor as well instead of calling it directly: the callback
            # may be run outside of the reactor thread.
            return callFromThread(d2.callback, event)

        self._client.register(event, cbk1)
        self._client.register(event, cbk2)
        self._client.emit(event, None)
        # succeeds only once both callbacks have been called
        d = defer.gatherResults([d1, d2])
        return d

    def test_client_receives_signal(self):
        """
        Ensure clients can receive signals.
        """
        event = catalog.CLIENT_UID
        d = defer.Deferred()

        def cbk(event, _):
            callFromThread(d.callback, event)

        self._client.register(event, cbk)
        self._client.emit(event, None)
        return d

    def test_client_unregister_all(self):
        """
        Test that the client can unregister all events for one signal.
        """
        event1 = catalog.CLIENT_UID
        d = defer.Deferred()
        # register more than one callback for the same event
        self._client.register(
            event1, lambda ev, _: callFromThread(d.errback, None))
        self._client.register(
            event1, lambda ev, _: callFromThread(d.errback, None))
        # unregister and emit the event
        self._client.unregister(event1)
        self._client.emit(event1, None)
        # register and emit another event so the deferred can succeed
        event2 = catalog.CLIENT_SESSION_ID
        self._client.register(
            event2, lambda ev, _: callFromThread(d.callback, None))
        self._client.emit(event2, None)
        return d

    def test_client_unregister_by_uid(self):
        """
        Test that the client can unregister an event by uid.
        """
        event = catalog.CLIENT_UID
        d = defer.Deferred()
        # register one callback that would fail
        uid = self._client.register(
            event, lambda ev, _: callFromThread(d.errback, None))
        # register one callback that will succeed
        self._client.register(
            event, lambda ev, _: callFromThread(d.callback, None))
        # unregister by uid and emit the event
        self._client.unregister(event, uid=uid)
        self._client.emit(event, None)
        return d
186
187
class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
    """
    Run the generic events client tests against the ``txclient`` module.
    """

    # client module exercised by the inherited test methods
    _client = txclient
191
192
class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
    """
    Run the generic events client tests against the ``client`` module.
    """

    # client module exercised by the inherited test methods
    _client = client