#
# Copyright (c) 2014 ThoughtWorks, Inc.
#
# Pixelated is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pixelated is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
from twisted.trial import unittest
from pixelated.adapter.mailstore.leap_mailstore import LeapMail
from pixelated.adapter.model.mail import InputMail
from pixelated.adapter.model.status import Status
from pixelated.adapter.services.mail_service import MailService
from test.support.test_helper import mail_dict, leap_mail, duplicates_in_fields_mail_dict
from mockito import mock, unstub, when, verify, verifyNoMoreInteractions, any as ANY, never
from twisted.internet import defer


class TestMailService(unittest.TestCase):
    """Unit tests for MailService, run against mockito mocks of its collaborators."""

    def setUp(self):
        """Build a MailService wired to mocked sender, store, search engine and attachment store."""
        self.drafts = mock()
        self.mail_store = mock()
        self.attachment_store = mock()
        self.mailboxes = mock()
        self.mailboxes.drafts = defer.succeed(self.drafts)
        self.mailboxes.trash = mock()
        self.mailboxes.sent = mock()
        self.mail_sender = mock()
        self.search_engine = mock()
        self.mail_service = MailService(
            self.mail_sender,
            self.mail_store,
            self.search_engine,
            'acount@email',
            self.attachment_store)
        self.mail = InputMail.from_dict(duplicates_in_fields_mail_dict(), from_address='pixelated@org')

    def tearDown(self):
        """Undo every mockito stub, including the ones patched onto InputMail."""
        unstub()

    def test_send_mail(self):
        """send_mail passes the InputMail built from the dict to the mail sender."""
        when(InputMail).from_dict(ANY(), ANY()).thenReturn(self.mail)
        when(self.mail_sender).sendmail(ANY()).thenReturn(defer.Deferred())

        sent_deferred = self.mail_service.send_mail(mail_dict())

        verify(self.mail_sender).sendmail(self.mail)

        # Fire the deferred by hand so trial does not wait on it forever.
        sent_deferred.callback('Assume sending mail succeeded')

        return sent_deferred

    @defer.inlineCallbacks
    def test_send_mail_removes_draft(self):
        """After a successful send the mail is added to SENT and the draft '12' is deleted."""
        mail = LeapMail('id', 'INBOX')
        when(mail).raw = 'raw mail'
        mail._headers['To'] = []
        mail._headers['Cc'] = []
        mail._headers['Bcc'] = []
        when(InputMail).from_dict(ANY(), ANY()).thenReturn(mail)
        when(self.mail_store).delete_mail('12').thenReturn(defer.succeed(None))
        when(self.mail_store).add_mail('SENT', ANY()).thenReturn(mail)

        deferred_success = defer.succeed(None)
        when(self.mail_sender).sendmail(ANY()).thenReturn(deferred_success)

        yield self.mail_service.send_mail({'ident': '12'})

        verify(self.mail_sender).sendmail(mail)
        verify(self.mail_store).add_mail('SENT', mail.raw)
        verify(self.mail_store).delete_mail('12')

    @defer.inlineCallbacks
    def test_send_mail_marks_as_read(self):
        """The mail stored in SENT gets the SEEN flag and is written back to the store."""
        when(self.mail).raw = 'raw mail'
        when(InputMail).from_dict(ANY(), ANY()).thenReturn(self.mail)
        when(self.mail_store).delete_mail('12').thenReturn(defer.succeed(None))
        when(self.mail_sender).sendmail(self.mail).thenReturn(defer.succeed(None))

        sent_mail = LeapMail('id', 'INBOX')
        add_mail_deferral = defer.succeed(sent_mail)
        when(self.mail_store).add_mail('SENT', ANY()).thenReturn(add_mail_deferral)

        yield self.mail_service.send_mail({'ident': '12'})

        self.assertIn(Status.SEEN, sent_mail.flags)
        verify(self.mail_store).update_mail(sent_mail)

    @defer.inlineCallbacks
    def test_send_mail_does_not_delete_draft_on_error(self):
        """A failing sender makes send_mail raise and leaves the drafts mock untouched."""
        when(InputMail).from_dict(ANY(), ANY()).thenReturn(self.mail)

        deferred_failure = defer.fail(Exception("Assume sending mail failed"))
        when(self.mail_sender).sendmail(ANY()).thenReturn(deferred_failure)

        try:
            yield self.mail_service.send_mail({'ident': '12'})
        except Exception:
            # Expected path: the sender's failure must propagate to the caller.
            pass
        else:
            # Kept outside the try body so the failure raised here is not
            # swallowed by the except clause (a bare ``except:`` around
            # ``self.fail`` made this test pass unconditionally).
            self.fail("send_mail is expected to raise if underlying call fails")

        verify(self.mail_sender).sendmail(self.mail)
        verifyNoMoreInteractions(self.drafts)

    @defer.inlineCallbacks
    def test_mark_as_read(self):
        """mark_as_read adds the SEEN flag and updates the mail in the store."""
        mail = LeapMail(1, 'INBOX')
        when(self.mail_store).get_mail(1, include_body=True).thenReturn(mail)

        yield self.mail_service.mark_as_read(1)

        self.assertIn(Status.SEEN, mail.flags)
        verify(self.mail_store).update_mail(mail)

    @defer.inlineCallbacks
    def test_mark_as_unread(self):
        """mark_as_unread drops the SEEN flag and updates the mail in the store."""
        mail = LeapMail(1, 'INBOX')
        mail.flags.add(Status.SEEN)
        when(self.mail_store).get_mail(1, include_body=True).thenReturn(mail)

        yield self.mail_service.mark_as_unread(1)

        verify(self.mail_store).update_mail(mail)

        # Check membership in the flag set (mirrors test_mark_as_read); comparing
        # the whole status attribute to a single flag could never detect a
        # leftover SEEN flag.
        self.assertNotIn(Status.SEEN, mail.flags)

    @defer.inlineCallbacks
    def test_delete_mail(self):
        """Deleting an INBOX mail moves it to the TRASH mailbox."""
        mail_to_delete = LeapMail(1, 'INBOX')
        when(self.mail_store).get_mail(1, include_body=True).thenReturn(defer.succeed(mail_to_delete))

        yield self.mail_service.delete_mail(1)

        verify(self.mail_store).move_mail_to_mailbox(1, 'TRASH')

    @defer.inlineCallbacks
    def test_delete_mail_does_not_fail_for_invalid_mail(self):
        """When the store returns no mail, delete_mail neither deletes nor moves anything."""
        no_mail = None
        mail_id = 1
        when(self.mail_store).get_mail(mail_id, include_body=True).thenReturn(defer.succeed(no_mail))

        yield self.mail_service.delete_mail(mail_id)

        verify(self.mail_store, never).delete_mail(mail_id)
        verify(self.mail_store, never).move_mail_to_mailbox(mail_id, ANY())

    @defer.inlineCallbacks
    def test_recover_mail(self):
        """Recovering a mail moves it to the INBOX mailbox."""
        mail_to_recover = LeapMail(1, 'TRASH')
        when(self.mail_service).mail(1).thenReturn(mail_to_recover)
        when(self.mail_store).move_mail_to_mailbox(1, 'INBOX').thenReturn(mail_to_recover)

        yield self.mail_service.recover_mail(1)

        verify(self.mail_store).move_mail_to_mailbox(1, 'INBOX')

    @defer.inlineCallbacks
    def test_get_attachment(self):
        """attachment() returns what the attachment store yields for the given id."""
        attachment_dict = {'content': bytearray('data'), 'content-type': 'text/plain'}
        when(self.attachment_store).get_mail_attachment('some attachment id').thenReturn(defer.succeed(attachment_dict))

        attachment = yield self.mail_service.attachment('some attachment id')

        self.assertEqual(attachment_dict, attachment)

    @defer.inlineCallbacks
    def test_update_tags_return_a_set_with_the_current_tags(self):
        """update_tags stores the mail and returns it carrying exactly the new tag set."""
        mail = LeapMail(1, 'INBOX', tags={'custom_1', 'custom_2'})
        when(self.mail_store).get_mail(1, include_body=True).thenReturn(mail)
        when(self.search_engine).tags(query='', skip_default_tags=True).thenReturn([])

        updated_mail = yield self.mail_service.update_tags(1, {'custom_1', 'custom_3'})

        verify(self.mail_store).update_mail(mail)
        self.assertEqual({'custom_1', 'custom_3'}, updated_mail.tags)

    @defer.inlineCallbacks
    def test_if_recipient_doubled_in_fields_send_only_in_bcc(self):
        """After deduplication 'another@pixelated.org' is in bcc and no longer in to."""
        mail = InputMail.from_dict(duplicates_in_fields_mail_dict(), from_address='pixelated@org')

        yield self.mail_service._deduplicate_recipients(mail)

        self.assertIn('to@pixelated.org', mail.to)
        self.assertNotIn('another@pixelated.org', mail.to)
        self.assertIn('another@pixelated.org', mail.bcc)

    @defer.inlineCallbacks
    def test_if_recipient_doubled_in_fields_send_only_in_to(self):
        """After deduplication 'third@pixelated.org' is in to and no longer in cc."""
        mail = InputMail.from_dict(duplicates_in_fields_mail_dict(), from_address='pixelated@org')

        yield self.mail_service._deduplicate_recipients(mail)

        self.assertIn('third@pixelated.org', mail.to)
        self.assertNotIn('third@pixelated.org', mail.cc)
        self.assertIn('cc@pixelated.org', mail.cc)
        self.assertNotIn('another@pixelated.org', mail.cc)

    @defer.inlineCallbacks
    def test_if_deduplicates_when_recipient_repeated_in_field(self):
        """After deduplication each of to, cc and bcc holds its expected addresses once."""
        mail = InputMail.from_dict(duplicates_in_fields_mail_dict(), from_address='pixelated@org')

        yield self.mail_service._deduplicate_recipients(mail)

        self.assertItemsEqual(['bcc@pixelated.org', 'another@pixelated.org'], mail.bcc)
        self.assertItemsEqual(['third@pixelated.org', 'to@pixelated.org'], mail.to)
        self.assertItemsEqual(['cc@pixelated.org'], mail.cc)

    def test_remove_canonical_recipient_when_it_is_not_canonical(self):
        """A bare address is returned unchanged."""
        recipient = u'user@pixelated.org'

        non_canonical = self.mail_service._remove_canonical_recipient(recipient)

        self.assertEqual(recipient, non_canonical)

    def test_remove_canonical_recipient_when_it_is_canonical(self):
        """A 'Name <address>' recipient is reduced to the bare address."""
        recipient = u'User <user@pixelated.org>'

        non_canonical = self.mail_service._remove_canonical_recipient(recipient)

        self.assertEqual(u'user@pixelated.org', non_canonical)