# # Copyright (c) 2016 ThoughtWorks, Inc. # # Pixelated is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Pixelated is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with Pixelated. If not, see . import os import unittest import tempdir from leap.srp_session import SRPSession import leap.common.certs as certs from mockito import mock, unstub, when, verify, never, any as ANY from pixelated.config.sessions import SmtpClientCertificate from tempfile import NamedTemporaryFile from httmock import all_requests, HTTMock, urlmatch CERTIFICATE_DATA = 'some cert data' USERNAME = 'some_user_name' @all_requests def not_found_mock(url, request): return {'status_code': 404, 'content': 'foobar'} @urlmatch(netloc='api.some-server.test:4430', path='/1/smtp_cert', method='POST') def smtp_cert_mock(url, request): if request.body == 'address=%s' % USERNAME: return { "status_code": 200, "content": CERTIFICATE_DATA } else: return { 'status_code': 401 } class TestSmtpClientCertificate(unittest.TestCase): def setUp(self): self.tmp_dir = tempdir.TempDir() self.provider = mock() self.provider.api_uri = 'https://api.some-server.test:4430' self.provider.api_version = '1' self.provider.server_name = 'some.host.tld' self.provider.domain = 'some-provider.tld' self.auth = SRPSession(USERNAME, 'token', 'uuid', 'session_id', {}) self.pem_path = os.path.join(self.tmp_dir.name, 'providers', 'some-provider.tld', 'keys', 'client', 'smtp.pem') def tearDown(self): self.tmp_dir.dissolve() unstub() def test_download_certificate(self): cert = SmtpClientCertificate(self.provider, self.auth, self.tmp_dir.name) when(cert).download_to(ANY()).thenReturn(None) result = cert.cert_path() self.assertEqual(self.pem_path, result) def test_download_certificate_if_redownload_necessary_e_g_certificate_expired(self): self.pretend_all_paths_exist() when(certs).should_redownload(self.pem_path).thenReturn(True) cert = SmtpClientCertificate(self.provider, self.auth, self.tmp_dir.name) when(cert).download_to(ANY()).thenReturn(None) result = cert.cert_path() self.assertEqual(self.pem_path, result) def pretend_all_paths_exist(self): when(os.path).exists(ANY()).thenReturn(True) def test_skip_download_if_already_downloaded_and_still_valid(self): when(os.path).exists(self.pem_path).thenReturn(True) when(certs).should_redownload(ANY()).thenReturn(False) cert = SmtpClientCertificate(self.provider, self.auth, self.tmp_dir.name) result = cert.cert_path() self.assertEqual(self.pem_path, result) def test_download_to(self): cert = SmtpClientCertificate(self.provider, self.auth, self.tmp_dir.name) with NamedTemporaryFile() as tmp_file: with HTTMock(smtp_cert_mock, not_found_mock): cert.download_to(tmp_file.name) file_content = open(tmp_file.name).read() self.assertEqual(CERTIFICATE_DATA, file_content)