author     drebs <drebs@riseup.net>    2017-08-30 16:35:05 -0300
committer  drebs <drebs@riseup.net>    2017-08-31 10:48:17 -0300
commit     d134042f5393fcd45e7f6cfdc27adbff5071be2c (patch)
tree       515d720ee3f1b0c7703af7775fc0535e06bfd241
parent     dff0dd3331b979e013e63d52d8e6ab285df3094b (diff)
[test] use semaphore in legacy vs blobs test
-rw-r--r--  testing/tests/benchmarks/test_legacy_vs_blobs.py  200
1 file changed, 93 insertions(+), 107 deletions(-)
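
The patch below replaces the ad-hoc batching helper should_brake() with a shared twisted.internet.defer.DeferredSemaphore(5), so at most five operations are in flight at any time. For quick reference, here is a minimal, self-contained sketch of that pattern; it is not part of the patch, and fetch_item, the 0.1 s delay and the 100-item loop are illustrative assumptions only:

from twisted.internet import reactor, task
from twisted.internet.defer import DeferredSemaphore, gatherResults

# allow at most 5 concurrent operations; extra calls queue until a slot frees up
semaphore = DeferredSemaphore(5)

def fetch_item(i):
    # stand-in for an expensive asynchronous call (e.g. an HTTP request)
    return task.deferLater(reactor, 0.1, lambda: i)

def fetch_all(n):
    # semaphore.run() acquires a token, calls the function, and releases the
    # token when the returned Deferred fires, so only 5 calls run at a time
    deferreds = [semaphore.run(fetch_item, i) for i in range(n)]
    return gatherResults(deferreds)

reactor.callWhenRunning(lambda: fetch_all(100).addCallback(lambda _: reactor.stop()))
reactor.run()
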
diff --git a/testing/tests/benchmarks/test_legacy_vs_blobs.py b/testing/tests/benchmarks/test_legacy_vs_blobs.py
index 646f7d3f..89a427f5 100644
--- a/testing/tests/benchmarks/test_legacy_vs_blobs.py
+++ b/testing/tests/benchmarks/test_legacy_vs_blobs.py
@@ -53,7 +53,9 @@ import uuid
from io import BytesIO
-from twisted.internet.defer import gatherResults, returnValue
+from twisted.internet.defer import gatherResults
+from twisted.internet.defer import returnValue
+from twisted.internet.defer import DeferredSemaphore
from leap.soledad.common.blobs import Flags
from leap.soledad.client._db.blobs import BlobDoc
@@ -90,7 +92,29 @@ def load_up_legacy(client, amount, content):
yield client.sync()
-def create_legacy(downloads, size):
+def process_incoming_docs(client, docs):
+ deferreds = []
+ for doc in docs:
+
+        # create fake documents that represent message parts
+ for name in PARTS_SIZES.keys():
+ d = client.create_doc({name: doc.content[name]})
+ deferreds.append(d)
+
+ # create one document with content
+ key = 'content'
+ d = client.create_doc({key: doc.content[key]})
+ deferreds.append(d)
+
+ # delete the old incoming document
+ d = client.delete_doc(doc)
+ deferreds.append(d)
+
+    # wait for all operations to succeed
+ yield gatherResults(deferreds)
+
+
+def create_legacy_test(downloads, size):
@pytest.inlineCallbacks
@pytest.mark.skip(reason="avoid running for all commits")
@pytest.mark.benchmark(group="test_legacy_vs_blobs")
@@ -107,40 +131,12 @@ def create_legacy(downloads, size):
returnValue(soledad_client(force_fresh_db=True))
@pytest.inlineCallbacks
- def legacy_pipeline(clean_client):
-
- # create indexes so we can later retrieve incoming docs
- yield clean_client.create_index('incoming', 'bool(incoming)')
-
- # receive all documents from server
- yield clean_client.sync()
-
- # get incoming documents
- docs = yield clean_client.get_from_index('incoming', '1')
-
- # process incoming documents
- deferreds = []
- for doc in docs:
-
- # create fake documents that represent message
- for name in PARTS_SIZES.keys():
- d = clean_client.create_doc({name: doc.content[name]})
- deferreds.append(d)
-
- # create one document with content
- key = 'content'
- d = clean_client.create_doc({key: doc.content[key]})
- deferreds.append(d)
-
- # delete the old incoming document
- d = clean_client.delete_doc(doc)
- deferreds.append(d)
-
- # wait for all operatios to succeed
- yield gatherResults(deferreds)
-
- # sync new documents back to server
- yield clean_client.sync()
+ def legacy_pipeline(client):
+ yield client.create_index('incoming', 'bool(incoming)')
+ yield client.sync()
+ docs = yield client.get_from_index('incoming', '1')
+ yield process_incoming_docs(client, docs)
+ yield client.sync()
yield txbenchmark_with_setup(setup, legacy_pipeline)
return test
@@ -148,41 +144,76 @@ def create_legacy(downloads, size):
# ATTENTION: update the documentation in ../docs/benchmarks.rst if you change
# the number of docs or the doc sizes for the tests below.
-test_legacy_10_1000k = create_legacy(10, 1000 * 1000)
-test_legacy_100_100k = create_legacy(100, 100 * 1000)
-test_legacy_1000_10k = create_legacy(1000, 10 * 1000)
+test_legacy_10_1000k = create_legacy_test(10, 1000 * 1000)
+test_legacy_100_100k = create_legacy_test(100, 100 * 1000)
+test_legacy_1000_10k = create_legacy_test(1000, 10 * 1000)
#
# "Incoming blobs" mail pipeline:
#
-@pytest.inlineCallbacks
-def deliver(url, user_uuid, token, payload):
+# limit the number of concurrent accesses to a shared resource
+semaphore = DeferredSemaphore(5)
+
+
+# deliver data to a user by using the incoming API at the given URL.
+def deliver(url, user_uuid, token, data):
auth = 'Token %s' % base64.b64encode('%s:%s' % (user_uuid, token))
uri = "%s/incoming/%s/%s?namespace=MX" % (url, user_uuid, uuid.uuid4().hex)
- yield treq.put(uri, headers={'Authorization': auth},
- data=BytesIO(payload))
+ return treq.put(uri, headers={'Authorization': auth}, data=BytesIO(data))
-def should_brake(i):
- return ((i + 1) % 5) == 0
+# deliver a bunch of documents representing email messages to the server
+def load_up_blobs(client, amount, data):
+ deferreds = []
+ for i in xrange(amount):
+ d = semaphore.run(
+ deliver, client.server_url, client.uuid, client.token, data)
+ deferreds.append(d)
+ return gatherResults(deferreds)
@pytest.inlineCallbacks
-def load_up_blobs(client, amount, payload):
- # create a bunch of local documents representing email messages
+def download_incoming_blobs(client):
+ # get list of pending incoming blobs
+ pending = yield client.blobmanager.remote_list(
+ namespace='MX', filter_flags=Flags.PENDING)
+
+ # download incoming blobs
deferreds = []
- for i in xrange(amount):
- d = deliver(client.server_url, client.uuid, client.token, payload)
+ for item in pending:
+ d = semaphore.run(client.blobmanager.get, item, namespace='MX')
+ deferreds.append(d)
+ incoming = yield gatherResults(deferreds)
+ returnValue((pending, incoming))
+
+
+@pytest.inlineCallbacks
+def create_local_data(client, incoming, payload):
+ deferreds = []
+ for item in incoming:
+ for name, size in PARTS_SIZES.items():
+ d = client.create_doc({name: payload(size)})
+ deferreds.append(d)
+ doc = BlobDoc(item, blob_id=uuid.uuid4().hex)
+ size = sys.getsizeof(item)
+ d = semaphore.run(
+ client.blobmanager.put, doc, size, namespace='payload')
deferreds.append(d)
- if should_brake(i):
- yield gatherResults(deferreds)
- deferreds = []
yield gatherResults(deferreds)
-def create_blobs(downloads, size):
+@pytest.inlineCallbacks
+def delete_pending_from_server(client, pending):
+ deferreds = []
+ for item in pending:
+ d = semaphore.run(client.blobmanager.delete, item, namespace='MX')
+ deferreds.append(d)
+ yield gatherResults(deferreds)
+
+
+def create_blobs_test(downloads, size):
@pytest.inlineCallbacks
@pytest.mark.skip(reason="avoid running for all commits")
@pytest.mark.benchmark(group="test_legacy_vs_blobs")
@@ -196,57 +227,12 @@ def create_blobs(downloads, size):
returnValue(soledad_client(force_fresh_db=True))
@pytest.inlineCallbacks
- def blobs_pipeline(clean_client):
-
- # get list of pending incoming blobs
- pending = yield clean_client.blobmanager.remote_list(
- namespace='MX', filter_flags=Flags.PENDING)
-
- # download incoming blobs
- deferreds = []
- incoming = []
- i = 0
- for item in pending:
- d = clean_client.blobmanager.get(item, namespace='MX')
- deferreds.append(d)
- if should_brake(i):
- incoming += yield gatherResults(deferreds)
- deferreds = []
- i += 1
- incoming += yield gatherResults(deferreds)
-
- # create data on local client
- deferreds = []
- i = 0
- for item in incoming:
- for name, size in PARTS_SIZES.items():
- d = clean_client.create_doc({name: payload(size)})
- deferreds.append(d)
- doc = BlobDoc(item, blob_id=uuid.uuid4().hex)
- size = sys.getsizeof(item)
- d = clean_client.blobmanager.put(
- doc, size, namespace='payload')
- deferreds.append(d)
- if should_brake(i):
- gatherResults(deferreds)
- deferreds = []
- i += 1
- yield gatherResults(deferreds)
-
- # delete incoming from server
- deferreds = []
- for item in pending:
- d = clean_client.blobmanager.delete(item, namespace='MX')
- deferreds.append(d)
- yield gatherResults(deferreds)
-
- # sync and send blobs in parallel
- deferreds = []
- d = clean_client.sync()
- deferreds.append(d)
- d = clean_client.blobmanager.send_missing(namespace='payload')
- deferreds.append(d)
- yield gatherResults(deferreds)
+ def blobs_pipeline(client):
+ pending, incoming = yield download_incoming_blobs(client)
+ yield create_local_data(client, incoming, payload)
+ yield delete_pending_from_server(client, pending)
+ yield client.sync()
+ yield client.blobmanager.send_missing(namespace='payload')
yield txbenchmark_with_setup(setup, blobs_pipeline)
return test
@@ -254,6 +240,6 @@ def create_blobs(downloads, size):
# ATTENTION: update the documentation in ../docs/benchmarks.rst if you change
# the number of docs or the doc sizes for the tests below.
-test_blobs_10_1000k = create_blobs(10, 1000 * 1000)
-test_blobs_100_100k = create_blobs(100, 100 * 1000)
-test_blobs_1000_10k = create_blobs(1000, 10 * 1000)
+test_blobs_10_1000k = create_blobs_test(10, 1000 * 1000)
+test_blobs_100_100k = create_blobs_test(100, 100 * 1000)
+test_blobs_1000_10k = create_blobs_test(1000, 10 * 1000)
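
As context for the renamed factories above (create_legacy_test, create_blobs_test): each factory builds a benchmark function and returns it, and the module-level assignments give pytest one collectable test per (downloads, size) pair. A minimal sketch of that factory pattern, assuming the pytest-benchmark plugin provides the benchmark fixture; the group name and dummy workload are hypothetical:

import pytest

def create_example_test(downloads, size):
    @pytest.mark.benchmark(group="example")
    def test(benchmark):
        # time a dummy workload whose cost scales with the test parameters
        benchmark(lambda: b"x" * (downloads * size // 1000))
    return test

# pytest collects these module-level names as individual tests
test_example_10_1000k = create_example_test(10, 1000 * 1000)
test_example_100_100k = create_example_test(100, 100 * 1000)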