summaryrefslogtreecommitdiff
path: root/common/src/leap/soledad/common/tests/test_server.py
diff options
context:
space:
mode:
Diffstat (limited to 'common/src/leap/soledad/common/tests/test_server.py')
-rw-r--r--common/src/leap/soledad/common/tests/test_server.py129
1 files changed, 0 insertions, 129 deletions
diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py
index 20fe8579..bf6c1515 100644
--- a/common/src/leap/soledad/common/tests/test_server.py
+++ b/common/src/leap/soledad/common/tests/test_server.py
@@ -43,7 +43,6 @@ from leap.soledad.common.tests.util import (
from leap.soledad.common import crypto
from leap.soledad.client import Soledad
-from leap.soledad.server import LockResource
from leap.soledad.server import load_configuration
from leap.soledad.server import CONFIG_DEFAULTS
from leap.soledad.server.auth import URLToAuthorization
@@ -494,134 +493,6 @@ class EncryptedSyncTestCase(
return self._test_encrypted_sym_sync(doc_size=2, number_of_docs=100)
-class LockResourceTestCase(
- CouchDBTestCase, TestCaseWithServer):
-
- """
- Tests for use of PUT and DELETE on lock resource.
- """
-
- @staticmethod
- def make_app_with_state(state):
- return make_token_soledad_app(state)
-
- make_document_for_test = make_soledad_document_for_test
-
- sync_target = soledad_sync_target
-
- def setUp(self):
- # the order of the following initializations is crucial because of
- # dependencies.
- # XXX explain better
- CouchDBTestCase.setUp(self)
- self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
- TestCaseWithServer.setUp(self)
- # create the databases
- db = CouchDatabase.open_database(
- urljoin(self.couch_url, ('shared-%s' % (uuid4().hex))),
- create=True,
- ensure_ddocs=True)
- self.addCleanup(db.delete_database)
- self._state = CouchServerState(self.couch_url)
- self._state.open_database = mock.Mock(return_value=db)
-
- def tearDown(self):
- CouchDBTestCase.tearDown(self)
- TestCaseWithServer.tearDown(self)
-
- def test__try_obtain_filesystem_lock(self):
- responder = mock.Mock()
- lock_uuid = uuid4().hex
- lr = LockResource(lock_uuid, self._state, responder)
- self.assertFalse(lr._lock.locked)
- self.assertTrue(lr._try_obtain_filesystem_lock())
- self.assertTrue(lr._lock.locked)
- lr._try_release_filesystem_lock()
-
- def test__try_release_filesystem_lock(self):
- responder = mock.Mock()
- lock_uuid = uuid4().hex
- lr = LockResource(lock_uuid, self._state, responder)
- lr._try_obtain_filesystem_lock()
- self.assertTrue(lr._lock.locked)
- lr._try_release_filesystem_lock()
- self.assertFalse(lr._lock.locked)
-
- def test_put(self):
- responder = mock.Mock()
- lock_uuid = uuid4().hex
- lr = LockResource(lock_uuid, self._state, responder)
- # lock!
- lr.put({}, None)
- # assert lock document was correctly written
- lock_doc = lr._shared_db.get_doc('lock-' + lock_uuid)
- self.assertIsNotNone(lock_doc)
- self.assertTrue(LockResource.TIMESTAMP_KEY in lock_doc.content)
- self.assertTrue(LockResource.LOCK_TOKEN_KEY in lock_doc.content)
- timestamp = lock_doc.content[LockResource.TIMESTAMP_KEY]
- token = lock_doc.content[LockResource.LOCK_TOKEN_KEY]
- self.assertTrue(timestamp < time.time())
- self.assertTrue(time.time() < timestamp + LockResource.TIMEOUT)
- # assert response to user
- responder.send_response_json.assert_called_with(
- 201, token=token,
- timeout=LockResource.TIMEOUT)
-
- def test_delete(self):
- responder = mock.Mock()
- lock_uuid = uuid4().hex
- lr = LockResource(lock_uuid, self._state, responder)
- # lock!
- lr.put({}, None)
- lock_doc = lr._shared_db.get_doc('lock-' + lock_uuid)
- token = lock_doc.content[LockResource.LOCK_TOKEN_KEY]
- # unlock!
- lr.delete({'token': token}, None)
- self.assertFalse(lr._lock.locked)
- self.assertIsNone(lr._shared_db.get_doc('lock-' + lock_uuid))
- responder.send_response_json.assert_called_with(200)
-
- def test_put_while_locked_fails(self):
- responder = mock.Mock()
- lock_uuid = uuid4().hex
- lr = LockResource(lock_uuid, self._state, responder)
- # lock!
- lr.put({}, None)
- # try to lock again!
- lr.put({}, None)
- self.assertEqual(
- len(responder.send_response_json.call_args), 2)
- self.assertEqual(
- responder.send_response_json.call_args[0], (403,))
- self.assertEqual(
- len(responder.send_response_json.call_args[1]), 2)
- self.assertTrue(
- 'remaining' in responder.send_response_json.call_args[1])
- self.assertTrue(
- responder.send_response_json.call_args[1]['remaining'] > 0)
-
- def test_unlock_unexisting_lock_fails(self):
- responder = mock.Mock()
- lock_uuid = uuid4().hex
- lr = LockResource(lock_uuid, self._state, responder)
- # unlock!
- lr.delete({'token': 'anything'}, None)
- responder.send_response_json.assert_called_with(
- 404, error='lock not found')
-
- def test_unlock_with_wrong_token_fails(self):
- responder = mock.Mock()
- lock_uuid = uuid4().hex
- lr = LockResource(lock_uuid, self._state, responder)
- # lock!
- lr.put({}, None)
- # unlock!
- lr.delete({'token': 'wrongtoken'}, None)
- self.assertIsNotNone(lr._shared_db.get_doc('lock-' + lock_uuid))
- responder.send_response_json.assert_called_with(
- 401, error='unlock unauthorized')
-
-
class ConfigurationParsingTest(unittest.TestCase):
def setUp(self):