diff options
Diffstat (limited to 'common/src/leap/soledad/common/tests/test_server.py')
-rw-r--r-- | common/src/leap/soledad/common/tests/test_server.py | 189 |
1 files changed, 180 insertions, 9 deletions
diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py index 1ea4d615..83df192b 100644 --- a/common/src/leap/soledad/common/tests/test_server.py +++ b/common/src/leap/soledad/common/tests/test_server.py @@ -24,6 +24,7 @@ import os import tempfile import simplejson as json import mock +import time from leap.common.testing.basetest import BaseLeapTest @@ -45,7 +46,7 @@ from leap.soledad.client import ( Soledad, target, ) -from leap.soledad.server import SoledadApp +from leap.soledad.server import SoledadApp, LockResource from leap.soledad.server.auth import URLToAuthorization @@ -86,9 +87,8 @@ class ServerAuthorizationTestCase(BaseLeapTest): /user-db/sync-from/{source} | GET, PUT, POST """ uuid = 'myuuid' - authmap = URLToAuthorization( - uuid, SoledadApp.SHARED_DB_NAME, SoledadApp.USER_DB_PREFIX) - dbname = authmap._uuid_dbname(uuid) + authmap = URLToAuthorization(uuid,) + dbname = authmap._user_db_name # test global auth self.assertTrue( authmap.is_authorized(self._make_environ('/', 'GET'))) @@ -202,8 +202,7 @@ class ServerAuthorizationTestCase(BaseLeapTest): Test if authorization fails for a wrong dbname. """ uuid = 'myuuid' - authmap = URLToAuthorization( - uuid, SoledadApp.SHARED_DB_NAME, SoledadApp.USER_DB_PREFIX) + authmap = URLToAuthorization(uuid) dbname = 'somedb' # test wrong-db database resource auth self.assertFalse( @@ -273,7 +272,7 @@ class EncryptedSyncTestCase( sync_target = token_leap_sync_target - def _soledad_instance(self, user='user-uuid', passphrase='123', + def _soledad_instance(self, user='user-uuid', passphrase=u'123', prefix='', secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME, local_db_path='soledad.u1db', server_url='', @@ -293,6 +292,8 @@ class EncryptedSyncTestCase( get_doc = mock.Mock(return_value=None) put_doc = mock.Mock(side_effect=_put_doc_side_effect) + lock = mock.Mock(return_value=('atoken', 300)) + unlock = mock.Mock() def __call__(self): return self @@ -310,8 +311,8 @@ class EncryptedSyncTestCase( secret_id=secret_id) def make_app(self): - self.request_state = CouchServerState( - self._couch_url, 'shared', 'tokens', 'user-') + self.request_state = CouchServerState(self._couch_url, 'shared', + 'tokens') return self.make_app_with_state(self.request_state) def setUp(self): @@ -375,3 +376,173 @@ class EncryptedSyncTestCase( doc2 = doclist[0] # assert incoming doc is equal to the first sent doc self.assertEqual(doc1, doc2) + + def test_encrypted_sym_sync_with_unicode_passphrase(self): + """ + Test the complete syncing chain between two soledad dbs using a + Soledad server backed by a couch database, using an unicode + passphrase. + """ + self.startServer() + # instantiate soledad and create a document + sol1 = self._soledad_instance( + # token is verified in test_target.make_token_soledad_app + auth_token='auth-token', + passphrase=u'ãáàäéàëíìïóòöõúùüñç', + ) + _, doclist = sol1.get_all_docs() + self.assertEqual([], doclist) + doc1 = sol1.create_doc(json.loads(simple_doc)) + # sync with server + sol1._server_url = self.getURL() + sol1.sync() + # assert doc was sent to couch db + db = CouchDatabase( + self._couch_url, + # the name of the user database is "user-<uuid>". + 'user-user-uuid', + ) + _, doclist = db.get_all_docs() + self.assertEqual(1, len(doclist)) + couchdoc = doclist[0] + # assert document structure in couch server + self.assertEqual(doc1.doc_id, couchdoc.doc_id) + self.assertEqual(doc1.rev, couchdoc.rev) + self.assertEqual(6, len(couchdoc.content)) + self.assertTrue(target.ENC_JSON_KEY in couchdoc.content) + self.assertTrue(target.ENC_SCHEME_KEY in couchdoc.content) + self.assertTrue(target.ENC_METHOD_KEY in couchdoc.content) + self.assertTrue(target.ENC_IV_KEY in couchdoc.content) + self.assertTrue(target.MAC_KEY in couchdoc.content) + self.assertTrue(target.MAC_METHOD_KEY in couchdoc.content) + # instantiate soledad with empty db, but with same secrets path + sol2 = self._soledad_instance( + prefix='x', + auth_token='auth-token', + passphrase=u'ãáàäéàëíìïóòöõúùüñç', + ) + _, doclist = sol2.get_all_docs() + self.assertEqual([], doclist) + sol2._secrets_path = sol1.secrets_path + sol2._load_secrets() + sol2._set_secret_id(sol1._secret_id) + # sync the new instance + sol2._server_url = self.getURL() + sol2.sync() + _, doclist = sol2.get_all_docs() + self.assertEqual(1, len(doclist)) + doc2 = doclist[0] + # assert incoming doc is equal to the first sent doc + self.assertEqual(doc1, doc2) + + +class LockResourceTestCase( + CouchDBTestCase, TestCaseWithServer): + """ + Tests for use of PUT and DELETE on lock resource. + """ + + @staticmethod + def make_app_with_state(state): + return make_token_soledad_app(state) + + make_document_for_test = make_leap_document_for_test + + sync_target = token_leap_sync_target + + def setUp(self): + TestCaseWithServer.setUp(self) + CouchDBTestCase.setUp(self) + self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") + self._couch_url = 'http://localhost:' + str(self.wrapper.port) + self._state = CouchServerState( + self._couch_url, 'shared', 'tokens') + + def tearDown(self): + CouchDBTestCase.tearDown(self) + TestCaseWithServer.tearDown(self) + + def test__try_obtain_filesystem_lock(self): + responder = mock.Mock() + lr = LockResource('uuid', self._state, responder) + self.assertFalse(lr._lock.locked) + self.assertTrue(lr._try_obtain_filesystem_lock()) + self.assertTrue(lr._lock.locked) + lr._try_release_filesystem_lock() + + def test__try_release_filesystem_lock(self): + responder = mock.Mock() + lr = LockResource('uuid', self._state, responder) + lr._try_obtain_filesystem_lock() + self.assertTrue(lr._lock.locked) + lr._try_release_filesystem_lock() + self.assertFalse(lr._lock.locked) + + def test_put(self): + responder = mock.Mock() + lr = LockResource('uuid', self._state, responder) + # lock! + lr.put({}, None) + # assert lock document was correctly written + lock_doc = lr._shared_db.get_doc('lock-uuid') + self.assertIsNotNone(lock_doc) + self.assertTrue(LockResource.TIMESTAMP_KEY in lock_doc.content) + self.assertTrue(LockResource.LOCK_TOKEN_KEY in lock_doc.content) + timestamp = lock_doc.content[LockResource.TIMESTAMP_KEY] + token = lock_doc.content[LockResource.LOCK_TOKEN_KEY] + self.assertTrue(timestamp < time.time()) + self.assertTrue(time.time() < timestamp + LockResource.TIMEOUT) + # assert response to user + responder.send_response_json.assert_called_with( + 201, token=token, + timeout=LockResource.TIMEOUT) + + def test_delete(self): + responder = mock.Mock() + lr = LockResource('uuid', self._state, responder) + # lock! + lr.put({}, None) + lock_doc = lr._shared_db.get_doc('lock-uuid') + token = lock_doc.content[LockResource.LOCK_TOKEN_KEY] + # unlock! + lr.delete({'token': token}, None) + self.assertFalse(lr._lock.locked) + self.assertIsNone(lr._shared_db.get_doc('lock-uuid')) + responder.send_response_json.assert_called_with(200) + + def test_put_while_locked_fails(self): + responder = mock.Mock() + lr = LockResource('uuid', self._state, responder) + # lock! + lr.put({}, None) + # try to lock again! + lr.put({}, None) + self.assertEqual( + len(responder.send_response_json.call_args), 2) + self.assertEqual( + responder.send_response_json.call_args[0], (403,)) + self.assertEqual( + len(responder.send_response_json.call_args[1]), 2) + self.assertTrue( + 'remaining' in responder.send_response_json.call_args[1]) + self.assertTrue( + responder.send_response_json.call_args[1]['remaining'] > 0) + + def test_unlock_unexisting_lock_fails(self): + responder = mock.Mock() + lr = LockResource('uuid', self._state, responder) + # unlock! + lr.delete({'token': 'anything'}, None) + responder.send_response_json.assert_called_with( + 404, error='lock not found') + + def test_unlock_with_wrong_token_fails(self): + responder = mock.Mock() + lr = LockResource('uuid', self._state, responder) + # lock! + lr.put({}, None) + # unlock! + lr.delete({'token': 'wrongtoken'}, None) + self.assertIsNotNone(lr._shared_db.get_doc('lock-uuid')) + responder.send_response_json.assert_called_with( + 401, error='unlock unauthorized') |