From 3efdb6e197348d2c2774adfaca8faf2d1976bc94 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 12 Oct 2017 11:26:30 -0300 Subject: [bug] limit number of concurrent requests to local db When running stress tests on blobs storage, we get weird errors when hundreds of requests are made concurrently to the sqlite backend. This commit adds a limit so only 10 requests will be delivered to the backend at a time. --- src/leap/soledad/client/_db/blobs.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) (limited to 'src/leap/soledad/client') diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index 4edb77f4..e01078e4 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -220,7 +220,8 @@ class BlobManager(object): in local and remote storages. """ max_retries = 3 - concurrency_limit = 3 + concurrent_transfers_limit = 3 + concurrent_writes_limit = 100 def __init__( self, local_path, remote, key, secret, user, token=None, @@ -248,6 +249,7 @@ class BlobManager(object): self.secret = secret self.user = user self._client = HTTPClient(user, token, cert_file) + self.semaphore = defer.DeferredSemaphore(self.concurrent_writes_limit) def close(self): if hasattr(self, 'local') and self.local: @@ -334,7 +336,7 @@ class BlobManager(object): total = len(missing) logger.info("Will send %d blobs to server." % total) deferreds = [] - semaphore = defer.DeferredSemaphore(self.concurrency_limit) + semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit) def release(result): semaphore.release() @@ -383,7 +385,7 @@ class BlobManager(object): total = len(docs_we_want) logger.info("Will fetch %d blobs from server." % total) deferreds = [] - semaphore = defer.DeferredSemaphore(self.concurrency_limit) + semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit) def release(result): semaphore.release() @@ -407,7 +409,6 @@ class BlobManager(object): except defer.FirstError as e: e.subFailure.raiseException() - @defer.inlineCallbacks def put(self, doc, size, namespace=''): """ Put a blob in local storage and upload it to server. @@ -420,6 +421,10 @@ class BlobManager(object): Optional parameter to restrict operation to a given namespace. :type namespace: str """ + return self.semaphore.run(self._put, doc, size, namespace) + + @defer.inlineCallbacks + def _put(self, doc, size, namespace): if (yield self.local.exists(doc.blob_id, namespace=namespace)): error_message = "Blob already exists: %s" % doc.blob_id raise BlobAlreadyExistsError(error_message) @@ -433,7 +438,6 @@ class BlobManager(object): yield self._encrypt_and_upload(doc.blob_id, fd, namespace=namespace) yield self.local.update_sync_status(doc.blob_id, SyncStatus.SYNCED) - @defer.inlineCallbacks def set_flags(self, blob_id, flags, namespace=''): """ Set flags for a given blob_id. @@ -450,6 +454,10 @@ class BlobManager(object): :return: A deferred that fires when the operation finishes. :rtype: twisted.internet.defer.Deferred """ + return self.semaphore.run(self._set_flags, blob_id, flags, namespace) + + @defer.inlineCallbacks + def _set_flags(self, blob_id, flags, namespace): params = {'namespace': namespace} if namespace else None flagsfd = BytesIO(json.dumps(flags)) uri = urljoin(self.remote, self.user + "/" + blob_id) @@ -584,7 +592,6 @@ class BlobManager(object): logger.info("Finished download: (%s, %d)" % (blob_id, size)) defer.returnValue((fd, size)) - @defer.inlineCallbacks def delete(self, blob_id, namespace=''): """ Delete a blob from local and remote storages. @@ -598,6 +605,10 @@ class BlobManager(object): :return: A deferred that fires when the operation finishes. :rtype: twisted.internet.defer.Deferred """ + return self.semaphore.run(self._delete, blob_id, namespace) + + @defer.inlineCallbacks + def _delete(self, blob_id, namespace): logger.info("Staring deletion of blob: %s" % blob_id) yield self._delete_from_remote(blob_id, namespace=namespace) if (yield self.local.exists(blob_id, namespace=namespace)): @@ -615,6 +626,8 @@ class BlobManager(object): class SQLiteBlobBackend(object): + concurrency_limit = 10 + def __init__(self, path, key=None, user=None): dbname = '%s_blobs.db' % (user or 'soledad') self.path = os.path.abspath( -- cgit v1.2.3