summaryrefslogtreecommitdiff
path: root/tests/integration/mail/test_mailbox_indexer.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/integration/mail/test_mailbox_indexer.py')
-rw-r--r--tests/integration/mail/test_mailbox_indexer.py250
1 files changed, 250 insertions, 0 deletions
diff --git a/tests/integration/mail/test_mailbox_indexer.py b/tests/integration/mail/test_mailbox_indexer.py
new file mode 100644
index 00000000..a3388d1a
--- /dev/null
+++ b/tests/integration/mail/test_mailbox_indexer.py
@@ -0,0 +1,250 @@
+# -*- coding: utf-8 -*-
+# test_mailbox_indexer.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for the mailbox_indexer module.
+"""
+import uuid
+from functools import partial
+
+from leap.bitmask.mail import mailbox_indexer as mi
+from leap.bitmask.mail.testing.common import SoledadTestMixin
+
+hash_test0 = '590c9f8430c7435807df8ba9a476e3f1295d46ef210f6efae2043a4c085a569e'
+hash_test1 = '1b4f0e9851971998e732078544c96b36c3d01cedf7caa332359d6f1d83567014'
+hash_test2 = '60303ae22b998861bce3b28f33eec1be758a213c86c93c076dbe9f558c11c752'
+hash_test3 = 'fd61a03af4f77d870fc21e05e7e80678095c92d808cfb3b5c279ee04c74aca13'
+hash_test4 = 'a4e624d686e03ed2767c0abd85c14426b0b1157d2ce81d27bb4fe4f6f01d688a'
+
+
+def fmt_hash(mailbox_uuid, hash):
+ return "M-" + mailbox_uuid.replace('-', '_') + "-" + hash
+
+mbox_id = str(uuid.uuid4())
+
+
+class MailboxIndexerTestCase(SoledadTestMixin):
+ """
+ Tests for the MailboxUID class.
+ """
+ def get_mbox_uid(self):
+ m_uid = mi.MailboxIndexer(self._soledad)
+ return m_uid
+
+ def list_mail_tables_cb(self, ignored):
+ def filter_mailuid_tables(tables):
+ filtered = [
+ table[0] for table in tables if
+ table[0].startswith(mi.MailboxIndexer.table_preffix)]
+ return filtered
+
+ sql = "SELECT name FROM sqlite_master WHERE type='table';"
+ d = self._soledad.raw_sqlcipher_query(sql)
+ d.addCallback(filter_mailuid_tables)
+ return d
+
+ def select_uid_rows(self, mailbox):
+ sql = "SELECT * FROM %s%s;" % (
+ mi.MailboxIndexer.table_preffix, mailbox.replace('-', '_'))
+ d = self._soledad.raw_sqlcipher_query(sql)
+ return d
+
+ def test_create_table(self):
+ def assert_table_created(tables):
+ self.assertEqual(
+ tables, ["leapmail_uid_" + mbox_id.replace('-', '_')])
+
+ m_uid = self.get_mbox_uid()
+ d = m_uid.create_table(mbox_id)
+ d.addCallback(self.list_mail_tables_cb)
+ d.addCallback(assert_table_created)
+ return d
+
+ def test_create_and_delete_table(self):
+ def assert_table_deleted(tables):
+ self.assertEqual(tables, [])
+
+ m_uid = self.get_mbox_uid()
+ d = m_uid.create_table(mbox_id)
+ d.addCallback(lambda _: m_uid.delete_table(mbox_id))
+ d.addCallback(self.list_mail_tables_cb)
+ d.addCallback(assert_table_deleted)
+ return d
+
+ def test_insert_doc(self):
+ m_uid = self.get_mbox_uid()
+
+ h1 = fmt_hash(mbox_id, hash_test0)
+ h2 = fmt_hash(mbox_id, hash_test1)
+ h3 = fmt_hash(mbox_id, hash_test2)
+ h4 = fmt_hash(mbox_id, hash_test3)
+ h5 = fmt_hash(mbox_id, hash_test4)
+
+ def assert_uid_rows(rows):
+ expected = [(1, h1), (2, h2), (3, h3), (4, h4), (5, h5)]
+ self.assertEquals(rows, expected)
+
+ d = m_uid.create_table(mbox_id)
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h1))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h2))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h3))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h4))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h5))
+ d.addCallback(lambda _: self.select_uid_rows(mbox_id))
+ d.addCallback(assert_uid_rows)
+ return d
+
+ def test_insert_doc_return(self):
+ m_uid = self.get_mbox_uid()
+
+ def assert_rowid(rowid, expected=None):
+ self.assertEqual(rowid, expected)
+
+ h1 = fmt_hash(mbox_id, hash_test0)
+ h2 = fmt_hash(mbox_id, hash_test1)
+ h3 = fmt_hash(mbox_id, hash_test2)
+
+ d = m_uid.create_table(mbox_id)
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h1))
+ d.addCallback(partial(assert_rowid, expected=1))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h2))
+ d.addCallback(partial(assert_rowid, expected=2))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h3))
+ d.addCallback(partial(assert_rowid, expected=3))
+ return d
+
+ def test_delete_doc(self):
+ m_uid = self.get_mbox_uid()
+
+ h1 = fmt_hash(mbox_id, hash_test0)
+ h2 = fmt_hash(mbox_id, hash_test1)
+ h3 = fmt_hash(mbox_id, hash_test2)
+ h4 = fmt_hash(mbox_id, hash_test3)
+ h5 = fmt_hash(mbox_id, hash_test4)
+
+ def assert_uid_rows(rows):
+ expected = [(4, h4), (5, h5)]
+ self.assertEquals(rows, expected)
+
+ d = m_uid.create_table(mbox_id)
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h1))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h2))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h3))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h4))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h5))
+
+ d.addCallbacks(lambda _: m_uid.delete_doc_by_uid(mbox_id, 1))
+ d.addCallbacks(lambda _: m_uid.delete_doc_by_uid(mbox_id, 2))
+ d.addCallbacks(lambda _: m_uid.delete_doc_by_hash(mbox_id, h3))
+
+ d.addCallback(lambda _: self.select_uid_rows(mbox_id))
+ d.addCallback(assert_uid_rows)
+ return d
+
+ def test_get_doc_id_from_uid(self):
+ m_uid = self.get_mbox_uid()
+
+ h1 = fmt_hash(mbox_id, hash_test0)
+
+ def assert_doc_hash(res):
+ self.assertEqual(res, h1)
+
+ d = m_uid.create_table(mbox_id)
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h1))
+ d.addCallback(lambda _: m_uid.get_doc_id_from_uid(mbox_id, 1))
+ d.addCallback(assert_doc_hash)
+ return d
+
+ def test_count(self):
+ m_uid = self.get_mbox_uid()
+
+ h1 = fmt_hash(mbox_id, hash_test0)
+ h2 = fmt_hash(mbox_id, hash_test1)
+ h3 = fmt_hash(mbox_id, hash_test2)
+ h4 = fmt_hash(mbox_id, hash_test3)
+ h5 = fmt_hash(mbox_id, hash_test4)
+
+ d = m_uid.create_table(mbox_id)
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h1))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h2))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h3))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h4))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h5))
+
+ def assert_count_after_inserts(count):
+ self.assertEquals(count, 5)
+
+ d.addCallback(lambda _: m_uid.count(mbox_id))
+ d.addCallback(assert_count_after_inserts)
+
+ d.addCallbacks(lambda _: m_uid.delete_doc_by_uid(mbox_id, 1))
+ d.addCallbacks(lambda _: m_uid.delete_doc_by_uid(mbox_id, 2))
+
+ def assert_count_after_deletions(count):
+ self.assertEquals(count, 3)
+
+ d.addCallback(lambda _: m_uid.count(mbox_id))
+ d.addCallback(assert_count_after_deletions)
+ return d
+
+ def test_get_next_uid(self):
+ m_uid = self.get_mbox_uid()
+
+ h1 = fmt_hash(mbox_id, hash_test0)
+ h2 = fmt_hash(mbox_id, hash_test1)
+ h3 = fmt_hash(mbox_id, hash_test2)
+ h4 = fmt_hash(mbox_id, hash_test3)
+ h5 = fmt_hash(mbox_id, hash_test4)
+
+ d = m_uid.create_table(mbox_id)
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h1))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h2))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h3))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h4))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h5))
+
+ def assert_next_uid(result, expected=1):
+ self.assertEquals(result, expected)
+
+ d.addCallback(lambda _: m_uid.get_next_uid(mbox_id))
+ d.addCallback(partial(assert_next_uid, expected=6))
+ return d
+
+ def test_all_uid_iter(self):
+
+ m_uid = self.get_mbox_uid()
+
+ h1 = fmt_hash(mbox_id, hash_test0)
+ h2 = fmt_hash(mbox_id, hash_test1)
+ h3 = fmt_hash(mbox_id, hash_test2)
+ h4 = fmt_hash(mbox_id, hash_test3)
+ h5 = fmt_hash(mbox_id, hash_test4)
+
+ d = m_uid.create_table(mbox_id)
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h1))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h2))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h3))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h4))
+ d.addCallback(lambda _: m_uid.insert_doc(mbox_id, h5))
+ d.addCallback(lambda _: m_uid.delete_doc_by_uid(mbox_id, 1))
+ d.addCallback(lambda _: m_uid.delete_doc_by_uid(mbox_id, 4))
+
+ def assert_all_uid(result, expected=[2, 3, 5]):
+ self.assertEquals(result, expected)
+
+ d.addCallback(lambda _: m_uid.all_uid_iter(mbox_id))
+ d.addCallback(partial(assert_all_uid))
+ return d