diff options
3 files changed, 117 insertions, 25 deletions
| diff --git a/service/pixelated/bitmask_libraries/session.py b/service/pixelated/bitmask_libraries/session.py index e217c286..ef41fe6a 100644 --- a/service/pixelated/bitmask_libraries/session.py +++ b/service/pixelated/bitmask_libraries/session.py @@ -120,6 +120,37 @@ class LeapSession(object):              raise +class SmtpClientCertificate(object): +    def __init__(self, provider, auth, user_path): +        self._provider = provider +        self._auth = auth +        self._user_path = user_path + +    def cert_path(self): +        if not self._is_cert_already_downloaded(): +            self._download_smtp_cert() + +        return self._smtp_client_cert_path() + +    def _is_cert_already_downloaded(self): +        return os.path.exists(self._smtp_client_cert_path()) + +    def _download_smtp_cert(self): +        cert_path = self._smtp_client_cert_path() + +        if not os.path.exists(os.path.dirname(cert_path)): +            os.makedirs(os.path.dirname(cert_path)) + +        SmtpCertDownloader(self._provider, self._auth).download_to(cert_path) + +    def _smtp_client_cert_path(self): +        return os.path.join( +            self._user_path, +            "providers", +            self._provider.domain, +            "keys", "client", "smtp.pem") + +  class SmtpCertDownloader(object):      def __init__(self, provider, auth): @@ -127,12 +158,15 @@ class SmtpCertDownloader(object):          self._auth = auth      def download(self): -        cert_url = '%s/%s/cert' % (self._provider.api_uri, self._provider.api_version) +        cert_url = '%s/%s/smtp_cert' % (self._provider.api_uri, self._provider.api_version)          cookies = {"_session_id": self._auth.session_id}          headers = {}          headers["Authorization"] = 'Token token="{0}"'.format(self._auth.token) -        response = requests.get( +        params = {'address': self._auth.username} +        response = requests.post(              cert_url, +            params=params, +            data=params,              verify=LeapCertificate(self._provider).provider_api_cert,              cookies=cookies,              timeout=self._provider.config.timeout_in_s, @@ -188,26 +222,15 @@ class LeapSessionFactory(object):          mail_store = LeapMailStore(soledad)          nicknym = self._create_nicknym(account_email, auth.token, auth.uuid, soledad) -        self._download_smtp_cert(auth) +        smtp_client_cert = self._download_smtp_cert(auth)          smtp_host, smtp_port = self._provider.smtp_info() -        smtp_config = LeapSMTPConfig(account_email, self._smtp_client_cert_path(), smtp_host, smtp_port) +        smtp_config = LeapSMTPConfig(account_email, smtp_client_cert, smtp_host, smtp_port)          return LeapSession(self._provider, auth, mail_store, soledad, nicknym, smtp_config)      def _download_smtp_cert(self, auth): -        cert_path = self._smtp_client_cert_path() - -        if not os.path.exists(os.path.dirname(cert_path)): -            os.makedirs(os.path.dirname(cert_path)) - -        SmtpCertDownloader(self._provider, auth).download_to(cert_path) - -    def _smtp_client_cert_path(self): -        return os.path.join( -            self._config.leap_home, -            "providers", -            self._provider.domain, -            "keys", "client", "smtp.pem") +        cert = SmtpClientCertificate(self._provider, auth, self._user_path(auth.uuid)) +        return cert.cert_path()      def _create_dir(self, path):          try: @@ -221,6 +244,9 @@ class LeapSessionFactory(object):      def _create_nicknym(self, email_address, token, uuid, soledad):          return NickNym(self._provider, self._config, soledad, email_address, token, uuid) +    def _user_path(self, user_uuid): +        return os.path.join(self._config.leap_home, user_uuid) +      def _soledad_path(self, user_uuid):          return os.path.join(self._config.leap_home, user_uuid, 'soledad') diff --git a/service/test/unit/bitmask_libraries/test_smtp_cert_downloader.py b/service/test/unit/bitmask_libraries/test_smtp_cert_downloader.py index 5644ab6a..cfc9353d 100644 --- a/service/test/unit/bitmask_libraries/test_smtp_cert_downloader.py +++ b/service/test/unit/bitmask_libraries/test_smtp_cert_downloader.py @@ -21,6 +21,7 @@ from tempfile import NamedTemporaryFile  from httmock import all_requests, HTTMock, urlmatch  CERTIFICATE_DATA = 'some cert data' +USERNAME = 'some_user_name'  @all_requests @@ -29,12 +30,17 @@ def not_found_mock(url, request):              'content': 'foobar'} -@urlmatch(netloc='api.some-server.test:4430', path='/1/cert') -def ca_cert_mock(url, request): -    return { -        "status_code": 200, -        "content": CERTIFICATE_DATA -    } +@urlmatch(netloc='api.some-server.test:4430', path='/1/smtp_cert', method='POST') +def smtp_cert_mock(url, request): +    if request.body == 'address=%s' % USERNAME: +        return { +            "status_code": 200, +            "content": CERTIFICATE_DATA +        } +    else: +        return { +            'status_code': 401 +        }  class TestSmtpCertDownloader(unittest.TestCase): @@ -50,6 +56,7 @@ class TestSmtpCertDownloader(unittest.TestCase):          self._provider.api_version = '1'          self._provider.server_name = 'some.host.tld' +        self._auth.username = USERNAME          self._auth.session_id = 'some session id'          self._auth.token = 'some token' @@ -57,7 +64,7 @@ class TestSmtpCertDownloader(unittest.TestCase):          unstub()      def test_download_certificate(self): -        with HTTMock(ca_cert_mock, not_found_mock): +        with HTTMock(smtp_cert_mock, not_found_mock):              cert_data = SmtpCertDownloader(self._provider, self._auth).download()          self.assertEqual(CERTIFICATE_DATA, cert_data) @@ -71,7 +78,7 @@ class TestSmtpCertDownloader(unittest.TestCase):          downloader = SmtpCertDownloader(self._provider, self._auth)          with NamedTemporaryFile() as tmp_file: -            with HTTMock(ca_cert_mock, not_found_mock): +            with HTTMock(smtp_cert_mock, not_found_mock):                  downloader.download_to(tmp_file.name)              file_content = open(tmp_file.name).read() diff --git a/service/test/unit/bitmask_libraries/test_smtp_client_certificate.py b/service/test/unit/bitmask_libraries/test_smtp_client_certificate.py new file mode 100644 index 00000000..1a57487a --- /dev/null +++ b/service/test/unit/bitmask_libraries/test_smtp_client_certificate.py @@ -0,0 +1,59 @@ +# +# Copyright (c) 2016 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. +import os +import unittest +import tempdir +from pixelated.bitmask_libraries import session +from leap.srp_session import SRPSession +from mockito import mock, unstub, when, verify, never, any as ANY + +from pixelated.bitmask_libraries.session import SmtpClientCertificate + + +class TestSmtpClientCertificate(unittest.TestCase): + +    def setUp(self): +        self.tmp_dir = tempdir.TempDir() +        self.provider = mock() +        self.provider.domain = 'some-provider.tld' +        self.auth = SRPSession('username', 'token', 'uuid', 'session_id') +        self.pem_path = os.path.join(self.tmp_dir.name, 'providers', 'some-provider.tld', 'keys', 'client', 'smtp.pem') + +    def tearDown(self): +        self.tmp_dir.dissolve() +        unstub() + +    def test_download_certificate(self): +        downloader = mock() +        when(session).SmtpCertDownloader(self.provider, self.auth).thenReturn(downloader) + +        cert = SmtpClientCertificate(self.provider, self.auth, self.tmp_dir.name) +        result = cert.cert_path() + +        self.assertEqual(self.pem_path, result) +        verify(downloader).download_to(self.pem_path) + +    def test_skip_download_if_already_downloaded(self): + +        downloader = mock() +        when(session).SmtpCertDownloader(self.provider, self.auth).thenReturn(downloader) +        when(os.path).exists(self.pem_path).thenReturn(True) + +        cert = SmtpClientCertificate(self.provider, self.auth, self.tmp_dir.name) +        result = cert.cert_path() + +        self.assertEqual(self.pem_path, result) +        verify(downloader, never).download_to(ANY()) | 
