diff options
Diffstat (limited to 'common/src/leap/soledad/common/tests/u1db_tests/test_https.py')
-rw-r--r-- | common/src/leap/soledad/common/tests/u1db_tests/test_https.py | 196 |
1 files changed, 94 insertions, 102 deletions
diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_https.py b/common/src/leap/soledad/common/tests/u1db_tests/test_https.py index e177a808..8a5743e7 100644 --- a/common/src/leap/soledad/common/tests/u1db_tests/test_https.py +++ b/common/src/leap/soledad/common/tests/u1db_tests/test_https.py @@ -4,119 +4,111 @@ import os import ssl import sys -from paste import httpserver +#from paste import httpserver from unittest import skip -from u1db.remote import http_client -from u1db.remote import http_target +from leap.soledad.common.l2db.remote import http_client +#from leap.soledad.common.l2db.remote import http_target from leap import soledad from leap.soledad.common.tests import u1db_tests as tests from leap.soledad.common.tests.u1db_tests import make_oauth_http_app -def https_server_def(): - def make_server(host_port, application): - from OpenSSL import SSL - cert_file = os.path.join(os.path.dirname(__file__), 'testing-certs', - 'testing.cert') - key_file = os.path.join(os.path.dirname(__file__), 'testing-certs', - 'testing.key') - ssl_context = SSL.Context(SSL.SSLv23_METHOD) - ssl_context.use_privatekey_file(key_file) - ssl_context.use_certificate_chain_file(cert_file) - srv = httpserver.WSGIServerBase(application, host_port, - httpserver.WSGIHandler, - ssl_context=ssl_context - ) - - def shutdown_request(req): - req.shutdown() - srv.close_request(req) - - srv.shutdown_request = shutdown_request - application.base_url = "https://localhost:%s" % srv.server_address[1] - return srv - return make_server, "shutdown", "https" - - -def oauth_https_sync_target(test, host, path): - _, port = test.server.server_address - st = http_target.HTTPSyncTarget('https://%s:%d/~/%s' % (host, port, path)) - st.set_oauth_credentials(tests.consumer1.key, tests.consumer1.secret, - tests.token1.key, tests.token1.secret) - return st - - -@skip("Skiping tests imported from U1DB.") -class TestHttpSyncTargetHttpsSupport(tests.TestCaseWithServer): - - scenarios = [ - ('oauth_https', {'server_def': https_server_def, - 'make_app_with_state': make_oauth_http_app, - 'make_document_for_test': - tests.make_document_for_test, - 'sync_target': oauth_https_sync_target - }), - ] - - def setUp(self): - try: - import OpenSSL # noqa - except ImportError: - self.skipTest("Requires pyOpenSSL") - self.cacert_pem = os.path.join(os.path.dirname(__file__), - 'testing-certs', 'cacert.pem') +#def https_server_def(): + #def make_server(host_port, application): + #from OpenSSL import SSL + #cert_file = os.path.join(os.path.dirname(__file__), 'testing-certs', + #'testing.cert') + #key_file = os.path.join(os.path.dirname(__file__), 'testing-certs', + #'testing.key') + #ssl_context = SSL.Context(SSL.SSLv23_METHOD) + #ssl_context.use_privatekey_file(key_file) + #ssl_context.use_certificate_chain_file(cert_file) + #srv = httpserver.WSGIServerBase(application, host_port, + #httpserver.WSGIHandler, + #ssl_context=ssl_context + #) +# + #def shutdown_request(req): + #req.shutdown() + #srv.close_request(req) +# + #srv.shutdown_request = shutdown_request + #application.base_url = "https://localhost:%s" % srv.server_address[1] + #return srv + #return make_server, "shutdown", "https" + + +#@skip("Skiping tests imported from U1DB.") +#class TestHttpSyncTargetHttpsSupport(tests.TestCaseWithServer): +# + #scenarios = [ + #('oauth_https', {'server_def': https_server_def, + #'make_app_with_state': make_oauth_http_app, + #'make_document_for_test': + #tests.make_document_for_test, + #'sync_target': oauth_https_sync_target + #}), + #] +# + #def setUp(self): + #try: + #import OpenSSL # noqa + #except ImportError: + #self.skipTest("Requires pyOpenSSL") + #self.cacert_pem = os.path.join(os.path.dirname(__file__), + #'testing-certs', 'cacert.pem') # The default u1db http_client class for doing HTTPS only does HTTPS # if the platform is linux. Because of this, soledad replaces that # class with one that will do HTTPS independent of the platform. In # order to maintain the compatibility with u1db default tests, we undo # that replacement here. - http_client._VerifiedHTTPSConnection = \ - soledad.client.api.old__VerifiedHTTPSConnection - super(TestHttpSyncTargetHttpsSupport, self).setUp() - - def getSyncTarget(self, host, path=None, cert_file=None): - if self.server is None: - self.startServer() - return self.sync_target(self, host, path, cert_file=cert_file) - - def test_working(self): - self.startServer() - db = self.request_state._create_database('test') - self.patch(http_client, 'CA_CERTS', self.cacert_pem) - remote_target = self.getSyncTarget('localhost', 'test') - remote_target.record_sync_info('other-id', 2, 'T-id') - self.assertEqual( - (2, 'T-id'), db._get_replica_gen_and_trans_id('other-id')) - - def test_cannot_verify_cert(self): - if not sys.platform.startswith('linux'): - self.skipTest( - "XXX certificate verification happens on linux only for now") - self.startServer() + #http_client._VerifiedHTTPSConnection = \ + #soledad.client.api.old__VerifiedHTTPSConnection + #super(TestHttpSyncTargetHttpsSupport, self).setUp() +# + #def getSyncTarget(self, host, path=None, cert_file=None): + #if self.server is None: + #self.startServer() + #return self.sync_target(self, host, path, cert_file=cert_file) +# + #def test_working(self): + #self.startServer() + #db = self.request_state._create_database('test') + #self.patch(http_client, 'CA_CERTS', self.cacert_pem) + #remote_target = self.getSyncTarget('localhost', 'test') + #remote_target.record_sync_info('other-id', 2, 'T-id') + #self.assertEqual( + #(2, 'T-id'), db._get_replica_gen_and_trans_id('other-id')) +# + #def test_cannot_verify_cert(self): + #if not sys.platform.startswith('linux'): + #self.skipTest( + #"XXX certificate verification happens on linux only for now") + #self.startServer() # don't print expected traceback server-side - self.server.handle_error = lambda req, cli_addr: None - self.request_state._create_database('test') - remote_target = self.getSyncTarget('localhost', 'test') - try: - remote_target.record_sync_info('other-id', 2, 'T-id') - except ssl.SSLError, e: - self.assertIn("certificate verify failed", str(e)) - else: - self.fail("certificate verification should have failed.") - - def test_host_mismatch(self): - if not sys.platform.startswith('linux'): - self.skipTest( - "XXX certificate verification happens on linux only for now") - self.startServer() - self.request_state._create_database('test') - self.patch(http_client, 'CA_CERTS', self.cacert_pem) - remote_target = self.getSyncTarget('127.0.0.1', 'test') - self.assertRaises( - http_client.CertificateError, remote_target.record_sync_info, - 'other-id', 2, 'T-id') - - -load_tests = tests.load_with_scenarios + #self.server.handle_error = lambda req, cli_addr: None + #self.request_state._create_database('test') + #remote_target = self.getSyncTarget('localhost', 'test') + #try: + #remote_target.record_sync_info('other-id', 2, 'T-id') + #except ssl.SSLError, e: + #self.assertIn("certificate verify failed", str(e)) + #else: + #self.fail("certificate verification should have failed.") +# + #def test_host_mismatch(self): + #if not sys.platform.startswith('linux'): + #self.skipTest( + #"XXX certificate verification happens on linux only for now") + #self.startServer() + #self.request_state._create_database('test') + #self.patch(http_client, 'CA_CERTS', self.cacert_pem) + #remote_target = self.getSyncTarget('127.0.0.1', 'test') + #self.assertRaises( + #http_client.CertificateError, remote_target.record_sync_info, + #'other-id', 2, 'T-id') +# +# +#load_tests = tests.load_with_scenarios |