1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
#
# Copyright (c) 2014 ThoughtWorks, Inc.
#
# Pixelated is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pixelated is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
from leap.bitmask.mail.outgoing.service import OutgoingMail
from mock import patch
from twisted.mail.smtp import User
from twisted.trial import unittest
from mockito import mock, when, verify, any, unstub
from pixelated.adapter.services.mail_sender import MailSender, MailSenderException
from pixelated.adapter.model.mail import InputMail
from pixelated.bitmask_libraries.smtp import LeapSMTPConfig
from pixelated.support.functional import flatten
from test.support.test_helper import mail_dict
from twisted.internet import reactor, defer
from twisted.internet.defer import Deferred
from mockito.matchers import Matcher
class TwistedSmtpUserCapture(Matcher):
def __init__(self, username):
self._username = username
def matches(self, arg):
return isinstance(arg, User) \
and isinstance(arg.dest.addrstr, str) \
and self._username == arg.dest.addrstr
class MailToSmtpFormatCapture(Matcher):
def __init__(self, recipient, bccs):
self._recipient = recipient
self._bccs = bccs
def matches(self, mail):
if self._recipient in self._bccs:
return 'Bcc: %s\n' % self._recipient in mail
else:
return "Bcc: " not in mail
class MailSenderTest(unittest.TestCase):
def setUp(self):
self._cert_path = u'/some/cert/path'
self._keymanager_mock = mock()
self._remote_smtp_host = 'some.host.test'
self._remote_smtp_port = 1234
self._smtp_config = LeapSMTPConfig('someone@somedomain.tld', self._cert_path, self._remote_smtp_host, self._remote_smtp_port)
self.sender = MailSender(self._smtp_config, self._keymanager_mock)
def tearDown(self):
unstub()
@defer.inlineCallbacks
def test_iterates_over_recipients(self):
input_mail = InputMail.from_dict(mail_dict(), from_address='pixelated@org')
when(OutgoingMail).send_message(any(), any()).thenReturn(defer.succeed(None))
yield self.sender.sendmail(input_mail)
for recipient in flatten([input_mail.to, input_mail.cc, input_mail.bcc]):
verify(OutgoingMail).send_message(any(), TwistedSmtpUserCapture(recipient))
@defer.inlineCallbacks
def test_send_leaves_mail_in_tact(self):
input_mail_dict = mail_dict()
input_mail = InputMail.from_dict(input_mail_dict, from_address='pixelated@org')
when(OutgoingMail).send_message(any(), any()).thenReturn(defer.succeed(None))
yield self.sender.sendmail(input_mail)
self.assertEqual(input_mail.to, input_mail_dict["header"]["to"])
self.assertEqual(input_mail.cc, input_mail_dict["header"]["cc"])
self.assertEqual(input_mail.bcc, input_mail_dict["header"]["bcc"])
self.assertEqual(input_mail.subject, input_mail_dict["header"]["subject"])
@defer.inlineCallbacks
def test_problem_with_email_raises_exception(self):
input_mail = InputMail.from_dict(mail_dict(), from_address='pixelated@org')
when(OutgoingMail).send_message(any(), any()).thenReturn(defer.fail(Exception('pretend something went wrong')))
try:
yield self.sender.sendmail(input_mail)
self.fail('Exception expected!')
except MailSenderException, e:
for recipient in flatten([input_mail.to, input_mail.cc, input_mail.bcc]):
self.assertTrue(recipient in e.email_error_map)
@defer.inlineCallbacks
def test_keymanager_encrypt_problem_raises_exception(self):
input_mail = InputMail.from_dict(mail_dict(), from_address='pixelated@org')
when(OutgoingMail)._maybe_attach_key(any(), any(), any()).thenReturn(
defer.succeed(None))
when(OutgoingMail)._fix_headers(any(), any(), any()).thenReturn(
defer.succeed((None, mock())))
when(self._keymanager_mock).encrypt(any(), any(), sign=any(),
fetch_remote=any()).thenReturn(defer.fail(Exception('pretend key expired')))
with patch('leap.bitmask.mail.outgoing.service.emit_async'):
try:
yield self.sender.sendmail(input_mail)
self.fail('Exception expected!')
except MailSenderException, e:
for recipient in flatten([input_mail.to, input_mail.cc, input_mail.bcc]):
self.assertTrue(recipient in e.email_error_map)
@defer.inlineCallbacks
def test_iterates_over_recipients_and_send_whitout_bcc_field(self):
input_mail = InputMail.from_dict(mail_dict(), from_address='pixelated@org')
bccs = input_mail.bcc
when(OutgoingMail).send_message(any(), any()).thenReturn(defer.succeed(None))
yield self.sender.sendmail(input_mail)
for recipient in flatten([input_mail.to, input_mail.cc, input_mail.bcc]):
verify(OutgoingMail).send_message(MailToSmtpFormatCapture(recipient, bccs), TwistedSmtpUserCapture(recipient))
|