From 1d8b32abc821116da4f3b4f192bc06a931d30076 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 29 Nov 2017 11:42:29 -0200 Subject: [feature] add priorities for blob transfers Closes: #8691 --- src/leap/soledad/client/_db/blobs/__init__.py | 93 ++++++++++-- src/leap/soledad/client/_db/blobs/sql.py | 68 ++++++--- src/leap/soledad/client/_db/blobs/sync.py | 106 ++++++++----- tests/blobs/test_priorities.py | 207 ++++++++++++++++++++++++++ 4 files changed, 411 insertions(+), 63 deletions(-) create mode 100644 tests/blobs/test_priorities.py diff --git a/src/leap/soledad/client/_db/blobs/__init__.py b/src/leap/soledad/client/_db/blobs/__init__.py index 4699e0a0..df801850 100644 --- a/src/leap/soledad/client/_db/blobs/__init__.py +++ b/src/leap/soledad/client/_db/blobs/__init__.py @@ -24,6 +24,7 @@ import os import json import base64 +from collections import defaultdict from io import BytesIO from twisted.logger import Logger @@ -44,6 +45,7 @@ from leap.soledad.client._pipes import TruncatedTailPipe from leap.soledad.client._pipes import PreamblePipe from .sql import SyncStatus +from .sql import Priority from .sql import SQLiteBlobBackend from .sync import BlobsSynchronizer from .errors import ( @@ -139,6 +141,7 @@ class BlobManager(BlobsSynchronizer): self.user = user self._client = HTTPClient(user, token, cert_file) self.semaphore = defer.DeferredSemaphore(self.concurrent_writes_limit) + self.locks = defaultdict(defer.DeferredLock) def close(self): if hasattr(self, 'local') and self.local: @@ -207,7 +210,8 @@ class BlobManager(BlobsSynchronizer): def local_list_status(self, status, namespace=''): return self.local.list_status(status, namespace) - def put(self, doc, size, namespace='', local_only=False): + def put(self, doc, size, namespace='', local_only=False, + priority=Priority.DEFAULT): """ Put a blob in local storage and upload it to server. @@ -220,12 +224,15 @@ class BlobManager(BlobsSynchronizer): :param namespace: Optional parameter to restrict operation to a given namespace. :type namespace: str + + :return: A deferred that fires when the blob has been put. + :rtype: twisted.internet.defer.Deferred """ return self.semaphore.run( - self._put, doc, size, namespace, local_only=local_only) + self._put, doc, size, namespace, local_only, priority) @defer.inlineCallbacks - def _put(self, doc, size, namespace, local_only=False): + def _put(self, doc, size, namespace, local_only, priority): if (yield self.local.exists(doc.blob_id, namespace=namespace)): error_message = "Blob already exists: %s" % doc.blob_id raise BlobAlreadyExistsError(error_message) @@ -233,17 +240,31 @@ class BlobManager(BlobsSynchronizer): # TODO this is a tee really, but ok... could do db and upload # concurrently. not sure if we'd gain something. yield self.local.put(doc.blob_id, fd, size=size, namespace=namespace) + if local_only: yield self.local.update_sync_status( - doc.blob_id, SyncStatus.LOCAL_ONLY) + doc.blob_id, SyncStatus.LOCAL_ONLY, namespace=namespace) defer.returnValue(None) + yield self.local.update_sync_status( - doc.blob_id, SyncStatus.PENDING_UPLOAD) + doc.blob_id, SyncStatus.PENDING_UPLOAD, namespace=namespace, + priority=priority) + yield self._send(doc.blob_id, namespace, 1, 1) + + def _send(self, blob_id, namespace, i, total): + lock = self.locks[blob_id] + d = lock.run(self.__send, blob_id, namespace, i, total) + return d + + @defer.inlineCallbacks + def __send(self, blob_id, namespace, i, total): + logger.info("Sending blob to server (%d/%d): %s" + % (i, total, blob_id)) # In fact, some kind of pipe is needed here, where each write on db # handle gets forwarded into a write on the connection handle - fd = yield self.local.get(doc.blob_id, namespace=namespace) - yield self._encrypt_and_upload(doc.blob_id, fd, namespace=namespace) - yield self.local.update_sync_status(doc.blob_id, SyncStatus.SYNCED) + fd = yield self.local.get(blob_id, namespace=namespace) + yield self._encrypt_and_upload(blob_id, fd) + yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED) def set_flags(self, blob_id, flags, namespace=''): """ @@ -294,7 +315,7 @@ class BlobManager(BlobsSynchronizer): defer.returnValue((yield response.json())) @defer.inlineCallbacks - def get(self, blob_id, namespace=''): + def get(self, blob_id, namespace='', priority=Priority.DEFAULT): """ Get the blob from local storage or, if not available, from the server. @@ -304,6 +325,10 @@ class BlobManager(BlobsSynchronizer): :param namespace: Optional parameter to restrict operation to a given namespace. :type namespace: str + + :return: A deferred that fires with the file descriptor for the + contents of the blob. + :rtype: twisted.internet.defer.Deferred """ local_blob = yield self.local.get(blob_id, namespace=namespace) if local_blob: @@ -311,8 +336,19 @@ class BlobManager(BlobsSynchronizer): defer.returnValue(local_blob) yield self.local.update_sync_status( - blob_id, SyncStatus.PENDING_DOWNLOAD, namespace=namespace) + blob_id, SyncStatus.PENDING_DOWNLOAD, namespace=namespace, + priority=priority) + + fd = yield self._fetch(blob_id, namespace) + defer.returnValue(fd) + + def _fetch(self, blob_id, namespace): + lock = self.locks[blob_id] + d = lock.run(self.__fetch, blob_id, namespace) + return d + @defer.inlineCallbacks + def __fetch(self, blob_id, namespace): try: result = yield self._download_and_decrypt(blob_id, namespace) except Exception as e: @@ -347,6 +383,8 @@ class BlobManager(BlobsSynchronizer): logger.info("Got decrypted blob of type: %s" % type(blob)) blob.seek(0) yield self.local.put(blob_id, blob, size=size, namespace=namespace) + yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED, + namespace=namespace) local_blob = yield self.local.get(blob_id, namespace=namespace) defer.returnValue(local_blob) else: @@ -435,3 +473,38 @@ class BlobManager(BlobsSynchronizer): response = yield self._client.delete(uri, params=params) check_http_status(response.code, blob_id=blob_id) defer.returnValue(response) + + def set_priority(self, blob_id, priority, namespace=''): + """ + Set the transfer priority for a certain blob. + + :param blob_id: Unique identifier of a blob. + :type blob_id: str + :param priority: The priority to be set. + :type priority: int + :param namespace: Optional parameter to restrict operation to a given + namespace. + :type namespace: str + + :return: A deferred that fires after the priority has been set. + :rtype: twisted.internet.defer.Deferred + """ + d = self.local.update_priority(blob_id, priority, namespace=namespace) + return d + + def get_priority(self, blob_id, namespace=''): + """ + Get the transfer priority for a certain blob. + + :param blob_id: Unique identifier of a blob. + :type blob_id: str + :param namespace: Optional parameter to restrict operation to a given + namespace. + :type namespace: str + + :return: A deferred that fires with the current transfer priority of + the blob. + :rtype: twisted.internet.defer.Deferred + """ + d = self.local.get_priority(blob_id, namespace=namespace) + return d diff --git a/src/leap/soledad/client/_db/blobs/sql.py b/src/leap/soledad/client/_db/blobs/sql.py index ebd6c095..a89802d8 100644 --- a/src/leap/soledad/client/_db/blobs/sql.py +++ b/src/leap/soledad/client/_db/blobs/sql.py @@ -46,6 +46,14 @@ class SyncStatus: UNAVAILABLE_STATUSES = (3, 5) +class Priority: + LOW = 1 + MEDIUM = 2 + HIGH = 3 + URGENT = 4 + DEFAULT = 2 + + class SQLiteBlobBackend(object): concurrency_limit = 10 @@ -130,7 +138,7 @@ class SQLiteBlobBackend(object): @defer.inlineCallbacks def list_status(self, sync_status, namespace=''): query = 'select blob_id from sync_state where sync_status = ?' - query += 'AND namespace = ?' + query += 'AND namespace = ? ORDER BY priority DESC' values = (sync_status, namespace,) result = yield self.dbpool.runQuery(query, values) if result: @@ -139,21 +147,42 @@ class SQLiteBlobBackend(object): defer.returnValue([]) @defer.inlineCallbacks - def update_sync_status(self, blob_id, sync_status, namespace=""): - query = 'SELECT sync_status FROM sync_state WHERE blob_id = ?' - result = yield self.dbpool.runQuery(query, (blob_id,)) + def update_sync_status(self, blob_id, sync_status, namespace="", + priority=None): + retries = '(SELECT retries from sync_state' \ + ' WHERE blob_id="%s" and namespace="%s")' \ + % (blob_id, namespace) + if not priority: + priority = '(SELECT priority FROM sync_state' \ + ' WHERE blob_id="%s" AND namespace="%s")' \ + % (blob_id, namespace) + fields = 'blob_id, namespace, sync_status, retries, priority' + markers = '?, ?, ?, %s, %s' % (retries, priority) + values = [blob_id, namespace, sync_status] + insert = 'INSERT or REPLACE INTO sync_state (%s) VALUES (%s)' \ + % (fields, markers) + yield self.dbpool.runOperation(insert, tuple(values)) + @defer.inlineCallbacks + def get_priority(self, blob_id, namespace=""): + query = 'SELECT priority FROM sync_state WHERE blob_id = ?' + result = yield self.dbpool.runQuery(query, (blob_id,)) if not result: - insert = 'INSERT INTO sync_state' - insert += ' (blob_id, namespace, sync_status)' - insert += ' VALUES (?, ?, ?)' - values = (blob_id, namespace, sync_status) - yield self.dbpool.runOperation(insert, values) - return + defer.returnValue(None) + priority = result.pop()[0] + defer.returnValue(priority) - update = 'UPDATE sync_state SET sync_status = ? WHERE blob_id = ?' - values = (sync_status, blob_id,) - result = yield self.dbpool.runOperation(update, values) + @defer.inlineCallbacks + def update_priority(self, blob_id, priority, namespace=""): + old_priority = self.get_priority(blob_id, namespace=namespace) + if not old_priority: + logger.error("Can't update priority of %s: no sync status entry.") + return + if old_priority == priority: + return + update = 'UPDATE sync_state SET priority = ? WHERE blob_id = ?' + values = (priority, blob_id,) + yield self.dbpool.runOperation(update, values) def update_batch_sync_status(self, blob_id_list, sync_status, namespace=''): @@ -161,11 +190,12 @@ class SQLiteBlobBackend(object): return insert = 'INSERT or REPLACE INTO sync_state' first_blob_id, blob_id_list = blob_id_list[0], blob_id_list[1:] - insert += ' (blob_id, namespace, sync_status) VALUES (?, ?, ?)' - values = (first_blob_id, namespace, sync_status) + insert += ' (blob_id, namespace, sync_status, priority)' + insert += ' VALUES (?, ?, ?, ?)' + values = (first_blob_id, namespace, sync_status, Priority.DEFAULT) for blob_id in blob_id_list: - insert += ', (?, ?, ?)' - values += (blob_id, namespace, sync_status) + insert += ', (?, ?, ?, ?)' + values += (blob_id, namespace, sync_status, Priority.DEFAULT) return self.dbpool.runQuery(insert, values) def increment_retries(self, blob_id): @@ -212,9 +242,11 @@ def _init_sync_table(conn): blob_id PRIMARY KEY, namespace TEXT, sync_status INT default %s, + priority INT default %d, retries INT default 0)""" default_status = SyncStatus.PENDING_UPLOAD - maybe_create %= default_status + default_priority = Priority.DEFAULT + maybe_create %= (default_status, default_priority) conn.execute(maybe_create) diff --git a/src/leap/soledad/client/_db/blobs/sync.py b/src/leap/soledad/client/_db/blobs/sync.py index 3ee60305..e6397ede 100644 --- a/src/leap/soledad/client/_db/blobs/sync.py +++ b/src/leap/soledad/client/_db/blobs/sync.py @@ -17,12 +17,15 @@ """ Synchronization between blobs client/server """ +from collections import defaultdict from twisted.internet import defer from twisted.internet import reactor from twisted.logger import Logger from twisted.internet import error from .sql import SyncStatus from .errors import RetriableTransferError + + logger = Logger() @@ -56,6 +59,9 @@ def with_retry(func, *args, **kwargs): class BlobsSynchronizer(object): + def __init__(self): + self.locks = defaultdict(defer.DeferredLock) + @defer.inlineCallbacks def refresh_sync_status_from_server(self, namespace=''): d1 = self.remote_list(namespace=namespace) @@ -82,7 +88,6 @@ class BlobsSynchronizer(object): SyncStatus.SYNCED, namespace=namespace) - @defer.inlineCallbacks def send_missing(self, namespace=''): """ Compare local and remote blobs and send what's missing in server. @@ -90,30 +95,41 @@ class BlobsSynchronizer(object): :param namespace: Optional parameter to restrict operation to a given namespace. :type namespace: str + + :return: A deferred that fires when all local blobs were sent to + server. + :rtype: twisted.internet.defer.Deferred """ - missing = yield self.local.list_status( - SyncStatus.PENDING_UPLOAD, namespace) - total = len(missing) - logger.info("Will send %d blobs to server." % total) - deferreds = [] - semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit) - - for i in xrange(total): - blob_id = missing.pop() - d = semaphore.run( - with_retry, self.__send_one, blob_id, namespace, i, total) - deferreds.append(d) - yield defer.gatherResults(deferreds, consumeErrors=True) + lock = self.locks['send_missing'] + d = lock.run(self._send_missing, namespace) + return d @defer.inlineCallbacks - def __send_one(self, blob_id, namespace, i, total): - logger.info("Sending blob to server (%d/%d): %s" - % (i, total, blob_id)) - fd = yield self.local.get(blob_id, namespace=namespace) - yield self._encrypt_and_upload(blob_id, fd) - yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED) + def _send_missing(self, namespace): + max_transfers = self.concurrent_transfers_limit + semaphore = defer.DeferredSemaphore(max_transfers) + # the list of blobs should be refreshed often, so we run as many + # concurrent transfers as we can and then refresh the list + while True: + d = self.local_list_status(SyncStatus.PENDING_UPLOAD, namespace) + missing = yield d + + if not missing: + break + + total = len(missing) + now = min(total, max_transfers) + logger.info("There are %d pending blob uploads." % total) + logger.info("Will send %d blobs to server now." % now) + missing = missing[:now] + deferreds = [] + for i in xrange(now): + blob_id = missing.pop(0) + d = semaphore.run( + with_retry, self._send, blob_id, namespace, i, total) + deferreds.append(d) + yield defer.gatherResults(deferreds, consumeErrors=True) - @defer.inlineCallbacks def fetch_missing(self, namespace=''): """ Compare local and remote blobs and fetch what's missing in local @@ -122,21 +138,41 @@ class BlobsSynchronizer(object): :param namespace: Optional parameter to restrict operation to a given namespace. :type namespace: str + + :return: A deferred that fires when all remote blobs were received from + server. + :rtype: twisted.internet.defer.Deferred """ - # TODO: Use something to prioritize user requests over general new docs - d = self.local_list_status(SyncStatus.PENDING_DOWNLOAD, namespace) - docs_we_want = yield d - total = len(docs_we_want) - logger.info("Will fetch %d blobs from server." % total) - deferreds = [] - semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit) - - for i in xrange(len(docs_we_want)): - blob_id = docs_we_want.pop() - logger.info("Fetching blob (%d/%d): %s" % (i, total, blob_id)) - d = semaphore.run(with_retry, self.get, blob_id, namespace) - deferreds.append(d) - yield defer.gatherResults(deferreds, consumeErrors=True) + lock = self.locks['fetch_missing'] + d = lock.run(self._fetch_missing, namespace) + return d + + @defer.inlineCallbacks + def _fetch_missing(self, namespace=''): + max_transfers = self.concurrent_transfers_limit + semaphore = defer.DeferredSemaphore(max_transfers) + # in order to make sure that transfer priorities will be met, the list + # of blobs to transfer should be refreshed often. What we do is run as + # many concurrent transfers as we can and then refresh the list + while True: + d = self.local_list_status(SyncStatus.PENDING_DOWNLOAD, namespace) + docs_we_want = yield d + + if not docs_we_want: + break + + total = len(docs_we_want) + now = min(total, max_transfers) + logger.info("There are %d pending blob downloads." % total) + logger.info("Will fetch %d blobs from server now." % now) + docs_we_want = docs_we_want[:now] + deferreds = [] + for i in xrange(now): + blob_id = docs_we_want.pop(0) + logger.info("Fetching blob (%d/%d): %s" % (i, now, blob_id)) + d = semaphore.run(with_retry, self._fetch, blob_id, namespace) + deferreds.append(d) + yield defer.gatherResults(deferreds, consumeErrors=True) @defer.inlineCallbacks def sync(self, namespace=''): diff --git a/tests/blobs/test_priorities.py b/tests/blobs/test_priorities.py new file mode 100644 index 00000000..fca582e5 --- /dev/null +++ b/tests/blobs/test_priorities.py @@ -0,0 +1,207 @@ +# -*- coding: utf-8 -*- +# test_priorities.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for transfer priorities. +""" +import pytest + +from io import BytesIO +from mock import Mock +from twisted.internet import defer +from twisted.trial import unittest +from uuid import uuid4 + +from leap.soledad.client._db.blobs import BlobManager +from leap.soledad.client._db.blobs import Priority +from leap.soledad.client._db.blobs import SyncStatus +from leap.soledad.client._document import BlobDoc + + +class BlobPrioritiesTests(unittest.TestCase): + + def setUp(self): + self.cleartext = BytesIO('patriarchy is opression') + self.secret = 'A' * 96 + self.manager = BlobManager( + self.tempdir, '', + 'A' * 32, self.secret, + uuid4().hex, 'token', None) + self.addCleanup(self.manager.close) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_get_sets_default_priority(self): + self.manager._download_and_decrypt = Mock(return_value=None) + missing_blob_id = uuid4().hex + result = yield self.manager.get(missing_blob_id) + self.assertIsNone(result) + priority = yield self.manager.get_priority(missing_blob_id) + self.assertEqual(Priority.DEFAULT, priority) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_get_sets_priority(self): + self.manager._download_and_decrypt = Mock(return_value=None) + missing_blob_id = uuid4().hex + urgent = Priority.URGENT + result = yield self.manager.get(missing_blob_id, priority=urgent) + self.assertIsNone(result) + priority = yield self.manager.get_priority(missing_blob_id) + self.assertEqual(urgent, priority) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_put_sets_default_priority(self): + upload_failure = defer.fail(Exception()) + self.manager._encrypt_and_upload = Mock(return_value=upload_failure) + content, blob_id = "Blob content", uuid4().hex + doc1 = BlobDoc(BytesIO(content), blob_id) + with pytest.raises(Exception): + yield self.manager.put(doc1, len(content)) + priority = yield self.manager.get_priority(blob_id) + self.assertEqual(Priority.DEFAULT, priority) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_put_sets_priority(self): + upload_failure = defer.fail(Exception()) + self.manager._encrypt_and_upload = Mock(return_value=upload_failure) + content, blob_id = "Blob content", uuid4().hex + doc1 = BlobDoc(BytesIO(content), blob_id) + urgent = Priority.URGENT + with pytest.raises(Exception): + yield self.manager.put(doc1, len(content), priority=urgent) + priority = yield self.manager.get_priority(blob_id) + self.assertEqual(urgent, priority) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_set_priority_sets_priority(self): + self.manager._download_and_decrypt = Mock(return_value=None) + missing_blob_id = uuid4().hex + result = yield self.manager.get(missing_blob_id) + self.assertIsNone(result) + urgent = Priority.URGENT + yield self.manager.set_priority(missing_blob_id, urgent) + priority = yield self.manager.get_priority(missing_blob_id) + self.assertEqual(urgent, priority) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_local_list_status_orders_by_priority(self): + self.manager._download_and_decrypt = Mock(return_value=None) + + def _get(priority): + missing_blob_id = uuid4().hex + d = self.manager.get(missing_blob_id, priority=priority) + d.addCallback(lambda _: missing_blob_id) + return d + + # get some blobs in arbitrary order + low = yield _get(Priority.LOW) + high = yield _get(Priority.HIGH) + medium = yield _get(Priority.MEDIUM) + urgent = yield _get(Priority.URGENT) + + # make sure they are ordered by priority + status = SyncStatus.PENDING_DOWNLOAD + pending = yield self.manager.local_list_status(status) + self.assertEqual([urgent, high, medium, low], pending) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_refresh_sync_status_from_server_saves_default_priorities(self): + remote_ids = [uuid4().hex for _ in range(10)] + self.manager.remote_list = Mock(return_value=defer.succeed(remote_ids)) + yield self.manager.refresh_sync_status_from_server() + for blob_id in remote_ids: + priority = yield self.manager.get_priority(blob_id) + self.assertEquals(Priority.DEFAULT, priority) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_fetch_missing_fetches_with_priority(self): + + # pretend we have some pending downloads + deferreds = [ + self.manager.local.update_sync_status( + 'low', SyncStatus.PENDING_DOWNLOAD, + priority=Priority.LOW), + self.manager.local.update_sync_status( + 'high', SyncStatus.PENDING_DOWNLOAD, + priority=Priority.HIGH), + self.manager.local.update_sync_status( + 'medium', SyncStatus.PENDING_DOWNLOAD, + priority=Priority.MEDIUM), + self.manager.local.update_sync_status( + 'urgent', SyncStatus.PENDING_DOWNLOAD, + priority=Priority.URGENT), + ] + yield defer.gatherResults(deferreds) + + # make sure download "succeeds" so fetching works + content = 'vegan muffin' + fd = BytesIO(content) + size = len(content) + self.manager._download_and_decrypt = Mock(return_value=(fd, size)) + self.manager.concurrent_transfers_limit = 1 + + # this is the operation we are interested in + yield self.manager.fetch_missing() + + # retrieve the order in which blob transfers were made + calls = self.manager._download_and_decrypt.mock_calls + order = map(lambda c: c[1][0], calls) + self.assertEqual(['urgent', 'high', 'medium', 'low'], order) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_send_missing_sends_with_priority(self): + + # pretend we have some pending uploads + _send = self.manager._send + self.manager._send = Mock(return_value=None) + content = "vegan cake" + length = len(content) + deferreds = [ + self.manager.put( + BlobDoc(BytesIO(content), 'low'), length, + priority=Priority.LOW), + self.manager.put( + BlobDoc(BytesIO(content), 'high'), length, + priority=Priority.HIGH), + self.manager.put( + BlobDoc(BytesIO(content), 'medium'), length, + priority=Priority.MEDIUM), + self.manager.put( + BlobDoc(BytesIO(content), 'urgent'), length, + priority=Priority.URGENT), + ] + yield defer.gatherResults(deferreds) + + # make sure upload "succeeds" so sending works + self.manager._send = _send + self.manager._encrypt_and_upload = Mock(return_value=None) + + # this is the operation we are interested in + self.manager.concurrent_transfers_limit = 1 + yield self.manager.send_missing() + + # retrieve the order in which blob transfers were made + calls = self.manager._encrypt_and_upload.mock_calls + order = map(lambda c: c[1][0], calls) + self.assertEqual(['urgent', 'high', 'medium', 'low'], order) -- cgit v1.2.3