author    drebs <drebs@leap.se>    2017-11-29 11:42:29 -0200
committer drebs <drebs@leap.se>    2017-11-30 12:42:39 -0200
commit    1d8b32abc821116da4f3b4f192bc06a931d30076 (patch)
tree      cc66c77e4b14157dc0030675cbb8f67f553075b1
parent    02c6c19687fac181ef3e3250f8de5dd85e10d11e (diff)
[feature] add priorities for blob transfers
Closes: #8691
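
Blobs can now be stored and retrieved with a transfer priority, and the
priority of a pending transfer can be changed afterwards. A minimal usage
sketch (assuming `manager` is an already initialized BlobManager, inside an
inlineCallbacks function; Priority is re-exported by the blobs package, as
the tests below show):

    from io import BytesIO
    from leap.soledad.client._db.blobs import Priority
    from leap.soledad.client._document import BlobDoc

    # upload a blob with a non-default priority
    doc = BlobDoc(BytesIO(content), blob_id)
    yield manager.put(doc, len(content), priority=Priority.URGENT)

    # download a missing blob with a non-default priority
    fd = yield manager.get(blob_id, priority=Priority.HIGH)

    # change the priority of a transfer that is still pending
    yield manager.set_priority(blob_id, Priority.LOW)

Pending transfers are then served in decreasing order of priority by
send_missing() and fetch_missing().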
-rw-r--r--  src/leap/soledad/client/_db/blobs/__init__.py   93
-rw-r--r--  src/leap/soledad/client/_db/blobs/sql.py         68
-rw-r--r--  src/leap/soledad/client/_db/blobs/sync.py       106
-rw-r--r--  tests/blobs/test_priorities.py                  207
4 files changed, 411 insertions(+), 63 deletions(-)
diff --git a/src/leap/soledad/client/_db/blobs/__init__.py b/src/leap/soledad/client/_db/blobs/__init__.py
index 4699e0a0..df801850 100644
--- a/src/leap/soledad/client/_db/blobs/__init__.py
+++ b/src/leap/soledad/client/_db/blobs/__init__.py
@@ -24,6 +24,7 @@ import os
import json
import base64
+from collections import defaultdict
from io import BytesIO
from twisted.logger import Logger
@@ -44,6 +45,7 @@ from leap.soledad.client._pipes import TruncatedTailPipe
from leap.soledad.client._pipes import PreamblePipe
from .sql import SyncStatus
+from .sql import Priority
from .sql import SQLiteBlobBackend
from .sync import BlobsSynchronizer
from .errors import (
@@ -139,6 +141,7 @@ class BlobManager(BlobsSynchronizer):
self.user = user
self._client = HTTPClient(user, token, cert_file)
self.semaphore = defer.DeferredSemaphore(self.concurrent_writes_limit)
+ self.locks = defaultdict(defer.DeferredLock)
def close(self):
if hasattr(self, 'local') and self.local:
@@ -207,7 +210,8 @@ class BlobManager(BlobsSynchronizer):
def local_list_status(self, status, namespace=''):
return self.local.list_status(status, namespace)
- def put(self, doc, size, namespace='', local_only=False):
+ def put(self, doc, size, namespace='', local_only=False,
+ priority=Priority.DEFAULT):
"""
Put a blob in local storage and upload it to server.
@@ -220,12 +224,15 @@ class BlobManager(BlobsSynchronizer):
:param namespace:
Optional parameter to restrict operation to a given namespace.
:type namespace: str
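+ :param priority: Optional parameter to set the transfer priority of
+ this blob (one of the values of the Priority class).
+ :type priority: int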
+
+ :return: A deferred that fires when the blob has been put.
+ :rtype: twisted.internet.defer.Deferred
"""
return self.semaphore.run(
- self._put, doc, size, namespace, local_only=local_only)
+ self._put, doc, size, namespace, local_only, priority)
@defer.inlineCallbacks
- def _put(self, doc, size, namespace, local_only=False):
+ def _put(self, doc, size, namespace, local_only, priority):
if (yield self.local.exists(doc.blob_id, namespace=namespace)):
error_message = "Blob already exists: %s" % doc.blob_id
raise BlobAlreadyExistsError(error_message)
@@ -233,17 +240,31 @@ class BlobManager(BlobsSynchronizer):
# TODO this is a tee really, but ok... could do db and upload
# concurrently. not sure if we'd gain something.
yield self.local.put(doc.blob_id, fd, size=size, namespace=namespace)
+
if local_only:
yield self.local.update_sync_status(
- doc.blob_id, SyncStatus.LOCAL_ONLY)
+ doc.blob_id, SyncStatus.LOCAL_ONLY, namespace=namespace)
defer.returnValue(None)
+
yield self.local.update_sync_status(
- doc.blob_id, SyncStatus.PENDING_UPLOAD)
+ doc.blob_id, SyncStatus.PENDING_UPLOAD, namespace=namespace,
+ priority=priority)
+ yield self._send(doc.blob_id, namespace, 1, 1)
+
+ def _send(self, blob_id, namespace, i, total):
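+ # a per-blob lock serializes transfers of the same blob, so that a
+ # direct put() and a send_missing() run can't upload it concurrently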
+ lock = self.locks[blob_id]
+ d = lock.run(self.__send, blob_id, namespace, i, total)
+ return d
+
+ @defer.inlineCallbacks
+ def __send(self, blob_id, namespace, i, total):
+ logger.info("Sending blob to server (%d/%d): %s"
+ % (i, total, blob_id))
# In fact, some kind of pipe is needed here, where each write on db
# handle gets forwarded into a write on the connection handle
- fd = yield self.local.get(doc.blob_id, namespace=namespace)
- yield self._encrypt_and_upload(doc.blob_id, fd, namespace=namespace)
- yield self.local.update_sync_status(doc.blob_id, SyncStatus.SYNCED)
+ fd = yield self.local.get(blob_id, namespace=namespace)
+ yield self._encrypt_and_upload(blob_id, fd)
+ yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED)
def set_flags(self, blob_id, flags, namespace=''):
"""
@@ -294,7 +315,7 @@ class BlobManager(BlobsSynchronizer):
defer.returnValue((yield response.json()))
@defer.inlineCallbacks
- def get(self, blob_id, namespace=''):
+ def get(self, blob_id, namespace='', priority=Priority.DEFAULT):
"""
Get the blob from local storage or, if not available, from the server.
@@ -304,6 +325,10 @@ class BlobManager(BlobsSynchronizer):
:param namespace:
Optional parameter to restrict operation to a given namespace.
:type namespace: str
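+ :param priority: Optional parameter to set the transfer priority of
+ this blob (one of the values of the Priority class).
+ :type priority: int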
+
+ :return: A deferred that fires with the file descriptor for the
+ contents of the blob.
+ :rtype: twisted.internet.defer.Deferred
"""
local_blob = yield self.local.get(blob_id, namespace=namespace)
if local_blob:
@@ -311,8 +336,19 @@ class BlobManager(BlobsSynchronizer):
defer.returnValue(local_blob)
yield self.local.update_sync_status(
- blob_id, SyncStatus.PENDING_DOWNLOAD, namespace=namespace)
+ blob_id, SyncStatus.PENDING_DOWNLOAD, namespace=namespace,
+ priority=priority)
+
+ fd = yield self._fetch(blob_id, namespace)
+ defer.returnValue(fd)
+
+ def _fetch(self, blob_id, namespace):
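+ # a per-blob lock serializes transfers of the same blob, so that a
+ # direct get() and a fetch_missing() run can't download it concurrently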
+ lock = self.locks[blob_id]
+ d = lock.run(self.__fetch, blob_id, namespace)
+ return d
+ @defer.inlineCallbacks
+ def __fetch(self, blob_id, namespace):
try:
result = yield self._download_and_decrypt(blob_id, namespace)
except Exception as e:
@@ -347,6 +383,8 @@ class BlobManager(BlobsSynchronizer):
logger.info("Got decrypted blob of type: %s" % type(blob))
blob.seek(0)
yield self.local.put(blob_id, blob, size=size, namespace=namespace)
+ yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED,
+ namespace=namespace)
local_blob = yield self.local.get(blob_id, namespace=namespace)
defer.returnValue(local_blob)
else:
@@ -435,3 +473,38 @@ class BlobManager(BlobsSynchronizer):
response = yield self._client.delete(uri, params=params)
check_http_status(response.code, blob_id=blob_id)
defer.returnValue(response)
+
+ def set_priority(self, blob_id, priority, namespace=''):
+ """
+ Set the transfer priority for a certain blob.
+
+ :param blob_id: Unique identifier of a blob.
+ :type blob_id: str
+ :param priority: The priority to be set.
+ :type priority: int
+ :param namespace: Optional parameter to restrict operation to a given
+ namespace.
+ :type namespace: str
+
+ :return: A deferred that fires after the priority has been set.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ d = self.local.update_priority(blob_id, priority, namespace=namespace)
+ return d
+
+ def get_priority(self, blob_id, namespace=''):
+ """
+ Get the transfer priority for a certain blob.
+
+ :param blob_id: Unique identifier of a blob.
+ :type blob_id: str
+ :param namespace: Optional parameter to restrict operation to a given
+ namespace.
+ :type namespace: str
+
+ :return: A deferred that fires with the current transfer priority of
+ the blob.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ d = self.local.get_priority(blob_id, namespace=namespace)
+ return d
diff --git a/src/leap/soledad/client/_db/blobs/sql.py b/src/leap/soledad/client/_db/blobs/sql.py
index ebd6c095..a89802d8 100644
--- a/src/leap/soledad/client/_db/blobs/sql.py
+++ b/src/leap/soledad/client/_db/blobs/sql.py
@@ -46,6 +46,14 @@ class SyncStatus:
UNAVAILABLE_STATUSES = (3, 5)
+class Priority:
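+ """
+ Blob transfer priorities: blobs with higher priority are transferred
+ first; DEFAULT is an alias for MEDIUM.
+ """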
+ LOW = 1
+ MEDIUM = 2
+ HIGH = 3
+ URGENT = 4
+ DEFAULT = 2
+
+
class SQLiteBlobBackend(object):
concurrency_limit = 10
@@ -130,7 +138,7 @@ class SQLiteBlobBackend(object):
@defer.inlineCallbacks
def list_status(self, sync_status, namespace=''):
query = 'select blob_id from sync_state where sync_status = ?'
- query += ' AND namespace = ?'
+ query += ' AND namespace = ? ORDER BY priority DESC'
values = (sync_status, namespace,)
result = yield self.dbpool.runQuery(query, values)
if result:
@@ -139,21 +147,42 @@ class SQLiteBlobBackend(object):
defer.returnValue([])
@defer.inlineCallbacks
- def update_sync_status(self, blob_id, sync_status, namespace=""):
- query = 'SELECT sync_status FROM sync_state WHERE blob_id = ?'
- result = yield self.dbpool.runQuery(query, (blob_id,))
+ def update_sync_status(self, blob_id, sync_status, namespace="",
+ priority=None):
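+ # INSERT OR REPLACE resets all columns of an existing row, so the
+ # current retries count (and the current priority, unless an explicit
+ # one was given) is carried over with subselects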
+ retries = '(SELECT retries from sync_state' \
+ ' WHERE blob_id="%s" and namespace="%s")' \
+ % (blob_id, namespace)
+ if not priority:
+ priority = '(SELECT priority FROM sync_state' \
+ ' WHERE blob_id="%s" AND namespace="%s")' \
+ % (blob_id, namespace)
+ fields = 'blob_id, namespace, sync_status, retries, priority'
+ markers = '?, ?, ?, %s, %s' % (retries, priority)
+ values = [blob_id, namespace, sync_status]
+ insert = 'INSERT or REPLACE INTO sync_state (%s) VALUES (%s)' \
+ % (fields, markers)
+ yield self.dbpool.runOperation(insert, tuple(values))
+ @defer.inlineCallbacks
+ def get_priority(self, blob_id, namespace=""):
+ query = 'SELECT priority FROM sync_state WHERE blob_id = ?'
+ result = yield self.dbpool.runQuery(query, (blob_id,))
if not result:
- insert = 'INSERT INTO sync_state'
- insert += ' (blob_id, namespace, sync_status)'
- insert += ' VALUES (?, ?, ?)'
- values = (blob_id, namespace, sync_status)
- yield self.dbpool.runOperation(insert, values)
- return
+ defer.returnValue(None)
+ priority = result.pop()[0]
+ defer.returnValue(priority)
- update = 'UPDATE sync_state SET sync_status = ? WHERE blob_id = ?'
- values = (sync_status, blob_id,)
- result = yield self.dbpool.runOperation(update, values)
+ @defer.inlineCallbacks
+ def update_priority(self, blob_id, priority, namespace=""):
+ old_priority = yield self.get_priority(blob_id, namespace=namespace)
+ if not old_priority:
+ logger.error("Can't update priority of %s: no sync status entry."
+ % blob_id)
+ return
+ if old_priority == priority:
+ return
+ update = 'UPDATE sync_state SET priority = ? WHERE blob_id = ?'
+ values = (priority, blob_id,)
+ yield self.dbpool.runOperation(update, values)
def update_batch_sync_status(self, blob_id_list, sync_status,
namespace=''):
@@ -161,11 +190,12 @@ class SQLiteBlobBackend(object):
return
insert = 'INSERT or REPLACE INTO sync_state'
first_blob_id, blob_id_list = blob_id_list[0], blob_id_list[1:]
- insert += ' (blob_id, namespace, sync_status) VALUES (?, ?, ?)'
- values = (first_blob_id, namespace, sync_status)
+ insert += ' (blob_id, namespace, sync_status, priority)'
+ insert += ' VALUES (?, ?, ?, ?)'
+ values = (first_blob_id, namespace, sync_status, Priority.DEFAULT)
for blob_id in blob_id_list:
- insert += ', (?, ?, ?)'
- values += (blob_id, namespace, sync_status)
+ insert += ', (?, ?, ?, ?)'
+ values += (blob_id, namespace, sync_status, Priority.DEFAULT)
return self.dbpool.runQuery(insert, values)
def increment_retries(self, blob_id):
@@ -212,9 +242,11 @@ def _init_sync_table(conn):
blob_id PRIMARY KEY,
namespace TEXT,
sync_status INT default %s,
+ priority INT default %d,
retries INT default 0)"""
default_status = SyncStatus.PENDING_UPLOAD
- maybe_create %= default_status
+ default_priority = Priority.DEFAULT
+ maybe_create %= (default_status, default_priority)
conn.execute(maybe_create)
diff --git a/src/leap/soledad/client/_db/blobs/sync.py b/src/leap/soledad/client/_db/blobs/sync.py
index 3ee60305..e6397ede 100644
--- a/src/leap/soledad/client/_db/blobs/sync.py
+++ b/src/leap/soledad/client/_db/blobs/sync.py
@@ -17,12 +17,15 @@
"""
Synchronization between blobs client/server
"""
+from collections import defaultdict
from twisted.internet import defer
from twisted.internet import reactor
from twisted.logger import Logger
from twisted.internet import error
from .sql import SyncStatus
from .errors import RetriableTransferError
+
+
logger = Logger()
@@ -56,6 +59,9 @@ def with_retry(func, *args, **kwargs):
class BlobsSynchronizer(object):
+ def __init__(self):
+ self.locks = defaultdict(defer.DeferredLock)
+
@defer.inlineCallbacks
def refresh_sync_status_from_server(self, namespace=''):
d1 = self.remote_list(namespace=namespace)
@@ -82,7 +88,6 @@ class BlobsSynchronizer(object):
SyncStatus.SYNCED,
namespace=namespace)
- @defer.inlineCallbacks
def send_missing(self, namespace=''):
"""
Compare local and remote blobs and send what's missing in server.
@@ -90,30 +95,41 @@ class BlobsSynchronizer(object):
:param namespace:
Optional parameter to restrict operation to a given namespace.
:type namespace: str
+
+ :return: A deferred that fires when all local blobs were sent to
+ server.
+ :rtype: twisted.internet.defer.Deferred
"""
- missing = yield self.local.list_status(
- SyncStatus.PENDING_UPLOAD, namespace)
- total = len(missing)
- logger.info("Will send %d blobs to server." % total)
- deferreds = []
- semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit)
-
- for i in xrange(total):
- blob_id = missing.pop()
- d = semaphore.run(
- with_retry, self.__send_one, blob_id, namespace, i, total)
- deferreds.append(d)
- yield defer.gatherResults(deferreds, consumeErrors=True)
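+ # a named lock ensures only one send_missing() run at a time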
+ lock = self.locks['send_missing']
+ d = lock.run(self._send_missing, namespace)
+ return d
@defer.inlineCallbacks
- def __send_one(self, blob_id, namespace, i, total):
- logger.info("Sending blob to server (%d/%d): %s"
- % (i, total, blob_id))
- fd = yield self.local.get(blob_id, namespace=namespace)
- yield self._encrypt_and_upload(blob_id, fd)
- yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED)
+ def _send_missing(self, namespace):
+ max_transfers = self.concurrent_transfers_limit
+ semaphore = defer.DeferredSemaphore(max_transfers)
+ # the list of blobs should be refreshed often, so we run as many
+ # concurrent transfers as we can and then refresh the list
+ while True:
+ d = self.local_list_status(SyncStatus.PENDING_UPLOAD, namespace)
+ missing = yield d
+
+ if not missing:
+ break
+
+ total = len(missing)
+ now = min(total, max_transfers)
+ logger.info("There are %d pending blob uploads." % total)
+ logger.info("Will send %d blobs to server now." % now)
+ missing = missing[:now]
+ deferreds = []
+ for i in xrange(now):
+ blob_id = missing.pop(0)
+ d = semaphore.run(
+ with_retry, self._send, blob_id, namespace, i, total)
+ deferreds.append(d)
+ yield defer.gatherResults(deferreds, consumeErrors=True)
- @defer.inlineCallbacks
def fetch_missing(self, namespace=''):
"""
Compare local and remote blobs and fetch what's missing in local
@@ -122,21 +138,41 @@ class BlobsSynchronizer(object):
:param namespace:
Optional parameter to restrict operation to a given namespace.
:type namespace: str
+
+ :return: A deferred that fires when all remote blobs were received from
+ server.
+ :rtype: twisted.internet.defer.Deferred
"""
- # TODO: Use something to prioritize user requests over general new docs
- d = self.local_list_status(SyncStatus.PENDING_DOWNLOAD, namespace)
- docs_we_want = yield d
- total = len(docs_we_want)
- logger.info("Will fetch %d blobs from server." % total)
- deferreds = []
- semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit)
-
- for i in xrange(len(docs_we_want)):
- blob_id = docs_we_want.pop()
- logger.info("Fetching blob (%d/%d): %s" % (i, total, blob_id))
- d = semaphore.run(with_retry, self.get, blob_id, namespace)
- deferreds.append(d)
- yield defer.gatherResults(deferreds, consumeErrors=True)
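+ # a named lock ensures only one fetch_missing() run at a time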
+ lock = self.locks['fetch_missing']
+ d = lock.run(self._fetch_missing, namespace)
+ return d
+
+ @defer.inlineCallbacks
+ def _fetch_missing(self, namespace=''):
+ max_transfers = self.concurrent_transfers_limit
+ semaphore = defer.DeferredSemaphore(max_transfers)
+ # in order to make sure that transfer priorities will be met, the list
+ # of blobs to transfer should be refreshed often. What we do is run as
+ # many concurrent transfers as we can and then refresh the list
+ while True:
+ d = self.local_list_status(SyncStatus.PENDING_DOWNLOAD, namespace)
+ docs_we_want = yield d
+
+ if not docs_we_want:
+ break
+
+ total = len(docs_we_want)
+ now = min(total, max_transfers)
+ logger.info("There are %d pending blob downloads." % total)
+ logger.info("Will fetch %d blobs from server now." % now)
+ docs_we_want = docs_we_want[:now]
+ deferreds = []
+ for i in xrange(now):
+ blob_id = docs_we_want.pop(0)
+ logger.info("Fetching blob (%d/%d): %s" % (i, now, blob_id))
+ d = semaphore.run(with_retry, self._fetch, blob_id, namespace)
+ deferreds.append(d)
+ yield defer.gatherResults(deferreds, consumeErrors=True)
@defer.inlineCallbacks
def sync(self, namespace=''):
diff --git a/tests/blobs/test_priorities.py b/tests/blobs/test_priorities.py
new file mode 100644
index 00000000..fca582e5
--- /dev/null
+++ b/tests/blobs/test_priorities.py
@@ -0,0 +1,207 @@
+# -*- coding: utf-8 -*-
+# test_priorities.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for transfer priorities.
+"""
+import pytest
+
+from io import BytesIO
+from mock import Mock
+from twisted.internet import defer
+from twisted.trial import unittest
+from uuid import uuid4
+
+from leap.soledad.client._db.blobs import BlobManager
+from leap.soledad.client._db.blobs import Priority
+from leap.soledad.client._db.blobs import SyncStatus
+from leap.soledad.client._document import BlobDoc
+
+
+class BlobPrioritiesTests(unittest.TestCase):
+
+ def setUp(self):
+ self.cleartext = BytesIO('patriarchy is oppression')
+ self.secret = 'A' * 96
+ self.manager = BlobManager(
+ self.tempdir, '',
+ 'A' * 32, self.secret,
+ uuid4().hex, 'token', None)
+ self.addCleanup(self.manager.close)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_get_sets_default_priority(self):
+ self.manager._download_and_decrypt = Mock(return_value=None)
+ missing_blob_id = uuid4().hex
+ result = yield self.manager.get(missing_blob_id)
+ self.assertIsNone(result)
+ priority = yield self.manager.get_priority(missing_blob_id)
+ self.assertEqual(Priority.DEFAULT, priority)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_get_sets_priority(self):
+ self.manager._download_and_decrypt = Mock(return_value=None)
+ missing_blob_id = uuid4().hex
+ urgent = Priority.URGENT
+ result = yield self.manager.get(missing_blob_id, priority=urgent)
+ self.assertIsNone(result)
+ priority = yield self.manager.get_priority(missing_blob_id)
+ self.assertEqual(urgent, priority)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_put_sets_default_priority(self):
+ upload_failure = defer.fail(Exception())
+ self.manager._encrypt_and_upload = Mock(return_value=upload_failure)
+ content, blob_id = "Blob content", uuid4().hex
+ doc1 = BlobDoc(BytesIO(content), blob_id)
+ with pytest.raises(Exception):
+ yield self.manager.put(doc1, len(content))
+ priority = yield self.manager.get_priority(blob_id)
+ self.assertEqual(Priority.DEFAULT, priority)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_put_sets_priority(self):
+ upload_failure = defer.fail(Exception())
+ self.manager._encrypt_and_upload = Mock(return_value=upload_failure)
+ content, blob_id = "Blob content", uuid4().hex
+ doc1 = BlobDoc(BytesIO(content), blob_id)
+ urgent = Priority.URGENT
+ with pytest.raises(Exception):
+ yield self.manager.put(doc1, len(content), priority=urgent)
+ priority = yield self.manager.get_priority(blob_id)
+ self.assertEqual(urgent, priority)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_set_priority_sets_priority(self):
+ self.manager._download_and_decrypt = Mock(return_value=None)
+ missing_blob_id = uuid4().hex
+ result = yield self.manager.get(missing_blob_id)
+ self.assertIsNone(result)
+ urgent = Priority.URGENT
+ yield self.manager.set_priority(missing_blob_id, urgent)
+ priority = yield self.manager.get_priority(missing_blob_id)
+ self.assertEqual(urgent, priority)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_local_list_status_orders_by_priority(self):
+ self.manager._download_and_decrypt = Mock(return_value=None)
+
+ def _get(priority):
+ missing_blob_id = uuid4().hex
+ d = self.manager.get(missing_blob_id, priority=priority)
+ d.addCallback(lambda _: missing_blob_id)
+ return d
+
+ # get some blobs in arbitrary order
+ low = yield _get(Priority.LOW)
+ high = yield _get(Priority.HIGH)
+ medium = yield _get(Priority.MEDIUM)
+ urgent = yield _get(Priority.URGENT)
+
+ # make sure they are ordered by priority
+ status = SyncStatus.PENDING_DOWNLOAD
+ pending = yield self.manager.local_list_status(status)
+ self.assertEqual([urgent, high, medium, low], pending)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_refresh_sync_status_from_server_saves_default_priorities(self):
+ remote_ids = [uuid4().hex for _ in range(10)]
+ self.manager.remote_list = Mock(return_value=defer.succeed(remote_ids))
+ yield self.manager.refresh_sync_status_from_server()
+ for blob_id in remote_ids:
+ priority = yield self.manager.get_priority(blob_id)
+ self.assertEqual(Priority.DEFAULT, priority)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_fetch_missing_fetches_with_priority(self):
+
+ # pretend we have some pending downloads
+ deferreds = [
+ self.manager.local.update_sync_status(
+ 'low', SyncStatus.PENDING_DOWNLOAD,
+ priority=Priority.LOW),
+ self.manager.local.update_sync_status(
+ 'high', SyncStatus.PENDING_DOWNLOAD,
+ priority=Priority.HIGH),
+ self.manager.local.update_sync_status(
+ 'medium', SyncStatus.PENDING_DOWNLOAD,
+ priority=Priority.MEDIUM),
+ self.manager.local.update_sync_status(
+ 'urgent', SyncStatus.PENDING_DOWNLOAD,
+ priority=Priority.URGENT),
+ ]
+ yield defer.gatherResults(deferreds)
+
+ # make sure download "succeeds" so fetching works
+ content = 'vegan muffin'
+ fd = BytesIO(content)
+ size = len(content)
+ self.manager._download_and_decrypt = Mock(return_value=(fd, size))
+ self.manager.concurrent_transfers_limit = 1
+
+ # this is the operation we are interested in
+ yield self.manager.fetch_missing()
+
+ # retrieve the order in which blob transfers were made
+ calls = self.manager._download_and_decrypt.mock_calls
+ order = map(lambda c: c[1][0], calls)
+ self.assertEqual(['urgent', 'high', 'medium', 'low'], order)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_send_missing_sends_with_priority(self):
+
+ # pretend we have some pending uploads
+ _send = self.manager._send
+ self.manager._send = Mock(return_value=None)
+ content = "vegan cake"
+ length = len(content)
+ deferreds = [
+ self.manager.put(
+ BlobDoc(BytesIO(content), 'low'), length,
+ priority=Priority.LOW),
+ self.manager.put(
+ BlobDoc(BytesIO(content), 'high'), length,
+ priority=Priority.HIGH),
+ self.manager.put(
+ BlobDoc(BytesIO(content), 'medium'), length,
+ priority=Priority.MEDIUM),
+ self.manager.put(
+ BlobDoc(BytesIO(content), 'urgent'), length,
+ priority=Priority.URGENT),
+ ]
+ yield defer.gatherResults(deferreds)
+
+ # make sure upload "succeeds" so sending works
+ self.manager._send = _send
+ self.manager._encrypt_and_upload = Mock(return_value=None)
+
+ # this is the operation we are interested in
+ self.manager.concurrent_transfers_limit = 1
+ yield self.manager.send_missing()
+
+ # retrieve the order in which blob transfers were made
+ calls = self.manager._encrypt_and_upload.mock_calls
+ order = map(lambda c: c[1][0], calls)
+ self.assertEqual(['urgent', 'high', 'medium', 'low'], order)