summaryrefslogtreecommitdiff
path: root/src/leap/soledad
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/soledad')
-rw-r--r--src/leap/soledad/tests/test_leap_backend.py62
1 files changed, 56 insertions, 6 deletions
diff --git a/src/leap/soledad/tests/test_leap_backend.py b/src/leap/soledad/tests/test_leap_backend.py
index 893090d7..ffb242fe 100644
--- a/src/leap/soledad/tests/test_leap_backend.py
+++ b/src/leap/soledad/tests/test_leap_backend.py
@@ -28,6 +28,9 @@ except ImportError:
import cStringIO
+from u1db.remote import http_client
+
+
from leap.soledad.backends import leap_backend
from leap.soledad.server import (
SoledadApp,
@@ -142,15 +145,18 @@ class TestLeapClientBase(test_http_client.TestHTTPClientBase):
This class should be used to test Token auth.
"""
- def getClient(self, **kwds):
+ def getClientWithToken(self, **kwds):
self.startServer()
- client = http_client.HTTPClientBase(self.getURL('dbase'), **kwds)
- client.set_token_credentials = auth.set_token_credentials
- def _sign_request(method, url_query, params):
- return auth._sign_request(http_db, method, url_query, params)
+ class _HTTPClientWithToken(http_client.HTTPClientBase):
+
+ def set_token_credentials(self, uuid, token):
+ auth.set_token_credentials(self, uuid, token)
- client._sign_request = _sign_request
+ def _sign_request(self, method, url_query, params):
+ return auth._sign_request(self, method, url_query, params)
+
+ return _HTTPClientWithToken(self.getURL('dbase'), **kwds)
def test_oauth(self):
"""
@@ -170,6 +176,50 @@ class TestLeapClientBase(test_http_client.TestHTTPClientBase):
"""
pass
+ def app(self, environ, start_response):
+ res = test_http_client.TestHTTPClientBase.app(self, environ, start_response)
+ if res is not None:
+ return res
+ # mime solead application here.
+ if '/token' in environ['PATH_INFO']:
+ auth = environ.get(SoledadAuthMiddleware.HTTP_AUTH_KEY)
+ if not auth:
+ start_response("401 Unauthorized",
+ [('Content-Type', 'application/json')])
+ return [json.dumps({"error": "unauthorized",
+ "message": e.message})]
+ scheme, encoded = auth.split(None, 1)
+ if scheme.lower() != 'token':
+ start_response("401 Unauthorized",
+ [('Content-Type', 'application/json')])
+ return [json.dumps({"error": "unauthorized",
+ "message": e.message})]
+ uuid, token = encoded.decode('base64').split(':', 1)
+ if uuid != 'user-uuid' and token != 'auth-token':
+ return unauth_err("Incorrect address or token.")
+ start_response("200 OK", [('Content-Type', 'application/json')])
+ return [json.dumps([environ['PATH_INFO'], uuid, token])]
+
+ def test_token(self):
+ """
+ Test if token is sent correctly.
+ """
+ cli = self.getClientWithToken()
+ cli.set_token_credentials('user-uuid', 'auth-token')
+ res, headers = cli._request('GET', ['doc', 'token'])
+ self.assertEqual(
+ ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res))
+
+ def test_token_ctr_creds(self):
+ cli = self.getClientWithToken(creds={'token': {
+ 'uuid': 'user-uuid',
+ 'token': 'auth-token',
+ }})
+ res, headers = cli._request('GET', ['doc', 'token'])
+ self.assertEqual(
+ ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res))
+
+
#-----------------------------------------------------------------------------
# The following tests come from `u1db.tests.test_document`.