summaryrefslogtreecommitdiff
path: root/tests/e2e/bonafide/session.py
diff options
context:
space:
mode:
authordrebs <drebs@riseup.net>2017-09-16 08:58:34 -0300
committerdrebs <drebs@riseup.net>2017-09-20 18:55:59 -0300
commit8110ef8687b155df7b24a6c083404f9624e6a160 (patch)
tree40f32d15c1fdd4eb5dbc1df553c6f094a6ec40ac /tests/e2e/bonafide/session.py
parent5518364b969f764e06eec34563ae804413253107 (diff)
[test] add e2e test for incoming mail pipeline
I had to include part of the bonafide source code because it was the easiest way to interact with the webapp. Closes: #8941
Diffstat (limited to 'tests/e2e/bonafide/session.py')
-rw-r--r--tests/e2e/bonafide/session.py248
1 files changed, 248 insertions, 0 deletions
diff --git a/tests/e2e/bonafide/session.py b/tests/e2e/bonafide/session.py
new file mode 100644
index 00000000..e4129609
--- /dev/null
+++ b/tests/e2e/bonafide/session.py
@@ -0,0 +1,248 @@
+# -*- coding: utf-8 -*-
+# session.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+LEAP Session management.
+"""
+from twisted.internet import defer, reactor
+from twisted.logger import Logger
+
+from . import _srp
+from . import provider
+from ._http import httpRequest, cookieAgentFactory
+
+
+OK = 'ok'
+
+
+def _auth_required(func):
+ """
+ Decorate a method so that it will not be called if the instance
+ attribute `is_authenticated` does not evaluate to True.
+ """
+ def decorated(*args, **kwargs):
+ instance = args[0]
+ allowed = getattr(instance, 'is_authenticated')
+ if not allowed:
+ raise RuntimeError('This method requires authentication')
+ return func(*args, **kwargs)
+ return decorated
+
+
+class Session(object):
+
+ log = Logger()
+
+ def __init__(self, credentials, api, provider_cert):
+ # TODO check if an anonymous credentials is passed.
+ # TODO move provider_cert to api object.
+ # On creation, it should be able to retrieve all the info it needs
+ # (calling bootstrap).
+ # TODO could get a "provider" object instead.
+ # this provider can have an api attribute,
+ # and a "autoconfig" attribute passed on initialization.
+ # TODO get a file-descriptor for password if not in credentials
+ # TODO merge self._request with config.Provider._http_request ?
+
+ self.username = credentials.username
+ self.password = credentials.password
+ self._provider_cert = provider_cert
+ self._api = api
+ self._initialize_session()
+
+ def _initialize_session(self):
+ self._agent = cookieAgentFactory(self._provider_cert)
+ username = self.username or ''
+ password = self.password or ''
+ self._srp_auth = _srp.SRPAuthMechanism(username, password)
+ self._srp_signup = _srp.SRPSignupMechanism()
+ self._srp_password = _srp.SRPPasswordChangeMechanism()
+ self._srp_recovery_code = _srp.SRPRecoveryCodeUpdateMechanism()
+ self._token = None
+ self._uuid = None
+
+ # Session
+
+ @property
+ def token(self):
+ return self._token
+
+ @property
+ def uuid(self):
+ return self._uuid
+
+ @property
+ def is_authenticated(self):
+ return self._srp_auth.srp_user.authenticated()
+
+ @defer.inlineCallbacks
+ def authenticate(self):
+ uri = self._api.get_handshake_uri()
+ met = self._api.get_handshake_method()
+ self.log.debug('%s to %s' % (met, uri))
+ params = self._srp_auth.get_handshake_params()
+
+ handshake = yield self._request(self._agent, uri, values=params,
+ method=met)
+
+ self._srp_auth.process_handshake(handshake)
+ uri = self._api.get_authenticate_uri(login=self.username)
+ met = self._api.get_authenticate_method()
+
+ self.log.debug('%s to %s' % (met, uri))
+ params = self._srp_auth.get_authentication_params()
+
+ auth = yield self._request(self._agent, uri, values=params,
+ method=met)
+
+ uuid, token = self._srp_auth.process_authentication(auth)
+ self._srp_auth.verify_authentication()
+
+ self._uuid = uuid
+ self._token = token
+ defer.returnValue(OK)
+
+ @_auth_required
+ @defer.inlineCallbacks
+ def logout(self):
+ uri = self._api.get_logout_uri()
+ met = self._api.get_logout_method()
+ yield self._request(self._agent, uri, method=met)
+ self.username = None
+ self.password = None
+ self._initialize_session()
+ defer.returnValue(OK)
+
+ @_auth_required
+ @defer.inlineCallbacks
+ def change_password(self, password):
+ uri = self._api.get_update_user_uri(uid=self._uuid)
+ met = self._api.get_update_user_method()
+ params = self._srp_password.get_password_params(
+ self.username, password)
+ yield self._request(self._agent, uri, values=params, method=met)
+ self.password = password
+ self._srp_auth = _srp.SRPAuthMechanism(self.username, password)
+ defer.returnValue(OK)
+
+ @_auth_required
+ @defer.inlineCallbacks
+ def update_recovery_code(self, recovery_code):
+ uri = self._api.get_update_user_uri(uid=self._uuid)
+ met = self._api.get_update_user_method()
+ params = self._srp_recovery_code.get_recovery_code_params(
+ self.username, recovery_code)
+ update = yield self._request(self._agent, uri, values=params,
+ method=met)
+ defer.returnValue(update)
+
+ # User certificates
+
+ def get_vpn_cert(self):
+ # TODO pass it to the provider object so that it can save it in the
+ # right path.
+ uri = self._api.get_vpn_cert_uri()
+ met = self._api.get_vpn_cert_method()
+ return self._request(self._agent, uri, method=met)
+
+ @_auth_required
+ def get_smtp_cert(self):
+ # TODO pass it to the provider object so that it can save it in the
+ # right path.
+ uri = self._api.get_smtp_cert_uri()
+ met = self._api.get_smtp_cert_method()
+ print met, "to", uri
+ return self._request(self._agent, uri, method=met)
+
+ def _request(self, *args, **kw):
+ # TODO: we are not pinning the TLS cert of the API
+ # maybe we can use leap.common.http
+ kw['token'] = self._token
+ return httpRequest(*args, **kw)
+
+ # User management
+
+ @defer.inlineCallbacks
+ def signup(self, username, password, invite=None):
+ # XXX should check that it_IS_NOT_authenticated
+ provider.validate_username(username)
+ uri = self._api.get_signup_uri()
+ met = self._api.get_signup_method()
+ params = self._srp_signup.get_signup_params(
+ username, password, invite)
+
+ signup = yield self._request(self._agent, uri, values=params,
+ method=met)
+ registered_user = self._srp_signup.process_signup(signup)
+ self.username = username
+ self.password = password
+ defer.returnValue((OK, registered_user))
+
+ @_auth_required
+ def update_user_record(self):
+ # FIXME to be implemented
+ pass
+
+ # Authentication-protected configuration
+
+ @defer.inlineCallbacks
+ def fetch_provider_configs(self, uri, path):
+ config = yield self._request(self._agent, uri)
+ with open(path, 'w') as cf:
+ cf.write(config)
+ defer.returnValue('ok')
+
+
+if __name__ == "__main__":
+ import os
+ import sys
+ from twisted.cred.credentials import UsernamePassword
+
+ log = Logger()
+
+ if len(sys.argv) != 4:
+ print "Usage:", sys.argv[0], "provider", "username", "password"
+ sys.exit()
+ _provider, username, password = sys.argv[1], sys.argv[2], sys.argv[3]
+ api = provider.Api('https://api.%s:4430' % _provider)
+ credentials = UsernamePassword(username, password)
+ cdev_pem = os.path.expanduser(
+ '~/.config/leap/providers/%s/keys/ca/cacert.pem' % _provider)
+ session = Session(credentials, api, cdev_pem)
+
+ def print_result(result):
+ print result
+ return result
+
+ def cbShutDown(ignored):
+ reactor.stop()
+
+ def auth_eb(failure):
+ log.error(failure)
+
+ d = session.authenticate()
+ d.addCallback(print_result)
+ d.addErrback(auth_eb)
+
+ d.addCallback(lambda _: session.get_smtp_cert())
+ # d.addCallback(lambda _: session.get_vpn_cert())
+ d.addCallback(print_result)
+ d.addErrback(auth_eb)
+
+ d.addCallback(lambda _: session.logout())
+ d.addErrback(auth_eb)
+ d.addBoth(cbShutDown)
+ reactor.run()