# # Copyright (c) 2014 ThoughtWorks, Inc. # # Pixelated is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Pixelated is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with Pixelated. If not, see . import os import requests from .certs import which_bundle from leap.mail.smtp import setup_smtp_gateway import random class LeapSmtp(object): SMTP_PORT = 2014 def __init__(self, provider, keymanager=None, leap_srp_session=None): self._provider = provider self._keymanager = keymanager self._srp_session = leap_srp_session self._hostname, self._port = self._discover_smtp_server() self._smtp_port = None self._smtp_service = None self._twisted_port = 10000 + int(random.random() * 5000) print "## SMTP port: " + str(self._twisted_port) def smtp_info(self): return ('localhost', self._twisted_port) def _discover_smtp_server(self): json_data = self._provider.fetch_smtp_json() hosts = json_data['hosts'] hostname = hosts.keys()[0] host = hosts[hostname] hostname = host['hostname'] port = host['port'] return hostname, port def _download_client_certificates(self): cert_path = self._client_cert_path() if not os.path.exists(os.path.dirname(cert_path)): os.makedirs(os.path.dirname(cert_path)) session = requests.session() cert_url = '%s/%s/cert' % (self._provider.api_uri, self._provider.api_version) cookies = {"_session_id": self._srp_session.session_id} response = requests.get(cert_url, verify=which_bundle(self._provider), cookies=cookies, timeout=self._provider.config.timeout_in_s) response.raise_for_status() client_cert = response.content with open(cert_path, 'w') as f: f.write(client_cert) def _client_cert_path(self): return os.path.join( self._provider.config.leap_home, "providers", self._provider.domain, "keys", "client", "smtp.pem") def start(self): self._download_client_certificates() cert_path = self._client_cert_path() email = '%s@%s' % (self._srp_session.user_name, self._provider.domain) self._smtp_service, self._smtp_port = setup_smtp_gateway( port=self._twisted_port, userid=email, keymanager=self._keymanager, smtp_host=self._hostname.encode('UTF-8'), smtp_port=self._port, smtp_cert=cert_path, smtp_key=cert_path, encrypted_only=False ) def stop(self): if self._smtp_service is not None: self._smtp_port.stopListening() self._smtp_service.doStop() self._smtp_port = None self._smtp_service = None