diff options
author | drebs <drebs@leap.se> | 2017-11-29 11:42:29 -0200 |
---|---|---|
committer | drebs <drebs@leap.se> | 2017-11-30 12:42:39 -0200 |
commit | 1d8b32abc821116da4f3b4f192bc06a931d30076 (patch) | |
tree | cc66c77e4b14157dc0030675cbb8f67f553075b1 /src/leap/soledad/client/_db/blobs/__init__.py | |
parent | 02c6c19687fac181ef3e3250f8de5dd85e10d11e (diff) |
[feature] add priorities for blob transfers
Closes: #8691
Diffstat (limited to 'src/leap/soledad/client/_db/blobs/__init__.py')
-rw-r--r-- | src/leap/soledad/client/_db/blobs/__init__.py | 93 |
1 files changed, 83 insertions, 10 deletions
diff --git a/src/leap/soledad/client/_db/blobs/__init__.py b/src/leap/soledad/client/_db/blobs/__init__.py index 4699e0a0..df801850 100644 --- a/src/leap/soledad/client/_db/blobs/__init__.py +++ b/src/leap/soledad/client/_db/blobs/__init__.py @@ -24,6 +24,7 @@ import os import json import base64 +from collections import defaultdict from io import BytesIO from twisted.logger import Logger @@ -44,6 +45,7 @@ from leap.soledad.client._pipes import TruncatedTailPipe from leap.soledad.client._pipes import PreamblePipe from .sql import SyncStatus +from .sql import Priority from .sql import SQLiteBlobBackend from .sync import BlobsSynchronizer from .errors import ( @@ -139,6 +141,7 @@ class BlobManager(BlobsSynchronizer): self.user = user self._client = HTTPClient(user, token, cert_file) self.semaphore = defer.DeferredSemaphore(self.concurrent_writes_limit) + self.locks = defaultdict(defer.DeferredLock) def close(self): if hasattr(self, 'local') and self.local: @@ -207,7 +210,8 @@ class BlobManager(BlobsSynchronizer): def local_list_status(self, status, namespace=''): return self.local.list_status(status, namespace) - def put(self, doc, size, namespace='', local_only=False): + def put(self, doc, size, namespace='', local_only=False, + priority=Priority.DEFAULT): """ Put a blob in local storage and upload it to server. @@ -220,12 +224,15 @@ class BlobManager(BlobsSynchronizer): :param namespace: Optional parameter to restrict operation to a given namespace. :type namespace: str + + :return: A deferred that fires when the blob has been put. + :rtype: twisted.internet.defer.Deferred """ return self.semaphore.run( - self._put, doc, size, namespace, local_only=local_only) + self._put, doc, size, namespace, local_only, priority) @defer.inlineCallbacks - def _put(self, doc, size, namespace, local_only=False): + def _put(self, doc, size, namespace, local_only, priority): if (yield self.local.exists(doc.blob_id, namespace=namespace)): error_message = "Blob already exists: %s" % doc.blob_id raise BlobAlreadyExistsError(error_message) @@ -233,17 +240,31 @@ class BlobManager(BlobsSynchronizer): # TODO this is a tee really, but ok... could do db and upload # concurrently. not sure if we'd gain something. yield self.local.put(doc.blob_id, fd, size=size, namespace=namespace) + if local_only: yield self.local.update_sync_status( - doc.blob_id, SyncStatus.LOCAL_ONLY) + doc.blob_id, SyncStatus.LOCAL_ONLY, namespace=namespace) defer.returnValue(None) + yield self.local.update_sync_status( - doc.blob_id, SyncStatus.PENDING_UPLOAD) + doc.blob_id, SyncStatus.PENDING_UPLOAD, namespace=namespace, + priority=priority) + yield self._send(doc.blob_id, namespace, 1, 1) + + def _send(self, blob_id, namespace, i, total): + lock = self.locks[blob_id] + d = lock.run(self.__send, blob_id, namespace, i, total) + return d + + @defer.inlineCallbacks + def __send(self, blob_id, namespace, i, total): + logger.info("Sending blob to server (%d/%d): %s" + % (i, total, blob_id)) # In fact, some kind of pipe is needed here, where each write on db # handle gets forwarded into a write on the connection handle - fd = yield self.local.get(doc.blob_id, namespace=namespace) - yield self._encrypt_and_upload(doc.blob_id, fd, namespace=namespace) - yield self.local.update_sync_status(doc.blob_id, SyncStatus.SYNCED) + fd = yield self.local.get(blob_id, namespace=namespace) + yield self._encrypt_and_upload(blob_id, fd) + yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED) def set_flags(self, blob_id, flags, namespace=''): """ @@ -294,7 +315,7 @@ class BlobManager(BlobsSynchronizer): defer.returnValue((yield response.json())) @defer.inlineCallbacks - def get(self, blob_id, namespace=''): + def get(self, blob_id, namespace='', priority=Priority.DEFAULT): """ Get the blob from local storage or, if not available, from the server. @@ -304,6 +325,10 @@ class BlobManager(BlobsSynchronizer): :param namespace: Optional parameter to restrict operation to a given namespace. :type namespace: str + + :return: A deferred that fires with the file descriptor for the + contents of the blob. + :rtype: twisted.internet.defer.Deferred """ local_blob = yield self.local.get(blob_id, namespace=namespace) if local_blob: @@ -311,8 +336,19 @@ class BlobManager(BlobsSynchronizer): defer.returnValue(local_blob) yield self.local.update_sync_status( - blob_id, SyncStatus.PENDING_DOWNLOAD, namespace=namespace) + blob_id, SyncStatus.PENDING_DOWNLOAD, namespace=namespace, + priority=priority) + + fd = yield self._fetch(blob_id, namespace) + defer.returnValue(fd) + + def _fetch(self, blob_id, namespace): + lock = self.locks[blob_id] + d = lock.run(self.__fetch, blob_id, namespace) + return d + @defer.inlineCallbacks + def __fetch(self, blob_id, namespace): try: result = yield self._download_and_decrypt(blob_id, namespace) except Exception as e: @@ -347,6 +383,8 @@ class BlobManager(BlobsSynchronizer): logger.info("Got decrypted blob of type: %s" % type(blob)) blob.seek(0) yield self.local.put(blob_id, blob, size=size, namespace=namespace) + yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED, + namespace=namespace) local_blob = yield self.local.get(blob_id, namespace=namespace) defer.returnValue(local_blob) else: @@ -435,3 +473,38 @@ class BlobManager(BlobsSynchronizer): response = yield self._client.delete(uri, params=params) check_http_status(response.code, blob_id=blob_id) defer.returnValue(response) + + def set_priority(self, blob_id, priority, namespace=''): + """ + Set the transfer priority for a certain blob. + + :param blob_id: Unique identifier of a blob. + :type blob_id: str + :param priority: The priority to be set. + :type priority: int + :param namespace: Optional parameter to restrict operation to a given + namespace. + :type namespace: str + + :return: A deferred that fires after the priority has been set. + :rtype: twisted.internet.defer.Deferred + """ + d = self.local.update_priority(blob_id, priority, namespace=namespace) + return d + + def get_priority(self, blob_id, namespace=''): + """ + Get the transfer priority for a certain blob. + + :param blob_id: Unique identifier of a blob. + :type blob_id: str + :param namespace: Optional parameter to restrict operation to a given + namespace. + :type namespace: str + + :return: A deferred that fires with the current transfer priority of + the blob. + :rtype: twisted.internet.defer.Deferred + """ + d = self.local.get_priority(blob_id, namespace=namespace) + return d |