summaryrefslogtreecommitdiff
path: root/common/src/leap/soledad/common/tests/test_server.py
diff options
context:
space:
mode:
Diffstat (limited to 'common/src/leap/soledad/common/tests/test_server.py')
-rw-r--r--common/src/leap/soledad/common/tests/test_server.py189
1 files changed, 180 insertions, 9 deletions
diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py
index 1ea4d615..83df192b 100644
--- a/common/src/leap/soledad/common/tests/test_server.py
+++ b/common/src/leap/soledad/common/tests/test_server.py
@@ -24,6 +24,7 @@ import os
import tempfile
import simplejson as json
import mock
+import time
from leap.common.testing.basetest import BaseLeapTest
@@ -45,7 +46,7 @@ from leap.soledad.client import (
Soledad,
target,
)
-from leap.soledad.server import SoledadApp
+from leap.soledad.server import SoledadApp, LockResource
from leap.soledad.server.auth import URLToAuthorization
@@ -86,9 +87,8 @@ class ServerAuthorizationTestCase(BaseLeapTest):
/user-db/sync-from/{source} | GET, PUT, POST
"""
uuid = 'myuuid'
- authmap = URLToAuthorization(
- uuid, SoledadApp.SHARED_DB_NAME, SoledadApp.USER_DB_PREFIX)
- dbname = authmap._uuid_dbname(uuid)
+ authmap = URLToAuthorization(uuid,)
+ dbname = authmap._user_db_name
# test global auth
self.assertTrue(
authmap.is_authorized(self._make_environ('/', 'GET')))
@@ -202,8 +202,7 @@ class ServerAuthorizationTestCase(BaseLeapTest):
Test if authorization fails for a wrong dbname.
"""
uuid = 'myuuid'
- authmap = URLToAuthorization(
- uuid, SoledadApp.SHARED_DB_NAME, SoledadApp.USER_DB_PREFIX)
+ authmap = URLToAuthorization(uuid)
dbname = 'somedb'
# test wrong-db database resource auth
self.assertFalse(
@@ -273,7 +272,7 @@ class EncryptedSyncTestCase(
sync_target = token_leap_sync_target
- def _soledad_instance(self, user='user-uuid', passphrase='123',
+ def _soledad_instance(self, user='user-uuid', passphrase=u'123',
prefix='',
secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME,
local_db_path='soledad.u1db', server_url='',
@@ -293,6 +292,8 @@ class EncryptedSyncTestCase(
get_doc = mock.Mock(return_value=None)
put_doc = mock.Mock(side_effect=_put_doc_side_effect)
+ lock = mock.Mock(return_value=('atoken', 300))
+ unlock = mock.Mock()
def __call__(self):
return self
@@ -310,8 +311,8 @@ class EncryptedSyncTestCase(
secret_id=secret_id)
def make_app(self):
- self.request_state = CouchServerState(
- self._couch_url, 'shared', 'tokens', 'user-')
+ self.request_state = CouchServerState(self._couch_url, 'shared',
+ 'tokens')
return self.make_app_with_state(self.request_state)
def setUp(self):
@@ -375,3 +376,173 @@ class EncryptedSyncTestCase(
doc2 = doclist[0]
# assert incoming doc is equal to the first sent doc
self.assertEqual(doc1, doc2)
+
+ def test_encrypted_sym_sync_with_unicode_passphrase(self):
+ """
+ Test the complete syncing chain between two soledad dbs using a
+ Soledad server backed by a couch database, using an unicode
+ passphrase.
+ """
+ self.startServer()
+ # instantiate soledad and create a document
+ sol1 = self._soledad_instance(
+ # token is verified in test_target.make_token_soledad_app
+ auth_token='auth-token',
+ passphrase=u'ãáàäéàëíìïóòöõúùüñç',
+ )
+ _, doclist = sol1.get_all_docs()
+ self.assertEqual([], doclist)
+ doc1 = sol1.create_doc(json.loads(simple_doc))
+ # sync with server
+ sol1._server_url = self.getURL()
+ sol1.sync()
+ # assert doc was sent to couch db
+ db = CouchDatabase(
+ self._couch_url,
+ # the name of the user database is "user-<uuid>".
+ 'user-user-uuid',
+ )
+ _, doclist = db.get_all_docs()
+ self.assertEqual(1, len(doclist))
+ couchdoc = doclist[0]
+ # assert document structure in couch server
+ self.assertEqual(doc1.doc_id, couchdoc.doc_id)
+ self.assertEqual(doc1.rev, couchdoc.rev)
+ self.assertEqual(6, len(couchdoc.content))
+ self.assertTrue(target.ENC_JSON_KEY in couchdoc.content)
+ self.assertTrue(target.ENC_SCHEME_KEY in couchdoc.content)
+ self.assertTrue(target.ENC_METHOD_KEY in couchdoc.content)
+ self.assertTrue(target.ENC_IV_KEY in couchdoc.content)
+ self.assertTrue(target.MAC_KEY in couchdoc.content)
+ self.assertTrue(target.MAC_METHOD_KEY in couchdoc.content)
+ # instantiate soledad with empty db, but with same secrets path
+ sol2 = self._soledad_instance(
+ prefix='x',
+ auth_token='auth-token',
+ passphrase=u'ãáàäéàëíìïóòöõúùüñç',
+ )
+ _, doclist = sol2.get_all_docs()
+ self.assertEqual([], doclist)
+ sol2._secrets_path = sol1.secrets_path
+ sol2._load_secrets()
+ sol2._set_secret_id(sol1._secret_id)
+ # sync the new instance
+ sol2._server_url = self.getURL()
+ sol2.sync()
+ _, doclist = sol2.get_all_docs()
+ self.assertEqual(1, len(doclist))
+ doc2 = doclist[0]
+ # assert incoming doc is equal to the first sent doc
+ self.assertEqual(doc1, doc2)
+
+
+class LockResourceTestCase(
+ CouchDBTestCase, TestCaseWithServer):
+ """
+ Tests for use of PUT and DELETE on lock resource.
+ """
+
+ @staticmethod
+ def make_app_with_state(state):
+ return make_token_soledad_app(state)
+
+ make_document_for_test = make_leap_document_for_test
+
+ sync_target = token_leap_sync_target
+
+ def setUp(self):
+ TestCaseWithServer.setUp(self)
+ CouchDBTestCase.setUp(self)
+ self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
+ self._couch_url = 'http://localhost:' + str(self.wrapper.port)
+ self._state = CouchServerState(
+ self._couch_url, 'shared', 'tokens')
+
+ def tearDown(self):
+ CouchDBTestCase.tearDown(self)
+ TestCaseWithServer.tearDown(self)
+
+ def test__try_obtain_filesystem_lock(self):
+ responder = mock.Mock()
+ lr = LockResource('uuid', self._state, responder)
+ self.assertFalse(lr._lock.locked)
+ self.assertTrue(lr._try_obtain_filesystem_lock())
+ self.assertTrue(lr._lock.locked)
+ lr._try_release_filesystem_lock()
+
+ def test__try_release_filesystem_lock(self):
+ responder = mock.Mock()
+ lr = LockResource('uuid', self._state, responder)
+ lr._try_obtain_filesystem_lock()
+ self.assertTrue(lr._lock.locked)
+ lr._try_release_filesystem_lock()
+ self.assertFalse(lr._lock.locked)
+
+ def test_put(self):
+ responder = mock.Mock()
+ lr = LockResource('uuid', self._state, responder)
+ # lock!
+ lr.put({}, None)
+ # assert lock document was correctly written
+ lock_doc = lr._shared_db.get_doc('lock-uuid')
+ self.assertIsNotNone(lock_doc)
+ self.assertTrue(LockResource.TIMESTAMP_KEY in lock_doc.content)
+ self.assertTrue(LockResource.LOCK_TOKEN_KEY in lock_doc.content)
+ timestamp = lock_doc.content[LockResource.TIMESTAMP_KEY]
+ token = lock_doc.content[LockResource.LOCK_TOKEN_KEY]
+ self.assertTrue(timestamp < time.time())
+ self.assertTrue(time.time() < timestamp + LockResource.TIMEOUT)
+ # assert response to user
+ responder.send_response_json.assert_called_with(
+ 201, token=token,
+ timeout=LockResource.TIMEOUT)
+
+ def test_delete(self):
+ responder = mock.Mock()
+ lr = LockResource('uuid', self._state, responder)
+ # lock!
+ lr.put({}, None)
+ lock_doc = lr._shared_db.get_doc('lock-uuid')
+ token = lock_doc.content[LockResource.LOCK_TOKEN_KEY]
+ # unlock!
+ lr.delete({'token': token}, None)
+ self.assertFalse(lr._lock.locked)
+ self.assertIsNone(lr._shared_db.get_doc('lock-uuid'))
+ responder.send_response_json.assert_called_with(200)
+
+ def test_put_while_locked_fails(self):
+ responder = mock.Mock()
+ lr = LockResource('uuid', self._state, responder)
+ # lock!
+ lr.put({}, None)
+ # try to lock again!
+ lr.put({}, None)
+ self.assertEqual(
+ len(responder.send_response_json.call_args), 2)
+ self.assertEqual(
+ responder.send_response_json.call_args[0], (403,))
+ self.assertEqual(
+ len(responder.send_response_json.call_args[1]), 2)
+ self.assertTrue(
+ 'remaining' in responder.send_response_json.call_args[1])
+ self.assertTrue(
+ responder.send_response_json.call_args[1]['remaining'] > 0)
+
+ def test_unlock_unexisting_lock_fails(self):
+ responder = mock.Mock()
+ lr = LockResource('uuid', self._state, responder)
+ # unlock!
+ lr.delete({'token': 'anything'}, None)
+ responder.send_response_json.assert_called_with(
+ 404, error='lock not found')
+
+ def test_unlock_with_wrong_token_fails(self):
+ responder = mock.Mock()
+ lr = LockResource('uuid', self._state, responder)
+ # lock!
+ lr.put({}, None)
+ # unlock!
+ lr.delete({'token': 'wrongtoken'}, None)
+ self.assertIsNotNone(lr._shared_db.get_doc('lock-uuid'))
+ responder.send_response_json.assert_called_with(
+ 401, error='unlock unauthorized')