summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2017-12-18 12:10:01 -0200
committerdrebs <drebs@leap.se>2017-12-18 12:10:15 -0200
commit481afd8f268f567072b1f480870bbda5e1b46018 (patch)
tree0f74925833ad8e75bf71fca09131274849c919b2
parent383a19aaf87bd8e2665112c1350627140958eedf (diff)
[feature] cache user quota on blobs filesystem backend
Closes: 9016
-rw-r--r--src/leap/soledad/server/_blobs.py63
1 files changed, 46 insertions, 17 deletions
diff --git a/src/leap/soledad/server/_blobs.py b/src/leap/soledad/server/_blobs.py
index 70b55264..1fe90d5c 100644
--- a/src/leap/soledad/server/_blobs.py
+++ b/src/leap/soledad/server/_blobs.py
@@ -28,12 +28,14 @@ import os
import base64
import json
import re
+import time
from twisted.web import resource
from twisted.web.client import FileBodyProducer
from twisted.web.server import NOT_DONE_YET
from twisted.internet import utils, defer
+from collections import defaultdict
from zope.interface import implementer
from leap.common.files import mkdir_p
@@ -79,6 +81,8 @@ class QuotaExceeded(Exception):
@implementer(interfaces.IBlobsBackend)
class FilesystemBlobsBackend(object):
+ USAGE_TIMEOUT = 30
+
def __init__(self, blobs_path='/tmp/blobs/', quota=200 * 1024,
concurrent_writes=50):
self.quota = quota
@@ -86,6 +90,8 @@ class FilesystemBlobsBackend(object):
if not os.path.isdir(blobs_path):
os.makedirs(blobs_path)
self.path = blobs_path
+ self.usage = defaultdict(lambda: (None, None))
+ self.usage_locks = defaultdict(defer.DeferredLock)
def __touch(self, path):
open(path, 'a')
@@ -129,21 +135,37 @@ class FilesystemBlobsBackend(object):
@defer.inlineCallbacks
def write_blob(self, user, blob_id, producer, namespace=''):
+ # limit the number of concurrent writes to disk
yield self.semaphore.acquire()
- path = self._get_path(user, blob_id, namespace)
try:
- mkdir_p(os.path.split(path)[0])
- except OSError as e:
- logger.warn("Got exception trying to create directory: %r" % e)
- if os.path.isfile(path):
- raise BlobExists
- used = yield self.get_total_storage(user)
- if used > self.quota:
- raise QuotaExceeded
- logger.info('writing blob: %s - %s' % (user, blob_id))
- with open(path, 'wb') as blobfile:
- yield producer.startProducing(blobfile)
- yield self.semaphore.release()
+ path = self._get_path(user, blob_id, namespace)
+ try:
+ mkdir_p(os.path.split(path)[0])
+ except OSError as e:
+ logger.warn("Got exception trying to create directory: %r" % e)
+ if os.path.isfile(path):
+ raise BlobExists
+ used = yield self.get_total_storage(user)
+ length = producer.length / 1024.0 # original length is in bytes
+ if used + length > self.quota:
+ raise QuotaExceeded
+ logger.info('writing blob: %s - %s' % (user, blob_id))
+ with open(path, 'wb') as blobfile:
+ yield producer.startProducing(blobfile)
+ used += length
+ yield self._update_usage(user, used)
+ finally:
+ self.semaphore.release()
+
+ @defer.inlineCallbacks
+ def _update_usage(self, user, used):
+ lock = self.usage_locks[user]
+ yield lock.acquire()
+ try:
+ _, timestamp = self.usage[user]
+ self.usage[user] = (used, timestamp)
+ finally:
+ lock.release()
def delete_blob(self, user, blob_id, namespace=''):
try:
@@ -217,12 +239,19 @@ class FilesystemBlobsBackend(object):
if flag in blob_flags:
yield blob_path
+ @defer.inlineCallbacks
def get_total_storage(self, user):
+ lock = self.usage_locks[user]
+ yield lock.acquire()
try:
- path = self._get_path(user)
- except Exception as e:
- return defer.fail(e)
- return self._get_disk_usage(path)
+ used, timestamp = self.usage[user]
+ if used is None or time.time() > timestamp + self.USAGE_TIMEOUT:
+ path = self._get_path(user)
+ used = yield self._get_disk_usage(path)
+ self.usage[user] = (used, time.time())
+ defer.returnValue(used)
+ finally:
+ lock.release()
def get_tag(self, user, blob_id, namespace=''):
try: