summaryrefslogtreecommitdiff
path: root/service/test/unit/adapter/services/test_mail_sender.py
blob: 863ac81a058359439c6aefaca17927cd3eb57d3b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#
# Copyright (c) 2014 ThoughtWorks, Inc.
#
# Pixelated is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pixelated is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
from leap.mail.outgoing.service import OutgoingMail
from twisted.mail.smtp import User
from twisted.trial import unittest

from mockito import mock, when, verify, any, unstub
from pixelated.adapter.services.mail_sender import MailSender, MailSenderException
from pixelated.adapter.model.mail import InputMail
from pixelated.bitmask_libraries.smtp import LeapSMTPConfig
from pixelated.support.functional import flatten
from test.support.test_helper import mail_dict
from twisted.internet import reactor, defer
from twisted.internet.defer import Deferred
from mockito.matchers import Matcher


class TwistedSmtpUserCapture(Matcher):

    def __init__(self, username):
        self._username = username

    def matches(self, arg):
        return isinstance(arg, User) \
            and isinstance(arg.dest.addrstr, str) \
            and self._username == arg.dest.addrstr


class MailToSmtpFormatCapture(Matcher):

    def __init__(self, recipient, bccs):
        self._recipient = recipient
        self._bccs = bccs

    def matches(self, mail):
        if self._recipient in self._bccs:
            return 'Bcc: %s\n' % self._recipient in mail
        else:
            return "Bcc: " not in mail


class MailSenderTest(unittest.TestCase):

    def setUp(self):
        self._cert_path = u'/some/cert/path'
        self._keymanager_mock = mock()
        self._remote_smtp_host = 'some.host.test'
        self._remote_smtp_port = 1234
        self._smtp_config = LeapSMTPConfig('someone@somedomain.tld', self._cert_path, self._remote_smtp_host, self._remote_smtp_port)
        self.sender = MailSender(self._smtp_config, self._keymanager_mock)

    def tearDown(self):
        unstub()

    @defer.inlineCallbacks
    def test_iterates_over_recipients(self):
        input_mail = InputMail.from_dict(mail_dict(), from_address='pixelated@org')

        when(OutgoingMail).send_message(any(), any()).thenReturn(defer.succeed(None))

        yield self.sender.sendmail(input_mail)

        for recipient in flatten([input_mail.to, input_mail.cc, input_mail.bcc]):
            verify(OutgoingMail).send_message(any(), TwistedSmtpUserCapture(recipient))

    @defer.inlineCallbacks
    def test_send_leaves_mail_in_tact(self):
        input_mail_dict = mail_dict()
        input_mail = InputMail.from_dict(input_mail_dict, from_address='pixelated@org')

        when(OutgoingMail).send_message(any(), any()).thenReturn(defer.succeed(None))

        yield self.sender.sendmail(input_mail)

        self.assertEqual(input_mail.to, input_mail_dict["header"]["to"])
        self.assertEqual(input_mail.cc, input_mail_dict["header"]["cc"])
        self.assertEqual(input_mail.bcc, input_mail_dict["header"]["bcc"])
        self.assertEqual(input_mail.subject, input_mail_dict["header"]["subject"])

    @defer.inlineCallbacks
    def test_problem_with_email_raises_exception(self):
        input_mail = InputMail.from_dict(mail_dict(), from_address='pixelated@org')

        when(OutgoingMail).send_message(any(), any()).thenReturn(defer.fail(Exception('pretend something went wrong')))

        try:
            yield self.sender.sendmail(input_mail)
            self.fail('Exception expected!')
        except MailSenderException, e:
            for recipient in flatten([input_mail.to, input_mail.cc, input_mail.bcc]):
                self.assertTrue(recipient in e.email_error_map)

    @defer.inlineCallbacks
    def test_iterates_over_recipients_and_send_whitout_bcc_field(self):
        input_mail = InputMail.from_dict(mail_dict(), from_address='pixelated@org')
        bccs = input_mail.bcc

        when(OutgoingMail).send_message(any(), any()).thenReturn(defer.succeed(None))

        yield self.sender.sendmail(input_mail)

        for recipient in flatten([input_mail.to, input_mail.cc, input_mail.bcc]):
            verify(OutgoingMail).send_message(MailToSmtpFormatCapture(recipient, bccs), TwistedSmtpUserCapture(recipient))