diff options
Diffstat (limited to 'tests/integration/mail/test_mail.py')
-rw-r--r-- | tests/integration/mail/test_mail.py | 399 |
1 files changed, 399 insertions, 0 deletions
diff --git a/tests/integration/mail/test_mail.py b/tests/integration/mail/test_mail.py new file mode 100644 index 00000000..637340d5 --- /dev/null +++ b/tests/integration/mail/test_mail.py @@ -0,0 +1,399 @@ +# -*- coding: utf-8 -*- +# test_mail.py +# Copyright (C) 2014-2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for the mail module. +""" +import os +import time +import uuid + +from functools import partial +from email.parser import Parser +from email.Utils import formatdate + +from leap.bitmask.mail.adaptors.soledad import SoledadMailAdaptor +from leap.bitmask.mail.mail import MessageCollection, Account, _unpack_headers +from leap.bitmask.mail.mailbox_indexer import MailboxIndexer +from leap.bitmask.mail.testing.common import SoledadTestMixin + +HERE = os.path.split(os.path.abspath(__file__))[0] + + +def _get_raw_msg(multi=False): + if multi: + sample = "rfc822.multi.message" + else: + sample = "rfc822.message" + with open(os.path.join(HERE, sample)) as f: + raw = f.read() + return raw + + +def _get_parsed_msg(multi=False): + mail_parser = Parser() + raw = _get_raw_msg(multi=multi) + return mail_parser.parsestr(raw) + + +def _get_msg_time(): + timestamp = time.mktime((2010, 12, 12, 1, 1, 1, 1, 1, 1)) + return formatdate(timestamp) + + +class CollectionMixin(object): + + def get_collection(self, mbox_collection=True, mbox_name=None, + mbox_uuid=None): + """ + Get a collection for tests. + """ + adaptor = SoledadMailAdaptor() + store = self._soledad + adaptor.store = store + + if mbox_collection: + mbox_indexer = MailboxIndexer(store) + mbox_name = mbox_name or "TestMbox" + mbox_uuid = mbox_uuid or str(uuid.uuid4()) + else: + mbox_indexer = mbox_name = None + + def get_collection_from_mbox_wrapper(wrapper): + wrapper.uuid = mbox_uuid + return MessageCollection( + adaptor, store, + mbox_indexer=mbox_indexer, mbox_wrapper=wrapper) + + d = adaptor.initialize_store(store) + if mbox_collection: + d.addCallback(lambda _: mbox_indexer.create_table(mbox_uuid)) + d.addCallback(lambda _: adaptor.get_or_create_mbox(store, mbox_name)) + d.addCallback(get_collection_from_mbox_wrapper) + return d + + +# TODO profile add_msg. Why are these tests so SLOW??! +class MessageTestCase(SoledadTestMixin, CollectionMixin): + """ + Tests for the Message class. + """ + msg_flags = ('\Recent', '\Unseen', '\TestFlag') + msg_tags = ('important', 'todo', 'wonderful') + internal_date = "19-Mar-2015 19:22:21 -0500" + + maxDiff = None + + def _do_insert_msg(self, multi=False): + """ + Inserts and return a regular message, for tests. + """ + def insert_message(collection): + self._mbox_uuid = collection.mbox_uuid + return collection.add_msg( + raw, flags=self.msg_flags, tags=self.msg_tags, + date=self.internal_date) + + raw = _get_raw_msg(multi=multi) + + d = self.get_collection() + d.addCallback(insert_message) + return d + + def get_inserted_msg(self, multi=False): + d = self._do_insert_msg(multi=multi) + d.addCallback(lambda _: self.get_collection(mbox_uuid=self._mbox_uuid)) + d.addCallback(lambda col: col.get_message_by_uid(1)) + return d + + def test_get_flags(self): + d = self.get_inserted_msg() + d.addCallback(self._test_get_flags_cb) + return d + + def _test_get_flags_cb(self, msg): + self.assertTrue(msg is not None) + self.assertEquals(tuple(msg.get_flags()), self.msg_flags) + + def test_get_internal_date(self): + d = self.get_inserted_msg() + d.addCallback(self._test_get_internal_date_cb) + + def _test_get_internal_date_cb(self, msg): + self.assertTrue(msg is not None) + self.assertDictEqual(msg.get_internal_date(), + self.internal_date) + + def test_get_headers(self): + d = self.get_inserted_msg() + d.addCallback(self._test_get_headers_cb) + return d + + def _test_get_headers_cb(self, msg): + self.assertTrue(msg is not None) + expected = [ + (str(key.lower()), str(value)) + for (key, value) in _get_parsed_msg().items()] + self.assertItemsEqual(_unpack_headers(msg.get_headers()), expected) + + def test_get_body_file(self): + d = self.get_inserted_msg(multi=True) + d.addCallback(self._test_get_body_file_cb) + return d + + def _test_get_body_file_cb(self, msg): + self.assertTrue(msg is not None) + orig = _get_parsed_msg(multi=True) + expected = orig.get_payload()[0].get_payload() + d = msg.get_body_file(self._soledad) + + def assert_body(fd): + self.assertTrue(fd is not None) + self.assertEqual(fd.read(), expected) + d.addCallback(assert_body) + return d + + def test_get_size(self): + d = self.get_inserted_msg() + d.addCallback(self._test_get_size_cb) + return d + + def _test_get_size_cb(self, msg): + self.assertTrue(msg is not None) + expected = len(_get_parsed_msg().as_string()) + self.assertEqual(msg.get_size(), expected) + + def test_is_multipart_no(self): + d = self.get_inserted_msg() + d.addCallback(self._test_is_multipart_no_cb) + return d + + def _test_is_multipart_no_cb(self, msg): + self.assertTrue(msg is not None) + expected = _get_parsed_msg().is_multipart() + self.assertEqual(msg.is_multipart(), expected) + + def test_is_multipart_yes(self): + d = self.get_inserted_msg(multi=True) + d.addCallback(self._test_is_multipart_yes_cb) + return d + + def _test_is_multipart_yes_cb(self, msg): + self.assertTrue(msg is not None) + expected = _get_parsed_msg(multi=True).is_multipart() + self.assertEqual(msg.is_multipart(), expected) + + def test_get_subpart(self): + d = self.get_inserted_msg(multi=True) + d.addCallback(self._test_get_subpart_cb) + return d + + def _test_get_subpart_cb(self, msg): + self.assertTrue(msg is not None) + + def test_get_tags(self): + d = self.get_inserted_msg() + d.addCallback(self._test_get_tags_cb) + return d + + def _test_get_tags_cb(self, msg): + self.assertTrue(msg is not None) + self.assertEquals(msg.get_tags(), self.msg_tags) + + +class MessageCollectionTestCase(SoledadTestMixin, CollectionMixin): + """ + Tests for the MessageCollection class. + """ + _mbox_uuid = None + + def assert_collection_count(self, _, expected): + def _assert_count(count): + self.assertEqual(count, expected) + + d = self.get_collection() + d.addCallback(lambda col: col.count()) + d.addCallback(_assert_count) + return d + + def add_msg_to_collection(self): + raw = _get_raw_msg() + + def add_msg_to_collection(collection): + # We keep the uuid in case we need to instantiate the same + # collection afterwards. + self._mbox_uuid = collection.mbox_uuid + d = collection.add_msg(raw, date=_get_msg_time()) + return d + + d = self.get_collection() + d.addCallback(add_msg_to_collection) + return d + + def test_is_mailbox_collection(self): + d = self.get_collection() + d.addCallback(self._test_is_mailbox_collection_cb) + return d + + def _test_is_mailbox_collection_cb(self, collection): + self.assertTrue(collection.is_mailbox_collection()) + + def test_get_uid_next(self): + d = self.add_msg_to_collection() + d.addCallback(lambda _: self.get_collection()) + d.addCallback(lambda col: col.get_uid_next()) + d.addCallback(self._test_get_uid_next_cb) + + def _test_get_uid_next_cb(self, next_uid): + self.assertEqual(next_uid, 2) + + def test_add_and_count_msg(self): + d = self.add_msg_to_collection() + d.addCallback(self._test_add_and_count_msg_cb) + return d + + def _test_add_and_count_msg_cb(self, _): + return partial(self.assert_collection_count, expected=1) + + def test_copy_msg(self): + # TODO ---- update when implementing messagecopier + # interface + pass + test_copy_msg.skip = "Not yet implemented" + + def test_delete_msg(self): + d = self.add_msg_to_collection() + + def del_msg(collection): + def _delete_it(msg): + self.assertTrue(msg is not None) + return collection.delete_msg(msg) + + d = collection.get_message_by_uid(1) + d.addCallback(_delete_it) + return d + + # We need to instantiate an mbox collection with the same uuid that + # the one in which we inserted the doc. + d.addCallback(lambda _: self.get_collection(mbox_uuid=self._mbox_uuid)) + d.addCallback(del_msg) + d.addCallback(self._test_delete_msg_cb) + return d + + def _test_delete_msg_cb(self, _): + return partial(self.assert_collection_count, expected=0) + + def test_update_flags(self): + d = self.add_msg_to_collection() + d.addCallback(self._test_update_flags_cb) + return d + + def _test_update_flags_cb(self, msg): + pass + + def test_update_tags(self): + d = self.add_msg_to_collection() + d.addCallback(self._test_update_tags_cb) + return d + + def _test_update_tags_cb(self, msg): + pass + + +class AccountTestCase(SoledadTestMixin): + """ + Tests for the Account class. + """ + def get_account(self, user_id): + store = self._soledad + return Account(store, user_id) + + def test_add_mailbox(self): + acc = self.get_account('some_user_id') + d = acc.callWhenReady(lambda _: acc.add_mailbox("TestMailbox")) + d.addCallback(lambda _: acc.list_all_mailbox_names()) + d.addCallback(self._test_add_mailbox_cb) + return d + + def _test_add_mailbox_cb(self, mboxes): + expected = ['INBOX', 'TestMailbox'] + self.assertItemsEqual(mboxes, expected) + + def test_delete_mailbox(self): + acc = self.get_account('some_user_id') + d = acc.callWhenReady(lambda _: acc.delete_mailbox("Inbox")) + d.addCallback(lambda _: acc.list_all_mailbox_names()) + d.addCallback(self._test_delete_mailbox_cb) + return d + + def _test_delete_mailbox_cb(self, mboxes): + expected = [] + self.assertItemsEqual(mboxes, expected) + + def test_rename_mailbox(self): + acc = self.get_account('some_user_id') + d = acc.callWhenReady(lambda _: acc.add_mailbox("OriginalMailbox")) + d.addCallback(lambda _: acc.rename_mailbox( + "OriginalMailbox", "RenamedMailbox")) + d.addCallback(lambda _: acc.list_all_mailbox_names()) + d.addCallback(self._test_rename_mailbox_cb) + return d + + def _test_rename_mailbox_cb(self, mboxes): + expected = ['INBOX', 'RenamedMailbox'] + self.assertItemsEqual(mboxes, expected) + + def test_get_all_mailboxes(self): + acc = self.get_account('some_user_id') + d = acc.callWhenReady(lambda _: acc.add_mailbox("OneMailbox")) + d.addCallback(lambda _: acc.add_mailbox("TwoMailbox")) + d.addCallback(lambda _: acc.add_mailbox("ThreeMailbox")) + d.addCallback(lambda _: acc.add_mailbox("anotherthing")) + d.addCallback(lambda _: acc.add_mailbox("anotherthing2")) + d.addCallback(lambda _: acc.get_all_mailboxes()) + d.addCallback(self._test_get_all_mailboxes_cb) + return d + + def _test_get_all_mailboxes_cb(self, mailboxes): + expected = ["INBOX", "OneMailbox", "TwoMailbox", "ThreeMailbox", + "anotherthing", "anotherthing2"] + names = [m.mbox for m in mailboxes] + self.assertItemsEqual(names, expected) + + def test_get_collection_by_mailbox(self): + acc = self.get_account('some_user_id') + d = acc.callWhenReady(lambda _: acc.get_collection_by_mailbox("INBOX")) + d.addCallback(self._test_get_collection_by_mailbox_cb) + return d + + def _test_get_collection_by_mailbox_cb(self, collection): + self.assertTrue(collection.is_mailbox_collection()) + + def assert_uid_next_empty_collection(uid): + self.assertEqual(uid, 1) + d = collection.get_uid_next() + d.addCallback(assert_uid_next_empty_collection) + return d + + def test_get_collection_by_docs(self): + pass + + test_get_collection_by_docs.skip = "Not yet implemented" + + def test_get_collection_by_tag(self): + pass + + test_get_collection_by_tag.skip = "Not yet implemented" |