summaryrefslogtreecommitdiff
path: root/src/leap/common/tests
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/common/tests')
-rw-r--r--src/leap/common/tests/test_certs.py10
-rw-r--r--src/leap/common/tests/test_events.py54
-rw-r--r--src/leap/common/tests/test_http.py75
3 files changed, 122 insertions, 17 deletions
diff --git a/src/leap/common/tests/test_certs.py b/src/leap/common/tests/test_certs.py
index 999071f..8ebc0f4 100644
--- a/src/leap/common/tests/test_certs.py
+++ b/src/leap/common/tests/test_certs.py
@@ -43,10 +43,10 @@ CERT_NOT_AFTER = (2023, 9, 1, 17, 52, 16, 4, 244, 0)
class CertsTest(BaseLeapTest):
def setUp(self):
- pass
+ self.setUpEnv()
def tearDown(self):
- pass
+ self.tearDownEnv()
def test_should_redownload_if_no_cert(self):
self.assertTrue(certs.should_redownload(certfile=""))
@@ -60,11 +60,13 @@ class CertsTest(BaseLeapTest):
self.assertTrue(certs.should_redownload(cert_path))
def test_should_redownload_if_before(self):
- new_now = lambda: time.struct_time(CERT_NOT_BEFORE)
+ def new_now():
+ time.struct_time(CERT_NOT_BEFORE)
self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now))
def test_should_redownload_if_after(self):
- new_now = lambda: time.struct_time(CERT_NOT_AFTER)
+ def new_now():
+ time.struct_time(CERT_NOT_AFTER)
self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now))
def test_not_should_redownload(self):
diff --git a/src/leap/common/tests/test_events.py b/src/leap/common/tests/test_events.py
index 7ef3e1b..2ad097e 100644
--- a/src/leap/common/tests/test_events.py
+++ b/src/leap/common/tests/test_events.py
@@ -1,4 +1,4 @@
-## -*- coding: utf-8 -*-
+# -*- coding: utf-8 -*-
# test_events.py
# Copyright (C) 2013 LEAP
#
@@ -20,11 +20,13 @@ import os
import logging
import time
+from twisted.internet.reactor import callFromThread
from twisted.trial import unittest
from twisted.internet import defer
from leap.common.events import server
from leap.common.events import client
+from leap.common.events import flags
from leap.common.events import txclient
from leap.common.events import catalog
from leap.common.events.errors import CallbackAlreadyRegisteredError
@@ -37,6 +39,7 @@ if 'DEBUG' in os.environ:
class EventsGenericClientTestCase(object):
def setUp(self):
+ flags.set_events_enabled(True)
self._server = server.ensure_server(
emit_addr="tcp://127.0.0.1:0",
reg_addr="tcp://127.0.0.1:0")
@@ -47,6 +50,7 @@ class EventsGenericClientTestCase(object):
def tearDown(self):
self._client.shutdown()
self._server.shutdown()
+ flags.set_events_enabled(False)
# wait a bit for sockets to close properly
time.sleep(0.1)
@@ -59,7 +63,10 @@ class EventsGenericClientTestCase(object):
'There should be no callback for this event.')
# register one event
event1 = catalog.CLIENT_UID
- cbk1 = lambda event, _: True
+
+ def cbk1(event, _):
+ return True
+
uid1 = self._client.register(event1, cbk1)
# assert for correct registration
self.assertTrue(len(callbacks) == 1)
@@ -67,7 +74,10 @@ class EventsGenericClientTestCase(object):
'Could not register event in local client.')
# register another event
event2 = catalog.CLIENT_SESSION_ID
- cbk2 = lambda event, _: True
+
+ def cbk2(event, _):
+ return True
+
uid2 = self._client.register(event2, cbk2)
# assert for correct registration
self.assertTrue(len(callbacks) == 2)
@@ -80,8 +90,13 @@ class EventsGenericClientTestCase(object):
"""
event = catalog.CLIENT_UID
d = defer.Deferred()
- cbk_fail = lambda event, _: d.errback(event)
- cbk_succeed = lambda event, _: d.callback(event)
+
+ def cbk_fail(event, _):
+ return callFromThread(d.errback, event)
+
+ def cbk_succeed(event, _):
+ return callFromThread(d.callback, event)
+
self._client.register(event, cbk_fail, uid=1)
self._client.register(event, cbk_succeed, uid=1, replace=True)
self._client.emit(event, None)
@@ -105,9 +120,15 @@ class EventsGenericClientTestCase(object):
"""
event = catalog.CLIENT_UID
d1 = defer.Deferred()
- cbk1 = lambda event, _: d1.callback(event)
+
+ def cbk1(event, _):
+ return callFromThread(d1.callback, event)
+
d2 = defer.Deferred()
- cbk2 = lambda event, _: d2.callback(event)
+
+ def cbk2(event, _):
+ return d2.callback(event)
+
self._client.register(event, cbk1)
self._client.register(event, cbk2)
self._client.emit(event, None)
@@ -120,8 +141,10 @@ class EventsGenericClientTestCase(object):
"""
event = catalog.CLIENT_UID
d = defer.Deferred()
+
def cbk(events, _):
- d.callback(event)
+ callFromThread(d.callback, event)
+
self._client.register(event, cbk)
self._client.emit(event, None)
return d
@@ -133,14 +156,17 @@ class EventsGenericClientTestCase(object):
event1 = catalog.CLIENT_UID
d = defer.Deferred()
# register more than one callback for the same event
- self._client.register(event1, lambda ev, _: d.errback(None))
- self._client.register(event1, lambda ev, _: d.errback(None))
+ self._client.register(
+ event1, lambda ev, _: callFromThread(d.errback, None))
+ self._client.register(
+ event1, lambda ev, _: callFromThread(d.errback, None))
# unregister and emit the event
self._client.unregister(event1)
self._client.emit(event1, None)
# register and emit another event so the deferred can succeed
event2 = catalog.CLIENT_SESSION_ID
- self._client.register(event2, lambda ev, _: d.callback(None))
+ self._client.register(
+ event2, lambda ev, _: callFromThread(d.callback, None))
self._client.emit(event2, None)
return d
@@ -151,9 +177,11 @@ class EventsGenericClientTestCase(object):
event = catalog.CLIENT_UID
d = defer.Deferred()
# register one callback that would fail
- uid = self._client.register(event, lambda ev, _: d.errback(None))
+ uid = self._client.register(
+ event, lambda ev, _: callFromThread(d.errback, None))
# register one callback that will succeed
- self._client.register(event, lambda ev, _: d.callback(None))
+ self._client.register(
+ event, lambda ev, _: callFromThread(d.callback, None))
# unregister by uid and emit the event
self._client.unregister(event, uid=uid)
self._client.emit(event, None)
diff --git a/src/leap/common/tests/test_http.py b/src/leap/common/tests/test_http.py
new file mode 100644
index 0000000..f44550f
--- /dev/null
+++ b/src/leap/common/tests/test_http.py
@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+# test_http.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for:
+ * leap/common/http.py
+"""
+import os
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+from leap.common import http
+from leap.common.testing.basetest import BaseLeapTest
+
+TEST_CERT_PEM = os.path.join(
+ os.path.split(__file__)[0],
+ '..', 'testing', "leaptest_combined_keycert.pem")
+
+
+class HTTPClientTest(BaseLeapTest):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_agents_sharing_pool_by_default(self):
+ client = http.HTTPClient()
+ client2 = http.HTTPClient(TEST_CERT_PEM)
+ self.assertNotEquals(
+ client._agent, client2._agent, "Expected dedicated agents")
+ self.assertEquals(
+ client._agent._pool, client2._agent._pool,
+ "Pool was not reused by default")
+
+ def test_agent_can_have_dedicated_custom_pool(self):
+ custom_pool = http._HTTPConnectionPool(
+ None,
+ timeout=10,
+ maxPersistentPerHost=42,
+ persistent=False
+ )
+ self.assertEquals(custom_pool.maxPersistentPerHost, 42,
+ "Custom persistent connections "
+ "limit is not being respected")
+ self.assertFalse(custom_pool.persistent,
+ "Custom persistence is not being respected")
+ default_client = http.HTTPClient()
+ custom_client = http.HTTPClient(pool=custom_pool)
+
+ self.assertNotEquals(
+ default_client._agent, custom_client._agent,
+ "No agent reuse is expected")
+ self.assertEquals(
+ custom_pool, custom_client._agent._pool,
+ "Custom pool usage was not respected")
+
+if __name__ == "__main__":
+ unittest.main()