summaryrefslogtreecommitdiff
path: root/tests/e2e/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/e2e/utils.py')
-rw-r--r--tests/e2e/utils.py178
1 files changed, 178 insertions, 0 deletions
diff --git a/tests/e2e/utils.py b/tests/e2e/utils.py
new file mode 100644
index 00000000..3fca28b8
--- /dev/null
+++ b/tests/e2e/utils.py
@@ -0,0 +1,178 @@
+import email
+import json
+import os
+import pytest
+import random
+import time
+import treq
+import urllib
+
+from string import ascii_lowercase
+from email.mime.text import MIMEText
+from subprocess import Popen, PIPE
+
+from twisted.internet import reactor
+from twisted.internet.defer import returnValue
+from twisted.web.client import Agent
+from twisted.web.client import BrowserLikePolicyForHTTPS
+from twisted.internet.ssl import Certificate
+from twisted.cred.credentials import UsernamePassword
+
+import pgpy
+from pgpy.constants import (
+ PubKeyAlgorithm,
+ KeyFlags,
+ HashAlgorithm,
+ SymmetricKeyAlgorithm,
+ CompressionAlgorithm
+)
+
+from bonafide import provider
+from bonafide.session import Session
+
+from leap.soledad.common.blobs import Flags
+
+
+_provider = 'cdev.bitmask.net'
+
+uri = "https://api.%s:4430/1/" % _provider
+ca = "https://%s/ca.crt" % _provider
+
+
+random.seed()
+
+
+#
+# session management: user creation and authentication
+#
+
+def _get_invite_code():
+ invite = os.environ.get('INVITE_CODE')
+ if not invite:
+ raise Exception('The INVITE_CODE environment variable is empty, but '
+ 'we need it set to interact with the provider.')
+ return invite
+
+
+@pytest.inlineCallbacks
+def _get_ca_file(tmpdir):
+ response = yield treq.get(ca)
+ pemdata = yield response.text()
+ fname = os.path.join(tmpdir.strpath, 'cacert.pem')
+ with open(fname, 'w') as f:
+ f.write(pemdata)
+ returnValue(fname)
+
+
+@pytest.inlineCallbacks
+def get_session(tmpdir):
+ # setup user params
+ invite = _get_invite_code()
+ username = ''.join(random.choice(ascii_lowercase) for i in range(20))
+ # users starting with "test_user" get removed by cron on a regular basis
+ username = 'tmp_user_e2e_' + username
+ passphrase = ''.join(random.choice(ascii_lowercase) for i in range(20))
+
+ # create user and login
+ credentials = UsernamePassword(username, passphrase)
+ api = provider.Api('https://api.%s:4430' % _provider)
+ cdev_pem = yield _get_ca_file(tmpdir)
+ session = Session(credentials, api, cdev_pem)
+ print("creating user")
+ yield session.signup(username, passphrase, invite=invite)
+ print("logging in")
+ yield session.authenticate()
+ returnValue(session)
+
+
+#
+# OpenPGP key creation and upload
+#
+
+def gen_key(username):
+ print("generating OpenPGP key pair")
+ key = pgpy.PGPKey.new(PubKeyAlgorithm.RSAEncryptOrSign, 4096)
+ uid = pgpy.PGPUID.new(username, email='%s@%s' % (username, _provider))
+ key.add_uid(
+ uid,
+ usage={KeyFlags.EncryptCommunications},
+ hashes=[HashAlgorithm.SHA512],
+ ciphers=[SymmetricKeyAlgorithm.AES256],
+ compression=[CompressionAlgorithm.Uncompressed]
+ )
+ return key
+
+
+@pytest.inlineCallbacks
+def _get_http_client():
+ response = yield treq.get(ca)
+ pemdata = yield response.text()
+ cert = Certificate.loadPEM(pemdata)
+ policy = BrowserLikePolicyForHTTPS(trustRoot=cert)
+ agent = Agent(reactor, contextFactory=policy)
+ client = treq.client.HTTPClient(agent)
+ returnValue(client)
+
+
+@pytest.inlineCallbacks
+def put_key(uuid, token, data):
+ print("uploading public key to server")
+ client = yield _get_http_client()
+ headers = {
+ 'Authorization': [str('Token token=%s' % token)],
+ 'Content-Type': ['application/x-www-form-urlencoded'],
+ }
+ data = str(urllib.urlencode({'user[public_key]': data}))
+ response = yield client.put(
+ '%s/users/%s.json' % (uri, uuid),
+ headers=headers,
+ data=data)
+ assert response.code == 204
+
+
+#
+# mail sending
+#
+
+def send_email(username):
+ address = "%s@%s" % (username, _provider)
+ print("sending email to %s" % address)
+ secret = ''.join(random.choice(ascii_lowercase) for i in range(20))
+ msg = MIMEText(secret)
+ msg["To"] = address
+ msg["Subject"] = "e2e test token"
+ p = Popen(["/usr/sbin/sendmail", "-t"], stdin=PIPE)
+ p.communicate(msg.as_string())
+ return secret
+
+
+#
+# incoming message retrieval
+#
+
+@pytest.inlineCallbacks
+def get_incoming_fd(client):
+ pending = []
+ attempts = 1
+ while not pending:
+ print("attempting to fetch incoming blob (%d/10)" % attempts)
+ pending = yield client.blobmanager.remote_list(
+ namespace='MX', filter_flags=Flags.PENDING)
+ if not pending and attempts == 10:
+ raise Exception("Timed out waiting for message to get delivered.")
+ attempts += 1
+ time.sleep(1)
+ assert len(pending) == 1
+ fd = yield client.blobmanager.get(pending.pop(), namespace='MX')
+ returnValue(fd)
+
+
+def get_received_secret(key, fd):
+ print("decoding incoming blob to get the secret")
+ encrypted = pgpy.PGPMessage.from_blob(fd.read())
+ decrypted = key.decrypt(encrypted)
+ doc_content = json.loads(decrypted.message)
+ content = doc_content['content']
+ email_message = email.message_from_string(content)
+ received_secret = email_message.get_payload().strip()
+ return received_secret