Diffstat (limited to 'testing/tests')
-rw-r--r--  testing/tests/benchmarks/test_legacy_vs_blobs.py  200
1 file changed, 93 insertions, 107 deletions
diff --git a/testing/tests/benchmarks/test_legacy_vs_blobs.py b/testing/tests/benchmarks/test_legacy_vs_blobs.py
index 646f7d3f..89a427f5 100644
--- a/testing/tests/benchmarks/test_legacy_vs_blobs.py
+++ b/testing/tests/benchmarks/test_legacy_vs_blobs.py
@@ -53,7 +53,9 @@ import uuid
 
 from io import BytesIO
 
-from twisted.internet.defer import gatherResults, returnValue
+from twisted.internet.defer import gatherResults
+from twisted.internet.defer import returnValue
+from twisted.internet.defer import DeferredSemaphore
 
 from leap.soledad.common.blobs import Flags
 from leap.soledad.client._db.blobs import BlobDoc
@@ -90,7 +92,29 @@ def load_up_legacy(client, amount, content):
     yield client.sync()
 
 
-def create_legacy(downloads, size):
+def process_incoming_docs(client, docs):
+    deferreds = []
+    for doc in docs:
+
+        # create fake documents that represent a message
+        for name in PARTS_SIZES.keys():
+            d = client.create_doc({name: doc.content[name]})
+            deferreds.append(d)
+
+        # create one document with content
+        key = 'content'
+        d = client.create_doc({key: doc.content[key]})
+        deferreds.append(d)
+
+        # delete the old incoming document
+        d = client.delete_doc(doc)
+        deferreds.append(d)
+
+    # wait for all operations to succeed
+    yield gatherResults(deferreds)
+
+
+def create_legacy_test(downloads, size):
     @pytest.inlineCallbacks
     @pytest.mark.skip(reason="avoid running for all commits")
     @pytest.mark.benchmark(group="test_legacy_vs_blobs")
@@ -107,40 +131,12 @@ def create_legacy(downloads, size):
             returnValue(soledad_client(force_fresh_db=True))
 
         @pytest.inlineCallbacks
-        def legacy_pipeline(clean_client):
-
-            # create indexes so we can later retrieve incoming docs
-            yield clean_client.create_index('incoming', 'bool(incoming)')
-
-            # receive all documents from server
-            yield clean_client.sync()
-
-            # get incoming documents
-            docs = yield clean_client.get_from_index('incoming', '1')
-
-            # process incoming documents
-            deferreds = []
-            for doc in docs:
-
-                # create fake documents that represent message
-                for name in PARTS_SIZES.keys():
-                    d = clean_client.create_doc({name: doc.content[name]})
-                    deferreds.append(d)
-
-                # create one document with content
-                key = 'content'
-                d = clean_client.create_doc({key: doc.content[key]})
-                deferreds.append(d)
-
-                # delete the old incoming document
-                d = clean_client.delete_doc(doc)
-                deferreds.append(d)
-
-            # wait for all operatios to succeed
-            yield gatherResults(deferreds)
-
-            # sync new documents back to server
-            yield clean_client.sync()
+        def legacy_pipeline(client):
+            yield client.create_index('incoming', 'bool(incoming)')
+            yield client.sync()
+            docs = yield client.get_from_index('incoming', '1')
+            yield process_incoming_docs(client, docs)
+            yield client.sync()
 
         yield txbenchmark_with_setup(setup, legacy_pipeline)
     return test
@@ -148,41 +144,76 @@ def create_legacy(downloads, size):
 # ATTENTION: update the documentation in ../docs/benchmarks.rst if you change
 # the number of docs or the doc sizes for the tests below.
-test_legacy_10_1000k = create_legacy(10, 1000 * 1000)
-test_legacy_100_100k = create_legacy(100, 100 * 1000)
-test_legacy_1000_10k = create_legacy(1000, 10 * 1000)
+test_legacy_10_1000k = create_legacy_test(10, 1000 * 1000)
+test_legacy_100_100k = create_legacy_test(100, 100 * 1000)
+test_legacy_1000_10k = create_legacy_test(1000, 10 * 1000)
 
 
 #
 # "Incoming blobs" mail pipeline:
 #
 
-@pytest.inlineCallbacks
-def deliver(url, user_uuid, token, payload):
+# limit the amount of concurrent accesses to some resource
+semaphore = DeferredSemaphore(5)
+
+
+# deliver data to a user by using the incoming api at the given url.
+def deliver(url, user_uuid, token, data):
     auth = 'Token %s' % base64.b64encode('%s:%s' % (user_uuid, token))
     uri = "%s/incoming/%s/%s?namespace=MX" % (url, user_uuid, uuid.uuid4().hex)
-    yield treq.put(uri, headers={'Authorization': auth},
-                   data=BytesIO(payload))
+    return treq.put(uri, headers={'Authorization': auth}, data=BytesIO(data))
 
 
-def should_brake(i):
-    return ((i + 1) % 5) == 0
+# create a bunch of local documents representing email messages
+def load_up_blobs(client, amount, data):
+    deferreds = []
+    for i in xrange(amount):
+        d = semaphore.run(
+            deliver, client.server_url, client.uuid, client.token, data)
+        deferreds.append(d)
+    return gatherResults(deferreds)
 
 
 @pytest.inlineCallbacks
-def load_up_blobs(client, amount, payload):
-    # create a bunch of local documents representing email messages
+def download_incoming_blobs(client):
+    # get list of pending incoming blobs
+    pending = yield client.blobmanager.remote_list(
+        namespace='MX', filter_flags=Flags.PENDING)
+
+    # download incoming blobs
     deferreds = []
-    for i in xrange(amount):
-        d = deliver(client.server_url, client.uuid, client.token, payload)
+    for item in pending:
+        d = semaphore.run(client.blobmanager.get, item, namespace='MX')
+        deferreds.append(d)
+    incoming = yield gatherResults(deferreds)
+    returnValue((pending, incoming))
+
+
+@pytest.inlineCallbacks
+def create_local_data(client, incoming, payload):
+    deferreds = []
+    for item in incoming:
+        for name, size in PARTS_SIZES.items():
+            d = client.create_doc({name: payload(size)})
+            deferreds.append(d)
+        doc = BlobDoc(item, blob_id=uuid.uuid4().hex)
+        size = sys.getsizeof(item)
+        d = semaphore.run(
+            client.blobmanager.put, doc, size, namespace='payload')
        deferreds.append(d)
-        if should_brake(i):
-            yield gatherResults(deferreds)
-            deferreds = []
     yield gatherResults(deferreds)
 
 
-def create_blobs(downloads, size):
+@pytest.inlineCallbacks
+def delete_pending_from_server(client, pending):
+    deferreds = []
+    for item in pending:
+        d = semaphore.run(client.blobmanager.delete, item, namespace='MX')
+        deferreds.append(d)
+    yield gatherResults(deferreds)
+
+
+def create_blobs_test(downloads, size):
     @pytest.inlineCallbacks
     @pytest.mark.skip(reason="avoid running for all commits")
     @pytest.mark.benchmark(group="test_legacy_vs_blobs")
@@ -196,57 +227,12 @@ def create_blobs(downloads, size):
             returnValue(soledad_client(force_fresh_db=True))
 
         @pytest.inlineCallbacks
-        def blobs_pipeline(clean_client):
-
-            # get list of pending incoming blobs
-            pending = yield clean_client.blobmanager.remote_list(
-                namespace='MX', filter_flags=Flags.PENDING)
-
-            # download incoming blobs
-            deferreds = []
-            incoming = []
-            i = 0
-            for item in pending:
-                d = clean_client.blobmanager.get(item, namespace='MX')
-                deferreds.append(d)
-                if should_brake(i):
-                    incoming += yield gatherResults(deferreds)
-                    deferreds = []
-                i += 1
-            incoming += yield gatherResults(deferreds)
-
-            # create data on local client
-            deferreds = []
-            i = 0
-            for item in incoming:
-                for name, size in PARTS_SIZES.items():
-                    d = clean_client.create_doc({name: payload(size)})
-                    deferreds.append(d)
-                doc = BlobDoc(item, blob_id=uuid.uuid4().hex)
-                size = sys.getsizeof(item)
-                d = clean_client.blobmanager.put(
-                    doc, size, namespace='payload')
-                deferreds.append(d)
-                if should_brake(i):
-                    gatherResults(deferreds)
-                    deferreds = []
-                i += 1
-            yield gatherResults(deferreds)
-
-            # delete incoming from server
-            deferreds = []
-            for item in pending:
-                d = clean_client.blobmanager.delete(item, namespace='MX')
-                deferreds.append(d)
-            yield gatherResults(deferreds)
-
-            # sync and send blobs in parallel
-            deferreds = []
-            d = clean_client.sync()
-            deferreds.append(d)
-            d = clean_client.blobmanager.send_missing(namespace='payload')
-            deferreds.append(d)
-            yield gatherResults(deferreds)
+        def blobs_pipeline(client):
+            pending, incoming = yield download_incoming_blobs(client)
+            yield create_local_data(client, incoming, payload)
+            yield delete_pending_from_server(client, pending)
+            yield client.sync()
+            yield client.blobmanager.send_missing(namespace='payload')
 
         yield txbenchmark_with_setup(setup, blobs_pipeline)
     return test
@@ -254,6 +240,6 @@ def create_blobs(downloads, size):
 
 # ATTENTION: update the documentation in ../docs/benchmarks.rst if you change
 # the number of docs or the doc sizes for the tests below.
-test_blobs_10_1000k = create_blobs(10, 1000 * 1000)
-test_blobs_100_100k = create_blobs(100, 100 * 1000)
-test_blobs_1000_10k = create_blobs(1000, 10 * 1000)
+test_blobs_10_1000k = create_blobs_test(10, 1000 * 1000)
+test_blobs_100_100k = create_blobs_test(100, 100 * 1000)
+test_blobs_1000_10k = create_blobs_test(1000, 10 * 1000)
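Note: the core of this change is replacing the removed should_brake() batching (yield gatherResults every 5 operations) with a DeferredSemaphore that caps concurrency at 5. A minimal sketch of that pattern, outside this commit, using a hypothetical asynchronous fetch(item) in place of real calls such as blobmanager.get():

from twisted.internet import defer, reactor, task

# at most 5 operations may be in flight at any time
semaphore = defer.DeferredSemaphore(5)


def fetch(item):
    # hypothetical asynchronous operation returning a Deferred
    return task.deferLater(reactor, 0.1, lambda: item * 2)


@defer.inlineCallbacks
def fetch_all(items):
    # semaphore.run() acquires a token, calls fetch(item), and releases the
    # token when the resulting Deferred fires, so all Deferreds can be
    # created up front and gathered once at the end
    deferreds = [semaphore.run(fetch, item) for item in items]
    results = yield defer.gatherResults(deferreds)
    defer.returnValue(results)

Unlike the batching it replaces, which stalled on the slowest member of each 5-item batch before issuing the next batch, the semaphore starts a new operation as soon as any slot frees up.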
