From d79aa00e3c24c5bf5e5ed5ba5a9b976f93034362 Mon Sep 17 00:00:00 2001 From: Folker Bernitt Date: Wed, 4 Nov 2015 10:11:19 +0100 Subject: Instantiate new MailSender in Services - Issue #499 - Some smaller refactorings - Extract smtp cert download to own class --- service/test/unit/bitmask_libraries/test_smtp.py | 53 +++------------ .../bitmask_libraries/test_smtp_cert_downloader.py | 78 ++++++++++++++++++++++ 2 files changed, 88 insertions(+), 43 deletions(-) create mode 100644 service/test/unit/bitmask_libraries/test_smtp_cert_downloader.py (limited to 'service/test/unit/bitmask_libraries') diff --git a/service/test/unit/bitmask_libraries/test_smtp.py b/service/test/unit/bitmask_libraries/test_smtp.py index 182a0786..22b69b9e 100644 --- a/service/test/unit/bitmask_libraries/test_smtp.py +++ b/service/test/unit/bitmask_libraries/test_smtp.py @@ -13,29 +13,10 @@ # # You should have received a copy of the GNU Affero General Public License # along with Pixelated. If not, see . -import sys - import os from mock import MagicMock, patch from test_abstract_leap import AbstractLeapTest -from pixelated.bitmask_libraries.smtp import LeapSmtp -from httmock import all_requests, HTTMock, urlmatch - - -@all_requests -def not_found_mock(url, request): - sys.stderr.write('url=%s\n' % url.netloc) - sys.stderr.write('path=%s\n' % url.path) - return {'status_code': 404, - 'content': 'foobar'} - - -@urlmatch(netloc='api.some-server.test:4430', path='/1/cert') -def ca_cert_mock(url, request): - return { - "status_code": 200, - "content": "some content" - } +from pixelated.bitmask_libraries.smtp import LeapSmtp, LeapSMTPConfig class LeapSmtpTest(AbstractLeapTest): @@ -43,53 +24,39 @@ class LeapSmtpTest(AbstractLeapTest): def setUp(self): super(LeapSmtpTest, self).setUp() - self.provider.fetch_smtp_json.return_value = { - 'hosts': { - 'leap-mx': { - 'hostname': 'smtp.some-sever.test', - 'port': '1234' - } - } - } self.config.timeout_in_s = 15 + self._smtp_config = LeapSMTPConfig('test_user@some-server.test', self._client_cert_path(), 'smtp.some-server.test', 1234) def _client_cert_path(self): return os.path.join(self.leap_home, 'providers', 'some-server.test', 'keys', 'client', 'smtp.pem') @patch('pixelated.bitmask_libraries.smtp.setup_smtp_gateway') def test_that_start_calls_setup_smtp_gateway(self, gateway_mock): - self.provider.smtp_info = MagicMock(return_value=('smtp.some-sever.test', 1234)) - self.provider._client_cert_path = MagicMock(return_value=self._client_cert_path()) - smtp = LeapSmtp(self.provider, self.auth, self.keymanager) + smtp = LeapSmtp(self._smtp_config, self.keymanager) port = 500 smtp.local_smtp_port_number = port gateway_mock.return_value = (None, None) - with HTTMock(ca_cert_mock, not_found_mock): - smtp.ensure_running() + smtp.ensure_running() cert_path = self._client_cert_path() - gateway_mock.assert_called_with(smtp_cert=cert_path, userid='test_user@some-server.test', smtp_port=1234, smtp_key=cert_path, keymanager=self.keymanager, encrypted_only=False, smtp_host='smtp.some-sever.test', port=port) + gateway_mock.assert_called_with(smtp_cert=cert_path, userid='test_user@some-server.test', smtp_port=1234, smtp_key=cert_path, keymanager=self.keymanager, encrypted_only=False, smtp_host='smtp.some-server.test', port=port) def test_that_client_stop_does_nothing_if_not_started(self): - self.provider.smtp_info = MagicMock(return_value=('smtp.some-sever.test', 1234)) - smtp = LeapSmtp(self.provider, self.auth, self.keymanager) + smtp = LeapSmtp(self._smtp_config, self.keymanager) - with HTTMock(not_found_mock): - smtp.stop() + smtp.stop() @patch('pixelated.bitmask_libraries.smtp.setup_smtp_gateway') def test_that_running_smtp_sevice_is_stopped(self, gateway_mock): - self.provider.smtp_info = MagicMock(return_value=('smtp.some-sever.test', 1234)) - smtp = LeapSmtp(self.provider, self.auth, self.keymanager) + smtp = LeapSmtp(self._smtp_config, self.keymanager) smtp_service = MagicMock() smtp_port = MagicMock() gateway_mock.return_value = (smtp_service, smtp_port) - with HTTMock(ca_cert_mock, not_found_mock): - smtp.ensure_running() - smtp.stop() + smtp.ensure_running() + smtp.stop() smtp_port.stopListening.assert_called_with() smtp_service.doStop.assert_called_with() diff --git a/service/test/unit/bitmask_libraries/test_smtp_cert_downloader.py b/service/test/unit/bitmask_libraries/test_smtp_cert_downloader.py new file mode 100644 index 00000000..5644ab6a --- /dev/null +++ b/service/test/unit/bitmask_libraries/test_smtp_cert_downloader.py @@ -0,0 +1,78 @@ +# +# Copyright (c) 2015 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . +import unittest +from mockito import mock, unstub +from requests import HTTPError +from pixelated.bitmask_libraries.session import SmtpCertDownloader +from tempfile import NamedTemporaryFile +from httmock import all_requests, HTTMock, urlmatch + +CERTIFICATE_DATA = 'some cert data' + + +@all_requests +def not_found_mock(url, request): + return {'status_code': 404, + 'content': 'foobar'} + + +@urlmatch(netloc='api.some-server.test:4430', path='/1/cert') +def ca_cert_mock(url, request): + return { + "status_code": 200, + "content": CERTIFICATE_DATA + } + + +class TestSmtpCertDownloader(unittest.TestCase): + + def setUp(self): + self._provider = mock() + self._config = mock() + self._config.leap_home = '/tmp' + self._auth = mock() + + self._provider.config = self._config + self._provider.api_uri = 'https://api.some-server.test:4430' + self._provider.api_version = '1' + self._provider.server_name = 'some.host.tld' + + self._auth.session_id = 'some session id' + self._auth.token = 'some token' + + def tearDown(self): + unstub() + + def test_download_certificate(self): + with HTTMock(ca_cert_mock, not_found_mock): + cert_data = SmtpCertDownloader(self._provider, self._auth).download() + + self.assertEqual(CERTIFICATE_DATA, cert_data) + + def test_error_if_not_found(self): + downloader = SmtpCertDownloader(self._provider, self._auth) + with HTTMock(not_found_mock): + self.assertRaises(HTTPError, downloader.download) + + def test_download_to(self): + downloader = SmtpCertDownloader(self._provider, self._auth) + + with NamedTemporaryFile() as tmp_file: + with HTTMock(ca_cert_mock, not_found_mock): + downloader.download_to(tmp_file.name) + + file_content = open(tmp_file.name).read() + self.assertEqual(CERTIFICATE_DATA, file_content) -- cgit v1.2.3