diff options
-rw-r--r-- | src/leap/soledad/server/_blobs.py | 45 | ||||
-rw-r--r-- | src/leap/soledad/server/_incoming.py | 62 | ||||
-rw-r--r-- | src/leap/soledad/server/interfaces.py | 11 | ||||
-rw-r--r-- | tests/blobs/test_fs_backend.py | 16 | ||||
-rw-r--r-- | tests/server/test_incoming_server.py | 5 |
5 files changed, 91 insertions, 48 deletions
diff --git a/src/leap/soledad/server/_blobs.py b/src/leap/soledad/server/_blobs.py index 893f752e..86bbcdcb 100644 --- a/src/leap/soledad/server/_blobs.py +++ b/src/leap/soledad/server/_blobs.py @@ -65,6 +65,18 @@ class BlobNotFound(Exception): """ +class BlobExists(Exception): + """ + Raised when a blob already exists in data storage backend. + """ + + +class QuotaExceeded(Exception): + """ + Raised when the quota would be exceeded if an operation would be held. + """ + + @implementer(interfaces.IBlobsBackend) class FilesystemBlobsBackend(object): @@ -107,7 +119,7 @@ class FilesystemBlobsBackend(object): flags_file.write(raw_flags) @defer.inlineCallbacks - def write_blob(self, user, blob_id, request, namespace=''): + def write_blob(self, user, blob_id, fd, namespace=''): yield self.semaphore.acquire() path = self._get_path(user, blob_id, namespace) try: @@ -115,18 +127,12 @@ class FilesystemBlobsBackend(object): except OSError as e: logger.warn("Got exception trying to create directory: %r" % e) if os.path.isfile(path): - # 409 - Conflict - request.setResponseCode(409) - request.write("Blob already exists: %s" % blob_id) - defer.returnValue(None) + raise BlobExists used = yield self.get_total_storage(user) if used > self.quota: - logger.error("Error 507: Quota exceeded for user: %s" % user) - request.setResponseCode(507) - request.write('Quota Exceeded!') - defer.returnValue(None) + raise QuotaExceeded logger.info('writing blob: %s - %s' % (user, blob_id)) - fbp = FileBodyProducer(request.content) + fbp = FileBodyProducer(fd) with open(path, 'wb') as blobfile: yield fbp.startProducing(blobfile) yield self.semaphore.release() @@ -304,8 +310,25 @@ class BlobsResource(resource.Resource): def render_PUT(self, request): logger.info("http put: %s" % request.path) user, blob_id, namespace = self._validate(request) - d = self._handler.write_blob(user, blob_id, request, namespace) + + def catchBlobExists(failure): + failure.trap(BlobExists) + request.setResponseCode(409) + request.write("Blob already exists: %s" % blob_id) + request.finish() + + def catchQuotaExceeded(failure): + failure.trap(QuotaExceeded) + logger.error("Error 507: Quota exceeded for user: %s" % user) + request.setResponseCode(507) + request.write('Quota Exceeded!') + request.finish() + + fd = request.content + d = self._handler.write_blob(user, blob_id, fd, namespace=namespace) d.addCallback(lambda _: request.finish()) + d.addErrback(catchBlobExists) + d.addErrback(catchQuotaExceeded) d.addErrback(self._error, request) return NOT_DONE_YET diff --git a/src/leap/soledad/server/_incoming.py b/src/leap/soledad/server/_incoming.py index be172b22..362039af 100644 --- a/src/leap/soledad/server/_incoming.py +++ b/src/leap/soledad/server/_incoming.py @@ -34,6 +34,8 @@ from leap.soledad.common.log import getLogger from leap.soledad.server._config import get_config from leap.soledad.server._blobs import BlobsServerState +from leap.soledad.server._blobs import BlobExists +from leap.soledad.server._blobs import QuotaExceeded __all__ = ['IncomingResource'] @@ -65,29 +67,49 @@ class IncomingResource(Resource): scheme = EncryptionSchemes.PUBKEY db = self.factory.open_database(uuid) if uses_legacy(db): - try: - doc = ServerDocument(doc_id) - content = request.content.read() - doc.content = self.formatter.format(content, scheme) - db.put_doc(doc) - self._finish(request) - except Exception as e: - self._error(e, request) + self._put_legacy(doc_id, scheme, db, request) else: - raw_content = request.content.read() - preamble = self.formatter.preamble(raw_content, doc_id) - request.content = BytesIO(preamble + ' ' + raw_content) - d = db.write_blob(uuid, doc_id, request, namespace='MX') - # FIXME: We really need to decouple request handling from the - # backend! This is very ugly, but will change when this refactor - # is done. - flags = [Flags.PENDING] - d.addCallback(lambda _: db.set_flags(uuid, doc_id, flags, - namespace='MX')) - d.addCallback(lambda _: self._finish(request)) - d.addErrback(self._error, request) + self._put_incoming(uuid, doc_id, scheme, db, request) return NOT_DONE_YET + def _put_legacy(self, doc_id, scheme, db, request): + try: + doc = ServerDocument(doc_id) + content = request.content.read() + doc.content = self.formatter.format(content, scheme) + db.put_doc(doc) + self._finish(request) + except Exception as e: + self._error(e, request) + + def _put_incoming(self, user, blob_id, scheme, db, request): + raw_content = request.content.read() + preamble = self.formatter.preamble(raw_content, blob_id) + request.content = BytesIO(preamble + ' ' + raw_content) + + def catchBlobExists(failure): + failure.trap(BlobExists) + request.setResponseCode(409) + request.write("Blob already exists: %s" % blob_id) + request.finish() + + def catchQuotaExceeded(failure): + failure.trap(QuotaExceeded) + logger.error("Error 507: Quota exceeded for user: %s" % user) + request.setResponseCode(507) + request.write('Quota Exceeded!') + request.finish() + + fd = request.content + d = db.write_blob(user, blob_id, fd, namespace='MX') + flags = [Flags.PENDING] + d.addCallback(lambda _: db.set_flags(user, blob_id, flags, + namespace='MX')) + d.addCallback(lambda _: request.finish()) + d.addErrback(catchBlobExists) + d.addErrback(catchQuotaExceeded) + d.addErrback(self._error, request) + def _finish(self, request): request.write('{"success": true}') request.finish() diff --git a/src/leap/soledad/server/interfaces.py b/src/leap/soledad/server/interfaces.py index a4dead70..11d75f11 100644 --- a/src/leap/soledad/server/interfaces.py +++ b/src/leap/soledad/server/interfaces.py @@ -45,22 +45,23 @@ class IBlobsBackend(Interface): :rtype: twisted.web.resource.Resource """ - def write_blob(user, blob_id, request, namespace=''): + def write_blob(user, blob_id, fd, namespace=''): """ - Write a blob to the backend storage after reading it from a request. + Write a blob to the backend storage. :param user: The id of the user who owns the blob. :type user: str :param blob_id: The id of the blob. :type blob_id: str - :param request: A representation of all of the information about the - request that is being made. - :type request: twisted.web.server.Request + :param fd: A file-like object into which the contents of the blob + should be written. + :type fd: file-like :param namespace: An optional namespace for the blob. :type namespace: str :return: A deferred that fires when the blob has been written to the backend storage. + :rtype: twisted.internet.defer.Deferred """ def delete_blob(user, blob_id, namespace=''): diff --git a/tests/blobs/test_fs_backend.py b/tests/blobs/test_fs_backend.py index 58d8690d..5a136f00 100644 --- a/tests/blobs/test_fs_backend.py +++ b/tests/blobs/test_fs_backend.py @@ -82,10 +82,9 @@ class FilesystemBackendTestCase(unittest.TestCase): def test_cannot_overwrite(self, isfile): isfile.return_value = True backend = _blobs.FilesystemBlobsBackend(blobs_path=self.tempdir) - request = DummyRequest(['']) - yield backend.write_blob('user', 'blob_id', request) - self.assertEquals(request.written[0], "Blob already exists: blob_id") - self.assertEquals(request.responseCode, 409) + with pytest.raises(_blobs.BlobExists): + fd = Mock() + yield backend.write_blob('user', 'blob_id', fd) @pytest.mark.usefixtures("method_tmpdir") @mock.patch.object(os.path, 'isfile') @@ -93,14 +92,11 @@ class FilesystemBackendTestCase(unittest.TestCase): def test_write_cannot_exceed_quota(self, isfile): isfile.return_value = False backend = _blobs.FilesystemBlobsBackend(blobs_path=self.tempdir) - request = Mock() - backend.get_total_storage = lambda x: 100 backend.quota = 90 - yield backend.write_blob('user', 'blob_id', request) - - request.setResponseCode.assert_called_once_with(507) - request.write.assert_called_once_with('Quota Exceeded!') + with pytest.raises(_blobs.QuotaExceeded): + fd = Mock() + yield backend.write_blob('user', 'blob_id', fd) @pytest.mark.usefixtures("method_tmpdir") def test_get_path_partitioning_by_default(self): diff --git a/tests/server/test_incoming_server.py b/tests/server/test_incoming_server.py index 42637c87..d5127077 100644 --- a/tests/server/test_incoming_server.py +++ b/tests/server/test_incoming_server.py @@ -84,8 +84,9 @@ class IncomingOnCouchServerTestCase(CouchDBTestCase): db = self.state.open_database(user_id) request = DummyRequest([user_id, doc_id]) - yield db.read_blob(user_id, doc_id, request, 'MX') - flags = db.get_flags(user_id, doc_id, 'MX') + res = db.read_blob(user_id, doc_id, namespace='MX') + yield res.render_GET(request) + flags = db.get_flags(user_id, doc_id, namespace='MX') expected_preamble = formatter.preamble(content, doc_id) expected_preamble = decode_preamble(expected_preamble, True) written_preamble, written_content = request.written[0].split() |