From 84888155b09b3af6a755262b28728de2f851c8cb Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Sun, 4 Jan 2015 03:37:18 -0400 Subject: tests for mail.mail module: Message --- src/leap/mail/tests/test_mail.py | 255 +++++++++++++++++++++++++++++++++++---- 1 file changed, 234 insertions(+), 21 deletions(-) (limited to 'src/leap/mail/tests/test_mail.py') diff --git a/src/leap/mail/tests/test_mail.py b/src/leap/mail/tests/test_mail.py index ce2366c..cb97be5 100644 --- a/src/leap/mail/tests/test_mail.py +++ b/src/leap/mail/tests/test_mail.py @@ -17,24 +17,48 @@ """ Tests for the mail module. """ +import time import os from functools import partial +from email.parser import Parser +from email.Utils import formatdate + +from twisted.python import util from leap.mail.adaptors.soledad import SoledadMailAdaptor from leap.mail.mail import MessageCollection from leap.mail.mailbox_indexer import MailboxIndexer from leap.mail.tests.common import SoledadTestMixin -from twisted.internet import defer +# from twisted.internet import defer from twisted.trial import unittest HERE = os.path.split(os.path.abspath(__file__))[0] -class MessageCollectionTestCase(unittest.TestCase, SoledadTestMixin): - """ - Tests for the SoledadDocumentWrapper. - """ +def _get_raw_msg(multi=False): + if multi: + sample = "rfc822.multi.message" + else: + sample = "rfc822.message" + with open(os.path.join(HERE, sample)) as f: + raw = f.read() + return raw + + +def _get_parsed_msg(multi=False): + mail_parser = Parser() + raw = _get_raw_msg(multi=multi) + return mail_parser.parsestr(raw) + + +def _get_msg_time(): + timestamp = time.mktime((2010, 12, 12, 1, 1, 1, 1, 1, 1)) + return formatdate(timestamp) + + + +class CollectionMixin(object): def get_collection(self, mbox_collection=True): """ @@ -61,35 +85,224 @@ class MessageCollectionTestCase(unittest.TestCase, SoledadTestMixin): d.addCallback(get_collection_from_mbox_wrapper) return d - def test_is_mailbox_collection(self): - def assert_is_mbox_collection(collection): - self.assertTrue(collection.is_mailbox_collection()) +class MessageTestCase(unittest.TestCase, SoledadTestMixin, CollectionMixin): + """ + Tests for the Message class. + """ + msg_flags = ('\Recent', '\Unseen', '\TestFlag') + msg_tags = ('important', 'todo', 'wonderful') + internal_date = "19-Mar-2015 19:22:21 -0500" + maxDiff = None + + def _do_insert_msg(self, multi=False): + """ + Inserts and return a regular message, for tests. + """ + raw = _get_raw_msg(multi=multi) d = self.get_collection() - d.addCallback(assert_is_mbox_collection) + d.addCallback(lambda col: col.add_msg( + raw, flags=self.msg_flags, tags=self.msg_tags, + date=self.internal_date)) + return d + + def get_inserted_msg(self, multi=False): + d = self._do_insert_msg(multi=multi) + d.addCallback(lambda _: self.get_collection()) + d.addCallback(lambda col: col.get_message_by_uid(1)) + return d + + def test_get_flags(self): + d = self.get_inserted_msg() + d.addCallback(self._test_get_flags_cb) + return d + + def _test_get_flags_cb(self, msg): + self.assertTrue(msg is not None) + self.assertEquals(msg.get_flags(), self.msg_flags) + + def test_get_internal_date(self): + d = self.get_inserted_msg() + d.addCallback(self._test_get_internal_date_cb) + + def _test_get_internal_date_cb(self, msg): + self.assertTrue(msg is not None) + self.assertDictEqual(msg.get_internal_date(), + self.internal_date) + + def test_get_headers(self): + d = self.get_inserted_msg() + d.addCallback(self._test_get_headers_cb) + return d + + def _test_get_headers_cb(self, msg): + self.assertTrue(msg is not None) + expected = _get_parsed_msg().items() + self.assertEqual(msg.get_headers(), expected) + + def test_get_body_file(self): + d = self.get_inserted_msg(multi=True) + d.addCallback(self._test_get_body_file_cb) + return d + + def _test_get_body_file_cb(self, msg): + self.assertTrue(msg is not None) + orig = _get_parsed_msg(multi=True) + expected = orig.get_payload()[0].get_payload() + d = msg.get_body_file(self._soledad) + + def assert_body(fd): + self.assertTrue(fd is not None) + self.assertEqual(fd.read(), expected) + d.addCallback(assert_body) + return d + + def test_get_size(self): + d = self.get_inserted_msg() + d.addCallback(self._test_get_size_cb) + return d + + def _test_get_size_cb(self, msg): + self.assertTrue(msg is not None) + expected = len(_get_parsed_msg().as_string()) + self.assertEqual(msg.get_size(), expected) + + def test_is_multipart_no(self): + d = self.get_inserted_msg() + d.addCallback(self._test_is_multipart_no_cb) + return d + + def _test_is_multipart_no_cb(self, msg): + self.assertTrue(msg is not None) + expected = _get_parsed_msg().is_multipart() + self.assertEqual(msg.is_multipart(), expected) + + def test_is_multipart_yes(self): + d = self.get_inserted_msg(multi=True) + d.addCallback(self._test_is_multipart_yes_cb) + return d + + def _test_is_multipart_yes_cb(self, msg): + self.assertTrue(msg is not None) + expected = _get_parsed_msg(multi=True).is_multipart() + self.assertEqual(msg.is_multipart(), expected) + + def test_get_subpart(self): + d = self.get_inserted_msg(multi=True) + d.addCallback(self._test_get_subpart_cb) + return d + + def _test_get_subpart_cb(self, msg): + self.assertTrue(msg is not None) + + def test_get_tags(self): + d = self.get_inserted_msg() + d.addCallback(self._test_get_tags_cb) return d - def assert_collection_count(self, _, expected, collection): + def _test_get_tags_cb(self, msg): + self.assertTrue(msg is not None) + self.assertEquals(msg.get_tags(), self.msg_tags) + +class MessageCollectionTestCase(unittest.TestCase, + SoledadTestMixin, CollectionMixin): + """ + Tests for the MessageCollection class. + """ + def assert_collection_count(self, _, expected): def _assert_count(count): self.assertEqual(count, expected) - d = collection.count() + + d = self.get_collection() + d.addCallback(lambda col: col.count()) d.addCallback(_assert_count) return d - def test_add_msg(self): + def add_msg_to_collection(self): + raw = _get_raw_msg() - with open(os.path.join(HERE, "rfc822.message")) as f: - raw = f.read() - - def add_msg_to_collection_and_assert_count(collection): - d = collection.add_msg(raw) - d.addCallback(partial( - self.assert_collection_count, - expected=1, collection=collection)) + def add_msg_to_collection(collection): + d = collection.add_msg(raw, date=_get_msg_time()) return d + d = self.get_collection() + d.addCallback(add_msg_to_collection) + return d + def test_is_mailbox_collection(self): d = self.get_collection() - d.addCallback(add_msg_to_collection_and_assert_count) + d.addCallback(self._test_is_mailbox_collection_cb) + return d + + def _test_is_mailbox_collection_cb(self, collection): + self.assertTrue(collection.is_mailbox_collection()) + + def test_get_uid_next(self): + d = self.add_msg_to_collection() + d.addCallback(lambda _: self.get_collection()) + d.addCallback(lambda col: col.get_uid_next()) + d.addCallback(self._test_get_uid_next_cb) + + def _test_get_uid_next_cb(self, next_uid): + self.assertEqual(next_uid, 2) + + def test_add_and_count_msg(self): + d = self.add_msg_to_collection() + d.addCallback(self._test_add_and_count_msg_cb) + return d + + def _test_add_and_count_msg_cb(self, _): + return partial(self.assert_collection_count, expected=1) + + def test_coppy_msg(self): + self.fail() + + def test_delete_msg(self): + self.fail() + + def test_update_flags(self): + d = self.add_msg_to_collection() + d.addCallback(self._test_update_flags_cb) + return d + + def _test_update_flags_cb(self, msg): + pass + + def test_update_tags(self): + d = self.add_msg_to_collection() + d.addCallback(self._test_update_tags_cb) return d + + def _test_update_tags_cb(self, msg): + pass + + +class AccountTestCase(unittest.TestCase, SoledadTestMixin): + """ + Tests for the Account class. + """ + + def test_add_mailbox(self): + self.fail() + + def test_delete_mailbox(self): + self.fail() + + def test_rename_mailbox(self): + self.fail() + + def test_list_all_mailbox_names(self): + self.fail() + + def test_get_all_mailboxes(self): + self.fail() + + def test_get_collection_by_docs(self): + self.fail() + + def test_get_collection_by_mailbox(self): + self.fail() + + def test_get_collection_by_tag(self): + self.fail() -- cgit v1.2.3