author    drebs <drebs@riseup.net>    2017-09-01 14:16:21 -0300
committer drebs <drebs@riseup.net>    2017-09-05 11:17:51 -0300
commit    7aa5c3755ecbe8a5bbb5c2579a92bd80c815340f (patch)
tree      662ac35e225063c76a721380326ec9db1d50917b
parent    4d9fbf01b7fc9c0da8c2e267945d10af3837f161 (diff)
[test] refactor and fixes for blobs vs legacy test
- create payloads before running tests
- use different group names for different amounts/sizes (see the grouping sketch below)
- move legacy index creation to the setup function
- limit concurrency on the blob manager to 2 (same as the thread pool size)
- add a VACUUM call to observe the blobs db size decreasing between tests
- properly clean up the server between blobs test runs
- refactor the blobs pipeline so it makes more sense and is easier to read
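A minimal sketch of the grouping pattern, assuming pytest-benchmark's group marker option; the factory name and workload here are illustrative, not the module's real code:

import pytest

def create_grouped_test(amount, size):
    # pytest-benchmark compares results only within a group, so encoding
    # the (amount, size) pair in the group name keeps runs with different
    # payload shapes out of each other's statistics
    group = 'test_legacy_vs_blobs_%d_%dk' % (amount, size / 1000)

    @pytest.mark.benchmark(group=group)
    def test(benchmark):
        benchmark(sum, range(amount))  # illustrative workload

    return test

test_10_1000k = create_grouped_test(10, 1000 * 1000)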
-rw-r--r--  testing/tests/benchmarks/test_legacy_vs_blobs.py  131
1 file changed, 82 insertions(+), 49 deletions(-)
diff --git a/testing/tests/benchmarks/test_legacy_vs_blobs.py b/testing/tests/benchmarks/test_legacy_vs_blobs.py
index 89a427f5..5be6f25f 100644
--- a/testing/tests/benchmarks/test_legacy_vs_blobs.py
+++ b/testing/tests/benchmarks/test_legacy_vs_blobs.py
@@ -47,6 +47,7 @@
import base64
import pytest
+import random
import sys
import treq
import uuid
@@ -61,10 +62,17 @@ from leap.soledad.common.blobs import Flags
from leap.soledad.client._db.blobs import BlobDoc
-PARTS_SIZES = {
- 'headers': 4000,
- 'metadata': 100,
- 'flags': 500,
+def payload(size):
+ random.seed(1337) # same seed to avoid different bench results
+ payload_bytes = bytearray(random.getrandbits(8) for _ in xrange(size))
+ # encode as base64 to avoid ascii encode/decode errors
+ return base64.b64encode(payload_bytes)[:size] # remove b64 overhead
+
+
+PARTS = {
+ 'headers': payload(4000),
+ 'metadata': payload(100),
+ 'flags': payload(500),
}
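Because payload() reseeds the generator on every call, the same size always yields the same bytes, and slicing the base64 output restores the exact requested length. A standalone check of both properties (Python 2, matching the xrange above):

import base64
import random

def payload(size):
    random.seed(1337)  # same seed, same bytes, comparable benchmark runs
    payload_bytes = bytearray(random.getrandbits(8) for _ in xrange(size))
    return base64.b64encode(payload_bytes)[:size]  # trim the ~4/3 b64 expansion

assert payload(4000) == payload(4000)  # deterministic across calls
assert len(payload(4000)) == 4000      # exact requested size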
@@ -92,12 +100,13 @@ def load_up_legacy(client, amount, content):
yield client.sync()
+@pytest.inlineCallbacks
def process_incoming_docs(client, docs):
deferreds = []
for doc in docs:
# create fake documents that represent a message
- for name in PARTS_SIZES.keys():
+ for name in PARTS.keys():
d = client.create_doc({name: doc.content[name]})
deferreds.append(d)
@@ -114,25 +123,29 @@ def process_incoming_docs(client, docs):
yield gatherResults(deferreds)
-def create_legacy_test(downloads, size):
+def create_legacy_test(amount, size):
+ group = 'test_legacy_vs_blobs_%d_%dk' % (amount, (size / 1000))
+
@pytest.inlineCallbacks
@pytest.mark.skip(reason="avoid running for all commits")
- @pytest.mark.benchmark(group="test_legacy_vs_blobs")
- def test(soledad_client, txbenchmark_with_setup, payload):
+ @pytest.mark.benchmark(group=group)
+ def test(soledad_client, txbenchmark_with_setup):
client = soledad_client()
+ # set up the content of initial documents representing incoming emails
content = {'content': payload(size), 'incoming': True}
- for n, s in PARTS_SIZES.items():
- content[n] = payload(s)
+ for name, data in PARTS.items():
+ content[name] = data
@pytest.inlineCallbacks
def setup():
- yield load_up_legacy(client, downloads, content)
- returnValue(soledad_client(force_fresh_db=True))
+ yield load_up_legacy(client, amount, content)
+ clean_client = soledad_client(force_fresh_db=True)
+ yield clean_client.create_index('incoming', 'bool(incoming)')
+ returnValue(clean_client)
@pytest.inlineCallbacks
def legacy_pipeline(client):
- yield client.create_index('incoming', 'bool(incoming)')
yield client.sync()
docs = yield client.get_from_index('incoming', '1')
yield process_incoming_docs(client, docs)
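Moving create_index out of legacy_pipeline matters because only the pipeline is timed: setup() runs untimed before each benchmark round. A trimmed sketch of the split, assuming the fixture is invoked as yield txbenchmark_with_setup(setup, legacy_pipeline) in the elided tail of the test:

import pytest
from twisted.internet.defer import returnValue

@pytest.inlineCallbacks
def test(soledad_client, txbenchmark_with_setup):
    @pytest.inlineCallbacks
    def setup():
        # untimed: index creation no longer costs the measured pipeline
        clean_client = soledad_client(force_fresh_db=True)
        yield clean_client.create_index('incoming', 'bool(incoming)')
        returnValue(clean_client)

    @pytest.inlineCallbacks
    def legacy_pipeline(client):
        yield client.sync()  # timed work starts here
        yield client.get_from_index('incoming', '1')

    yield txbenchmark_with_setup(setup, legacy_pipeline)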
@@ -153,8 +166,8 @@ test_legacy_1000_10k = create_legacy_test(1000, 10 * 1000)
# "Incoming blobs" mail pipeline:
#
-# limit the amount of concurrent accesses to some resource
-semaphore = DeferredSemaphore(5)
+# used to limit the number of concurrent accesses to the blob manager
+semaphore = DeferredSemaphore(2)
# deliver data to a user by using the incoming api at a given url.
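DeferredSemaphore.run acquires a token, calls the function, and releases the token when the returned Deferred fires, so at most two blob manager operations are in flight at any moment, matching the thread pool size mentioned in the commit message. A standalone illustration:

from twisted.internet import defer, task

def main(reactor):
    semaphore = defer.DeferredSemaphore(2)

    def fake_blob_op(i):
        # stand-in for a blobmanager call; no more than two of these
        # are ever pending at the same time
        return task.deferLater(reactor, 0.1, lambda: i)

    work = [semaphore.run(fake_blob_op, i) for i in range(10)]
    return defer.gatherResults(work)

task.react(main)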
@@ -164,73 +177,93 @@ def deliver(url, user_uuid, token, data):
return treq.put(uri, headers={'Authorization': auth}, data=BytesIO(data))
-# create a bunch of local documents representing email messages
+def reclaim_free_space(client):
+ return client.blobmanager.local.dbpool.runQuery("VACUUM")
+
+
+@pytest.inlineCallbacks
def load_up_blobs(client, amount, data):
+ # make sure there are no documents from previous runs
+ yield client.sync()
+ _, docs = yield client.get_all_docs()
deferreds = []
+ for doc in docs:
+ d = client.delete_doc(doc)
+ deferreds.append(d)
+ yield gatherResults(deferreds)
+ yield client.sync()
+
+ # delete all payloads from the blobs db and the server
+ ids = yield client.blobmanager.local_list(namespace='payload')
+ for blob_id in ids:
+ yield client.blobmanager.delete(blob_id, namespace='payload')
+ yield reclaim_free_space(client)
+
+ # create a bunch of incoming blobs
+ deferreds = []
+ semaphore = DeferredSemaphore(100)
for i in xrange(amount):
d = semaphore.run(
deliver, client.server_url, client.uuid, client.token, data)
deferreds.append(d)
- return gatherResults(deferreds)
+ yield gatherResults(deferreds)
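The VACUUM issued by reclaim_free_space is plain SQLite behavior: deleting the previous run's blobs only marks pages as free, and the database file shrinks only once VACUUM rewrites it, which is what lets the benchmark observe the blobs db size dropping between tests. A standalone sqlite3 illustration (the file name is arbitrary):

import os
import sqlite3

conn = sqlite3.connect('blobs_demo.db')
conn.execute('CREATE TABLE IF NOT EXISTS blobs (payload BLOB)')
conn.executemany('INSERT INTO blobs VALUES (?)',
                 [(b'x' * 10000,) for _ in range(100)])
conn.commit()
print('after insert: %d' % os.path.getsize('blobs_demo.db'))

conn.execute('DELETE FROM blobs')
conn.commit()
print('after delete: %d' % os.path.getsize('blobs_demo.db'))  # unchanged

conn.execute('VACUUM')  # rewrite the file, returning free pages to the OS
conn.close()
print('after vacuum: %d' % os.path.getsize('blobs_demo.db'))  # smaller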
@pytest.inlineCallbacks
-def download_incoming_blobs(client):
- # get list of pending incoming blobs
- pending = yield client.blobmanager.remote_list(
- namespace='MX', filter_flags=Flags.PENDING)
-
- # download incoming blobs
+def process_incoming_blobs(client, pending):
+ # process each pending incoming item
deferreds = []
for item in pending:
- d = semaphore.run(client.blobmanager.get, item, namespace='MX')
+ d = process_one_incoming_blob(client, item)
deferreds.append(d)
- incoming = yield gatherResults(deferreds)
- returnValue((pending, incoming))
+ yield gatherResults(deferreds)
@pytest.inlineCallbacks
-def create_local_data(client, incoming, payload):
+def process_one_incoming_blob(client, item):
+ fd = yield semaphore.run(
+ client.blobmanager.get, item, namespace='MX')
+
+ # create metadata docs
deferreds = []
- for item in incoming:
- for name, size in PARTS_SIZES.items():
- d = client.create_doc({name: payload(size)})
- deferreds.append(d)
- doc = BlobDoc(item, blob_id=uuid.uuid4().hex)
- size = sys.getsizeof(item)
- d = semaphore.run(
- client.blobmanager.put, doc, size, namespace='payload')
+ for name, data in PARTS.items():
+ d = client.create_doc({name: data})
deferreds.append(d)
+
+ # put the incoming blob as it would be done after mail processing
+ doc = BlobDoc(fd, blob_id=uuid.uuid4().hex)
+ size = sys.getsizeof(fd)
+ d = semaphore.run(
+ client.blobmanager.put, doc, size, namespace='payload')
+ deferreds.append(d)
yield gatherResults(deferreds)
+ # delete incoming blob
+ yield semaphore.run(
+ client.blobmanager.delete, item, namespace='MX')
-@pytest.inlineCallbacks
-def delete_pending_from_server(client, pending):
- deferreds = []
- for item in pending:
- d = semaphore.run(client.blobmanager.delete, item, namespace='MX')
- deferreds.append(d)
- yield gatherResults(deferreds)
+def create_blobs_test(amount, size):
+ group = 'test_legacy_vs_blobs_%d_%dk' % (amount, (size / 1000))
-def create_blobs_test(downloads, size):
@pytest.inlineCallbacks
@pytest.mark.skip(reason="avoid running for all commits")
- @pytest.mark.benchmark(group="test_legacy_vs_blobs")
- def test(soledad_client, txbenchmark_with_setup, payload):
+ @pytest.mark.benchmark(group=group)
+ def test(soledad_client, txbenchmark_with_setup):
client = soledad_client()
blob_payload = payload(size)
@pytest.inlineCallbacks
def setup():
- yield load_up_blobs(client, downloads, blob_payload)
+ yield load_up_blobs(client, amount, blob_payload)
returnValue(soledad_client(force_fresh_db=True))
@pytest.inlineCallbacks
def blobs_pipeline(client):
- pending, incoming = yield download_incoming_blobs(client)
- yield create_local_data(client, incoming, payload)
- yield delete_pending_from_server(client, pending)
+ pending = yield client.blobmanager.remote_list(
+ namespace='MX', filter_flags=Flags.PENDING)
+ yield process_incoming_blobs(client, pending)
+ # reclaim_free_space(client)
yield client.sync()
yield client.blobmanager.send_missing(namespace='payload')
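As on the legacy side above (test_legacy_1000_10k = create_legacy_test(1000, 10 * 1000)), each factory's test must be bound to a module-level name for pytest to collect it. The blobs-side binding below is assumed by symmetry; that part of the file is outside this diff:

# module-level bindings make the generated tests visible to pytest collection
test_legacy_1000_10k = create_legacy_test(1000, 10 * 1000)  # shown in the diff
test_blobs_1000_10k = create_blobs_test(1000, 10 * 1000)    # assumed counterpart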