summaryrefslogtreecommitdiff
path: root/service
diff options
context:
space:
mode:
authorFolker Bernitt <fbernitt@thoughtworks.com>2016-02-10 17:06:15 +0100
committerFolker Bernitt <fbernitt@thoughtworks.com>2016-02-10 17:06:15 +0100
commitcf6adf149d2356400e611b019353f431a032d88e (patch)
treebc44f35adf758e635e15b2b0c609c0a0949a0766 /service
parent8caffb50f42aba45a59d6bd9f38eac4086735d49 (diff)
Download SMTP client certificate, not VPN one
- Issue #591
Diffstat (limited to 'service')
-rw-r--r--service/pixelated/bitmask_libraries/session.py60
-rw-r--r--service/test/unit/bitmask_libraries/test_smtp_cert_downloader.py23
-rw-r--r--service/test/unit/bitmask_libraries/test_smtp_client_certificate.py59
3 files changed, 117 insertions, 25 deletions
diff --git a/service/pixelated/bitmask_libraries/session.py b/service/pixelated/bitmask_libraries/session.py
index e217c286..ef41fe6a 100644
--- a/service/pixelated/bitmask_libraries/session.py
+++ b/service/pixelated/bitmask_libraries/session.py
@@ -120,6 +120,37 @@ class LeapSession(object):
raise
+class SmtpClientCertificate(object):
+ def __init__(self, provider, auth, user_path):
+ self._provider = provider
+ self._auth = auth
+ self._user_path = user_path
+
+ def cert_path(self):
+ if not self._is_cert_already_downloaded():
+ self._download_smtp_cert()
+
+ return self._smtp_client_cert_path()
+
+ def _is_cert_already_downloaded(self):
+ return os.path.exists(self._smtp_client_cert_path())
+
+ def _download_smtp_cert(self):
+ cert_path = self._smtp_client_cert_path()
+
+ if not os.path.exists(os.path.dirname(cert_path)):
+ os.makedirs(os.path.dirname(cert_path))
+
+ SmtpCertDownloader(self._provider, self._auth).download_to(cert_path)
+
+ def _smtp_client_cert_path(self):
+ return os.path.join(
+ self._user_path,
+ "providers",
+ self._provider.domain,
+ "keys", "client", "smtp.pem")
+
+
class SmtpCertDownloader(object):
def __init__(self, provider, auth):
@@ -127,12 +158,15 @@ class SmtpCertDownloader(object):
self._auth = auth
def download(self):
- cert_url = '%s/%s/cert' % (self._provider.api_uri, self._provider.api_version)
+ cert_url = '%s/%s/smtp_cert' % (self._provider.api_uri, self._provider.api_version)
cookies = {"_session_id": self._auth.session_id}
headers = {}
headers["Authorization"] = 'Token token="{0}"'.format(self._auth.token)
- response = requests.get(
+ params = {'address': self._auth.username}
+ response = requests.post(
cert_url,
+ params=params,
+ data=params,
verify=LeapCertificate(self._provider).provider_api_cert,
cookies=cookies,
timeout=self._provider.config.timeout_in_s,
@@ -188,26 +222,15 @@ class LeapSessionFactory(object):
mail_store = LeapMailStore(soledad)
nicknym = self._create_nicknym(account_email, auth.token, auth.uuid, soledad)
- self._download_smtp_cert(auth)
+ smtp_client_cert = self._download_smtp_cert(auth)
smtp_host, smtp_port = self._provider.smtp_info()
- smtp_config = LeapSMTPConfig(account_email, self._smtp_client_cert_path(), smtp_host, smtp_port)
+ smtp_config = LeapSMTPConfig(account_email, smtp_client_cert, smtp_host, smtp_port)
return LeapSession(self._provider, auth, mail_store, soledad, nicknym, smtp_config)
def _download_smtp_cert(self, auth):
- cert_path = self._smtp_client_cert_path()
-
- if not os.path.exists(os.path.dirname(cert_path)):
- os.makedirs(os.path.dirname(cert_path))
-
- SmtpCertDownloader(self._provider, auth).download_to(cert_path)
-
- def _smtp_client_cert_path(self):
- return os.path.join(
- self._config.leap_home,
- "providers",
- self._provider.domain,
- "keys", "client", "smtp.pem")
+ cert = SmtpClientCertificate(self._provider, auth, self._user_path(auth.uuid))
+ return cert.cert_path()
def _create_dir(self, path):
try:
@@ -221,6 +244,9 @@ class LeapSessionFactory(object):
def _create_nicknym(self, email_address, token, uuid, soledad):
return NickNym(self._provider, self._config, soledad, email_address, token, uuid)
+ def _user_path(self, user_uuid):
+ return os.path.join(self._config.leap_home, user_uuid)
+
def _soledad_path(self, user_uuid):
return os.path.join(self._config.leap_home, user_uuid, 'soledad')
diff --git a/service/test/unit/bitmask_libraries/test_smtp_cert_downloader.py b/service/test/unit/bitmask_libraries/test_smtp_cert_downloader.py
index 5644ab6a..cfc9353d 100644
--- a/service/test/unit/bitmask_libraries/test_smtp_cert_downloader.py
+++ b/service/test/unit/bitmask_libraries/test_smtp_cert_downloader.py
@@ -21,6 +21,7 @@ from tempfile import NamedTemporaryFile
from httmock import all_requests, HTTMock, urlmatch
CERTIFICATE_DATA = 'some cert data'
+USERNAME = 'some_user_name'
@all_requests
@@ -29,12 +30,17 @@ def not_found_mock(url, request):
'content': 'foobar'}
-@urlmatch(netloc='api.some-server.test:4430', path='/1/cert')
-def ca_cert_mock(url, request):
- return {
- "status_code": 200,
- "content": CERTIFICATE_DATA
- }
+@urlmatch(netloc='api.some-server.test:4430', path='/1/smtp_cert', method='POST')
+def smtp_cert_mock(url, request):
+ if request.body == 'address=%s' % USERNAME:
+ return {
+ "status_code": 200,
+ "content": CERTIFICATE_DATA
+ }
+ else:
+ return {
+ 'status_code': 401
+ }
class TestSmtpCertDownloader(unittest.TestCase):
@@ -50,6 +56,7 @@ class TestSmtpCertDownloader(unittest.TestCase):
self._provider.api_version = '1'
self._provider.server_name = 'some.host.tld'
+ self._auth.username = USERNAME
self._auth.session_id = 'some session id'
self._auth.token = 'some token'
@@ -57,7 +64,7 @@ class TestSmtpCertDownloader(unittest.TestCase):
unstub()
def test_download_certificate(self):
- with HTTMock(ca_cert_mock, not_found_mock):
+ with HTTMock(smtp_cert_mock, not_found_mock):
cert_data = SmtpCertDownloader(self._provider, self._auth).download()
self.assertEqual(CERTIFICATE_DATA, cert_data)
@@ -71,7 +78,7 @@ class TestSmtpCertDownloader(unittest.TestCase):
downloader = SmtpCertDownloader(self._provider, self._auth)
with NamedTemporaryFile() as tmp_file:
- with HTTMock(ca_cert_mock, not_found_mock):
+ with HTTMock(smtp_cert_mock, not_found_mock):
downloader.download_to(tmp_file.name)
file_content = open(tmp_file.name).read()
diff --git a/service/test/unit/bitmask_libraries/test_smtp_client_certificate.py b/service/test/unit/bitmask_libraries/test_smtp_client_certificate.py
new file mode 100644
index 00000000..1a57487a
--- /dev/null
+++ b/service/test/unit/bitmask_libraries/test_smtp_client_certificate.py
@@ -0,0 +1,59 @@
+#
+# Copyright (c) 2016 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+import os
+import unittest
+import tempdir
+from pixelated.bitmask_libraries import session
+from leap.srp_session import SRPSession
+from mockito import mock, unstub, when, verify, never, any as ANY
+
+from pixelated.bitmask_libraries.session import SmtpClientCertificate
+
+
+class TestSmtpClientCertificate(unittest.TestCase):
+
+ def setUp(self):
+ self.tmp_dir = tempdir.TempDir()
+ self.provider = mock()
+ self.provider.domain = 'some-provider.tld'
+ self.auth = SRPSession('username', 'token', 'uuid', 'session_id')
+ self.pem_path = os.path.join(self.tmp_dir.name, 'providers', 'some-provider.tld', 'keys', 'client', 'smtp.pem')
+
+ def tearDown(self):
+ self.tmp_dir.dissolve()
+ unstub()
+
+ def test_download_certificate(self):
+ downloader = mock()
+ when(session).SmtpCertDownloader(self.provider, self.auth).thenReturn(downloader)
+
+ cert = SmtpClientCertificate(self.provider, self.auth, self.tmp_dir.name)
+ result = cert.cert_path()
+
+ self.assertEqual(self.pem_path, result)
+ verify(downloader).download_to(self.pem_path)
+
+ def test_skip_download_if_already_downloaded(self):
+
+ downloader = mock()
+ when(session).SmtpCertDownloader(self.provider, self.auth).thenReturn(downloader)
+ when(os.path).exists(self.pem_path).thenReturn(True)
+
+ cert = SmtpClientCertificate(self.provider, self.auth, self.tmp_dir.name)
+ result = cert.cert_path()
+
+ self.assertEqual(self.pem_path, result)
+ verify(downloader, never).download_to(ANY())