summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@riseup.net>2017-10-12 11:26:30 -0300
committerdrebs <drebs@riseup.net>2017-10-12 13:39:00 -0300
commitfdfb55ad4d0110356393bd018bd9ed1c66bbbbcf (patch)
tree100568f7023734e971c030a7f8126994e6e0a6f8
parent07f94a9a6f281069a0441cafce3f8a92e6d03e8b (diff)
[bug] limit number of concurrent requests to local db
When running stress tests on blobs storage, we get weird errors when hundreds of requests are made concurrently to the sqlite backend. This commit adds a limit so only 10 requests will be delivered to the backend at a time.
-rw-r--r--src/leap/soledad/client/_db/blobs.py25
1 files changed, 19 insertions, 6 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py
index 4edb77f4..e01078e4 100644
--- a/src/leap/soledad/client/_db/blobs.py
+++ b/src/leap/soledad/client/_db/blobs.py
@@ -220,7 +220,8 @@ class BlobManager(object):
in local and remote storages.
"""
max_retries = 3
- concurrency_limit = 3
+ concurrent_transfers_limit = 3
+ concurrent_writes_limit = 100
def __init__(
self, local_path, remote, key, secret, user, token=None,
@@ -248,6 +249,7 @@ class BlobManager(object):
self.secret = secret
self.user = user
self._client = HTTPClient(user, token, cert_file)
+ self.semaphore = defer.DeferredSemaphore(self.concurrent_writes_limit)
def close(self):
if hasattr(self, 'local') and self.local:
@@ -334,7 +336,7 @@ class BlobManager(object):
total = len(missing)
logger.info("Will send %d blobs to server." % total)
deferreds = []
- semaphore = defer.DeferredSemaphore(self.concurrency_limit)
+ semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit)
def release(result):
semaphore.release()
@@ -383,7 +385,7 @@ class BlobManager(object):
total = len(docs_we_want)
logger.info("Will fetch %d blobs from server." % total)
deferreds = []
- semaphore = defer.DeferredSemaphore(self.concurrency_limit)
+ semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit)
def release(result):
semaphore.release()
@@ -407,7 +409,6 @@ class BlobManager(object):
except defer.FirstError as e:
e.subFailure.raiseException()
- @defer.inlineCallbacks
def put(self, doc, size, namespace=''):
"""
Put a blob in local storage and upload it to server.
@@ -420,6 +421,10 @@ class BlobManager(object):
Optional parameter to restrict operation to a given namespace.
:type namespace: str
"""
+ return self.semaphore.run(self._put, doc, size, namespace)
+
+ @defer.inlineCallbacks
+ def _put(self, doc, size, namespace):
if (yield self.local.exists(doc.blob_id, namespace=namespace)):
error_message = "Blob already exists: %s" % doc.blob_id
raise BlobAlreadyExistsError(error_message)
@@ -433,7 +438,6 @@ class BlobManager(object):
yield self._encrypt_and_upload(doc.blob_id, fd, namespace=namespace)
yield self.local.update_sync_status(doc.blob_id, SyncStatus.SYNCED)
- @defer.inlineCallbacks
def set_flags(self, blob_id, flags, namespace=''):
"""
Set flags for a given blob_id.
@@ -450,6 +454,10 @@ class BlobManager(object):
:return: A deferred that fires when the operation finishes.
:rtype: twisted.internet.defer.Deferred
"""
+ return self.semaphore.run(self._set_flags, blob_id, flags, namespace)
+
+ @defer.inlineCallbacks
+ def _set_flags(self, blob_id, flags, namespace):
params = {'namespace': namespace} if namespace else None
flagsfd = BytesIO(json.dumps(flags))
uri = urljoin(self.remote, self.user + "/" + blob_id)
@@ -584,7 +592,6 @@ class BlobManager(object):
logger.info("Finished download: (%s, %d)" % (blob_id, size))
defer.returnValue((fd, size))
- @defer.inlineCallbacks
def delete(self, blob_id, namespace=''):
"""
Delete a blob from local and remote storages.
@@ -598,6 +605,10 @@ class BlobManager(object):
:return: A deferred that fires when the operation finishes.
:rtype: twisted.internet.defer.Deferred
"""
+ return self.semaphore.run(self._delete, blob_id, namespace)
+
+ @defer.inlineCallbacks
+ def _delete(self, blob_id, namespace):
logger.info("Staring deletion of blob: %s" % blob_id)
yield self._delete_from_remote(blob_id, namespace=namespace)
if (yield self.local.exists(blob_id, namespace=namespace)):
@@ -615,6 +626,8 @@ class BlobManager(object):
class SQLiteBlobBackend(object):
+ concurrency_limit = 10
+
def __init__(self, path, key=None, user=None):
dbname = '%s_blobs.db' % (user or 'soledad')
self.path = os.path.abspath(