summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/leap/soledad/server/_blobs.py45
-rw-r--r--src/leap/soledad/server/_incoming.py62
-rw-r--r--src/leap/soledad/server/interfaces.py11
-rw-r--r--tests/blobs/test_fs_backend.py16
-rw-r--r--tests/server/test_incoming_server.py5
5 files changed, 91 insertions, 48 deletions
diff --git a/src/leap/soledad/server/_blobs.py b/src/leap/soledad/server/_blobs.py
index 893f752e..86bbcdcb 100644
--- a/src/leap/soledad/server/_blobs.py
+++ b/src/leap/soledad/server/_blobs.py
@@ -65,6 +65,18 @@ class BlobNotFound(Exception):
"""
+class BlobExists(Exception):
+ """
+ Raised when a blob already exists in data storage backend.
+ """
+
+
+class QuotaExceeded(Exception):
+ """
+ Raised when the quota would be exceeded if an operation would be held.
+ """
+
+
@implementer(interfaces.IBlobsBackend)
class FilesystemBlobsBackend(object):
@@ -107,7 +119,7 @@ class FilesystemBlobsBackend(object):
flags_file.write(raw_flags)
@defer.inlineCallbacks
- def write_blob(self, user, blob_id, request, namespace=''):
+ def write_blob(self, user, blob_id, fd, namespace=''):
yield self.semaphore.acquire()
path = self._get_path(user, blob_id, namespace)
try:
@@ -115,18 +127,12 @@ class FilesystemBlobsBackend(object):
except OSError as e:
logger.warn("Got exception trying to create directory: %r" % e)
if os.path.isfile(path):
- # 409 - Conflict
- request.setResponseCode(409)
- request.write("Blob already exists: %s" % blob_id)
- defer.returnValue(None)
+ raise BlobExists
used = yield self.get_total_storage(user)
if used > self.quota:
- logger.error("Error 507: Quota exceeded for user: %s" % user)
- request.setResponseCode(507)
- request.write('Quota Exceeded!')
- defer.returnValue(None)
+ raise QuotaExceeded
logger.info('writing blob: %s - %s' % (user, blob_id))
- fbp = FileBodyProducer(request.content)
+ fbp = FileBodyProducer(fd)
with open(path, 'wb') as blobfile:
yield fbp.startProducing(blobfile)
yield self.semaphore.release()
@@ -304,8 +310,25 @@ class BlobsResource(resource.Resource):
def render_PUT(self, request):
logger.info("http put: %s" % request.path)
user, blob_id, namespace = self._validate(request)
- d = self._handler.write_blob(user, blob_id, request, namespace)
+
+ def catchBlobExists(failure):
+ failure.trap(BlobExists)
+ request.setResponseCode(409)
+ request.write("Blob already exists: %s" % blob_id)
+ request.finish()
+
+ def catchQuotaExceeded(failure):
+ failure.trap(QuotaExceeded)
+ logger.error("Error 507: Quota exceeded for user: %s" % user)
+ request.setResponseCode(507)
+ request.write('Quota Exceeded!')
+ request.finish()
+
+ fd = request.content
+ d = self._handler.write_blob(user, blob_id, fd, namespace=namespace)
d.addCallback(lambda _: request.finish())
+ d.addErrback(catchBlobExists)
+ d.addErrback(catchQuotaExceeded)
d.addErrback(self._error, request)
return NOT_DONE_YET
diff --git a/src/leap/soledad/server/_incoming.py b/src/leap/soledad/server/_incoming.py
index be172b22..362039af 100644
--- a/src/leap/soledad/server/_incoming.py
+++ b/src/leap/soledad/server/_incoming.py
@@ -34,6 +34,8 @@ from leap.soledad.common.log import getLogger
from leap.soledad.server._config import get_config
from leap.soledad.server._blobs import BlobsServerState
+from leap.soledad.server._blobs import BlobExists
+from leap.soledad.server._blobs import QuotaExceeded
__all__ = ['IncomingResource']
@@ -65,29 +67,49 @@ class IncomingResource(Resource):
scheme = EncryptionSchemes.PUBKEY
db = self.factory.open_database(uuid)
if uses_legacy(db):
- try:
- doc = ServerDocument(doc_id)
- content = request.content.read()
- doc.content = self.formatter.format(content, scheme)
- db.put_doc(doc)
- self._finish(request)
- except Exception as e:
- self._error(e, request)
+ self._put_legacy(doc_id, scheme, db, request)
else:
- raw_content = request.content.read()
- preamble = self.formatter.preamble(raw_content, doc_id)
- request.content = BytesIO(preamble + ' ' + raw_content)
- d = db.write_blob(uuid, doc_id, request, namespace='MX')
- # FIXME: We really need to decouple request handling from the
- # backend! This is very ugly, but will change when this refactor
- # is done.
- flags = [Flags.PENDING]
- d.addCallback(lambda _: db.set_flags(uuid, doc_id, flags,
- namespace='MX'))
- d.addCallback(lambda _: self._finish(request))
- d.addErrback(self._error, request)
+ self._put_incoming(uuid, doc_id, scheme, db, request)
return NOT_DONE_YET
+ def _put_legacy(self, doc_id, scheme, db, request):
+ try:
+ doc = ServerDocument(doc_id)
+ content = request.content.read()
+ doc.content = self.formatter.format(content, scheme)
+ db.put_doc(doc)
+ self._finish(request)
+ except Exception as e:
+ self._error(e, request)
+
+ def _put_incoming(self, user, blob_id, scheme, db, request):
+ raw_content = request.content.read()
+ preamble = self.formatter.preamble(raw_content, blob_id)
+ request.content = BytesIO(preamble + ' ' + raw_content)
+
+ def catchBlobExists(failure):
+ failure.trap(BlobExists)
+ request.setResponseCode(409)
+ request.write("Blob already exists: %s" % blob_id)
+ request.finish()
+
+ def catchQuotaExceeded(failure):
+ failure.trap(QuotaExceeded)
+ logger.error("Error 507: Quota exceeded for user: %s" % user)
+ request.setResponseCode(507)
+ request.write('Quota Exceeded!')
+ request.finish()
+
+ fd = request.content
+ d = db.write_blob(user, blob_id, fd, namespace='MX')
+ flags = [Flags.PENDING]
+ d.addCallback(lambda _: db.set_flags(user, blob_id, flags,
+ namespace='MX'))
+ d.addCallback(lambda _: request.finish())
+ d.addErrback(catchBlobExists)
+ d.addErrback(catchQuotaExceeded)
+ d.addErrback(self._error, request)
+
def _finish(self, request):
request.write('{"success": true}')
request.finish()
diff --git a/src/leap/soledad/server/interfaces.py b/src/leap/soledad/server/interfaces.py
index a4dead70..11d75f11 100644
--- a/src/leap/soledad/server/interfaces.py
+++ b/src/leap/soledad/server/interfaces.py
@@ -45,22 +45,23 @@ class IBlobsBackend(Interface):
:rtype: twisted.web.resource.Resource
"""
- def write_blob(user, blob_id, request, namespace=''):
+ def write_blob(user, blob_id, fd, namespace=''):
"""
- Write a blob to the backend storage after reading it from a request.
+ Write a blob to the backend storage.
:param user: The id of the user who owns the blob.
:type user: str
:param blob_id: The id of the blob.
:type blob_id: str
- :param request: A representation of all of the information about the
- request that is being made.
- :type request: twisted.web.server.Request
+ :param fd: A file-like object into which the contents of the blob
+ should be written.
+ :type fd: file-like
:param namespace: An optional namespace for the blob.
:type namespace: str
:return: A deferred that fires when the blob has been written to the
backend storage.
+ :rtype: twisted.internet.defer.Deferred
"""
def delete_blob(user, blob_id, namespace=''):
diff --git a/tests/blobs/test_fs_backend.py b/tests/blobs/test_fs_backend.py
index 58d8690d..5a136f00 100644
--- a/tests/blobs/test_fs_backend.py
+++ b/tests/blobs/test_fs_backend.py
@@ -82,10 +82,9 @@ class FilesystemBackendTestCase(unittest.TestCase):
def test_cannot_overwrite(self, isfile):
isfile.return_value = True
backend = _blobs.FilesystemBlobsBackend(blobs_path=self.tempdir)
- request = DummyRequest([''])
- yield backend.write_blob('user', 'blob_id', request)
- self.assertEquals(request.written[0], "Blob already exists: blob_id")
- self.assertEquals(request.responseCode, 409)
+ with pytest.raises(_blobs.BlobExists):
+ fd = Mock()
+ yield backend.write_blob('user', 'blob_id', fd)
@pytest.mark.usefixtures("method_tmpdir")
@mock.patch.object(os.path, 'isfile')
@@ -93,14 +92,11 @@ class FilesystemBackendTestCase(unittest.TestCase):
def test_write_cannot_exceed_quota(self, isfile):
isfile.return_value = False
backend = _blobs.FilesystemBlobsBackend(blobs_path=self.tempdir)
- request = Mock()
-
backend.get_total_storage = lambda x: 100
backend.quota = 90
- yield backend.write_blob('user', 'blob_id', request)
-
- request.setResponseCode.assert_called_once_with(507)
- request.write.assert_called_once_with('Quota Exceeded!')
+ with pytest.raises(_blobs.QuotaExceeded):
+ fd = Mock()
+ yield backend.write_blob('user', 'blob_id', fd)
@pytest.mark.usefixtures("method_tmpdir")
def test_get_path_partitioning_by_default(self):
diff --git a/tests/server/test_incoming_server.py b/tests/server/test_incoming_server.py
index 42637c87..d5127077 100644
--- a/tests/server/test_incoming_server.py
+++ b/tests/server/test_incoming_server.py
@@ -84,8 +84,9 @@ class IncomingOnCouchServerTestCase(CouchDBTestCase):
db = self.state.open_database(user_id)
request = DummyRequest([user_id, doc_id])
- yield db.read_blob(user_id, doc_id, request, 'MX')
- flags = db.get_flags(user_id, doc_id, 'MX')
+ res = db.read_blob(user_id, doc_id, namespace='MX')
+ yield res.render_GET(request)
+ flags = db.get_flags(user_id, doc_id, namespace='MX')
expected_preamble = formatter.preamble(content, doc_id)
expected_preamble = decode_preamble(expected_preamble, True)
written_preamble, written_content = request.written[0].split()