diff options
author | drebs <drebs@riseup.net> | 2017-10-12 11:26:30 -0300 |
---|---|---|
committer | drebs <drebs@riseup.net> | 2017-10-12 13:39:00 -0300 |
commit | fdfb55ad4d0110356393bd018bd9ed1c66bbbbcf (patch) | |
tree | 100568f7023734e971c030a7f8126994e6e0a6f8 /src/leap/soledad/client | |
parent | 07f94a9a6f281069a0441cafce3f8a92e6d03e8b (diff) |
[bug] limit number of concurrent requests to local db
When running stress tests on blobs storage, we get weird errors when
hundreds of requests are made concurrently to the sqlite backend. This
commit adds a limit so only 10 requests will be delivered to the backend
at a time.
Diffstat (limited to 'src/leap/soledad/client')
-rw-r--r-- | src/leap/soledad/client/_db/blobs.py | 25 |
1 files changed, 19 insertions, 6 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index 4edb77f4..e01078e4 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -220,7 +220,8 @@ class BlobManager(object): in local and remote storages. """ max_retries = 3 - concurrency_limit = 3 + concurrent_transfers_limit = 3 + concurrent_writes_limit = 100 def __init__( self, local_path, remote, key, secret, user, token=None, @@ -248,6 +249,7 @@ class BlobManager(object): self.secret = secret self.user = user self._client = HTTPClient(user, token, cert_file) + self.semaphore = defer.DeferredSemaphore(self.concurrent_writes_limit) def close(self): if hasattr(self, 'local') and self.local: @@ -334,7 +336,7 @@ class BlobManager(object): total = len(missing) logger.info("Will send %d blobs to server." % total) deferreds = [] - semaphore = defer.DeferredSemaphore(self.concurrency_limit) + semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit) def release(result): semaphore.release() @@ -383,7 +385,7 @@ class BlobManager(object): total = len(docs_we_want) logger.info("Will fetch %d blobs from server." % total) deferreds = [] - semaphore = defer.DeferredSemaphore(self.concurrency_limit) + semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit) def release(result): semaphore.release() @@ -407,7 +409,6 @@ class BlobManager(object): except defer.FirstError as e: e.subFailure.raiseException() - @defer.inlineCallbacks def put(self, doc, size, namespace=''): """ Put a blob in local storage and upload it to server. @@ -420,6 +421,10 @@ class BlobManager(object): Optional parameter to restrict operation to a given namespace. :type namespace: str """ + return self.semaphore.run(self._put, doc, size, namespace) + + @defer.inlineCallbacks + def _put(self, doc, size, namespace): if (yield self.local.exists(doc.blob_id, namespace=namespace)): error_message = "Blob already exists: %s" % doc.blob_id raise BlobAlreadyExistsError(error_message) @@ -433,7 +438,6 @@ class BlobManager(object): yield self._encrypt_and_upload(doc.blob_id, fd, namespace=namespace) yield self.local.update_sync_status(doc.blob_id, SyncStatus.SYNCED) - @defer.inlineCallbacks def set_flags(self, blob_id, flags, namespace=''): """ Set flags for a given blob_id. @@ -450,6 +454,10 @@ class BlobManager(object): :return: A deferred that fires when the operation finishes. :rtype: twisted.internet.defer.Deferred """ + return self.semaphore.run(self._set_flags, blob_id, flags, namespace) + + @defer.inlineCallbacks + def _set_flags(self, blob_id, flags, namespace): params = {'namespace': namespace} if namespace else None flagsfd = BytesIO(json.dumps(flags)) uri = urljoin(self.remote, self.user + "/" + blob_id) @@ -584,7 +592,6 @@ class BlobManager(object): logger.info("Finished download: (%s, %d)" % (blob_id, size)) defer.returnValue((fd, size)) - @defer.inlineCallbacks def delete(self, blob_id, namespace=''): """ Delete a blob from local and remote storages. @@ -598,6 +605,10 @@ class BlobManager(object): :return: A deferred that fires when the operation finishes. :rtype: twisted.internet.defer.Deferred """ + return self.semaphore.run(self._delete, blob_id, namespace) + + @defer.inlineCallbacks + def _delete(self, blob_id, namespace): logger.info("Staring deletion of blob: %s" % blob_id) yield self._delete_from_remote(blob_id, namespace=namespace) if (yield self.local.exists(blob_id, namespace=namespace)): @@ -615,6 +626,8 @@ class BlobManager(object): class SQLiteBlobBackend(object): + concurrency_limit = 10 + def __init__(self, path, key=None, user=None): dbname = '%s_blobs.db' % (user or 'soledad') self.path = os.path.abspath( |