From a8274b633a62262e65c7d013e61a54541ce07bf8 Mon Sep 17 00:00:00 2001 From: Duda Dornelles Date: Thu, 22 Jan 2015 10:25:40 -0200 Subject: #224 renaming tests so they get caught by trial runner --- service/test/integration/contacts_test.py | 54 ---- service/test/integration/delete_mail_test.py | 65 ----- service/test/integration/drafts_test.py | 84 ------ .../test/integration/mark_as_read_unread_test.py | 104 -------- .../test/integration/retrieve_attachment_test.py | 45 ---- service/test/integration/search_test.py | 144 ----------- service/test/integration/soledad_querier_test.py | 88 ------- service/test/integration/tags_test.py | 54 ---- service/test/integration/test_contacts.py | 54 ++++ service/test/integration/test_delete_mail.py | 65 +++++ service/test/integration/test_drafts.py | 84 ++++++ .../test/integration/test_mark_as_read_unread.py | 104 ++++++++ .../test/integration/test_retrieve_attachment.py | 45 ++++ service/test/integration/test_search.py | 144 +++++++++++ service/test/integration/test_soledad_querier.py | 88 +++++++ service/test/integration/test_tags.py | 54 ++++ service/test/unit/adapter/draft_service_test.py | 30 --- service/test/unit/adapter/mail_service_test.py | 51 ---- service/test/unit/adapter/mail_test.py | 287 --------------------- .../unit/adapter/mailbox_indexer_listener_test.py | 54 ---- service/test/unit/adapter/mailbox_test.py | 36 --- service/test/unit/adapter/soledad_querier_test.py | 106 -------- service/test/unit/adapter/test_draft_service.py | 30 +++ service/test/unit/adapter/test_mail.py | 287 +++++++++++++++++++++ service/test/unit/adapter/test_mail_service.py | 51 ++++ service/test/unit/adapter/test_mailbox.py | 36 +++ .../unit/adapter/test_mailbox_indexer_listener.py | 54 ++++ service/test/unit/adapter/test_soledad_querier.py | 106 ++++++++ .../unit/bitmask_libraries/abstract_leap_test.py | 42 --- service/test/unit/bitmask_libraries/certs_test.py | 19 -- .../test/unit/bitmask_libraries/leap_srp_test.py | 157 ----------- .../test/unit/bitmask_libraries/nicknym_test.py | 48 ---- .../test/unit/bitmask_libraries/provider_test.py | 185 ------------- .../test/unit/bitmask_libraries/session_test.py | 72 ------ service/test/unit/bitmask_libraries/smtp_test.py | 98 ------- .../test/unit/bitmask_libraries/soledad_test.py | 69 ----- .../unit/bitmask_libraries/test_abstract_leap.py | 42 +++ service/test/unit/bitmask_libraries/test_certs.py | 19 ++ .../test/unit/bitmask_libraries/test_leap_srp.py | 157 +++++++++++ .../test/unit/bitmask_libraries/test_nicknym.py | 48 ++++ .../test/unit/bitmask_libraries/test_provider.py | 185 +++++++++++++ .../test/unit/bitmask_libraries/test_session.py | 72 ++++++ service/test/unit/bitmask_libraries/test_smtp.py | 98 +++++++ .../test/unit/bitmask_libraries/test_soledad.py | 69 +++++ service/test/unit/config/app_factory_test.py | 32 --- service/test/unit/config/test_app_factory.py | 32 +++ .../unit/resources/sync_info_controller_test.py | 54 ---- .../unit/resources/test_sync_info_controller.py | 54 ++++ service/test/unit/runserver_test.py | 88 ------- .../unit/support/encrypted_file_storage_test.py | 64 ----- .../unit/support/test_encrypted_file_storage.py | 64 +++++ service/test/unit/test_runserver.py | 88 +++++++ 52 files changed, 2130 insertions(+), 2130 deletions(-) delete mode 100644 service/test/integration/contacts_test.py delete mode 100644 service/test/integration/delete_mail_test.py delete mode 100644 service/test/integration/drafts_test.py delete mode 100644 service/test/integration/mark_as_read_unread_test.py delete mode 100644 service/test/integration/retrieve_attachment_test.py delete mode 100644 service/test/integration/search_test.py delete mode 100644 service/test/integration/soledad_querier_test.py delete mode 100644 service/test/integration/tags_test.py create mode 100644 service/test/integration/test_contacts.py create mode 100644 service/test/integration/test_delete_mail.py create mode 100644 service/test/integration/test_drafts.py create mode 100644 service/test/integration/test_mark_as_read_unread.py create mode 100644 service/test/integration/test_retrieve_attachment.py create mode 100644 service/test/integration/test_search.py create mode 100644 service/test/integration/test_soledad_querier.py create mode 100644 service/test/integration/test_tags.py delete mode 100644 service/test/unit/adapter/draft_service_test.py delete mode 100644 service/test/unit/adapter/mail_service_test.py delete mode 100644 service/test/unit/adapter/mail_test.py delete mode 100644 service/test/unit/adapter/mailbox_indexer_listener_test.py delete mode 100644 service/test/unit/adapter/mailbox_test.py delete mode 100644 service/test/unit/adapter/soledad_querier_test.py create mode 100644 service/test/unit/adapter/test_draft_service.py create mode 100644 service/test/unit/adapter/test_mail.py create mode 100644 service/test/unit/adapter/test_mail_service.py create mode 100644 service/test/unit/adapter/test_mailbox.py create mode 100644 service/test/unit/adapter/test_mailbox_indexer_listener.py create mode 100644 service/test/unit/adapter/test_soledad_querier.py delete mode 100644 service/test/unit/bitmask_libraries/abstract_leap_test.py delete mode 100644 service/test/unit/bitmask_libraries/certs_test.py delete mode 100644 service/test/unit/bitmask_libraries/leap_srp_test.py delete mode 100644 service/test/unit/bitmask_libraries/nicknym_test.py delete mode 100644 service/test/unit/bitmask_libraries/provider_test.py delete mode 100644 service/test/unit/bitmask_libraries/session_test.py delete mode 100644 service/test/unit/bitmask_libraries/smtp_test.py delete mode 100644 service/test/unit/bitmask_libraries/soledad_test.py create mode 100644 service/test/unit/bitmask_libraries/test_abstract_leap.py create mode 100644 service/test/unit/bitmask_libraries/test_certs.py create mode 100644 service/test/unit/bitmask_libraries/test_leap_srp.py create mode 100644 service/test/unit/bitmask_libraries/test_nicknym.py create mode 100644 service/test/unit/bitmask_libraries/test_provider.py create mode 100644 service/test/unit/bitmask_libraries/test_session.py create mode 100644 service/test/unit/bitmask_libraries/test_smtp.py create mode 100644 service/test/unit/bitmask_libraries/test_soledad.py delete mode 100644 service/test/unit/config/app_factory_test.py create mode 100644 service/test/unit/config/test_app_factory.py delete mode 100644 service/test/unit/resources/sync_info_controller_test.py create mode 100644 service/test/unit/resources/test_sync_info_controller.py delete mode 100644 service/test/unit/runserver_test.py delete mode 100644 service/test/unit/support/encrypted_file_storage_test.py create mode 100644 service/test/unit/support/test_encrypted_file_storage.py create mode 100644 service/test/unit/test_runserver.py (limited to 'service/test') diff --git a/service/test/integration/contacts_test.py b/service/test/integration/contacts_test.py deleted file mode 100644 index 925e5e02..00000000 --- a/service/test/integration/contacts_test.py +++ /dev/null @@ -1,54 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . -from nose.twistedtools import deferred -from test.support.integration import SoledadTestBase, MailBuilder - - -class ContactsTest(SoledadTestBase): - - def setUp(self): - SoledadTestBase.setUp(self) - - @deferred(timeout=SoledadTestBase.DEFERRED_TIMEOUT) - def test_TO_CC_and_BCC_fields_are_being_searched(self): - input_mail = MailBuilder().with_tags(['important']).build_input_mail() - self.client.add_mail_to_inbox(input_mail) - - d = self.get_contacts(query='recipient') - - def _assert(contacts): - self.assertTrue('recipient@to.com' in contacts) - self.assertTrue('recipient@cc.com' in contacts) - self.assertTrue('recipient@bcc.com' in contacts) - d.addCallback(_assert) - return d - - @deferred(timeout=SoledadTestBase.DEFERRED_TIMEOUT) - def test_trash_and_drafts_mailboxes_are_being_ignored(self): - self.client.add_multiple_to_mailbox(1, mailbox='INBOX', to='recipient@inbox.com') - self.client.add_multiple_to_mailbox(1, mailbox='DRAFTS', to='recipient@drafts.com') - self.client.add_multiple_to_mailbox(1, mailbox='SENT', to='recipient@sent.com') - self.client.add_multiple_to_mailbox(1, mailbox='TRASH', to='recipient@trash.com') - - d = self.get_contacts(query='recipient') - - def _assert(contacts): - self.assertTrue('recipient@inbox.com' in contacts) - self.assertTrue('recipient@sent.com' in contacts) - self.assertFalse('recipient@drafts.com' in contacts) - self.assertFalse('recipient@trash.com' in contacts) - d.addCallback(_assert) - return d diff --git a/service/test/integration/delete_mail_test.py b/service/test/integration/delete_mail_test.py deleted file mode 100644 index 5a3a97fb..00000000 --- a/service/test/integration/delete_mail_test.py +++ /dev/null @@ -1,65 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . - -from test.support.integration import * - - -class DeleteMailTest(SoledadTestBase): - - def setUp(self): - SoledadTestBase.setUp(self) - - def tearDown(self): - SoledadTestBase.tearDown(self) - - def test_move_mail_to_trash_when_deleting(self): - input_mail = MailBuilder().with_subject('Mail with tags').build_input_mail() - self.client.add_mail_to_inbox(input_mail) - - inbox_mails = self.get_mails_by_tag('inbox') - self.assertEquals(1, len(inbox_mails)) - - self.delete_mail(input_mail.ident) - - inbox_mails = self.get_mails_by_tag('inbox') - self.assertEquals(0, len(inbox_mails)) - trash_mails = self.get_mails_by_tag('trash') - self.assertEquals(1, len(trash_mails)) - - def test_delete_mail_when_trashing_mail_from_trash_mailbox(self): - mails = self.client.add_multiple_to_mailbox(1, 'trash') - self.delete_mails([mails[0].ident]) - - trash_mails = self.get_mails_by_tag('trash') - - self.assertEqual(0, len(trash_mails)) - - def test_move_mail_to_trash_when_delete_multiple(self): - mails = self.client.add_multiple_to_mailbox(5, 'inbox') - mail_idents = [m.ident for m in mails] - - self.delete_mails(mail_idents) - - inbox = self.get_mails_by_tag('inbox') - self.assertEquals(0, len(inbox)) - - def test_delete_permanently_when_mails_are_in_trash(self): - mails = self.client.add_multiple_to_mailbox(5, 'trash') - self.delete_mails([m.ident for m in mails]) - - trash = self.get_mails_by_tag('trash') - - self.assertEquals(0, len(trash)) diff --git a/service/test/integration/drafts_test.py b/service/test/integration/drafts_test.py deleted file mode 100644 index d4fde099..00000000 --- a/service/test/integration/drafts_test.py +++ /dev/null @@ -1,84 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . - -from test.support.integration import * - - -class DraftsTest(SoledadTestBase): - - def setUp(self): - SoledadTestBase.setUp(self) - - def tearDown(self): - SoledadTestBase.tearDown(self) - - def test_post_sends_mail_and_deletes_previous_draft_if_it_exists(self): - # creates one draft - first_draft = MailBuilder().with_subject('First draft').build_json() - first_draft_ident = self.put_mail(first_draft)[0]['ident'] - - # sends an updated version of the draft - second_draft = MailBuilder().with_subject('Second draft').with_ident(first_draft_ident).build_json() - self.post_mail(second_draft) - - sent_mails = self.get_mails_by_tag('sent') - drafts = self.get_mails_by_tag('drafts') - - # make sure there is one email in the sent mailbox and it is the second draft - self.assertEquals(1, len(sent_mails)) - self.assertEquals('Second draft', sent_mails[0].subject) - - # make sure that there are no drafts in the draft mailbox - self.assertEquals(0, len(drafts)) - - def test_post_sends_mail_even_when_draft_does_not_exist(self): - first_draft = MailBuilder().with_subject('First draft').build_json() - self.post_mail(first_draft) - - sent_mails = self.get_mails_by_tag('sent') - drafts = self.get_mails_by_tag('drafts') - - self.assertEquals(1, len(sent_mails)) - self.assertEquals('First draft', sent_mails[0].subject) - self.assertEquals(0, len(drafts)) - - def test_put_creates_a_draft_if_it_does_not_exist(self): - mail = MailBuilder().with_subject('A new draft').build_json() - self.put_mail(mail) - mails = self.get_mails_by_tag('drafts') - - self.assertEquals('A new draft', mails[0].subject) - - def test_put_updates_draft_if_it_already_exists(self): - draft = MailBuilder().with_subject('First draft').build_json() - draft_ident = self.put_mail(draft)[0]['ident'] - - updated_draft = MailBuilder().with_subject('First draft edited').with_ident(draft_ident).build_json() - self.put_mail(updated_draft) - - drafts = self.get_mails_by_tag('drafts') - - self.assertEquals(1, len(drafts)) - self.assertEquals('First draft edited', drafts[0].subject) - - def test_respond_unprocessable_entity_if_draft_to_remove_doesnt_exist(self): - draft = MailBuilder().with_subject('First draft').build_json() - self.put_mail(draft) - - updated_draft = MailBuilder().with_subject('First draft edited').with_ident('NOTFOUND').build_json() - _, request = self.put_mail(updated_draft) - - self.assertEquals(422, request.code) diff --git a/service/test/integration/mark_as_read_unread_test.py b/service/test/integration/mark_as_read_unread_test.py deleted file mode 100644 index 86a48e62..00000000 --- a/service/test/integration/mark_as_read_unread_test.py +++ /dev/null @@ -1,104 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . - -from test.support.integration import * -from pixelated.adapter.model.status import Status - - -class MarkAsReadUnreadTest(SoledadTestBase): - - def setUp(self): - SoledadTestBase.setUp(self) - - def tearDown(self): - SoledadTestBase.tearDown(self) - - def test_mark_single_as_read(self): - input_mail = MailBuilder().build_input_mail() - self.client.add_mail_to_inbox(input_mail) - - mails = self.get_mails_by_tag('inbox') - self.assertNotIn('read', mails[0].status) - - self.mark_many_as_read([input_mail.ident]) - - mails = self.get_mails_by_tag('inbox') - self.assertIn('read', mails[0].status) - - def test_mark_single_as_unread(self): - input_mail = MailBuilder().with_status([Status.SEEN]).build_input_mail() - self.client.add_mail_to_inbox(input_mail) - - self.mark_many_as_unread([input_mail.ident]) - mail = self.get_mails_by_tag('inbox')[0] - - self.assertNotIn('read', mail.status) - - def test_mark_many_mails_as_unread(self): - input_mail = MailBuilder().with_status([Status.SEEN]).build_input_mail() - input_mail2 = MailBuilder().with_status([Status.SEEN]).build_input_mail() - - self.client.add_mail_to_inbox(input_mail) - self.client.add_mail_to_inbox(input_mail2) - - self.mark_many_as_unread([input_mail.ident, input_mail2.ident]) - - mails = self.get_mails_by_tag('inbox') - - self.assertNotIn('read', mails[0].status) - self.assertNotIn('read', mails[1].status) - - def test_mark_many_mails_as_read(self): - input_mail = MailBuilder().build_input_mail() - input_mail2 = MailBuilder().build_input_mail() - - self.client.add_mail_to_inbox(input_mail) - self.client.add_mail_to_inbox(input_mail2) - - mails = self.get_mails_by_tag('inbox') - - self.assertNotIn('read', mails[0].status) - self.assertNotIn('read', mails[1].status) - - response = self.mark_many_as_read([input_mail.ident, input_mail2.ident]) - self.assertEquals(200, response.code) - - mails = self.get_mails_by_tag('inbox') - - self.assertIn('read', mails[0].status) - self.assertIn('read', mails[1].status) - - def test_mark_mixed_status_as_read(self): - input_mail = MailBuilder().build_input_mail() - input_mail2 = MailBuilder().with_status([Status.SEEN]).build_input_mail() - - self.client.add_mail_to_inbox(input_mail) - self.client.add_mail_to_inbox(input_mail2) - - mails = self.get_mails_by_tag('inbox') - - read_mails = filter(lambda x: 'read' in x.status, mails) - unread_mails = filter(lambda x: 'read' not in x.status, mails) - self.assertEquals(1, len(unread_mails)) - self.assertEquals(1, len(read_mails)) - - response = self.mark_many_as_read([input_mail.ident, input_mail2.ident]) - self.assertEquals(200, response.code) - - mails = self.get_mails_by_tag('inbox') - - self.assertIn('read', mails[0].status) - self.assertIn('read', mails[1].status) diff --git a/service/test/integration/retrieve_attachment_test.py b/service/test/integration/retrieve_attachment_test.py deleted file mode 100644 index d6ad9298..00000000 --- a/service/test/integration/retrieve_attachment_test.py +++ /dev/null @@ -1,45 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . - -from test.support.integration.soledad_test_base import SoledadTestBase - - -class RetrieveAttachmentTest(SoledadTestBase): - - def setUp(self): - SoledadTestBase.setUp(self) - - def tearDown(self): - SoledadTestBase.tearDown(self) - - def test_attachment_content_is_retrieved(self): - ident = 'F4E99C1CEC4D300A4223A96CCABBE0304BDBC31C550A5A03E207A5E4C3C71A22' - attachment_dict = {'content-disposition': 'attachment', - 'content-transfer-encoding': '', - 'type': 'cnt', - 'raw': 'cGVxdWVubyBhbmV4byA6RAo=', - 'phash': ident, - 'content-type': 'text/plain; charset=US-ASCII; name="attachment_pequeno.txt"'} - - self.client.add_document_to_soledad(attachment_dict) - - d = self.get_attachment(ident, 'base64') - - def _assert(attachment): - self.assertEquals('pequeno anexo :D\n', attachment) - d.addCallback(_assert) - - return d diff --git a/service/test/integration/search_test.py b/service/test/integration/search_test.py deleted file mode 100644 index 464830d2..00000000 --- a/service/test/integration/search_test.py +++ /dev/null @@ -1,144 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . - -from nose.twistedtools import deferred -from test.support.integration import * - - -class SearchTest(SoledadTestBase): - - def setUp(self): - SoledadTestBase.setUp(self) - - @deferred(timeout=SoledadTestBase.DEFERRED_TIMEOUT) - def test_that_tags_returns_all_tags(self): - input_mail = MailBuilder().with_tags(['important']).build_input_mail() - self.client.add_mail_to_inbox(input_mail) - - d = self.get_tags() - - def _assert(all_tags): - all_tag_names = [t['name'] for t in all_tags] - self.assertTrue('inbox' in all_tag_names) - self.assertTrue('sent' in all_tag_names) - self.assertTrue('trash' in all_tag_names) - self.assertTrue('drafts' in all_tag_names) - self.assertTrue('important' in all_tag_names) - d.addCallback(_assert) - return d - - @deferred(timeout=SoledadTestBase.DEFERRED_TIMEOUT) - def test_that_tags_are_filtered_by_query(self): - input_mail = MailBuilder().with_tags(['ateu', 'catoa', 'luat', 'zuado']).build_input_mail() - self.client.add_mail_to_inbox(input_mail) - - d = self.get_tags(q=["at"], skipDefaultTags=["true"]) - - def _assert(all_tags): - all_tag_names = [t['name'] for t in all_tags] - self.assertEqual(3, len(all_tag_names)) - self.assertTrue('ateu' in all_tag_names) - self.assertTrue('catoa' in all_tag_names) - self.assertTrue('luat' in all_tag_names) - - d.addCallback(_assert) - return d - - @deferred(timeout=SoledadTestBase.DEFERRED_TIMEOUT) - def test_that_default_tags_are_ignorable(self): - input_mail = MailBuilder().with_tags(['sometag']).build_input_mail() - self.client.add_mail_to_inbox(input_mail) - - d = self.get_tags(skipDefaultTags=["true"]) - - def _assert(all_tags): - all_tag_names = [t['name'] for t in all_tags] - self.assertEqual(1, len(all_tag_names)) - self.assertTrue('sometag' in all_tag_names) - d.addCallback(_assert) - return d - - @deferred(timeout=SoledadTestBase.DEFERRED_TIMEOUT_LONG) - def test_tags_count(self): - self.client.add_multiple_to_mailbox(num=10, mailbox='inbox', flags=['\\Recent']) - self.client.add_multiple_to_mailbox(num=5, mailbox='inbox', flags=['\\Seen']) - self.client.add_multiple_to_mailbox(num=3, mailbox='inbox', flags=['\\Recent'], tags=['important', 'later']) - self.client.add_multiple_to_mailbox(num=1, mailbox='inbox', flags=['\\Seen'], tags=['important']) - - d = self.get_tags() - - def _assert(tags_count): - self.assertEqual(self.get_count(tags_count, 'inbox')['total'], 19) - self.assertEqual(self.get_count(tags_count, 'inbox')['read'], 6) - self.assertEqual(self.get_count(tags_count, 'important')['total'], 4) - self.assertEqual(self.get_count(tags_count, 'important')['read'], 1) - d.addCallback(_assert) - return d - - def test_search_mails_different_window(self): - input_mail = MailBuilder().build_input_mail() - input_mail2 = MailBuilder().build_input_mail() - self.client.add_mail_to_inbox(input_mail) - self.client.add_mail_to_inbox(input_mail2) - - first_page = self.get_mails_by_tag('inbox', page=1, window=1) - - self.assertEqual(len(first_page), 1) - - def test_search_mails_with_multiple_pages(self): - input_mail = MailBuilder().build_input_mail() - input_mail2 = MailBuilder().build_input_mail() - self.client.add_mail_to_inbox(input_mail) - self.client.add_mail_to_inbox(input_mail2) - - first_page = self.get_mails_by_tag('inbox', page=1, window=1) - second_page = self.get_mails_by_tag('inbox', page=2, window=1) - - idents = [input_mail.ident, input_mail2.ident] - - self.assertIn(first_page[0].ident, idents) - self.assertIn(second_page[0].ident, idents) - - def test_page_zero_fetches_first_page(self): - input_mail = MailBuilder().build_input_mail() - self.client.add_mail_to_inbox(input_mail) - page = self.get_mails_by_tag('inbox', page=0, window=1) - self.assertEqual(page[0].ident, input_mail.ident) - - def get_count(self, tags_count, mailbox): - for tag in tags_count: - if tag['name'] == mailbox: - return tag['counts'] - - def test_order_by_date(self): - input_mail = MailBuilder().with_date('2014-10-15T15:15').build_input_mail() - input_mail2 = MailBuilder().with_date('2014-10-15T15:16').build_input_mail() - - self.client.add_mail_to_inbox(input_mail) - self.client.add_mail_to_inbox(input_mail2) - - results = self.get_mails_by_tag('inbox') - self.assertEqual(results[0].ident, input_mail2.ident) - self.assertEqual(results[1].ident, input_mail.ident) - - def test_search_base64_body(self): - body = u'bl\xe1' - input_mail = MailBuilder().with_body(body.encode('utf-8')).build_input_mail() - self.client.add_mail_to_inbox(input_mail) - results = self.search(body) - - self.assertGreater(len(results), 0, 'No results returned from search') - self.assertEquals(results[0].ident, input_mail.ident) diff --git a/service/test/integration/soledad_querier_test.py b/service/test/integration/soledad_querier_test.py deleted file mode 100644 index f8767630..00000000 --- a/service/test/integration/soledad_querier_test.py +++ /dev/null @@ -1,88 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . - -import copy -import time - -from test.support.integration import * -from leap.mail.imap.fields import WithMsgFields - - -class SoledadQuerierTest(SoledadTestBase, WithMsgFields): - - def setUp(self): - SoledadTestBase.setUp(self) - self.soledad = self.client.soledad - self.maxDiff = None - self.soledad_querier = self.client.soledad_querier - - def tearDown(self): - SoledadTestBase.tearDown(self) - - def _get_empty_mailbox(self): - return copy.deepcopy(self.EMPTY_MBOX) - - def _create_mailbox(self, mailbox_name): - new_mailbox = self._get_empty_mailbox() - new_mailbox['mbox'] = mailbox_name - new_mailbox['created'] = int(time.time() * 10E2) - return self.soledad.create_doc(new_mailbox) - - def _get_mailboxes_from_soledad(self, mailbox_name): - return [m for m in self.soledad.get_from_index('by-type', 'mbox') if m.content['mbox'] == mailbox_name] - - def test_remove_dup_mailboxes_keeps_the_one_with_the_highest_last_uid(self): - self.client.add_multiple_to_mailbox(3, 'INBOX') # by now we already have one inbox with 3 mails - self._create_mailbox('INBOX') # now we have a duplicate - - # make sure we have two - inboxes = self._get_mailboxes_from_soledad('INBOX') - self.assertEqual(2, len(inboxes)) - - self.soledad_querier.remove_duplicates() - - # make sure we only have one, and the one with the right lastuid - inboxes = self._get_mailboxes_from_soledad('INBOX') - self.assertEqual(1, len(inboxes)) - self.assertEqual(3, inboxes[0].content['lastuid']) - - def test_all_mails_skips_incomplete_mails(self): - # creating incomplete mail, we will only save the fdoc - fdoc, hdoc, bdoc = MailBuilder().build_input_mail().get_for_save(1, 'INBOX') - self.soledad.create_doc(fdoc) - - mails = self.soledad_querier.all_mails() - self.assertEqual(0, len(mails)) # mail is incomplete since it only has fdoc - - # adding the hdoc still doesn't complete the mail - self.soledad.create_doc(hdoc) - - mails = self.soledad_querier.all_mails() - self.assertEqual(0, len(mails)) - - # now the mail is complete - self.soledad.create_doc(bdoc) - - mails = self.soledad_querier.all_mails() - self.assertEqual(1, len(mails)) - - def test_get_mails_by_chash(self): - mails = self.client.add_multiple_to_mailbox(3, 'INBOX') - chashes = [mail.ident for mail in mails] - - fetched_mails = self.soledad_querier.mails(chashes) - - self.assertEquals([m.as_dict() for m in fetched_mails], [m.as_dict() for m in mails]) diff --git a/service/test/integration/tags_test.py b/service/test/integration/tags_test.py deleted file mode 100644 index 6072392c..00000000 --- a/service/test/integration/tags_test.py +++ /dev/null @@ -1,54 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . -import json - -from test.support.integration import * -from pixelated.adapter.services.tag_service import TagService - - -class TagsTest(SoledadTestBase): - - def setUp(self): - SoledadTestBase.setUp(self) - - def tearDown(self): - SoledadTestBase.tearDown(self) - - def _tags_json(self, tags): - return json.dumps({'newtags': tags}) - - def test_add_tag_to_an_inbox_mail_and_query(self): - mail = MailBuilder().with_subject('Mail with tags').build_input_mail() - self.client.add_mail_to_inbox(mail) - - self.post_tags(mail.ident, self._tags_json(['IMPORTANT'])) - - mails = self.get_mails_by_tag('inbox') - self.assertEquals({'important'}, set(mails[0].tags)) - - mails = self.get_mails_by_tag('important') - self.assertEquals('Mail with tags', mails[0].subject) - - def test_addition_of_reserved_tags_is_not_allowed(self): - mail = MailBuilder().with_subject('Mail with tags').build_input_mail() - self.client.add_mail_to_inbox(mail) - - for tag in TagService.SPECIAL_TAGS: - response = self.post_tags(mail.ident, self._tags_json([tag.name.upper()])) - self.assertEquals("None of the following words can be used as tags: %s" % tag.name, response) - - mail = self.client.mailboxes.inbox().mail(mail.ident) - self.assertNotIn('drafts', mail.tags) diff --git a/service/test/integration/test_contacts.py b/service/test/integration/test_contacts.py new file mode 100644 index 00000000..925e5e02 --- /dev/null +++ b/service/test/integration/test_contacts.py @@ -0,0 +1,54 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . +from nose.twistedtools import deferred +from test.support.integration import SoledadTestBase, MailBuilder + + +class ContactsTest(SoledadTestBase): + + def setUp(self): + SoledadTestBase.setUp(self) + + @deferred(timeout=SoledadTestBase.DEFERRED_TIMEOUT) + def test_TO_CC_and_BCC_fields_are_being_searched(self): + input_mail = MailBuilder().with_tags(['important']).build_input_mail() + self.client.add_mail_to_inbox(input_mail) + + d = self.get_contacts(query='recipient') + + def _assert(contacts): + self.assertTrue('recipient@to.com' in contacts) + self.assertTrue('recipient@cc.com' in contacts) + self.assertTrue('recipient@bcc.com' in contacts) + d.addCallback(_assert) + return d + + @deferred(timeout=SoledadTestBase.DEFERRED_TIMEOUT) + def test_trash_and_drafts_mailboxes_are_being_ignored(self): + self.client.add_multiple_to_mailbox(1, mailbox='INBOX', to='recipient@inbox.com') + self.client.add_multiple_to_mailbox(1, mailbox='DRAFTS', to='recipient@drafts.com') + self.client.add_multiple_to_mailbox(1, mailbox='SENT', to='recipient@sent.com') + self.client.add_multiple_to_mailbox(1, mailbox='TRASH', to='recipient@trash.com') + + d = self.get_contacts(query='recipient') + + def _assert(contacts): + self.assertTrue('recipient@inbox.com' in contacts) + self.assertTrue('recipient@sent.com' in contacts) + self.assertFalse('recipient@drafts.com' in contacts) + self.assertFalse('recipient@trash.com' in contacts) + d.addCallback(_assert) + return d diff --git a/service/test/integration/test_delete_mail.py b/service/test/integration/test_delete_mail.py new file mode 100644 index 00000000..5a3a97fb --- /dev/null +++ b/service/test/integration/test_delete_mail.py @@ -0,0 +1,65 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . + +from test.support.integration import * + + +class DeleteMailTest(SoledadTestBase): + + def setUp(self): + SoledadTestBase.setUp(self) + + def tearDown(self): + SoledadTestBase.tearDown(self) + + def test_move_mail_to_trash_when_deleting(self): + input_mail = MailBuilder().with_subject('Mail with tags').build_input_mail() + self.client.add_mail_to_inbox(input_mail) + + inbox_mails = self.get_mails_by_tag('inbox') + self.assertEquals(1, len(inbox_mails)) + + self.delete_mail(input_mail.ident) + + inbox_mails = self.get_mails_by_tag('inbox') + self.assertEquals(0, len(inbox_mails)) + trash_mails = self.get_mails_by_tag('trash') + self.assertEquals(1, len(trash_mails)) + + def test_delete_mail_when_trashing_mail_from_trash_mailbox(self): + mails = self.client.add_multiple_to_mailbox(1, 'trash') + self.delete_mails([mails[0].ident]) + + trash_mails = self.get_mails_by_tag('trash') + + self.assertEqual(0, len(trash_mails)) + + def test_move_mail_to_trash_when_delete_multiple(self): + mails = self.client.add_multiple_to_mailbox(5, 'inbox') + mail_idents = [m.ident for m in mails] + + self.delete_mails(mail_idents) + + inbox = self.get_mails_by_tag('inbox') + self.assertEquals(0, len(inbox)) + + def test_delete_permanently_when_mails_are_in_trash(self): + mails = self.client.add_multiple_to_mailbox(5, 'trash') + self.delete_mails([m.ident for m in mails]) + + trash = self.get_mails_by_tag('trash') + + self.assertEquals(0, len(trash)) diff --git a/service/test/integration/test_drafts.py b/service/test/integration/test_drafts.py new file mode 100644 index 00000000..d4fde099 --- /dev/null +++ b/service/test/integration/test_drafts.py @@ -0,0 +1,84 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . + +from test.support.integration import * + + +class DraftsTest(SoledadTestBase): + + def setUp(self): + SoledadTestBase.setUp(self) + + def tearDown(self): + SoledadTestBase.tearDown(self) + + def test_post_sends_mail_and_deletes_previous_draft_if_it_exists(self): + # creates one draft + first_draft = MailBuilder().with_subject('First draft').build_json() + first_draft_ident = self.put_mail(first_draft)[0]['ident'] + + # sends an updated version of the draft + second_draft = MailBuilder().with_subject('Second draft').with_ident(first_draft_ident).build_json() + self.post_mail(second_draft) + + sent_mails = self.get_mails_by_tag('sent') + drafts = self.get_mails_by_tag('drafts') + + # make sure there is one email in the sent mailbox and it is the second draft + self.assertEquals(1, len(sent_mails)) + self.assertEquals('Second draft', sent_mails[0].subject) + + # make sure that there are no drafts in the draft mailbox + self.assertEquals(0, len(drafts)) + + def test_post_sends_mail_even_when_draft_does_not_exist(self): + first_draft = MailBuilder().with_subject('First draft').build_json() + self.post_mail(first_draft) + + sent_mails = self.get_mails_by_tag('sent') + drafts = self.get_mails_by_tag('drafts') + + self.assertEquals(1, len(sent_mails)) + self.assertEquals('First draft', sent_mails[0].subject) + self.assertEquals(0, len(drafts)) + + def test_put_creates_a_draft_if_it_does_not_exist(self): + mail = MailBuilder().with_subject('A new draft').build_json() + self.put_mail(mail) + mails = self.get_mails_by_tag('drafts') + + self.assertEquals('A new draft', mails[0].subject) + + def test_put_updates_draft_if_it_already_exists(self): + draft = MailBuilder().with_subject('First draft').build_json() + draft_ident = self.put_mail(draft)[0]['ident'] + + updated_draft = MailBuilder().with_subject('First draft edited').with_ident(draft_ident).build_json() + self.put_mail(updated_draft) + + drafts = self.get_mails_by_tag('drafts') + + self.assertEquals(1, len(drafts)) + self.assertEquals('First draft edited', drafts[0].subject) + + def test_respond_unprocessable_entity_if_draft_to_remove_doesnt_exist(self): + draft = MailBuilder().with_subject('First draft').build_json() + self.put_mail(draft) + + updated_draft = MailBuilder().with_subject('First draft edited').with_ident('NOTFOUND').build_json() + _, request = self.put_mail(updated_draft) + + self.assertEquals(422, request.code) diff --git a/service/test/integration/test_mark_as_read_unread.py b/service/test/integration/test_mark_as_read_unread.py new file mode 100644 index 00000000..86a48e62 --- /dev/null +++ b/service/test/integration/test_mark_as_read_unread.py @@ -0,0 +1,104 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . + +from test.support.integration import * +from pixelated.adapter.model.status import Status + + +class MarkAsReadUnreadTest(SoledadTestBase): + + def setUp(self): + SoledadTestBase.setUp(self) + + def tearDown(self): + SoledadTestBase.tearDown(self) + + def test_mark_single_as_read(self): + input_mail = MailBuilder().build_input_mail() + self.client.add_mail_to_inbox(input_mail) + + mails = self.get_mails_by_tag('inbox') + self.assertNotIn('read', mails[0].status) + + self.mark_many_as_read([input_mail.ident]) + + mails = self.get_mails_by_tag('inbox') + self.assertIn('read', mails[0].status) + + def test_mark_single_as_unread(self): + input_mail = MailBuilder().with_status([Status.SEEN]).build_input_mail() + self.client.add_mail_to_inbox(input_mail) + + self.mark_many_as_unread([input_mail.ident]) + mail = self.get_mails_by_tag('inbox')[0] + + self.assertNotIn('read', mail.status) + + def test_mark_many_mails_as_unread(self): + input_mail = MailBuilder().with_status([Status.SEEN]).build_input_mail() + input_mail2 = MailBuilder().with_status([Status.SEEN]).build_input_mail() + + self.client.add_mail_to_inbox(input_mail) + self.client.add_mail_to_inbox(input_mail2) + + self.mark_many_as_unread([input_mail.ident, input_mail2.ident]) + + mails = self.get_mails_by_tag('inbox') + + self.assertNotIn('read', mails[0].status) + self.assertNotIn('read', mails[1].status) + + def test_mark_many_mails_as_read(self): + input_mail = MailBuilder().build_input_mail() + input_mail2 = MailBuilder().build_input_mail() + + self.client.add_mail_to_inbox(input_mail) + self.client.add_mail_to_inbox(input_mail2) + + mails = self.get_mails_by_tag('inbox') + + self.assertNotIn('read', mails[0].status) + self.assertNotIn('read', mails[1].status) + + response = self.mark_many_as_read([input_mail.ident, input_mail2.ident]) + self.assertEquals(200, response.code) + + mails = self.get_mails_by_tag('inbox') + + self.assertIn('read', mails[0].status) + self.assertIn('read', mails[1].status) + + def test_mark_mixed_status_as_read(self): + input_mail = MailBuilder().build_input_mail() + input_mail2 = MailBuilder().with_status([Status.SEEN]).build_input_mail() + + self.client.add_mail_to_inbox(input_mail) + self.client.add_mail_to_inbox(input_mail2) + + mails = self.get_mails_by_tag('inbox') + + read_mails = filter(lambda x: 'read' in x.status, mails) + unread_mails = filter(lambda x: 'read' not in x.status, mails) + self.assertEquals(1, len(unread_mails)) + self.assertEquals(1, len(read_mails)) + + response = self.mark_many_as_read([input_mail.ident, input_mail2.ident]) + self.assertEquals(200, response.code) + + mails = self.get_mails_by_tag('inbox') + + self.assertIn('read', mails[0].status) + self.assertIn('read', mails[1].status) diff --git a/service/test/integration/test_retrieve_attachment.py b/service/test/integration/test_retrieve_attachment.py new file mode 100644 index 00000000..d6ad9298 --- /dev/null +++ b/service/test/integration/test_retrieve_attachment.py @@ -0,0 +1,45 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . + +from test.support.integration.soledad_test_base import SoledadTestBase + + +class RetrieveAttachmentTest(SoledadTestBase): + + def setUp(self): + SoledadTestBase.setUp(self) + + def tearDown(self): + SoledadTestBase.tearDown(self) + + def test_attachment_content_is_retrieved(self): + ident = 'F4E99C1CEC4D300A4223A96CCABBE0304BDBC31C550A5A03E207A5E4C3C71A22' + attachment_dict = {'content-disposition': 'attachment', + 'content-transfer-encoding': '', + 'type': 'cnt', + 'raw': 'cGVxdWVubyBhbmV4byA6RAo=', + 'phash': ident, + 'content-type': 'text/plain; charset=US-ASCII; name="attachment_pequeno.txt"'} + + self.client.add_document_to_soledad(attachment_dict) + + d = self.get_attachment(ident, 'base64') + + def _assert(attachment): + self.assertEquals('pequeno anexo :D\n', attachment) + d.addCallback(_assert) + + return d diff --git a/service/test/integration/test_search.py b/service/test/integration/test_search.py new file mode 100644 index 00000000..464830d2 --- /dev/null +++ b/service/test/integration/test_search.py @@ -0,0 +1,144 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . + +from nose.twistedtools import deferred +from test.support.integration import * + + +class SearchTest(SoledadTestBase): + + def setUp(self): + SoledadTestBase.setUp(self) + + @deferred(timeout=SoledadTestBase.DEFERRED_TIMEOUT) + def test_that_tags_returns_all_tags(self): + input_mail = MailBuilder().with_tags(['important']).build_input_mail() + self.client.add_mail_to_inbox(input_mail) + + d = self.get_tags() + + def _assert(all_tags): + all_tag_names = [t['name'] for t in all_tags] + self.assertTrue('inbox' in all_tag_names) + self.assertTrue('sent' in all_tag_names) + self.assertTrue('trash' in all_tag_names) + self.assertTrue('drafts' in all_tag_names) + self.assertTrue('important' in all_tag_names) + d.addCallback(_assert) + return d + + @deferred(timeout=SoledadTestBase.DEFERRED_TIMEOUT) + def test_that_tags_are_filtered_by_query(self): + input_mail = MailBuilder().with_tags(['ateu', 'catoa', 'luat', 'zuado']).build_input_mail() + self.client.add_mail_to_inbox(input_mail) + + d = self.get_tags(q=["at"], skipDefaultTags=["true"]) + + def _assert(all_tags): + all_tag_names = [t['name'] for t in all_tags] + self.assertEqual(3, len(all_tag_names)) + self.assertTrue('ateu' in all_tag_names) + self.assertTrue('catoa' in all_tag_names) + self.assertTrue('luat' in all_tag_names) + + d.addCallback(_assert) + return d + + @deferred(timeout=SoledadTestBase.DEFERRED_TIMEOUT) + def test_that_default_tags_are_ignorable(self): + input_mail = MailBuilder().with_tags(['sometag']).build_input_mail() + self.client.add_mail_to_inbox(input_mail) + + d = self.get_tags(skipDefaultTags=["true"]) + + def _assert(all_tags): + all_tag_names = [t['name'] for t in all_tags] + self.assertEqual(1, len(all_tag_names)) + self.assertTrue('sometag' in all_tag_names) + d.addCallback(_assert) + return d + + @deferred(timeout=SoledadTestBase.DEFERRED_TIMEOUT_LONG) + def test_tags_count(self): + self.client.add_multiple_to_mailbox(num=10, mailbox='inbox', flags=['\\Recent']) + self.client.add_multiple_to_mailbox(num=5, mailbox='inbox', flags=['\\Seen']) + self.client.add_multiple_to_mailbox(num=3, mailbox='inbox', flags=['\\Recent'], tags=['important', 'later']) + self.client.add_multiple_to_mailbox(num=1, mailbox='inbox', flags=['\\Seen'], tags=['important']) + + d = self.get_tags() + + def _assert(tags_count): + self.assertEqual(self.get_count(tags_count, 'inbox')['total'], 19) + self.assertEqual(self.get_count(tags_count, 'inbox')['read'], 6) + self.assertEqual(self.get_count(tags_count, 'important')['total'], 4) + self.assertEqual(self.get_count(tags_count, 'important')['read'], 1) + d.addCallback(_assert) + return d + + def test_search_mails_different_window(self): + input_mail = MailBuilder().build_input_mail() + input_mail2 = MailBuilder().build_input_mail() + self.client.add_mail_to_inbox(input_mail) + self.client.add_mail_to_inbox(input_mail2) + + first_page = self.get_mails_by_tag('inbox', page=1, window=1) + + self.assertEqual(len(first_page), 1) + + def test_search_mails_with_multiple_pages(self): + input_mail = MailBuilder().build_input_mail() + input_mail2 = MailBuilder().build_input_mail() + self.client.add_mail_to_inbox(input_mail) + self.client.add_mail_to_inbox(input_mail2) + + first_page = self.get_mails_by_tag('inbox', page=1, window=1) + second_page = self.get_mails_by_tag('inbox', page=2, window=1) + + idents = [input_mail.ident, input_mail2.ident] + + self.assertIn(first_page[0].ident, idents) + self.assertIn(second_page[0].ident, idents) + + def test_page_zero_fetches_first_page(self): + input_mail = MailBuilder().build_input_mail() + self.client.add_mail_to_inbox(input_mail) + page = self.get_mails_by_tag('inbox', page=0, window=1) + self.assertEqual(page[0].ident, input_mail.ident) + + def get_count(self, tags_count, mailbox): + for tag in tags_count: + if tag['name'] == mailbox: + return tag['counts'] + + def test_order_by_date(self): + input_mail = MailBuilder().with_date('2014-10-15T15:15').build_input_mail() + input_mail2 = MailBuilder().with_date('2014-10-15T15:16').build_input_mail() + + self.client.add_mail_to_inbox(input_mail) + self.client.add_mail_to_inbox(input_mail2) + + results = self.get_mails_by_tag('inbox') + self.assertEqual(results[0].ident, input_mail2.ident) + self.assertEqual(results[1].ident, input_mail.ident) + + def test_search_base64_body(self): + body = u'bl\xe1' + input_mail = MailBuilder().with_body(body.encode('utf-8')).build_input_mail() + self.client.add_mail_to_inbox(input_mail) + results = self.search(body) + + self.assertGreater(len(results), 0, 'No results returned from search') + self.assertEquals(results[0].ident, input_mail.ident) diff --git a/service/test/integration/test_soledad_querier.py b/service/test/integration/test_soledad_querier.py new file mode 100644 index 00000000..f8767630 --- /dev/null +++ b/service/test/integration/test_soledad_querier.py @@ -0,0 +1,88 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . + +import copy +import time + +from test.support.integration import * +from leap.mail.imap.fields import WithMsgFields + + +class SoledadQuerierTest(SoledadTestBase, WithMsgFields): + + def setUp(self): + SoledadTestBase.setUp(self) + self.soledad = self.client.soledad + self.maxDiff = None + self.soledad_querier = self.client.soledad_querier + + def tearDown(self): + SoledadTestBase.tearDown(self) + + def _get_empty_mailbox(self): + return copy.deepcopy(self.EMPTY_MBOX) + + def _create_mailbox(self, mailbox_name): + new_mailbox = self._get_empty_mailbox() + new_mailbox['mbox'] = mailbox_name + new_mailbox['created'] = int(time.time() * 10E2) + return self.soledad.create_doc(new_mailbox) + + def _get_mailboxes_from_soledad(self, mailbox_name): + return [m for m in self.soledad.get_from_index('by-type', 'mbox') if m.content['mbox'] == mailbox_name] + + def test_remove_dup_mailboxes_keeps_the_one_with_the_highest_last_uid(self): + self.client.add_multiple_to_mailbox(3, 'INBOX') # by now we already have one inbox with 3 mails + self._create_mailbox('INBOX') # now we have a duplicate + + # make sure we have two + inboxes = self._get_mailboxes_from_soledad('INBOX') + self.assertEqual(2, len(inboxes)) + + self.soledad_querier.remove_duplicates() + + # make sure we only have one, and the one with the right lastuid + inboxes = self._get_mailboxes_from_soledad('INBOX') + self.assertEqual(1, len(inboxes)) + self.assertEqual(3, inboxes[0].content['lastuid']) + + def test_all_mails_skips_incomplete_mails(self): + # creating incomplete mail, we will only save the fdoc + fdoc, hdoc, bdoc = MailBuilder().build_input_mail().get_for_save(1, 'INBOX') + self.soledad.create_doc(fdoc) + + mails = self.soledad_querier.all_mails() + self.assertEqual(0, len(mails)) # mail is incomplete since it only has fdoc + + # adding the hdoc still doesn't complete the mail + self.soledad.create_doc(hdoc) + + mails = self.soledad_querier.all_mails() + self.assertEqual(0, len(mails)) + + # now the mail is complete + self.soledad.create_doc(bdoc) + + mails = self.soledad_querier.all_mails() + self.assertEqual(1, len(mails)) + + def test_get_mails_by_chash(self): + mails = self.client.add_multiple_to_mailbox(3, 'INBOX') + chashes = [mail.ident for mail in mails] + + fetched_mails = self.soledad_querier.mails(chashes) + + self.assertEquals([m.as_dict() for m in fetched_mails], [m.as_dict() for m in mails]) diff --git a/service/test/integration/test_tags.py b/service/test/integration/test_tags.py new file mode 100644 index 00000000..6072392c --- /dev/null +++ b/service/test/integration/test_tags.py @@ -0,0 +1,54 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . +import json + +from test.support.integration import * +from pixelated.adapter.services.tag_service import TagService + + +class TagsTest(SoledadTestBase): + + def setUp(self): + SoledadTestBase.setUp(self) + + def tearDown(self): + SoledadTestBase.tearDown(self) + + def _tags_json(self, tags): + return json.dumps({'newtags': tags}) + + def test_add_tag_to_an_inbox_mail_and_query(self): + mail = MailBuilder().with_subject('Mail with tags').build_input_mail() + self.client.add_mail_to_inbox(mail) + + self.post_tags(mail.ident, self._tags_json(['IMPORTANT'])) + + mails = self.get_mails_by_tag('inbox') + self.assertEquals({'important'}, set(mails[0].tags)) + + mails = self.get_mails_by_tag('important') + self.assertEquals('Mail with tags', mails[0].subject) + + def test_addition_of_reserved_tags_is_not_allowed(self): + mail = MailBuilder().with_subject('Mail with tags').build_input_mail() + self.client.add_mail_to_inbox(mail) + + for tag in TagService.SPECIAL_TAGS: + response = self.post_tags(mail.ident, self._tags_json([tag.name.upper()])) + self.assertEquals("None of the following words can be used as tags: %s" % tag.name, response) + + mail = self.client.mailboxes.inbox().mail(mail.ident) + self.assertNotIn('drafts', mail.tags) diff --git a/service/test/unit/adapter/draft_service_test.py b/service/test/unit/adapter/draft_service_test.py deleted file mode 100644 index baa07ce0..00000000 --- a/service/test/unit/adapter/draft_service_test.py +++ /dev/null @@ -1,30 +0,0 @@ -import unittest - -from pixelated.adapter.model.mail import InputMail -from pixelated.adapter.services.draft_service import DraftService -import test.support.test_helper as test_helper -from mockito import * - - -class DraftServiceTest(unittest.TestCase): - - def setUp(self): - self.mailboxes = mock() - self.drafts_mailbox = mock() - self.draft_service = DraftService(self.mailboxes) - when(self.mailboxes).drafts().thenReturn(self.drafts_mailbox) - - def test_add_draft(self): - mail = InputMail() - self.draft_service.create_draft(mail) - - verify(self.drafts_mailbox).add(mail) - - def test_update_draft(self): - mail = InputMail.from_dict(test_helper.mail_dict()) - when(self.drafts_mailbox).add(mail).thenReturn(mail) - - self.draft_service.update_draft(mail.ident, mail) - - inorder.verify(self.drafts_mailbox).add(mail) - inorder.verify(self.drafts_mailbox).remove(mail.ident) diff --git a/service/test/unit/adapter/mail_service_test.py b/service/test/unit/adapter/mail_service_test.py deleted file mode 100644 index fca6e79b..00000000 --- a/service/test/unit/adapter/mail_service_test.py +++ /dev/null @@ -1,51 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . -import unittest - -from pixelated.adapter.services.mail_service import MailService -from mockito import * - - -class TestMailService(unittest.TestCase): - def setUp(self): - self.querier = mock() - self.mailboxes = mock() - self.tag_service = mock() - self.mailboxes.drafts = lambda: mock() - self.mailboxes.trash = lambda: mock() - self.mailboxes.sent = lambda: mock() - - self.mail_sender = mock() - self.mail_service = MailService(self.mailboxes, self.mail_sender, self.tag_service, self.querier) - - def test_send_mail(self): - mail = "mail" - - self.mail_service.send(1, mail) - - verify(self.mail_sender).sendmail(mail) - - def test_mark_as_read(self): - mail = mock() - when(self.mail_service).mail(any()).thenReturn(mail) - self.mail_service.mark_as_read(1) - - verify(mail).mark_as_read() - - def test_delete_mail(self): - self.mail_service.delete_mail(1) - - verify(self.mailboxes).move_to_trash(1) diff --git a/service/test/unit/adapter/mail_test.py b/service/test/unit/adapter/mail_test.py deleted file mode 100644 index be7b731d..00000000 --- a/service/test/unit/adapter/mail_test.py +++ /dev/null @@ -1,287 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . -import unittest - -import pixelated.support.date -from pixelated.adapter.model.mail import PixelatedMail, InputMail -from mockito import * -from test.support import test_helper -import dateutil.parser as dateparser -import base64 -from leap.mail.imap.fields import fields -from datetime import datetime - - -class TestPixelatedMail(unittest.TestCase): - def setUp(self): - self.querier = mock() - - def test_parse_date_from_soledad_uses_date_header_if_available(self): - leap_mail_date = 'Wed, 3 Sep 2014 12:36:17 -0300' - leap_mail_date_in_iso_format = "2014-09-03T12:36:17-03:00" - - leap_mail = test_helper.leap_mail(headers={'date': leap_mail_date}) - - mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier) - - self.assertEqual(str(mail.headers['Date']), leap_mail_date_in_iso_format) - - def test_parse_date_from_soledad_fallback_to_received_header_if_date_header_isnt_available(self): - leap_mail_date = "Wed, 03 Sep 2014 13:11:15 -0300" - leap_mail_date_in_iso_format = "2014-09-03T13:11:15-03:00" - leap_mail_received_header = "by bitmask.local from 127.0.0.1 with ESMTP ;\n " + leap_mail_date - - leap_mail = test_helper.leap_mail(headers={'received': leap_mail_received_header}) - - mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier) - - self.assertEqual(str(mail.headers['Date']), leap_mail_date_in_iso_format) - - def test_update_tags_return_a_set_with_the_current_tags(self): - soledad_docs = test_helper.leap_mail(extra_headers={'X-tags': '["custom_1", "custom_2"]'}) - pixelated_mail = PixelatedMail.from_soledad(*soledad_docs, soledad_querier=self.querier) - - current_tags = pixelated_mail.update_tags({'custom_1', 'custom_3'}) - self.assertEquals({'custom_3', 'custom_1'}, current_tags) - - def test_mark_as_read(self): - mail = PixelatedMail.from_soledad(*test_helper.leap_mail(flags=[]), soledad_querier=self.querier) - - mail.mark_as_read() - - self.assertEquals(mail.fdoc.content['flags'], ['\\Seen']) - - def test_mark_as_not_recent(self): - mail = PixelatedMail.from_soledad(*test_helper.leap_mail(flags=['\\Recent']), soledad_querier=self.querier) - - mail.mark_as_not_recent() - - self.assertEquals(mail.fdoc.content['flags'], []) - - def test_get_for_save_adds_from(self): - InputMail.FROM_EMAIL_ADDRESS = 'me@pixelated.org' - headers = {'Subject': 'The subject', - 'Date': str(datetime.now()), - 'To': 'me@pixelated.org'} - - input_mail = InputMail() - input_mail.headers = headers - - self.assertEqual('me@pixelated.org', input_mail.get_for_save(1, 'SENT')[1][fields.HEADERS_KEY]['From']) - - def test_as_dict(self): - headers = {'Subject': 'The subject', - 'From': 'someone@pixelated.org', - 'To': 'me@pixelated.org'} - fdoc, hdoc, bdoc = test_helper.leap_mail(flags=['\\Recent'], - extra_headers=headers) - - InputMail.FROM_EMAIL_ADDRESS = 'me@pixelated.org' - - mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier) - - _dict = mail.as_dict() - - self.maxDiff = None - - self.assertEquals(_dict, {'htmlBody': None, - 'textPlainBody': 'body', - 'header': { - 'date': dateparser.parse(hdoc.content['date']).isoformat(), - 'from': 'someone@pixelated.org', - 'subject': 'The subject', - 'to': ['me@pixelated.org'], - 'cc': [], - 'bcc': [] - }, - 'ident': 'chash', - 'mailbox': 'inbox', - 'security_casing': {'imprints': [{'state': 'no_signature_information'}], 'locks': []}, - 'status': ['recent'], - 'tags': [], - 'attachments': [], - 'replying': { - 'single': 'someone@pixelated.org', - 'all': { - 'to-field': ['someone@pixelated.org'], - 'cc-field': [] - } - }}) - - def test_use_reply_to_address_for_replying(self): - headers = {'Subject': 'The subject', - 'From': 'someone@pixelated.org', - 'Reply-To': 'reply-to-this-address@pixelated.org', - 'To': 'me@pixelated.org, \nalice@pixelated.org'} - fdoc, hdoc, bdoc = test_helper.leap_mail(flags=['\\Recent'], - extra_headers=headers) - - InputMail.FROM_EMAIL_ADDRESS = 'me@pixelated.org' - - mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier) - - _dict = mail.as_dict() - - self.assertEquals(_dict['replying'], {'single': 'reply-to-this-address@pixelated.org', - 'all': { - 'to-field': ['alice@pixelated.org', 'reply-to-this-address@pixelated.org'], - 'cc-field': [] - }}) - - def test_alternatives_body(self): - parts = {'alternatives': [], 'attachments': []} - parts['alternatives'].append({'content': 'blablabla', 'headers': {'Content-Type': 'text/plain'}}) - parts['alternatives'].append({'content': '

blablabla

', 'headers': {'Content-Type': 'text/html'}}) - - mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='blablabla'), parts=parts, soledad_querier=None) - - self.assertRegexpMatches(mail.html_body, '^

blablabla

$') - self.assertRegexpMatches(mail.text_plain_body, '^blablabla$') - - def test_html_is_none_if_multiple_alternatives_have_no_html_part(self): - parts = { - 'attachments': [], - 'alternatives': [ - {'content': u'content', 'headers': {u'Content-Type': u'text/plain; charset=us-ascii'}}, - {'content': u'', 'headers': {u'Some info': u'info'}}]} - - mail = PixelatedMail.from_soledad(None, None, None, parts=parts, soledad_querier=None) - self.assertIsNone(mail.html_body) - - def test_percent_character_is_allowed_on_body(self): - parts = {'alternatives': [], 'attachments': []} - parts['alternatives'].append({'content': '100% happy with percentage symbol', 'headers': {'Content-Type': 'text/plain'}}) - parts['alternatives'].append({'content': '

100% happy with percentage symbol

', 'headers': {'Content-Type': 'text/html'}}) - - mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw="100% happy with percentage symbol"), parts=parts, soledad_querier=None) - - self.assertRegexpMatches(mail.text_plain_body, '([\s\S]*100%)') - self.assertRegexpMatches(mail.html_body, '([\s\S]*100%)') - - def test_content_type_header_of_mail_part_is_used(self): - plain_headers = {'Content-Type': 'text/plain; charset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'} - html_headers = {'Content-Type': 'text/html; charset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'} - parts = {'alternatives': [{'content': 'H=C3=A4llo', 'headers': plain_headers}, {'content': '

H=C3=A4llo

', 'headers': html_headers}]} - - mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='some raw body'), parts=parts, soledad_querier=None) - - self.assertEqual(2, len(mail.alternatives)) - self.assertEquals(u'H\xe4llo', mail.text_plain_body) - self.assertEquals(u'

H\xe4llo

', mail.html_body) - - def test_clean_line_breaks_on_address_headers(self): - many_recipients = 'One ,\nTwo , Normal ,\nalone@mail.com' - headers = {'Cc': many_recipients, - 'Bcc': many_recipients, - 'To': many_recipients} - fdoc, hdoc, bdoc = test_helper.leap_mail(flags=['\\Recent'], - extra_headers=headers) - - mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier) - - for header_label in ['To', 'Cc', 'Bcc']: - for address in mail.headers[header_label]: - self.assertNotIn('\n', address) - self.assertNotIn(',', address) - self.assertEquals(4, len(mail.headers[header_label])) - - def test_that_body_understands_base64(self): - body = u'bl\xe1' - encoded_body = unicode(body.encode('utf-8').encode('base64')) - - fdoc, hdoc, bdoc = test_helper.leap_mail() - parts = {'alternatives': []} - parts['alternatives'].append({'content': encoded_body, 'headers': {'Content-Transfer-Encoding': 'base64'}}) - mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier, parts=parts) - - self.assertEquals(body, mail.text_plain_body) - - def _create_bdoc(self, raw): - class FakeBDoc: - def __init__(self, raw): - self.content = {'raw': raw} - return FakeBDoc(raw) - - -class InputMailTest(unittest.TestCase): - mail_dict = lambda x: { - 'body': 'Este \xe9 o corpo', - 'header': { - 'cc': ['cc@pixelated.org', 'anothercc@pixelated.org'], - 'to': ['to@pixelated.org', 'anotherto@pixelated.org'], - 'bcc': ['bcc@pixelated.org', 'anotherbcc@pixelated.org'], - 'subject': 'Oi' - }, - 'ident': '', - 'tags': ['sent'] - } - - multipart_mail_dict = lambda x: { - 'body': [{'content-type': 'plain', 'raw': 'Hello world!'}, - {'content-type': 'html', 'raw': '

Hello html world!

'}], - 'header': { - 'cc': ['cc@pixelated.org', 'anothercc@pixelated.org'], - 'to': ['to@pixelated.org', 'anotherto@pixelated.org'], - 'bcc': ['bcc@pixelated.org', 'anotherbcc@pixelated.org'], - 'subject': 'Oi', - }, - 'ident': '', - 'tags': ['sent'] - } - - def test_to_mime_multipart_should_add_blank_fields(self): - pixelated.support.date.iso_now = lambda: 'date now' - - mail_dict = self.mail_dict() - mail_dict['header']['to'] = '' - mail_dict['header']['bcc'] = '' - mail_dict['header']['cc'] = '' - mail_dict['header']['subject'] = '' - - mime_multipart = InputMail.from_dict(mail_dict).to_mime_multipart() - - self.assertNotRegexpMatches(mime_multipart.as_string(), "\nTo: \n") - self.assertNotRegexpMatches(mime_multipart.as_string(), "\nBcc: \n") - self.assertNotRegexpMatches(mime_multipart.as_string(), "\nCc: \n") - self.assertNotRegexpMatches(mime_multipart.as_string(), "\nSubject: \n") - - def test_to_mime_multipart(self): - pixelated.support.date.iso_now = lambda: 'date now' - - mime_multipart = InputMail.from_dict(self.mail_dict()).to_mime_multipart() - - self.assertRegexpMatches(mime_multipart.as_string(), "\nTo: to@pixelated.org, anotherto@pixelated.org\n") - self.assertRegexpMatches(mime_multipart.as_string(), "\nCc: cc@pixelated.org, anothercc@pixelated.org\n") - self.assertRegexpMatches(mime_multipart.as_string(), "\nBcc: bcc@pixelated.org, anotherbcc@pixelated.org\n") - self.assertRegexpMatches(mime_multipart.as_string(), "\nDate: date now\n") - self.assertRegexpMatches(mime_multipart.as_string(), "\nSubject: Oi\n") - self.assertRegexpMatches(mime_multipart.as_string(), base64.b64encode(self.mail_dict()['body'])) - - def test_smtp_format(self): - InputMail.FROM_EMAIL_ADDRESS = 'pixelated@org' - - smtp_format = InputMail.from_dict(self.mail_dict()).to_smtp_format() - - self.assertRegexpMatches(smtp_format, "\nFrom: pixelated@org") - - def test_to_mime_multipart_handles_alternative_bodies(self): - mime_multipart = InputMail.from_dict(self.multipart_mail_dict()).to_mime_multipart() - - part_one = 'Content-Type: text/plain; charset="us-ascii"\nMIME-Version: 1.0\nContent-Transfer-Encoding: 7bit\n\nHello world!' - part_two = 'Content-Type: text/html; charset="us-ascii"\nMIME-Version: 1.0\nContent-Transfer-Encoding: 7bit\n\n

Hello html world!

' - - self.assertRegexpMatches(mime_multipart.as_string(), part_one) - self.assertRegexpMatches(mime_multipart.as_string(), part_two) diff --git a/service/test/unit/adapter/mailbox_indexer_listener_test.py b/service/test/unit/adapter/mailbox_indexer_listener_test.py deleted file mode 100644 index 65ba8966..00000000 --- a/service/test/unit/adapter/mailbox_indexer_listener_test.py +++ /dev/null @@ -1,54 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . -import unittest - -from mockito import * -from pixelated.adapter.listeners.mailbox_indexer_listener import MailboxIndexerListener - - -class MailboxListenerTest(unittest.TestCase): - def setUp(self): - self.querier = mock() - self.account = mock() - self.account.mailboxes = [] - - def test_add_itself_to_mailbox_listeners(self): - self.account.mailboxes = ['INBOX'] - mailbox = mock() - when(self.account).getMailbox('INBOX').thenReturn(mailbox) - mailbox.listeners = set() - when(mailbox).addListener = lambda x: mailbox.listeners.add(x) - - self.assertNotIn(MailboxIndexerListener('INBOX', self.querier), mailbox.listeners) - - MailboxIndexerListener.listen(self.account, 'INBOX', self.querier) - - self.assertIn(MailboxIndexerListener('INBOX', self.querier), mailbox.listeners) - - def test_reindex_missing_idents(self): - search_engine = mock() - when(search_engine).search('tag:inbox', all_mails=True).thenReturn(['ident1', 'ident2']) - - MailboxIndexerListener.SEARCH_ENGINE = search_engine - - listener = MailboxIndexerListener('INBOX', self.querier) - when(self.querier).idents_by_mailbox('INBOX').thenReturn({'ident1', 'ident2', 'missing_ident'}) - self.querier.used_arguments = [] - self.querier.mails = lambda x: self.querier.used_arguments.append(x) - listener.newMessages(10, 5) - - verify(self.querier, times=1).idents_by_mailbox('INBOX') - self.assertIn({'missing_ident'}, self.querier.used_arguments) diff --git a/service/test/unit/adapter/mailbox_test.py b/service/test/unit/adapter/mailbox_test.py deleted file mode 100644 index 9725f418..00000000 --- a/service/test/unit/adapter/mailbox_test.py +++ /dev/null @@ -1,36 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . -import unittest - -from pixelated.adapter.model.mail import PixelatedMail -from pixelated.adapter.services.mailbox import Mailbox -from mockito import * -from test.support import test_helper - - -class PixelatedMailboxTest(unittest.TestCase): - def setUp(self): - self.tag_service = mock() - self.querier = mock() - self.mailbox = Mailbox('INBOX', self.querier) - - def test_remove_message_from_mailbox(self): - mail = PixelatedMail.from_soledad(*test_helper.leap_mail(), soledad_querier=self.querier) - when(self.querier).mail(1).thenReturn(mail) - - self.mailbox.remove(1) - - verify(self.querier).remove_mail(mail) diff --git a/service/test/unit/adapter/soledad_querier_test.py b/service/test/unit/adapter/soledad_querier_test.py deleted file mode 100644 index 2cc23750..00000000 --- a/service/test/unit/adapter/soledad_querier_test.py +++ /dev/null @@ -1,106 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . -import unittest -import json -import base64 -import quopri - -from pixelated.adapter.soledad.soledad_querier import SoledadQuerier -from mockito import mock, when, any -import os - - -class SoledadQuerierTest(unittest.TestCase): - - def test_extract_parts(self): - soledad = mock() - bdoc = mock() - bdoc.content = {'raw': 'esse papo seu ta qualquer coisa'} - when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc]) - multipart_attachment_file = os.path.join(os.path.dirname(__file__), '..', 'fixtures', 'multipart_attachment.json') - with open(multipart_attachment_file) as f: - hdoc = json.loads(f.read()) - querier = SoledadQuerier(soledad) - - parts = querier._extract_parts(hdoc) - - self.assertIn('alternatives', parts.keys()) - self.assertIn('attachments', parts.keys()) - self.assertEquals(2, len(parts['alternatives'])) - self.assertEquals(1, len(parts['attachments'])) - - self.check_alternatives(parts) - self.check_attachments(parts) - - def check_alternatives(self, parts): - for alternative in parts['alternatives']: - self.assertIn('headers', alternative) - self.assertIn('content', alternative) - - def check_attachments(self, parts): - for attachment in parts['attachments']: - self.assertIn('headers', attachment) - self.assertIn('ident', attachment) - self.assertIn('name', attachment) - - def test_extract_part_without_headers(self): - soledad = mock() - bdoc = mock() - bdoc.content = {'raw': 'esse papo seu ta qualquer coisa'} - when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc]) - hdoc = {'multi': True, 'part_map': {'1': {'multi': False, 'phash': u'0400BEBACAFE'}}} - querier = SoledadQuerier(soledad) - - parts = querier._extract_parts(hdoc) - - self.assertEquals(bdoc.content['raw'], parts['alternatives'][0]['content']) - - def test_extract_handles_missing_part_map(self): - soledad = mock() - hdoc = {u'multi': True, - u'ctype': u'message/delivery-status', - u'headers': [[u'Content-Description', u'Delivery report'], [u'Content-Type', u'message/delivery-status']], - u'parts': 2, - u'phash': None, - u'size': 554} - querier = SoledadQuerier(soledad) - - parts = querier._extract_parts(hdoc) - - self.assertEquals(0, len(parts['alternatives'])) - self.assertEquals(0, len(parts['attachments'])) - - def test_attachment_base64(self): - soledad = mock() - bdoc = mock() - bdoc.content = {'raw': base64.encodestring('esse papo seu ta qualquer coisa'), 'content-type': 'text/plain'} - when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc]) - querier = SoledadQuerier(soledad) - - attachment = querier.attachment(u'0400BEBACAFE', 'base64') - - self.assertEquals('esse papo seu ta qualquer coisa', attachment['content']) - - def test_attachment_quoted_printable(self): - soledad = mock() - bdoc = mock() - bdoc.content = {'raw': quopri.encodestring('esse papo seu ta qualquer coisa'), 'content-type': 'text/plain'} - when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc]) - querier = SoledadQuerier(soledad) - - attachment = querier.attachment(u'0400BEBACAFE', 'quoted-printable') - - self.assertEquals('esse papo seu ta qualquer coisa', attachment['content']) diff --git a/service/test/unit/adapter/test_draft_service.py b/service/test/unit/adapter/test_draft_service.py new file mode 100644 index 00000000..baa07ce0 --- /dev/null +++ b/service/test/unit/adapter/test_draft_service.py @@ -0,0 +1,30 @@ +import unittest + +from pixelated.adapter.model.mail import InputMail +from pixelated.adapter.services.draft_service import DraftService +import test.support.test_helper as test_helper +from mockito import * + + +class DraftServiceTest(unittest.TestCase): + + def setUp(self): + self.mailboxes = mock() + self.drafts_mailbox = mock() + self.draft_service = DraftService(self.mailboxes) + when(self.mailboxes).drafts().thenReturn(self.drafts_mailbox) + + def test_add_draft(self): + mail = InputMail() + self.draft_service.create_draft(mail) + + verify(self.drafts_mailbox).add(mail) + + def test_update_draft(self): + mail = InputMail.from_dict(test_helper.mail_dict()) + when(self.drafts_mailbox).add(mail).thenReturn(mail) + + self.draft_service.update_draft(mail.ident, mail) + + inorder.verify(self.drafts_mailbox).add(mail) + inorder.verify(self.drafts_mailbox).remove(mail.ident) diff --git a/service/test/unit/adapter/test_mail.py b/service/test/unit/adapter/test_mail.py new file mode 100644 index 00000000..be7b731d --- /dev/null +++ b/service/test/unit/adapter/test_mail.py @@ -0,0 +1,287 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . +import unittest + +import pixelated.support.date +from pixelated.adapter.model.mail import PixelatedMail, InputMail +from mockito import * +from test.support import test_helper +import dateutil.parser as dateparser +import base64 +from leap.mail.imap.fields import fields +from datetime import datetime + + +class TestPixelatedMail(unittest.TestCase): + def setUp(self): + self.querier = mock() + + def test_parse_date_from_soledad_uses_date_header_if_available(self): + leap_mail_date = 'Wed, 3 Sep 2014 12:36:17 -0300' + leap_mail_date_in_iso_format = "2014-09-03T12:36:17-03:00" + + leap_mail = test_helper.leap_mail(headers={'date': leap_mail_date}) + + mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier) + + self.assertEqual(str(mail.headers['Date']), leap_mail_date_in_iso_format) + + def test_parse_date_from_soledad_fallback_to_received_header_if_date_header_isnt_available(self): + leap_mail_date = "Wed, 03 Sep 2014 13:11:15 -0300" + leap_mail_date_in_iso_format = "2014-09-03T13:11:15-03:00" + leap_mail_received_header = "by bitmask.local from 127.0.0.1 with ESMTP ;\n " + leap_mail_date + + leap_mail = test_helper.leap_mail(headers={'received': leap_mail_received_header}) + + mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier) + + self.assertEqual(str(mail.headers['Date']), leap_mail_date_in_iso_format) + + def test_update_tags_return_a_set_with_the_current_tags(self): + soledad_docs = test_helper.leap_mail(extra_headers={'X-tags': '["custom_1", "custom_2"]'}) + pixelated_mail = PixelatedMail.from_soledad(*soledad_docs, soledad_querier=self.querier) + + current_tags = pixelated_mail.update_tags({'custom_1', 'custom_3'}) + self.assertEquals({'custom_3', 'custom_1'}, current_tags) + + def test_mark_as_read(self): + mail = PixelatedMail.from_soledad(*test_helper.leap_mail(flags=[]), soledad_querier=self.querier) + + mail.mark_as_read() + + self.assertEquals(mail.fdoc.content['flags'], ['\\Seen']) + + def test_mark_as_not_recent(self): + mail = PixelatedMail.from_soledad(*test_helper.leap_mail(flags=['\\Recent']), soledad_querier=self.querier) + + mail.mark_as_not_recent() + + self.assertEquals(mail.fdoc.content['flags'], []) + + def test_get_for_save_adds_from(self): + InputMail.FROM_EMAIL_ADDRESS = 'me@pixelated.org' + headers = {'Subject': 'The subject', + 'Date': str(datetime.now()), + 'To': 'me@pixelated.org'} + + input_mail = InputMail() + input_mail.headers = headers + + self.assertEqual('me@pixelated.org', input_mail.get_for_save(1, 'SENT')[1][fields.HEADERS_KEY]['From']) + + def test_as_dict(self): + headers = {'Subject': 'The subject', + 'From': 'someone@pixelated.org', + 'To': 'me@pixelated.org'} + fdoc, hdoc, bdoc = test_helper.leap_mail(flags=['\\Recent'], + extra_headers=headers) + + InputMail.FROM_EMAIL_ADDRESS = 'me@pixelated.org' + + mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier) + + _dict = mail.as_dict() + + self.maxDiff = None + + self.assertEquals(_dict, {'htmlBody': None, + 'textPlainBody': 'body', + 'header': { + 'date': dateparser.parse(hdoc.content['date']).isoformat(), + 'from': 'someone@pixelated.org', + 'subject': 'The subject', + 'to': ['me@pixelated.org'], + 'cc': [], + 'bcc': [] + }, + 'ident': 'chash', + 'mailbox': 'inbox', + 'security_casing': {'imprints': [{'state': 'no_signature_information'}], 'locks': []}, + 'status': ['recent'], + 'tags': [], + 'attachments': [], + 'replying': { + 'single': 'someone@pixelated.org', + 'all': { + 'to-field': ['someone@pixelated.org'], + 'cc-field': [] + } + }}) + + def test_use_reply_to_address_for_replying(self): + headers = {'Subject': 'The subject', + 'From': 'someone@pixelated.org', + 'Reply-To': 'reply-to-this-address@pixelated.org', + 'To': 'me@pixelated.org, \nalice@pixelated.org'} + fdoc, hdoc, bdoc = test_helper.leap_mail(flags=['\\Recent'], + extra_headers=headers) + + InputMail.FROM_EMAIL_ADDRESS = 'me@pixelated.org' + + mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier) + + _dict = mail.as_dict() + + self.assertEquals(_dict['replying'], {'single': 'reply-to-this-address@pixelated.org', + 'all': { + 'to-field': ['alice@pixelated.org', 'reply-to-this-address@pixelated.org'], + 'cc-field': [] + }}) + + def test_alternatives_body(self): + parts = {'alternatives': [], 'attachments': []} + parts['alternatives'].append({'content': 'blablabla', 'headers': {'Content-Type': 'text/plain'}}) + parts['alternatives'].append({'content': '

blablabla

', 'headers': {'Content-Type': 'text/html'}}) + + mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='blablabla'), parts=parts, soledad_querier=None) + + self.assertRegexpMatches(mail.html_body, '^

blablabla

$') + self.assertRegexpMatches(mail.text_plain_body, '^blablabla$') + + def test_html_is_none_if_multiple_alternatives_have_no_html_part(self): + parts = { + 'attachments': [], + 'alternatives': [ + {'content': u'content', 'headers': {u'Content-Type': u'text/plain; charset=us-ascii'}}, + {'content': u'', 'headers': {u'Some info': u'info'}}]} + + mail = PixelatedMail.from_soledad(None, None, None, parts=parts, soledad_querier=None) + self.assertIsNone(mail.html_body) + + def test_percent_character_is_allowed_on_body(self): + parts = {'alternatives': [], 'attachments': []} + parts['alternatives'].append({'content': '100% happy with percentage symbol', 'headers': {'Content-Type': 'text/plain'}}) + parts['alternatives'].append({'content': '

100% happy with percentage symbol

', 'headers': {'Content-Type': 'text/html'}}) + + mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw="100% happy with percentage symbol"), parts=parts, soledad_querier=None) + + self.assertRegexpMatches(mail.text_plain_body, '([\s\S]*100%)') + self.assertRegexpMatches(mail.html_body, '([\s\S]*100%)') + + def test_content_type_header_of_mail_part_is_used(self): + plain_headers = {'Content-Type': 'text/plain; charset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'} + html_headers = {'Content-Type': 'text/html; charset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'} + parts = {'alternatives': [{'content': 'H=C3=A4llo', 'headers': plain_headers}, {'content': '

H=C3=A4llo

', 'headers': html_headers}]} + + mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='some raw body'), parts=parts, soledad_querier=None) + + self.assertEqual(2, len(mail.alternatives)) + self.assertEquals(u'H\xe4llo', mail.text_plain_body) + self.assertEquals(u'

H\xe4llo

', mail.html_body) + + def test_clean_line_breaks_on_address_headers(self): + many_recipients = 'One ,\nTwo , Normal ,\nalone@mail.com' + headers = {'Cc': many_recipients, + 'Bcc': many_recipients, + 'To': many_recipients} + fdoc, hdoc, bdoc = test_helper.leap_mail(flags=['\\Recent'], + extra_headers=headers) + + mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier) + + for header_label in ['To', 'Cc', 'Bcc']: + for address in mail.headers[header_label]: + self.assertNotIn('\n', address) + self.assertNotIn(',', address) + self.assertEquals(4, len(mail.headers[header_label])) + + def test_that_body_understands_base64(self): + body = u'bl\xe1' + encoded_body = unicode(body.encode('utf-8').encode('base64')) + + fdoc, hdoc, bdoc = test_helper.leap_mail() + parts = {'alternatives': []} + parts['alternatives'].append({'content': encoded_body, 'headers': {'Content-Transfer-Encoding': 'base64'}}) + mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier, parts=parts) + + self.assertEquals(body, mail.text_plain_body) + + def _create_bdoc(self, raw): + class FakeBDoc: + def __init__(self, raw): + self.content = {'raw': raw} + return FakeBDoc(raw) + + +class InputMailTest(unittest.TestCase): + mail_dict = lambda x: { + 'body': 'Este \xe9 o corpo', + 'header': { + 'cc': ['cc@pixelated.org', 'anothercc@pixelated.org'], + 'to': ['to@pixelated.org', 'anotherto@pixelated.org'], + 'bcc': ['bcc@pixelated.org', 'anotherbcc@pixelated.org'], + 'subject': 'Oi' + }, + 'ident': '', + 'tags': ['sent'] + } + + multipart_mail_dict = lambda x: { + 'body': [{'content-type': 'plain', 'raw': 'Hello world!'}, + {'content-type': 'html', 'raw': '

Hello html world!

'}], + 'header': { + 'cc': ['cc@pixelated.org', 'anothercc@pixelated.org'], + 'to': ['to@pixelated.org', 'anotherto@pixelated.org'], + 'bcc': ['bcc@pixelated.org', 'anotherbcc@pixelated.org'], + 'subject': 'Oi', + }, + 'ident': '', + 'tags': ['sent'] + } + + def test_to_mime_multipart_should_add_blank_fields(self): + pixelated.support.date.iso_now = lambda: 'date now' + + mail_dict = self.mail_dict() + mail_dict['header']['to'] = '' + mail_dict['header']['bcc'] = '' + mail_dict['header']['cc'] = '' + mail_dict['header']['subject'] = '' + + mime_multipart = InputMail.from_dict(mail_dict).to_mime_multipart() + + self.assertNotRegexpMatches(mime_multipart.as_string(), "\nTo: \n") + self.assertNotRegexpMatches(mime_multipart.as_string(), "\nBcc: \n") + self.assertNotRegexpMatches(mime_multipart.as_string(), "\nCc: \n") + self.assertNotRegexpMatches(mime_multipart.as_string(), "\nSubject: \n") + + def test_to_mime_multipart(self): + pixelated.support.date.iso_now = lambda: 'date now' + + mime_multipart = InputMail.from_dict(self.mail_dict()).to_mime_multipart() + + self.assertRegexpMatches(mime_multipart.as_string(), "\nTo: to@pixelated.org, anotherto@pixelated.org\n") + self.assertRegexpMatches(mime_multipart.as_string(), "\nCc: cc@pixelated.org, anothercc@pixelated.org\n") + self.assertRegexpMatches(mime_multipart.as_string(), "\nBcc: bcc@pixelated.org, anotherbcc@pixelated.org\n") + self.assertRegexpMatches(mime_multipart.as_string(), "\nDate: date now\n") + self.assertRegexpMatches(mime_multipart.as_string(), "\nSubject: Oi\n") + self.assertRegexpMatches(mime_multipart.as_string(), base64.b64encode(self.mail_dict()['body'])) + + def test_smtp_format(self): + InputMail.FROM_EMAIL_ADDRESS = 'pixelated@org' + + smtp_format = InputMail.from_dict(self.mail_dict()).to_smtp_format() + + self.assertRegexpMatches(smtp_format, "\nFrom: pixelated@org") + + def test_to_mime_multipart_handles_alternative_bodies(self): + mime_multipart = InputMail.from_dict(self.multipart_mail_dict()).to_mime_multipart() + + part_one = 'Content-Type: text/plain; charset="us-ascii"\nMIME-Version: 1.0\nContent-Transfer-Encoding: 7bit\n\nHello world!' + part_two = 'Content-Type: text/html; charset="us-ascii"\nMIME-Version: 1.0\nContent-Transfer-Encoding: 7bit\n\n

Hello html world!

' + + self.assertRegexpMatches(mime_multipart.as_string(), part_one) + self.assertRegexpMatches(mime_multipart.as_string(), part_two) diff --git a/service/test/unit/adapter/test_mail_service.py b/service/test/unit/adapter/test_mail_service.py new file mode 100644 index 00000000..fca6e79b --- /dev/null +++ b/service/test/unit/adapter/test_mail_service.py @@ -0,0 +1,51 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . +import unittest + +from pixelated.adapter.services.mail_service import MailService +from mockito import * + + +class TestMailService(unittest.TestCase): + def setUp(self): + self.querier = mock() + self.mailboxes = mock() + self.tag_service = mock() + self.mailboxes.drafts = lambda: mock() + self.mailboxes.trash = lambda: mock() + self.mailboxes.sent = lambda: mock() + + self.mail_sender = mock() + self.mail_service = MailService(self.mailboxes, self.mail_sender, self.tag_service, self.querier) + + def test_send_mail(self): + mail = "mail" + + self.mail_service.send(1, mail) + + verify(self.mail_sender).sendmail(mail) + + def test_mark_as_read(self): + mail = mock() + when(self.mail_service).mail(any()).thenReturn(mail) + self.mail_service.mark_as_read(1) + + verify(mail).mark_as_read() + + def test_delete_mail(self): + self.mail_service.delete_mail(1) + + verify(self.mailboxes).move_to_trash(1) diff --git a/service/test/unit/adapter/test_mailbox.py b/service/test/unit/adapter/test_mailbox.py new file mode 100644 index 00000000..9725f418 --- /dev/null +++ b/service/test/unit/adapter/test_mailbox.py @@ -0,0 +1,36 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . +import unittest + +from pixelated.adapter.model.mail import PixelatedMail +from pixelated.adapter.services.mailbox import Mailbox +from mockito import * +from test.support import test_helper + + +class PixelatedMailboxTest(unittest.TestCase): + def setUp(self): + self.tag_service = mock() + self.querier = mock() + self.mailbox = Mailbox('INBOX', self.querier) + + def test_remove_message_from_mailbox(self): + mail = PixelatedMail.from_soledad(*test_helper.leap_mail(), soledad_querier=self.querier) + when(self.querier).mail(1).thenReturn(mail) + + self.mailbox.remove(1) + + verify(self.querier).remove_mail(mail) diff --git a/service/test/unit/adapter/test_mailbox_indexer_listener.py b/service/test/unit/adapter/test_mailbox_indexer_listener.py new file mode 100644 index 00000000..65ba8966 --- /dev/null +++ b/service/test/unit/adapter/test_mailbox_indexer_listener.py @@ -0,0 +1,54 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . +import unittest + +from mockito import * +from pixelated.adapter.listeners.mailbox_indexer_listener import MailboxIndexerListener + + +class MailboxListenerTest(unittest.TestCase): + def setUp(self): + self.querier = mock() + self.account = mock() + self.account.mailboxes = [] + + def test_add_itself_to_mailbox_listeners(self): + self.account.mailboxes = ['INBOX'] + mailbox = mock() + when(self.account).getMailbox('INBOX').thenReturn(mailbox) + mailbox.listeners = set() + when(mailbox).addListener = lambda x: mailbox.listeners.add(x) + + self.assertNotIn(MailboxIndexerListener('INBOX', self.querier), mailbox.listeners) + + MailboxIndexerListener.listen(self.account, 'INBOX', self.querier) + + self.assertIn(MailboxIndexerListener('INBOX', self.querier), mailbox.listeners) + + def test_reindex_missing_idents(self): + search_engine = mock() + when(search_engine).search('tag:inbox', all_mails=True).thenReturn(['ident1', 'ident2']) + + MailboxIndexerListener.SEARCH_ENGINE = search_engine + + listener = MailboxIndexerListener('INBOX', self.querier) + when(self.querier).idents_by_mailbox('INBOX').thenReturn({'ident1', 'ident2', 'missing_ident'}) + self.querier.used_arguments = [] + self.querier.mails = lambda x: self.querier.used_arguments.append(x) + listener.newMessages(10, 5) + + verify(self.querier, times=1).idents_by_mailbox('INBOX') + self.assertIn({'missing_ident'}, self.querier.used_arguments) diff --git a/service/test/unit/adapter/test_soledad_querier.py b/service/test/unit/adapter/test_soledad_querier.py new file mode 100644 index 00000000..2cc23750 --- /dev/null +++ b/service/test/unit/adapter/test_soledad_querier.py @@ -0,0 +1,106 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . +import unittest +import json +import base64 +import quopri + +from pixelated.adapter.soledad.soledad_querier import SoledadQuerier +from mockito import mock, when, any +import os + + +class SoledadQuerierTest(unittest.TestCase): + + def test_extract_parts(self): + soledad = mock() + bdoc = mock() + bdoc.content = {'raw': 'esse papo seu ta qualquer coisa'} + when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc]) + multipart_attachment_file = os.path.join(os.path.dirname(__file__), '..', 'fixtures', 'multipart_attachment.json') + with open(multipart_attachment_file) as f: + hdoc = json.loads(f.read()) + querier = SoledadQuerier(soledad) + + parts = querier._extract_parts(hdoc) + + self.assertIn('alternatives', parts.keys()) + self.assertIn('attachments', parts.keys()) + self.assertEquals(2, len(parts['alternatives'])) + self.assertEquals(1, len(parts['attachments'])) + + self.check_alternatives(parts) + self.check_attachments(parts) + + def check_alternatives(self, parts): + for alternative in parts['alternatives']: + self.assertIn('headers', alternative) + self.assertIn('content', alternative) + + def check_attachments(self, parts): + for attachment in parts['attachments']: + self.assertIn('headers', attachment) + self.assertIn('ident', attachment) + self.assertIn('name', attachment) + + def test_extract_part_without_headers(self): + soledad = mock() + bdoc = mock() + bdoc.content = {'raw': 'esse papo seu ta qualquer coisa'} + when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc]) + hdoc = {'multi': True, 'part_map': {'1': {'multi': False, 'phash': u'0400BEBACAFE'}}} + querier = SoledadQuerier(soledad) + + parts = querier._extract_parts(hdoc) + + self.assertEquals(bdoc.content['raw'], parts['alternatives'][0]['content']) + + def test_extract_handles_missing_part_map(self): + soledad = mock() + hdoc = {u'multi': True, + u'ctype': u'message/delivery-status', + u'headers': [[u'Content-Description', u'Delivery report'], [u'Content-Type', u'message/delivery-status']], + u'parts': 2, + u'phash': None, + u'size': 554} + querier = SoledadQuerier(soledad) + + parts = querier._extract_parts(hdoc) + + self.assertEquals(0, len(parts['alternatives'])) + self.assertEquals(0, len(parts['attachments'])) + + def test_attachment_base64(self): + soledad = mock() + bdoc = mock() + bdoc.content = {'raw': base64.encodestring('esse papo seu ta qualquer coisa'), 'content-type': 'text/plain'} + when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc]) + querier = SoledadQuerier(soledad) + + attachment = querier.attachment(u'0400BEBACAFE', 'base64') + + self.assertEquals('esse papo seu ta qualquer coisa', attachment['content']) + + def test_attachment_quoted_printable(self): + soledad = mock() + bdoc = mock() + bdoc.content = {'raw': quopri.encodestring('esse papo seu ta qualquer coisa'), 'content-type': 'text/plain'} + when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc]) + querier = SoledadQuerier(soledad) + + attachment = querier.attachment(u'0400BEBACAFE', 'quoted-printable') + + self.assertEquals('esse papo seu ta qualquer coisa', attachment['content']) diff --git a/service/test/unit/bitmask_libraries/abstract_leap_test.py b/service/test/unit/bitmask_libraries/abstract_leap_test.py deleted file mode 100644 index 2634f330..00000000 --- a/service/test/unit/bitmask_libraries/abstract_leap_test.py +++ /dev/null @@ -1,42 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . -import tempfile -import unittest -from uuid import uuid4 - -import os -from mock import Mock, MagicMock - - -class AbstractLeapTest(unittest.TestCase): - uuid = str(uuid4()) - session_id = str(uuid4()) - token = str(uuid4()) - - leap_home = os.path.join(tempfile.mkdtemp(), 'leap') - - config = Mock(leap_home=leap_home, ca_cert_bundle='/some/path/to/ca_cert', gpg_binary='/path/to/gpg') - provider = Mock(config=config, server_name='some-server.test', domain='some-server.test', - api_uri='https://api.some-server.test:4430', api_version='1') - soledad = Mock() - soledad_session = Mock(soledad=soledad) - srp_session = Mock(user_name='test_user', api_server_name='some-server.test', uuid=uuid, session_id=session_id, token=token) - - nicknym = MagicMock() - - soledad_account = MagicMock() - - mail_fetcher_mock = MagicMock() diff --git a/service/test/unit/bitmask_libraries/certs_test.py b/service/test/unit/bitmask_libraries/certs_test.py deleted file mode 100644 index 8caafe7e..00000000 --- a/service/test/unit/bitmask_libraries/certs_test.py +++ /dev/null @@ -1,19 +0,0 @@ -import unittest - -from pixelated.bitmask_libraries.certs import which_bundle -from pixelated.bitmask_libraries.config import AUTO_DETECT_CA_BUNDLE -from mock import MagicMock, patch - - -class CertsTest(unittest.TestCase): - - @patch('pixelated.bitmask_libraries.certs.os.path.isfile') - def test_that_which_bundle_returns_byte_string(self, mock_isfile): - mock_isfile.return_value = True - config = MagicMock(ca_cert_bundle=AUTO_DETECT_CA_BUNDLE, certs_home='/some/path') - provider = MagicMock(server_name=u'test.leap.net', config=config) - - bundle = which_bundle(provider) - - self.assertEqual('/some/path/test.leap.net.ca.crt', bundle) - self.assertEqual(str, type(bundle)) diff --git a/service/test/unit/bitmask_libraries/leap_srp_test.py b/service/test/unit/bitmask_libraries/leap_srp_test.py deleted file mode 100644 index 6d067e5d..00000000 --- a/service/test/unit/bitmask_libraries/leap_srp_test.py +++ /dev/null @@ -1,157 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . -import json -import unittest -import binascii -from urlparse import parse_qs - -from httmock import urlmatch, all_requests, HTTMock, response -from requests.exceptions import Timeout -import srp -from pixelated.bitmask_libraries.leap_srp import LeapSecureRemotePassword, LeapAuthException - - -(salt_bytes, verification_key_bytes) = srp.create_salted_verification_key('username', 'password', hash_alg=srp.SHA256, ng_type=srp.NG_1024) -verifier = None - - -@all_requests -def not_found_mock(url, request): - return {'status_code': 404, - 'content': 'foobar'} - - -@all_requests -def timeout_mock(url, request): - raise Timeout() - - -@urlmatch(netloc=r'(.*\.)?leap\.local$') -def srp_login_server_simulator_mock(url, request): - global verifier - - data = parse_qs(request.body) - if 'login' in data: - # SRP Authentication Step 1 - A = binascii.unhexlify(data.get('A')[0]) - - verifier = srp.Verifier('username', salt_bytes, verification_key_bytes, A, hash_alg=srp.SHA256, ng_type=srp.NG_1024) - (salt, B) = verifier.get_challenge() - - content = { - 'salt': binascii.hexlify(salt), - 'B': binascii.hexlify(B) - } - - return {'status_code': 200, - 'content': json.dumps(content)} - - else: - # SRP Authentication Step 2 - data = parse_qs(request.body) - client_auth = binascii.unhexlify(data.get('client_auth')[0]) - - M2 = verifier.verify_session(client_auth) - - if not verifier.authenticated(): - return {'status_code': 404, - 'content': ''} - - content = { - 'M2': binascii.hexlify(M2), - 'id': 'some id', - 'token': 'some token' - } - headers = { - 'Content-Type': 'application/json', - 'Set-Cookie': '_session_id=some_session_id;'} - return response(200, content, headers, None, 5, request) - - -class LeapSRPTest(unittest.TestCase): - - def test_status_code_is_checked(self): - with HTTMock(not_found_mock): - lsrp = LeapSecureRemotePassword() - self.assertRaises(LeapAuthException, lsrp.authenticate, 'https://api.leap.local', 'username', 'password') - - def test_invalid_username(self): - with HTTMock(srp_login_server_simulator_mock): - lsrp = LeapSecureRemotePassword() - self.assertRaises(LeapAuthException, lsrp.authenticate, 'https://api.leap.local', 'invalid_user', 'password') - - def test_invalid_password(self): - with HTTMock(srp_login_server_simulator_mock): - lsrp = LeapSecureRemotePassword() - self.assertRaises(LeapAuthException, lsrp.authenticate, 'https://api.leap.local', 'username', 'invalid') - - def test_login(self): - with HTTMock(srp_login_server_simulator_mock): - lsrp = LeapSecureRemotePassword() - leap_session = lsrp.authenticate('https://api.leap.local', 'username', 'password') - - self.assertIsNotNone(leap_session) - self.assertEqual('username', leap_session.user_name) - self.assertEqual('1', leap_session.api_version) - self.assertEqual('https://api.leap.local', leap_session.api_server_name) - self.assertEqual('some token', leap_session.token) - self.assertEqual('some_session_id', leap_session.session_id) - - def test_timeout(self): - with HTTMock(timeout_mock): - lrsp = LeapSecureRemotePassword() - self.assertRaises(LeapAuthException, lrsp.authenticate, 'https://api.leap.local', 'username', 'password') - - def test_register_raises_auth_exception_on_error(self): - with HTTMock(not_found_mock): - lsrp = LeapSecureRemotePassword() - self.assertRaises(LeapAuthException, lsrp.register, 'https://api.leap.local', 'username', 'password') - - def test_register(self): - @urlmatch(netloc=r'(.*\.)?leap\.local$', path='/1/users') - def register_success(url, request): - - content = { - 'login': 'username', - 'ok': True - } - - return {'status_code': 201, - 'content': content} - - with HTTMock(register_success, not_found_mock): - lsrp = LeapSecureRemotePassword() - self.assertTrue(lsrp.register('https://api.leap.local', 'username', 'password')) - - def test_register_user_exists(self): - @urlmatch(netloc=r'(.*\.)?leap\.local$', path='/1/users') - def register_error_user_exists(url, request): - content = {"errors": { - "login": [ - "has already been taken", "has already been taken", "has already been taken" - ]}} - - return {'status_code': 422, - 'content': content} - - with HTTMock(register_error_user_exists, not_found_mock): - lsrp = LeapSecureRemotePassword() - self.assertRaises(LeapAuthException, lsrp.register, 'https://api.leap.local', 'username', 'password') - - def test_registration_timeout(self): - with HTTMock(timeout_mock): - lsrp = LeapSecureRemotePassword() - self.assertRaises(LeapAuthException, lsrp.register, 'https://api.leap.local', 'username', 'password') diff --git a/service/test/unit/bitmask_libraries/nicknym_test.py b/service/test/unit/bitmask_libraries/nicknym_test.py deleted file mode 100644 index 9d564abe..00000000 --- a/service/test/unit/bitmask_libraries/nicknym_test.py +++ /dev/null @@ -1,48 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . -from mock import patch - -from leap.keymanager import openpgp, KeyNotFound -from pixelated.bitmask_libraries.nicknym import NickNym -from abstract_leap_test import AbstractLeapTest - - -class NickNymTest(AbstractLeapTest): - @patch('pixelated.bitmask_libraries.nicknym.KeyManager.__init__', return_value=None) - def test_that_keymanager_is_created(self, init_mock): - # given - - # when - NickNym(self.provider, self.config, self.soledad_session, self.srp_session) - - # then - init_mock.assert_called_with('test_user@some-server.test', 'https://nicknym.some-server.test:6425/', - self.soledad, self.token, '/some/path/to/ca_cert', - 'https://api.some-server.test:4430', '1', self.uuid, - '/path/to/gpg') - - @patch('pixelated.bitmask_libraries.nicknym.KeyManager') - def test_gen_key(self, keymanager_mock): - # given - keyman = keymanager_mock.return_value - keyman.get_key.side_effect = KeyNotFound - nicknym = NickNym(self.provider, self.config, self.soledad_session, self.srp_session) - - # when/then - nicknym.generate_openpgp_key() - - keyman.get_key.assert_called_with('test_user@some-server.test', openpgp.OpenPGPKey, fetch_remote=False, private=True) - keyman.gen_key.assert_called_with(openpgp.OpenPGPKey) diff --git a/service/test/unit/bitmask_libraries/provider_test.py b/service/test/unit/bitmask_libraries/provider_test.py deleted file mode 100644 index dd57afa0..00000000 --- a/service/test/unit/bitmask_libraries/provider_test.py +++ /dev/null @@ -1,185 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . -import json - -from httmock import all_requests, HTTMock, urlmatch -from requests import HTTPError -from pixelated.bitmask_libraries.config import LeapConfig -from pixelated.bitmask_libraries.provider import LeapProvider -from abstract_leap_test import AbstractLeapTest - - -@all_requests -def not_found_mock(url, request): - return {'status_code': 404, - 'content': 'foobar'} - - -@urlmatch(netloc=r'(.*\.)?some-provider\.test$', path='/provider.json') -def provider_json_mock(url, request): - return provider_json_response("SHA256: 06e2300bdbc118c290eda0dc977c24080718f4eeca68c8b0ad431872a2baa22d") - - -@urlmatch(netloc=r'(.*\.)?some-provider\.test$', path='/provider.json') -def provider_json_invalid_fingerprint_mock(url, request): - return provider_json_response("SHA256: 0123456789012345678901234567890123456789012345678901234567890123") - - -def provider_json_response(fingerprint): - content = { - "api_uri": "https://api.some-provider.test:4430", - "api_version": "1", - "ca_cert_fingerprint": fingerprint, - "ca_cert_uri": "https://some-provider.test/ca.crt", - "domain": "some-provider.test", - "services": [ - "mx" - ] - } - return { - "status_code": 200, - "content": json.dumps(content) - } - - -@urlmatch(netloc=r'api\.some-provider\.test:4430$', path='/1/config/soledad-service.json') -def soledad_json_mock(url, request): - content = { - "some key": "some value", - } - return { - "status_code": 200, - "content": json.dumps(content) - } - - -@urlmatch(netloc=r'api\.some-provider\.test:4430$', path='/1/config/smtp-service.json') -def smtp_json_mock(url, request): - content = { - "hosts": { - "leap-mx": { - "hostname": "mx.some-provider.test", - "ip_address": "0.0.0.0", - "port": 465 - } - }, - "locations": {}, - "serial": 1, - "version": 1 - } - return { - "status_code": 200, - "content": json.dumps(content) - } - - -@urlmatch(netloc=r'(.*\.)?some-provider\.test$', path='/ca.crt') -def ca_cert_mock(url, request): - return { - "status_code": 200, - "content": ca_crt - } - - -ca_crt = """ ------BEGIN CERTIFICATE----- -MIIFbzCCA1egAwIBAgIBATANBgkqhkiG9w0BAQ0FADBKMREwDwYDVQQKDAhXYXpv -a2F6aTEaMBgGA1UECwwRaHR0cHM6Ly9kZmkubG9jYWwxGTAXBgNVBAMMEFdhem9r -YXppIFJvb3QgQ0EwHhcNMTQwMzI1MDAwMDAwWhcNMjQwMzI1MDAwMDAwWjBKMREw -DwYDVQQKDAhXYXpva2F6aTEaMBgGA1UECwwRaHR0cHM6Ly9kZmkubG9jYWwxGTAX -BgNVBAMMEFdhem9rYXppIFJvb3QgQ0EwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAw -ggIKAoICAQDSPyaslC6SNVsKpGoXllInPXbjiq7rJaV08Xg+64FJU/257BZZEJ/j -r33r0xlt2kj85PcbPySLKy0omXAQt9bs273hwAQXExdY41FxMD3wP/dmLqd55KYa -LDV4GUw0QPZ0QUyWVrRHkrdCDyjpRG+6GbowmtygJKLflYmUFC1PYQ3492esr0jC -+Q6L6+/D2+hBiH3NPI22Yk0kQmuPfnu2pvo+EYQ3It81qZE0Jo8u/BqOMgN2f9DS -GvSNfZcKAP18A41/VRrYFa/WUcdDxt/uP5nO1dm2vfLorje3wcMGtGRcDKG/+GAm -S0nYKKQeWYc6z5SDvPM1VlNdn1gOejhAoggT3Hr5Dq8kxW/lQZbOz+HLbz15qGjz -gL4KHKuDE6hOuqxpHdMTY4WZBBQ8/6ICBxaXH9587/nNDdZiom+XukVD4mrSMJS7 -PRr14Hw57433AJDJcZRwZNRRAGgDPNsCoR2caKB6/Uwkp+dWVndj5Ad8MEjyM1yV -+fYU6PSQWNig7qqN5VhNY+zUCcez5gL6volMuW00iOkXISW4lBrcZmEAQTTcWT1D -U7EkLlwITQce63LcuvK7ZWsEm5XCqD+yUz9oQfugmIhxAlTdqt3De9FA0WT9WxGt -zLeswCNKjnMpRgTerq6elwB03EBJVc7k1QRn4+s6C30sXR12dYnEMwIDAQABo2Aw -XjAdBgNVHQ4EFgQU8ItSdI5pSqMDjgRjgYI3Nj0SwxQwDgYDVR0PAQH/BAQDAgIE -MAwGA1UdEwQFMAMBAf8wHwYDVR0jBBgwFoAU8ItSdI5pSqMDjgRjgYI3Nj0SwxQw -DQYJKoZIhvcNAQENBQADggIBALdSPUrIqyIlSMr4R7pWd6Ep0BZH5RztVUcoXtei -x2MFi/rsw7aL9qZqACYIE8Gkkh6Z6GQph0fIqhAlNFvJXKkguL3ri5xh0XmPfbv/ -OLIvaUAixATivdm8ro/IqYQWdL3P6mDZOv4O6POdBEJ9JLc9RXUt1LiQ5Xb9QiLs -l/yOthhp5dJHqC8s6CDEUHRe3s9Q/4cwNB4td47I+mkLsNtVNXqi4lOzuQamqiFt -cFIqOLTFtBJ7G3k9iaDuN6RPS6LMRbqabwg4gafQTmJ+roHpnsaiHkfomI4MZOVi -TLQKOAJ3/pRGm5cGzkzQ+z4sUiCSQxtIWs7EnQCCE8agqpef6zArAvKEO+139+f2 -u1BhWOm/aHT5a3INnJEbuFr8V9MlbZSxSzU3UH7hby+9PxWKYesc6KUAu6Icooci -gEQqrVhVKmfaYMLL7UZHhw56yv/6B10SSmeAMiJhtTExjjrTRLSCaKCPa2ISAUDB -aPR3t8ZoUESWRAFQGj5NvWOomTaXfyE8Or2WfNemvdlWsKvlLeVsjts+iaTgQRU9 -VXcrUhrHhaXhYXeWrWkDDcl8VUlDWXzoUGV9SczOGwr6hONJWMn1HNxNV7ywFWf0 -QXH1g3LBW7qNgRaGhbIX4a1WoNQDmbbKaLgKWs74atZ8o4A2aUEjomclgZWPsc5l -VeJ6 ------END CERTIFICATE----- -""" - - -class LeapProviderTest(AbstractLeapTest): - def setUp(self): - self.config = LeapConfig(verify_ssl=False, leap_home='/tmp/foobar', ca_cert_bundle='/tmp/ca.crt') - - def test_provider_fetches_provider_json(self): - with HTTMock(provider_json_mock): - provider = LeapProvider('some-provider.test', self.config) - - self.assertEqual("1", provider.api_version) - self.assertEqual("some-provider.test", provider.domain) - self.assertEqual("https://api.some-provider.test:4430", provider.api_uri) - self.assertEqual("https://some-provider.test/ca.crt", provider.ca_cert_uri) - self.assertEqual("SHA256: 06e2300bdbc118c290eda0dc977c24080718f4eeca68c8b0ad431872a2baa22d", - provider.ca_cert_fingerprint) - self.assertEqual(["mx"], provider.services) - - def test_provider_json_throws_exception_on_status_code(self): - with HTTMock(not_found_mock): - self.assertRaises(HTTPError, LeapProvider, 'some-provider.test', self.config) - - def test_fetch_soledad_json(self): - with HTTMock(provider_json_mock, soledad_json_mock, not_found_mock): - provider = LeapProvider('some-provider.test', self.config) - soledad = provider.fetch_soledad_json() - - self.assertEqual("some value", soledad.get('some key')) - - def test_throw_exception_for_fetch_soledad_status_code(self): - with HTTMock(provider_json_mock, not_found_mock): - provider = LeapProvider('some-provider.test', self.config) - - self.assertRaises(HTTPError, provider.fetch_soledad_json) - - def test_fetch_smtp_json(self): - with HTTMock(provider_json_mock, smtp_json_mock, not_found_mock): - provider = LeapProvider('some-provider.test', self.config) - smtp = provider.fetch_smtp_json() - self.assertEqual('mx.some-provider.test', smtp.get('hosts').get('leap-mx').get('hostname')) - - def test_throw_exception_for_fetch_smtp_status_code(self): - with HTTMock(provider_json_mock, not_found_mock): - provider = LeapProvider('some-provider.test', self.config) - self.assertRaises(HTTPError, provider.fetch_smtp_json) - - def test_fetch_valid_certificate(self): - with HTTMock(provider_json_mock, ca_cert_mock, not_found_mock): - provider = LeapProvider('some-provider.test', self.config) - provider.fetch_valid_certificate() - - def test_throw_exception_for_invalid_certificate(self): - with HTTMock(provider_json_invalid_fingerprint_mock, ca_cert_mock, not_found_mock): - provider = LeapProvider('some-provider.test', self.config) - self.assertRaises(Exception, provider.fetch_valid_certificate) diff --git a/service/test/unit/bitmask_libraries/session_test.py b/service/test/unit/bitmask_libraries/session_test.py deleted file mode 100644 index 0aa7861a..00000000 --- a/service/test/unit/bitmask_libraries/session_test.py +++ /dev/null @@ -1,72 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . -from mock import patch -from mock import MagicMock - -from pixelated.bitmask_libraries.session import LeapSession -from abstract_leap_test import AbstractLeapTest - - -class SessionTest(AbstractLeapTest): - - def setUp(self): - self.mail_fetcher_mock = MagicMock() - - def tearDown(self): - self.mail_fetcher_mock = MagicMock() - - def test_background_jobs_are_started(self): - self.config.start_background_jobs = True - - with patch('pixelated.bitmask_libraries.session.reactor.callFromThread', new=_execute_func) as _: - self._create_session() - - self.mail_fetcher_mock.start_loop.assert_called_once_with() - - def test_background_jobs_are_not_started(self): - self.config.start_background_jobs = False - - with patch('pixelated.bitmask_libraries.session.reactor.callFromThread', new=_execute_func) as _: - self._create_session() - - self.assertFalse(self.mail_fetcher_mock.start_loop.called) - - def test_that_close_stops_background_jobs(self): - with patch('pixelated.bitmask_libraries.session.reactor.callFromThread', new=_execute_func) as _: - session = self._create_session() - - session.close() - - self.mail_fetcher_mock.stop.assert_called_once_with() - - def test_that_sync_deferes_to_soledad(self): - session = self._create_session() - - session.sync() - - self.soledad_session.sync.assert_called_once_with() - - def test_account_email(self): - session = self._create_session() - self.assertEqual('test_user@some-server.test', session.account_email()) - - def _create_session(self): - return LeapSession(self.provider, self.srp_session, self.soledad_session, self.nicknym, self.soledad_account, - self.mail_fetcher_mock) - - -def _execute_func(func): - func() diff --git a/service/test/unit/bitmask_libraries/smtp_test.py b/service/test/unit/bitmask_libraries/smtp_test.py deleted file mode 100644 index b00a0af6..00000000 --- a/service/test/unit/bitmask_libraries/smtp_test.py +++ /dev/null @@ -1,98 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . -import sys - -import os -from mock import MagicMock, patch -from abstract_leap_test import AbstractLeapTest -from pixelated.bitmask_libraries.smtp import LeapSmtp -from httmock import all_requests, HTTMock, urlmatch - - -@all_requests -def not_found_mock(url, request): - sys.stderr.write('url=%s\n' % url.netloc) - sys.stderr.write('path=%s\n' % url.path) - return {'status_code': 404, - 'content': 'foobar'} - - -@urlmatch(netloc='api.some-server.test:4430', path='/1/cert') -def ca_cert_mock(url, request): - return { - "status_code": 200, - "content": "some content" - } - - -class LeapSmtpTest(AbstractLeapTest): - keymanager = MagicMock() - - def setUp(self): - self.provider.fetch_smtp_json.return_value = { - 'hosts': { - 'leap-mx': { - 'hostname': 'smtp.some-sever.test', - 'port': '1234' - } - } - } - self.config.timeout_in_s = 15 - - def test_that_client_cert_gets_downloaded(self): - smtp = LeapSmtp(self.provider, self.keymanager, self.srp_session) - - with HTTMock(ca_cert_mock, not_found_mock): - smtp._download_client_certificates() - - path = self._client_cert_path() - self.assertTrue(os.path.isfile(path)) - - def _client_cert_path(self): - return os.path.join(self.leap_home, 'providers', 'some-server.test', 'keys', 'client', 'smtp.pem') - - @patch('pixelated.bitmask_libraries.smtp.setup_smtp_gateway') - def test_that_start_calls_setup_smtp_gateway(self, gateway_mock): - smtp = LeapSmtp(self.provider, self.keymanager, self.srp_session) - port = 500 - smtp.TWISTED_PORT = port - gateway_mock.return_value = (None, None) - with HTTMock(ca_cert_mock, not_found_mock): - smtp.start() - - cert_path = self._client_cert_path() - gateway_mock.assert_called_with(keymanager=self.keymanager, smtp_cert=cert_path, smtp_key=cert_path, userid='test_user@some-server.test', smtp_port='1234', encrypted_only=False, smtp_host='smtp.some-sever.test', port=port) - - def test_that_client_stop_does_nothing_if_not_started(self): - smtp = LeapSmtp(self.provider, self.keymanager, self.srp_session) - - with HTTMock(not_found_mock): - smtp.stop() - - @patch('pixelated.bitmask_libraries.smtp.setup_smtp_gateway') - def test_that_running_smtp_sevice_is_stopped(self, gateway_mock): - smtp = LeapSmtp(self.provider, self.keymanager, self.srp_session) - - smtp_service = MagicMock() - smtp_port = MagicMock() - gateway_mock.return_value = (smtp_service, smtp_port) - - with HTTMock(ca_cert_mock, not_found_mock): - smtp.start() - smtp.stop() - - smtp_port.stopListening.assert_called_with() - smtp_service.doStop.assert_called_with() diff --git a/service/test/unit/bitmask_libraries/soledad_test.py b/service/test/unit/bitmask_libraries/soledad_test.py deleted file mode 100644 index c8b45710..00000000 --- a/service/test/unit/bitmask_libraries/soledad_test.py +++ /dev/null @@ -1,69 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . -from mock import patch -from pixelated.bitmask_libraries.soledad import SoledadSession -from abstract_leap_test import AbstractLeapTest - - -@patch('pixelated.bitmask_libraries.soledad.Soledad') -class SoledadSessionTest(AbstractLeapTest): - - def setUp(self): - # given - self.provider.fetch_soledad_json.return_value = {'hosts': { - 'couch1': { - 'hostname': 'couch1.some-server.test', - 'ip_address': '192.168.1.1', - 'port': 1234 - } - }} - - @patch('pixelated.bitmask_libraries.soledad.Soledad.__init__') - def test_that_soledad_is_created_with_required_params(self, soledad_mock, init_mock): - # when - SoledadSession(self.provider, 'any-passphrase', self.srp_session) - - # then - init_mock.assert_called_with(self.uuid, 'any-passphrase', '%s/soledad/%s.secret' % (self.leap_home, self.uuid), - '%s/soledad/%s.db' % (self.leap_home, self.uuid), - 'https://couch1.some-server.test:1234/user-%s' % self.uuid, - '/some/path/to/ca_cert', self.token, defer_encryption=False) - - def test_that_sync_is_called(self, soledad_mock): - instance = soledad_mock.return_value - instance.server_url = '/foo/bar' - instance.need_sync.return_value = True - soledad_session = SoledadSession(self.provider, 'any-passphrase', self.srp_session) - - # when - soledad_session.sync() - - # then - instance.need_sync.assert_called_with('/foo/bar') - instance.sync.assert_called_with() - - def test_that_sync_not_called_if_not_needed(self, mock): - instance = mock.return_value - instance.server_url = '/foo/bar' - instance.need_sync.return_value = False - soledad_session = SoledadSession(self.provider, 'any-passphrase', self.srp_session) - - # when - soledad_session.sync() - - # then - instance.need_sync.assert_called_with('/foo/bar') - self.assertFalse(instance.sync.called) diff --git a/service/test/unit/bitmask_libraries/test_abstract_leap.py b/service/test/unit/bitmask_libraries/test_abstract_leap.py new file mode 100644 index 00000000..2634f330 --- /dev/null +++ b/service/test/unit/bitmask_libraries/test_abstract_leap.py @@ -0,0 +1,42 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . +import tempfile +import unittest +from uuid import uuid4 + +import os +from mock import Mock, MagicMock + + +class AbstractLeapTest(unittest.TestCase): + uuid = str(uuid4()) + session_id = str(uuid4()) + token = str(uuid4()) + + leap_home = os.path.join(tempfile.mkdtemp(), 'leap') + + config = Mock(leap_home=leap_home, ca_cert_bundle='/some/path/to/ca_cert', gpg_binary='/path/to/gpg') + provider = Mock(config=config, server_name='some-server.test', domain='some-server.test', + api_uri='https://api.some-server.test:4430', api_version='1') + soledad = Mock() + soledad_session = Mock(soledad=soledad) + srp_session = Mock(user_name='test_user', api_server_name='some-server.test', uuid=uuid, session_id=session_id, token=token) + + nicknym = MagicMock() + + soledad_account = MagicMock() + + mail_fetcher_mock = MagicMock() diff --git a/service/test/unit/bitmask_libraries/test_certs.py b/service/test/unit/bitmask_libraries/test_certs.py new file mode 100644 index 00000000..8caafe7e --- /dev/null +++ b/service/test/unit/bitmask_libraries/test_certs.py @@ -0,0 +1,19 @@ +import unittest + +from pixelated.bitmask_libraries.certs import which_bundle +from pixelated.bitmask_libraries.config import AUTO_DETECT_CA_BUNDLE +from mock import MagicMock, patch + + +class CertsTest(unittest.TestCase): + + @patch('pixelated.bitmask_libraries.certs.os.path.isfile') + def test_that_which_bundle_returns_byte_string(self, mock_isfile): + mock_isfile.return_value = True + config = MagicMock(ca_cert_bundle=AUTO_DETECT_CA_BUNDLE, certs_home='/some/path') + provider = MagicMock(server_name=u'test.leap.net', config=config) + + bundle = which_bundle(provider) + + self.assertEqual('/some/path/test.leap.net.ca.crt', bundle) + self.assertEqual(str, type(bundle)) diff --git a/service/test/unit/bitmask_libraries/test_leap_srp.py b/service/test/unit/bitmask_libraries/test_leap_srp.py new file mode 100644 index 00000000..6d067e5d --- /dev/null +++ b/service/test/unit/bitmask_libraries/test_leap_srp.py @@ -0,0 +1,157 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . +import json +import unittest +import binascii +from urlparse import parse_qs + +from httmock import urlmatch, all_requests, HTTMock, response +from requests.exceptions import Timeout +import srp +from pixelated.bitmask_libraries.leap_srp import LeapSecureRemotePassword, LeapAuthException + + +(salt_bytes, verification_key_bytes) = srp.create_salted_verification_key('username', 'password', hash_alg=srp.SHA256, ng_type=srp.NG_1024) +verifier = None + + +@all_requests +def not_found_mock(url, request): + return {'status_code': 404, + 'content': 'foobar'} + + +@all_requests +def timeout_mock(url, request): + raise Timeout() + + +@urlmatch(netloc=r'(.*\.)?leap\.local$') +def srp_login_server_simulator_mock(url, request): + global verifier + + data = parse_qs(request.body) + if 'login' in data: + # SRP Authentication Step 1 + A = binascii.unhexlify(data.get('A')[0]) + + verifier = srp.Verifier('username', salt_bytes, verification_key_bytes, A, hash_alg=srp.SHA256, ng_type=srp.NG_1024) + (salt, B) = verifier.get_challenge() + + content = { + 'salt': binascii.hexlify(salt), + 'B': binascii.hexlify(B) + } + + return {'status_code': 200, + 'content': json.dumps(content)} + + else: + # SRP Authentication Step 2 + data = parse_qs(request.body) + client_auth = binascii.unhexlify(data.get('client_auth')[0]) + + M2 = verifier.verify_session(client_auth) + + if not verifier.authenticated(): + return {'status_code': 404, + 'content': ''} + + content = { + 'M2': binascii.hexlify(M2), + 'id': 'some id', + 'token': 'some token' + } + headers = { + 'Content-Type': 'application/json', + 'Set-Cookie': '_session_id=some_session_id;'} + return response(200, content, headers, None, 5, request) + + +class LeapSRPTest(unittest.TestCase): + + def test_status_code_is_checked(self): + with HTTMock(not_found_mock): + lsrp = LeapSecureRemotePassword() + self.assertRaises(LeapAuthException, lsrp.authenticate, 'https://api.leap.local', 'username', 'password') + + def test_invalid_username(self): + with HTTMock(srp_login_server_simulator_mock): + lsrp = LeapSecureRemotePassword() + self.assertRaises(LeapAuthException, lsrp.authenticate, 'https://api.leap.local', 'invalid_user', 'password') + + def test_invalid_password(self): + with HTTMock(srp_login_server_simulator_mock): + lsrp = LeapSecureRemotePassword() + self.assertRaises(LeapAuthException, lsrp.authenticate, 'https://api.leap.local', 'username', 'invalid') + + def test_login(self): + with HTTMock(srp_login_server_simulator_mock): + lsrp = LeapSecureRemotePassword() + leap_session = lsrp.authenticate('https://api.leap.local', 'username', 'password') + + self.assertIsNotNone(leap_session) + self.assertEqual('username', leap_session.user_name) + self.assertEqual('1', leap_session.api_version) + self.assertEqual('https://api.leap.local', leap_session.api_server_name) + self.assertEqual('some token', leap_session.token) + self.assertEqual('some_session_id', leap_session.session_id) + + def test_timeout(self): + with HTTMock(timeout_mock): + lrsp = LeapSecureRemotePassword() + self.assertRaises(LeapAuthException, lrsp.authenticate, 'https://api.leap.local', 'username', 'password') + + def test_register_raises_auth_exception_on_error(self): + with HTTMock(not_found_mock): + lsrp = LeapSecureRemotePassword() + self.assertRaises(LeapAuthException, lsrp.register, 'https://api.leap.local', 'username', 'password') + + def test_register(self): + @urlmatch(netloc=r'(.*\.)?leap\.local$', path='/1/users') + def register_success(url, request): + + content = { + 'login': 'username', + 'ok': True + } + + return {'status_code': 201, + 'content': content} + + with HTTMock(register_success, not_found_mock): + lsrp = LeapSecureRemotePassword() + self.assertTrue(lsrp.register('https://api.leap.local', 'username', 'password')) + + def test_register_user_exists(self): + @urlmatch(netloc=r'(.*\.)?leap\.local$', path='/1/users') + def register_error_user_exists(url, request): + content = {"errors": { + "login": [ + "has already been taken", "has already been taken", "has already been taken" + ]}} + + return {'status_code': 422, + 'content': content} + + with HTTMock(register_error_user_exists, not_found_mock): + lsrp = LeapSecureRemotePassword() + self.assertRaises(LeapAuthException, lsrp.register, 'https://api.leap.local', 'username', 'password') + + def test_registration_timeout(self): + with HTTMock(timeout_mock): + lsrp = LeapSecureRemotePassword() + self.assertRaises(LeapAuthException, lsrp.register, 'https://api.leap.local', 'username', 'password') diff --git a/service/test/unit/bitmask_libraries/test_nicknym.py b/service/test/unit/bitmask_libraries/test_nicknym.py new file mode 100644 index 00000000..9d564abe --- /dev/null +++ b/service/test/unit/bitmask_libraries/test_nicknym.py @@ -0,0 +1,48 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . +from mock import patch + +from leap.keymanager import openpgp, KeyNotFound +from pixelated.bitmask_libraries.nicknym import NickNym +from abstract_leap_test import AbstractLeapTest + + +class NickNymTest(AbstractLeapTest): + @patch('pixelated.bitmask_libraries.nicknym.KeyManager.__init__', return_value=None) + def test_that_keymanager_is_created(self, init_mock): + # given + + # when + NickNym(self.provider, self.config, self.soledad_session, self.srp_session) + + # then + init_mock.assert_called_with('test_user@some-server.test', 'https://nicknym.some-server.test:6425/', + self.soledad, self.token, '/some/path/to/ca_cert', + 'https://api.some-server.test:4430', '1', self.uuid, + '/path/to/gpg') + + @patch('pixelated.bitmask_libraries.nicknym.KeyManager') + def test_gen_key(self, keymanager_mock): + # given + keyman = keymanager_mock.return_value + keyman.get_key.side_effect = KeyNotFound + nicknym = NickNym(self.provider, self.config, self.soledad_session, self.srp_session) + + # when/then + nicknym.generate_openpgp_key() + + keyman.get_key.assert_called_with('test_user@some-server.test', openpgp.OpenPGPKey, fetch_remote=False, private=True) + keyman.gen_key.assert_called_with(openpgp.OpenPGPKey) diff --git a/service/test/unit/bitmask_libraries/test_provider.py b/service/test/unit/bitmask_libraries/test_provider.py new file mode 100644 index 00000000..dd57afa0 --- /dev/null +++ b/service/test/unit/bitmask_libraries/test_provider.py @@ -0,0 +1,185 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . +import json + +from httmock import all_requests, HTTMock, urlmatch +from requests import HTTPError +from pixelated.bitmask_libraries.config import LeapConfig +from pixelated.bitmask_libraries.provider import LeapProvider +from abstract_leap_test import AbstractLeapTest + + +@all_requests +def not_found_mock(url, request): + return {'status_code': 404, + 'content': 'foobar'} + + +@urlmatch(netloc=r'(.*\.)?some-provider\.test$', path='/provider.json') +def provider_json_mock(url, request): + return provider_json_response("SHA256: 06e2300bdbc118c290eda0dc977c24080718f4eeca68c8b0ad431872a2baa22d") + + +@urlmatch(netloc=r'(.*\.)?some-provider\.test$', path='/provider.json') +def provider_json_invalid_fingerprint_mock(url, request): + return provider_json_response("SHA256: 0123456789012345678901234567890123456789012345678901234567890123") + + +def provider_json_response(fingerprint): + content = { + "api_uri": "https://api.some-provider.test:4430", + "api_version": "1", + "ca_cert_fingerprint": fingerprint, + "ca_cert_uri": "https://some-provider.test/ca.crt", + "domain": "some-provider.test", + "services": [ + "mx" + ] + } + return { + "status_code": 200, + "content": json.dumps(content) + } + + +@urlmatch(netloc=r'api\.some-provider\.test:4430$', path='/1/config/soledad-service.json') +def soledad_json_mock(url, request): + content = { + "some key": "some value", + } + return { + "status_code": 200, + "content": json.dumps(content) + } + + +@urlmatch(netloc=r'api\.some-provider\.test:4430$', path='/1/config/smtp-service.json') +def smtp_json_mock(url, request): + content = { + "hosts": { + "leap-mx": { + "hostname": "mx.some-provider.test", + "ip_address": "0.0.0.0", + "port": 465 + } + }, + "locations": {}, + "serial": 1, + "version": 1 + } + return { + "status_code": 200, + "content": json.dumps(content) + } + + +@urlmatch(netloc=r'(.*\.)?some-provider\.test$', path='/ca.crt') +def ca_cert_mock(url, request): + return { + "status_code": 200, + "content": ca_crt + } + + +ca_crt = """ +-----BEGIN CERTIFICATE----- +MIIFbzCCA1egAwIBAgIBATANBgkqhkiG9w0BAQ0FADBKMREwDwYDVQQKDAhXYXpv +a2F6aTEaMBgGA1UECwwRaHR0cHM6Ly9kZmkubG9jYWwxGTAXBgNVBAMMEFdhem9r +YXppIFJvb3QgQ0EwHhcNMTQwMzI1MDAwMDAwWhcNMjQwMzI1MDAwMDAwWjBKMREw +DwYDVQQKDAhXYXpva2F6aTEaMBgGA1UECwwRaHR0cHM6Ly9kZmkubG9jYWwxGTAX +BgNVBAMMEFdhem9rYXppIFJvb3QgQ0EwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAw +ggIKAoICAQDSPyaslC6SNVsKpGoXllInPXbjiq7rJaV08Xg+64FJU/257BZZEJ/j +r33r0xlt2kj85PcbPySLKy0omXAQt9bs273hwAQXExdY41FxMD3wP/dmLqd55KYa +LDV4GUw0QPZ0QUyWVrRHkrdCDyjpRG+6GbowmtygJKLflYmUFC1PYQ3492esr0jC ++Q6L6+/D2+hBiH3NPI22Yk0kQmuPfnu2pvo+EYQ3It81qZE0Jo8u/BqOMgN2f9DS +GvSNfZcKAP18A41/VRrYFa/WUcdDxt/uP5nO1dm2vfLorje3wcMGtGRcDKG/+GAm +S0nYKKQeWYc6z5SDvPM1VlNdn1gOejhAoggT3Hr5Dq8kxW/lQZbOz+HLbz15qGjz +gL4KHKuDE6hOuqxpHdMTY4WZBBQ8/6ICBxaXH9587/nNDdZiom+XukVD4mrSMJS7 +PRr14Hw57433AJDJcZRwZNRRAGgDPNsCoR2caKB6/Uwkp+dWVndj5Ad8MEjyM1yV ++fYU6PSQWNig7qqN5VhNY+zUCcez5gL6volMuW00iOkXISW4lBrcZmEAQTTcWT1D +U7EkLlwITQce63LcuvK7ZWsEm5XCqD+yUz9oQfugmIhxAlTdqt3De9FA0WT9WxGt +zLeswCNKjnMpRgTerq6elwB03EBJVc7k1QRn4+s6C30sXR12dYnEMwIDAQABo2Aw +XjAdBgNVHQ4EFgQU8ItSdI5pSqMDjgRjgYI3Nj0SwxQwDgYDVR0PAQH/BAQDAgIE +MAwGA1UdEwQFMAMBAf8wHwYDVR0jBBgwFoAU8ItSdI5pSqMDjgRjgYI3Nj0SwxQw +DQYJKoZIhvcNAQENBQADggIBALdSPUrIqyIlSMr4R7pWd6Ep0BZH5RztVUcoXtei +x2MFi/rsw7aL9qZqACYIE8Gkkh6Z6GQph0fIqhAlNFvJXKkguL3ri5xh0XmPfbv/ +OLIvaUAixATivdm8ro/IqYQWdL3P6mDZOv4O6POdBEJ9JLc9RXUt1LiQ5Xb9QiLs +l/yOthhp5dJHqC8s6CDEUHRe3s9Q/4cwNB4td47I+mkLsNtVNXqi4lOzuQamqiFt +cFIqOLTFtBJ7G3k9iaDuN6RPS6LMRbqabwg4gafQTmJ+roHpnsaiHkfomI4MZOVi +TLQKOAJ3/pRGm5cGzkzQ+z4sUiCSQxtIWs7EnQCCE8agqpef6zArAvKEO+139+f2 +u1BhWOm/aHT5a3INnJEbuFr8V9MlbZSxSzU3UH7hby+9PxWKYesc6KUAu6Icooci +gEQqrVhVKmfaYMLL7UZHhw56yv/6B10SSmeAMiJhtTExjjrTRLSCaKCPa2ISAUDB +aPR3t8ZoUESWRAFQGj5NvWOomTaXfyE8Or2WfNemvdlWsKvlLeVsjts+iaTgQRU9 +VXcrUhrHhaXhYXeWrWkDDcl8VUlDWXzoUGV9SczOGwr6hONJWMn1HNxNV7ywFWf0 +QXH1g3LBW7qNgRaGhbIX4a1WoNQDmbbKaLgKWs74atZ8o4A2aUEjomclgZWPsc5l +VeJ6 +-----END CERTIFICATE----- +""" + + +class LeapProviderTest(AbstractLeapTest): + def setUp(self): + self.config = LeapConfig(verify_ssl=False, leap_home='/tmp/foobar', ca_cert_bundle='/tmp/ca.crt') + + def test_provider_fetches_provider_json(self): + with HTTMock(provider_json_mock): + provider = LeapProvider('some-provider.test', self.config) + + self.assertEqual("1", provider.api_version) + self.assertEqual("some-provider.test", provider.domain) + self.assertEqual("https://api.some-provider.test:4430", provider.api_uri) + self.assertEqual("https://some-provider.test/ca.crt", provider.ca_cert_uri) + self.assertEqual("SHA256: 06e2300bdbc118c290eda0dc977c24080718f4eeca68c8b0ad431872a2baa22d", + provider.ca_cert_fingerprint) + self.assertEqual(["mx"], provider.services) + + def test_provider_json_throws_exception_on_status_code(self): + with HTTMock(not_found_mock): + self.assertRaises(HTTPError, LeapProvider, 'some-provider.test', self.config) + + def test_fetch_soledad_json(self): + with HTTMock(provider_json_mock, soledad_json_mock, not_found_mock): + provider = LeapProvider('some-provider.test', self.config) + soledad = provider.fetch_soledad_json() + + self.assertEqual("some value", soledad.get('some key')) + + def test_throw_exception_for_fetch_soledad_status_code(self): + with HTTMock(provider_json_mock, not_found_mock): + provider = LeapProvider('some-provider.test', self.config) + + self.assertRaises(HTTPError, provider.fetch_soledad_json) + + def test_fetch_smtp_json(self): + with HTTMock(provider_json_mock, smtp_json_mock, not_found_mock): + provider = LeapProvider('some-provider.test', self.config) + smtp = provider.fetch_smtp_json() + self.assertEqual('mx.some-provider.test', smtp.get('hosts').get('leap-mx').get('hostname')) + + def test_throw_exception_for_fetch_smtp_status_code(self): + with HTTMock(provider_json_mock, not_found_mock): + provider = LeapProvider('some-provider.test', self.config) + self.assertRaises(HTTPError, provider.fetch_smtp_json) + + def test_fetch_valid_certificate(self): + with HTTMock(provider_json_mock, ca_cert_mock, not_found_mock): + provider = LeapProvider('some-provider.test', self.config) + provider.fetch_valid_certificate() + + def test_throw_exception_for_invalid_certificate(self): + with HTTMock(provider_json_invalid_fingerprint_mock, ca_cert_mock, not_found_mock): + provider = LeapProvider('some-provider.test', self.config) + self.assertRaises(Exception, provider.fetch_valid_certificate) diff --git a/service/test/unit/bitmask_libraries/test_session.py b/service/test/unit/bitmask_libraries/test_session.py new file mode 100644 index 00000000..0aa7861a --- /dev/null +++ b/service/test/unit/bitmask_libraries/test_session.py @@ -0,0 +1,72 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . +from mock import patch +from mock import MagicMock + +from pixelated.bitmask_libraries.session import LeapSession +from abstract_leap_test import AbstractLeapTest + + +class SessionTest(AbstractLeapTest): + + def setUp(self): + self.mail_fetcher_mock = MagicMock() + + def tearDown(self): + self.mail_fetcher_mock = MagicMock() + + def test_background_jobs_are_started(self): + self.config.start_background_jobs = True + + with patch('pixelated.bitmask_libraries.session.reactor.callFromThread', new=_execute_func) as _: + self._create_session() + + self.mail_fetcher_mock.start_loop.assert_called_once_with() + + def test_background_jobs_are_not_started(self): + self.config.start_background_jobs = False + + with patch('pixelated.bitmask_libraries.session.reactor.callFromThread', new=_execute_func) as _: + self._create_session() + + self.assertFalse(self.mail_fetcher_mock.start_loop.called) + + def test_that_close_stops_background_jobs(self): + with patch('pixelated.bitmask_libraries.session.reactor.callFromThread', new=_execute_func) as _: + session = self._create_session() + + session.close() + + self.mail_fetcher_mock.stop.assert_called_once_with() + + def test_that_sync_deferes_to_soledad(self): + session = self._create_session() + + session.sync() + + self.soledad_session.sync.assert_called_once_with() + + def test_account_email(self): + session = self._create_session() + self.assertEqual('test_user@some-server.test', session.account_email()) + + def _create_session(self): + return LeapSession(self.provider, self.srp_session, self.soledad_session, self.nicknym, self.soledad_account, + self.mail_fetcher_mock) + + +def _execute_func(func): + func() diff --git a/service/test/unit/bitmask_libraries/test_smtp.py b/service/test/unit/bitmask_libraries/test_smtp.py new file mode 100644 index 00000000..b00a0af6 --- /dev/null +++ b/service/test/unit/bitmask_libraries/test_smtp.py @@ -0,0 +1,98 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . +import sys + +import os +from mock import MagicMock, patch +from abstract_leap_test import AbstractLeapTest +from pixelated.bitmask_libraries.smtp import LeapSmtp +from httmock import all_requests, HTTMock, urlmatch + + +@all_requests +def not_found_mock(url, request): + sys.stderr.write('url=%s\n' % url.netloc) + sys.stderr.write('path=%s\n' % url.path) + return {'status_code': 404, + 'content': 'foobar'} + + +@urlmatch(netloc='api.some-server.test:4430', path='/1/cert') +def ca_cert_mock(url, request): + return { + "status_code": 200, + "content": "some content" + } + + +class LeapSmtpTest(AbstractLeapTest): + keymanager = MagicMock() + + def setUp(self): + self.provider.fetch_smtp_json.return_value = { + 'hosts': { + 'leap-mx': { + 'hostname': 'smtp.some-sever.test', + 'port': '1234' + } + } + } + self.config.timeout_in_s = 15 + + def test_that_client_cert_gets_downloaded(self): + smtp = LeapSmtp(self.provider, self.keymanager, self.srp_session) + + with HTTMock(ca_cert_mock, not_found_mock): + smtp._download_client_certificates() + + path = self._client_cert_path() + self.assertTrue(os.path.isfile(path)) + + def _client_cert_path(self): + return os.path.join(self.leap_home, 'providers', 'some-server.test', 'keys', 'client', 'smtp.pem') + + @patch('pixelated.bitmask_libraries.smtp.setup_smtp_gateway') + def test_that_start_calls_setup_smtp_gateway(self, gateway_mock): + smtp = LeapSmtp(self.provider, self.keymanager, self.srp_session) + port = 500 + smtp.TWISTED_PORT = port + gateway_mock.return_value = (None, None) + with HTTMock(ca_cert_mock, not_found_mock): + smtp.start() + + cert_path = self._client_cert_path() + gateway_mock.assert_called_with(keymanager=self.keymanager, smtp_cert=cert_path, smtp_key=cert_path, userid='test_user@some-server.test', smtp_port='1234', encrypted_only=False, smtp_host='smtp.some-sever.test', port=port) + + def test_that_client_stop_does_nothing_if_not_started(self): + smtp = LeapSmtp(self.provider, self.keymanager, self.srp_session) + + with HTTMock(not_found_mock): + smtp.stop() + + @patch('pixelated.bitmask_libraries.smtp.setup_smtp_gateway') + def test_that_running_smtp_sevice_is_stopped(self, gateway_mock): + smtp = LeapSmtp(self.provider, self.keymanager, self.srp_session) + + smtp_service = MagicMock() + smtp_port = MagicMock() + gateway_mock.return_value = (smtp_service, smtp_port) + + with HTTMock(ca_cert_mock, not_found_mock): + smtp.start() + smtp.stop() + + smtp_port.stopListening.assert_called_with() + smtp_service.doStop.assert_called_with() diff --git a/service/test/unit/bitmask_libraries/test_soledad.py b/service/test/unit/bitmask_libraries/test_soledad.py new file mode 100644 index 00000000..c8b45710 --- /dev/null +++ b/service/test/unit/bitmask_libraries/test_soledad.py @@ -0,0 +1,69 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . +from mock import patch +from pixelated.bitmask_libraries.soledad import SoledadSession +from abstract_leap_test import AbstractLeapTest + + +@patch('pixelated.bitmask_libraries.soledad.Soledad') +class SoledadSessionTest(AbstractLeapTest): + + def setUp(self): + # given + self.provider.fetch_soledad_json.return_value = {'hosts': { + 'couch1': { + 'hostname': 'couch1.some-server.test', + 'ip_address': '192.168.1.1', + 'port': 1234 + } + }} + + @patch('pixelated.bitmask_libraries.soledad.Soledad.__init__') + def test_that_soledad_is_created_with_required_params(self, soledad_mock, init_mock): + # when + SoledadSession(self.provider, 'any-passphrase', self.srp_session) + + # then + init_mock.assert_called_with(self.uuid, 'any-passphrase', '%s/soledad/%s.secret' % (self.leap_home, self.uuid), + '%s/soledad/%s.db' % (self.leap_home, self.uuid), + 'https://couch1.some-server.test:1234/user-%s' % self.uuid, + '/some/path/to/ca_cert', self.token, defer_encryption=False) + + def test_that_sync_is_called(self, soledad_mock): + instance = soledad_mock.return_value + instance.server_url = '/foo/bar' + instance.need_sync.return_value = True + soledad_session = SoledadSession(self.provider, 'any-passphrase', self.srp_session) + + # when + soledad_session.sync() + + # then + instance.need_sync.assert_called_with('/foo/bar') + instance.sync.assert_called_with() + + def test_that_sync_not_called_if_not_needed(self, mock): + instance = mock.return_value + instance.server_url = '/foo/bar' + instance.need_sync.return_value = False + soledad_session = SoledadSession(self.provider, 'any-passphrase', self.srp_session) + + # when + soledad_session.sync() + + # then + instance.need_sync.assert_called_with('/foo/bar') + self.assertFalse(instance.sync.called) diff --git a/service/test/unit/config/app_factory_test.py b/service/test/unit/config/app_factory_test.py deleted file mode 100644 index a42c7c83..00000000 --- a/service/test/unit/config/app_factory_test.py +++ /dev/null @@ -1,32 +0,0 @@ -import unittest - -from mock import patch, MagicMock, ANY -import pixelated -from pixelated.config.app_factory import create_app - - -class AppFactoryTest(unittest.TestCase): - - class MockConfig: - def __init__(self, port, host, sslkey=None, sslcert=None): - self.port = port - self.host = host - self.sslkey = sslkey - self.sslcert = sslcert - - @patch('pixelated.config.app_factory.reactor') - def test_that_create_app_binds_to_tcp_port_if_no_ssl_options(self, reactor_mock): - app_mock = MagicMock() - - create_app(app_mock, AppFactoryTest.MockConfig(12345, '127.0.0.1')) - - reactor_mock.listenTCP.assert_called_once_with(12345, ANY, interface='127.0.0.1') - - @patch('pixelated.config.app_factory.reactor') - def test_that_create_app_binds_to_ssl_if_ssl_options(self, reactor_mock): - app_mock = MagicMock() - pixelated.config.app_factory._ssl_options = lambda _: 'options' - - create_app(app_mock, AppFactoryTest.MockConfig(12345, '127.0.0.1', sslkey="sslkey", sslcert="sslcert")) - - reactor_mock.listenSSL.assert_called_once_with(12345, ANY, 'options', interface='127.0.0.1') diff --git a/service/test/unit/config/test_app_factory.py b/service/test/unit/config/test_app_factory.py new file mode 100644 index 00000000..a42c7c83 --- /dev/null +++ b/service/test/unit/config/test_app_factory.py @@ -0,0 +1,32 @@ +import unittest + +from mock import patch, MagicMock, ANY +import pixelated +from pixelated.config.app_factory import create_app + + +class AppFactoryTest(unittest.TestCase): + + class MockConfig: + def __init__(self, port, host, sslkey=None, sslcert=None): + self.port = port + self.host = host + self.sslkey = sslkey + self.sslcert = sslcert + + @patch('pixelated.config.app_factory.reactor') + def test_that_create_app_binds_to_tcp_port_if_no_ssl_options(self, reactor_mock): + app_mock = MagicMock() + + create_app(app_mock, AppFactoryTest.MockConfig(12345, '127.0.0.1')) + + reactor_mock.listenTCP.assert_called_once_with(12345, ANY, interface='127.0.0.1') + + @patch('pixelated.config.app_factory.reactor') + def test_that_create_app_binds_to_ssl_if_ssl_options(self, reactor_mock): + app_mock = MagicMock() + pixelated.config.app_factory._ssl_options = lambda _: 'options' + + create_app(app_mock, AppFactoryTest.MockConfig(12345, '127.0.0.1', sslkey="sslkey", sslcert="sslcert")) + + reactor_mock.listenSSL.assert_called_once_with(12345, ANY, 'options', interface='127.0.0.1') diff --git a/service/test/unit/resources/sync_info_controller_test.py b/service/test/unit/resources/sync_info_controller_test.py deleted file mode 100644 index c24c7181..00000000 --- a/service/test/unit/resources/sync_info_controller_test.py +++ /dev/null @@ -1,54 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . -import unittest -import json - -from test.support.test_helper import request_mock -from pixelated.resources.sync_info_resource import SyncInfoResource -from mockito import * - - -class SyncInfoControllerTest(unittest.TestCase): - - def setUp(self): - self.dummy_request = request_mock() - self.controller = SyncInfoResource() - - def _set_count(self, current, total): - soledad_sync_data = mock() - soledad_sync_data.content = "%s/%s" % (current, total) - self.controller.set_sync_info(soledad_sync_data) - - def get_sync_info(self): - self.controller.render_GET(self.dummy_request) - return json.loads(self.dummy_request.written[0]) - - def test_is_not_syncing_if_total_is_equal_to_current(self): - self._set_count(total=0, current=0) - - sync_info = self.get_sync_info() - - self.assertFalse(sync_info['is_syncing']) - - def test_is_syncing_if_total_is_not_equal_to_current_and_adds_count(self): - self._set_count(total=10, current=5) - - sync_info = self.get_sync_info() - - self.assertTrue(sync_info['is_syncing']) - self.assertEquals(5, sync_info['count']['current']) - self.assertEquals(10, sync_info['count']['total']) - self.assertEquals(0.5, sync_info['count']['progress']) diff --git a/service/test/unit/resources/test_sync_info_controller.py b/service/test/unit/resources/test_sync_info_controller.py new file mode 100644 index 00000000..c24c7181 --- /dev/null +++ b/service/test/unit/resources/test_sync_info_controller.py @@ -0,0 +1,54 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . +import unittest +import json + +from test.support.test_helper import request_mock +from pixelated.resources.sync_info_resource import SyncInfoResource +from mockito import * + + +class SyncInfoControllerTest(unittest.TestCase): + + def setUp(self): + self.dummy_request = request_mock() + self.controller = SyncInfoResource() + + def _set_count(self, current, total): + soledad_sync_data = mock() + soledad_sync_data.content = "%s/%s" % (current, total) + self.controller.set_sync_info(soledad_sync_data) + + def get_sync_info(self): + self.controller.render_GET(self.dummy_request) + return json.loads(self.dummy_request.written[0]) + + def test_is_not_syncing_if_total_is_equal_to_current(self): + self._set_count(total=0, current=0) + + sync_info = self.get_sync_info() + + self.assertFalse(sync_info['is_syncing']) + + def test_is_syncing_if_total_is_not_equal_to_current_and_adds_count(self): + self._set_count(total=10, current=5) + + sync_info = self.get_sync_info() + + self.assertTrue(sync_info['is_syncing']) + self.assertEquals(5, sync_info['count']['current']) + self.assertEquals(10, sync_info['count']['total']) + self.assertEquals(0.5, sync_info['count']['progress']) diff --git a/service/test/unit/runserver_test.py b/service/test/unit/runserver_test.py deleted file mode 100644 index 99b502f1..00000000 --- a/service/test/unit/runserver_test.py +++ /dev/null @@ -1,88 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . - -import unittest -import sys -import thread -import json - -import os -import pixelated.config.app_factory -import pixelated.runserver -from mockito import * -import pixelated.config.app_factory as app_factory -from leap.common.events import server as events_server - - -class RunserverTest(unittest.TestCase): - - def setUp(self): - events_server.ensure_server = lambda port=None: None - when(app_factory).create_app().thenReturn(None) - - def tearDown(self): - unstub() - - def test_that_config_file_can_be_specified_on_command_line(self): - self.config_file_loaded = None - - def _mock_parse_config_from_file(config_file): - self.config_file_loaded = config_file - return 1, 2, 3 - - pixelated.runserver.parse_config_from_file = _mock_parse_config_from_file - sys.argv = ['pixelated-user-agent', '--config', 'pixelated.cfg'] - - pixelated.runserver.setup() - - self.assertEquals('pixelated.cfg', self.config_file_loaded) - - def test_that_organization_switch_reads_the_credentials_from_pipe(self): - fifo_path = '/tmp/credentials-pipe' - if os.path.exists(fifo_path): - os.remove(fifo_path) - test_fifo = os.mkfifo('/tmp/credentials-pipe') - thread.start_new_thread(self.spin_up_fifo, (fifo_path,)) - sys.argv = ['tmp/does_not_exist', '--dispatcher', fifo_path] - pixelated.runserver.setup() - - def test_that_organization_switch_reads_the_credentials_from_stdin(self): - data = json.dumps({'leap_provider_hostname': 'test_provider', 'user': 'test_user', 'password': 'test_password'}) - orig_stdin = sys.stdin - try: - sys.stdin = Mock() - when(sys.stdin).read().thenReturn(data) - - sys.argv = ['tmp/does_not_exist', '--dispatcher-stdin'] - pixelated.runserver.setup() - - self.assertEquals('test_provider', pixelated.runserver.app.config['LEAP_SERVER_NAME']) - self.assertEquals('test_user', pixelated.runserver.app.config['LEAP_USERNAME']) - self.assertEquals('test_password', pixelated.runserver.app.config['LEAP_PASSWORD']) - finally: - sys.stdin = orig_stdin - - def test_start_services_pass_args_through(self): - args = {} - when(app_factory).create_app(any(), args).thenReturn(None) - - pixelated.runserver.start_services(args) - - verify(app_factory).create_app(any(), args) - - def spin_up_fifo(self, test_fifo): - with open(test_fifo, 'w') as fifo: - fifo.write(json.dumps({'leap_provider_hostname': 'test_provider', 'user': 'test_user', 'password': 'test_password'})) diff --git a/service/test/unit/support/encrypted_file_storage_test.py b/service/test/unit/support/encrypted_file_storage_test.py deleted file mode 100644 index 2a6735c3..00000000 --- a/service/test/unit/support/encrypted_file_storage_test.py +++ /dev/null @@ -1,64 +0,0 @@ -# -# Copyright (c) 2014 ThoughtWorks, Inc. -# -# Pixelated is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Pixelated is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with Pixelated. If not, see . -import os -import shutil -import unittest -from pixelated.support.encrypted_file_storage import EncryptedFileStorage - - -class EncryptedFileStorageTest(unittest.TestCase): - - def setUp(self): - self.key = '2\x06\xf87F:\xd2\xe2]w\xc9\x0c\xb8\x9b\x8e\xd3\x92\t\xabHu\xa6\xa3\x9a\x8d\xec\x0c\xab<8\xbb\x12\xfbP\xf2\x83"\xa1\xcf7\x92\xb0!\xfe\xebM\x80\x8a\x14\xe6\xf9xr\xf5#\x8f\x1bs\xb3#\x0e)a\xd8' - self.msg = 'this is a very, very secret binary message: \xbe\xba\xca\xfe' - self.path = os.path.join('tmp', 'search_test') - self.storage = EncryptedFileStorage(self.path, self.key) - - def tearDown(self): - if os.path.exists(self.path): - shutil.rmtree(self.path) - - def test_encrypt_decrypt(self): - storage, msg = self.storage, self.msg - - ciphertext = storage.encrypt(msg) - - self.assertNotEquals(msg, ciphertext) - self.assertEquals(msg, storage.decrypt(ciphertext)) - - def test_mac_against_appended_garbage(self): - storage, msg = self.storage, self.msg - - ciphertext = storage.encrypt(msg) - corrupted_ciphertext = ciphertext + 'garbage' - - try: - storage.decrypt(corrupted_ciphertext) - self.fail('MAC is not detecting appended garbage on ciphertext') - except: - pass - - def test_mac_against_modified_file(self): - storage, msg = self.storage, self.msg - - ciphertext = storage.encrypt(msg) - corrupted_ciphertext = ''.join([chr(ord(i) >> 1) for i in ciphertext]) - - try: - storage.decrypt(corrupted_ciphertext) - self.fail('MAC is not detecting corrupt ciphertext') - except: - pass diff --git a/service/test/unit/support/test_encrypted_file_storage.py b/service/test/unit/support/test_encrypted_file_storage.py new file mode 100644 index 00000000..2a6735c3 --- /dev/null +++ b/service/test/unit/support/test_encrypted_file_storage.py @@ -0,0 +1,64 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . +import os +import shutil +import unittest +from pixelated.support.encrypted_file_storage import EncryptedFileStorage + + +class EncryptedFileStorageTest(unittest.TestCase): + + def setUp(self): + self.key = '2\x06\xf87F:\xd2\xe2]w\xc9\x0c\xb8\x9b\x8e\xd3\x92\t\xabHu\xa6\xa3\x9a\x8d\xec\x0c\xab<8\xbb\x12\xfbP\xf2\x83"\xa1\xcf7\x92\xb0!\xfe\xebM\x80\x8a\x14\xe6\xf9xr\xf5#\x8f\x1bs\xb3#\x0e)a\xd8' + self.msg = 'this is a very, very secret binary message: \xbe\xba\xca\xfe' + self.path = os.path.join('tmp', 'search_test') + self.storage = EncryptedFileStorage(self.path, self.key) + + def tearDown(self): + if os.path.exists(self.path): + shutil.rmtree(self.path) + + def test_encrypt_decrypt(self): + storage, msg = self.storage, self.msg + + ciphertext = storage.encrypt(msg) + + self.assertNotEquals(msg, ciphertext) + self.assertEquals(msg, storage.decrypt(ciphertext)) + + def test_mac_against_appended_garbage(self): + storage, msg = self.storage, self.msg + + ciphertext = storage.encrypt(msg) + corrupted_ciphertext = ciphertext + 'garbage' + + try: + storage.decrypt(corrupted_ciphertext) + self.fail('MAC is not detecting appended garbage on ciphertext') + except: + pass + + def test_mac_against_modified_file(self): + storage, msg = self.storage, self.msg + + ciphertext = storage.encrypt(msg) + corrupted_ciphertext = ''.join([chr(ord(i) >> 1) for i in ciphertext]) + + try: + storage.decrypt(corrupted_ciphertext) + self.fail('MAC is not detecting corrupt ciphertext') + except: + pass diff --git a/service/test/unit/test_runserver.py b/service/test/unit/test_runserver.py new file mode 100644 index 00000000..99b502f1 --- /dev/null +++ b/service/test/unit/test_runserver.py @@ -0,0 +1,88 @@ +# +# Copyright (c) 2014 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . + +import unittest +import sys +import thread +import json + +import os +import pixelated.config.app_factory +import pixelated.runserver +from mockito import * +import pixelated.config.app_factory as app_factory +from leap.common.events import server as events_server + + +class RunserverTest(unittest.TestCase): + + def setUp(self): + events_server.ensure_server = lambda port=None: None + when(app_factory).create_app().thenReturn(None) + + def tearDown(self): + unstub() + + def test_that_config_file_can_be_specified_on_command_line(self): + self.config_file_loaded = None + + def _mock_parse_config_from_file(config_file): + self.config_file_loaded = config_file + return 1, 2, 3 + + pixelated.runserver.parse_config_from_file = _mock_parse_config_from_file + sys.argv = ['pixelated-user-agent', '--config', 'pixelated.cfg'] + + pixelated.runserver.setup() + + self.assertEquals('pixelated.cfg', self.config_file_loaded) + + def test_that_organization_switch_reads_the_credentials_from_pipe(self): + fifo_path = '/tmp/credentials-pipe' + if os.path.exists(fifo_path): + os.remove(fifo_path) + test_fifo = os.mkfifo('/tmp/credentials-pipe') + thread.start_new_thread(self.spin_up_fifo, (fifo_path,)) + sys.argv = ['tmp/does_not_exist', '--dispatcher', fifo_path] + pixelated.runserver.setup() + + def test_that_organization_switch_reads_the_credentials_from_stdin(self): + data = json.dumps({'leap_provider_hostname': 'test_provider', 'user': 'test_user', 'password': 'test_password'}) + orig_stdin = sys.stdin + try: + sys.stdin = Mock() + when(sys.stdin).read().thenReturn(data) + + sys.argv = ['tmp/does_not_exist', '--dispatcher-stdin'] + pixelated.runserver.setup() + + self.assertEquals('test_provider', pixelated.runserver.app.config['LEAP_SERVER_NAME']) + self.assertEquals('test_user', pixelated.runserver.app.config['LEAP_USERNAME']) + self.assertEquals('test_password', pixelated.runserver.app.config['LEAP_PASSWORD']) + finally: + sys.stdin = orig_stdin + + def test_start_services_pass_args_through(self): + args = {} + when(app_factory).create_app(any(), args).thenReturn(None) + + pixelated.runserver.start_services(args) + + verify(app_factory).create_app(any(), args) + + def spin_up_fifo(self, test_fifo): + with open(test_fifo, 'w') as fifo: + fifo.write(json.dumps({'leap_provider_hostname': 'test_provider', 'user': 'test_user', 'password': 'test_password'})) -- cgit v1.2.3