summaryrefslogtreecommitdiff
path: root/common/src/leap/soledad/common/tests/u1db_tests/test_https.py
blob: f22ce51e9523a8955ecca78d8648528a6bdd86a0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
"""Test support for client-side https support."""

import os
import ssl
import sys

from paste import httpserver
from unittest import skip

from u1db.remote import (
    http_client,
    http_target,
)

from leap import soledad
from leap.soledad.common.tests import u1db_tests as tests
from leap.soledad.common.tests.u1db_tests.test_remote_sync_target import (
    make_oauth_http_app,
)


def https_server_def():
    """Describe the HTTPS server used by the server-backed test cases.

    :return: a 3-tuple ``(make_server, "shutdown", "https")`` where
        ``make_server(host_port, application)`` builds the TLS-enabled
        server. The two strings are presumably the name of the server's
        stop method and the URL scheme -- confirm against the code that
        consumes ``server_def``.
    """
    def make_server(host_port, application):
        """Build a paste WSGI server serving *application* over TLS.

        The key and certificate are read from the ``testing-certs``
        directory located next to this module. As a side effect,
        ``application.base_url`` is set to the server's https URL.
        """
        # Imported here rather than at module level; setUp() of the test
        # class skips the tests when pyOpenSSL is not installed.
        from OpenSSL import SSL

        certs_dir = os.path.join(os.path.dirname(__file__), 'testing-certs')
        context = SSL.Context(SSL.SSLv23_METHOD)
        context.use_privatekey_file(os.path.join(certs_dir, 'testing.key'))
        context.use_certificate_chain_file(
            os.path.join(certs_dir, 'testing.cert'))

        server = httpserver.WSGIServerBase(
            application, host_port, httpserver.WSGIHandler,
            ssl_context=context)

        def shutdown_request(request):
            # Shut the connection down before handing it back to the
            # server for closing.
            request.shutdown()
            server.close_request(request)

        server.shutdown_request = shutdown_request
        port = server.server_address[1]
        application.base_url = "https://localhost:%s" % port
        return server

    return make_server, "shutdown", "https"


def oauth_https_sync_target(test, host, path):
    """Create an OAuth-authenticated HTTPS sync target for *test*'s server.

    :param test: test case whose ``server.server_address`` supplies the port.
    :param host: host name or address to put in the target URL.
    :param path: database path appended after ``/~/`` in the URL.
    :return: an ``HTTPSyncTarget`` with the test consumer/token credentials
        already set.
    """
    _unused_host, port = test.server.server_address
    url = 'https://%s:%d/~/%s' % (host, port, path)
    target = http_target.HTTPSyncTarget(url)
    target.set_oauth_credentials(
        tests.consumer1.key, tests.consumer1.secret,
        tests.token1.key, tests.token1.secret)
    return target


@skip("Skiping tests imported from U1DB.")
class TestHttpSyncTargetHttpsSupport(tests.TestCaseWithServer):
    """Check ``HTTPSyncTarget`` behaviour against a local HTTPS server.

    The server is built by ``https_server_def`` and the sync target by the
    scenario's ``sync_target`` factory. The whole class is currently
    skipped by the decorator above.
    """

    scenarios = [
        ('oauth_https', {'server_def': https_server_def,
                         'make_app_with_state': make_oauth_http_app,
                         'make_document_for_test':
                         tests.make_document_for_test,
                         'sync_target': oauth_https_sync_target
                         }),
    ]

    def setUp(self):
        """Skip without pyOpenSSL and restore u1db's HTTPS connection class."""
        try:
            import OpenSSL  # noqa
        except ImportError:
            self.skipTest("Requires pyOpenSSL")
        self.cacert_pem = os.path.join(os.path.dirname(__file__),
                                       'testing-certs', 'cacert.pem')
        # The default u1db http_client class for doing HTTPS only does HTTPS
        # if the platform is linux. Because of this, soledad replaces that
        # class with one that will do HTTPS independent of the platform. In
        # order to maintain the compatibility with u1db default tests, we undo
        # that replacement here.
        # NOTE(review): this assignment is not reverted after the test, so
        # the u1db class stays installed for whatever runs afterwards.
        http_client._VerifiedHTTPSConnection = \
            soledad.client.api.old__VerifiedHTTPSConnection
        super(TestHttpSyncTargetHttpsSupport, self).setUp()

    def getSyncTarget(self, host, path=None, cert_file=None):
        """Return a sync target for *host*/*path*, starting the server first.

        :param host: host name or address used in the target URL.
        :param path: database path on the server.
        :param cert_file: optional certificate file; forwarded to the
            scenario's ``sync_target`` factory only when given.
        """
        if self.server is None:
            self.startServer()
        # Forward ``cert_file`` only when the caller supplied one: the
        # ``oauth_https`` factory takes just (test, host, path), so passing
        # the keyword unconditionally raised TypeError on every call.
        extra = {}
        if cert_file is not None:
            extra['cert_file'] = cert_file
        return self.sync_target(self, host, path, **extra)

    def test_working(self):
        """Sync info recorded over HTTPS reaches the server-side database."""
        self.startServer()
        db = self.request_state._create_database('test')
        # Trust the test CA so the server certificate verifies.
        self.patch(http_client, 'CA_CERTS', self.cacert_pem)
        remote_target = self.getSyncTarget('localhost', 'test')
        remote_target.record_sync_info('other-id', 2, 'T-id')
        self.assertEqual(
            (2, 'T-id'), db._get_replica_gen_and_trans_id('other-id'))

    def test_cannot_verify_cert(self):
        """Without the test CA configured, the request fails verification."""
        if not sys.platform.startswith('linux'):
            self.skipTest(
                "XXX certificate verification happens on linux only for now")
        self.startServer()
        # don't print expected traceback server-side
        self.server.handle_error = lambda req, cli_addr: None
        self.request_state._create_database('test')
        remote_target = self.getSyncTarget('localhost', 'test')
        try:
            remote_target.record_sync_info('other-id', 2, 'T-id')
        # ``as`` form is valid on Python 2.6+ and 3; the old comma form is a
        # SyntaxError on Python 3.
        except ssl.SSLError as e:
            self.assertIn("certificate verify failed", str(e))
        else:
            self.fail("certificate verification should have failed.")

    def test_host_mismatch(self):
        """Connecting via 127.0.0.1 instead of localhost is rejected."""
        if not sys.platform.startswith('linux'):
            self.skipTest(
                "XXX certificate verification happens on linux only for now")
        self.startServer()
        self.request_state._create_database('test')
        self.patch(http_client, 'CA_CERTS', self.cacert_pem)
        remote_target = self.getSyncTarget('127.0.0.1', 'test')
        self.assertRaises(
            http_client.CertificateError, remote_target.record_sync_info,
            'other-id', 2, 'T-id')


# unittest's ``load_tests`` protocol hook for this module; presumably expands
# each test over the ``scenarios`` declared on the test classes -- confirm
# against ``tests.load_with_scenarios``.
load_tests = tests.load_with_scenarios