summaryrefslogtreecommitdiff
path: root/tests/e2e/utils.py
blob: e4c9df7e7fdfc4fe26794510c9020cd78e57e641 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import email
import json
import os
import pytest
import random
import time
import treq
import urllib

from string import ascii_lowercase
from subprocess import check_call

from twisted.internet import reactor
from twisted.internet.defer import returnValue
from twisted.web.client import Agent
from twisted.web.client import BrowserLikePolicyForHTTPS
from twisted.internet.ssl import Certificate
from twisted.cred.credentials import UsernamePassword

import pgpy
from pgpy.constants import (
    PubKeyAlgorithm,
    KeyFlags,
    HashAlgorithm,
    SymmetricKeyAlgorithm,
    CompressionAlgorithm
)

from bonafide import provider
from bonafide.session import Session

from leap.soledad.common.blobs import Flags


# Domain name of the provider these end-to-end helpers talk to.
_provider = 'cdev.bitmask.net'

# Base URL used when building API request paths (note the trailing slash).
uri = "https://api.{0}:4430/1/".format(_provider)
# URL from which the provider's CA certificate (PEM) is downloaded.
ca = "https://{0}/ca.crt".format(_provider)


# Re-seed the shared random generator; with no argument the seed comes from
# an OS randomness source or the current time.
random.seed()


#
# session management: user creation and authentication
#

def _get_invite_code():
    """
    Return the invite code taken from the ``INVITE_CODE`` environment
    variable.

    :raises Exception: if the variable is unset or empty.
    """
    code = os.environ.get('INVITE_CODE')
    if code:
        return code
    # without an invite code there is no way to sign up a user
    raise Exception('The INVITE_CODE environment variable is empty, but '
                    'we need it set to interact with the provider.')


@pytest.inlineCallbacks
def _get_ca_file(tmpdir):
    """
    Download the provider's CA certificate and store it in ``tmpdir``.

    :param tmpdir: object exposing a ``strpath`` attribute that names the
                   directory where ``cacert.pem`` is written.
    :return: a deferred that fires with the path of the written file.
    """
    ca_response = yield treq.get(ca)
    pem_text = yield ca_response.text()
    target = os.path.join(tmpdir.strpath, 'cacert.pem')
    with open(target, 'w') as out:
        out.write(pem_text)
    returnValue(target)


@pytest.inlineCallbacks
def get_session(tmpdir):
    """
    Sign up a freshly named random user on the provider and log it in.

    :param tmpdir: directory object handed to ``_get_ca_file`` to store the
                   provider's CA certificate.
    :return: a deferred that fires with the authenticated session.
    :raises Exception: if no invite code is available in the environment.
    """
    invite_code = _get_invite_code()

    def _random_letters():
        # 20 random lowercase ascii letters
        return ''.join(random.choice(ascii_lowercase) for _ in range(20))

    # The original note here says users starting with "test_user" get
    # removed by cron on a regular basis.
    # NOTE(review): the prefix actually used is "tmp_user_e2e_" -- confirm
    # that it is covered by that cleanup job.
    login = 'tmp_user_e2e_' + _random_letters()
    secret = _random_letters()

    # build everything the session needs, then sign up and authenticate
    creds = UsernamePassword(login, secret)
    provider_api = provider.Api('https://api.%s:4430' % _provider)
    ca_path = yield _get_ca_file(tmpdir)
    user_session = Session(creds, provider_api, ca_path)

    print("creating user")
    yield user_session.signup(login, secret, invite=invite_code)
    print("logging in")
    yield user_session.authenticate()
    returnValue(user_session)


#
# OpenPGP key creation and upload
#

def gen_key(username):
    """
    Create a new 4096-bit RSA OpenPGP key carrying a user id for
    ``<username>@<provider>``.

    :param username: name used both as the uid name and as the local part
                     of the uid's email address.
    :return: the generated ``pgpy.PGPKey``.
    """
    print("generating OpenPGP key pair")
    keypair = pgpy.PGPKey.new(PubKeyAlgorithm.RSAEncryptOrSign, 4096)
    address = '%s@%s' % (username, _provider)
    user_id = pgpy.PGPUID.new(username, email=address)
    # preferences attached to the user id
    preferences = dict(
        usage={KeyFlags.EncryptCommunications},
        hashes=[HashAlgorithm.SHA512],
        ciphers=[SymmetricKeyAlgorithm.AES256],
        compression=[CompressionAlgorithm.Uncompressed],
    )
    keypair.add_uid(user_id, **preferences)
    return keypair


@pytest.inlineCallbacks
def _get_http_client():
    """
    Build a treq HTTP client whose HTTPS policy uses the provider's CA
    certificate as trust root.

    The certificate is downloaded from the ``ca`` URL on every call.

    :return: a deferred that fires with a ``treq.client.HTTPClient``.
    """
    ca_response = yield treq.get(ca)
    pem_text = yield ca_response.text()
    trust_root = Certificate.loadPEM(pem_text)
    https_policy = BrowserLikePolicyForHTTPS(trustRoot=trust_root)
    returnValue(
        treq.client.HTTPClient(Agent(reactor, contextFactory=https_policy)))


@pytest.inlineCallbacks
def put_key(uuid, token, data):
    """
    Upload a public key for a user through the provider's API.

    :param uuid: identifier of the user, used in the request path.
    :param token: token sent in the ``Authorization`` header.
    :param data: value sent as the ``user[public_key]`` form field.
    :return: a deferred that fires once the server has answered.
    :raises AssertionError: if the response code is not 204.
    """
    print("uploading public key to server")
    http = yield _get_http_client()
    auth_header = str('Token token=%s' % token)
    form_body = str(urllib.urlencode({'user[public_key]': data}))
    # NOTE(review): ``uri`` already ends with a slash, so this path contains
    # a double slash ("/1//users/...") -- confirm the server is fine with it.
    target = '%s/users/%s.json' % (uri, uuid)
    response = yield http.put(
        target,
        headers={
            'Authorization': [auth_header],
            'Content-Type': ['application/x-www-form-urlencoded'],
        },
        data=form_body)
    assert response.code == 204


#
# mail sending
#

def send_email(username):
    """
    Send a message to ``<username>@<provider>`` by running the ``swaks``
    command line tool, with a random secret as the message body.

    :param username: local part of the recipient address.
    :return: the 20-letter random secret placed in the body.
    :raises subprocess.CalledProcessError: if ``swaks`` exits with a
                                           non-zero status.
    """
    recipient = "%s@%s" % (username, _provider)
    print("sending email to %s" % recipient)
    token = ''.join(random.choice(ascii_lowercase) for _ in range(20))
    swaks_args = ['swaks']
    swaks_args += ['--silent', '2']
    swaks_args += ['--helo', 'ci.leap.se']
    swaks_args += ['-f', 'ci@leap.se']           # sender
    swaks_args += ['-t', recipient]              # recipient
    swaks_args += ['-h-Subject', 'e2e test token']
    swaks_args += ['--body', token]
    swaks_args += ['-tlsc']
    check_call(swaks_args)
    return token


#
# incoming message retrieval
#

@pytest.inlineCallbacks
def get_incoming_fd(client, max_attempts=10, delay=1):
    """
    Poll the remote ``MX`` namespace until a pending incoming blob shows up
    and fetch it.

    The wait between attempts is done with ``deferLater`` so the reactor is
    not blocked, and no wait happens once a blob has been found or after
    the last attempt.

    :param client: object whose ``blobmanager`` provides ``remote_list``
                   and ``get``.
    :param max_attempts: how many times the remote list is queried before
                         giving up.
    :param delay: seconds to wait between two attempts.
    :return: a deferred that fires with whatever ``blobmanager.get``
             returns for the single pending blob.
    :raises Exception: if no pending blob is listed after ``max_attempts``
                       attempts.
    :raises AssertionError: if more than one pending blob is listed.
    """
    # imported here so this function stays self-contained
    from twisted.internet.task import deferLater

    pending = []
    for attempt in range(1, max_attempts + 1):
        print("attempting to fetch incoming blob (%d/%d)"
              % (attempt, max_attempts))
        pending = yield client.blobmanager.remote_list(
            namespace='MX', filter_flags=Flags.PENDING)
        if pending:
            break
        if attempt < max_attempts:
            # non-blocking pause before the next attempt
            yield deferLater(reactor, delay, lambda: None)
    else:
        # the loop ran out of attempts without finding anything
        raise Exception("Timed out waiting for message to get delivered.")
    assert len(pending) == 1
    fd = yield client.blobmanager.get(pending.pop(), namespace='MX')
    returnValue(fd)


def get_received_secret(key, fd):
    """
    Decrypt an incoming blob and extract the secret from the email it
    carries.

    The blob is read as an OpenPGP message and decrypted with ``key``; the
    result is parsed as JSON, and its ``content`` entry is parsed as an
    email message.

    :param key: key object providing ``decrypt``.
    :param fd: file-like object holding the encrypted blob.
    :return: the email payload with surrounding whitespace stripped.
    """
    print("decoding incoming blob to get the secret")
    ciphertext = pgpy.PGPMessage.from_blob(fd.read())
    plaintext = key.decrypt(ciphertext).message
    document = json.loads(plaintext)
    parsed = email.message_from_string(document['content'])
    return parsed.get_payload().strip()