summaryrefslogtreecommitdiff
path: root/tests/integration/mail/test_mail.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/integration/mail/test_mail.py')
-rw-r--r--tests/integration/mail/test_mail.py399
1 files changed, 399 insertions, 0 deletions
diff --git a/tests/integration/mail/test_mail.py b/tests/integration/mail/test_mail.py
new file mode 100644
index 0000000..637340d
--- /dev/null
+++ b/tests/integration/mail/test_mail.py
@@ -0,0 +1,399 @@
+# -*- coding: utf-8 -*-
+# test_mail.py
+# Copyright (C) 2014-2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for the mail module.
+"""
+import os
+import time
+import uuid
+
+from functools import partial
+from email.parser import Parser
+from email.Utils import formatdate
+
+from leap.bitmask.mail.adaptors.soledad import SoledadMailAdaptor
+from leap.bitmask.mail.mail import MessageCollection, Account, _unpack_headers
+from leap.bitmask.mail.mailbox_indexer import MailboxIndexer
+from leap.bitmask.mail.testing.common import SoledadTestMixin
+
+HERE = os.path.split(os.path.abspath(__file__))[0]
+
+
+def _get_raw_msg(multi=False):
+ if multi:
+ sample = "rfc822.multi.message"
+ else:
+ sample = "rfc822.message"
+ with open(os.path.join(HERE, sample)) as f:
+ raw = f.read()
+ return raw
+
+
+def _get_parsed_msg(multi=False):
+ mail_parser = Parser()
+ raw = _get_raw_msg(multi=multi)
+ return mail_parser.parsestr(raw)
+
+
+def _get_msg_time():
+ timestamp = time.mktime((2010, 12, 12, 1, 1, 1, 1, 1, 1))
+ return formatdate(timestamp)
+
+
+class CollectionMixin(object):
+
+ def get_collection(self, mbox_collection=True, mbox_name=None,
+ mbox_uuid=None):
+ """
+ Get a collection for tests.
+ """
+ adaptor = SoledadMailAdaptor()
+ store = self._soledad
+ adaptor.store = store
+
+ if mbox_collection:
+ mbox_indexer = MailboxIndexer(store)
+ mbox_name = mbox_name or "TestMbox"
+ mbox_uuid = mbox_uuid or str(uuid.uuid4())
+ else:
+ mbox_indexer = mbox_name = None
+
+ def get_collection_from_mbox_wrapper(wrapper):
+ wrapper.uuid = mbox_uuid
+ return MessageCollection(
+ adaptor, store,
+ mbox_indexer=mbox_indexer, mbox_wrapper=wrapper)
+
+ d = adaptor.initialize_store(store)
+ if mbox_collection:
+ d.addCallback(lambda _: mbox_indexer.create_table(mbox_uuid))
+ d.addCallback(lambda _: adaptor.get_or_create_mbox(store, mbox_name))
+ d.addCallback(get_collection_from_mbox_wrapper)
+ return d
+
+
+# TODO profile add_msg. Why are these tests so SLOW??!
+class MessageTestCase(SoledadTestMixin, CollectionMixin):
+ """
+ Tests for the Message class.
+ """
+ msg_flags = ('\Recent', '\Unseen', '\TestFlag')
+ msg_tags = ('important', 'todo', 'wonderful')
+ internal_date = "19-Mar-2015 19:22:21 -0500"
+
+ maxDiff = None
+
+ def _do_insert_msg(self, multi=False):
+ """
+ Inserts and return a regular message, for tests.
+ """
+ def insert_message(collection):
+ self._mbox_uuid = collection.mbox_uuid
+ return collection.add_msg(
+ raw, flags=self.msg_flags, tags=self.msg_tags,
+ date=self.internal_date)
+
+ raw = _get_raw_msg(multi=multi)
+
+ d = self.get_collection()
+ d.addCallback(insert_message)
+ return d
+
+ def get_inserted_msg(self, multi=False):
+ d = self._do_insert_msg(multi=multi)
+ d.addCallback(lambda _: self.get_collection(mbox_uuid=self._mbox_uuid))
+ d.addCallback(lambda col: col.get_message_by_uid(1))
+ return d
+
+ def test_get_flags(self):
+ d = self.get_inserted_msg()
+ d.addCallback(self._test_get_flags_cb)
+ return d
+
+ def _test_get_flags_cb(self, msg):
+ self.assertTrue(msg is not None)
+ self.assertEquals(tuple(msg.get_flags()), self.msg_flags)
+
+ def test_get_internal_date(self):
+ d = self.get_inserted_msg()
+ d.addCallback(self._test_get_internal_date_cb)
+
+ def _test_get_internal_date_cb(self, msg):
+ self.assertTrue(msg is not None)
+ self.assertDictEqual(msg.get_internal_date(),
+ self.internal_date)
+
+ def test_get_headers(self):
+ d = self.get_inserted_msg()
+ d.addCallback(self._test_get_headers_cb)
+ return d
+
+ def _test_get_headers_cb(self, msg):
+ self.assertTrue(msg is not None)
+ expected = [
+ (str(key.lower()), str(value))
+ for (key, value) in _get_parsed_msg().items()]
+ self.assertItemsEqual(_unpack_headers(msg.get_headers()), expected)
+
+ def test_get_body_file(self):
+ d = self.get_inserted_msg(multi=True)
+ d.addCallback(self._test_get_body_file_cb)
+ return d
+
+ def _test_get_body_file_cb(self, msg):
+ self.assertTrue(msg is not None)
+ orig = _get_parsed_msg(multi=True)
+ expected = orig.get_payload()[0].get_payload()
+ d = msg.get_body_file(self._soledad)
+
+ def assert_body(fd):
+ self.assertTrue(fd is not None)
+ self.assertEqual(fd.read(), expected)
+ d.addCallback(assert_body)
+ return d
+
+ def test_get_size(self):
+ d = self.get_inserted_msg()
+ d.addCallback(self._test_get_size_cb)
+ return d
+
+ def _test_get_size_cb(self, msg):
+ self.assertTrue(msg is not None)
+ expected = len(_get_parsed_msg().as_string())
+ self.assertEqual(msg.get_size(), expected)
+
+ def test_is_multipart_no(self):
+ d = self.get_inserted_msg()
+ d.addCallback(self._test_is_multipart_no_cb)
+ return d
+
+ def _test_is_multipart_no_cb(self, msg):
+ self.assertTrue(msg is not None)
+ expected = _get_parsed_msg().is_multipart()
+ self.assertEqual(msg.is_multipart(), expected)
+
+ def test_is_multipart_yes(self):
+ d = self.get_inserted_msg(multi=True)
+ d.addCallback(self._test_is_multipart_yes_cb)
+ return d
+
+ def _test_is_multipart_yes_cb(self, msg):
+ self.assertTrue(msg is not None)
+ expected = _get_parsed_msg(multi=True).is_multipart()
+ self.assertEqual(msg.is_multipart(), expected)
+
+ def test_get_subpart(self):
+ d = self.get_inserted_msg(multi=True)
+ d.addCallback(self._test_get_subpart_cb)
+ return d
+
+ def _test_get_subpart_cb(self, msg):
+ self.assertTrue(msg is not None)
+
+ def test_get_tags(self):
+ d = self.get_inserted_msg()
+ d.addCallback(self._test_get_tags_cb)
+ return d
+
+ def _test_get_tags_cb(self, msg):
+ self.assertTrue(msg is not None)
+ self.assertEquals(msg.get_tags(), self.msg_tags)
+
+
+class MessageCollectionTestCase(SoledadTestMixin, CollectionMixin):
+ """
+ Tests for the MessageCollection class.
+ """
+ _mbox_uuid = None
+
+ def assert_collection_count(self, _, expected):
+ def _assert_count(count):
+ self.assertEqual(count, expected)
+
+ d = self.get_collection()
+ d.addCallback(lambda col: col.count())
+ d.addCallback(_assert_count)
+ return d
+
+ def add_msg_to_collection(self):
+ raw = _get_raw_msg()
+
+ def add_msg_to_collection(collection):
+ # We keep the uuid in case we need to instantiate the same
+ # collection afterwards.
+ self._mbox_uuid = collection.mbox_uuid
+ d = collection.add_msg(raw, date=_get_msg_time())
+ return d
+
+ d = self.get_collection()
+ d.addCallback(add_msg_to_collection)
+ return d
+
+ def test_is_mailbox_collection(self):
+ d = self.get_collection()
+ d.addCallback(self._test_is_mailbox_collection_cb)
+ return d
+
+ def _test_is_mailbox_collection_cb(self, collection):
+ self.assertTrue(collection.is_mailbox_collection())
+
+ def test_get_uid_next(self):
+ d = self.add_msg_to_collection()
+ d.addCallback(lambda _: self.get_collection())
+ d.addCallback(lambda col: col.get_uid_next())
+ d.addCallback(self._test_get_uid_next_cb)
+
+ def _test_get_uid_next_cb(self, next_uid):
+ self.assertEqual(next_uid, 2)
+
+ def test_add_and_count_msg(self):
+ d = self.add_msg_to_collection()
+ d.addCallback(self._test_add_and_count_msg_cb)
+ return d
+
+ def _test_add_and_count_msg_cb(self, _):
+ return partial(self.assert_collection_count, expected=1)
+
+ def test_copy_msg(self):
+ # TODO ---- update when implementing messagecopier
+ # interface
+ pass
+ test_copy_msg.skip = "Not yet implemented"
+
+ def test_delete_msg(self):
+ d = self.add_msg_to_collection()
+
+ def del_msg(collection):
+ def _delete_it(msg):
+ self.assertTrue(msg is not None)
+ return collection.delete_msg(msg)
+
+ d = collection.get_message_by_uid(1)
+ d.addCallback(_delete_it)
+ return d
+
+ # We need to instantiate an mbox collection with the same uuid that
+ # the one in which we inserted the doc.
+ d.addCallback(lambda _: self.get_collection(mbox_uuid=self._mbox_uuid))
+ d.addCallback(del_msg)
+ d.addCallback(self._test_delete_msg_cb)
+ return d
+
+ def _test_delete_msg_cb(self, _):
+ return partial(self.assert_collection_count, expected=0)
+
+ def test_update_flags(self):
+ d = self.add_msg_to_collection()
+ d.addCallback(self._test_update_flags_cb)
+ return d
+
+ def _test_update_flags_cb(self, msg):
+ pass
+
+ def test_update_tags(self):
+ d = self.add_msg_to_collection()
+ d.addCallback(self._test_update_tags_cb)
+ return d
+
+ def _test_update_tags_cb(self, msg):
+ pass
+
+
+class AccountTestCase(SoledadTestMixin):
+ """
+ Tests for the Account class.
+ """
+ def get_account(self, user_id):
+ store = self._soledad
+ return Account(store, user_id)
+
+ def test_add_mailbox(self):
+ acc = self.get_account('some_user_id')
+ d = acc.callWhenReady(lambda _: acc.add_mailbox("TestMailbox"))
+ d.addCallback(lambda _: acc.list_all_mailbox_names())
+ d.addCallback(self._test_add_mailbox_cb)
+ return d
+
+ def _test_add_mailbox_cb(self, mboxes):
+ expected = ['INBOX', 'TestMailbox']
+ self.assertItemsEqual(mboxes, expected)
+
+ def test_delete_mailbox(self):
+ acc = self.get_account('some_user_id')
+ d = acc.callWhenReady(lambda _: acc.delete_mailbox("Inbox"))
+ d.addCallback(lambda _: acc.list_all_mailbox_names())
+ d.addCallback(self._test_delete_mailbox_cb)
+ return d
+
+ def _test_delete_mailbox_cb(self, mboxes):
+ expected = []
+ self.assertItemsEqual(mboxes, expected)
+
+ def test_rename_mailbox(self):
+ acc = self.get_account('some_user_id')
+ d = acc.callWhenReady(lambda _: acc.add_mailbox("OriginalMailbox"))
+ d.addCallback(lambda _: acc.rename_mailbox(
+ "OriginalMailbox", "RenamedMailbox"))
+ d.addCallback(lambda _: acc.list_all_mailbox_names())
+ d.addCallback(self._test_rename_mailbox_cb)
+ return d
+
+ def _test_rename_mailbox_cb(self, mboxes):
+ expected = ['INBOX', 'RenamedMailbox']
+ self.assertItemsEqual(mboxes, expected)
+
+ def test_get_all_mailboxes(self):
+ acc = self.get_account('some_user_id')
+ d = acc.callWhenReady(lambda _: acc.add_mailbox("OneMailbox"))
+ d.addCallback(lambda _: acc.add_mailbox("TwoMailbox"))
+ d.addCallback(lambda _: acc.add_mailbox("ThreeMailbox"))
+ d.addCallback(lambda _: acc.add_mailbox("anotherthing"))
+ d.addCallback(lambda _: acc.add_mailbox("anotherthing2"))
+ d.addCallback(lambda _: acc.get_all_mailboxes())
+ d.addCallback(self._test_get_all_mailboxes_cb)
+ return d
+
+ def _test_get_all_mailboxes_cb(self, mailboxes):
+ expected = ["INBOX", "OneMailbox", "TwoMailbox", "ThreeMailbox",
+ "anotherthing", "anotherthing2"]
+ names = [m.mbox for m in mailboxes]
+ self.assertItemsEqual(names, expected)
+
+ def test_get_collection_by_mailbox(self):
+ acc = self.get_account('some_user_id')
+ d = acc.callWhenReady(lambda _: acc.get_collection_by_mailbox("INBOX"))
+ d.addCallback(self._test_get_collection_by_mailbox_cb)
+ return d
+
+ def _test_get_collection_by_mailbox_cb(self, collection):
+ self.assertTrue(collection.is_mailbox_collection())
+
+ def assert_uid_next_empty_collection(uid):
+ self.assertEqual(uid, 1)
+ d = collection.get_uid_next()
+ d.addCallback(assert_uid_next_empty_collection)
+ return d
+
+ def test_get_collection_by_docs(self):
+ pass
+
+ test_get_collection_by_docs.skip = "Not yet implemented"
+
+ def test_get_collection_by_tag(self):
+ pass
+
+ test_get_collection_by_tag.skip = "Not yet implemented"