summaryrefslogtreecommitdiff
path: root/src/leap/soledad/client/_db/blobs/__init__.py
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2017-11-29 11:42:29 -0200
committerdrebs <drebs@leap.se>2017-11-30 12:42:39 -0200
commit1d8b32abc821116da4f3b4f192bc06a931d30076 (patch)
treecc66c77e4b14157dc0030675cbb8f67f553075b1 /src/leap/soledad/client/_db/blobs/__init__.py
parent02c6c19687fac181ef3e3250f8de5dd85e10d11e (diff)
[feature] add priorities for blob transfers
Closes: #8691
Diffstat (limited to 'src/leap/soledad/client/_db/blobs/__init__.py')
-rw-r--r--src/leap/soledad/client/_db/blobs/__init__.py93
1 files changed, 83 insertions, 10 deletions
diff --git a/src/leap/soledad/client/_db/blobs/__init__.py b/src/leap/soledad/client/_db/blobs/__init__.py
index 4699e0a0..df801850 100644
--- a/src/leap/soledad/client/_db/blobs/__init__.py
+++ b/src/leap/soledad/client/_db/blobs/__init__.py
@@ -24,6 +24,7 @@ import os
import json
import base64
+from collections import defaultdict
from io import BytesIO
from twisted.logger import Logger
@@ -44,6 +45,7 @@ from leap.soledad.client._pipes import TruncatedTailPipe
from leap.soledad.client._pipes import PreamblePipe
from .sql import SyncStatus
+from .sql import Priority
from .sql import SQLiteBlobBackend
from .sync import BlobsSynchronizer
from .errors import (
@@ -139,6 +141,7 @@ class BlobManager(BlobsSynchronizer):
self.user = user
self._client = HTTPClient(user, token, cert_file)
self.semaphore = defer.DeferredSemaphore(self.concurrent_writes_limit)
+ self.locks = defaultdict(defer.DeferredLock)
def close(self):
if hasattr(self, 'local') and self.local:
@@ -207,7 +210,8 @@ class BlobManager(BlobsSynchronizer):
def local_list_status(self, status, namespace=''):
return self.local.list_status(status, namespace)
- def put(self, doc, size, namespace='', local_only=False):
+ def put(self, doc, size, namespace='', local_only=False,
+ priority=Priority.DEFAULT):
"""
Put a blob in local storage and upload it to server.
@@ -220,12 +224,15 @@ class BlobManager(BlobsSynchronizer):
:param namespace:
Optional parameter to restrict operation to a given namespace.
:type namespace: str
+
+ :return: A deferred that fires when the blob has been put.
+ :rtype: twisted.internet.defer.Deferred
"""
return self.semaphore.run(
- self._put, doc, size, namespace, local_only=local_only)
+ self._put, doc, size, namespace, local_only, priority)
@defer.inlineCallbacks
- def _put(self, doc, size, namespace, local_only=False):
+ def _put(self, doc, size, namespace, local_only, priority):
if (yield self.local.exists(doc.blob_id, namespace=namespace)):
error_message = "Blob already exists: %s" % doc.blob_id
raise BlobAlreadyExistsError(error_message)
@@ -233,17 +240,31 @@ class BlobManager(BlobsSynchronizer):
# TODO this is a tee really, but ok... could do db and upload
# concurrently. not sure if we'd gain something.
yield self.local.put(doc.blob_id, fd, size=size, namespace=namespace)
+
if local_only:
yield self.local.update_sync_status(
- doc.blob_id, SyncStatus.LOCAL_ONLY)
+ doc.blob_id, SyncStatus.LOCAL_ONLY, namespace=namespace)
defer.returnValue(None)
+
yield self.local.update_sync_status(
- doc.blob_id, SyncStatus.PENDING_UPLOAD)
+ doc.blob_id, SyncStatus.PENDING_UPLOAD, namespace=namespace,
+ priority=priority)
+ yield self._send(doc.blob_id, namespace, 1, 1)
+
+ def _send(self, blob_id, namespace, i, total):
+ lock = self.locks[blob_id]
+ d = lock.run(self.__send, blob_id, namespace, i, total)
+ return d
+
+ @defer.inlineCallbacks
+ def __send(self, blob_id, namespace, i, total):
+ logger.info("Sending blob to server (%d/%d): %s"
+ % (i, total, blob_id))
# In fact, some kind of pipe is needed here, where each write on db
# handle gets forwarded into a write on the connection handle
- fd = yield self.local.get(doc.blob_id, namespace=namespace)
- yield self._encrypt_and_upload(doc.blob_id, fd, namespace=namespace)
- yield self.local.update_sync_status(doc.blob_id, SyncStatus.SYNCED)
+ fd = yield self.local.get(blob_id, namespace=namespace)
+ yield self._encrypt_and_upload(blob_id, fd)
+ yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED)
def set_flags(self, blob_id, flags, namespace=''):
"""
@@ -294,7 +315,7 @@ class BlobManager(BlobsSynchronizer):
defer.returnValue((yield response.json()))
@defer.inlineCallbacks
- def get(self, blob_id, namespace=''):
+ def get(self, blob_id, namespace='', priority=Priority.DEFAULT):
"""
Get the blob from local storage or, if not available, from the server.
@@ -304,6 +325,10 @@ class BlobManager(BlobsSynchronizer):
:param namespace:
Optional parameter to restrict operation to a given namespace.
:type namespace: str
+
+ :return: A deferred that fires with the file descriptor for the
+ contents of the blob.
+ :rtype: twisted.internet.defer.Deferred
"""
local_blob = yield self.local.get(blob_id, namespace=namespace)
if local_blob:
@@ -311,8 +336,19 @@ class BlobManager(BlobsSynchronizer):
defer.returnValue(local_blob)
yield self.local.update_sync_status(
- blob_id, SyncStatus.PENDING_DOWNLOAD, namespace=namespace)
+ blob_id, SyncStatus.PENDING_DOWNLOAD, namespace=namespace,
+ priority=priority)
+
+ fd = yield self._fetch(blob_id, namespace)
+ defer.returnValue(fd)
+
+ def _fetch(self, blob_id, namespace):
+ lock = self.locks[blob_id]
+ d = lock.run(self.__fetch, blob_id, namespace)
+ return d
+ @defer.inlineCallbacks
+ def __fetch(self, blob_id, namespace):
try:
result = yield self._download_and_decrypt(blob_id, namespace)
except Exception as e:
@@ -347,6 +383,8 @@ class BlobManager(BlobsSynchronizer):
logger.info("Got decrypted blob of type: %s" % type(blob))
blob.seek(0)
yield self.local.put(blob_id, blob, size=size, namespace=namespace)
+ yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED,
+ namespace=namespace)
local_blob = yield self.local.get(blob_id, namespace=namespace)
defer.returnValue(local_blob)
else:
@@ -435,3 +473,38 @@ class BlobManager(BlobsSynchronizer):
response = yield self._client.delete(uri, params=params)
check_http_status(response.code, blob_id=blob_id)
defer.returnValue(response)
+
+ def set_priority(self, blob_id, priority, namespace=''):
+ """
+ Set the transfer priority for a certain blob.
+
+ :param blob_id: Unique identifier of a blob.
+ :type blob_id: str
+ :param priority: The priority to be set.
+ :type priority: int
+ :param namespace: Optional parameter to restrict operation to a given
+ namespace.
+ :type namespace: str
+
+ :return: A deferred that fires after the priority has been set.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ d = self.local.update_priority(blob_id, priority, namespace=namespace)
+ return d
+
+ def get_priority(self, blob_id, namespace=''):
+ """
+ Get the transfer priority for a certain blob.
+
+ :param blob_id: Unique identifier of a blob.
+ :type blob_id: str
+ :param namespace: Optional parameter to restrict operation to a given
+ namespace.
+ :type namespace: str
+
+ :return: A deferred that fires with the current transfer priority of
+ the blob.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ d = self.local.get_priority(blob_id, namespace=namespace)
+ return d