From d134042f5393fcd45e7f6cfdc27adbff5071be2c Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 30 Aug 2017 16:35:05 -0300 Subject: [test] use semaphore in legacy vs blobs test --- testing/tests/benchmarks/test_legacy_vs_blobs.py | 200 +++++++++++------------ 1 file changed, 93 insertions(+), 107 deletions(-) (limited to 'testing') diff --git a/testing/tests/benchmarks/test_legacy_vs_blobs.py b/testing/tests/benchmarks/test_legacy_vs_blobs.py index 646f7d3f..89a427f5 100644 --- a/testing/tests/benchmarks/test_legacy_vs_blobs.py +++ b/testing/tests/benchmarks/test_legacy_vs_blobs.py @@ -53,7 +53,9 @@ import uuid from io import BytesIO -from twisted.internet.defer import gatherResults, returnValue +from twisted.internet.defer import gatherResults +from twisted.internet.defer import returnValue +from twisted.internet.defer import DeferredSemaphore from leap.soledad.common.blobs import Flags from leap.soledad.client._db.blobs import BlobDoc @@ -90,7 +92,29 @@ def load_up_legacy(client, amount, content): yield client.sync() -def create_legacy(downloads, size): +def process_incoming_docs(client, docs): + deferreds = [] + for doc in docs: + + # create fake documents that represent message + for name in PARTS_SIZES.keys(): + d = client.create_doc({name: doc.content[name]}) + deferreds.append(d) + + # create one document with content + key = 'content' + d = client.create_doc({key: doc.content[key]}) + deferreds.append(d) + + # delete the old incoming document + d = client.delete_doc(doc) + deferreds.append(d) + + # wait for all operatios to succeed + yield gatherResults(deferreds) + + +def create_legacy_test(downloads, size): @pytest.inlineCallbacks @pytest.mark.skip(reason="avoid running for all commits") @pytest.mark.benchmark(group="test_legacy_vs_blobs") @@ -107,40 +131,12 @@ def create_legacy(downloads, size): returnValue(soledad_client(force_fresh_db=True)) @pytest.inlineCallbacks - def legacy_pipeline(clean_client): - - # create indexes so we can later retrieve incoming docs - yield clean_client.create_index('incoming', 'bool(incoming)') - - # receive all documents from server - yield clean_client.sync() - - # get incoming documents - docs = yield clean_client.get_from_index('incoming', '1') - - # process incoming documents - deferreds = [] - for doc in docs: - - # create fake documents that represent message - for name in PARTS_SIZES.keys(): - d = clean_client.create_doc({name: doc.content[name]}) - deferreds.append(d) - - # create one document with content - key = 'content' - d = clean_client.create_doc({key: doc.content[key]}) - deferreds.append(d) - - # delete the old incoming document - d = clean_client.delete_doc(doc) - deferreds.append(d) - - # wait for all operatios to succeed - yield gatherResults(deferreds) - - # sync new documents back to server - yield clean_client.sync() + def legacy_pipeline(client): + yield client.create_index('incoming', 'bool(incoming)') + yield client.sync() + docs = yield client.get_from_index('incoming', '1') + yield process_incoming_docs(client, docs) + yield client.sync() yield txbenchmark_with_setup(setup, legacy_pipeline) return test @@ -148,41 +144,76 @@ def create_legacy(downloads, size): # ATTENTION: update the documentation in ../docs/benchmarks.rst if you change # the number of docs or the doc sizes for the tests below. -test_legacy_10_1000k = create_legacy(10, 1000 * 1000) -test_legacy_100_100k = create_legacy(100, 100 * 1000) -test_legacy_1000_10k = create_legacy(1000, 10 * 1000) +test_legacy_10_1000k = create_legacy_test(10, 1000 * 1000) +test_legacy_100_100k = create_legacy_test(100, 100 * 1000) +test_legacy_1000_10k = create_legacy_test(1000, 10 * 1000) # # "Incoming blobs" mail pipeline: # -@pytest.inlineCallbacks -def deliver(url, user_uuid, token, payload): +# limit the amount of concurrent accesses to some resource +semaphore = DeferredSemaphore(5) + + +# deliver data to a user by using the incoming api at given url. +def deliver(url, user_uuid, token, data): auth = 'Token %s' % base64.b64encode('%s:%s' % (user_uuid, token)) uri = "%s/incoming/%s/%s?namespace=MX" % (url, user_uuid, uuid.uuid4().hex) - yield treq.put(uri, headers={'Authorization': auth}, - data=BytesIO(payload)) + return treq.put(uri, headers={'Authorization': auth}, data=BytesIO(data)) -def should_brake(i): - return ((i + 1) % 5) == 0 +# create a bunch of local documents representing email messages +def load_up_blobs(client, amount, data): + deferreds = [] + for i in xrange(amount): + d = semaphore.run( + deliver, client.server_url, client.uuid, client.token, data) + deferreds.append(d) + return gatherResults(deferreds) @pytest.inlineCallbacks -def load_up_blobs(client, amount, payload): - # create a bunch of local documents representing email messages +def download_incoming_blobs(client): + # get list of pending incoming blobs + pending = yield client.blobmanager.remote_list( + namespace='MX', filter_flags=Flags.PENDING) + + # download incoming blobs deferreds = [] - for i in xrange(amount): - d = deliver(client.server_url, client.uuid, client.token, payload) + for item in pending: + d = semaphore.run(client.blobmanager.get, item, namespace='MX') + deferreds.append(d) + incoming = yield gatherResults(deferreds) + returnValue((pending, incoming)) + + +@pytest.inlineCallbacks +def create_local_data(client, incoming, payload): + deferreds = [] + for item in incoming: + for name, size in PARTS_SIZES.items(): + d = client.create_doc({name: payload(size)}) + deferreds.append(d) + doc = BlobDoc(item, blob_id=uuid.uuid4().hex) + size = sys.getsizeof(item) + d = semaphore.run( + client.blobmanager.put, doc, size, namespace='payload') deferreds.append(d) - if should_brake(i): - yield gatherResults(deferreds) - deferreds = [] yield gatherResults(deferreds) -def create_blobs(downloads, size): +@pytest.inlineCallbacks +def delete_pending_from_server(client, pending): + deferreds = [] + for item in pending: + d = semaphore.run(client.blobmanager.delete, item, namespace='MX') + deferreds.append(d) + yield gatherResults(deferreds) + + +def create_blobs_test(downloads, size): @pytest.inlineCallbacks @pytest.mark.skip(reason="avoid running for all commits") @pytest.mark.benchmark(group="test_legacy_vs_blobs") @@ -196,57 +227,12 @@ def create_blobs(downloads, size): returnValue(soledad_client(force_fresh_db=True)) @pytest.inlineCallbacks - def blobs_pipeline(clean_client): - - # get list of pending incoming blobs - pending = yield clean_client.blobmanager.remote_list( - namespace='MX', filter_flags=Flags.PENDING) - - # download incoming blobs - deferreds = [] - incoming = [] - i = 0 - for item in pending: - d = clean_client.blobmanager.get(item, namespace='MX') - deferreds.append(d) - if should_brake(i): - incoming += yield gatherResults(deferreds) - deferreds = [] - i += 1 - incoming += yield gatherResults(deferreds) - - # create data on local client - deferreds = [] - i = 0 - for item in incoming: - for name, size in PARTS_SIZES.items(): - d = clean_client.create_doc({name: payload(size)}) - deferreds.append(d) - doc = BlobDoc(item, blob_id=uuid.uuid4().hex) - size = sys.getsizeof(item) - d = clean_client.blobmanager.put( - doc, size, namespace='payload') - deferreds.append(d) - if should_brake(i): - gatherResults(deferreds) - deferreds = [] - i += 1 - yield gatherResults(deferreds) - - # delete incoming from server - deferreds = [] - for item in pending: - d = clean_client.blobmanager.delete(item, namespace='MX') - deferreds.append(d) - yield gatherResults(deferreds) - - # sync and send blobs in parallel - deferreds = [] - d = clean_client.sync() - deferreds.append(d) - d = clean_client.blobmanager.send_missing(namespace='payload') - deferreds.append(d) - yield gatherResults(deferreds) + def blobs_pipeline(client): + pending, incoming = yield download_incoming_blobs(client) + yield create_local_data(client, incoming, payload) + yield delete_pending_from_server(client, pending) + yield client.sync() + yield client.blobmanager.send_missing(namespace='payload') yield txbenchmark_with_setup(setup, blobs_pipeline) return test @@ -254,6 +240,6 @@ def create_blobs(downloads, size): # ATTENTION: update the documentation in ../docs/benchmarks.rst if you change # the number of docs or the doc sizes for the tests below. -test_blobs_10_1000k = create_blobs(10, 1000 * 1000) -test_blobs_100_100k = create_blobs(100, 100 * 1000) -test_blobs_1000_10k = create_blobs(1000, 10 * 1000) +test_blobs_10_1000k = create_blobs_test(10, 1000 * 1000) +test_blobs_100_100k = create_blobs_test(100, 100 * 1000) +test_blobs_1000_10k = create_blobs_test(1000, 10 * 1000) -- cgit v1.2.3