summaryrefslogtreecommitdiff
path: root/src/leap/soledad/client
diff options
context:
space:
mode:
authordrebs <drebs@riseup.net>2017-10-12 11:26:30 -0300
committerdrebs <drebs@riseup.net>2017-10-12 14:34:11 -0300
commit3efdb6e197348d2c2774adfaca8faf2d1976bc94 (patch)
tree736309a24f8d60409c164ed285b6a7f734aa193b /src/leap/soledad/client
parente0e427728848ce8bb33b4a4d6f8937ec5788d2c6 (diff)
[bug] limit number of concurrent requests to local db
When running stress tests on blobs storage, we get weird errors when hundreds of requests are made concurrently to the sqlite backend. This commit adds a limit so only 10 requests will be delivered to the backend at a time.
Diffstat (limited to 'src/leap/soledad/client')
-rw-r--r--src/leap/soledad/client/_db/blobs.py25
1 files changed, 19 insertions, 6 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py
index 4edb77f4..e01078e4 100644
--- a/src/leap/soledad/client/_db/blobs.py
+++ b/src/leap/soledad/client/_db/blobs.py
@@ -220,7 +220,8 @@ class BlobManager(object):
in local and remote storages.
"""
max_retries = 3
- concurrency_limit = 3
+ concurrent_transfers_limit = 3
+ concurrent_writes_limit = 100
def __init__(
self, local_path, remote, key, secret, user, token=None,
@@ -248,6 +249,7 @@ class BlobManager(object):
self.secret = secret
self.user = user
self._client = HTTPClient(user, token, cert_file)
+ self.semaphore = defer.DeferredSemaphore(self.concurrent_writes_limit)
def close(self):
if hasattr(self, 'local') and self.local:
@@ -334,7 +336,7 @@ class BlobManager(object):
total = len(missing)
logger.info("Will send %d blobs to server." % total)
deferreds = []
- semaphore = defer.DeferredSemaphore(self.concurrency_limit)
+ semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit)
def release(result):
semaphore.release()
@@ -383,7 +385,7 @@ class BlobManager(object):
total = len(docs_we_want)
logger.info("Will fetch %d blobs from server." % total)
deferreds = []
- semaphore = defer.DeferredSemaphore(self.concurrency_limit)
+ semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit)
def release(result):
semaphore.release()
@@ -407,7 +409,6 @@ class BlobManager(object):
except defer.FirstError as e:
e.subFailure.raiseException()
- @defer.inlineCallbacks
def put(self, doc, size, namespace=''):
"""
Put a blob in local storage and upload it to server.
@@ -420,6 +421,10 @@ class BlobManager(object):
Optional parameter to restrict operation to a given namespace.
:type namespace: str
"""
+ return self.semaphore.run(self._put, doc, size, namespace)
+
+ @defer.inlineCallbacks
+ def _put(self, doc, size, namespace):
if (yield self.local.exists(doc.blob_id, namespace=namespace)):
error_message = "Blob already exists: %s" % doc.blob_id
raise BlobAlreadyExistsError(error_message)
@@ -433,7 +438,6 @@ class BlobManager(object):
yield self._encrypt_and_upload(doc.blob_id, fd, namespace=namespace)
yield self.local.update_sync_status(doc.blob_id, SyncStatus.SYNCED)
- @defer.inlineCallbacks
def set_flags(self, blob_id, flags, namespace=''):
"""
Set flags for a given blob_id.
@@ -450,6 +454,10 @@ class BlobManager(object):
:return: A deferred that fires when the operation finishes.
:rtype: twisted.internet.defer.Deferred
"""
+ return self.semaphore.run(self._set_flags, blob_id, flags, namespace)
+
+ @defer.inlineCallbacks
+ def _set_flags(self, blob_id, flags, namespace):
params = {'namespace': namespace} if namespace else None
flagsfd = BytesIO(json.dumps(flags))
uri = urljoin(self.remote, self.user + "/" + blob_id)
@@ -584,7 +592,6 @@ class BlobManager(object):
logger.info("Finished download: (%s, %d)" % (blob_id, size))
defer.returnValue((fd, size))
- @defer.inlineCallbacks
def delete(self, blob_id, namespace=''):
"""
Delete a blob from local and remote storages.
@@ -598,6 +605,10 @@ class BlobManager(object):
:return: A deferred that fires when the operation finishes.
:rtype: twisted.internet.defer.Deferred
"""
+ return self.semaphore.run(self._delete, blob_id, namespace)
+
+ @defer.inlineCallbacks
+ def _delete(self, blob_id, namespace):
logger.info("Staring deletion of blob: %s" % blob_id)
yield self._delete_from_remote(blob_id, namespace=namespace)
if (yield self.local.exists(blob_id, namespace=namespace)):
@@ -615,6 +626,8 @@ class BlobManager(object):
class SQLiteBlobBackend(object):
+ concurrency_limit = 10
+
def __init__(self, path, key=None, user=None):
dbname = '%s_blobs.db' % (user or 'soledad')
self.path = os.path.abspath(