1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
|
# -*- coding: utf-8 -*-
# test_events.py
# Copyright (C) 2013 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import logging
import time
from twisted.internet.reactor import callFromThread
from twisted.trial import unittest
from twisted.internet import defer
from leap.common.events import server
from leap.common.events import client
from leap.common.events import flags
from leap.common.events import txclient
from leap.common.events import catalog
from leap.common.events.errors import CallbackAlreadyRegisteredError
# Turn on verbose logging whenever a DEBUG variable is present in the
# environment, whatever its value is.
if os.getenv('DEBUG') is not None:
    logging.basicConfig(level=logging.DEBUG)
class EventsGenericClientTestCase(object):
    """
    Tests shared by the events client implementations.

    This class is a mixin: it does not derive from ``unittest.TestCase``
    itself and reads the client module under test from ``self._client``,
    which the concrete test cases are expected to define.
    """

    def setUp(self):
        """
        Start an events server, point the client at it and enable events.
        """
        # the server is asked to bind to port 0 on localhost; the client is
        # then configured with the ports the server object reports back.
        self._server = server.ensure_server(
            emit_addr="tcp://127.0.0.1:0",
            reg_addr="tcp://127.0.0.1:0")
        self._client.configure_client(
            emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port,
            reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port)
        flags.set_events_enabled(True)

    def tearDown(self):
        """
        Shut down the client and the server and disable events again.
        """
        self._client.shutdown()
        self._server.shutdown()
        flags.set_events_enabled(False)
        # wait a bit for sockets to close properly
        time.sleep(0.1)

    def test_client_register(self):
        """
        Ensure clients can register callbacks.
        """
        callbacks = self._client.instance().callbacks
        self.assertEqual(
            len(callbacks), 0,
            'There should be no callback for this event.')

        # register one event
        event1 = catalog.CLIENT_UID

        def cbk1(event, _):
            return True

        uid1 = self._client.register(event1, cbk1)
        # assert for correct registration
        self.assertEqual(len(callbacks), 1)
        self.assertEqual(
            callbacks[event1][uid1], cbk1,
            'Could not register event in local client.')

        # register another event
        event2 = catalog.CLIENT_SESSION_ID

        def cbk2(event, _):
            return True

        uid2 = self._client.register(event2, cbk2)
        # assert for correct registration
        self.assertEqual(len(callbacks), 2)
        self.assertEqual(
            callbacks[event2][uid2], cbk2,
            'Could not register event in local client.')

    def test_register_signal_replace(self):
        """
        Make sure clients can replace already registered callbacks.
        """
        event = catalog.CLIENT_UID
        d = defer.Deferred()

        def cbk_fail(event, _):
            return callFromThread(d.errback, event)

        def cbk_succeed(event, _):
            return callFromThread(d.callback, event)

        # both registrations use the same uid, so only the second callback
        # should remain and the deferred should succeed.
        self._client.register(event, cbk_fail, uid=1)
        self._client.register(event, cbk_succeed, uid=1, replace=True)
        self._client.emit(event, None)
        return d

    def test_register_signal_replace_fails_when_replace_is_false(self):
        """
        Make sure clients trying to replace already registered callbacks fail
        when replace=False
        """
        event = catalog.CLIENT_UID
        self._client.register(event, lambda event, _: None, uid=1)
        self.assertRaises(
            CallbackAlreadyRegisteredError,
            self._client.register,
            event, lambda event, _: None, uid=1, replace=False)

    def test_register_more_than_one_callback_works(self):
        """
        Make sure clients can register more than one callback for the same
        event and that all of them get called.
        """
        event = catalog.CLIENT_UID

        d1 = defer.Deferred()

        def cbk1(event, _):
            return callFromThread(d1.callback, event)

        d2 = defer.Deferred()

        def cbk2(event, _):
            # fire the deferred through the reactor, like every other
            # callback in these tests: the callback may be invoked outside
            # the reactor thread, where touching a deferred is not safe.
            return callFromThread(d2.callback, event)

        self._client.register(event, cbk1)
        self._client.register(event, cbk2)
        self._client.emit(event, None)
        # the test only succeeds once both callbacks have fired
        d = defer.gatherResults([d1, d2])
        return d

    def test_client_receives_signal(self):
        """
        Ensure clients can receive signals.
        """
        event = catalog.CLIENT_UID
        d = defer.Deferred()

        def cbk(event, _):
            callFromThread(d.callback, event)

        self._client.register(event, cbk)
        self._client.emit(event, None)
        return d

    def test_client_unregister_all(self):
        """
        Test that the client can unregister all events for one signal.
        """
        event1 = catalog.CLIENT_UID
        d = defer.Deferred()
        # register more than one callback for the same event
        self._client.register(
            event1, lambda ev, _: callFromThread(d.errback, None))
        self._client.register(
            event1, lambda ev, _: callFromThread(d.errback, None))
        # unregister and emit the event
        self._client.unregister(event1)
        self._client.emit(event1, None)
        # register and emit another event so the deferred can succeed
        event2 = catalog.CLIENT_SESSION_ID
        self._client.register(
            event2, lambda ev, _: callFromThread(d.callback, None))
        self._client.emit(event2, None)
        return d

    def test_client_unregister_by_uid(self):
        """
        Test that the client can unregister an event by uid.
        """
        event = catalog.CLIENT_UID
        d = defer.Deferred()
        # register one callback that would fail
        uid = self._client.register(
            event, lambda ev, _: callFromThread(d.errback, None))
        # register one callback that will succeed
        self._client.register(
            event, lambda ev, _: callFromThread(d.callback, None))
        # unregister by uid and emit the event
        self._client.unregister(event, uid=uid)
        self._client.emit(event, None)
        return d
class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
    """
    Run the generic events client tests against the ``txclient`` module.
    """

    # client module exercised by the inherited tests
    _client = txclient
class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
    """
    Run the generic events client tests against the ``client`` module.
    """

    # client module exercised by the inherited tests
    _client = client
|