diff options
Diffstat (limited to 'src/leap/soledad/tests')
-rw-r--r-- | src/leap/soledad/tests/test_leap_backend.py | 62 |
1 files changed, 56 insertions, 6 deletions
diff --git a/src/leap/soledad/tests/test_leap_backend.py b/src/leap/soledad/tests/test_leap_backend.py index 893090d7..ffb242fe 100644 --- a/src/leap/soledad/tests/test_leap_backend.py +++ b/src/leap/soledad/tests/test_leap_backend.py @@ -28,6 +28,9 @@ except ImportError: import cStringIO +from u1db.remote import http_client + + from leap.soledad.backends import leap_backend from leap.soledad.server import ( SoledadApp, @@ -142,15 +145,18 @@ class TestLeapClientBase(test_http_client.TestHTTPClientBase): This class should be used to test Token auth. """ - def getClient(self, **kwds): + def getClientWithToken(self, **kwds): self.startServer() - client = http_client.HTTPClientBase(self.getURL('dbase'), **kwds) - client.set_token_credentials = auth.set_token_credentials - def _sign_request(method, url_query, params): - return auth._sign_request(http_db, method, url_query, params) + class _HTTPClientWithToken(http_client.HTTPClientBase): + + def set_token_credentials(self, uuid, token): + auth.set_token_credentials(self, uuid, token) - client._sign_request = _sign_request + def _sign_request(self, method, url_query, params): + return auth._sign_request(self, method, url_query, params) + + return _HTTPClientWithToken(self.getURL('dbase'), **kwds) def test_oauth(self): """ @@ -170,6 +176,50 @@ class TestLeapClientBase(test_http_client.TestHTTPClientBase): """ pass + def app(self, environ, start_response): + res = test_http_client.TestHTTPClientBase.app(self, environ, start_response) + if res is not None: + return res + # mime solead application here. + if '/token' in environ['PATH_INFO']: + auth = environ.get(SoledadAuthMiddleware.HTTP_AUTH_KEY) + if not auth: + start_response("401 Unauthorized", + [('Content-Type', 'application/json')]) + return [json.dumps({"error": "unauthorized", + "message": e.message})] + scheme, encoded = auth.split(None, 1) + if scheme.lower() != 'token': + start_response("401 Unauthorized", + [('Content-Type', 'application/json')]) + return [json.dumps({"error": "unauthorized", + "message": e.message})] + uuid, token = encoded.decode('base64').split(':', 1) + if uuid != 'user-uuid' and token != 'auth-token': + return unauth_err("Incorrect address or token.") + start_response("200 OK", [('Content-Type', 'application/json')]) + return [json.dumps([environ['PATH_INFO'], uuid, token])] + + def test_token(self): + """ + Test if token is sent correctly. + """ + cli = self.getClientWithToken() + cli.set_token_credentials('user-uuid', 'auth-token') + res, headers = cli._request('GET', ['doc', 'token']) + self.assertEqual( + ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res)) + + def test_token_ctr_creds(self): + cli = self.getClientWithToken(creds={'token': { + 'uuid': 'user-uuid', + 'token': 'auth-token', + }}) + res, headers = cli._request('GET', ['doc', 'token']) + self.assertEqual( + ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res)) + + #----------------------------------------------------------------------------- # The following tests come from `u1db.tests.test_document`. |