From f826bc473a0c50fcf55f4e8609aa07622814f902 Mon Sep 17 00:00:00 2001 From: "Kali Kaneko (leap communications)" Date: Thu, 1 Sep 2016 00:06:52 -0400 Subject: [tests] move tests to root folder --- .../bitmask/mail/adaptors/tests/rfc822.message | 1 - .../bitmask/mail/adaptors/tests/test_models.py | 106 ----- .../mail/adaptors/tests/test_soledad_adaptor.py | 529 --------------------- 3 files changed, 636 deletions(-) delete mode 120000 src/leap/bitmask/mail/adaptors/tests/rfc822.message delete mode 100644 src/leap/bitmask/mail/adaptors/tests/test_models.py delete mode 100644 src/leap/bitmask/mail/adaptors/tests/test_soledad_adaptor.py (limited to 'src/leap/bitmask/mail/adaptors/tests') diff --git a/src/leap/bitmask/mail/adaptors/tests/rfc822.message b/src/leap/bitmask/mail/adaptors/tests/rfc822.message deleted file mode 120000 index b19cc280..00000000 --- a/src/leap/bitmask/mail/adaptors/tests/rfc822.message +++ /dev/null @@ -1 +0,0 @@ -../../tests/rfc822.message \ No newline at end of file diff --git a/src/leap/bitmask/mail/adaptors/tests/test_models.py b/src/leap/bitmask/mail/adaptors/tests/test_models.py deleted file mode 100644 index b82cfad0..00000000 --- a/src/leap/bitmask/mail/adaptors/tests/test_models.py +++ /dev/null @@ -1,106 +0,0 @@ -# -*- coding: utf-8 -*- -# test_models.py -# Copyright (C) 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for the leap.mail.adaptors.models module. -""" -from twisted.trial import unittest - -from leap.mail.adaptors import models - - -class SerializableModelsTestCase(unittest.TestCase): - - def test_good_serialized_model(self): - - class M(models.SerializableModel): - foo = 42 - bar = 33 - baaz_ = None - _nope = 0 - __nope = 0 - - def not_today(self): - pass - - class IgnoreMe(object): - pass - - def killmeplease(x): - return x - - serialized = M.serialize() - expected = {'foo': 42, 'bar': 33, 'baaz': None} - self.assertEqual(serialized, expected) - - -class DocumentWrapperTestCase(unittest.TestCase): - - def test_wrapper_defaults(self): - - class Wrapper(models.DocumentWrapper): - class model(models.SerializableModel): - foo = 42 - bar = 11 - - wrapper = Wrapper() - wrapper._ignored = True - serialized = wrapper.serialize() - expected = {'foo': 42, 'bar': 11} - self.assertEqual(serialized, expected) - - def test_initialized_wrapper(self): - - class Wrapper(models.DocumentWrapper): - class model(models.SerializableModel): - foo = 42 - bar_ = 11 - - wrapper = Wrapper(foo=0, bar=-1) - serialized = wrapper.serialize() - expected = {'foo': 0, 'bar': -1} - self.assertEqual(serialized, expected) - - wrapper.foo = 23 - serialized = wrapper.serialize() - expected = {'foo': 23, 'bar': -1} - self.assertEqual(serialized, expected) - - wrapper = Wrapper(foo=0) - serialized = wrapper.serialize() - expected = {'foo': 0, 'bar': 11} - self.assertEqual(serialized, expected) - - def test_invalid_initialized_wrapper(self): - - class Wrapper(models.DocumentWrapper): - class model(models.SerializableModel): - foo = 42 - - def getwrapper(): - return Wrapper(bar=1) - self.assertRaises(RuntimeError, getwrapper) - - def test_no_model_wrapper(self): - - class Wrapper(models.DocumentWrapper): - pass - - def getwrapper(): - w = Wrapper() - w.foo = None - - self.assertRaises(RuntimeError, getwrapper) diff --git a/src/leap/bitmask/mail/adaptors/tests/test_soledad_adaptor.py b/src/leap/bitmask/mail/adaptors/tests/test_soledad_adaptor.py deleted file mode 100644 index 73eaf164..00000000 --- a/src/leap/bitmask/mail/adaptors/tests/test_soledad_adaptor.py +++ /dev/null @@ -1,529 +0,0 @@ -# -*- coding: utf-8 -*- -# test_soledad_adaptor.py -# Copyright (C) 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for the Soledad Adaptor module - leap.mail.adaptors.soledad -""" -import os -from functools import partial - -from twisted.internet import defer - -from leap.mail.adaptors import models -from leap.mail.adaptors.soledad import SoledadDocumentWrapper -from leap.mail.adaptors.soledad import SoledadIndexMixin -from leap.mail.adaptors.soledad import SoledadMailAdaptor -from leap.mail.testing.common import SoledadTestMixin - -from email.MIMEMultipart import MIMEMultipart -from email.mime.text import MIMEText - -# DEBUG -# import logging -# logging.basicConfig(level=logging.DEBUG) - - -class CounterWrapper(SoledadDocumentWrapper): - class model(models.SerializableModel): - counter = 0 - flag = None - - -class CharacterWrapper(SoledadDocumentWrapper): - class model(models.SerializableModel): - name = "" - age = 20 - - -class ActorWrapper(SoledadDocumentWrapper): - class model(models.SerializableModel): - type_ = "actor" - name = None - - class __meta__(object): - index = "name" - list_index = ("by-type", "type_") - - -class TestAdaptor(SoledadIndexMixin): - indexes = {'by-name': ['name'], - 'by-type-and-name': ['type', 'name'], - 'by-type': ['type']} - - -class SoledadDocWrapperTestCase(SoledadTestMixin): - """ - Tests for the SoledadDocumentWrapper. - """ - def assert_num_docs(self, num, docs): - self.assertEqual(len(docs[1]), num) - - def test_create_single(self): - - store = self._soledad - wrapper = CounterWrapper() - - def assert_one_doc(docs): - self.assertEqual(docs[0], 1) - - d = wrapper.create(store) - d.addCallback(lambda _: store.get_all_docs()) - d.addCallback(assert_one_doc) - return d - - def test_create_many(self): - - store = self._soledad - w1 = CounterWrapper() - w2 = CounterWrapper(counter=1) - w3 = CounterWrapper(counter=2) - w4 = CounterWrapper(counter=3) - w5 = CounterWrapper(counter=4) - - d1 = [w1.create(store), - w2.create(store), - w3.create(store), - w4.create(store), - w5.create(store)] - - d = defer.gatherResults(d1) - d.addCallback(lambda _: store.get_all_docs()) - d.addCallback(partial(self.assert_num_docs, 5)) - return d - - def test_multiple_updates(self): - - store = self._soledad - wrapper = CounterWrapper(counter=1) - MAX = 100 - - def assert_doc_id(doc): - self.assertTrue(wrapper._doc_id is not None) - return doc - - def assert_counter_initial_ok(doc): - self.assertEqual(wrapper.counter, 1) - - def increment_counter(ignored): - d1 = [] - - def record_revision(revision): - rev = int(revision.split(':')[1]) - self.results.append(rev) - - for i in list(range(MAX)): - wrapper.counter += 1 - wrapper.flag = i % 2 == 0 - d = wrapper.update(store) - d.addCallback(record_revision) - d1.append(d) - - return defer.gatherResults(d1) - - def assert_counter_final_ok(doc): - self.assertEqual(doc.content['counter'], MAX + 1) - self.assertEqual(doc.content['flag'], False) - - def assert_results_ordered_list(ignored): - self.assertEqual(self.results, sorted(range(2, MAX + 2))) - - d = wrapper.create(store) - d.addCallback(assert_doc_id) - d.addCallback(assert_counter_initial_ok) - d.addCallback(increment_counter) - d.addCallback(lambda _: store.get_doc(wrapper._doc_id)) - d.addCallback(assert_counter_final_ok) - d.addCallback(assert_results_ordered_list) - return d - - def test_delete(self): - adaptor = TestAdaptor() - store = self._soledad - - wrapper_list = [] - - def get_or_create_bob(ignored): - def add_to_list(wrapper): - wrapper_list.append(wrapper) - return wrapper - wrapper = CharacterWrapper.get_or_create( - store, 'by-name', 'bob') - wrapper.addCallback(add_to_list) - return wrapper - - def delete_bob(ignored): - wrapper = wrapper_list[0] - return wrapper.delete(store) - - d = adaptor.initialize_store(store) - d.addCallback(lambda _: store.get_all_docs()) - d.addCallback(partial(self.assert_num_docs, 0)) - - # this should create bob document - d.addCallback(get_or_create_bob) - d.addCallback(lambda _: store.get_all_docs()) - d.addCallback(partial(self.assert_num_docs, 1)) - - d.addCallback(delete_bob) - d.addCallback(lambda _: store.get_all_docs()) - d.addCallback(partial(self.assert_num_docs, 0)) - return d - - def test_get_or_create(self): - adaptor = TestAdaptor() - store = self._soledad - - def get_or_create_bob(ignored): - wrapper = CharacterWrapper.get_or_create( - store, 'by-name', 'bob') - return wrapper - - d = adaptor.initialize_store(store) - d.addCallback(lambda _: store.get_all_docs()) - d.addCallback(partial(self.assert_num_docs, 0)) - - # this should create bob document - d.addCallback(get_or_create_bob) - d.addCallback(lambda _: store.get_all_docs()) - d.addCallback(partial(self.assert_num_docs, 1)) - - # this should get us bob document - d.addCallback(get_or_create_bob) - d.addCallback(lambda _: store.get_all_docs()) - d.addCallback(partial(self.assert_num_docs, 1)) - return d - - def test_get_or_create_multi_index(self): - adaptor = TestAdaptor() - store = self._soledad - - def get_or_create_actor_harry(ignored): - wrapper = ActorWrapper.get_or_create( - store, 'by-type-and-name', 'harrison') - return wrapper - - def create_director_harry(ignored): - wrapper = ActorWrapper(name="harrison", type="director") - return wrapper.create(store) - - d = adaptor.initialize_store(store) - d.addCallback(lambda _: store.get_all_docs()) - d.addCallback(partial(self.assert_num_docs, 0)) - - # this should create harrison document - d.addCallback(get_or_create_actor_harry) - d.addCallback(lambda _: store.get_all_docs()) - d.addCallback(partial(self.assert_num_docs, 1)) - - # this should get us harrison document - d.addCallback(get_or_create_actor_harry) - d.addCallback(lambda _: store.get_all_docs()) - d.addCallback(partial(self.assert_num_docs, 1)) - - # create director harry, should create new doc - d.addCallback(create_director_harry) - d.addCallback(lambda _: store.get_all_docs()) - d.addCallback(partial(self.assert_num_docs, 2)) - - # this should get us harrison document, still 2 docs - d.addCallback(get_or_create_actor_harry) - d.addCallback(lambda _: store.get_all_docs()) - d.addCallback(partial(self.assert_num_docs, 2)) - return d - - def test_get_all(self): - adaptor = TestAdaptor() - store = self._soledad - actor_names = ["harry", "carrie", "mark", "david"] - - def create_some_actors(ignored): - deferreds = [] - for name in actor_names: - dw = ActorWrapper.get_or_create( - store, 'by-type-and-name', name) - deferreds.append(dw) - return defer.gatherResults(deferreds) - - d = adaptor.initialize_store(store) - d.addCallback(lambda _: store.get_all_docs()) - d.addCallback(partial(self.assert_num_docs, 0)) - - d.addCallback(create_some_actors) - - d.addCallback(lambda _: store.get_all_docs()) - d.addCallback(partial(self.assert_num_docs, 4)) - - def assert_actor_list_is_expected(res): - got = set([actor.name for actor in res]) - expected = set(actor_names) - self.assertEqual(got, expected) - - d.addCallback(lambda _: ActorWrapper.get_all(store)) - d.addCallback(assert_actor_list_is_expected) - return d - -HERE = os.path.split(os.path.abspath(__file__))[0] - - -class MessageClass(object): - def __init__(self, wrapper, uid): - self.wrapper = wrapper - self.uid = uid - - def get_wrapper(self): - return self.wrapper - - -class SoledadMailAdaptorTestCase(SoledadTestMixin): - """ - Tests for the SoledadMailAdaptor. - """ - - def get_adaptor(self): - adaptor = SoledadMailAdaptor() - adaptor.store = self._soledad - return adaptor - - def assert_num_docs(self, num, docs): - self.assertEqual(len(docs[1]), num) - - def test_mail_adaptor_init(self): - adaptor = self.get_adaptor() - self.assertTrue(isinstance(adaptor.indexes, dict)) - self.assertTrue(len(adaptor.indexes) != 0) - - # Messages - - def test_get_msg_from_string(self): - adaptor = self.get_adaptor() - - with open(os.path.join(HERE, "rfc822.message")) as f: - raw = f.read() - - msg = adaptor.get_msg_from_string(MessageClass, raw) - - chash = ("D27B2771C0DCCDCB468EE65A4540438" - "09DBD11588E87E951545BE0CBC321C308") - phash = ("64934534C1C80E0D4FA04BE1CCBA104" - "F07BCA5F469C86E2C0ABE1D41310B7299") - subject = ("[Twisted-commits] rebuild now works on " - "python versions from 2.2.0 and up.") - self.assertTrue(msg.wrapper.fdoc is not None) - self.assertTrue(msg.wrapper.hdoc is not None) - self.assertTrue(msg.wrapper.cdocs is not None) - self.assertEquals(len(msg.wrapper.cdocs), 1) - self.assertEquals(msg.wrapper.fdoc.chash, chash) - self.assertEquals(msg.wrapper.fdoc.size, 3837) - self.assertEquals(msg.wrapper.hdoc.chash, chash) - self.assertEqual(dict(msg.wrapper.hdoc.headers)['Subject'], - subject) - self.assertEqual(msg.wrapper.hdoc.subject, subject) - self.assertEqual(msg.wrapper.cdocs[1].phash, phash) - - def test_get_msg_from_string_multipart(self): - msg = MIMEMultipart() - msg['Subject'] = 'Test multipart mail' - msg.attach(MIMEText(u'a utf8 message', _charset='utf-8')) - adaptor = self.get_adaptor() - - msg = adaptor.get_msg_from_string(MessageClass, msg.as_string()) - - self.assertEqual( - 'base64', msg.wrapper.cdocs[1].content_transfer_encoding) - self.assertEqual( - 'text/plain', msg.wrapper.cdocs[1].content_type) - self.assertEqual( - 'YSB1dGY4IG1lc3NhZ2U=\n', msg.wrapper.cdocs[1].raw) - - def test_get_msg_from_docs(self): - adaptor = self.get_adaptor() - mdoc = dict( - fdoc="F-Foobox-deadbeef", - hdoc="H-deadbeef", - cdocs=["C-deadabad"]) - fdoc = dict( - mbox_uuid="Foobox", - flags=('\Seen', '\Nice'), - tags=('Personal', 'TODO'), - seen=False, deleted=False, - recent=False, multi=False) - hdoc = dict( - chash="deadbeef", - subject="Test Msg") - cdocs = { - 1: dict( - raw='This is a test message')} - - msg = adaptor.get_msg_from_docs( - MessageClass, mdoc, fdoc, hdoc, cdocs=cdocs) - self.assertEqual(msg.wrapper.fdoc.flags, - ('\Seen', '\Nice')) - self.assertEqual(msg.wrapper.fdoc.tags, - ('Personal', 'TODO')) - self.assertEqual(msg.wrapper.fdoc.mbox_uuid, "Foobox") - self.assertEqual(msg.wrapper.hdoc.multi, False) - self.assertEqual(msg.wrapper.hdoc.subject, - "Test Msg") - self.assertEqual(msg.wrapper.cdocs[1].raw, - "This is a test message") - - def test_get_msg_from_metamsg_doc_id(self): - # TODO complete-me! - pass - - test_get_msg_from_metamsg_doc_id.skip = "Not yet implemented" - - def test_create_msg(self): - adaptor = self.get_adaptor() - - with open(os.path.join(HERE, "rfc822.message")) as f: - raw = f.read() - msg = adaptor.get_msg_from_string(MessageClass, raw) - - def check_create_result(created): - # that's one mdoc, one hdoc, one fdoc, one cdoc - self.assertEqual(len(created), 4) - for doc in created: - self.assertTrue( - doc.__class__.__name__, - "SoledadDocument") - - d = adaptor.create_msg(adaptor.store, msg) - d.addCallback(check_create_result) - return d - - def test_update_msg(self): - adaptor = self.get_adaptor() - with open(os.path.join(HERE, "rfc822.message")) as f: - raw = f.read() - - def assert_msg_has_doc_id(ignored, msg): - wrapper = msg.get_wrapper() - self.assertTrue(wrapper.fdoc.doc_id is not None) - - def assert_msg_has_no_flags(ignored, msg): - wrapper = msg.get_wrapper() - self.assertEqual(wrapper.fdoc.flags, []) - - def update_msg_flags(ignored, msg): - wrapper = msg.get_wrapper() - wrapper.fdoc.flags = ["This", "That"] - return wrapper.update(adaptor.store) - - def assert_msg_has_flags(ignored, msg): - wrapper = msg.get_wrapper() - self.assertEqual(wrapper.fdoc.flags, ["This", "That"]) - - def get_fdoc_and_check_flags(ignored): - def assert_doc_has_flags(doc): - self.assertEqual(doc.content['flags'], - ['This', 'That']) - wrapper = msg.get_wrapper() - d = adaptor.store.get_doc(wrapper.fdoc.doc_id) - d.addCallback(assert_doc_has_flags) - return d - - msg = adaptor.get_msg_from_string(MessageClass, raw) - d = adaptor.create_msg(adaptor.store, msg) - d.addCallback(lambda _: adaptor.store.get_all_docs()) - d.addCallback(partial(self.assert_num_docs, 4)) - d.addCallback(assert_msg_has_doc_id, msg) - d.addCallback(assert_msg_has_no_flags, msg) - - # update it! - d.addCallback(update_msg_flags, msg) - d.addCallback(assert_msg_has_flags, msg) - d.addCallback(get_fdoc_and_check_flags) - return d - - # Mailboxes - - def test_get_or_create_mbox(self): - adaptor = self.get_adaptor() - - def get_or_create_mbox(ignored): - d = adaptor.get_or_create_mbox(adaptor.store, "Trash") - return d - - def assert_good_doc(mbox_wrapper): - self.assertTrue(mbox_wrapper.doc_id is not None) - self.assertEqual(mbox_wrapper.mbox, "Trash") - self.assertEqual(mbox_wrapper.type, "mbox") - self.assertEqual(mbox_wrapper.closed, False) - self.assertEqual(mbox_wrapper.subscribed, False) - - d = adaptor.initialize_store(adaptor.store) - d.addCallback(get_or_create_mbox) - d.addCallback(assert_good_doc) - d.addCallback(lambda _: adaptor.store.get_all_docs()) - d.addCallback(partial(self.assert_num_docs, 1)) - return d - - def test_update_mbox(self): - adaptor = self.get_adaptor() - - wrapper_ref = [] - - def get_or_create_mbox(ignored): - d = adaptor.get_or_create_mbox(adaptor.store, "Trash") - return d - - def update_wrapper(wrapper, wrapper_ref): - wrapper_ref.append(wrapper) - wrapper.subscribed = True - wrapper.closed = True - d = adaptor.update_mbox(adaptor.store, wrapper) - return d - - def get_mbox_doc_and_check_flags(res, wrapper_ref): - wrapper = wrapper_ref[0] - - def assert_doc_has_flags(doc): - self.assertEqual(doc.content['subscribed'], True) - self.assertEqual(doc.content['closed'], True) - d = adaptor.store.get_doc(wrapper.doc_id) - d.addCallback(assert_doc_has_flags) - return d - - d = adaptor.initialize_store(adaptor.store) - d.addCallback(get_or_create_mbox) - d.addCallback(update_wrapper, wrapper_ref) - d.addCallback(get_mbox_doc_and_check_flags, wrapper_ref) - return d - - def test_get_all_mboxes(self): - adaptor = self.get_adaptor() - mboxes = ("Sent", "Trash", "Personal", "ListFoo") - - def get_or_create_mboxes(ignored): - d = [] - for mbox in mboxes: - d.append(adaptor.get_or_create_mbox( - adaptor.store, mbox)) - return defer.gatherResults(d) - - def get_all_mboxes(ignored): - return adaptor.get_all_mboxes(adaptor.store) - - def assert_mboxes_match_expected(wrappers): - names = [m.mbox for m in wrappers] - self.assertEqual(set(names), set(mboxes)) - - d = adaptor.initialize_store(adaptor.store) - d.addCallback(get_or_create_mboxes) - d.addCallback(get_all_mboxes) - d.addCallback(assert_mboxes_match_expected) - return d -- cgit v1.2.3