From 7aa5c3755ecbe8a5bbb5c2579a92bd80c815340f Mon Sep 17 00:00:00 2001 From: drebs Date: Fri, 1 Sep 2017 14:16:21 -0300 Subject: [test] refactor and fixes for blobs vs legacy test - create payloads before running tests - use different group names for different amount/sizes - move legacy index creation to setup function - limit concurrency on blobmanager to 2 (same as thread pool size) - add a VACUUM call to observe blobs db size decreasing between tests - properly cleanup server between blobs test runs - refactor blobs pipeline so it makes more sense and is easier to read --- testing/tests/benchmarks/test_legacy_vs_blobs.py | 131 ++++++++++++++--------- 1 file changed, 82 insertions(+), 49 deletions(-) diff --git a/testing/tests/benchmarks/test_legacy_vs_blobs.py b/testing/tests/benchmarks/test_legacy_vs_blobs.py index 89a427f5..5be6f25f 100644 --- a/testing/tests/benchmarks/test_legacy_vs_blobs.py +++ b/testing/tests/benchmarks/test_legacy_vs_blobs.py @@ -47,6 +47,7 @@ import base64 import pytest +import random import sys import treq import uuid @@ -61,10 +62,17 @@ from leap.soledad.common.blobs import Flags from leap.soledad.client._db.blobs import BlobDoc -PARTS_SIZES = { - 'headers': 4000, - 'metadata': 100, - 'flags': 500, +def payload(size): + random.seed(1337) # same seed to avoid different bench results + payload_bytes = bytearray(random.getrandbits(8) for _ in xrange(size)) + # encode as base64 to avoid ascii encode/decode errors + return base64.b64encode(payload_bytes)[:size] # remove b64 overhead + + +PARTS = { + 'headers': payload(4000), + 'metadata': payload(100), + 'flags': payload(500), } @@ -92,12 +100,13 @@ def load_up_legacy(client, amount, content): yield client.sync() +@pytest.inlineCallbacks def process_incoming_docs(client, docs): deferreds = [] for doc in docs: # create fake documents that represent message - for name in PARTS_SIZES.keys(): + for name in PARTS.keys(): d = client.create_doc({name: doc.content[name]}) deferreds.append(d) @@ -114,25 +123,29 @@ def process_incoming_docs(client, docs): yield gatherResults(deferreds) -def create_legacy_test(downloads, size): +def create_legacy_test(amount, size): + group = 'test_legacy_vs_blobs_%d_%dk' % (amount, (size / 1000)) + @pytest.inlineCallbacks @pytest.mark.skip(reason="avoid running for all commits") - @pytest.mark.benchmark(group="test_legacy_vs_blobs") - def test(soledad_client, txbenchmark_with_setup, payload): + @pytest.mark.benchmark(group=group) + def test(soledad_client, txbenchmark_with_setup): client = soledad_client() + # setup the content of initial documents representing incoming emails content = {'content': payload(size), 'incoming': True} - for n, s in PARTS_SIZES.items(): - content[n] = payload(s) + for name, data in PARTS.items(): + content[name] = data @pytest.inlineCallbacks def setup(): - yield load_up_legacy(client, downloads, content) - returnValue(soledad_client(force_fresh_db=True)) + yield load_up_legacy(client, amount, content) + clean_client = soledad_client(force_fresh_db=True) + yield clean_client.create_index('incoming', 'bool(incoming)') + returnValue(clean_client) @pytest.inlineCallbacks def legacy_pipeline(client): - yield client.create_index('incoming', 'bool(incoming)') yield client.sync() docs = yield client.get_from_index('incoming', '1') yield process_incoming_docs(client, docs) @@ -153,8 +166,8 @@ test_legacy_1000_10k = create_legacy_test(1000, 10 * 1000) # "Incoming blobs" mail pipeline: # -# limit the amount of concurrent accesses to some resource -semaphore = DeferredSemaphore(5) +# used to limit the amount of concurrent accesses to the blob manager +semaphore = DeferredSemaphore(2) # deliver data to a user by using the incoming api at given url. @@ -164,73 +177,93 @@ def deliver(url, user_uuid, token, data): return treq.put(uri, headers={'Authorization': auth}, data=BytesIO(data)) -# create a bunch of local documents representing email messages +def reclaim_free_space(client): + return client.blobmanager.local.dbpool.runQuery("VACUUM") + + +@pytest.inlineCallbacks def load_up_blobs(client, amount, data): + # make sure there are no document from previous runs + yield client.sync() + _, docs = yield client.get_all_docs() deferreds = [] + for doc in docs: + d = client.delete_doc(doc) + deferreds.append(d) + yield gatherResults(deferreds) + yield client.sync() + + # delete all payload from blobs db and server + ids = yield client.blobmanager.local_list(namespace='payload') + for blob_id in ids: + yield client.blobmanager.delete(blob_id, namespace='payload') + yield reclaim_free_space(client) + + # create a bunch of incoming blobs + deferreds = [] + semaphore = DeferredSemaphore(100) for i in xrange(amount): d = semaphore.run( deliver, client.server_url, client.uuid, client.token, data) deferreds.append(d) - return gatherResults(deferreds) + yield gatherResults(deferreds) @pytest.inlineCallbacks -def download_incoming_blobs(client): - # get list of pending incoming blobs - pending = yield client.blobmanager.remote_list( - namespace='MX', filter_flags=Flags.PENDING) - - # download incoming blobs +def process_incoming_blobs(client, pending): + # process items deferreds = [] for item in pending: - d = semaphore.run(client.blobmanager.get, item, namespace='MX') + d = process_one_incoming_blob(client, item) deferreds.append(d) - incoming = yield gatherResults(deferreds) - returnValue((pending, incoming)) + yield gatherResults(deferreds) @pytest.inlineCallbacks -def create_local_data(client, incoming, payload): +def process_one_incoming_blob(client, item): + fd = yield semaphore.run( + client.blobmanager.get, item, namespace='MX') + + # create metadata docs deferreds = [] - for item in incoming: - for name, size in PARTS_SIZES.items(): - d = client.create_doc({name: payload(size)}) - deferreds.append(d) - doc = BlobDoc(item, blob_id=uuid.uuid4().hex) - size = sys.getsizeof(item) - d = semaphore.run( - client.blobmanager.put, doc, size, namespace='payload') + for name, data in PARTS.items(): + d = client.create_doc({name: data}) deferreds.append(d) + + # put the incoming blob as it would be done after mail processing + doc = BlobDoc(fd, blob_id=uuid.uuid4().hex) + size = sys.getsizeof(fd) + d = semaphore.run( + client.blobmanager.put, doc, size, namespace='payload') + deferreds.append(d) yield gatherResults(deferreds) + # delete incoming blob + yield semaphore.run( + client.blobmanager.delete, item, namespace='MX') -@pytest.inlineCallbacks -def delete_pending_from_server(client, pending): - deferreds = [] - for item in pending: - d = semaphore.run(client.blobmanager.delete, item, namespace='MX') - deferreds.append(d) - yield gatherResults(deferreds) +def create_blobs_test(amount, size): + group = 'test_legacy_vs_blobs_%d_%dk' % (amount, (size / 1000)) -def create_blobs_test(downloads, size): @pytest.inlineCallbacks @pytest.mark.skip(reason="avoid running for all commits") - @pytest.mark.benchmark(group="test_legacy_vs_blobs") - def test(soledad_client, txbenchmark_with_setup, payload): + @pytest.mark.benchmark(group=group) + def test(soledad_client, txbenchmark_with_setup): client = soledad_client() blob_payload = payload(size) @pytest.inlineCallbacks def setup(): - yield load_up_blobs(client, downloads, blob_payload) + yield load_up_blobs(client, amount, blob_payload) returnValue(soledad_client(force_fresh_db=True)) @pytest.inlineCallbacks def blobs_pipeline(client): - pending, incoming = yield download_incoming_blobs(client) - yield create_local_data(client, incoming, payload) - yield delete_pending_from_server(client, pending) + pending = yield client.blobmanager.remote_list( + namespace='MX', filter_flags=Flags.PENDING) + yield process_incoming_blobs(client, pending) + # reclaim_free_space(client) yield client.sync() yield client.blobmanager.send_missing(namespace='payload') -- cgit v1.2.3