From cf6adf149d2356400e611b019353f431a032d88e Mon Sep 17 00:00:00 2001 From: Folker Bernitt Date: Wed, 10 Feb 2016 17:06:15 +0100 Subject: Download SMTP client certificate, not VPN one - Issue #591 --- service/pixelated/bitmask_libraries/session.py | 60 ++++++++++++++++------ .../bitmask_libraries/test_smtp_cert_downloader.py | 23 ++++++--- .../test_smtp_client_certificate.py | 59 +++++++++++++++++++++ 3 files changed, 117 insertions(+), 25 deletions(-) create mode 100644 service/test/unit/bitmask_libraries/test_smtp_client_certificate.py diff --git a/service/pixelated/bitmask_libraries/session.py b/service/pixelated/bitmask_libraries/session.py index e217c286..ef41fe6a 100644 --- a/service/pixelated/bitmask_libraries/session.py +++ b/service/pixelated/bitmask_libraries/session.py @@ -120,6 +120,37 @@ class LeapSession(object): raise +class SmtpClientCertificate(object): + def __init__(self, provider, auth, user_path): + self._provider = provider + self._auth = auth + self._user_path = user_path + + def cert_path(self): + if not self._is_cert_already_downloaded(): + self._download_smtp_cert() + + return self._smtp_client_cert_path() + + def _is_cert_already_downloaded(self): + return os.path.exists(self._smtp_client_cert_path()) + + def _download_smtp_cert(self): + cert_path = self._smtp_client_cert_path() + + if not os.path.exists(os.path.dirname(cert_path)): + os.makedirs(os.path.dirname(cert_path)) + + SmtpCertDownloader(self._provider, self._auth).download_to(cert_path) + + def _smtp_client_cert_path(self): + return os.path.join( + self._user_path, + "providers", + self._provider.domain, + "keys", "client", "smtp.pem") + + class SmtpCertDownloader(object): def __init__(self, provider, auth): @@ -127,12 +158,15 @@ class SmtpCertDownloader(object): self._auth = auth def download(self): - cert_url = '%s/%s/cert' % (self._provider.api_uri, self._provider.api_version) + cert_url = '%s/%s/smtp_cert' % (self._provider.api_uri, self._provider.api_version) cookies = {"_session_id": self._auth.session_id} headers = {} headers["Authorization"] = 'Token token="{0}"'.format(self._auth.token) - response = requests.get( + params = {'address': self._auth.username} + response = requests.post( cert_url, + params=params, + data=params, verify=LeapCertificate(self._provider).provider_api_cert, cookies=cookies, timeout=self._provider.config.timeout_in_s, @@ -188,26 +222,15 @@ class LeapSessionFactory(object): mail_store = LeapMailStore(soledad) nicknym = self._create_nicknym(account_email, auth.token, auth.uuid, soledad) - self._download_smtp_cert(auth) + smtp_client_cert = self._download_smtp_cert(auth) smtp_host, smtp_port = self._provider.smtp_info() - smtp_config = LeapSMTPConfig(account_email, self._smtp_client_cert_path(), smtp_host, smtp_port) + smtp_config = LeapSMTPConfig(account_email, smtp_client_cert, smtp_host, smtp_port) return LeapSession(self._provider, auth, mail_store, soledad, nicknym, smtp_config) def _download_smtp_cert(self, auth): - cert_path = self._smtp_client_cert_path() - - if not os.path.exists(os.path.dirname(cert_path)): - os.makedirs(os.path.dirname(cert_path)) - - SmtpCertDownloader(self._provider, auth).download_to(cert_path) - - def _smtp_client_cert_path(self): - return os.path.join( - self._config.leap_home, - "providers", - self._provider.domain, - "keys", "client", "smtp.pem") + cert = SmtpClientCertificate(self._provider, auth, self._user_path(auth.uuid)) + return cert.cert_path() def _create_dir(self, path): try: @@ -221,6 +244,9 @@ class LeapSessionFactory(object): def _create_nicknym(self, email_address, token, uuid, soledad): return NickNym(self._provider, self._config, soledad, email_address, token, uuid) + def _user_path(self, user_uuid): + return os.path.join(self._config.leap_home, user_uuid) + def _soledad_path(self, user_uuid): return os.path.join(self._config.leap_home, user_uuid, 'soledad') diff --git a/service/test/unit/bitmask_libraries/test_smtp_cert_downloader.py b/service/test/unit/bitmask_libraries/test_smtp_cert_downloader.py index 5644ab6a..cfc9353d 100644 --- a/service/test/unit/bitmask_libraries/test_smtp_cert_downloader.py +++ b/service/test/unit/bitmask_libraries/test_smtp_cert_downloader.py @@ -21,6 +21,7 @@ from tempfile import NamedTemporaryFile from httmock import all_requests, HTTMock, urlmatch CERTIFICATE_DATA = 'some cert data' +USERNAME = 'some_user_name' @all_requests @@ -29,12 +30,17 @@ def not_found_mock(url, request): 'content': 'foobar'} -@urlmatch(netloc='api.some-server.test:4430', path='/1/cert') -def ca_cert_mock(url, request): - return { - "status_code": 200, - "content": CERTIFICATE_DATA - } +@urlmatch(netloc='api.some-server.test:4430', path='/1/smtp_cert', method='POST') +def smtp_cert_mock(url, request): + if request.body == 'address=%s' % USERNAME: + return { + "status_code": 200, + "content": CERTIFICATE_DATA + } + else: + return { + 'status_code': 401 + } class TestSmtpCertDownloader(unittest.TestCase): @@ -50,6 +56,7 @@ class TestSmtpCertDownloader(unittest.TestCase): self._provider.api_version = '1' self._provider.server_name = 'some.host.tld' + self._auth.username = USERNAME self._auth.session_id = 'some session id' self._auth.token = 'some token' @@ -57,7 +64,7 @@ class TestSmtpCertDownloader(unittest.TestCase): unstub() def test_download_certificate(self): - with HTTMock(ca_cert_mock, not_found_mock): + with HTTMock(smtp_cert_mock, not_found_mock): cert_data = SmtpCertDownloader(self._provider, self._auth).download() self.assertEqual(CERTIFICATE_DATA, cert_data) @@ -71,7 +78,7 @@ class TestSmtpCertDownloader(unittest.TestCase): downloader = SmtpCertDownloader(self._provider, self._auth) with NamedTemporaryFile() as tmp_file: - with HTTMock(ca_cert_mock, not_found_mock): + with HTTMock(smtp_cert_mock, not_found_mock): downloader.download_to(tmp_file.name) file_content = open(tmp_file.name).read() diff --git a/service/test/unit/bitmask_libraries/test_smtp_client_certificate.py b/service/test/unit/bitmask_libraries/test_smtp_client_certificate.py new file mode 100644 index 00000000..1a57487a --- /dev/null +++ b/service/test/unit/bitmask_libraries/test_smtp_client_certificate.py @@ -0,0 +1,59 @@ +# +# Copyright (c) 2016 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . +import os +import unittest +import tempdir +from pixelated.bitmask_libraries import session +from leap.srp_session import SRPSession +from mockito import mock, unstub, when, verify, never, any as ANY + +from pixelated.bitmask_libraries.session import SmtpClientCertificate + + +class TestSmtpClientCertificate(unittest.TestCase): + + def setUp(self): + self.tmp_dir = tempdir.TempDir() + self.provider = mock() + self.provider.domain = 'some-provider.tld' + self.auth = SRPSession('username', 'token', 'uuid', 'session_id') + self.pem_path = os.path.join(self.tmp_dir.name, 'providers', 'some-provider.tld', 'keys', 'client', 'smtp.pem') + + def tearDown(self): + self.tmp_dir.dissolve() + unstub() + + def test_download_certificate(self): + downloader = mock() + when(session).SmtpCertDownloader(self.provider, self.auth).thenReturn(downloader) + + cert = SmtpClientCertificate(self.provider, self.auth, self.tmp_dir.name) + result = cert.cert_path() + + self.assertEqual(self.pem_path, result) + verify(downloader).download_to(self.pem_path) + + def test_skip_download_if_already_downloaded(self): + + downloader = mock() + when(session).SmtpCertDownloader(self.provider, self.auth).thenReturn(downloader) + when(os.path).exists(self.pem_path).thenReturn(True) + + cert = SmtpClientCertificate(self.provider, self.auth, self.tmp_dir.name) + result = cert.cert_path() + + self.assertEqual(self.pem_path, result) + verify(downloader, never).download_to(ANY()) -- cgit v1.2.3