diff options
| -rw-r--r-- | src/leap/soledad/client/_db/blobs.py | 25 | 
1 files changed, 19 insertions, 6 deletions
| diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index 4edb77f4..e01078e4 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -220,7 +220,8 @@ class BlobManager(object):      in local and remote storages.      """      max_retries = 3 -    concurrency_limit = 3 +    concurrent_transfers_limit = 3 +    concurrent_writes_limit = 100      def __init__(              self, local_path, remote, key, secret, user, token=None, @@ -248,6 +249,7 @@ class BlobManager(object):          self.secret = secret          self.user = user          self._client = HTTPClient(user, token, cert_file) +        self.semaphore = defer.DeferredSemaphore(self.concurrent_writes_limit)      def close(self):          if hasattr(self, 'local') and self.local: @@ -334,7 +336,7 @@ class BlobManager(object):          total = len(missing)          logger.info("Will send %d blobs to server." % total)          deferreds = [] -        semaphore = defer.DeferredSemaphore(self.concurrency_limit) +        semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit)          def release(result):              semaphore.release() @@ -383,7 +385,7 @@ class BlobManager(object):          total = len(docs_we_want)          logger.info("Will fetch %d blobs from server." % total)          deferreds = [] -        semaphore = defer.DeferredSemaphore(self.concurrency_limit) +        semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit)          def release(result):              semaphore.release() @@ -407,7 +409,6 @@ class BlobManager(object):          except defer.FirstError as e:              e.subFailure.raiseException() -    @defer.inlineCallbacks      def put(self, doc, size, namespace=''):          """          Put a blob in local storage and upload it to server. @@ -420,6 +421,10 @@ class BlobManager(object):              Optional parameter to restrict operation to a given namespace.          :type namespace: str          """ +        return self.semaphore.run(self._put, doc, size, namespace) + +    @defer.inlineCallbacks +    def _put(self, doc, size, namespace):          if (yield self.local.exists(doc.blob_id, namespace=namespace)):              error_message = "Blob already exists: %s" % doc.blob_id              raise BlobAlreadyExistsError(error_message) @@ -433,7 +438,6 @@ class BlobManager(object):          yield self._encrypt_and_upload(doc.blob_id, fd, namespace=namespace)          yield self.local.update_sync_status(doc.blob_id, SyncStatus.SYNCED) -    @defer.inlineCallbacks      def set_flags(self, blob_id, flags, namespace=''):          """          Set flags for a given blob_id. @@ -450,6 +454,10 @@ class BlobManager(object):          :return: A deferred that fires when the operation finishes.          :rtype: twisted.internet.defer.Deferred          """ +        return self.semaphore.run(self._set_flags, blob_id, flags, namespace) + +    @defer.inlineCallbacks +    def _set_flags(self, blob_id, flags, namespace):          params = {'namespace': namespace} if namespace else None          flagsfd = BytesIO(json.dumps(flags))          uri = urljoin(self.remote, self.user + "/" + blob_id) @@ -584,7 +592,6 @@ class BlobManager(object):          logger.info("Finished download: (%s, %d)" % (blob_id, size))          defer.returnValue((fd, size)) -    @defer.inlineCallbacks      def delete(self, blob_id, namespace=''):          """          Delete a blob from local and remote storages. @@ -598,6 +605,10 @@ class BlobManager(object):          :return: A deferred that fires when the operation finishes.          :rtype: twisted.internet.defer.Deferred          """ +        return self.semaphore.run(self._delete, blob_id, namespace) + +    @defer.inlineCallbacks +    def _delete(self, blob_id, namespace):          logger.info("Staring deletion of blob: %s" % blob_id)          yield self._delete_from_remote(blob_id, namespace=namespace)          if (yield self.local.exists(blob_id, namespace=namespace)): @@ -615,6 +626,8 @@ class BlobManager(object):  class SQLiteBlobBackend(object): +    concurrency_limit = 10 +      def __init__(self, path, key=None, user=None):          dbname = '%s_blobs.db' % (user or 'soledad')          self.path = os.path.abspath( | 
