summaryrefslogtreecommitdiff
path: root/testing/tests/benchmarks/conftest.py
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2016-11-17 22:35:21 -0200
committerdrebs <drebs@leap.se>2016-12-12 09:15:21 -0200
commit378a07113a713a7c25f0fb8510d18ecdae2198bd (patch)
treee1a0d7f7e6c1b252ea412b32d5f7e048e5db4326 /testing/tests/benchmarks/conftest.py
parent87259d4210e3488b00876d7ec83a8cc21e341712 (diff)
[test] rename benchmark tests directory and tag
Diffstat (limited to 'testing/tests/benchmarks/conftest.py')
-rw-r--r--testing/tests/benchmarks/conftest.py57
1 files changed, 57 insertions, 0 deletions
diff --git a/testing/tests/benchmarks/conftest.py b/testing/tests/benchmarks/conftest.py
new file mode 100644
index 00000000..a9cc3464
--- /dev/null
+++ b/testing/tests/benchmarks/conftest.py
@@ -0,0 +1,57 @@
+import pytest
+import random
+import base64
+
+from twisted.internet import threads, reactor
+
+
+# we have to manually setup the events server in order to be able to signal
+# events. This is usually done by the enclosing application using soledad
+# client (i.e. bitmask client).
+from leap.common.events import server
+server.ensure_server()
+
+
+def pytest_addoption(parser):
+ parser.addoption(
+ "--num-docs", type="int", default=100,
+ help="the number of documents to use in performance tests")
+
+
+@pytest.fixture()
+def payload():
+ def generate(size):
+ random.seed(1337) # same seed to avoid different bench results
+ payload_bytes = bytearray(random.getrandbits(8) for _ in xrange(size))
+ # encode as base64 to avoid ascii encode/decode errors
+ return base64.b64encode(payload_bytes)[:size] # remove b64 overhead
+ return generate
+
+
+@pytest.fixture()
+def txbenchmark(benchmark):
+ def blockOnThread(*args, **kwargs):
+ return threads.deferToThread(
+ benchmark, threads.blockingCallFromThread,
+ reactor, *args, **kwargs)
+ return blockOnThread
+
+
+@pytest.fixture()
+def txbenchmark_with_setup(benchmark):
+ def blockOnThreadWithSetup(setup, f):
+ def blocking_runner(*args, **kwargs):
+ return threads.blockingCallFromThread(reactor, f, *args, **kwargs)
+
+ def blocking_setup():
+ args = threads.blockingCallFromThread(reactor, setup)
+ try:
+ return tuple(arg for arg in args), {}
+ except TypeError:
+ return ((args,), {}) if args else None
+
+ def bench():
+ return benchmark.pedantic(blocking_runner, setup=blocking_setup,
+ rounds=4, warmup_rounds=1)
+ return threads.deferToThread(bench)
+ return blockOnThreadWithSetup