diff options
| -rw-r--r-- | testing/tests/benchmarks/test_legacy_vs_blobs.py | 131 | 
1 files changed, 82 insertions, 49 deletions
| diff --git a/testing/tests/benchmarks/test_legacy_vs_blobs.py b/testing/tests/benchmarks/test_legacy_vs_blobs.py index 89a427f5..5be6f25f 100644 --- a/testing/tests/benchmarks/test_legacy_vs_blobs.py +++ b/testing/tests/benchmarks/test_legacy_vs_blobs.py @@ -47,6 +47,7 @@  import base64  import pytest +import random  import sys  import treq  import uuid @@ -61,10 +62,17 @@ from leap.soledad.common.blobs import Flags  from leap.soledad.client._db.blobs import BlobDoc -PARTS_SIZES = { -    'headers': 4000, -    'metadata': 100, -    'flags': 500, +def payload(size): +    random.seed(1337)  # same seed to avoid different bench results +    payload_bytes = bytearray(random.getrandbits(8) for _ in xrange(size)) +    # encode as base64 to avoid ascii encode/decode errors +    return base64.b64encode(payload_bytes)[:size]  # remove b64 overhead + + +PARTS = { +    'headers': payload(4000), +    'metadata': payload(100), +    'flags': payload(500),  } @@ -92,12 +100,13 @@ def load_up_legacy(client, amount, content):      yield client.sync() +@pytest.inlineCallbacks  def process_incoming_docs(client, docs):      deferreds = []      for doc in docs:          # create fake documents that represent message -        for name in PARTS_SIZES.keys(): +        for name in PARTS.keys():              d = client.create_doc({name: doc.content[name]})              deferreds.append(d) @@ -114,25 +123,29 @@ def process_incoming_docs(client, docs):      yield gatherResults(deferreds) -def create_legacy_test(downloads, size): +def create_legacy_test(amount, size): +    group = 'test_legacy_vs_blobs_%d_%dk' % (amount, (size / 1000)) +      @pytest.inlineCallbacks      @pytest.mark.skip(reason="avoid running for all commits") -    @pytest.mark.benchmark(group="test_legacy_vs_blobs") -    def test(soledad_client, txbenchmark_with_setup, payload): +    @pytest.mark.benchmark(group=group) +    def test(soledad_client, txbenchmark_with_setup):          client = soledad_client() +        # setup the content of initial documents representing incoming emails          content = {'content': payload(size), 'incoming': True} -        for n, s in PARTS_SIZES.items(): -            content[n] = payload(s) +        for name, data in PARTS.items(): +            content[name] = data          @pytest.inlineCallbacks          def setup(): -            yield load_up_legacy(client, downloads, content) -            returnValue(soledad_client(force_fresh_db=True)) +            yield load_up_legacy(client, amount, content) +            clean_client = soledad_client(force_fresh_db=True) +            yield clean_client.create_index('incoming', 'bool(incoming)') +            returnValue(clean_client)          @pytest.inlineCallbacks          def legacy_pipeline(client): -            yield client.create_index('incoming', 'bool(incoming)')              yield client.sync()              docs = yield client.get_from_index('incoming', '1')              yield process_incoming_docs(client, docs) @@ -153,8 +166,8 @@ test_legacy_1000_10k = create_legacy_test(1000, 10 * 1000)  # "Incoming blobs" mail pipeline:  # -# limit the amount of concurrent accesses to some resource -semaphore = DeferredSemaphore(5) +# used to limit the amount of concurrent accesses to the blob manager +semaphore = DeferredSemaphore(2)  # deliver data to a user by using the incoming api at given url. @@ -164,73 +177,93 @@ def deliver(url, user_uuid, token, data):      return treq.put(uri, headers={'Authorization': auth}, data=BytesIO(data)) -# create a bunch of local documents representing email messages +def reclaim_free_space(client): +    return client.blobmanager.local.dbpool.runQuery("VACUUM") + + +@pytest.inlineCallbacks  def load_up_blobs(client, amount, data): +    # make sure there are no document from previous runs +    yield client.sync() +    _, docs = yield client.get_all_docs()      deferreds = [] +    for doc in docs: +        d = client.delete_doc(doc) +        deferreds.append(d) +    yield gatherResults(deferreds) +    yield client.sync() + +    # delete all payload from blobs db and server +    ids = yield client.blobmanager.local_list(namespace='payload') +    for blob_id in ids: +        yield client.blobmanager.delete(blob_id, namespace='payload') +    yield reclaim_free_space(client) + +    # create a bunch of incoming blobs +    deferreds = [] +    semaphore = DeferredSemaphore(100)      for i in xrange(amount):          d = semaphore.run(              deliver, client.server_url, client.uuid, client.token, data)          deferreds.append(d) -    return gatherResults(deferreds) +    yield gatherResults(deferreds)  @pytest.inlineCallbacks -def download_incoming_blobs(client): -    # get list of pending incoming blobs -    pending = yield client.blobmanager.remote_list( -        namespace='MX', filter_flags=Flags.PENDING) - -    # download incoming blobs +def process_incoming_blobs(client, pending): +    # process items      deferreds = []      for item in pending: -        d = semaphore.run(client.blobmanager.get, item, namespace='MX') +        d = process_one_incoming_blob(client, item)          deferreds.append(d) -    incoming = yield gatherResults(deferreds) -    returnValue((pending, incoming)) +    yield gatherResults(deferreds)  @pytest.inlineCallbacks -def create_local_data(client, incoming, payload): +def process_one_incoming_blob(client, item): +    fd = yield semaphore.run( +        client.blobmanager.get, item, namespace='MX') + +    # create metadata docs      deferreds = [] -    for item in incoming: -        for name, size in PARTS_SIZES.items(): -            d = client.create_doc({name: payload(size)}) -            deferreds.append(d) -        doc = BlobDoc(item, blob_id=uuid.uuid4().hex) -        size = sys.getsizeof(item) -        d = semaphore.run( -            client.blobmanager.put, doc, size, namespace='payload') +    for name, data in PARTS.items(): +        d = client.create_doc({name: data})          deferreds.append(d) + +    # put the incoming blob as it would be done after mail processing +    doc = BlobDoc(fd, blob_id=uuid.uuid4().hex) +    size = sys.getsizeof(fd) +    d = semaphore.run( +        client.blobmanager.put, doc, size, namespace='payload') +    deferreds.append(d)      yield gatherResults(deferreds) +    # delete incoming blob +    yield semaphore.run( +        client.blobmanager.delete, item, namespace='MX') -@pytest.inlineCallbacks -def delete_pending_from_server(client, pending): -    deferreds = [] -    for item in pending: -        d = semaphore.run(client.blobmanager.delete, item, namespace='MX') -        deferreds.append(d) -    yield gatherResults(deferreds) +def create_blobs_test(amount, size): +    group = 'test_legacy_vs_blobs_%d_%dk' % (amount, (size / 1000)) -def create_blobs_test(downloads, size):      @pytest.inlineCallbacks      @pytest.mark.skip(reason="avoid running for all commits") -    @pytest.mark.benchmark(group="test_legacy_vs_blobs") -    def test(soledad_client, txbenchmark_with_setup, payload): +    @pytest.mark.benchmark(group=group) +    def test(soledad_client, txbenchmark_with_setup):          client = soledad_client()          blob_payload = payload(size)          @pytest.inlineCallbacks          def setup(): -            yield load_up_blobs(client, downloads, blob_payload) +            yield load_up_blobs(client, amount, blob_payload)              returnValue(soledad_client(force_fresh_db=True))          @pytest.inlineCallbacks          def blobs_pipeline(client): -            pending, incoming = yield download_incoming_blobs(client) -            yield create_local_data(client, incoming, payload) -            yield delete_pending_from_server(client, pending) +            pending = yield client.blobmanager.remote_list( +                namespace='MX', filter_flags=Flags.PENDING) +            yield process_incoming_blobs(client, pending) +            # reclaim_free_space(client)              yield client.sync()              yield client.blobmanager.send_missing(namespace='payload') | 
