diff options
-rw-r--r-- | src/leap/soledad/server/_blobs.py | 63 |
1 files changed, 46 insertions, 17 deletions
diff --git a/src/leap/soledad/server/_blobs.py b/src/leap/soledad/server/_blobs.py index 70b55264..1fe90d5c 100644 --- a/src/leap/soledad/server/_blobs.py +++ b/src/leap/soledad/server/_blobs.py @@ -28,12 +28,14 @@ import os import base64 import json import re +import time from twisted.web import resource from twisted.web.client import FileBodyProducer from twisted.web.server import NOT_DONE_YET from twisted.internet import utils, defer +from collections import defaultdict from zope.interface import implementer from leap.common.files import mkdir_p @@ -79,6 +81,8 @@ class QuotaExceeded(Exception): @implementer(interfaces.IBlobsBackend) class FilesystemBlobsBackend(object): + USAGE_TIMEOUT = 30 + def __init__(self, blobs_path='/tmp/blobs/', quota=200 * 1024, concurrent_writes=50): self.quota = quota @@ -86,6 +90,8 @@ class FilesystemBlobsBackend(object): if not os.path.isdir(blobs_path): os.makedirs(blobs_path) self.path = blobs_path + self.usage = defaultdict(lambda: (None, None)) + self.usage_locks = defaultdict(defer.DeferredLock) def __touch(self, path): open(path, 'a') @@ -129,21 +135,37 @@ class FilesystemBlobsBackend(object): @defer.inlineCallbacks def write_blob(self, user, blob_id, producer, namespace=''): + # limit the number of concurrent writes to disk yield self.semaphore.acquire() - path = self._get_path(user, blob_id, namespace) try: - mkdir_p(os.path.split(path)[0]) - except OSError as e: - logger.warn("Got exception trying to create directory: %r" % e) - if os.path.isfile(path): - raise BlobExists - used = yield self.get_total_storage(user) - if used > self.quota: - raise QuotaExceeded - logger.info('writing blob: %s - %s' % (user, blob_id)) - with open(path, 'wb') as blobfile: - yield producer.startProducing(blobfile) - yield self.semaphore.release() + path = self._get_path(user, blob_id, namespace) + try: + mkdir_p(os.path.split(path)[0]) + except OSError as e: + logger.warn("Got exception trying to create directory: %r" % e) + if os.path.isfile(path): + raise BlobExists + used = yield self.get_total_storage(user) + length = producer.length / 1024.0 # original length is in bytes + if used + length > self.quota: + raise QuotaExceeded + logger.info('writing blob: %s - %s' % (user, blob_id)) + with open(path, 'wb') as blobfile: + yield producer.startProducing(blobfile) + used += length + yield self._update_usage(user, used) + finally: + self.semaphore.release() + + @defer.inlineCallbacks + def _update_usage(self, user, used): + lock = self.usage_locks[user] + yield lock.acquire() + try: + _, timestamp = self.usage[user] + self.usage[user] = (used, timestamp) + finally: + lock.release() def delete_blob(self, user, blob_id, namespace=''): try: @@ -217,12 +239,19 @@ class FilesystemBlobsBackend(object): if flag in blob_flags: yield blob_path + @defer.inlineCallbacks def get_total_storage(self, user): + lock = self.usage_locks[user] + yield lock.acquire() try: - path = self._get_path(user) - except Exception as e: - return defer.fail(e) - return self._get_disk_usage(path) + used, timestamp = self.usage[user] + if used is None or time.time() > timestamp + self.USAGE_TIMEOUT: + path = self._get_path(user) + used = yield self._get_disk_usage(path) + self.usage[user] = (used, time.time()) + defer.returnValue(used) + finally: + lock.release() def get_tag(self, user, blob_id, namespace=''): try: |