import email import json import os import pytest import random import time import treq import urllib from string import ascii_lowercase from email.mime.text import MIMEText from subprocess import Popen, PIPE from twisted.internet import reactor from twisted.internet.defer import returnValue from twisted.web.client import Agent from twisted.web.client import BrowserLikePolicyForHTTPS from twisted.internet.ssl import Certificate from twisted.cred.credentials import UsernamePassword import pgpy from pgpy.constants import ( PubKeyAlgorithm, KeyFlags, HashAlgorithm, SymmetricKeyAlgorithm, CompressionAlgorithm ) from bonafide import provider from bonafide.session import Session from leap.soledad.common.blobs import Flags _provider = 'cdev.bitmask.net' uri = "https://api.%s:4430/1/" % _provider ca = "https://%s/ca.crt" % _provider random.seed() # # session management: user creation and authentication # def _get_invite_code(): invite = os.environ.get('INVITE_CODE') if not invite: raise Exception('The INVITE_CODE environment variable is empty, but ' 'we need it set to interact with the provider.') return invite @pytest.inlineCallbacks def _get_ca_file(tmpdir): response = yield treq.get(ca) pemdata = yield response.text() fname = os.path.join(tmpdir.strpath, 'cacert.pem') with open(fname, 'w') as f: f.write(pemdata) returnValue(fname) @pytest.inlineCallbacks def get_session(tmpdir): # setup user params invite = _get_invite_code() username = ''.join(random.choice(ascii_lowercase) for i in range(20)) # users starting with "test_user" get removed by cron on a regular basis username = 'tmp_user_e2e_' + username passphrase = ''.join(random.choice(ascii_lowercase) for i in range(20)) # create user and login credentials = UsernamePassword(username, passphrase) api = provider.Api('https://api.%s:4430' % _provider) cdev_pem = yield _get_ca_file(tmpdir) session = Session(credentials, api, cdev_pem) print("creating user") yield session.signup(username, passphrase, invite=invite) print("logging in") yield session.authenticate() returnValue(session) # # OpenPGP key creation and upload # def gen_key(username): print("generating OpenPGP key pair") key = pgpy.PGPKey.new(PubKeyAlgorithm.RSAEncryptOrSign, 4096) uid = pgpy.PGPUID.new(username, email='%s@%s' % (username, _provider)) key.add_uid( uid, usage={KeyFlags.EncryptCommunications}, hashes=[HashAlgorithm.SHA512], ciphers=[SymmetricKeyAlgorithm.AES256], compression=[CompressionAlgorithm.Uncompressed] ) return key @pytest.inlineCallbacks def _get_http_client(): response = yield treq.get(ca) pemdata = yield response.text() cert = Certificate.loadPEM(pemdata) policy = BrowserLikePolicyForHTTPS(trustRoot=cert) agent = Agent(reactor, contextFactory=policy) client = treq.client.HTTPClient(agent) returnValue(client) @pytest.inlineCallbacks def put_key(uuid, token, data): print("uploading public key to server") client = yield _get_http_client() headers = { 'Authorization': [str('Token token=%s' % token)], 'Content-Type': ['application/x-www-form-urlencoded'], } data = str(urllib.urlencode({'user[public_key]': data})) response = yield client.put( '%s/users/%s.json' % (uri, uuid), headers=headers, data=data) assert response.code == 204 # # mail sending # def send_email(username): address = "%s@%s" % (username, _provider) print("sending email to %s" % address) secret = ''.join(random.choice(ascii_lowercase) for i in range(20)) msg = MIMEText(secret) msg["To"] = address msg["Subject"] = "e2e test token" p = Popen(["/usr/sbin/sendmail", "-t"], stdin=PIPE) p.communicate(msg.as_string()) return secret # # incoming message retrieval # @pytest.inlineCallbacks def get_incoming_fd(client): pending = [] attempts = 1 while not pending: print("attempting to fetch incoming blob (%d/10)" % attempts) pending = yield client.blobmanager.remote_list( namespace='MX', filter_flags=Flags.PENDING) if not pending and attempts == 10: raise Exception("Timed out waiting for message to get delivered.") attempts += 1 time.sleep(1) assert len(pending) == 1 fd = yield client.blobmanager.get(pending.pop(), namespace='MX') returnValue(fd) def get_received_secret(key, fd): print("decoding incoming blob to get the secret") encrypted = pgpy.PGPMessage.from_blob(fd.read()) decrypted = key.decrypt(encrypted) doc_content = json.loads(decrypted.message) content = doc_content['content'] email_message = email.message_from_string(content) received_secret = email_message.get_payload().strip() return received_secret