summaryrefslogtreecommitdiff
path: root/service/test/unit/adapter
diff options
context:
space:
mode:
Diffstat (limited to 'service/test/unit/adapter')
-rw-r--r--service/test/unit/adapter/mailstore/__init__.py15
-rw-r--r--service/test/unit/adapter/mailstore/maintenance/__init__.py15
-rw-r--r--service/test/unit/adapter/mailstore/maintenance/test_soledad_maintenance.py109
-rw-r--r--service/test/unit/adapter/mailstore/test_body_parser.py57
-rw-r--r--service/test/unit/adapter/mailstore/test_leap_mail.py230
-rw-r--r--service/test/unit/adapter/mailstore/test_leap_mailstore.py474
-rw-r--r--service/test/unit/adapter/mailstore/test_searchable_mailstore.py112
-rw-r--r--service/test/unit/adapter/search/test_index_storage_key.py52
-rw-r--r--service/test/unit/adapter/search/test_search.py3
-rw-r--r--service/test/unit/adapter/test_draft_service.py15
-rw-r--r--service/test/unit/adapter/test_mail.py397
-rw-r--r--service/test/unit/adapter/test_mail_service.py138
-rw-r--r--service/test/unit/adapter/test_mailbox.py42
-rw-r--r--service/test/unit/adapter/test_mailbox_indexer_listener.py36
-rw-r--r--service/test/unit/adapter/test_mailboxes.py41
-rw-r--r--service/test/unit/adapter/test_soledad_querier.py150
16 files changed, 1224 insertions, 662 deletions
diff --git a/service/test/unit/adapter/mailstore/__init__.py b/service/test/unit/adapter/mailstore/__init__.py
new file mode 100644
index 00000000..c5c30cde
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/__init__.py
@@ -0,0 +1,15 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
diff --git a/service/test/unit/adapter/mailstore/maintenance/__init__.py b/service/test/unit/adapter/mailstore/maintenance/__init__.py
new file mode 100644
index 00000000..c5c30cde
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/maintenance/__init__.py
@@ -0,0 +1,15 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
diff --git a/service/test/unit/adapter/mailstore/maintenance/test_soledad_maintenance.py b/service/test/unit/adapter/mailstore/maintenance/test_soledad_maintenance.py
new file mode 100644
index 00000000..9a89d62b
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/maintenance/test_soledad_maintenance.py
@@ -0,0 +1,109 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from leap.soledad.common.document import SoledadDocument
+from twisted.internet import defer
+
+from twisted.trial import unittest
+from mockito import mock, when, verify, never
+from pixelated.adapter.mailstore.maintenance import SoledadMaintenance
+from leap.keymanager.openpgp import OpenPGPKey
+
+SOME_EMAIL_ADDRESS = 'foo@example.tld'
+SOME_KEY_ID = '4914254E384E264C'
+
+
+class TestSoledadMaintenance(unittest.TestCase):
+
+ def test_repair_is_deferred(self):
+ soledad = mock()
+ when(soledad).get_all_docs().thenReturn(defer.succeed((1, [])))
+
+ d = SoledadMaintenance(soledad).repair()
+
+ self.assertIsInstance(d, defer.Deferred)
+
+ @defer.inlineCallbacks
+ def test_repair_delete_public_key_active_docs(self):
+ soledad = mock()
+ key = self._public_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID)
+ active_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json(SOME_EMAIL_ADDRESS))
+ when(soledad).get_all_docs().thenReturn(defer.succeed((1, [active_doc])))
+
+ yield SoledadMaintenance(soledad).repair()
+
+ verify(soledad).delete_doc(active_doc)
+
+ @defer.inlineCallbacks
+ def test_repair_delete_public_key_docs(self):
+ soledad = mock()
+ key = self._public_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID)
+ active_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json(SOME_EMAIL_ADDRESS))
+ key_doc = SoledadDocument(doc_id='some_doc', json=key.get_json())
+ when(soledad).get_all_docs().thenReturn(defer.succeed((1, [key_doc, active_doc])))
+
+ yield SoledadMaintenance(soledad).repair()
+
+ verify(soledad).delete_doc(active_doc)
+ verify(soledad).delete_doc(key_doc)
+
+ @defer.inlineCallbacks
+ def test_repair_keeps_active_and_key_doc_if_private_key_exists(self):
+ soledad = mock()
+ key = self._public_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID)
+ private_key = self._private_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID)
+ active_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json(SOME_EMAIL_ADDRESS))
+ key_doc = SoledadDocument(doc_id='some_doc', json=key.get_json())
+ private_key_doc = SoledadDocument(doc_id='some_doc', json=private_key.get_json())
+ when(soledad).get_all_docs().thenReturn(defer.succeed((1, [key_doc, active_doc, private_key_doc])))
+
+ yield SoledadMaintenance(soledad).repair()
+
+ verify(soledad, never).delete_doc(key_doc)
+ verify(soledad, never).delete_doc(active_doc)
+ verify(soledad, never).delete_doc(private_key_doc)
+
+ @defer.inlineCallbacks
+ def test_repair_only_deletes_key_docs(self):
+ soledad = mock()
+ key = self._public_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID)
+ key_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json(SOME_EMAIL_ADDRESS))
+ other_doc = SoledadDocument(doc_id='something', json='{}')
+ when(soledad).get_all_docs().thenReturn(defer.succeed((1, [key_doc, other_doc])))
+
+ yield SoledadMaintenance(soledad).repair()
+
+ verify(soledad, never).delete_doc(other_doc)
+
+ @defer.inlineCallbacks
+ def test_repair_recreates_public_key_active_doc_if_necessary(self):
+ soledad = mock()
+
+ private_key = self._private_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID)
+ private_key_doc = SoledadDocument(doc_id='some_doc', json=private_key.get_json())
+ when(soledad).get_all_docs().thenReturn(defer.succeed((1, [private_key_doc])))
+
+ yield SoledadMaintenance(soledad).repair()
+
+ verify(soledad).create_doc_from_json('{"key_id": "4914254E384E264C", "tags": ["keymanager-active"], "type": "OpenPGPKey-active", "private": false, "address": "foo@example.tld"}')
+
+ def _public_key(self, address, keyid):
+ return self._gpgkey(address, keyid, private=False)
+
+ def _private_key(self, address, keyid):
+ return self._gpgkey(address, keyid, private=True)
+
+ def _gpgkey(self, address, keyid, private=False):
+ return OpenPGPKey(address, key_id=keyid, private=private)
diff --git a/service/test/unit/adapter/mailstore/test_body_parser.py b/service/test/unit/adapter/mailstore/test_body_parser.py
new file mode 100644
index 00000000..9d58637c
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/test_body_parser.py
@@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+import unittest
+from mock import patch
+from pixelated.adapter.mailstore.body_parser import BodyParser
+
+
+class BodyParserTest(unittest.TestCase):
+
+ def test_simple_text(self):
+ parser = BodyParser('simple text')
+
+ self.assertEqual('simple text', parser.parsed_content())
+
+ def test_base64_text(self):
+ parser = BodyParser('dGVzdCB0ZXh0\n', content_type='text/plain; charset="utf-8"', content_transfer_encoding='base64')
+
+ self.assertEqual('test text', parser.parsed_content())
+
+ def test_8bit_transfer_encoding_with_iso_8859_1_str_input(self):
+ data = 'Hmm, here are \xdcml\xe4\xfcts again!'
+ parser = BodyParser(data, content_type='text/plain; charset=iso-8859-1', content_transfer_encoding='8bit')
+
+ self.assertEqual(u'Hmm, here are Ümläüts again!', parser.parsed_content())
+
+ def test_8bit_transfer_encoding_with_iso_8859_1_unicode_input(self):
+ data = u'Hmm, here are \xdcml\xe4\xfcts again!'
+ parser = BodyParser(data, content_type='text/plain; charset=iso-8859-1', content_transfer_encoding='8bit')
+
+ self.assertEqual(u'Hmm, here are Ümläüts again!', parser.parsed_content())
+
+ def test_base64_with_default_us_ascii_encoding(self):
+ parser = BodyParser('dGVzdCB0ZXh0\n', content_type='text/plain', content_transfer_encoding='base64')
+
+ self.assertEqual('test text', parser.parsed_content())
+
+ @patch('pixelated.adapter.mailstore.body_parser.logger')
+ def test_body_parser_logs_problems_and_then_ignores_invalid_chars(self, logger_mock):
+ data = u'unkown char: \ufffd'
+ parser = BodyParser(data, content_type='text/plain; charset=iso-8859-1', content_transfer_encoding='8bit')
+
+ self.assertEqual(u'unkown char: ', parser.parsed_content())
+ logger_mock.warn.assert_called_with(u'Failed to encode content for charset iso-8859-1. Ignoring invalid chars: \'latin-1\' codec can\'t encode character u\'\\ufffd\' in position 13: ordinal not in range(256)')
diff --git a/service/test/unit/adapter/mailstore/test_leap_mail.py b/service/test/unit/adapter/mailstore/test_leap_mail.py
new file mode 100644
index 00000000..ef585654
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/test_leap_mail.py
@@ -0,0 +1,230 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from mock import patch
+from twisted.trial.unittest import TestCase
+
+from pixelated.adapter.mailstore.leap_mailstore import LeapMail, AttachmentInfo
+
+
+class TestLeapMail(TestCase):
+ def test_leap_mail(self):
+ mail = LeapMail('', 'INBOX', {'From': 'test@example.test', 'Subject': 'A test Mail', 'To': 'receiver@example.test'})
+
+ self.assertEqual('test@example.test', mail.from_sender)
+ self.assertEqual(['receiver@example.test'], mail.to)
+ self.assertEqual('A test Mail', mail.subject)
+
+ def test_email_addresses_in_to_are_split_into_a_list(self):
+ mail = LeapMail('', 'INBOX', {'To': 'first@example.test,second@example.test'})
+
+ self.assertEqual(['first@example.test', 'second@example.test'], mail.headers['To'])
+
+ def test_email_addresses_in_cc_are_split_into_a_list(self):
+ mail = LeapMail('', 'INBOX', {'Cc': 'first@example.test,second@example.test'})
+
+ self.assertEqual(['first@example.test', 'second@example.test'], mail.headers['Cc'])
+
+ def test_email_addresses_in_bcc_are_split_into_a_list(self):
+ mail = LeapMail('', 'INBOX', {'Bcc': 'first@example.test,second@example.test'})
+
+ self.assertEqual(['first@example.test', 'second@example.test'], mail.headers['Bcc'])
+
+ def test_email_addresses_might_be_empty_array(self):
+ mail = LeapMail('', 'INBOX', {'Cc': None})
+
+ self.assertEqual([], mail.headers['Cc'])
+
+ def test_as_dict(self):
+ mail = LeapMail('doc id', 'INBOX', {'From': 'test@example.test', 'Subject': 'A test Mail', 'To': 'receiver@example.test,receiver2@other.test'}, ('foo', 'bar'))
+ self.maxDiff = None
+ expected = {
+ 'header': {
+ 'from': 'test@example.test',
+ 'subject': 'A test Mail',
+ 'to': ['receiver@example.test', 'receiver2@other.test'],
+
+ },
+ 'ident': 'doc id',
+ 'mailbox': 'inbox',
+ 'tags': {'foo', 'bar'},
+ 'status': [],
+ 'body': None,
+ 'textPlainBody': None,
+ 'security_casing': {
+ 'imprints': [{'state': 'no_signature_information'}],
+ 'locks': []
+ },
+ 'replying': {'all': {'cc-field': [],
+ 'to-field': ['receiver@example.test',
+ 'test@example.test',
+ 'receiver2@other.test']},
+ 'single': 'test@example.test'},
+ 'attachments': []
+ }
+
+ self.assertEqual(expected, mail.as_dict())
+
+ def test_as_dict_with_body(self):
+ body = 'some body content'
+ mail = LeapMail('doc id', 'INBOX', {'From': 'test@example.test', 'Subject': 'A test Mail', 'To': 'receiver@example.test'}, ('foo', 'bar'), body=body)
+
+ self.assertEqual(body, mail.as_dict()['body'])
+
+ def test_as_dict_with_attachments(self):
+ mail = LeapMail('doc id', 'INBOX', attachments=[AttachmentInfo('id', 'name', 'encoding')])
+
+ self.assertEqual([{'ident': 'id', 'name': 'name', 'encoding': 'encoding'}],
+ mail.as_dict()['attachments'])
+
+ def test_as_dict_headers_with_special_chars(self):
+ expected_address = u'"\xc4lbert \xdcbr\xf6" <\xe4\xfc\xf6@example.mail>'
+ expected_subject = u'H\xe4ll\xf6 W\xf6rld'
+ mail = LeapMail('', 'INBOX',
+ {'From': '=?iso-8859-1?q?=22=C4lbert_=DCbr=F6=22_=3C=E4=FC=F6=40example=2Email=3E?=',
+ 'To': '=?iso-8859-1?q?=22=C4lbert_=DCbr=F6=22_=3C=E4=FC=F6=40example=2Email=3E?=',
+ 'Cc': '=?iso-8859-1?q?=22=C4lbert_=DCbr=F6=22_=3C=E4=FC=F6=40example=2Email=3E?=',
+ 'Subject': '=?iso-8859-1?q?H=E4ll=F6_W=F6rld?='})
+
+ self.assertEqual(expected_address, mail.as_dict()['header']['from'])
+ self.assertEqual([expected_address], mail.as_dict()['header']['to'])
+ self.assertEqual([expected_address], mail.as_dict()['header']['cc'])
+ self.assertEqual(expected_subject, mail.as_dict()['header']['subject'])
+
+ def test_as_dict_replying_with_special_chars(self):
+ expected_address = u'"\xc4lbert \xdcbr\xf6" <\xe4\xfc\xf6@example.mail>'
+ mail = LeapMail('', 'INBOX',
+ {'From': '=?iso-8859-1?q?=22=C4lbert_=DCbr=F6=22_=3C=E4=FC=F6=40example=2Email=3E?=',
+ 'To': '=?iso-8859-1?q?=22=C4lbert_=DCbr=F6=22_=3C=E4=FC=F6=40example=2Email=3E?=',
+ 'Cc': '=?iso-8859-1?q?=22=C4lbert_=DCbr=F6=22_=3C=E4=FC=F6=40example=2Email=3E?=',
+ 'Subject': '=?iso-8859-1?q?H=E4ll=F6_W=F6rld?='})
+
+ self.assertEqual([expected_address], mail.as_dict()['replying']['all']['to-field'])
+ self.assertEqual([expected_address], mail.as_dict()['replying']['all']['cc-field'])
+ self.assertEqual(expected_address, mail.as_dict()['replying']['single'])
+
+ def test_reply_all_result_does_not_contain_own_address_in_to_with_spaces(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'test@example.test',
+ 'To': 'receiver@example.test, %s ' % my_address})
+
+ self.assertEqual(['receiver@example.test', 'test@example.test'], mail.as_dict()['replying']['all']['to-field'])
+
+ def test_reply_all_result_does_not_contain_own_address_in_to_with_name(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'test@example.test',
+ 'To': 'receiver@example.test, Folker Bernitt <%s>' % my_address})
+
+ self.assertEqual(['receiver@example.test', 'test@example.test'], mail.as_dict()['replying']['all']['to-field'])
+
+ def test_reply_all_result_does_not_contain_own_address_in_to_with_encoded(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'test@example.test',
+ 'To': 'receiver@example.test, =?iso-8859-1?q?=C4lbert_=3Cmyaddress=40example=2Etest=3E?='})
+
+ self.assertEqual(['receiver@example.test', 'test@example.test'], mail.as_dict()['replying']['all']['to-field'])
+
+ def test_reply_all_result_does_not_contain_own_address_in_cc(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'test@example.test',
+ 'To': 'receiver@example.test',
+ 'Cc': my_address})
+
+ self.assertEqual([], mail.as_dict()['replying']['all']['cc-field'])
+
+ def test_reply_all_result_does_not_contain_own_address_if_sender(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'myaddress@example.test',
+ 'To': 'receiver@example.test'})
+
+ self.assertEqual(['receiver@example.test'], mail.as_dict()['replying']['all']['to-field'])
+
+ def test_reply_all_result_does_contain_own_address_if_only_recipient(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'myaddress@example.test',
+ 'To': 'myaddress@example.test'})
+
+ self.assertEqual(['myaddress@example.test'], mail.as_dict()['replying']['all']['to-field'])
+
+ def test_reply_result_swaps_sender_and_recipient_if_i_am_the_sender(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'myaddress@example.test',
+ 'To': 'recipient@example.test'})
+
+ self.assertEqual('recipient@example.test', mail.as_dict()['replying']['single'])
+
+ def test_as_dict_with_mixed_encodings(self):
+ subject = 'Another test with =?iso-8859-1?B?3G1s5Px0?= =?iso-8859-1?Q?s?='
+ mail = LeapMail('', 'INBOX',
+ {'Subject': subject})
+
+ self.assertEqual(u'Another test with Ümläüts', mail.as_dict()['header']['subject'])
+
+ def test_raw_constructed_by_headers_and_body(self):
+ body = 'some body content'
+ mail = LeapMail('doc id', 'INBOX', {'From': 'test@example.test', 'Subject': 'A test Mail', 'To': 'receiver@example.test'}, ('foo', 'bar'), body=body)
+
+ result = mail.raw
+
+ expected_raw = 'To: receiver@example.test\nFrom: test@example.test\nSubject: A test Mail\n\nsome body content'
+ self.assertEqual(expected_raw, result)
+
+ def test_headers_none_recipients_are_converted_to_empty_array(self):
+ mail = LeapMail('id', 'INBOX', {'To': None, 'Cc': None, 'Bcc': None})
+
+ self.assertEquals([], mail.headers['To'])
+ self.assertEquals([], mail.headers['Cc'])
+ self.assertEquals([], mail.headers['Bcc'])
+
+ def test_security_casing(self):
+ # No Encryption, no Signature
+ mail = LeapMail('id', 'INBOX', {})
+ self.assertEqual({'locks': [], 'imprints': [{'state': 'no_signature_information'}]}, mail.security_casing)
+
+ # Encryption
+ mail = LeapMail('id', 'INBOX', {'X-Leap-Encryption': 'decrypted'})
+ self.assertEqual([{'state': 'valid'}], mail.security_casing['locks'])
+
+ mail = LeapMail('id', 'INBOX', {'X-Leap-Encryption': 'false'})
+ self.assertEqual([], mail.security_casing['locks'])
+
+ # Signature
+ mail = LeapMail('id', 'INBOX', {'X-Leap-Signature': 'valid'})
+ self.assertEqual([{'seal': {'validity': 'valid'}, 'state': 'valid'}], mail.security_casing['imprints'])
+
+ mail = LeapMail('id', 'INBOX', {'X-Leap-Signature': 'invalid'})
+ self.assertEqual([], mail.security_casing['imprints'])
diff --git a/service/test/unit/adapter/mailstore/test_leap_mailstore.py b/service/test/unit/adapter/mailstore/test_leap_mailstore.py
new file mode 100644
index 00000000..4eabc144
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/test_leap_mailstore.py
@@ -0,0 +1,474 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+import base64
+from email.header import Header
+from email.mime.application import MIMEApplication
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+import json
+import quopri
+from uuid import uuid4
+from email.parser import Parser
+import os
+from leap.soledad.common.document import SoledadDocument
+from leap.mail.adaptors.soledad_indexes import MAIL_INDEXES
+from twisted.internet.defer import FirstError
+from twisted.trial.unittest import TestCase
+from leap.mail import constants
+from twisted.internet import defer
+from mockito import mock, when, verify, any as ANY
+import test.support.mockito
+from leap.mail.adaptors.soledad import SoledadMailAdaptor, MailboxWrapper, ContentDocWrapper
+import pkg_resources
+from leap.mail.mail import Message
+from pixelated.adapter.mailstore import underscore_uuid
+
+from pixelated.adapter.mailstore.leap_mailstore import LeapMailStore, LeapMail, AttachmentInfo
+
+
+class TestLeapMailStore(TestCase):
+ def setUp(self):
+ self.soledad = mock()
+ self.mbox_uuid = str(uuid4())
+ self.doc_by_id = {}
+ self.mbox_uuid_by_name = {}
+ self.mbox_soledad_docs = []
+
+ when(self.soledad).get_from_index('by-type', 'mbox').thenAnswer(lambda: defer.succeed(self.mbox_soledad_docs))
+ self._mock_get_mailbox('INBOX')
+
+ @defer.inlineCallbacks
+ def test_get_mail_not_exist(self):
+ when(self.soledad).get_doc(ANY()).thenAnswer(lambda: defer.succeed(None))
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.get_mail(_format_mdoc_id(uuid4(), 1))
+
+ self.assertIsNone(mail)
+
+ @defer.inlineCallbacks
+ def test_get_mail(self):
+ mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.get_mail(mdoc_id)
+
+ self.assertIsInstance(mail, LeapMail)
+ self.assertEqual('darby.senger@zemlak.biz', mail.from_sender)
+ self.assertEqual(['carmel@murazikortiz.name'], mail.to)
+ self.assertEqual('Itaque consequatur repellendus provident sunt quia.', mail.subject)
+ self.assertIsNone(mail.body)
+ self.assertEqual('INBOX', mail.mailbox_name)
+
+ @defer.inlineCallbacks
+ def test_get_mail_from_mailbox(self):
+ other, _ = self._mock_get_mailbox('OTHER', create_new_uuid=True)
+ mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000000', other.uuid)
+
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.get_mail(mdoc_id)
+
+ self.assertEqual('OTHER', mail.mailbox_name)
+
+ @defer.inlineCallbacks
+ def test_get_two_different_mails(self):
+ first_mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ second_mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000001')
+
+ store = LeapMailStore(self.soledad)
+
+ mail1 = yield store.get_mail(first_mdoc_id)
+ mail2 = yield store.get_mail(second_mdoc_id)
+
+ self.assertNotEqual(mail1, mail2)
+ self.assertEqual('Itaque consequatur repellendus provident sunt quia.', mail1.subject)
+ self.assertEqual('Error illum dignissimos autem eos aspernatur.', mail2.subject)
+
+ @defer.inlineCallbacks
+ def test_get_mails(self):
+ first_mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ second_mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000001')
+
+ store = LeapMailStore(self.soledad)
+
+ mails = yield store.get_mails([first_mdoc_id, second_mdoc_id])
+
+ self.assertEqual(2, len(mails))
+ self.assertEqual('Itaque consequatur repellendus provident sunt quia.', mails[0].subject)
+ self.assertEqual('Error illum dignissimos autem eos aspernatur.', mails[1].subject)
+
+ @defer.inlineCallbacks
+ def test_get_mails_fails_for_invalid_mail_id(self):
+ store = LeapMailStore(self.soledad)
+
+ try:
+ yield store.get_mails(['invalid'])
+ self.fail('Exception expected')
+ except FirstError:
+ pass
+
+ @defer.inlineCallbacks
+ def test_get_mail_with_body(self):
+ expeted_body = 'Dignissimos ducimus veritatis. Est tenetur consequatur quia occaecati. Vel sit sit voluptas.\n\nEarum distinctio eos. Accusantium qui sint ut quia assumenda. Facere dignissimos inventore autem sit amet. Pariatur voluptatem sint est.\n\nUt recusandae praesentium aspernatur. Exercitationem amet placeat deserunt quae consequatur eum. Unde doloremque suscipit quia.\n\n'
+ mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.get_mail(mdoc_id, include_body=True)
+
+ self.assertEqual(expeted_body, mail.body)
+
+ @defer.inlineCallbacks
+ def test_get_mail_attachment(self):
+ attachment_id = 'AAAA9AAD9E153D24265395203C53884506ABA276394B9FEC02B214BF9E77E48E'
+ doc = SoledadDocument(json=json.dumps({'content_type': 'foo/bar', 'raw': 'asdf'}))
+ when(self.soledad).get_from_index('by-type-and-payloadhash', 'cnt', attachment_id).thenReturn(defer.succeed([doc]))
+ store = LeapMailStore(self.soledad)
+
+ attachment = yield store.get_mail_attachment(attachment_id)
+
+ self.assertEqual({'content-type': 'foo/bar', 'content': bytearray('asdf')}, attachment)
+
+ @defer.inlineCallbacks
+ def test_get_mail_attachment_different_content_encodings(self):
+ attachment_id = '1B0A9AAD9E153D24265395203C53884506ABA276394B9FEC02B214BF9E77E48E'
+ encoding_examples = [('', 'asdf', 'asdf'),
+ ('base64', 'asdf', 'YXNkZg=='),
+ ('quoted-printable', 'äsdf', '=C3=A4sdf')]
+
+ for transfer_encoding, data, encoded_data in encoding_examples:
+ doc = SoledadDocument(json=json.dumps({'content_type': 'foo/bar', 'raw': encoded_data,
+ 'content_transfer_encoding': transfer_encoding}))
+ when(self.soledad).get_from_index('by-type-and-payloadhash', 'cnt', attachment_id).thenReturn(defer.succeed([doc]))
+ store = LeapMailStore(self.soledad)
+
+ attachment = yield store.get_mail_attachment(attachment_id)
+
+ self.assertEqual(bytearray(data), attachment['content'])
+
+ @defer.inlineCallbacks
+ def test_get_mail_attachment_throws_exception_if_attachment_does_not_exist(self):
+ attachment_id = '1B0A9AAD9E153D24265395203C53884506ABA276394B9FEC02B214BF9E77E48E'
+ when(self.soledad).get_from_index('by-type-and-payloadhash', 'cnt', attachment_id).thenReturn(defer.succeed([]))
+ store = LeapMailStore(self.soledad)
+ try:
+ yield store.get_mail_attachment(attachment_id)
+ self.fail('ValueError exception expected')
+ except ValueError:
+ pass
+
+ @defer.inlineCallbacks
+ def test_update_mail(self):
+ mdoc_id, fdoc_id = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ soledad_fdoc = self.doc_by_id[fdoc_id]
+ when(self.soledad).put_doc(soledad_fdoc).thenReturn(defer.succeed(None))
+
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.get_mail(mdoc_id)
+
+ mail.tags.add('new_tag')
+
+ yield store.update_mail(mail)
+
+ verify(self.soledad).put_doc(soledad_fdoc)
+ self.assertTrue('new_tag' in soledad_fdoc.content['tags'])
+
+ @defer.inlineCallbacks
+ def test_all_mails(self):
+ first_mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ second_mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000001')
+ when(self.soledad).get_from_index('by-type', 'meta').thenReturn(defer.succeed([self.doc_by_id[first_mdoc_id], self.doc_by_id[second_mdoc_id]]))
+
+ store = LeapMailStore(self.soledad)
+
+ mails = yield store.all_mails()
+
+ self.assertIsNotNone(mails)
+ self.assertEqual(2, len(mails))
+ self.assertEqual('Itaque consequatur repellendus provident sunt quia.', mails[0].subject)
+ self.assertEqual('Error illum dignissimos autem eos aspernatur.', mails[1].subject)
+
+ @defer.inlineCallbacks
+ def test_add_mailbox(self):
+ when(self.soledad).list_indexes().thenReturn(defer.succeed(MAIL_INDEXES)).thenReturn(defer.succeed(MAIL_INDEXES))
+ when(self.soledad).get_from_index('by-type-and-mbox', 'mbox', 'TEST').thenReturn(defer.succeed([]))
+ self._mock_create_soledad_doc(self.mbox_uuid, MailboxWrapper(mbox='TEST'))
+ when(self.soledad).get_doc(self.mbox_uuid).thenAnswer(lambda: defer.succeed(self.doc_by_id[self.mbox_uuid]))
+ when(self.soledad).put_doc(ANY()).thenAnswer(lambda: defer.succeed(None))
+ store = LeapMailStore(self.soledad)
+
+ mbox = yield store.add_mailbox('TEST')
+
+ self.assertIsNotNone(mbox)
+ self.assertEqual(self.mbox_uuid, mbox.doc_id)
+ self.assertEqual('TEST', mbox.mbox)
+ self.assertIsNotNone(mbox.uuid)
+ # assert index got updated
+
+ @defer.inlineCallbacks
+ def test_get_mailbox_names_always_contains_inbox(self):
+ store = LeapMailStore(self.soledad)
+
+ names = yield store.get_mailbox_names()
+
+ self.assertEqual({'INBOX'}, names)
+
+ @defer.inlineCallbacks
+ def test_get_mailbox_names(self):
+ self._mock_get_mailbox('OTHER', create_new_uuid=True)
+ store = LeapMailStore(self.soledad)
+
+ names = yield store.get_mailbox_names()
+
+ self.assertEqual({'INBOX', 'OTHER'}, names)
+
+ @defer.inlineCallbacks
+ def test_handles_unmapped_mailbox_uuid(self):
+ # given
+ store = LeapMailStore(self.soledad)
+ new_uuid = 'UNICORN'
+
+ # if no mailbox doc is created yet (async hell?)
+ when(self.soledad).get_from_index('by-type', 'mbox').thenReturn(defer.succeed([]))
+
+ # then it should point to empty, which is all mails
+ name = yield store._mailbox_name_from_uuid(new_uuid)
+ self.assertEquals('', name)
+
+ @defer.inlineCallbacks
+ def test_add_mail(self):
+ expected_message = self._add_create_mail_mocks_to_soledad_from_fixture_file('mbox00000000')
+ mail = self._load_mail_from_file('mbox00000000')
+ self._mock_get_mailbox('INBOX')
+
+ store = LeapMailStore(self.soledad)
+
+ message = yield store.add_mail('INBOX', mail.as_string())
+
+ self.assertIsInstance(message, LeapMail)
+ self._assert_message_docs_created(expected_message, message)
+
+ @defer.inlineCallbacks
+ def test_add_mail_with_attachment(self):
+ input_mail = MIMEMultipart()
+ input_mail.attach(MIMEText(u'a utf8 message', _charset='utf-8'))
+ attachment = MIMEApplication('pretend to be binary attachment data')
+ attachment.add_header('Content-Disposition', 'attachment', filename='filename.txt')
+ input_mail.attach(attachment)
+ mocked_message = self._add_create_mail_mocks_to_soledad(input_mail)
+ store = LeapMailStore(self.soledad)
+
+ message = yield store.add_mail('INBOX', input_mail.as_string())
+
+ expected = [{'ident': self._cdoc_phash_from_message(mocked_message, 2), 'name': 'filename.txt', 'encoding': 'base64'}]
+ self.assertEqual(expected, message.as_dict()['attachments'])
+
+ @defer.inlineCallbacks
+ def test_add_mail_with_nested_attachments(self):
+ input_mail = MIMEMultipart()
+ input_mail.attach(MIMEText(u'a utf8 message', _charset='utf-8'))
+ attachment = MIMEApplication('pretend to be binary attachment data')
+ attachment.add_header('Content-Disposition', 'attachment', filename='filename.txt')
+ nested_attachment = MIMEMultipart()
+ nested_attachment.attach(attachment)
+ input_mail.attach(nested_attachment)
+ mocked_message = self._add_create_mail_mocks_to_soledad(input_mail)
+ store = LeapMailStore(self.soledad)
+
+ message = yield store.add_mail('INBOX', input_mail.as_string())
+
+ expected = [{'ident': self._cdoc_phash_from_message(mocked_message, 2), 'name': 'filename.txt', 'encoding': 'base64'}]
+ self.assertEqual(expected, message.as_dict()['attachments'])
+
+ @defer.inlineCallbacks
+ def test_add_mail_with_special_chars(self):
+ input_mail = MIMEText(u'a utf8 message', _charset='utf-8')
+ input_mail['From'] = Header(u'"Älbert Übrö" <äüö@example.mail>', 'iso-8859-1')
+ input_mail['Subject'] = Header(u'Hällö Wörld', 'iso-8859-1')
+ self._add_create_mail_mocks_to_soledad(input_mail)
+ store = LeapMailStore(self.soledad)
+
+ message = yield store.add_mail('INBOX', input_mail.as_string())
+
+ self.assertEqual(u'"\xc4lbert \xdcbr\xf6" <\xe4\xfc\xf6@example.mail>', message.as_dict()['header']['from'])
+
+ def _cdoc_phash_from_message(self, mocked_message, attachment_nr):
+ return mocked_message.get_wrapper().cdocs[attachment_nr].future_doc_id[2:]
+
+ @defer.inlineCallbacks
+ def test_delete_mail(self):
+ mdoc_id, fdoc_id = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+
+ store = LeapMailStore(self.soledad)
+
+ yield store.delete_mail(mdoc_id)
+
+ self._assert_mail_got_deleted(fdoc_id, mdoc_id)
+
+ @defer.inlineCallbacks
+ def test_get_mailbox_mail_ids(self):
+ mdoc_id, fdoc_id = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ when(self.soledad).get_from_index('by-type-and-mbox-uuid', 'flags', underscore_uuid(self.mbox_uuid)).thenReturn(defer.succeed([self.doc_by_id[fdoc_id]]))
+ self._mock_get_mailbox('INBOX')
+ store = LeapMailStore(self.soledad)
+
+ mail_ids = yield store.get_mailbox_mail_ids('INBOX')
+
+ self.assertEqual(1, len(mail_ids))
+ self.assertEqual(mdoc_id, mail_ids[0])
+
+ @defer.inlineCallbacks
+ def test_delete_mailbox(self):
+ _, mbox_soledad_doc = self._mock_get_mailbox('INBOX')
+ store = LeapMailStore(self.soledad)
+ when(self.soledad).delete_doc(mbox_soledad_doc).thenReturn(defer.succeed(None))
+
+ yield store.delete_mailbox('INBOX')
+
+ verify(self.soledad).delete_doc(self.doc_by_id[mbox_soledad_doc.doc_id])
+ # should also verify index is updated
+
+ @defer.inlineCallbacks
+ def test_copy_mail_to_mailbox(self):
+ expected_message = self._add_create_mail_mocks_to_soledad_from_fixture_file('mbox00000000')
+ mail_id, fdoc_id = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ self._mock_get_mailbox('TRASH')
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.copy_mail_to_mailbox(mail_id, 'TRASH')
+
+ self._assert_message_docs_created(expected_message, mail, only_mdoc_and_fdoc=True)
+
+ @defer.inlineCallbacks
+ def test_move_to_mailbox(self):
+ expected_message = self._add_create_mail_mocks_to_soledad_from_fixture_file('mbox00000000')
+ mail_id, fdoc_id = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ self._mock_get_mailbox('TRASH')
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.move_mail_to_mailbox(mail_id, 'TRASH')
+
+ self._assert_message_docs_created(expected_message, mail, only_mdoc_and_fdoc=True)
+ self._assert_mail_got_deleted(fdoc_id, mail_id)
+
+ def _assert_mail_got_deleted(self, fdoc_id, mail_id):
+ verify(self.soledad).delete_doc(self.doc_by_id[mail_id])
+ verify(self.soledad).delete_doc(self.doc_by_id[fdoc_id])
+
+ def _assert_message_docs_created(self, expected_message, actual_message, only_mdoc_and_fdoc=False):
+ wrapper = expected_message.get_wrapper()
+
+ verify(self.soledad).create_doc(wrapper.mdoc.serialize(), doc_id=actual_message.mail_id)
+ verify(self.soledad).create_doc(wrapper.fdoc.serialize(), doc_id=wrapper.fdoc.future_doc_id)
+ if not only_mdoc_and_fdoc:
+ verify(self.soledad).create_doc(wrapper.hdoc.serialize(), doc_id=wrapper.hdoc.future_doc_id)
+ for nr, cdoc in wrapper.cdocs.items():
+ verify(self.soledad).create_doc(cdoc.serialize(), doc_id=wrapper.cdocs[nr].future_doc_id)
+
+ def _mock_get_mailbox(self, mailbox_name, create_new_uuid=False):
+ mbox_uuid = self.mbox_uuid if not create_new_uuid else str(uuid4())
+ when(self.soledad).list_indexes().thenReturn(defer.succeed(MAIL_INDEXES)).thenReturn(
+ defer.succeed(MAIL_INDEXES))
+ doc_id = str(uuid4())
+ mbox = MailboxWrapper(doc_id=doc_id, mbox=mailbox_name, uuid=mbox_uuid)
+ soledad_doc = SoledadDocument(doc_id, json=json.dumps(mbox.serialize()))
+ when(self.soledad).get_from_index('by-type-and-mbox', 'mbox', mailbox_name).thenReturn(defer.succeed([soledad_doc]))
+ self._mock_get_soledad_doc(doc_id, mbox)
+
+ self.mbox_uuid_by_name[mailbox_name] = mbox_uuid
+ self.mbox_soledad_docs.append(soledad_doc)
+
+ return mbox, soledad_doc
+
+ def _add_mail_fixture_to_soledad_from_file(self, mail_file, mbox_uuid=None):
+ mail = self._load_mail_from_file(mail_file)
+ return self._add_mail_fixture_to_soledad(mail, mbox_uuid)
+
+ def _add_mail_fixture_to_soledad(self, mail, mbox_uuid=None):
+ msg = self._convert_mail_to_leap_message(mail, mbox_uuid)
+ wrapper = msg.get_wrapper()
+ mdoc_id = wrapper.mdoc.future_doc_id
+ fdoc_id = wrapper.mdoc.fdoc
+ hdoc_id = wrapper.mdoc.hdoc
+ cdoc_id = wrapper.mdoc.cdocs[0]
+
+ self._mock_get_soledad_doc(mdoc_id, wrapper.mdoc)
+ self._mock_get_soledad_doc(fdoc_id, wrapper.fdoc)
+ self._mock_get_soledad_doc(hdoc_id, wrapper.hdoc)
+ self._mock_get_soledad_doc(cdoc_id, wrapper.cdocs[1])
+ return mdoc_id, fdoc_id
+
+ def _add_create_mail_mocks_to_soledad_from_fixture_file(self, mail_file):
+ mail = self._load_mail_from_file(mail_file)
+ return self._add_create_mail_mocks_to_soledad(mail)
+
+ def _add_create_mail_mocks_to_soledad(self, example_mail):
+ mail = self._convert_mail_to_leap_message(example_mail)
+ wrapper = mail.get_wrapper()
+
+ mdoc_id = wrapper.mdoc.future_doc_id
+ fdoc_id = wrapper.mdoc.fdoc
+ hdoc_id = wrapper.mdoc.hdoc
+
+ self._mock_create_soledad_doc(mdoc_id, wrapper.mdoc)
+ self._mock_create_soledad_doc(fdoc_id, wrapper.fdoc)
+ self._mock_create_soledad_doc(hdoc_id, wrapper.hdoc)
+
+ for _, cdoc in wrapper.cdocs.items():
+ self._mock_create_soledad_doc(cdoc.future_doc_id, cdoc)
+ self._mock_get_soledad_doc(cdoc.future_doc_id, cdoc)
+
+ return mail
+
+ def _convert_mail_to_leap_message(self, mail, mbox_uuid=None):
+ msg = SoledadMailAdaptor().get_msg_from_string(Message, mail.as_string())
+ if mbox_uuid is None:
+ msg.get_wrapper().set_mbox_uuid(self.mbox_uuid)
+ else:
+ msg.get_wrapper().set_mbox_uuid(mbox_uuid)
+
+ return msg
+
+ def _mock_get_soledad_doc(self, doc_id, doc):
+ soledad_doc = SoledadDocument(doc_id, json=json.dumps(doc.serialize()))
+
+ # when(self.soledad).get_doc(doc_id).thenReturn(defer.succeed(soledad_doc))
+ when(self.soledad).get_doc(doc_id).thenAnswer(lambda: defer.succeed(soledad_doc))
+
+ self.doc_by_id[doc_id] = soledad_doc
+
+ def _mock_create_soledad_doc(self, doc_id, doc):
+ soledad_doc = SoledadDocument(doc_id, json=json.dumps(doc.serialize()))
+ if doc.future_doc_id:
+ when(self.soledad).create_doc(doc.serialize(), doc_id=doc_id).thenReturn(defer.succeed(soledad_doc))
+ else:
+ when(self.soledad).create_doc(doc.serialize()).thenReturn(defer.succeed(soledad_doc))
+ self.doc_by_id[doc_id] = soledad_doc
+
+ def _load_mail_from_file(self, mail_file):
+ mailset_dir = pkg_resources.resource_filename('test.unit.fixtures', 'mailset')
+ mail_file = os.path.join(mailset_dir, 'new', mail_file)
+ with open(mail_file) as f:
+ mail = Parser().parse(f)
+ return mail
+
+
+def _format_mdoc_id(mbox_uuid, chash):
+ return constants.METAMSGID.format(mbox_uuid=mbox_uuid, chash=chash)
diff --git a/service/test/unit/adapter/mailstore/test_searchable_mailstore.py b/service/test/unit/adapter/mailstore/test_searchable_mailstore.py
new file mode 100644
index 00000000..8c571201
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/test_searchable_mailstore.py
@@ -0,0 +1,112 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from email.parser import Parser
+import os
+from mockito import verify, mock, when
+import pkg_resources
+from twisted.internet import defer
+from twisted.trial.unittest import TestCase
+from pixelated.adapter.mailstore import MailStore
+from pixelated.adapter.mailstore.leap_mailstore import LeapMail
+from pixelated.adapter.mailstore.searchable_mailstore import SearchableMailStore
+from pixelated.adapter.search import SearchEngine
+
+
+ANY_MAILBOX = 'INBOX'
+
+
+class TestSearchableMailStore(TestCase):
+
+ def setUp(self):
+ super(TestSearchableMailStore, self).setUp()
+ self.search_index = mock(mocked_obj=SearchEngine)
+ self.delegate_mail_store = mock(mocked_obj=MailStore)
+ self.store = SearchableMailStore(self.delegate_mail_store, self.search_index)
+
+ @defer.inlineCallbacks
+ def test_add_mail_delegates_to_mail_store_and_updates_index(self):
+ mail = self._load_mail_from_file('mbox00000000')
+ leap_mail = LeapMail('id', ANY_MAILBOX)
+ when(self.delegate_mail_store).add_mail(ANY_MAILBOX, mail).thenReturn(defer.succeed(leap_mail))
+
+ result = yield self.store.add_mail(ANY_MAILBOX, mail)
+
+ verify(self.delegate_mail_store).add_mail(ANY_MAILBOX, mail)
+ verify(self.search_index).index_mail(leap_mail)
+ self.assertEqual(leap_mail, result)
+
+ @defer.inlineCallbacks
+ def test_delete_mail_delegates_to_mail_store_and_updates_index(self):
+ when(self.delegate_mail_store).delete_mail('mail id').thenReturn(defer.succeed(None))
+ when(self.search_index).remove_from_index('mail id').thenReturn(defer.succeed(None))
+
+ yield self.store.delete_mail('mail id')
+
+ verify(self.delegate_mail_store).delete_mail('mail id')
+ verify(self.search_index).remove_from_index('mail id')
+
+ @defer.inlineCallbacks
+ def test_update_mail_delegates_to_mail_store_and_updates_index(self):
+ leap_mail = LeapMail('id', ANY_MAILBOX)
+
+ yield self.store.update_mail(leap_mail)
+
+ verify(self.delegate_mail_store).update_mail(leap_mail)
+ verify(self.search_index).index_mail(leap_mail)
+
+ @defer.inlineCallbacks
+ def test_copy_mail_delegates_to_mail_store_and_updates_index(self):
+ copied_mail = LeapMail('new id', ANY_MAILBOX)
+ when(self.delegate_mail_store).copy_mail_to_mailbox('mail id', ANY_MAILBOX).thenReturn(defer.succeed(copied_mail))
+
+ result = yield self.store.copy_mail_to_mailbox('mail id', ANY_MAILBOX)
+
+ verify(self.search_index).index_mail(copied_mail)
+ self.assertEqual(copied_mail, result)
+
+ @defer.inlineCallbacks
+ def test_move_mail_delegates_to_mail_store_and_updates_index(self):
+ moved_mail = LeapMail('new id', ANY_MAILBOX)
+ when(self.delegate_mail_store).move_mail_to_mailbox('mail id', ANY_MAILBOX).thenReturn(defer.succeed(moved_mail))
+
+ result = yield self.store.move_mail_to_mailbox('mail id', ANY_MAILBOX)
+
+ verify(self.search_index).remove_from_index('mail id')
+ verify(self.search_index).index_mail(moved_mail)
+ self.assertEqual(moved_mail, result)
+
+ @defer.inlineCallbacks
+ def test_other_methods_are_delegated(self):
+ mail = LeapMail('mail id', ANY_MAILBOX)
+ when(self.delegate_mail_store).get_mail('mail id').thenReturn(defer.succeed(mail), defer.succeed(mail))
+ result = yield self.store.get_mail('mail id')
+
+ self.assertEqual(mail, result)
+
+ @defer.inlineCallbacks
+ def test_delete_mailbox_is_not_implemented(self):
+ try:
+ yield self.store.delete_mailbox(ANY_MAILBOX)
+ self.fail("Should raise NotImplementedError")
+ except NotImplementedError:
+ pass
+
+ def _load_mail_from_file(self, mail_file):
+ mailset_dir = pkg_resources.resource_filename('test.unit.fixtures', 'mailset')
+ mail_file = os.path.join(mailset_dir, 'new', mail_file)
+ with open(mail_file) as f:
+ mail = Parser().parse(f)
+ return mail
diff --git a/service/test/unit/adapter/search/test_index_storage_key.py b/service/test/unit/adapter/search/test_index_storage_key.py
new file mode 100644
index 00000000..e60c69ef
--- /dev/null
+++ b/service/test/unit/adapter/search/test_index_storage_key.py
@@ -0,0 +1,52 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from leap.soledad.common.document import SoledadDocument
+from mockito import mock, when, unstub, verify
+from twisted.internet import defer
+from twisted.trial import unittest
+from pixelated.adapter.search.index_storage_key import SearchIndexStorageKey
+import os
+
+
+class TestSearchIndexStorageKey(unittest.TestCase):
+
+ def tearDown(self):
+ unstub()
+
+ @defer.inlineCallbacks
+ def test_get_or_create_key_returns_key(self):
+ soledad = mock()
+
+ when(soledad).get_from_index('by-type', 'index_key').thenReturn([SoledadDocument(json='{"value": "somekey"}')])
+
+ key = yield SearchIndexStorageKey(soledad).get_or_create_key()
+
+ self.assertEqual('somekey', key)
+
+ @defer.inlineCallbacks
+ def test_get_or_create_creates_key_if_not_exists(self):
+ expected_key = '\x8brN\xa3\xe5-\x828 \x95\x8d\n\xc6\x0c\x82\n\xd7!\xa9\xb0.\xcc\\h\xa9\x98\xe9V\xc1*<\xfe\xbb\x8f\xcd\x7f\x8c#\xff\xf9\x840\xdf{}\x97\xebS-*\xe2f\xf9B\xa9\xb1\x0c\x1d-C)\xc5\xa0B'
+ base64_encoded_key = 'i3JOo+UtgjgglY0KxgyCCtchqbAuzFxoqZjpVsEqPP67j81/jCP/+YQw33t9l+tTLSriZvlCqbEM\nHS1DKcWgQg==\n'
+ soledad = mock()
+
+ when(soledad).get_from_index('by-type', 'index_key').thenReturn([])
+ when(os).urandom(64).thenReturn(expected_key)
+
+ key = yield SearchIndexStorageKey(soledad).get_or_create_key()
+
+ self.assertEqual(expected_key, key)
+
+ verify(soledad).create_doc(dict(type='index_key', value=base64_encoded_key))
diff --git a/service/test/unit/adapter/search/test_search.py b/service/test/unit/adapter/search/test_search.py
index 1d9076a2..76e704b6 100644
--- a/service/test/unit/adapter/search/test_search.py
+++ b/service/test/unit/adapter/search/test_search.py
@@ -16,6 +16,7 @@
import unittest
+from pixelated.adapter.mailstore.leap_mailstore import LeapMail
from pixelated.adapter.search import SearchEngine
from tempdir import TempDir
from test.support import test_helper
@@ -56,7 +57,7 @@ class SearchEngineTest(unittest.TestCase):
}
# when
- se.index_mail(test_helper.pixelated_mail(extra_headers=headers, chash='mailid'))
+ se.index_mail(LeapMail('mailid', 'INBOX', headers=headers)) # test_helper.pixelated_mail(extra_headers=headers, chash='mailid'))
result = se.search('folker')
diff --git a/service/test/unit/adapter/test_draft_service.py b/service/test/unit/adapter/test_draft_service.py
index 79eca5f6..c2b7cd93 100644
--- a/service/test/unit/adapter/test_draft_service.py
+++ b/service/test/unit/adapter/test_draft_service.py
@@ -1,4 +1,6 @@
import unittest
+from twisted.internet import defer
+from pixelated.adapter.mailstore.leap_mailstore import LeapMail
from pixelated.adapter.model.mail import InputMail
from pixelated.adapter.services.draft_service import DraftService
@@ -10,21 +12,20 @@ class DraftServiceTest(unittest.TestCase):
def setUp(self):
self.mailboxes = mock()
- self.drafts_mailbox = mock()
- self.draft_service = DraftService(self.mailboxes)
- self.mailboxes.drafts = self.drafts_mailbox
+ self.mail_store = mock()
+ self.draft_service = DraftService(self.mail_store)
def test_add_draft(self):
mail = InputMail()
self.draft_service.create_draft(mail)
- verify(self.drafts_mailbox).add(mail)
+ verify(self.mail_store).add_mail('DRAFTS', mail.raw)
def test_update_draft(self):
mail = InputMail.from_dict(test_helper.mail_dict())
- when(self.drafts_mailbox).add(mail).thenReturn(mail)
+ when(self.mail_store).add_mail('DRAFTS', mail.raw).thenReturn(defer.succeed(LeapMail('id', 'DRAFTS')))
self.draft_service.update_draft(mail.ident, mail)
- inorder.verify(self.drafts_mailbox).add(mail)
- inorder.verify(self.drafts_mailbox).remove(mail.ident)
+ inorder.verify(self.mail_store).add_mail('DRAFTS', mail.raw)
+ inorder.verify(self.mail_store).delete_mail(mail.ident)
diff --git a/service/test/unit/adapter/test_mail.py b/service/test/unit/adapter/test_mail.py
index 1a9280ff..dc344992 100644
--- a/service/test/unit/adapter/test_mail.py
+++ b/service/test/unit/adapter/test_mail.py
@@ -14,378 +14,20 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-import unittest
+from twisted.trial import unittest
import pixelated.support.date
-from pixelated.adapter.model.mail import PixelatedMail, InputMail
+from pixelated.adapter.model.mail import InputMail, HEADERS_KEY
from mockito import mock, unstub, when
from test.support import test_helper
import dateutil.parser as dateparser
import base64
-from leap.mail.imap.fields import fields
+from leap.mail.adaptors import soledad_indexes as fields
from datetime import datetime
import os
import json
-
-
-class TestPixelatedMail(unittest.TestCase):
- def setUp(self):
- self.querier = mock()
-
- def tearDown(self):
- unstub()
-
- def test_parse_date_from_soledad_uses_date_header_if_available(self):
- leap_mail_date = 'Wed, 3 Sep 2014 12:36:17 -0300'
- leap_mail_date_in_iso_format = "2014-09-03T12:36:17-03:00"
-
- leap_mail = test_helper.leap_mail(headers={'date': leap_mail_date})
-
- mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier)
-
- self.assertEqual(str(mail.headers['Date']), leap_mail_date_in_iso_format)
-
- def test_parse_date_from_soledad_fallback_to_received_header_if_date_header_isnt_available(self):
- leap_mail_date = "Wed, 03 Sep 2014 13:11:15 -0300"
- leap_mail_date_in_iso_format = "2014-09-03T13:11:15-03:00"
- leap_mail_received_header = "by bitmask.local from 127.0.0.1 with ESMTP ;\n " + leap_mail_date
-
- leap_mail = test_helper.leap_mail(headers={'received': leap_mail_received_header})
-
- mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier)
-
- self.assertEqual(str(mail.headers['Date']), leap_mail_date_in_iso_format)
-
- def test_parse_date_from_soledad_fallback_to_now_if_neither_date_nor_received_header(self):
- leap_mail_date_in_iso_format = "2014-09-03T13:11:15-03:00"
-
- when(pixelated.support.date).iso_now().thenReturn(leap_mail_date_in_iso_format)
- fdoc, hdoc, bdoc = test_helper.leap_mail()
- del hdoc.content['date']
-
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier)
-
- self.assertEqual(str(mail.headers['Date']), leap_mail_date_in_iso_format)
-
- def test_use_datetime_now_as_fallback_for_invalid_date(self):
- leap_mail_date = u'söme däte'
- date_expected = "2014-09-03T13:11:15-03:00"
-
- when(pixelated.support.date).iso_now().thenReturn(date_expected)
- leap_mail = test_helper.leap_mail(headers={'date': leap_mail_date})
-
- mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier)
-
- self.assertEqual(str(mail.headers['Date']), date_expected)
-
- def test_fall_back_to_ascii_if_invalid_received_header(self):
- leap_mail_received_header = u"söme invalid received heäder\n"
- date_expected = "2014-09-03T13:11:15-03:00"
-
- when(pixelated.support.date).iso_now().thenReturn(date_expected)
- leap_mail = test_helper.leap_mail(headers={'received': leap_mail_received_header})
-
- mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier)
-
- self.assertEqual(mail.headers['Date'], date_expected)
-
- def test_update_tags_return_a_set_with_the_current_tags(self):
- soledad_docs = test_helper.leap_mail(extra_headers={'X-tags': '["custom_1", "custom_2"]'})
- pixelated_mail = PixelatedMail.from_soledad(*soledad_docs, soledad_querier=self.querier)
-
- current_tags = pixelated_mail.update_tags({'custom_1', 'custom_3'})
- self.assertEquals({'custom_3', 'custom_1'}, current_tags)
-
- def test_mark_as_read(self):
- mail = PixelatedMail.from_soledad(*test_helper.leap_mail(flags=[]), soledad_querier=self.querier)
-
- mail.mark_as_read()
-
- self.assertEquals(mail.fdoc.content['flags'], ['\\Seen'])
-
- def test_mark_as_not_recent(self):
- mail = PixelatedMail.from_soledad(*test_helper.leap_mail(flags=['\\Recent']), soledad_querier=self.querier)
-
- mail.mark_as_not_recent()
-
- self.assertEquals(mail.fdoc.content['flags'], [])
-
- def test_get_for_save_adds_from(self):
- InputMail.FROM_EMAIL_ADDRESS = 'me@pixelated.org'
- headers = {'Subject': 'The subject',
- 'Date': str(datetime.now()),
- 'To': 'me@pixelated.org'}
-
- input_mail = InputMail()
- input_mail.headers = headers
-
- self.assertEqual('me@pixelated.org', input_mail.get_for_save(1, 'SENT')[1][fields.HEADERS_KEY]['From'])
-
- def test_as_dict(self):
- headers = {'Subject': 'The subject',
- 'From': 'someone@pixelated.org',
- 'To': 'me@pixelated.org'}
- fdoc, hdoc, bdoc = test_helper.leap_mail(flags=['\\Recent'],
- extra_headers=headers)
-
- InputMail.FROM_EMAIL_ADDRESS = 'me@pixelated.org'
-
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier)
-
- _dict = mail.as_dict()
-
- self.maxDiff = None
-
- self.assertEquals(_dict, {'htmlBody': None,
- 'textPlainBody': 'body',
- 'header': {
- 'date': dateparser.parse(hdoc.content['date']).isoformat(),
- 'from': 'someone@pixelated.org',
- 'subject': 'The subject',
- 'to': ['me@pixelated.org'],
- 'cc': [],
- 'bcc': []
- },
- 'ident': 'chash',
- 'mailbox': 'inbox',
- 'security_casing': {'imprints': [{'state': 'no_signature_information'}], 'locks': []},
- 'status': ['recent'],
- 'tags': [],
- 'attachments': [],
- 'replying': {
- 'single': 'someone@pixelated.org',
- 'all': {
- 'to-field': ['someone@pixelated.org'],
- 'cc-field': []
- }
- }})
-
- def test_use_reply_to_address_for_replying(self):
- headers = {'Subject': 'The subject',
- 'From': 'someone@pixelated.org',
- 'Reply-To': 'reply-to-this-address@pixelated.org',
- 'To': 'me@pixelated.org, \nalice@pixelated.org'}
- fdoc, hdoc, bdoc = test_helper.leap_mail(flags=['\\Recent'],
- extra_headers=headers)
-
- InputMail.FROM_EMAIL_ADDRESS = 'me@pixelated.org'
-
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier)
-
- _dict = mail.as_dict()
-
- self.assertEquals(_dict['replying'], {'single': 'reply-to-this-address@pixelated.org',
- 'all': {
- 'to-field': ['alice@pixelated.org', 'reply-to-this-address@pixelated.org'],
- 'cc-field': []
- }})
-
- def test_alternatives_body(self):
- parts = {'alternatives': [], 'attachments': []}
- parts['alternatives'].append({'content': 'blablabla', 'headers': {'Content-Type': 'text/plain'}})
- parts['alternatives'].append({'content': '<p>blablabla</p>', 'headers': {'Content-Type': 'text/html'}})
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='blablabla'), parts=parts, soledad_querier=None)
-
- self.assertRegexpMatches(mail.html_body, '^<p>blablabla</p>$')
- self.assertRegexpMatches(mail.text_plain_body, '^blablabla$')
-
- def test_html_is_none_if_multiple_alternatives_have_no_html_part(self):
- parts = {
- 'attachments': [],
- 'alternatives': [
- {'content': u'content', 'headers': {u'Content-Type': u'text/plain; charset=us-ascii'}},
- {'content': u'', 'headers': {u'Some info': u'info'}}]}
-
- mail = PixelatedMail.from_soledad(None, None, None, parts=parts, soledad_querier=None)
- self.assertIsNone(mail.html_body)
-
- def test_percent_character_is_allowed_on_body(self):
- parts = {'alternatives': [], 'attachments': []}
- parts['alternatives'].append({'content': '100% happy with percentage symbol', 'headers': {'Content-Type': 'text/plain'}})
- parts['alternatives'].append({'content': '<p>100% happy with percentage symbol</p>', 'headers': {'Content-Type': 'text/html'}})
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw="100% happy with percentage symbol"), parts=parts, soledad_querier=None)
-
- self.assertRegexpMatches(mail.text_plain_body, '([\s\S]*100%)')
- self.assertRegexpMatches(mail.html_body, '([\s\S]*100%)')
-
- def test_content_type_header_of_mail_part_is_used(self):
- plain_headers = {'Content-Type': 'text/plain; charset=iso-8859-1', 'Content-Transfer-Encoding': 'quoted-printable'}
- html_headers = {'Content-Type': 'text/html; charset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'}
- parts = {'alternatives': [{'content': 'H=E4llo', 'headers': plain_headers}, {'content': '<p>H=C3=A4llo</p>', 'headers': html_headers}]}
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='some raw body'), parts=parts, soledad_querier=None)
-
- self.assertEqual(2, len(mail.alternatives))
- self.assertEquals(u'H\xe4llo', mail.text_plain_body)
- self.assertEquals(u'<p>H\xe4llo</p>', mail.html_body)
-
- def test_multi_line_content_type_header_is_supported(self):
- plain_headers = {'Content-Type': 'text/plain;\ncharset=iso-8859-1', 'Content-Transfer-Encoding': 'quoted-printable'}
- html_headers = {'Content-Type': 'text/html;\ncharset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'}
- parts = {'alternatives': [{'content': 'H=E4llo', 'headers': plain_headers}, {'content': '<p>H=C3=A4llo</p>', 'headers': html_headers}]}
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='some raw body'), parts=parts, soledad_querier=None)
-
- self.assertEqual(2, len(mail.alternatives))
- self.assertEquals(u'H\xe4llo', mail.text_plain_body)
- self.assertEquals(u'<p>H\xe4llo</p>', mail.html_body)
-
- def test_broken_content_type_defaults_to_usascii(self):
- plain_headers = {'Content-Type': 'I lie to you', 'Content-Transfer-Encoding': 'quoted-printable'}
- html_headers = {'Content-Type': 'text/html;\ncharset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'}
- parts = {'alternatives': [{'content': 'H=E4llo', 'headers': plain_headers}, {'content': '<p>H=C3=A4llo</p>', 'headers': html_headers}]}
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='some raw body'), parts=parts, soledad_querier=None)
-
- self.assertEquals(u'H=E4llo', mail.text_plain_body)
-
- def test_broken_encoding_defaults_to_8bit(self):
- plain_headers = {'Content-Type': 'text/plain;\ncharset=iso-8859-1', 'Content-Transfer-Encoding': 'I lie to you!'}
- html_headers = {'Content-Type': 'text/html;\ncharset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'}
- parts = {'alternatives': [{'content': 'H=E4llo', 'headers': plain_headers}, {'content': '<p>H=C3=A4llo</p>', 'headers': html_headers}]}
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='some raw body'), parts=parts, soledad_querier=None)
-
- self.assertEquals(u'H=E4llo', mail.text_plain_body)
- self.assertEquals(u'<p>H\xe4llo</p>', mail.html_body)
-
- def test_clean_line_breaks_on_address_headers(self):
- many_recipients = 'One <one@mail.com>,\nTwo <two@mail.com>, Normal <normal@mail.com>,\nalone@mail.com'
- headers = {'Cc': many_recipients,
- 'Bcc': many_recipients,
- 'To': many_recipients}
- fdoc, hdoc, bdoc = test_helper.leap_mail(flags=['\\Recent'],
- extra_headers=headers)
-
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier)
-
- for header_label in ['To', 'Cc', 'Bcc']:
- for address in mail.headers[header_label]:
- self.assertNotIn('\n', address)
- self.assertNotIn(',', address)
- self.assertEquals(4, len(mail.headers[header_label]))
-
- def test_that_body_understands_base64(self):
- body = u'bl\xe1'
- encoded_body = unicode(body.encode('utf-8').encode('base64'))
-
- fdoc, hdoc, bdoc = test_helper.leap_mail()
- parts = {'alternatives': []}
- parts['alternatives'].append({'content': encoded_body, 'headers': {'Content-Transfer-Encoding': 'base64'}})
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier, parts=parts)
-
- self.assertEquals(body, mail.text_plain_body)
-
- def test_that_body_understands_7bit(self):
- body = u'testtext'
- encoded_body = body
-
- fdoc, hdoc, bdoc = test_helper.leap_mail()
- parts = {'alternatives': []}
- parts['alternatives'].append({'content': encoded_body, 'headers': {'Content-Transfer-Encoding': '7bit'}})
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier, parts=parts)
-
- self.assertEquals(body, mail.text_plain_body)
-
- def test_that_body_understands_8bit(self):
- body = u'testtext'
- encoded_body = body
-
- fdoc, hdoc, bdoc = test_helper.leap_mail()
- parts = {'alternatives': []}
- parts['alternatives'].append({'content': encoded_body, 'headers': {'Content-Transfer-Encoding': '8bit'}})
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier, parts=parts)
-
- self.assertEquals(body, mail.text_plain_body)
-
- def test_bounced_mails_are_recognized(self):
- bounced_mail_hdoc = os.path.join(os.path.dirname(__file__), '..', 'fixtures', 'bounced_mail_hdoc.json')
- with open(bounced_mail_hdoc) as f:
- hdoc = json.loads(f.read())
-
- bounced_leap_mail = test_helper.leap_mail()
- bounced_leap_mail[1].content = hdoc
- bounced_mail = PixelatedMail.from_soledad(*bounced_leap_mail, soledad_querier=self.querier)
-
- not_bounced_leap_mail = test_helper.leap_mail()
- not_bounced_mail = PixelatedMail.from_soledad(*not_bounced_leap_mail, soledad_querier=self.querier)
-
- self.assertTrue(bounced_mail.bounced)
- self.assertIn('this_mail_was_bounced@domain.com', bounced_mail.bounced)
- self.assertIn("MAILER-DAEMON@domain.org (Mail Delivery System)", bounced_mail.bounced)
- self.assertFalse(not_bounced_mail.bounced)
-
- def test_ignore_transient_failures(self):
- """
- Persistent errors should start with 5.
- See: http://www.iana.org/assignments/smtp-enhanced-status-codes/smtp-enhanced-status-codes.xhtml
- """
- bounced_mail_hdoc = os.path.join(os.path.dirname(__file__), '..', 'fixtures', 'bounced_mail_hdoc.json')
- with open(bounced_mail_hdoc) as f:
- content = f.read()
- # Change status to 4.XXX.YYY (only the first number is relevant here)
- content = content.replace("5.1.1", "4.X.Y")
- hdoc = json.loads(content)
-
- temporary_bounced_leap_mail = test_helper.leap_mail()
- temporary_bounced_leap_mail[1].content = hdoc
- temporary_bounced_mail = PixelatedMail.from_soledad(*temporary_bounced_leap_mail, soledad_querier=self.querier)
-
- not_bounced_leap_mail = test_helper.leap_mail()
- not_bounced_mail = PixelatedMail.from_soledad(*not_bounced_leap_mail, soledad_querier=self.querier)
-
- self.assertFalse(temporary_bounced_mail.bounced)
- self.assertFalse(not_bounced_mail.bounced)
-
- def _create_bdoc(self, raw):
- class FakeBDoc:
- def __init__(self, raw):
- self.content = {'raw': raw}
- return FakeBDoc(raw)
-
- def test_encoding_special_character_on_header(self):
- subject = "=?UTF-8?Q?test_encoding_St=C3=A4ch?="
- email_from = "=?UTF-8?Q?St=C3=A4ch_<stach@pixelated-project.org>?="
- email_to = "=?utf-8?b?IsOEw7zDtiDDlsO8w6QiIDxmb2xrZXJAcGl4ZWxhdGVkLXByb2plY3Qub3Jn?=\n =?utf-8?b?PiwgRsO2bGtlciA8Zm9sa2VyQHBpeGVsYXRlZC1wcm9qZWN0Lm9yZz4=?="
-
- pixel_mail = PixelatedMail()
-
- self.assertEqual(pixel_mail._decode_header(subject), 'test encoding St\xc3\xa4ch')
- self.assertEqual(pixel_mail._decode_header(email_from), 'St\xc3\xa4ch <stach@pixelated-project.org>')
- self.assertEqual(pixel_mail._decode_header(email_to), '"\xc3\x84\xc3\xbc\xc3\xb6 \xc3\x96\xc3\xbc\xc3\xa4" <folker@pixelated-project.org>, F\xc3\xb6lker <folker@pixelated-project.org>')
- self.assertEqual(pixel_mail._decode_header(None), None)
-
- def test_headers_are_encoded_right(self):
- subject = "=?UTF-8?Q?test_encoding_St=C3=A4ch?="
- email_from = "=?UTF-8?Q?St=C3=A4ch_<stach@pixelated-project.org>?="
- email_to = "=?utf-8?b?IsOEw7zDtiDDlsO8w6QiIDxmb2xrZXJAcGl4ZWxhdGVkLXByb2plY3Qub3Jn?=\n =?utf-8?b?PiwgRsO2bGtlciA8Zm9sa2VyQHBpeGVsYXRlZC1wcm9qZWN0Lm9yZz4=?="
- email_cc = "=?UTF-8?Q?St=C3=A4ch_<stach@pixelated-project.org>?="
- email_bcc = "=?UTF-8?Q?St=C3=A4ch_<stach@pixelated-project.org>?="
-
- leap_mail = test_helper.leap_mail(extra_headers={'Subject': subject, 'From': email_from, 'To': email_to, 'Cc': email_cc, 'Bcc': email_bcc})
-
- mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier)
-
- self.assertEqual(str(mail.headers['Subject']), 'test encoding St\xc3\xa4ch')
- self.assertEqual(str(mail.headers['From']), 'St\xc3\xa4ch <stach@pixelated-project.org>')
- self.assertEqual(mail.headers['To'], ['"\xc3\x84\xc3\xbc\xc3\xb6 \xc3\x96\xc3\xbc\xc3\xa4" <folker@pixelated-project.org>', 'F\xc3\xb6lker <folker@pixelated-project.org>'])
- self.assertEqual(mail.headers['Cc'], ['St\xc3\xa4ch <stach@pixelated-project.org>'])
- self.assertEqual(mail.headers['Bcc'], ['St\xc3\xa4ch <stach@pixelated-project.org>'])
-
- mail.as_dict()
-
- def test_parse_UTF8_headers_with_CharsetAscii(self):
- leap_mail_from = u'"söme ümläuds" <lisa5@dev.pixelated-project.org>'
- leap_mail_to = u'"söme ümläuds" <lisa5@dev.pixelated-project.org>,\n"söme ümläuds" <lisa5@dev.pixelated-project.org>'
-
- leap_mail = test_helper.leap_mail(extra_headers={'From': leap_mail_from, 'Subject': "some subject", 'To': leap_mail_to})
-
- mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier)
-
- mail.headers['From'].encode('ascii')
- self.assertEqual(mail.headers['To'], ['"sme mluds" <lisa5@dev.pixelated-project.org>', '"sme mluds" <lisa5@dev.pixelated-project.org>'])
+import pkg_resources
+from twisted.internet import defer
def simple_mail_dict():
@@ -420,7 +62,7 @@ def multipart_mail_dict():
class InputMailTest(unittest.TestCase):
def test_to_mime_multipart_should_add_blank_fields(self):
- pixelated.support.date.iso_now = lambda: 'date now'
+ pixelated.support.date.mail_date_now = lambda: 'date now'
mail_dict = simple_mail_dict()
mail_dict['header']['to'] = ''
@@ -435,8 +77,23 @@ class InputMailTest(unittest.TestCase):
self.assertNotRegexpMatches(mime_multipart.as_string(), "\nCc: \n")
self.assertNotRegexpMatches(mime_multipart.as_string(), "\nSubject: \n")
+ def test_single_recipient(self):
+ mail_single_recipient = {
+ 'body': '',
+ 'header': {
+ 'to': ['to@pixelated.org'],
+ 'cc': [''],
+ 'bcc': [''],
+ 'subject': 'Oi'
+ }
+ }
+
+ result = InputMail.from_dict(mail_single_recipient).raw
+
+ self.assertRegexpMatches(result, 'To: to@pixelated.org')
+
def test_to_mime_multipart(self):
- pixelated.support.date.iso_now = lambda: 'date now'
+ pixelated.support.date.mail_date_now = lambda: 'date now'
mime_multipart = InputMail.from_dict(simple_mail_dict()).to_mime_multipart()
@@ -447,6 +104,16 @@ class InputMailTest(unittest.TestCase):
self.assertRegexpMatches(mime_multipart.as_string(), "\nSubject: Oi\n")
self.assertRegexpMatches(mime_multipart.as_string(), base64.b64encode(simple_mail_dict()['body']))
+ def test_to_mime_multipart_with_special_chars(self):
+ mail_dict = simple_mail_dict()
+ mail_dict['header']['to'] = u'"Älbert Übrö \xF0\x9F\x92\xA9" <äüö@example.mail>'
+ pixelated.support.date.mail_date_now = lambda: 'date now'
+
+ mime_multipart = InputMail.from_dict(mail_dict).to_mime_multipart()
+
+ expected_part_of_encoded_to = 'Iiwgw4QsIGwsIGIsIGUsIHIsIHQsICAsIMOcLCBiLCByLCDDtiwgICwgw7As'
+ self.assertRegexpMatches(mime_multipart.as_string(), expected_part_of_encoded_to)
+
def test_smtp_format(self):
InputMail.FROM_EMAIL_ADDRESS = 'pixelated@org'
diff --git a/service/test/unit/adapter/test_mail_service.py b/service/test/unit/adapter/test_mail_service.py
index f5e29b0c..6faf5140 100644
--- a/service/test/unit/adapter/test_mail_service.py
+++ b/service/test/unit/adapter/test_mail_service.py
@@ -14,33 +14,37 @@
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
from twisted.trial import unittest
-from pixelated.adapter.model.mail import InputMail, PixelatedMail
+from pixelated.adapter.mailstore.leap_mailstore import LeapMail
+from pixelated.adapter.model.mail import InputMail
+from pixelated.adapter.model.status import Status
from pixelated.adapter.services.mail_service import MailService
from test.support.test_helper import mail_dict, leap_mail
-from mockito import mock, unstub, when, verify, verifyNoMoreInteractions, any
-from twisted.internet.defer import Deferred
+from mockito import mock, unstub, when, verify, verifyNoMoreInteractions, any as ANY
+from twisted.internet import defer
class TestMailService(unittest.TestCase):
def setUp(self):
self.drafts = mock()
- self.querier = mock()
+ self.mail_store = mock()
self.mailboxes = mock()
- self.mailboxes.drafts = self.drafts
+
+ self.mailboxes.drafts = defer.succeed(self.drafts)
+
self.mailboxes.trash = mock()
self.mailboxes.sent = mock()
self.mail_sender = mock()
self.search_engine = mock()
- self.mail_service = MailService(self.mailboxes, self.mail_sender, self.querier, self.search_engine)
+ self.mail_service = MailService(self.mail_sender, self.mail_store, self.search_engine)
def tearDown(self):
unstub()
def test_send_mail(self):
- when(InputMail).from_dict(any()).thenReturn('inputmail')
- when(self.mail_sender).sendmail(any()).thenReturn(Deferred())
+ when(InputMail).from_dict(ANY()).thenReturn('inputmail')
+ when(self.mail_sender).sendmail(ANY()).thenReturn(defer.Deferred())
sent_deferred = self.mail_service.send_mail(mail_dict())
@@ -50,64 +54,110 @@ class TestMailService(unittest.TestCase):
return sent_deferred
+ @defer.inlineCallbacks
def test_send_mail_removes_draft(self):
- mail_ident = 'Some ident'
- mail = mail_dict()
- mail['ident'] = mail_ident
- when(InputMail).from_dict(any()).thenReturn('inputmail')
- deferred = Deferred()
- when(self.mail_sender).sendmail(any()).thenReturn(deferred)
+ mail = LeapMail('id', 'INBOX')
+ when(mail).raw = 'raw mail'
+ when(InputMail).from_dict(ANY()).thenReturn(mail)
+ when(self.mail_store).delete_mail('12').thenReturn(defer.succeed(None))
+ when(self.mail_store).add_mail('SENT', ANY()).thenReturn(mail)
- sent_deferred = self.mail_service.send_mail(mail)
+ deferred_success = defer.succeed(None)
+ when(self.mail_sender).sendmail(ANY()).thenReturn(deferred_success)
- verify(self.mail_sender).sendmail("inputmail")
+ yield self.mail_service.send_mail({'ident': '12'})
- def assert_removed_from_drafts(_):
- verify(self.drafts).remove(any())
+ verify(self.mail_sender).sendmail(mail)
+ verify(self.mail_store).add_mail('SENT', mail.raw)
+ verify(self.mail_store).delete_mail('12')
- sent_deferred.addCallback(assert_removed_from_drafts)
- sent_deferred.callback('Assume sending mail succeeded')
+ @defer.inlineCallbacks
+ def test_send_mail_marks_as_read(self):
+ mail = LeapMail('id', 'INBOX')
+ when(mail).raw = 'raw mail'
+ when(InputMail).from_dict(ANY()).thenReturn(mail)
+ when(self.mail_store).delete_mail('12').thenReturn(defer.succeed(None))
+ when(self.mail_sender).sendmail(ANY()).thenReturn(defer.succeed(None))
- return sent_deferred
+ sent_mail = LeapMail('id', 'INBOX')
+ add_mail_deferral = defer.succeed(sent_mail)
+ when(self.mail_store).add_mail('SENT', ANY()).thenReturn(add_mail_deferral)
- def test_send_mail_does_not_delete_draft_on_error(self):
- when(InputMail).from_dict(any()).thenReturn('inputmail')
- when(self.mail_sender).sendmail(any()).thenReturn(Deferred())
+ yield self.mail_service.send_mail({'ident': '12'})
- send_deferred = self.mail_service.send_mail(mail_dict())
+ self.assertIn(Status.SEEN, sent_mail.flags)
+ verify(self.mail_store).update_mail(sent_mail)
- verify(self.mail_sender).sendmail("inputmail")
+ @defer.inlineCallbacks
+ def test_send_mail_does_not_delete_draft_on_error(self):
+ when(InputMail).from_dict(ANY()).thenReturn('inputmail')
+
+ deferred_failure = defer.fail(Exception("Assume sending mail failed"))
+ when(self.mail_sender).sendmail(ANY()).thenReturn(deferred_failure)
- def assert_not_removed_from_drafts(_):
+ try:
+ yield self.mail_service.send_mail({'ident': '12'})
+ self.fail("send_mail is expected to raise if underlying call fails")
+ except:
+ verify(self.mail_sender).sendmail("inputmail")
verifyNoMoreInteractions(self.drafts)
- send_deferred.addErrback(assert_not_removed_from_drafts)
+ @defer.inlineCallbacks
+ def test_mark_as_read(self):
+ mail = LeapMail(1, 'INBOX')
+ when(self.mail_store).get_mail(1, include_body=True).thenReturn(mail)
+ yield self.mail_service.mark_as_read(1)
- send_deferred.errback(Exception('Assume sending mail failed'))
+ self.assertIn(Status.SEEN, mail.flags)
+ verify(self.mail_store).update_mail(mail)
- return send_deferred
+ @defer.inlineCallbacks
+ def test_mark_as_unread(self):
+ mail = LeapMail(1, 'INBOX')
+ mail.flags.add(Status.SEEN)
- def test_mark_as_read(self):
- mail = mock()
- when(self.mail_service).mail(any()).thenReturn(mail)
- self.mail_service.mark_as_read(1)
+ when(self.mail_store).get_mail(1, include_body=True).thenReturn(mail)
+ yield self.mail_service.mark_as_unread(1)
+
+ verify(self.mail_store).update_mail(mail)
- verify(mail).mark_as_read()
+ self.assertNotEqual(mail.status, Status.SEEN)
+ @defer.inlineCallbacks
def test_delete_mail(self):
- mail_to_delete = PixelatedMail.from_soledad(*leap_mail(), soledad_querier=None)
- when(self.mail_service).mail(1).thenReturn(mail_to_delete)
+ mail_to_delete = LeapMail(1, 'INBOX')
+ when(self.mail_store).get_mail(1, include_body=True).thenReturn(defer.succeed(mail_to_delete))
- self.mail_service.delete_mail(1)
+ yield self.mail_service.delete_mail(1)
- verify(self.mailboxes).move_to_trash(1)
+ verify(self.mail_store).move_mail_to_mailbox(1, 'TRASH')
+ @defer.inlineCallbacks
def test_recover_mail(self):
- mail_to_recover = PixelatedMail.from_soledad(*leap_mail(), soledad_querier=None)
+ mail_to_recover = LeapMail(1, 'TRASH')
when(self.mail_service).mail(1).thenReturn(mail_to_recover)
- when(self.mailboxes).move_to_inbox(1).thenReturn(mail_to_recover)
+ when(self.mail_store).move_mail_to_mailbox(1, 'INBOX').thenReturn(mail_to_recover)
+
+ yield self.mail_service.recover_mail(1)
+
+ verify(self.mail_store).move_mail_to_mailbox(1, 'INBOX')
+
+ @defer.inlineCallbacks
+ def test_get_attachment(self):
+ attachment_dict = {'content': bytearray('data'), 'content-type': 'text/plain'}
+ when(self.mail_store).get_mail_attachment('some attachment id').thenReturn(defer.succeed(attachment_dict))
+
+ attachment = yield self.mail_service.attachment('some attachment id')
+
+ self.assertEqual(attachment_dict, attachment)
+
+ @defer.inlineCallbacks
+ def test_update_tags_return_a_set_with_the_current_tags(self):
+ mail = LeapMail(1, 'INBOX', tags={'custom_1', 'custom_2'})
+ when(self.mail_store).get_mail(1, include_body=True).thenReturn(mail)
+ when(self.search_engine).tags(query='', skip_default_tags=True).thenReturn([])
- self.mail_service.recover_mail(1)
+ updated_mail = yield self.mail_service.update_tags(1, {'custom_1', 'custom_3'})
- verify(self.mailboxes).move_to_inbox(1)
- verify(self.search_engine).index_mail(mail_to_recover)
+ verify(self.mail_store).update_mail(mail)
+ self.assertEqual({'custom_1', 'custom_3'}, updated_mail.tags)
diff --git a/service/test/unit/adapter/test_mailbox.py b/service/test/unit/adapter/test_mailbox.py
deleted file mode 100644
index ed634648..00000000
--- a/service/test/unit/adapter/test_mailbox.py
+++ /dev/null
@@ -1,42 +0,0 @@
-#
-# Copyright (c) 2014 ThoughtWorks, Inc.
-#
-# Pixelated is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Pixelated is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-import unittest
-
-from pixelated.adapter.model.mail import PixelatedMail
-from pixelated.adapter.services.mailbox import Mailbox
-from mockito import mock, when, verify
-from test.support import test_helper
-
-
-class PixelatedMailboxTest(unittest.TestCase):
- def setUp(self):
- self.querier = mock()
- self.search_engine = mock()
- self.mailbox = Mailbox('INBOX', self.querier, self.search_engine)
-
- def test_remove_message_from_mailbox(self):
- mail = PixelatedMail.from_soledad(*test_helper.leap_mail(), soledad_querier=self.querier)
- when(self.querier).mail(1).thenReturn(mail)
-
- self.mailbox.remove(1)
-
- verify(self.querier).remove_mail(mail)
-
- def test_fresh_mailbox_checking_lastuid(self):
- when(self.querier).get_lastuid('INBOX').thenReturn(0)
- self.assertTrue(self.mailbox.fresh)
- when(self.querier).get_lastuid('INBOX').thenReturn(1)
- self.assertFalse(self.mailbox.fresh)
diff --git a/service/test/unit/adapter/test_mailbox_indexer_listener.py b/service/test/unit/adapter/test_mailbox_indexer_listener.py
index 71c9cd15..9ad3c94d 100644
--- a/service/test/unit/adapter/test_mailbox_indexer_listener.py
+++ b/service/test/unit/adapter/test_mailbox_indexer_listener.py
@@ -13,15 +13,18 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-import unittest
+from twisted.trial import unittest
-from mockito import mock, when, verify
+from mockito import mock, when, verify, any as ANY
from pixelated.adapter.listeners.mailbox_indexer_listener import MailboxIndexerListener
+from twisted.internet import defer
+
+from pixelated.adapter.listeners.mailbox_indexer_listener import logger
class MailboxListenerTest(unittest.TestCase):
def setUp(self):
- self.querier = mock()
+ self.mail_store = mock()
self.account = mock()
self.account.mailboxes = []
@@ -32,11 +35,11 @@ class MailboxListenerTest(unittest.TestCase):
mailbox.listeners = set()
when(mailbox).addListener = lambda x: mailbox.listeners.add(x)
- self.assertNotIn(MailboxIndexerListener('INBOX', self.querier), mailbox.listeners)
+ self.assertNotIn(MailboxIndexerListener('INBOX', self.mail_store), mailbox.listeners)
- MailboxIndexerListener.listen(self.account, 'INBOX', self.querier)
+ MailboxIndexerListener.listen(self.account, 'INBOX', self.mail_store)
- self.assertIn(MailboxIndexerListener('INBOX', self.querier), mailbox.listeners)
+ self.assertIn(MailboxIndexerListener('INBOX', self.mail_store), mailbox.listeners)
def test_reindex_missing_idents(self):
search_engine = mock()
@@ -44,11 +47,20 @@ class MailboxListenerTest(unittest.TestCase):
MailboxIndexerListener.SEARCH_ENGINE = search_engine
- listener = MailboxIndexerListener('INBOX', self.querier)
- when(self.querier).idents_by_mailbox('INBOX').thenReturn({'ident1', 'ident2', 'missing_ident'})
- self.querier.used_arguments = []
- self.querier.mails = lambda x: self.querier.used_arguments.append(x)
+ listener = MailboxIndexerListener('INBOX', self.mail_store)
+ when(self.mail_store).get_mailbox_mail_ids('INBOX').thenReturn({'ident1', 'ident2', 'missing_ident'})
+ self.mail_store.used_arguments = []
+ self.mail_store.get_mails = lambda x: self.mail_store.used_arguments.append(x)
listener.newMessages(10, 5)
- verify(self.querier, times=1).idents_by_mailbox('INBOX')
- self.assertIn({'missing_ident'}, self.querier.used_arguments)
+ verify(self.mail_store, times=1).get_mails('INBOX')
+ self.assertIn({'missing_ident'}, self.mail_store.used_arguments)
+
+ @defer.inlineCallbacks
+ def test_catches_exceptions_to_not_break_other_listeners(self):
+ when(logger).error(ANY()).thenReturn(None)
+ listener = MailboxIndexerListener('INBOX', self.mail_store)
+
+ yield listener.newMessages(1, 1)
+
+ verify(logger).error(ANY())
diff --git a/service/test/unit/adapter/test_mailboxes.py b/service/test/unit/adapter/test_mailboxes.py
deleted file mode 100644
index 6ff3849b..00000000
--- a/service/test/unit/adapter/test_mailboxes.py
+++ /dev/null
@@ -1,41 +0,0 @@
-#
-# Copyright (c) 2014 ThoughtWorks, Inc.
-#
-# Pixelated is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Pixelated is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-import unittest
-
-from pixelated.adapter.model.mail import PixelatedMail
-from pixelated.adapter.services.mailboxes import Mailboxes
-from mockito import mock, when, verify
-from test.support import test_helper
-from mock import MagicMock
-
-
-class PixelatedMailboxesTest(unittest.TestCase):
-
- def setUp(self):
- self.querier = mock()
- self.search_engine = mock()
- self.account = MagicMock()
- self.mailboxes = Mailboxes(self.account, self.querier, self.search_engine)
-
- def test_move_to_inbox(self):
- mail = PixelatedMail.from_soledad(*test_helper.leap_mail(), soledad_querier=self.querier)
- when(self.querier).mail(1).thenReturn(mail)
- when(mail).save().thenReturn(None)
-
- mail.set_mailbox('TRASH')
- recovered_mail = self.mailboxes.move_to_inbox(1)
- self.assertEquals('INBOX', recovered_mail.mailbox_name)
- verify(mail).save()
diff --git a/service/test/unit/adapter/test_soledad_querier.py b/service/test/unit/adapter/test_soledad_querier.py
deleted file mode 100644
index e5ea457d..00000000
--- a/service/test/unit/adapter/test_soledad_querier.py
+++ /dev/null
@@ -1,150 +0,0 @@
-#
-# Copyright (c) 2014 ThoughtWorks, Inc.
-#
-# Pixelated is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Pixelated is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-import unittest
-import json
-import base64
-import quopri
-
-from pixelated.adapter.soledad.soledad_querier import SoledadQuerier
-from mockito import mock, when, any
-import os
-
-
-class SoledadQuerierTest(unittest.TestCase):
-
- def test_extract_parts(self):
- soledad = mock()
- bdoc = mock()
- bdoc.content = {'raw': 'esse papo seu ta qualquer coisa'}
- when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc])
- multipart_attachment_file = os.path.join(os.path.dirname(__file__), '..', 'fixtures', 'multipart_attachment.json')
- with open(multipart_attachment_file) as f:
- hdoc = json.loads(f.read())
- querier = SoledadQuerier(soledad)
-
- parts = querier._extract_parts(hdoc)
-
- self.assertIn('alternatives', parts.keys())
- self.assertIn('attachments', parts.keys())
- self.assertEquals(2, len(parts['alternatives']))
- self.assertEquals(1, len(parts['attachments']))
-
- self.check_alternatives(parts)
- self.check_attachments(parts)
-
- def check_alternatives(self, parts):
- for alternative in parts['alternatives']:
- self.assertIn('headers', alternative)
- self.assertIn('content', alternative)
-
- def check_attachments(self, parts):
- for attachment in parts['attachments']:
- self.assertIn('headers', attachment)
- self.assertIn('ident', attachment)
- self.assertIn('name', attachment)
-
- def test_extract_part_without_headers(self):
- soledad = mock()
- bdoc = mock()
- bdoc.content = {'raw': 'esse papo seu ta qualquer coisa'}
- when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc])
- hdoc = {'multi': True, 'part_map': {'1': {'multi': False, 'phash': u'0400BEBACAFE'}}}
- querier = SoledadQuerier(soledad)
-
- parts = querier._extract_parts(hdoc)
-
- self.assertEquals(bdoc.content['raw'], parts['alternatives'][0]['content'])
-
- def test_extract_handles_missing_part_map(self):
- soledad = mock()
- hdoc = {u'multi': True,
- u'ctype': u'message/delivery-status',
- u'headers': [[u'Content-Description', u'Delivery report'], [u'Content-Type', u'message/delivery-status']],
- u'parts': 2,
- u'phash': None,
- u'size': 554}
- querier = SoledadQuerier(soledad)
-
- parts = querier._extract_parts(hdoc)
-
- self.assertEquals(0, len(parts['alternatives']))
- self.assertEquals(0, len(parts['attachments']))
-
- def test_attachment_base64(self):
- soledad = mock()
- bdoc = mock()
- bdoc.content = {'raw': base64.encodestring('esse papo seu ta qualquer coisa'), 'content-type': 'text/plain'}
- when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc])
- querier = SoledadQuerier(soledad)
-
- attachment = querier.attachment(u'0400BEBACAFE', 'base64')
-
- self.assertEquals('esse papo seu ta qualquer coisa', attachment['content'])
-
- def test_attachment_quoted_printable(self):
- soledad = mock()
- bdoc = mock()
- bdoc.content = {'raw': quopri.encodestring('esse papo seu ta qualquer coisa'), 'content-type': 'text/plain'}
- when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc])
- querier = SoledadQuerier(soledad)
-
- attachment = querier.attachment(u'0400BEBACAFE', 'quoted-printable')
-
- self.assertEquals('esse papo seu ta qualquer coisa', attachment['content'])
-
- def test_empty_or_null_queries_are_ignored(self):
- soledad = mock()
- when(soledad).get_from_index(any(), any(), any()).thenReturn(['nonempty', 'list'])
- querier = SoledadQuerier(soledad)
-
- test_parameters = ['', None]
-
- def call_with_bad_parameters(funct):
- for param in test_parameters:
- self.assertFalse(funct(param))
-
- call_with_bad_parameters(querier.get_all_flags_by_mbox)
- call_with_bad_parameters(querier.get_content_by_phash)
- call_with_bad_parameters(querier.get_flags_by_chash)
- call_with_bad_parameters(querier.get_header_by_chash)
- call_with_bad_parameters(querier.get_recent_by_mbox)
- call_with_bad_parameters(querier.idents_by_mailbox)
- call_with_bad_parameters(querier.get_mbox)
-
- def test_get_lastuid(self):
- soledad = mock()
- mbox = mock()
- mbox.content = {'lastuid': 0}
- when(soledad).get_from_index('by-type-and-mbox', 'mbox', 'INBOX').thenReturn([mbox])
- querier = SoledadQuerier(soledad)
-
- self.assertEquals(querier.get_lastuid(querier.get_mbox('INBOX')[0]), 0)
- mbox.content = {'lastuid': 1}
- self.assertEquals(querier.get_lastuid(querier.get_mbox('INBOX')[0]), 1)
-
- def test_create_mail_increments_uid(self):
- soledad = mock()
- mbox = mock()
- mail = mock()
- when(mail).get_for_save(next_uid=any(), mailbox='INBOX').thenReturn([])
- mbox.content = {'lastuid': 0}
- when(soledad).get_from_index('by-type-and-mbox', 'mbox', 'INBOX').thenReturn([mbox])
- querier = SoledadQuerier(soledad)
- when(querier).mail(any()).thenReturn([])
-
- self.assertEquals(querier.get_lastuid(querier.get_mbox('INBOX')[0]), 0)
- querier.create_mail(mail, 'INBOX')
- self.assertEquals(querier.get_lastuid(querier.get_mbox('INBOX')[0]), 1)