diff options
Diffstat (limited to 'service/test/unit')
21 files changed, 1474 insertions, 0 deletions
diff --git a/service/test/unit/__init__.py b/service/test/unit/__init__.py new file mode 100644 index 00000000..2756a319 --- /dev/null +++ b/service/test/unit/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. diff --git a/service/test/unit/adapter/__init__.py b/service/test/unit/adapter/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/service/test/unit/adapter/__init__.py diff --git a/service/test/unit/adapter/mail_service_test.py b/service/test/unit/adapter/mail_service_test.py new file mode 100644 index 00000000..549ab05c --- /dev/null +++ b/service/test/unit/adapter/mail_service_test.py @@ -0,0 +1,60 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. +import unittest + +from pixelated.adapter.mail_service import MailService +from mockito import * +import pixelated.adapter.soledad_querier + + +class TestMailService(unittest.TestCase): + def setUp(self): + self.querier = mock() + pixelated.adapter.soledad_querier.get_soledad_querier_instance = lambda x, y: self.querier + + self.mailboxes = mock() + self.mailboxes.drafts = lambda: mock() + self.mailboxes.trash = lambda: mock() + self.mailboxes.sent = lambda: mock() + + self.mail_sender = mock() + self.mail_service = MailService(self.mailboxes, self.mail_sender) + + def test_send_mail(self): + mail = "mail" + + self.mail_service.send(1, mail) + + verify(self.mail_sender).sendmail(mail) + + def test_mark_as_read(self): + mail = mock() + when(self.mail_service).mail(any()).thenReturn(mail) + self.mail_service.mark_as_read(1) + + verify(mail).mark_as_read() + + def test_create_draft(self): + mail = '' + + self.mail_service.create_draft(mail) + + verify(self.mailboxes).add_draft(mail) + + def test_delete_mail(self): + self.mail_service.delete_mail(1) + + verify(self.mailboxes).move_to_trash(1) diff --git a/service/test/unit/adapter/pixelated_mail_sender_test.py b/service/test/unit/adapter/pixelated_mail_sender_test.py new file mode 100644 index 00000000..207baadb --- /dev/null +++ b/service/test/unit/adapter/pixelated_mail_sender_test.py @@ -0,0 +1,46 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. +import unittest + +from pixelated.adapter.pixelated_mail import PixelatedMail +from pixelated.adapter.pixelated_mail_sender import PixelatedMailSender +from mockito import * +from test.support import test_helper + + +class PixelatedMailSenderTest(unittest.TestCase): + def setUp(self): + self.mail_address = "pixelated@pixelated.org" + self.smtp_client = mock() + self.mail_sender = PixelatedMailSender(self.mail_address, self.smtp_client) + + def test_send_mail_sends_to_To_Cc_and_Bcc(self): + headers = { + 'To': ['to@pixelated.org', 'anotherto@pixelated.org'], + 'Cc': ['cc@pixelated.org', 'anothercc@pixelated.org'], + 'Bcc': ['bcc@pixelated.org', 'anotherbcc@pixelated.org'] + } + + mail = PixelatedMail.from_soledad(*test_helper.leap_mail(extra_headers=headers)) + mail.to_smtp_format = lambda: "mail as smtp string" + + self.mail_sender.sendmail(mail) + + expected_recipients = ['to@pixelated.org', 'anotherto@pixelated.org', 'cc@pixelated.org', + 'anothercc@pixelated.org', + 'bcc@pixelated.org', 'anotherbcc@pixelated.org'] + + verify(self.smtp_client).sendmail(self.mail_address, expected_recipients, "mail as smtp string") diff --git a/service/test/unit/adapter/pixelated_mail_test.py b/service/test/unit/adapter/pixelated_mail_test.py new file mode 100644 index 00000000..0ab09a0a --- /dev/null +++ b/service/test/unit/adapter/pixelated_mail_test.py @@ -0,0 +1,133 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. +import unittest + +import os +import pixelated.support.date +from pixelated.adapter.pixelated_mail import PixelatedMail, InputMail +from pixelated.adapter.tag_service import TagService +from pixelated.adapter.tag_index import TagIndex +from pixelated.adapter.tag import Tag +from mockito import * +from test.support import test_helper + + +class TestPixelatedMail(unittest.TestCase): + + def setUp(self): + self.querier = mock() + + def test_parse_date_from_soledad_uses_date_header_if_available(self): + leap_mail_date = 'Wed, 3 Sep 2014 12:36:17 -0300' + leap_mail_date_in_iso_format = "2014-09-03T12:36:17-03:00" + + leap_mail = test_helper.leap_mail(headers={'date': leap_mail_date}) + + mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier) + + self.assertEqual(str(mail.headers['Date']), leap_mail_date_in_iso_format) + + def test_parse_date_from_soledad_fallback_to_received_header_if_date_header_isnt_available(self): + leap_mail_date = "Wed, 03 Sep 2014 13:11:15 -0300" + leap_mail_date_in_iso_format = "2014-09-03T13:11:15-03:00" + leap_mail_received_header = "by bitmask.local from 127.0.0.1 with ESMTP ;\n " + leap_mail_date + + leap_mail = test_helper.leap_mail(headers={'received': leap_mail_received_header}) + + mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier) + + self.assertEqual(str(mail.headers['Date']), leap_mail_date_in_iso_format) + + def test_update_tags_return_a_set_with_the_current_tags(self): + soledad_docs = test_helper.leap_mail(extra_headers={'X-tags': '["custom_1", "custom_2"]'}) + pixelated_mail = PixelatedMail.from_soledad(*soledad_docs, soledad_querier=self.querier) + + current_tags = pixelated_mail.update_tags({'custom_1', 'custom_3'}) + self.assertEquals({'custom_3', 'custom_1'}, current_tags) + + def test_mark_as_read(self): + mail = PixelatedMail.from_soledad(*test_helper.leap_mail(flags=[]), soledad_querier=self.querier) + + mail.mark_as_read() + + self.assertEquals(mail.fdoc.content['flags'], ['\\Seen']) + + def test_mark_as_not_recent(self): + mail = PixelatedMail.from_soledad(*test_helper.leap_mail(flags=['\\Recent']), soledad_querier=self.querier) + + mail.mark_as_not_recent() + + self.assertEquals(mail.fdoc.content['flags'], []) + + def test_update_tags_notifies_tag_service(self): + db_path = '/tmp/test_update_tags_notifies_tag_service' + TagService.instance = TagService(TagIndex(db_path)) + + mail = PixelatedMail.from_soledad(*test_helper.leap_mail(), soledad_querier=self.querier) + + mail.update_tags({'new_tag'}) + self.assertIn(Tag('new_tag'), mail.tag_service.all_tags()) + + os.remove(db_path + '.db') + + +class InputMailTest(unittest.TestCase): + mail_dict = lambda x: { + 'body': 'Este \xe9 o corpo', + 'header': { + 'cc': ['cc@pixelated.org', 'anothercc@pixelated.org'], + 'to': ['to@pixelated.org', 'anotherto@pixelated.org'], + 'bcc': ['bcc@pixelated.org', 'anotherbcc@pixelated.org'], + 'subject': 'Oi' + }, + 'ident': '', + 'tags': ['sent'] + } + + def test_to_mime_multipart_should_add_blank_fields(self): + pixelated.support.date.iso_now = lambda: 'date now' + + mail_dict = self.mail_dict() + mail_dict['header']['to'] = '' + mail_dict['header']['bcc'] = '' + mail_dict['header']['cc'] = '' + mail_dict['header']['subject'] = '' + + mime_multipart = InputMail.from_dict(mail_dict).to_mime_multipart() + + self.assertNotRegexpMatches(mime_multipart.as_string(), "\nTo: \n") + self.assertNotRegexpMatches(mime_multipart.as_string(), "\nBcc: \n") + self.assertNotRegexpMatches(mime_multipart.as_string(), "\nCc: \n") + self.assertNotRegexpMatches(mime_multipart.as_string(), "\nSubject: \n") + + def test_to_mime_multipart(self): + pixelated.support.date.iso_now = lambda: 'date now' + + mime_multipart = InputMail.from_dict(self.mail_dict()).to_mime_multipart() + + self.assertRegexpMatches(mime_multipart.as_string(), "\nTo: to@pixelated.org, anotherto@pixelated.org\n") + self.assertRegexpMatches(mime_multipart.as_string(), "\nCc: cc@pixelated.org, anothercc@pixelated.org\n") + self.assertRegexpMatches(mime_multipart.as_string(), "\nBcc: bcc@pixelated.org, anotherbcc@pixelated.org\n") + self.assertRegexpMatches(mime_multipart.as_string(), "\nDate: date now\n") + self.assertRegexpMatches(mime_multipart.as_string(), "\nSubject: Oi\n") + self.assertRegexpMatches(mime_multipart.as_string(), "\nEste \xe9 o corpo") + + def test_smtp_format(self): + PixelatedMail.from_email_address = 'pixelated@org' + + smtp_format = InputMail.from_dict(self.mail_dict()).to_smtp_format() + + self.assertRegexpMatches(smtp_format, "\nFrom: pixelated@org") diff --git a/service/test/unit/adapter/pixelated_mailbox_test.py b/service/test/unit/adapter/pixelated_mailbox_test.py new file mode 100644 index 00000000..d38cef5c --- /dev/null +++ b/service/test/unit/adapter/pixelated_mailbox_test.py @@ -0,0 +1,36 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. +import unittest + +from pixelated.adapter.pixelated_mail import PixelatedMail +from pixelated.adapter.pixelated_mailbox import PixelatedMailbox +from mockito import * +from test.support import test_helper + + +class PixelatedMailboxTest(unittest.TestCase): + def setUp(self): + self.tag_service = mock() + self.querier = mock() + self.mailbox = PixelatedMailbox('INBOX', self.querier, tag_service=self.tag_service) + + def test_remove_message_from_mailbox(self): + mail = PixelatedMail.from_soledad(*test_helper.leap_mail(), soledad_querier=self.querier) + when(self.querier).mail(1).thenReturn(mail) + + self.mailbox.remove(1) + + verify(self.querier).remove_mail(mail) diff --git a/service/test/unit/adapter/pixelated_mailboxes_test.py b/service/test/unit/adapter/pixelated_mailboxes_test.py new file mode 100644 index 00000000..8314f7f8 --- /dev/null +++ b/service/test/unit/adapter/pixelated_mailboxes_test.py @@ -0,0 +1,67 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. +import unittest + +from test.support import test_helper +from mockito import * +import pixelated.adapter.soledad_querier + +querier = mock() +when(pixelated.adapter.soledad_querier).get_soledad_querier_instance().thenReturn(querier) + +from pixelated.adapter.pixelated_mail import InputMail +from pixelated.adapter.pixelated_mailbox import PixelatedMailbox +from pixelated.adapter.pixelated_mailboxes import PixelatedMailBoxes + + +class PixelatedMailboxesTest(unittest.TestCase): + def setUp(self): + + self.account = mock() + self.drafts_mailbox = mock() + self.drafts_mailbox.mailbox_name = 'drafts' + self.mailboxes = PixelatedMailBoxes(self.account) + self.mailboxes.drafts = lambda: self.drafts_mailbox + + def test_search_for_tags(self): + mailbox = mock() + self.account.mailboxes = ['INBOX'] + tags_to_search_for = {'tags': ['inbox', 'custom_tag']} + + when(PixelatedMailbox).create('INBOX').thenReturn(mailbox) + when(mailbox).mails_by_tags(any(list)).thenReturn(["mail"]) + + mails = self.mailboxes.mails_by_tag(tags_to_search_for['tags']) + + self.assertEqual(1, len(mails)) + self.assertEqual("mail", mails[0]) + + def test_add_draft(self): + mail = InputMail() + when(self.drafts_mailbox).add(mail).thenReturn(1) + + self.mailboxes.add_draft(mail) + + verify(self.drafts_mailbox).add(mail) + + def test_update_draft(self): + mail = test_helper.input_mail() + when(self.drafts_mailbox).add(mail).thenReturn(mail) + + self.mailboxes.update_draft(mail.ident, mail) + + inorder.verify(self.drafts_mailbox).add(mail) + inorder.verify(self.drafts_mailbox).remove(mail.ident) diff --git a/service/test/unit/adapter/tag_index_test.py b/service/test/unit/adapter/tag_index_test.py new file mode 100644 index 00000000..21564fc5 --- /dev/null +++ b/service/test/unit/adapter/tag_index_test.py @@ -0,0 +1,88 @@ + +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. +import unittest +import os +import uuid + +from pixelated.adapter.tag_index import TagIndex +from pixelated.adapter.tag import Tag + + +class TestTagIndex(unittest.TestCase): + + def setUp(self): + self.db_path = '/tmp/test_tag_index_' + str(uuid.uuid4()) + self.tag_index = TagIndex(self.db_path) + + def tearDown(self): + self.tag_index._close_db() + os.remove(self.db_path + '.db') + + def test_get_and_set_works(self): + tag = Tag('a_tag') + self.tag_index.set(tag) + self.assertEquals(tag, self.tag_index.get('a_tag')) + + def test_values_returns_all_values_in_the_index(self): + tag_a = Tag('tag_a') + self.tag_index.set(tag_a) + tag_b = Tag('tag_b') + self.tag_index.set(tag_b) + tag_c = Tag('tag_c') + self.tag_index.set(tag_c) + + self.assertEquals(set([tag_a, tag_b, tag_c]), self.tag_index.values()) + + def test_changes_are_visible_between_instances_using_same_file(self): + tag = Tag('some_tag') + self.tag_index.set(tag) + + other_tag_index = TagIndex(self.db_path) + self.assertIn(tag, other_tag_index.values()) + + def test_add_does_not_replace_existent_tag_with_same_name(self): + tag = Tag('tag', True) + self.tag_index.set(tag) + + same_name_tag = Tag('tag', False) + self.tag_index.add(same_name_tag) + + self.assertEquals(True, self.tag_index.get('tag').default) + + def test_empty_returns_true_if_there_are_no_values(self): + self.assertTrue(self.tag_index.empty()) + + def test_empty_returns_false_if_there_are_values(self): + self.tag_index.set(Tag('tag')) + self.assertFalse(self.tag_index.empty()) + + def test_remove_deletes_the_tag_with_the_given_key_from_the_index(self): + self.tag_index.set(Tag('tag')) + self.tag_index.remove('tag') + self.assertEquals(None, self.tag_index.get('tag')) + + def test_remove_does_not_raises_exception_if_key_is_not_present(self): + self.tag_index.remove('not_there') + + def test_removals_are_visible_between_instances_using_same_file(self): + tag = Tag('some_tag') + self.tag_index.set(tag) + + other_tag_index = TagIndex(self.db_path) + other_tag_index.remove('some_tag') + + self.assertIsNone(self.tag_index.get('some_tag')) diff --git a/service/test/unit/adapter/test_status.py b/service/test/unit/adapter/test_status.py new file mode 100644 index 00000000..bcdbb360 --- /dev/null +++ b/service/test/unit/adapter/test_status.py @@ -0,0 +1,33 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. +import unittest + +from pixelated.adapter.status import Status + + +class TestStatus(unittest.TestCase): + + def test_leap_seen_flag_is_translated_to_read_status(self): + status = Status.from_flag('\\Seen') + self.assertEquals(Status('read'), status) + + def test_leap_answered_flag_is_translated_to_replied_status(self): + status = Status.from_flag('\\Answered') + self.assertEquals(Status('replied'), status) + + def test_bulk_conversion(self): + statuses = Status.from_flags(['\\Answered', '\\Seen', '\\Recent', 'tag_a_custom']) + self.assertEquals(set([Status('read'), Status('replied'), Status('recent')]), statuses) diff --git a/service/test/unit/adapter/test_tag.py b/service/test/unit/adapter/test_tag.py new file mode 100644 index 00000000..fc14ff49 --- /dev/null +++ b/service/test/unit/adapter/test_tag.py @@ -0,0 +1,79 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. +import unittest + +from pixelated.adapter.tag import Tag + + +class TestTag(unittest.TestCase): + + def test_from_dict_sets_all_tag_attributes(self): + tag_dict = {'name': 'a_tag', + 'default': False, + 'counts': {'total': 3, + 'read': 1, + 'starred': 1, + 'replied': 1}, + 'mails': [1, 2, 3]} + + tag = Tag.from_dict(tag_dict) + + self.assertEquals(tag_dict['name'], tag.name) + self.assertEquals(tag_dict['default'], tag.default) + self.assertEquals(tag_dict['counts']['total'], tag.total) + # Checks if mail ids are aways restored as set() + self.assertEquals(type(tag.mails), type(set())) + self.assertEquals(set(tag_dict['mails']), tag.mails) + + def test_as_dict_puts_all_tag_attributes_in_the_returning_dict(self): + tag = Tag('some_tag', default=True) + tag.counts = {'total': 0, 'read': 0, 'starred': 0, 'replied': 0} + tag.mails = [1, 2, 3] + + tag_dict = tag.as_dict() + + self.assertEquals(tag.name, tag_dict['name']) + self.assertEquals(tag.default, tag_dict['default']) + self.assertEquals(tag.total, tag_dict['counts']['total']) + self.assertEquals(tag.mails, tag_dict['mails']) + + def test_increments_total_count_and_adds_mails_id_to_mails(self): + tag = Tag('another') + tag.increment(12) + + self.assertIn(12, tag.mails) + self.assertEquals(1, tag.total) + + def test_decrement_does_nothing_if_mail_has_not_the_tag(self): + tag = Tag('tag') + tag.decrement(2000) + + self.assertEquals(0, tag.total) + + def test_increment_does_nothing_if_mail_already_has_the_tag(self): + tag = Tag('tag') + tag.mails = set([12]) + tag.increment(12) + + self.assertEquals(1, tag.total) + + def test_decrements_total_count_and_removes_mails_id_from_mails(self): + tag = Tag('one_more') + tag.mails = set([12]) + tag.decrement(12) + + self.assertNotIn(12, tag.mails) + self.assertEquals(0, tag.total) diff --git a/service/test/unit/adapter/test_tag_service.py b/service/test/unit/adapter/test_tag_service.py new file mode 100644 index 00000000..aeb1b503 --- /dev/null +++ b/service/test/unit/adapter/test_tag_service.py @@ -0,0 +1,60 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. + +import unittest +import tempfile + +from pixelated.adapter.tag import Tag +from pixelated.adapter.pixelated_mail import PixelatedMail +from pixelated.adapter.tag_index import TagIndex +from pixelated.adapter.tag_service import TagService +from test.support import test_helper + + +class TagServiceTest(unittest.TestCase): + def setUp(self): + self.index_file_handler, self.index_file_path = tempfile.mkstemp() + self.tag_index = TagIndex(self.index_file_path) + self.tag_service = TagService(tag_index=self.tag_index) + + def test_index_is_initialized_with_mail_tags_if_empty(self): + mail_one = PixelatedMail.from_soledad(*test_helper.leap_mail(uid=0, extra_headers={'X-Tags': '["tag_1"]'})) + mail_two = PixelatedMail.from_soledad(*test_helper.leap_mail(uid=1, extra_headers={'X-Tags': '["tag_2"]'})) + mails = [mail_one, mail_two] + + self.tag_service.load_index(mails) + + self.assertEqual(self.tag_service.all_tags(), {Tag('sent'), Tag('inbox'), Tag('drafts'), Tag('trash'), Tag('tag_1'), Tag('tag_2')}) + + def test_special_tags_always_exists(self): + self.tag_service.load_index([]) + + self.assertEqual(self.tag_service.all_tags(), {Tag('sent'), Tag('inbox'), Tag('drafts'), Tag('trash')}) + + def test_notify_tags_updated_method_properly_changes_tags_state(self): + mail_ident = 12 + tag = Tag('one_tag') + tag.increment(mail_ident) + self.tag_service.load_index([]) + self.tag_service.tag_index.set(tag) + + self.assertEquals(0, self.tag_service.tag_index.get('inbox').total) + self.assertEquals(1, self.tag_service.tag_index.get('one_tag').total) + + self.tag_service.notify_tags_updated({'inbox'}, {'one_tag'}, mail_ident) + + self.assertEquals(1, self.tag_service.tag_index.get('inbox').total) + self.assertIsNone(self.tag_service.tag_index.get('one_tag')) diff --git a/service/test/unit/bitmask_libraries/__init__.py b/service/test/unit/bitmask_libraries/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/service/test/unit/bitmask_libraries/__init__.py diff --git a/service/test/unit/bitmask_libraries/abstract_leap_test.py b/service/test/unit/bitmask_libraries/abstract_leap_test.py new file mode 100644 index 00000000..ddcfb08f --- /dev/null +++ b/service/test/unit/bitmask_libraries/abstract_leap_test.py @@ -0,0 +1,41 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. +import os +import tempfile +import unittest +from uuid import uuid4 +from mock import Mock, MagicMock + + +class AbstractLeapTest(unittest.TestCase): + uuid = str(uuid4()) + session_id = str(uuid4()) + token = str(uuid4()) + + leap_home = os.path.join(tempfile.mkdtemp(), 'leap') + + config = Mock(leap_home=leap_home, ca_cert_bundle='/some/path/to/ca_cert', gpg_binary='/path/to/gpg') + provider = Mock(config=config, server_name='some-server.test', domain='some-server.test', + api_uri='https://api.some-server.test:4430', api_version='1') + soledad = Mock() + soledad_session = Mock(soledad=soledad) + srp_session = Mock(user_name='test_user', api_server_name='some-server.test', uuid=uuid, session_id=session_id, token=token) + + nicknym = MagicMock() + + soledad_account = MagicMock() + + mail_fetcher_mock = MagicMock() diff --git a/service/test/unit/bitmask_libraries/leap_srp_test.py b/service/test/unit/bitmask_libraries/leap_srp_test.py new file mode 100644 index 00000000..591929ce --- /dev/null +++ b/service/test/unit/bitmask_libraries/leap_srp_test.py @@ -0,0 +1,157 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. +import json +import unittest +import binascii +from urlparse import parse_qs + +from httmock import urlmatch, all_requests, HTTMock, response +from requests.exceptions import Timeout +import srp + +from pixelated.bitmask_libraries.leap_srp import LeapSecureRemotePassword, LeapAuthException + +(salt_bytes, verification_key_bytes) = srp.create_salted_verification_key('username', 'password', hash_alg=srp.SHA256, ng_type=srp.NG_1024) +verifier = None + + +@all_requests +def not_found_mock(url, request): + return {'status_code': 404, + 'content': 'foobar'} + + +@all_requests +def timeout_mock(url, request): + raise Timeout() + + +@urlmatch(netloc=r'(.*\.)?leap\.local$') +def srp_login_server_simulator_mock(url, request): + global verifier + + data = parse_qs(request.body) + if 'login' in data: + # SRP Authentication Step 1 + A = binascii.unhexlify(data.get('A')[0]) + + verifier = srp.Verifier('username', salt_bytes, verification_key_bytes, A, hash_alg=srp.SHA256, ng_type=srp.NG_1024) + (salt, B) = verifier.get_challenge() + + content = { + 'salt': binascii.hexlify(salt), + 'B': binascii.hexlify(B) + } + + return {'status_code': 200, + 'content': json.dumps(content)} + + else: + # SRP Authentication Step 2 + data = parse_qs(request.body) + client_auth = binascii.unhexlify(data.get('client_auth')[0]) + + M2 = verifier.verify_session(client_auth) + + if not verifier.authenticated(): + return {'status_code': 404, + 'content': ''} + + content = { + 'M2': binascii.hexlify(M2), + 'id': 'some id', + 'token': 'some token' + } + headers = { + 'Content-Type': 'application/json', + 'Set-Cookie': '_session_id=some_session_id;'} + return response(200, content, headers, None, 5, request) + + +class LeapSRPTest(unittest.TestCase): + + def test_status_code_is_checked(self): + with HTTMock(not_found_mock): + lsrp = LeapSecureRemotePassword() + self.assertRaises(LeapAuthException, lsrp.authenticate, 'https://api.leap.local', 'username', 'password') + + def test_invalid_username(self): + with HTTMock(srp_login_server_simulator_mock): + lsrp = LeapSecureRemotePassword() + self.assertRaises(LeapAuthException, lsrp.authenticate, 'https://api.leap.local', 'invalid_user', 'password') + + def test_invalid_password(self): + with HTTMock(srp_login_server_simulator_mock): + lsrp = LeapSecureRemotePassword() + self.assertRaises(LeapAuthException, lsrp.authenticate, 'https://api.leap.local', 'username', 'invalid') + + def test_login(self): + with HTTMock(srp_login_server_simulator_mock): + lsrp = LeapSecureRemotePassword() + leap_session = lsrp.authenticate('https://api.leap.local', 'username', 'password') + + self.assertIsNotNone(leap_session) + self.assertEqual('username', leap_session.user_name) + self.assertEqual('1', leap_session.api_version) + self.assertEqual('https://api.leap.local', leap_session.api_server_name) + self.assertEqual('some token', leap_session.token) + self.assertEqual('some_session_id', leap_session.session_id) + + def test_timeout(self): + with HTTMock(timeout_mock): + lrsp = LeapSecureRemotePassword() + self.assertRaises(LeapAuthException, lrsp.authenticate, 'https://api.leap.local', 'username', 'password') + + def test_register_raises_auth_exception_on_error(self): + with HTTMock(not_found_mock): + lsrp = LeapSecureRemotePassword() + self.assertRaises(LeapAuthException, lsrp.register, 'https://api.leap.local', 'username', 'password') + + def test_register(self): + @urlmatch(netloc=r'(.*\.)?leap\.local$', path='/1/users') + def register_success(url, request): + + content = { + 'login': 'username', + 'ok': True + } + + return {'status_code': 201, + 'content': content} + + with HTTMock(register_success, not_found_mock): + lsrp = LeapSecureRemotePassword() + self.assertTrue(lsrp.register('https://api.leap.local', 'username', 'password')) + + def test_register_user_exists(self): + @urlmatch(netloc=r'(.*\.)?leap\.local$', path='/1/users') + def register_error_user_exists(url, request): + content = {"errors": { + "login": [ + "has already been taken", "has already been taken", "has already been taken" + ]}} + + return {'status_code': 422, + 'content': content} + + with HTTMock(register_error_user_exists, not_found_mock): + lsrp = LeapSecureRemotePassword() + self.assertRaises(LeapAuthException, lsrp.register, 'https://api.leap.local', 'username', 'password') + + def test_registration_timeout(self): + with HTTMock(timeout_mock): + lsrp = LeapSecureRemotePassword() + self.assertRaises(LeapAuthException, lsrp.register, 'https://api.leap.local', 'username', 'password') diff --git a/service/test/unit/bitmask_libraries/nicknym_test.py b/service/test/unit/bitmask_libraries/nicknym_test.py new file mode 100644 index 00000000..9d564abe --- /dev/null +++ b/service/test/unit/bitmask_libraries/nicknym_test.py @@ -0,0 +1,48 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. +from mock import patch + +from leap.keymanager import openpgp, KeyNotFound +from pixelated.bitmask_libraries.nicknym import NickNym +from abstract_leap_test import AbstractLeapTest + + +class NickNymTest(AbstractLeapTest): + @patch('pixelated.bitmask_libraries.nicknym.KeyManager.__init__', return_value=None) + def test_that_keymanager_is_created(self, init_mock): + # given + + # when + NickNym(self.provider, self.config, self.soledad_session, self.srp_session) + + # then + init_mock.assert_called_with('test_user@some-server.test', 'https://nicknym.some-server.test:6425/', + self.soledad, self.token, '/some/path/to/ca_cert', + 'https://api.some-server.test:4430', '1', self.uuid, + '/path/to/gpg') + + @patch('pixelated.bitmask_libraries.nicknym.KeyManager') + def test_gen_key(self, keymanager_mock): + # given + keyman = keymanager_mock.return_value + keyman.get_key.side_effect = KeyNotFound + nicknym = NickNym(self.provider, self.config, self.soledad_session, self.srp_session) + + # when/then + nicknym.generate_openpgp_key() + + keyman.get_key.assert_called_with('test_user@some-server.test', openpgp.OpenPGPKey, fetch_remote=False, private=True) + keyman.gen_key.assert_called_with(openpgp.OpenPGPKey) diff --git a/service/test/unit/bitmask_libraries/provider_test.py b/service/test/unit/bitmask_libraries/provider_test.py new file mode 100644 index 00000000..41cf3bf4 --- /dev/null +++ b/service/test/unit/bitmask_libraries/provider_test.py @@ -0,0 +1,186 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. +import json + +from httmock import all_requests, HTTMock, urlmatch +from requests import HTTPError + +from pixelated.bitmask_libraries.config import LeapConfig +from pixelated.bitmask_libraries.provider import LeapProvider +from abstract_leap_test import AbstractLeapTest + + +@all_requests +def not_found_mock(url, request): + return {'status_code': 404, + 'content': 'foobar'} + + +@urlmatch(netloc=r'(.*\.)?some-provider\.test$', path='/provider.json') +def provider_json_mock(url, request): + return provider_json_response("SHA256: 06e2300bdbc118c290eda0dc977c24080718f4eeca68c8b0ad431872a2baa22d") + + +@urlmatch(netloc=r'(.*\.)?some-provider\.test$', path='/provider.json') +def provider_json_invalid_fingerprint_mock(url, request): + return provider_json_response("SHA256: 0123456789012345678901234567890123456789012345678901234567890123") + + +def provider_json_response(fingerprint): + content = { + "api_uri": "https://api.some-provider.test:4430", + "api_version": "1", + "ca_cert_fingerprint": fingerprint, + "ca_cert_uri": "https://some-provider.test/ca.crt", + "domain": "some-provider.test", + "services": [ + "mx" + ] + } + return { + "status_code": 200, + "content": json.dumps(content) + } + + +@urlmatch(netloc=r'api\.some-provider\.test:4430$', path='/1/config/soledad-service.json') +def soledad_json_mock(url, request): + content = { + "some key": "some value", + } + return { + "status_code": 200, + "content": json.dumps(content) + } + + +@urlmatch(netloc=r'api\.some-provider\.test:4430$', path='/1/config/smtp-service.json') +def smtp_json_mock(url, request): + content = { + "hosts": { + "leap-mx": { + "hostname": "mx.some-provider.test", + "ip_address": "0.0.0.0", + "port": 465 + } + }, + "locations": {}, + "serial": 1, + "version": 1 + } + return { + "status_code": 200, + "content": json.dumps(content) + } + + +@urlmatch(netloc=r'(.*\.)?some-provider\.test$', path='/ca.crt') +def ca_cert_mock(url, request): + return { + "status_code": 200, + "content": ca_crt + } + + +ca_crt = """ +-----BEGIN CERTIFICATE----- +MIIFbzCCA1egAwIBAgIBATANBgkqhkiG9w0BAQ0FADBKMREwDwYDVQQKDAhXYXpv +a2F6aTEaMBgGA1UECwwRaHR0cHM6Ly9kZmkubG9jYWwxGTAXBgNVBAMMEFdhem9r +YXppIFJvb3QgQ0EwHhcNMTQwMzI1MDAwMDAwWhcNMjQwMzI1MDAwMDAwWjBKMREw +DwYDVQQKDAhXYXpva2F6aTEaMBgGA1UECwwRaHR0cHM6Ly9kZmkubG9jYWwxGTAX +BgNVBAMMEFdhem9rYXppIFJvb3QgQ0EwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAw +ggIKAoICAQDSPyaslC6SNVsKpGoXllInPXbjiq7rJaV08Xg+64FJU/257BZZEJ/j +r33r0xlt2kj85PcbPySLKy0omXAQt9bs273hwAQXExdY41FxMD3wP/dmLqd55KYa +LDV4GUw0QPZ0QUyWVrRHkrdCDyjpRG+6GbowmtygJKLflYmUFC1PYQ3492esr0jC ++Q6L6+/D2+hBiH3NPI22Yk0kQmuPfnu2pvo+EYQ3It81qZE0Jo8u/BqOMgN2f9DS +GvSNfZcKAP18A41/VRrYFa/WUcdDxt/uP5nO1dm2vfLorje3wcMGtGRcDKG/+GAm +S0nYKKQeWYc6z5SDvPM1VlNdn1gOejhAoggT3Hr5Dq8kxW/lQZbOz+HLbz15qGjz +gL4KHKuDE6hOuqxpHdMTY4WZBBQ8/6ICBxaXH9587/nNDdZiom+XukVD4mrSMJS7 +PRr14Hw57433AJDJcZRwZNRRAGgDPNsCoR2caKB6/Uwkp+dWVndj5Ad8MEjyM1yV ++fYU6PSQWNig7qqN5VhNY+zUCcez5gL6volMuW00iOkXISW4lBrcZmEAQTTcWT1D +U7EkLlwITQce63LcuvK7ZWsEm5XCqD+yUz9oQfugmIhxAlTdqt3De9FA0WT9WxGt +zLeswCNKjnMpRgTerq6elwB03EBJVc7k1QRn4+s6C30sXR12dYnEMwIDAQABo2Aw +XjAdBgNVHQ4EFgQU8ItSdI5pSqMDjgRjgYI3Nj0SwxQwDgYDVR0PAQH/BAQDAgIE +MAwGA1UdEwQFMAMBAf8wHwYDVR0jBBgwFoAU8ItSdI5pSqMDjgRjgYI3Nj0SwxQw +DQYJKoZIhvcNAQENBQADggIBALdSPUrIqyIlSMr4R7pWd6Ep0BZH5RztVUcoXtei +x2MFi/rsw7aL9qZqACYIE8Gkkh6Z6GQph0fIqhAlNFvJXKkguL3ri5xh0XmPfbv/ +OLIvaUAixATivdm8ro/IqYQWdL3P6mDZOv4O6POdBEJ9JLc9RXUt1LiQ5Xb9QiLs +l/yOthhp5dJHqC8s6CDEUHRe3s9Q/4cwNB4td47I+mkLsNtVNXqi4lOzuQamqiFt +cFIqOLTFtBJ7G3k9iaDuN6RPS6LMRbqabwg4gafQTmJ+roHpnsaiHkfomI4MZOVi +TLQKOAJ3/pRGm5cGzkzQ+z4sUiCSQxtIWs7EnQCCE8agqpef6zArAvKEO+139+f2 +u1BhWOm/aHT5a3INnJEbuFr8V9MlbZSxSzU3UH7hby+9PxWKYesc6KUAu6Icooci +gEQqrVhVKmfaYMLL7UZHhw56yv/6B10SSmeAMiJhtTExjjrTRLSCaKCPa2ISAUDB +aPR3t8ZoUESWRAFQGj5NvWOomTaXfyE8Or2WfNemvdlWsKvlLeVsjts+iaTgQRU9 +VXcrUhrHhaXhYXeWrWkDDcl8VUlDWXzoUGV9SczOGwr6hONJWMn1HNxNV7ywFWf0 +QXH1g3LBW7qNgRaGhbIX4a1WoNQDmbbKaLgKWs74atZ8o4A2aUEjomclgZWPsc5l +VeJ6 +-----END CERTIFICATE----- +""" + + +class LeapProviderTest(AbstractLeapTest): + def setUp(self): + self.config = LeapConfig(verify_ssl=False, leap_home='/tmp/foobar', ca_cert_bundle='/tmp/ca.crt') + + def test_provider_fetches_provider_json(self): + with HTTMock(provider_json_mock): + provider = LeapProvider('some-provider.test', self.config) + + self.assertEqual("1", provider.api_version) + self.assertEqual("some-provider.test", provider.domain) + self.assertEqual("https://api.some-provider.test:4430", provider.api_uri) + self.assertEqual("https://some-provider.test/ca.crt", provider.ca_cert_uri) + self.assertEqual("SHA256: 06e2300bdbc118c290eda0dc977c24080718f4eeca68c8b0ad431872a2baa22d", + provider.ca_cert_fingerprint) + self.assertEqual(["mx"], provider.services) + + def test_provider_json_throws_exception_on_status_code(self): + with HTTMock(not_found_mock): + self.assertRaises(HTTPError, LeapProvider, 'some-provider.test', self.config) + + def test_fetch_soledad_json(self): + with HTTMock(provider_json_mock, soledad_json_mock, not_found_mock): + provider = LeapProvider('some-provider.test', self.config) + soledad = provider.fetch_soledad_json() + + self.assertEqual("some value", soledad.get('some key')) + + def test_throw_exception_for_fetch_soledad_status_code(self): + with HTTMock(provider_json_mock, not_found_mock): + provider = LeapProvider('some-provider.test', self.config) + + self.assertRaises(HTTPError, provider.fetch_soledad_json) + + def test_fetch_smtp_json(self): + with HTTMock(provider_json_mock, smtp_json_mock, not_found_mock): + provider = LeapProvider('some-provider.test', self.config) + smtp = provider.fetch_smtp_json() + self.assertEqual('mx.some-provider.test', smtp.get('hosts').get('leap-mx').get('hostname')) + + def test_throw_exception_for_fetch_smtp_status_code(self): + with HTTMock(provider_json_mock, not_found_mock): + provider = LeapProvider('some-provider.test', self.config) + self.assertRaises(HTTPError, provider.fetch_smtp_json) + + def test_fetch_valid_certificate(self): + with HTTMock(provider_json_mock, ca_cert_mock, not_found_mock): + provider = LeapProvider('some-provider.test', self.config) + provider.fetch_valid_certificate() + + def test_throw_exception_for_invalid_certificate(self): + with HTTMock(provider_json_invalid_fingerprint_mock, ca_cert_mock, not_found_mock): + provider = LeapProvider('some-provider.test', self.config) + self.assertRaises(Exception, provider.fetch_valid_certificate) diff --git a/service/test/unit/bitmask_libraries/session_test.py b/service/test/unit/bitmask_libraries/session_test.py new file mode 100644 index 00000000..32d92f25 --- /dev/null +++ b/service/test/unit/bitmask_libraries/session_test.py @@ -0,0 +1,64 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. +from mock import patch + +from pixelated.bitmask_libraries.session import LeapSession +from abstract_leap_test import AbstractLeapTest + + +class SessionTest(AbstractLeapTest): + def test_background_jobs_are_started(self): + self.config.start_background_jobs = True + + with patch('pixelated.bitmask_libraries.session.reactor.callFromThread', new=_execute_func) as _: + self._create_session() + + self.mail_fetcher_mock.start_loop.assert_called_once_with() + + def test_background_jobs_are_not_started(self): + self.config.start_background_jobs = False + + with patch('pixelated.bitmask_libraries.session.reactor.callFromThread', new=_execute_func) as _: + self._create_session() + + self.assertFalse(self.mail_fetcher_mock.start_loop.called) + + def test_that_close_stops_background_jobs(self): + with patch('pixelated.bitmask_libraries.session.reactor.callFromThread', new=_execute_func) as _: + session = self._create_session() + + session.close() + + self.mail_fetcher_mock.stop.assert_called_once_with() + + def test_that_sync_deferes_to_soledad(self): + session = self._create_session() + + session.sync() + + self.soledad_session.sync.assert_called_once_with() + + def test_account_email(self): + session = self._create_session() + self.assertEqual('test_user@some-server.test', session.account_email()) + + def _create_session(self): + return LeapSession(self.provider, self.srp_session, self.soledad_session, self.nicknym, self.soledad_account, + self.mail_fetcher_mock) + + +def _execute_func(func): + func() diff --git a/service/test/unit/bitmask_libraries/smtp_test.py b/service/test/unit/bitmask_libraries/smtp_test.py new file mode 100644 index 00000000..2bb3dcab --- /dev/null +++ b/service/test/unit/bitmask_libraries/smtp_test.py @@ -0,0 +1,96 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. +import os +from mock import MagicMock, patch +from abstract_leap_test import AbstractLeapTest +from pixelated.bitmask_libraries.smtp import LeapSmtp +from httmock import all_requests, HTTMock, urlmatch + + +@all_requests +def not_found_mock(url, request): + sys.stderr.write('url=%s\n' % url.netloc) + sys.stderr.write('path=%s\n' % url.path) + return {'status_code': 404, + 'content': 'foobar'} + + +@urlmatch(netloc='api.some-server.test:4430', path='/1/cert') +def ca_cert_mock(url, request): + return { + "status_code": 200, + "content": "some content" + } + + +class LeapSmtpTest(AbstractLeapTest): + keymanager = MagicMock() + + def setUp(self): + self.provider.fetch_smtp_json.return_value = { + 'hosts': { + 'leap-mx': { + 'hostname': 'smtp.some-sever.test', + 'port': '1234' + } + } + } + self.config.timeout_in_s = 15 + + def test_that_client_cert_gets_downloaded(self): + smtp = LeapSmtp(self.provider, self.keymanager, self.srp_session) + + with HTTMock(ca_cert_mock, not_found_mock): + smtp._download_client_certificates() + + path = self._client_cert_path() + self.assertTrue(os.path.isfile(path)) + + def _client_cert_path(self): + return os.path.join(self.leap_home, 'providers', 'some-server.test', 'keys', 'client', 'smtp.pem') + + @patch('pixelated.bitmask_libraries.smtp.setup_smtp_gateway') + def test_that_start_calls_setup_smtp_gateway(self, gateway_mock): + smtp = LeapSmtp(self.provider, self.keymanager, self.srp_session) + port = 500 + smtp.TWISTED_PORT = port + gateway_mock.return_value = (None, None) + with HTTMock(ca_cert_mock, not_found_mock): + smtp.start() + + cert_path = self._client_cert_path() + gateway_mock.assert_called_with(keymanager=self.keymanager, smtp_cert=cert_path, smtp_key=cert_path, userid='test_user@some-server.test', smtp_port='1234', encrypted_only=False, smtp_host='smtp.some-sever.test', port=port) + + def test_that_client_stop_does_nothing_if_not_started(self): + smtp = LeapSmtp(self.provider, self.keymanager, self.srp_session) + + with HTTMock(not_found_mock): + smtp.stop() + + @patch('pixelated.bitmask_libraries.smtp.setup_smtp_gateway') + def test_that_running_smtp_sevice_is_stopped(self, gateway_mock): + smtp = LeapSmtp(self.provider, self.keymanager, self.srp_session) + + smtp_service = MagicMock() + smtp_port = MagicMock() + gateway_mock.return_value = (smtp_service, smtp_port) + + with HTTMock(ca_cert_mock, not_found_mock): + smtp.start() + smtp.stop() + + smtp_port.stopListening.assert_called_with() + smtp_service.doStop.assert_called_with() diff --git a/service/test/unit/bitmask_libraries/soledad_test.py b/service/test/unit/bitmask_libraries/soledad_test.py new file mode 100644 index 00000000..83a19fe1 --- /dev/null +++ b/service/test/unit/bitmask_libraries/soledad_test.py @@ -0,0 +1,69 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. +from mock import patch +from pixelated.bitmask_libraries.soledad import SoledadSession +from abstract_leap_test import AbstractLeapTest + + +@patch('pixelated.bitmask_libraries.soledad.Soledad') +class SoledadSessionTest(AbstractLeapTest): + + def setUp(self): + # given + self.provider.fetch_soledad_json.return_value = {'hosts': { + 'couch1': { + 'hostname': 'couch1.some-server.test', + 'ip_address': '192.168.1.1', + 'port': 1234 + } + }} + + @patch('pixelated.bitmask_libraries.soledad.Soledad.__init__') + def test_that_soledad_is_created_with_required_params(self, soledad_mock, init_mock): + # when + SoledadSession(self.provider, 'any-passphrase', self.srp_session) + + # then + init_mock.assert_called_with(self.uuid, 'any-passphrase', '%s/soledad/%s.secret' % (self.leap_home, self.uuid), + '%s/soledad/%s.db' % (self.leap_home, self.uuid), + 'https://couch1.some-server.test:1234/user-%s' % self.uuid, + '/some/path/to/ca_cert', self.token) + + def test_that_sync_is_called(self, soledad_mock): + instance = soledad_mock.return_value + instance.server_url = '/foo/bar' + instance.need_sync.return_value = True + soledad_session = SoledadSession(self.provider, 'any-passphrase', self.srp_session) + + # when + soledad_session.sync() + + # then + instance.need_sync.assert_called_with('/foo/bar') + instance.sync.assert_called_with() + + def test_that_sync_not_called_if_not_needed(self, mock): + instance = mock.return_value + instance.server_url = '/foo/bar' + instance.need_sync.return_value = False + soledad_session = SoledadSession(self.provider, 'any-passphrase', self.srp_session) + + # when + soledad_session.sync() + + # then + instance.need_sync.assert_called_with('/foo/bar') + self.assertFalse(instance.sync.called) diff --git a/service/test/unit/search/test_search_query.py b/service/test/unit/search/test_search_query.py new file mode 100644 index 00000000..3bcbd219 --- /dev/null +++ b/service/test/unit/search/test_search_query.py @@ -0,0 +1,39 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. +import sys +import os +import unittest +import pixelated.search_query as search_query + + +class SearchTestCase(unittest.TestCase): + + def test_one_tag(self): + self.assertEquals(search_query.compile(u"in:inbox")["tags"], ["inbox"]) + self.assertEquals(search_query.compile(u"in:trash")["tags"], ["trash"]) + + def test_two_tags_or(self): + self.assertEquals(search_query.compile(u"in:inbox or in:trash")["tags"], ["inbox", "trash"]) + + def test_tag_negate(self): + self.assertEquals(search_query.compile(u"-in:trash")["not_tags"], ["trash"]) + + def test_general_search(self): + self.assertEquals(search_query.compile(u"searching")["general"], "searching") + + def test_tags_with_quotes(self): + self.assertEquals(search_query.compile(u"in:\"inbox\"")["tags"], ["inbox"]) + self.assertEquals(search_query.compile(u"in:'inbox'")["tags"], ["inbox"]) diff --git a/service/test/unit/user_agent_test.py b/service/test/unit/user_agent_test.py new file mode 100644 index 00000000..22199333 --- /dev/null +++ b/service/test/unit/user_agent_test.py @@ -0,0 +1,157 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>. + +import unittest +import json +import sys + +import pixelated.user_agent +from pixelated.adapter.tag import Tag +from pixelated.adapter.tag_service import TagService +from mockito import * +import crochet +import pixelated.reactor_manager as reactor_manager +import test.support.test_helper as test_helper +import pixelated.adapter.pixelated_mail +import os + + +class UserAgentTest(unittest.TestCase): + + def setUp(self): + self.app = pixelated.user_agent.app.test_client() + self.mail_service = mock() + self.tag_service = mock() + self.mail_service.tag_service = self.tag_service + + pixelated.user_agent.DISABLED_FEATURES = [] + pixelated.user_agent.mail_service = self.mail_service + self.input_mail = None + pixelated.adapter.pixelated_mail.input_mail_from_dict = lambda x: self.input_mail + + def tearDown(self): + unstub() + + def test_create_or_send_draft_should_create_draft_if_mail_has_no_ident(self): + self.input_mail = self.draft() + + self.app.post('/mails', data='{}', content_type="application/json") + + verify(self.mail_service).create_draft(self.input_mail) + + def test_create_or_send_draft_should_send_draft_if_mail_has_ident(self): + self.input_mail = self.draft() + + self.app.post('/mails', data='{"ident":1}', content_type="application/json") + + verify(self.mail_service).send(1, self.input_mail) + + def test_sending_mail_return_sent_mail_data_when_send_succeeds(self): + self.input_mail = self.draft() + self.input_mail.as_dict = lambda: {'header': {'from': 'a@a.a', 'to': 'b@b.b'}, + 'ident': 1, + 'tags': [], + 'status': [], + 'security_casing': {}, + 'body': 'email body'} + + result = self.app.post('/mails', data='{"ident":1}', content_type="application/json") + + self.assertEqual(result.status_code, 200) + self.assertEqual(result.data, '{"status": [], "body": "email body", "ident": 1, "tags": [], "header": {"to": "b@b.b", "from": "a@a.a"}, "security_casing": {}}') + + def test_sending_mail_return_error_message_when_send_fails(self): + self.input_mail = self.draft() + + def send_that_throws_exception(id, mail): + raise Exception('email sending failed', 'more information of error') + + self.mail_service.send = send_that_throws_exception + + result = self.app.post('/mails', data='{"ident":1}', content_type="application/json") + + self.assertEqual(result.status_code, 500) + self.assertEqual(result.data, '{"message": "email sending failed\\nmore information of error"}') + + def test_update_draft(self): + self.input_mail = self.draft() + + when(self.mail_service).update_draft(1, self.input_mail).thenReturn(self.input_mail) + + self.app.put('/mails', data='{"ident":1}', content_type="application/json") + + verify(self.mail_service).update_draft(1, self.input_mail) + + def draft(self): + return test_helper.input_mail() + + def test_that_default_config_file_is_home_dot_pixelated(self): + orig_config = pixelated.user_agent.app.config + try: + when(crochet).setup().thenReturn(None) + when(reactor_manager).start_reactor().thenReturn(None) + when(pixelated.user_agent).start_user_agent().thenReturn(None) + pixelated.user_agent.app.config = mock() + + sys.argv = ['/tmp/does_not_exist'] + pixelated.user_agent.setup() + + verify(pixelated.user_agent.app.config).from_pyfile(os.path.join(os.environ['HOME'], '.pixelated')) + finally: + pixelated.user_agent.app.config = orig_config + + def test_that_config_file_can_be_specified_on_command_line(self): + orig_config = pixelated.user_agent.app.config + try: + when(crochet).setup().thenReturn(None) + when(reactor_manager).start_reactor().thenReturn(None) + when(pixelated.user_agent).start_user_agent().thenReturn(None) + pixelated.user_agent.app.config = mock() + + sys.argv = ['/tmp/does_not_exist', '--config', '/tmp/some/config/file'] + pixelated.user_agent.setup() + + verify(pixelated.user_agent.app.config).from_pyfile('/tmp/some/config/file') + finally: + pixelated.user_agent.app.config = orig_config + + def test_that_tags_returns_all_tags(self): + when(self.tag_service).all_tags().thenReturn(TagService.SPECIAL_TAGS) + + response = self.app.get('/tags') + + self.assertEqual(200, response.status_code) + expected = json.dumps([tag.as_dict() for tag in TagService.SPECIAL_TAGS]) + self.assertEqual(expected, response.data) + + def test_that_tags_are_filtered_by_query(self): + when(self.tag_service).all_tags().thenReturn(TagService.SPECIAL_TAGS) + + response = self.app.get('/tags?q=dr') + + self.assertEqual(200, response.status_code) + expected = json.dumps([Tag('drafts', True).as_dict()]) + self.assertEqual(expected, response.data) + + def test_that_default_tags_are_ignorable(self): + when(self.tag_service).all_tags().thenReturn(TagService.SPECIAL_TAGS) + when(self.tag_service).all_custom_tags().thenReturn([Tag('test')]) + + response = self.app.get('/tags?skipDefaultTags=true') + + self.assertEqual(200, response.status_code) + expected = json.dumps([Tag('test').as_dict()]) + self.assertEqual(expected, response.data) |