summaryrefslogtreecommitdiff
path: root/service/test/unit
diff options
context:
space:
mode:
Diffstat (limited to 'service/test/unit')
-rw-r--r--service/test/unit/adapter/mailstore/__init__.py15
-rw-r--r--service/test/unit/adapter/mailstore/maintenance/__init__.py15
-rw-r--r--service/test/unit/adapter/mailstore/maintenance/test_soledad_maintenance.py109
-rw-r--r--service/test/unit/adapter/mailstore/test_body_parser.py57
-rw-r--r--service/test/unit/adapter/mailstore/test_leap_mail.py230
-rw-r--r--service/test/unit/adapter/mailstore/test_leap_mailstore.py474
-rw-r--r--service/test/unit/adapter/mailstore/test_searchable_mailstore.py112
-rw-r--r--service/test/unit/adapter/search/test_index_storage_key.py52
-rw-r--r--service/test/unit/adapter/search/test_search.py3
-rw-r--r--service/test/unit/adapter/test_draft_service.py15
-rw-r--r--service/test/unit/adapter/test_mail.py397
-rw-r--r--service/test/unit/adapter/test_mail_service.py138
-rw-r--r--service/test/unit/adapter/test_mailbox.py42
-rw-r--r--service/test/unit/adapter/test_mailbox_indexer_listener.py36
-rw-r--r--service/test/unit/adapter/test_mailboxes.py41
-rw-r--r--service/test/unit/adapter/test_soledad_querier.py150
-rw-r--r--service/test/unit/bitmask_libraries/test_abstract_leap.py39
-rw-r--r--service/test/unit/bitmask_libraries/test_nicknym.py14
-rw-r--r--service/test/unit/bitmask_libraries/test_provider.py6
-rw-r--r--service/test/unit/bitmask_libraries/test_session.py38
-rw-r--r--service/test/unit/bitmask_libraries/test_smtp.py1
-rw-r--r--service/test/unit/bitmask_libraries/test_soledad.py52
-rw-r--r--service/test/unit/config/test_register.py6
-rw-r--r--service/test/unit/config/test_site.py28
-rw-r--r--service/test/unit/fixtures/__init__.py0
-rw-r--r--service/test/unit/fixtures/bounced_mail_hdoc.json218
-rw-r--r--service/test/unit/fixtures/mailset/new/mbox000000002
-rw-r--r--service/test/unit/fixtures/mailset/new/mbox000000012
-rw-r--r--service/test/unit/maintenance/test_commands.py36
-rw-r--r--service/test/unit/resources/test_feedback_resource.py27
-rw-r--r--service/test/unit/resources/test_keys_resources.py34
-rw-r--r--service/test/unit/support/test_encrypted_file_storage.py4
-rw-r--r--service/test/unit/test_application.py8
-rw-r--r--service/test/unit/test_welcome_mail.py73
34 files changed, 1480 insertions, 994 deletions
diff --git a/service/test/unit/adapter/mailstore/__init__.py b/service/test/unit/adapter/mailstore/__init__.py
new file mode 100644
index 00000000..c5c30cde
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/__init__.py
@@ -0,0 +1,15 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
diff --git a/service/test/unit/adapter/mailstore/maintenance/__init__.py b/service/test/unit/adapter/mailstore/maintenance/__init__.py
new file mode 100644
index 00000000..c5c30cde
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/maintenance/__init__.py
@@ -0,0 +1,15 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
diff --git a/service/test/unit/adapter/mailstore/maintenance/test_soledad_maintenance.py b/service/test/unit/adapter/mailstore/maintenance/test_soledad_maintenance.py
new file mode 100644
index 00000000..9a89d62b
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/maintenance/test_soledad_maintenance.py
@@ -0,0 +1,109 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from leap.soledad.common.document import SoledadDocument
+from twisted.internet import defer
+
+from twisted.trial import unittest
+from mockito import mock, when, verify, never
+from pixelated.adapter.mailstore.maintenance import SoledadMaintenance
+from leap.keymanager.openpgp import OpenPGPKey
+
+SOME_EMAIL_ADDRESS = 'foo@example.tld'
+SOME_KEY_ID = '4914254E384E264C'
+
+
+class TestSoledadMaintenance(unittest.TestCase):
+
+ def test_repair_is_deferred(self):
+ soledad = mock()
+ when(soledad).get_all_docs().thenReturn(defer.succeed((1, [])))
+
+ d = SoledadMaintenance(soledad).repair()
+
+ self.assertIsInstance(d, defer.Deferred)
+
+ @defer.inlineCallbacks
+ def test_repair_delete_public_key_active_docs(self):
+ soledad = mock()
+ key = self._public_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID)
+ active_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json(SOME_EMAIL_ADDRESS))
+ when(soledad).get_all_docs().thenReturn(defer.succeed((1, [active_doc])))
+
+ yield SoledadMaintenance(soledad).repair()
+
+ verify(soledad).delete_doc(active_doc)
+
+ @defer.inlineCallbacks
+ def test_repair_delete_public_key_docs(self):
+ soledad = mock()
+ key = self._public_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID)
+ active_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json(SOME_EMAIL_ADDRESS))
+ key_doc = SoledadDocument(doc_id='some_doc', json=key.get_json())
+ when(soledad).get_all_docs().thenReturn(defer.succeed((1, [key_doc, active_doc])))
+
+ yield SoledadMaintenance(soledad).repair()
+
+ verify(soledad).delete_doc(active_doc)
+ verify(soledad).delete_doc(key_doc)
+
+ @defer.inlineCallbacks
+ def test_repair_keeps_active_and_key_doc_if_private_key_exists(self):
+ soledad = mock()
+ key = self._public_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID)
+ private_key = self._private_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID)
+ active_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json(SOME_EMAIL_ADDRESS))
+ key_doc = SoledadDocument(doc_id='some_doc', json=key.get_json())
+ private_key_doc = SoledadDocument(doc_id='some_doc', json=private_key.get_json())
+ when(soledad).get_all_docs().thenReturn(defer.succeed((1, [key_doc, active_doc, private_key_doc])))
+
+ yield SoledadMaintenance(soledad).repair()
+
+ verify(soledad, never).delete_doc(key_doc)
+ verify(soledad, never).delete_doc(active_doc)
+ verify(soledad, never).delete_doc(private_key_doc)
+
+ @defer.inlineCallbacks
+ def test_repair_only_deletes_key_docs(self):
+ soledad = mock()
+ key = self._public_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID)
+ key_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json(SOME_EMAIL_ADDRESS))
+ other_doc = SoledadDocument(doc_id='something', json='{}')
+ when(soledad).get_all_docs().thenReturn(defer.succeed((1, [key_doc, other_doc])))
+
+ yield SoledadMaintenance(soledad).repair()
+
+ verify(soledad, never).delete_doc(other_doc)
+
+ @defer.inlineCallbacks
+ def test_repair_recreates_public_key_active_doc_if_necessary(self):
+ soledad = mock()
+
+ private_key = self._private_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID)
+ private_key_doc = SoledadDocument(doc_id='some_doc', json=private_key.get_json())
+ when(soledad).get_all_docs().thenReturn(defer.succeed((1, [private_key_doc])))
+
+ yield SoledadMaintenance(soledad).repair()
+
+ verify(soledad).create_doc_from_json('{"key_id": "4914254E384E264C", "tags": ["keymanager-active"], "type": "OpenPGPKey-active", "private": false, "address": "foo@example.tld"}')
+
+ def _public_key(self, address, keyid):
+ return self._gpgkey(address, keyid, private=False)
+
+ def _private_key(self, address, keyid):
+ return self._gpgkey(address, keyid, private=True)
+
+ def _gpgkey(self, address, keyid, private=False):
+ return OpenPGPKey(address, key_id=keyid, private=private)
diff --git a/service/test/unit/adapter/mailstore/test_body_parser.py b/service/test/unit/adapter/mailstore/test_body_parser.py
new file mode 100644
index 00000000..9d58637c
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/test_body_parser.py
@@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+import unittest
+from mock import patch
+from pixelated.adapter.mailstore.body_parser import BodyParser
+
+
+class BodyParserTest(unittest.TestCase):
+
+ def test_simple_text(self):
+ parser = BodyParser('simple text')
+
+ self.assertEqual('simple text', parser.parsed_content())
+
+ def test_base64_text(self):
+ parser = BodyParser('dGVzdCB0ZXh0\n', content_type='text/plain; charset="utf-8"', content_transfer_encoding='base64')
+
+ self.assertEqual('test text', parser.parsed_content())
+
+ def test_8bit_transfer_encoding_with_iso_8859_1_str_input(self):
+ data = 'Hmm, here are \xdcml\xe4\xfcts again!'
+ parser = BodyParser(data, content_type='text/plain; charset=iso-8859-1', content_transfer_encoding='8bit')
+
+ self.assertEqual(u'Hmm, here are Ümläüts again!', parser.parsed_content())
+
+ def test_8bit_transfer_encoding_with_iso_8859_1_unicode_input(self):
+ data = u'Hmm, here are \xdcml\xe4\xfcts again!'
+ parser = BodyParser(data, content_type='text/plain; charset=iso-8859-1', content_transfer_encoding='8bit')
+
+ self.assertEqual(u'Hmm, here are Ümläüts again!', parser.parsed_content())
+
+ def test_base64_with_default_us_ascii_encoding(self):
+ parser = BodyParser('dGVzdCB0ZXh0\n', content_type='text/plain', content_transfer_encoding='base64')
+
+ self.assertEqual('test text', parser.parsed_content())
+
+ @patch('pixelated.adapter.mailstore.body_parser.logger')
+ def test_body_parser_logs_problems_and_then_ignores_invalid_chars(self, logger_mock):
+ data = u'unkown char: \ufffd'
+ parser = BodyParser(data, content_type='text/plain; charset=iso-8859-1', content_transfer_encoding='8bit')
+
+ self.assertEqual(u'unkown char: ', parser.parsed_content())
+ logger_mock.warn.assert_called_with(u'Failed to encode content for charset iso-8859-1. Ignoring invalid chars: \'latin-1\' codec can\'t encode character u\'\\ufffd\' in position 13: ordinal not in range(256)')
diff --git a/service/test/unit/adapter/mailstore/test_leap_mail.py b/service/test/unit/adapter/mailstore/test_leap_mail.py
new file mode 100644
index 00000000..ef585654
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/test_leap_mail.py
@@ -0,0 +1,230 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from mock import patch
+from twisted.trial.unittest import TestCase
+
+from pixelated.adapter.mailstore.leap_mailstore import LeapMail, AttachmentInfo
+
+
+class TestLeapMail(TestCase):
+ def test_leap_mail(self):
+ mail = LeapMail('', 'INBOX', {'From': 'test@example.test', 'Subject': 'A test Mail', 'To': 'receiver@example.test'})
+
+ self.assertEqual('test@example.test', mail.from_sender)
+ self.assertEqual(['receiver@example.test'], mail.to)
+ self.assertEqual('A test Mail', mail.subject)
+
+ def test_email_addresses_in_to_are_split_into_a_list(self):
+ mail = LeapMail('', 'INBOX', {'To': 'first@example.test,second@example.test'})
+
+ self.assertEqual(['first@example.test', 'second@example.test'], mail.headers['To'])
+
+ def test_email_addresses_in_cc_are_split_into_a_list(self):
+ mail = LeapMail('', 'INBOX', {'Cc': 'first@example.test,second@example.test'})
+
+ self.assertEqual(['first@example.test', 'second@example.test'], mail.headers['Cc'])
+
+ def test_email_addresses_in_bcc_are_split_into_a_list(self):
+ mail = LeapMail('', 'INBOX', {'Bcc': 'first@example.test,second@example.test'})
+
+ self.assertEqual(['first@example.test', 'second@example.test'], mail.headers['Bcc'])
+
+ def test_email_addresses_might_be_empty_array(self):
+ mail = LeapMail('', 'INBOX', {'Cc': None})
+
+ self.assertEqual([], mail.headers['Cc'])
+
+ def test_as_dict(self):
+ mail = LeapMail('doc id', 'INBOX', {'From': 'test@example.test', 'Subject': 'A test Mail', 'To': 'receiver@example.test,receiver2@other.test'}, ('foo', 'bar'))
+ self.maxDiff = None
+ expected = {
+ 'header': {
+ 'from': 'test@example.test',
+ 'subject': 'A test Mail',
+ 'to': ['receiver@example.test', 'receiver2@other.test'],
+
+ },
+ 'ident': 'doc id',
+ 'mailbox': 'inbox',
+ 'tags': {'foo', 'bar'},
+ 'status': [],
+ 'body': None,
+ 'textPlainBody': None,
+ 'security_casing': {
+ 'imprints': [{'state': 'no_signature_information'}],
+ 'locks': []
+ },
+ 'replying': {'all': {'cc-field': [],
+ 'to-field': ['receiver@example.test',
+ 'test@example.test',
+ 'receiver2@other.test']},
+ 'single': 'test@example.test'},
+ 'attachments': []
+ }
+
+ self.assertEqual(expected, mail.as_dict())
+
+ def test_as_dict_with_body(self):
+ body = 'some body content'
+ mail = LeapMail('doc id', 'INBOX', {'From': 'test@example.test', 'Subject': 'A test Mail', 'To': 'receiver@example.test'}, ('foo', 'bar'), body=body)
+
+ self.assertEqual(body, mail.as_dict()['body'])
+
+ def test_as_dict_with_attachments(self):
+ mail = LeapMail('doc id', 'INBOX', attachments=[AttachmentInfo('id', 'name', 'encoding')])
+
+ self.assertEqual([{'ident': 'id', 'name': 'name', 'encoding': 'encoding'}],
+ mail.as_dict()['attachments'])
+
+ def test_as_dict_headers_with_special_chars(self):
+ expected_address = u'"\xc4lbert \xdcbr\xf6" <\xe4\xfc\xf6@example.mail>'
+ expected_subject = u'H\xe4ll\xf6 W\xf6rld'
+ mail = LeapMail('', 'INBOX',
+ {'From': '=?iso-8859-1?q?=22=C4lbert_=DCbr=F6=22_=3C=E4=FC=F6=40example=2Email=3E?=',
+ 'To': '=?iso-8859-1?q?=22=C4lbert_=DCbr=F6=22_=3C=E4=FC=F6=40example=2Email=3E?=',
+ 'Cc': '=?iso-8859-1?q?=22=C4lbert_=DCbr=F6=22_=3C=E4=FC=F6=40example=2Email=3E?=',
+ 'Subject': '=?iso-8859-1?q?H=E4ll=F6_W=F6rld?='})
+
+ self.assertEqual(expected_address, mail.as_dict()['header']['from'])
+ self.assertEqual([expected_address], mail.as_dict()['header']['to'])
+ self.assertEqual([expected_address], mail.as_dict()['header']['cc'])
+ self.assertEqual(expected_subject, mail.as_dict()['header']['subject'])
+
+ def test_as_dict_replying_with_special_chars(self):
+ expected_address = u'"\xc4lbert \xdcbr\xf6" <\xe4\xfc\xf6@example.mail>'
+ mail = LeapMail('', 'INBOX',
+ {'From': '=?iso-8859-1?q?=22=C4lbert_=DCbr=F6=22_=3C=E4=FC=F6=40example=2Email=3E?=',
+ 'To': '=?iso-8859-1?q?=22=C4lbert_=DCbr=F6=22_=3C=E4=FC=F6=40example=2Email=3E?=',
+ 'Cc': '=?iso-8859-1?q?=22=C4lbert_=DCbr=F6=22_=3C=E4=FC=F6=40example=2Email=3E?=',
+ 'Subject': '=?iso-8859-1?q?H=E4ll=F6_W=F6rld?='})
+
+ self.assertEqual([expected_address], mail.as_dict()['replying']['all']['to-field'])
+ self.assertEqual([expected_address], mail.as_dict()['replying']['all']['cc-field'])
+ self.assertEqual(expected_address, mail.as_dict()['replying']['single'])
+
+ def test_reply_all_result_does_not_contain_own_address_in_to_with_spaces(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'test@example.test',
+ 'To': 'receiver@example.test, %s ' % my_address})
+
+ self.assertEqual(['receiver@example.test', 'test@example.test'], mail.as_dict()['replying']['all']['to-field'])
+
+ def test_reply_all_result_does_not_contain_own_address_in_to_with_name(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'test@example.test',
+ 'To': 'receiver@example.test, Folker Bernitt <%s>' % my_address})
+
+ self.assertEqual(['receiver@example.test', 'test@example.test'], mail.as_dict()['replying']['all']['to-field'])
+
+ def test_reply_all_result_does_not_contain_own_address_in_to_with_encoded(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'test@example.test',
+ 'To': 'receiver@example.test, =?iso-8859-1?q?=C4lbert_=3Cmyaddress=40example=2Etest=3E?='})
+
+ self.assertEqual(['receiver@example.test', 'test@example.test'], mail.as_dict()['replying']['all']['to-field'])
+
+ def test_reply_all_result_does_not_contain_own_address_in_cc(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'test@example.test',
+ 'To': 'receiver@example.test',
+ 'Cc': my_address})
+
+ self.assertEqual([], mail.as_dict()['replying']['all']['cc-field'])
+
+ def test_reply_all_result_does_not_contain_own_address_if_sender(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'myaddress@example.test',
+ 'To': 'receiver@example.test'})
+
+ self.assertEqual(['receiver@example.test'], mail.as_dict()['replying']['all']['to-field'])
+
+ def test_reply_all_result_does_contain_own_address_if_only_recipient(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'myaddress@example.test',
+ 'To': 'myaddress@example.test'})
+
+ self.assertEqual(['myaddress@example.test'], mail.as_dict()['replying']['all']['to-field'])
+
+ def test_reply_result_swaps_sender_and_recipient_if_i_am_the_sender(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'myaddress@example.test',
+ 'To': 'recipient@example.test'})
+
+ self.assertEqual('recipient@example.test', mail.as_dict()['replying']['single'])
+
+ def test_as_dict_with_mixed_encodings(self):
+ subject = 'Another test with =?iso-8859-1?B?3G1s5Px0?= =?iso-8859-1?Q?s?='
+ mail = LeapMail('', 'INBOX',
+ {'Subject': subject})
+
+ self.assertEqual(u'Another test with Ümläüts', mail.as_dict()['header']['subject'])
+
+ def test_raw_constructed_by_headers_and_body(self):
+ body = 'some body content'
+ mail = LeapMail('doc id', 'INBOX', {'From': 'test@example.test', 'Subject': 'A test Mail', 'To': 'receiver@example.test'}, ('foo', 'bar'), body=body)
+
+ result = mail.raw
+
+ expected_raw = 'To: receiver@example.test\nFrom: test@example.test\nSubject: A test Mail\n\nsome body content'
+ self.assertEqual(expected_raw, result)
+
+ def test_headers_none_recipients_are_converted_to_empty_array(self):
+ mail = LeapMail('id', 'INBOX', {'To': None, 'Cc': None, 'Bcc': None})
+
+ self.assertEquals([], mail.headers['To'])
+ self.assertEquals([], mail.headers['Cc'])
+ self.assertEquals([], mail.headers['Bcc'])
+
+ def test_security_casing(self):
+ # No Encryption, no Signature
+ mail = LeapMail('id', 'INBOX', {})
+ self.assertEqual({'locks': [], 'imprints': [{'state': 'no_signature_information'}]}, mail.security_casing)
+
+ # Encryption
+ mail = LeapMail('id', 'INBOX', {'X-Leap-Encryption': 'decrypted'})
+ self.assertEqual([{'state': 'valid'}], mail.security_casing['locks'])
+
+ mail = LeapMail('id', 'INBOX', {'X-Leap-Encryption': 'false'})
+ self.assertEqual([], mail.security_casing['locks'])
+
+ # Signature
+ mail = LeapMail('id', 'INBOX', {'X-Leap-Signature': 'valid'})
+ self.assertEqual([{'seal': {'validity': 'valid'}, 'state': 'valid'}], mail.security_casing['imprints'])
+
+ mail = LeapMail('id', 'INBOX', {'X-Leap-Signature': 'invalid'})
+ self.assertEqual([], mail.security_casing['imprints'])
diff --git a/service/test/unit/adapter/mailstore/test_leap_mailstore.py b/service/test/unit/adapter/mailstore/test_leap_mailstore.py
new file mode 100644
index 00000000..4eabc144
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/test_leap_mailstore.py
@@ -0,0 +1,474 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+import base64
+from email.header import Header
+from email.mime.application import MIMEApplication
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+import json
+import quopri
+from uuid import uuid4
+from email.parser import Parser
+import os
+from leap.soledad.common.document import SoledadDocument
+from leap.mail.adaptors.soledad_indexes import MAIL_INDEXES
+from twisted.internet.defer import FirstError
+from twisted.trial.unittest import TestCase
+from leap.mail import constants
+from twisted.internet import defer
+from mockito import mock, when, verify, any as ANY
+import test.support.mockito
+from leap.mail.adaptors.soledad import SoledadMailAdaptor, MailboxWrapper, ContentDocWrapper
+import pkg_resources
+from leap.mail.mail import Message
+from pixelated.adapter.mailstore import underscore_uuid
+
+from pixelated.adapter.mailstore.leap_mailstore import LeapMailStore, LeapMail, AttachmentInfo
+
+
+class TestLeapMailStore(TestCase):
+ def setUp(self):
+ self.soledad = mock()
+ self.mbox_uuid = str(uuid4())
+ self.doc_by_id = {}
+ self.mbox_uuid_by_name = {}
+ self.mbox_soledad_docs = []
+
+ when(self.soledad).get_from_index('by-type', 'mbox').thenAnswer(lambda: defer.succeed(self.mbox_soledad_docs))
+ self._mock_get_mailbox('INBOX')
+
+ @defer.inlineCallbacks
+ def test_get_mail_not_exist(self):
+ when(self.soledad).get_doc(ANY()).thenAnswer(lambda: defer.succeed(None))
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.get_mail(_format_mdoc_id(uuid4(), 1))
+
+ self.assertIsNone(mail)
+
+ @defer.inlineCallbacks
+ def test_get_mail(self):
+ mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.get_mail(mdoc_id)
+
+ self.assertIsInstance(mail, LeapMail)
+ self.assertEqual('darby.senger@zemlak.biz', mail.from_sender)
+ self.assertEqual(['carmel@murazikortiz.name'], mail.to)
+ self.assertEqual('Itaque consequatur repellendus provident sunt quia.', mail.subject)
+ self.assertIsNone(mail.body)
+ self.assertEqual('INBOX', mail.mailbox_name)
+
+ @defer.inlineCallbacks
+ def test_get_mail_from_mailbox(self):
+ other, _ = self._mock_get_mailbox('OTHER', create_new_uuid=True)
+ mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000000', other.uuid)
+
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.get_mail(mdoc_id)
+
+ self.assertEqual('OTHER', mail.mailbox_name)
+
+ @defer.inlineCallbacks
+ def test_get_two_different_mails(self):
+ first_mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ second_mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000001')
+
+ store = LeapMailStore(self.soledad)
+
+ mail1 = yield store.get_mail(first_mdoc_id)
+ mail2 = yield store.get_mail(second_mdoc_id)
+
+ self.assertNotEqual(mail1, mail2)
+ self.assertEqual('Itaque consequatur repellendus provident sunt quia.', mail1.subject)
+ self.assertEqual('Error illum dignissimos autem eos aspernatur.', mail2.subject)
+
+ @defer.inlineCallbacks
+ def test_get_mails(self):
+ first_mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ second_mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000001')
+
+ store = LeapMailStore(self.soledad)
+
+ mails = yield store.get_mails([first_mdoc_id, second_mdoc_id])
+
+ self.assertEqual(2, len(mails))
+ self.assertEqual('Itaque consequatur repellendus provident sunt quia.', mails[0].subject)
+ self.assertEqual('Error illum dignissimos autem eos aspernatur.', mails[1].subject)
+
+ @defer.inlineCallbacks
+ def test_get_mails_fails_for_invalid_mail_id(self):
+ store = LeapMailStore(self.soledad)
+
+ try:
+ yield store.get_mails(['invalid'])
+ self.fail('Exception expected')
+ except FirstError:
+ pass
+
+ @defer.inlineCallbacks
+ def test_get_mail_with_body(self):
+ expeted_body = 'Dignissimos ducimus veritatis. Est tenetur consequatur quia occaecati. Vel sit sit voluptas.\n\nEarum distinctio eos. Accusantium qui sint ut quia assumenda. Facere dignissimos inventore autem sit amet. Pariatur voluptatem sint est.\n\nUt recusandae praesentium aspernatur. Exercitationem amet placeat deserunt quae consequatur eum. Unde doloremque suscipit quia.\n\n'
+ mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.get_mail(mdoc_id, include_body=True)
+
+ self.assertEqual(expeted_body, mail.body)
+
+ @defer.inlineCallbacks
+ def test_get_mail_attachment(self):
+ attachment_id = 'AAAA9AAD9E153D24265395203C53884506ABA276394B9FEC02B214BF9E77E48E'
+ doc = SoledadDocument(json=json.dumps({'content_type': 'foo/bar', 'raw': 'asdf'}))
+ when(self.soledad).get_from_index('by-type-and-payloadhash', 'cnt', attachment_id).thenReturn(defer.succeed([doc]))
+ store = LeapMailStore(self.soledad)
+
+ attachment = yield store.get_mail_attachment(attachment_id)
+
+ self.assertEqual({'content-type': 'foo/bar', 'content': bytearray('asdf')}, attachment)
+
+ @defer.inlineCallbacks
+ def test_get_mail_attachment_different_content_encodings(self):
+ attachment_id = '1B0A9AAD9E153D24265395203C53884506ABA276394B9FEC02B214BF9E77E48E'
+ encoding_examples = [('', 'asdf', 'asdf'),
+ ('base64', 'asdf', 'YXNkZg=='),
+ ('quoted-printable', 'äsdf', '=C3=A4sdf')]
+
+ for transfer_encoding, data, encoded_data in encoding_examples:
+ doc = SoledadDocument(json=json.dumps({'content_type': 'foo/bar', 'raw': encoded_data,
+ 'content_transfer_encoding': transfer_encoding}))
+ when(self.soledad).get_from_index('by-type-and-payloadhash', 'cnt', attachment_id).thenReturn(defer.succeed([doc]))
+ store = LeapMailStore(self.soledad)
+
+ attachment = yield store.get_mail_attachment(attachment_id)
+
+ self.assertEqual(bytearray(data), attachment['content'])
+
+ @defer.inlineCallbacks
+ def test_get_mail_attachment_throws_exception_if_attachment_does_not_exist(self):
+ attachment_id = '1B0A9AAD9E153D24265395203C53884506ABA276394B9FEC02B214BF9E77E48E'
+ when(self.soledad).get_from_index('by-type-and-payloadhash', 'cnt', attachment_id).thenReturn(defer.succeed([]))
+ store = LeapMailStore(self.soledad)
+ try:
+ yield store.get_mail_attachment(attachment_id)
+ self.fail('ValueError exception expected')
+ except ValueError:
+ pass
+
+ @defer.inlineCallbacks
+ def test_update_mail(self):
+ mdoc_id, fdoc_id = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ soledad_fdoc = self.doc_by_id[fdoc_id]
+ when(self.soledad).put_doc(soledad_fdoc).thenReturn(defer.succeed(None))
+
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.get_mail(mdoc_id)
+
+ mail.tags.add('new_tag')
+
+ yield store.update_mail(mail)
+
+ verify(self.soledad).put_doc(soledad_fdoc)
+ self.assertTrue('new_tag' in soledad_fdoc.content['tags'])
+
+ @defer.inlineCallbacks
+ def test_all_mails(self):
+ first_mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ second_mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000001')
+ when(self.soledad).get_from_index('by-type', 'meta').thenReturn(defer.succeed([self.doc_by_id[first_mdoc_id], self.doc_by_id[second_mdoc_id]]))
+
+ store = LeapMailStore(self.soledad)
+
+ mails = yield store.all_mails()
+
+ self.assertIsNotNone(mails)
+ self.assertEqual(2, len(mails))
+ self.assertEqual('Itaque consequatur repellendus provident sunt quia.', mails[0].subject)
+ self.assertEqual('Error illum dignissimos autem eos aspernatur.', mails[1].subject)
+
+ @defer.inlineCallbacks
+ def test_add_mailbox(self):
+ when(self.soledad).list_indexes().thenReturn(defer.succeed(MAIL_INDEXES)).thenReturn(defer.succeed(MAIL_INDEXES))
+ when(self.soledad).get_from_index('by-type-and-mbox', 'mbox', 'TEST').thenReturn(defer.succeed([]))
+ self._mock_create_soledad_doc(self.mbox_uuid, MailboxWrapper(mbox='TEST'))
+ when(self.soledad).get_doc(self.mbox_uuid).thenAnswer(lambda: defer.succeed(self.doc_by_id[self.mbox_uuid]))
+ when(self.soledad).put_doc(ANY()).thenAnswer(lambda: defer.succeed(None))
+ store = LeapMailStore(self.soledad)
+
+ mbox = yield store.add_mailbox('TEST')
+
+ self.assertIsNotNone(mbox)
+ self.assertEqual(self.mbox_uuid, mbox.doc_id)
+ self.assertEqual('TEST', mbox.mbox)
+ self.assertIsNotNone(mbox.uuid)
+ # assert index got updated
+
+ @defer.inlineCallbacks
+ def test_get_mailbox_names_always_contains_inbox(self):
+ store = LeapMailStore(self.soledad)
+
+ names = yield store.get_mailbox_names()
+
+ self.assertEqual({'INBOX'}, names)
+
+ @defer.inlineCallbacks
+ def test_get_mailbox_names(self):
+ self._mock_get_mailbox('OTHER', create_new_uuid=True)
+ store = LeapMailStore(self.soledad)
+
+ names = yield store.get_mailbox_names()
+
+ self.assertEqual({'INBOX', 'OTHER'}, names)
+
+ @defer.inlineCallbacks
+ def test_handles_unmapped_mailbox_uuid(self):
+ # given
+ store = LeapMailStore(self.soledad)
+ new_uuid = 'UNICORN'
+
+ # if no mailbox doc is created yet (async hell?)
+ when(self.soledad).get_from_index('by-type', 'mbox').thenReturn(defer.succeed([]))
+
+ # then it should point to empty, which is all mails
+ name = yield store._mailbox_name_from_uuid(new_uuid)
+ self.assertEquals('', name)
+
+ @defer.inlineCallbacks
+ def test_add_mail(self):
+ expected_message = self._add_create_mail_mocks_to_soledad_from_fixture_file('mbox00000000')
+ mail = self._load_mail_from_file('mbox00000000')
+ self._mock_get_mailbox('INBOX')
+
+ store = LeapMailStore(self.soledad)
+
+ message = yield store.add_mail('INBOX', mail.as_string())
+
+ self.assertIsInstance(message, LeapMail)
+ self._assert_message_docs_created(expected_message, message)
+
+ @defer.inlineCallbacks
+ def test_add_mail_with_attachment(self):
+ input_mail = MIMEMultipart()
+ input_mail.attach(MIMEText(u'a utf8 message', _charset='utf-8'))
+ attachment = MIMEApplication('pretend to be binary attachment data')
+ attachment.add_header('Content-Disposition', 'attachment', filename='filename.txt')
+ input_mail.attach(attachment)
+ mocked_message = self._add_create_mail_mocks_to_soledad(input_mail)
+ store = LeapMailStore(self.soledad)
+
+ message = yield store.add_mail('INBOX', input_mail.as_string())
+
+ expected = [{'ident': self._cdoc_phash_from_message(mocked_message, 2), 'name': 'filename.txt', 'encoding': 'base64'}]
+ self.assertEqual(expected, message.as_dict()['attachments'])
+
+ @defer.inlineCallbacks
+ def test_add_mail_with_nested_attachments(self):
+ input_mail = MIMEMultipart()
+ input_mail.attach(MIMEText(u'a utf8 message', _charset='utf-8'))
+ attachment = MIMEApplication('pretend to be binary attachment data')
+ attachment.add_header('Content-Disposition', 'attachment', filename='filename.txt')
+ nested_attachment = MIMEMultipart()
+ nested_attachment.attach(attachment)
+ input_mail.attach(nested_attachment)
+ mocked_message = self._add_create_mail_mocks_to_soledad(input_mail)
+ store = LeapMailStore(self.soledad)
+
+ message = yield store.add_mail('INBOX', input_mail.as_string())
+
+ expected = [{'ident': self._cdoc_phash_from_message(mocked_message, 2), 'name': 'filename.txt', 'encoding': 'base64'}]
+ self.assertEqual(expected, message.as_dict()['attachments'])
+
+ @defer.inlineCallbacks
+ def test_add_mail_with_special_chars(self):
+ input_mail = MIMEText(u'a utf8 message', _charset='utf-8')
+ input_mail['From'] = Header(u'"Älbert Übrö" <äüö@example.mail>', 'iso-8859-1')
+ input_mail['Subject'] = Header(u'Hällö Wörld', 'iso-8859-1')
+ self._add_create_mail_mocks_to_soledad(input_mail)
+ store = LeapMailStore(self.soledad)
+
+ message = yield store.add_mail('INBOX', input_mail.as_string())
+
+ self.assertEqual(u'"\xc4lbert \xdcbr\xf6" <\xe4\xfc\xf6@example.mail>', message.as_dict()['header']['from'])
+
+ def _cdoc_phash_from_message(self, mocked_message, attachment_nr):
+ return mocked_message.get_wrapper().cdocs[attachment_nr].future_doc_id[2:]
+
+ @defer.inlineCallbacks
+ def test_delete_mail(self):
+ mdoc_id, fdoc_id = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+
+ store = LeapMailStore(self.soledad)
+
+ yield store.delete_mail(mdoc_id)
+
+ self._assert_mail_got_deleted(fdoc_id, mdoc_id)
+
+ @defer.inlineCallbacks
+ def test_get_mailbox_mail_ids(self):
+ mdoc_id, fdoc_id = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ when(self.soledad).get_from_index('by-type-and-mbox-uuid', 'flags', underscore_uuid(self.mbox_uuid)).thenReturn(defer.succeed([self.doc_by_id[fdoc_id]]))
+ self._mock_get_mailbox('INBOX')
+ store = LeapMailStore(self.soledad)
+
+ mail_ids = yield store.get_mailbox_mail_ids('INBOX')
+
+ self.assertEqual(1, len(mail_ids))
+ self.assertEqual(mdoc_id, mail_ids[0])
+
+ @defer.inlineCallbacks
+ def test_delete_mailbox(self):
+ _, mbox_soledad_doc = self._mock_get_mailbox('INBOX')
+ store = LeapMailStore(self.soledad)
+ when(self.soledad).delete_doc(mbox_soledad_doc).thenReturn(defer.succeed(None))
+
+ yield store.delete_mailbox('INBOX')
+
+ verify(self.soledad).delete_doc(self.doc_by_id[mbox_soledad_doc.doc_id])
+ # should also verify index is updated
+
+ @defer.inlineCallbacks
+ def test_copy_mail_to_mailbox(self):
+ expected_message = self._add_create_mail_mocks_to_soledad_from_fixture_file('mbox00000000')
+ mail_id, fdoc_id = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ self._mock_get_mailbox('TRASH')
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.copy_mail_to_mailbox(mail_id, 'TRASH')
+
+ self._assert_message_docs_created(expected_message, mail, only_mdoc_and_fdoc=True)
+
+ @defer.inlineCallbacks
+ def test_move_to_mailbox(self):
+ expected_message = self._add_create_mail_mocks_to_soledad_from_fixture_file('mbox00000000')
+ mail_id, fdoc_id = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ self._mock_get_mailbox('TRASH')
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.move_mail_to_mailbox(mail_id, 'TRASH')
+
+ self._assert_message_docs_created(expected_message, mail, only_mdoc_and_fdoc=True)
+ self._assert_mail_got_deleted(fdoc_id, mail_id)
+
+ def _assert_mail_got_deleted(self, fdoc_id, mail_id):
+ verify(self.soledad).delete_doc(self.doc_by_id[mail_id])
+ verify(self.soledad).delete_doc(self.doc_by_id[fdoc_id])
+
+ def _assert_message_docs_created(self, expected_message, actual_message, only_mdoc_and_fdoc=False):
+ wrapper = expected_message.get_wrapper()
+
+ verify(self.soledad).create_doc(wrapper.mdoc.serialize(), doc_id=actual_message.mail_id)
+ verify(self.soledad).create_doc(wrapper.fdoc.serialize(), doc_id=wrapper.fdoc.future_doc_id)
+ if not only_mdoc_and_fdoc:
+ verify(self.soledad).create_doc(wrapper.hdoc.serialize(), doc_id=wrapper.hdoc.future_doc_id)
+ for nr, cdoc in wrapper.cdocs.items():
+ verify(self.soledad).create_doc(cdoc.serialize(), doc_id=wrapper.cdocs[nr].future_doc_id)
+
+ def _mock_get_mailbox(self, mailbox_name, create_new_uuid=False):
+ mbox_uuid = self.mbox_uuid if not create_new_uuid else str(uuid4())
+ when(self.soledad).list_indexes().thenReturn(defer.succeed(MAIL_INDEXES)).thenReturn(
+ defer.succeed(MAIL_INDEXES))
+ doc_id = str(uuid4())
+ mbox = MailboxWrapper(doc_id=doc_id, mbox=mailbox_name, uuid=mbox_uuid)
+ soledad_doc = SoledadDocument(doc_id, json=json.dumps(mbox.serialize()))
+ when(self.soledad).get_from_index('by-type-and-mbox', 'mbox', mailbox_name).thenReturn(defer.succeed([soledad_doc]))
+ self._mock_get_soledad_doc(doc_id, mbox)
+
+ self.mbox_uuid_by_name[mailbox_name] = mbox_uuid
+ self.mbox_soledad_docs.append(soledad_doc)
+
+ return mbox, soledad_doc
+
+ def _add_mail_fixture_to_soledad_from_file(self, mail_file, mbox_uuid=None):
+ mail = self._load_mail_from_file(mail_file)
+ return self._add_mail_fixture_to_soledad(mail, mbox_uuid)
+
+ def _add_mail_fixture_to_soledad(self, mail, mbox_uuid=None):
+ msg = self._convert_mail_to_leap_message(mail, mbox_uuid)
+ wrapper = msg.get_wrapper()
+ mdoc_id = wrapper.mdoc.future_doc_id
+ fdoc_id = wrapper.mdoc.fdoc
+ hdoc_id = wrapper.mdoc.hdoc
+ cdoc_id = wrapper.mdoc.cdocs[0]
+
+ self._mock_get_soledad_doc(mdoc_id, wrapper.mdoc)
+ self._mock_get_soledad_doc(fdoc_id, wrapper.fdoc)
+ self._mock_get_soledad_doc(hdoc_id, wrapper.hdoc)
+ self._mock_get_soledad_doc(cdoc_id, wrapper.cdocs[1])
+ return mdoc_id, fdoc_id
+
+ def _add_create_mail_mocks_to_soledad_from_fixture_file(self, mail_file):
+ mail = self._load_mail_from_file(mail_file)
+ return self._add_create_mail_mocks_to_soledad(mail)
+
+ def _add_create_mail_mocks_to_soledad(self, example_mail):
+ mail = self._convert_mail_to_leap_message(example_mail)
+ wrapper = mail.get_wrapper()
+
+ mdoc_id = wrapper.mdoc.future_doc_id
+ fdoc_id = wrapper.mdoc.fdoc
+ hdoc_id = wrapper.mdoc.hdoc
+
+ self._mock_create_soledad_doc(mdoc_id, wrapper.mdoc)
+ self._mock_create_soledad_doc(fdoc_id, wrapper.fdoc)
+ self._mock_create_soledad_doc(hdoc_id, wrapper.hdoc)
+
+ for _, cdoc in wrapper.cdocs.items():
+ self._mock_create_soledad_doc(cdoc.future_doc_id, cdoc)
+ self._mock_get_soledad_doc(cdoc.future_doc_id, cdoc)
+
+ return mail
+
+ def _convert_mail_to_leap_message(self, mail, mbox_uuid=None):
+ msg = SoledadMailAdaptor().get_msg_from_string(Message, mail.as_string())
+ if mbox_uuid is None:
+ msg.get_wrapper().set_mbox_uuid(self.mbox_uuid)
+ else:
+ msg.get_wrapper().set_mbox_uuid(mbox_uuid)
+
+ return msg
+
+ def _mock_get_soledad_doc(self, doc_id, doc):
+ soledad_doc = SoledadDocument(doc_id, json=json.dumps(doc.serialize()))
+
+ # when(self.soledad).get_doc(doc_id).thenReturn(defer.succeed(soledad_doc))
+ when(self.soledad).get_doc(doc_id).thenAnswer(lambda: defer.succeed(soledad_doc))
+
+ self.doc_by_id[doc_id] = soledad_doc
+
+ def _mock_create_soledad_doc(self, doc_id, doc):
+ soledad_doc = SoledadDocument(doc_id, json=json.dumps(doc.serialize()))
+ if doc.future_doc_id:
+ when(self.soledad).create_doc(doc.serialize(), doc_id=doc_id).thenReturn(defer.succeed(soledad_doc))
+ else:
+ when(self.soledad).create_doc(doc.serialize()).thenReturn(defer.succeed(soledad_doc))
+ self.doc_by_id[doc_id] = soledad_doc
+
+ def _load_mail_from_file(self, mail_file):
+ mailset_dir = pkg_resources.resource_filename('test.unit.fixtures', 'mailset')
+ mail_file = os.path.join(mailset_dir, 'new', mail_file)
+ with open(mail_file) as f:
+ mail = Parser().parse(f)
+ return mail
+
+
+def _format_mdoc_id(mbox_uuid, chash):
+ return constants.METAMSGID.format(mbox_uuid=mbox_uuid, chash=chash)
diff --git a/service/test/unit/adapter/mailstore/test_searchable_mailstore.py b/service/test/unit/adapter/mailstore/test_searchable_mailstore.py
new file mode 100644
index 00000000..8c571201
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/test_searchable_mailstore.py
@@ -0,0 +1,112 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from email.parser import Parser
+import os
+from mockito import verify, mock, when
+import pkg_resources
+from twisted.internet import defer
+from twisted.trial.unittest import TestCase
+from pixelated.adapter.mailstore import MailStore
+from pixelated.adapter.mailstore.leap_mailstore import LeapMail
+from pixelated.adapter.mailstore.searchable_mailstore import SearchableMailStore
+from pixelated.adapter.search import SearchEngine
+
+
+ANY_MAILBOX = 'INBOX'
+
+
+class TestSearchableMailStore(TestCase):
+
+ def setUp(self):
+ super(TestSearchableMailStore, self).setUp()
+ self.search_index = mock(mocked_obj=SearchEngine)
+ self.delegate_mail_store = mock(mocked_obj=MailStore)
+ self.store = SearchableMailStore(self.delegate_mail_store, self.search_index)
+
+ @defer.inlineCallbacks
+ def test_add_mail_delegates_to_mail_store_and_updates_index(self):
+ mail = self._load_mail_from_file('mbox00000000')
+ leap_mail = LeapMail('id', ANY_MAILBOX)
+ when(self.delegate_mail_store).add_mail(ANY_MAILBOX, mail).thenReturn(defer.succeed(leap_mail))
+
+ result = yield self.store.add_mail(ANY_MAILBOX, mail)
+
+ verify(self.delegate_mail_store).add_mail(ANY_MAILBOX, mail)
+ verify(self.search_index).index_mail(leap_mail)
+ self.assertEqual(leap_mail, result)
+
+ @defer.inlineCallbacks
+ def test_delete_mail_delegates_to_mail_store_and_updates_index(self):
+ when(self.delegate_mail_store).delete_mail('mail id').thenReturn(defer.succeed(None))
+ when(self.search_index).remove_from_index('mail id').thenReturn(defer.succeed(None))
+
+ yield self.store.delete_mail('mail id')
+
+ verify(self.delegate_mail_store).delete_mail('mail id')
+ verify(self.search_index).remove_from_index('mail id')
+
+ @defer.inlineCallbacks
+ def test_update_mail_delegates_to_mail_store_and_updates_index(self):
+ leap_mail = LeapMail('id', ANY_MAILBOX)
+
+ yield self.store.update_mail(leap_mail)
+
+ verify(self.delegate_mail_store).update_mail(leap_mail)
+ verify(self.search_index).index_mail(leap_mail)
+
+ @defer.inlineCallbacks
+ def test_copy_mail_delegates_to_mail_store_and_updates_index(self):
+ copied_mail = LeapMail('new id', ANY_MAILBOX)
+ when(self.delegate_mail_store).copy_mail_to_mailbox('mail id', ANY_MAILBOX).thenReturn(defer.succeed(copied_mail))
+
+ result = yield self.store.copy_mail_to_mailbox('mail id', ANY_MAILBOX)
+
+ verify(self.search_index).index_mail(copied_mail)
+ self.assertEqual(copied_mail, result)
+
+ @defer.inlineCallbacks
+ def test_move_mail_delegates_to_mail_store_and_updates_index(self):
+ moved_mail = LeapMail('new id', ANY_MAILBOX)
+ when(self.delegate_mail_store).move_mail_to_mailbox('mail id', ANY_MAILBOX).thenReturn(defer.succeed(moved_mail))
+
+ result = yield self.store.move_mail_to_mailbox('mail id', ANY_MAILBOX)
+
+ verify(self.search_index).remove_from_index('mail id')
+ verify(self.search_index).index_mail(moved_mail)
+ self.assertEqual(moved_mail, result)
+
+ @defer.inlineCallbacks
+ def test_other_methods_are_delegated(self):
+ mail = LeapMail('mail id', ANY_MAILBOX)
+ when(self.delegate_mail_store).get_mail('mail id').thenReturn(defer.succeed(mail), defer.succeed(mail))
+ result = yield self.store.get_mail('mail id')
+
+ self.assertEqual(mail, result)
+
+ @defer.inlineCallbacks
+ def test_delete_mailbox_is_not_implemented(self):
+ try:
+ yield self.store.delete_mailbox(ANY_MAILBOX)
+ self.fail("Should raise NotImplementedError")
+ except NotImplementedError:
+ pass
+
+ def _load_mail_from_file(self, mail_file):
+ mailset_dir = pkg_resources.resource_filename('test.unit.fixtures', 'mailset')
+ mail_file = os.path.join(mailset_dir, 'new', mail_file)
+ with open(mail_file) as f:
+ mail = Parser().parse(f)
+ return mail
diff --git a/service/test/unit/adapter/search/test_index_storage_key.py b/service/test/unit/adapter/search/test_index_storage_key.py
new file mode 100644
index 00000000..e60c69ef
--- /dev/null
+++ b/service/test/unit/adapter/search/test_index_storage_key.py
@@ -0,0 +1,52 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from leap.soledad.common.document import SoledadDocument
+from mockito import mock, when, unstub, verify
+from twisted.internet import defer
+from twisted.trial import unittest
+from pixelated.adapter.search.index_storage_key import SearchIndexStorageKey
+import os
+
+
+class TestSearchIndexStorageKey(unittest.TestCase):
+
+ def tearDown(self):
+ unstub()
+
+ @defer.inlineCallbacks
+ def test_get_or_create_key_returns_key(self):
+ soledad = mock()
+
+ when(soledad).get_from_index('by-type', 'index_key').thenReturn([SoledadDocument(json='{"value": "somekey"}')])
+
+ key = yield SearchIndexStorageKey(soledad).get_or_create_key()
+
+ self.assertEqual('somekey', key)
+
+ @defer.inlineCallbacks
+ def test_get_or_create_creates_key_if_not_exists(self):
+ expected_key = '\x8brN\xa3\xe5-\x828 \x95\x8d\n\xc6\x0c\x82\n\xd7!\xa9\xb0.\xcc\\h\xa9\x98\xe9V\xc1*<\xfe\xbb\x8f\xcd\x7f\x8c#\xff\xf9\x840\xdf{}\x97\xebS-*\xe2f\xf9B\xa9\xb1\x0c\x1d-C)\xc5\xa0B'
+ base64_encoded_key = 'i3JOo+UtgjgglY0KxgyCCtchqbAuzFxoqZjpVsEqPP67j81/jCP/+YQw33t9l+tTLSriZvlCqbEM\nHS1DKcWgQg==\n'
+ soledad = mock()
+
+ when(soledad).get_from_index('by-type', 'index_key').thenReturn([])
+ when(os).urandom(64).thenReturn(expected_key)
+
+ key = yield SearchIndexStorageKey(soledad).get_or_create_key()
+
+ self.assertEqual(expected_key, key)
+
+ verify(soledad).create_doc(dict(type='index_key', value=base64_encoded_key))
diff --git a/service/test/unit/adapter/search/test_search.py b/service/test/unit/adapter/search/test_search.py
index 1d9076a2..76e704b6 100644
--- a/service/test/unit/adapter/search/test_search.py
+++ b/service/test/unit/adapter/search/test_search.py
@@ -16,6 +16,7 @@
import unittest
+from pixelated.adapter.mailstore.leap_mailstore import LeapMail
from pixelated.adapter.search import SearchEngine
from tempdir import TempDir
from test.support import test_helper
@@ -56,7 +57,7 @@ class SearchEngineTest(unittest.TestCase):
}
# when
- se.index_mail(test_helper.pixelated_mail(extra_headers=headers, chash='mailid'))
+ se.index_mail(LeapMail('mailid', 'INBOX', headers=headers)) # test_helper.pixelated_mail(extra_headers=headers, chash='mailid'))
result = se.search('folker')
diff --git a/service/test/unit/adapter/test_draft_service.py b/service/test/unit/adapter/test_draft_service.py
index 79eca5f6..c2b7cd93 100644
--- a/service/test/unit/adapter/test_draft_service.py
+++ b/service/test/unit/adapter/test_draft_service.py
@@ -1,4 +1,6 @@
import unittest
+from twisted.internet import defer
+from pixelated.adapter.mailstore.leap_mailstore import LeapMail
from pixelated.adapter.model.mail import InputMail
from pixelated.adapter.services.draft_service import DraftService
@@ -10,21 +12,20 @@ class DraftServiceTest(unittest.TestCase):
def setUp(self):
self.mailboxes = mock()
- self.drafts_mailbox = mock()
- self.draft_service = DraftService(self.mailboxes)
- self.mailboxes.drafts = self.drafts_mailbox
+ self.mail_store = mock()
+ self.draft_service = DraftService(self.mail_store)
def test_add_draft(self):
mail = InputMail()
self.draft_service.create_draft(mail)
- verify(self.drafts_mailbox).add(mail)
+ verify(self.mail_store).add_mail('DRAFTS', mail.raw)
def test_update_draft(self):
mail = InputMail.from_dict(test_helper.mail_dict())
- when(self.drafts_mailbox).add(mail).thenReturn(mail)
+ when(self.mail_store).add_mail('DRAFTS', mail.raw).thenReturn(defer.succeed(LeapMail('id', 'DRAFTS')))
self.draft_service.update_draft(mail.ident, mail)
- inorder.verify(self.drafts_mailbox).add(mail)
- inorder.verify(self.drafts_mailbox).remove(mail.ident)
+ inorder.verify(self.mail_store).add_mail('DRAFTS', mail.raw)
+ inorder.verify(self.mail_store).delete_mail(mail.ident)
diff --git a/service/test/unit/adapter/test_mail.py b/service/test/unit/adapter/test_mail.py
index 1a9280ff..dc344992 100644
--- a/service/test/unit/adapter/test_mail.py
+++ b/service/test/unit/adapter/test_mail.py
@@ -14,378 +14,20 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-import unittest
+from twisted.trial import unittest
import pixelated.support.date
-from pixelated.adapter.model.mail import PixelatedMail, InputMail
+from pixelated.adapter.model.mail import InputMail, HEADERS_KEY
from mockito import mock, unstub, when
from test.support import test_helper
import dateutil.parser as dateparser
import base64
-from leap.mail.imap.fields import fields
+from leap.mail.adaptors import soledad_indexes as fields
from datetime import datetime
import os
import json
-
-
-class TestPixelatedMail(unittest.TestCase):
- def setUp(self):
- self.querier = mock()
-
- def tearDown(self):
- unstub()
-
- def test_parse_date_from_soledad_uses_date_header_if_available(self):
- leap_mail_date = 'Wed, 3 Sep 2014 12:36:17 -0300'
- leap_mail_date_in_iso_format = "2014-09-03T12:36:17-03:00"
-
- leap_mail = test_helper.leap_mail(headers={'date': leap_mail_date})
-
- mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier)
-
- self.assertEqual(str(mail.headers['Date']), leap_mail_date_in_iso_format)
-
- def test_parse_date_from_soledad_fallback_to_received_header_if_date_header_isnt_available(self):
- leap_mail_date = "Wed, 03 Sep 2014 13:11:15 -0300"
- leap_mail_date_in_iso_format = "2014-09-03T13:11:15-03:00"
- leap_mail_received_header = "by bitmask.local from 127.0.0.1 with ESMTP ;\n " + leap_mail_date
-
- leap_mail = test_helper.leap_mail(headers={'received': leap_mail_received_header})
-
- mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier)
-
- self.assertEqual(str(mail.headers['Date']), leap_mail_date_in_iso_format)
-
- def test_parse_date_from_soledad_fallback_to_now_if_neither_date_nor_received_header(self):
- leap_mail_date_in_iso_format = "2014-09-03T13:11:15-03:00"
-
- when(pixelated.support.date).iso_now().thenReturn(leap_mail_date_in_iso_format)
- fdoc, hdoc, bdoc = test_helper.leap_mail()
- del hdoc.content['date']
-
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier)
-
- self.assertEqual(str(mail.headers['Date']), leap_mail_date_in_iso_format)
-
- def test_use_datetime_now_as_fallback_for_invalid_date(self):
- leap_mail_date = u'söme däte'
- date_expected = "2014-09-03T13:11:15-03:00"
-
- when(pixelated.support.date).iso_now().thenReturn(date_expected)
- leap_mail = test_helper.leap_mail(headers={'date': leap_mail_date})
-
- mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier)
-
- self.assertEqual(str(mail.headers['Date']), date_expected)
-
- def test_fall_back_to_ascii_if_invalid_received_header(self):
- leap_mail_received_header = u"söme invalid received heäder\n"
- date_expected = "2014-09-03T13:11:15-03:00"
-
- when(pixelated.support.date).iso_now().thenReturn(date_expected)
- leap_mail = test_helper.leap_mail(headers={'received': leap_mail_received_header})
-
- mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier)
-
- self.assertEqual(mail.headers['Date'], date_expected)
-
- def test_update_tags_return_a_set_with_the_current_tags(self):
- soledad_docs = test_helper.leap_mail(extra_headers={'X-tags': '["custom_1", "custom_2"]'})
- pixelated_mail = PixelatedMail.from_soledad(*soledad_docs, soledad_querier=self.querier)
-
- current_tags = pixelated_mail.update_tags({'custom_1', 'custom_3'})
- self.assertEquals({'custom_3', 'custom_1'}, current_tags)
-
- def test_mark_as_read(self):
- mail = PixelatedMail.from_soledad(*test_helper.leap_mail(flags=[]), soledad_querier=self.querier)
-
- mail.mark_as_read()
-
- self.assertEquals(mail.fdoc.content['flags'], ['\\Seen'])
-
- def test_mark_as_not_recent(self):
- mail = PixelatedMail.from_soledad(*test_helper.leap_mail(flags=['\\Recent']), soledad_querier=self.querier)
-
- mail.mark_as_not_recent()
-
- self.assertEquals(mail.fdoc.content['flags'], [])
-
- def test_get_for_save_adds_from(self):
- InputMail.FROM_EMAIL_ADDRESS = 'me@pixelated.org'
- headers = {'Subject': 'The subject',
- 'Date': str(datetime.now()),
- 'To': 'me@pixelated.org'}
-
- input_mail = InputMail()
- input_mail.headers = headers
-
- self.assertEqual('me@pixelated.org', input_mail.get_for_save(1, 'SENT')[1][fields.HEADERS_KEY]['From'])
-
- def test_as_dict(self):
- headers = {'Subject': 'The subject',
- 'From': 'someone@pixelated.org',
- 'To': 'me@pixelated.org'}
- fdoc, hdoc, bdoc = test_helper.leap_mail(flags=['\\Recent'],
- extra_headers=headers)
-
- InputMail.FROM_EMAIL_ADDRESS = 'me@pixelated.org'
-
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier)
-
- _dict = mail.as_dict()
-
- self.maxDiff = None
-
- self.assertEquals(_dict, {'htmlBody': None,
- 'textPlainBody': 'body',
- 'header': {
- 'date': dateparser.parse(hdoc.content['date']).isoformat(),
- 'from': 'someone@pixelated.org',
- 'subject': 'The subject',
- 'to': ['me@pixelated.org'],
- 'cc': [],
- 'bcc': []
- },
- 'ident': 'chash',
- 'mailbox': 'inbox',
- 'security_casing': {'imprints': [{'state': 'no_signature_information'}], 'locks': []},
- 'status': ['recent'],
- 'tags': [],
- 'attachments': [],
- 'replying': {
- 'single': 'someone@pixelated.org',
- 'all': {
- 'to-field': ['someone@pixelated.org'],
- 'cc-field': []
- }
- }})
-
- def test_use_reply_to_address_for_replying(self):
- headers = {'Subject': 'The subject',
- 'From': 'someone@pixelated.org',
- 'Reply-To': 'reply-to-this-address@pixelated.org',
- 'To': 'me@pixelated.org, \nalice@pixelated.org'}
- fdoc, hdoc, bdoc = test_helper.leap_mail(flags=['\\Recent'],
- extra_headers=headers)
-
- InputMail.FROM_EMAIL_ADDRESS = 'me@pixelated.org'
-
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier)
-
- _dict = mail.as_dict()
-
- self.assertEquals(_dict['replying'], {'single': 'reply-to-this-address@pixelated.org',
- 'all': {
- 'to-field': ['alice@pixelated.org', 'reply-to-this-address@pixelated.org'],
- 'cc-field': []
- }})
-
- def test_alternatives_body(self):
- parts = {'alternatives': [], 'attachments': []}
- parts['alternatives'].append({'content': 'blablabla', 'headers': {'Content-Type': 'text/plain'}})
- parts['alternatives'].append({'content': '<p>blablabla</p>', 'headers': {'Content-Type': 'text/html'}})
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='blablabla'), parts=parts, soledad_querier=None)
-
- self.assertRegexpMatches(mail.html_body, '^<p>blablabla</p>$')
- self.assertRegexpMatches(mail.text_plain_body, '^blablabla$')
-
- def test_html_is_none_if_multiple_alternatives_have_no_html_part(self):
- parts = {
- 'attachments': [],
- 'alternatives': [
- {'content': u'content', 'headers': {u'Content-Type': u'text/plain; charset=us-ascii'}},
- {'content': u'', 'headers': {u'Some info': u'info'}}]}
-
- mail = PixelatedMail.from_soledad(None, None, None, parts=parts, soledad_querier=None)
- self.assertIsNone(mail.html_body)
-
- def test_percent_character_is_allowed_on_body(self):
- parts = {'alternatives': [], 'attachments': []}
- parts['alternatives'].append({'content': '100% happy with percentage symbol', 'headers': {'Content-Type': 'text/plain'}})
- parts['alternatives'].append({'content': '<p>100% happy with percentage symbol</p>', 'headers': {'Content-Type': 'text/html'}})
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw="100% happy with percentage symbol"), parts=parts, soledad_querier=None)
-
- self.assertRegexpMatches(mail.text_plain_body, '([\s\S]*100%)')
- self.assertRegexpMatches(mail.html_body, '([\s\S]*100%)')
-
- def test_content_type_header_of_mail_part_is_used(self):
- plain_headers = {'Content-Type': 'text/plain; charset=iso-8859-1', 'Content-Transfer-Encoding': 'quoted-printable'}
- html_headers = {'Content-Type': 'text/html; charset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'}
- parts = {'alternatives': [{'content': 'H=E4llo', 'headers': plain_headers}, {'content': '<p>H=C3=A4llo</p>', 'headers': html_headers}]}
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='some raw body'), parts=parts, soledad_querier=None)
-
- self.assertEqual(2, len(mail.alternatives))
- self.assertEquals(u'H\xe4llo', mail.text_plain_body)
- self.assertEquals(u'<p>H\xe4llo</p>', mail.html_body)
-
- def test_multi_line_content_type_header_is_supported(self):
- plain_headers = {'Content-Type': 'text/plain;\ncharset=iso-8859-1', 'Content-Transfer-Encoding': 'quoted-printable'}
- html_headers = {'Content-Type': 'text/html;\ncharset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'}
- parts = {'alternatives': [{'content': 'H=E4llo', 'headers': plain_headers}, {'content': '<p>H=C3=A4llo</p>', 'headers': html_headers}]}
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='some raw body'), parts=parts, soledad_querier=None)
-
- self.assertEqual(2, len(mail.alternatives))
- self.assertEquals(u'H\xe4llo', mail.text_plain_body)
- self.assertEquals(u'<p>H\xe4llo</p>', mail.html_body)
-
- def test_broken_content_type_defaults_to_usascii(self):
- plain_headers = {'Content-Type': 'I lie to you', 'Content-Transfer-Encoding': 'quoted-printable'}
- html_headers = {'Content-Type': 'text/html;\ncharset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'}
- parts = {'alternatives': [{'content': 'H=E4llo', 'headers': plain_headers}, {'content': '<p>H=C3=A4llo</p>', 'headers': html_headers}]}
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='some raw body'), parts=parts, soledad_querier=None)
-
- self.assertEquals(u'H=E4llo', mail.text_plain_body)
-
- def test_broken_encoding_defaults_to_8bit(self):
- plain_headers = {'Content-Type': 'text/plain;\ncharset=iso-8859-1', 'Content-Transfer-Encoding': 'I lie to you!'}
- html_headers = {'Content-Type': 'text/html;\ncharset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'}
- parts = {'alternatives': [{'content': 'H=E4llo', 'headers': plain_headers}, {'content': '<p>H=C3=A4llo</p>', 'headers': html_headers}]}
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='some raw body'), parts=parts, soledad_querier=None)
-
- self.assertEquals(u'H=E4llo', mail.text_plain_body)
- self.assertEquals(u'<p>H\xe4llo</p>', mail.html_body)
-
- def test_clean_line_breaks_on_address_headers(self):
- many_recipients = 'One <one@mail.com>,\nTwo <two@mail.com>, Normal <normal@mail.com>,\nalone@mail.com'
- headers = {'Cc': many_recipients,
- 'Bcc': many_recipients,
- 'To': many_recipients}
- fdoc, hdoc, bdoc = test_helper.leap_mail(flags=['\\Recent'],
- extra_headers=headers)
-
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier)
-
- for header_label in ['To', 'Cc', 'Bcc']:
- for address in mail.headers[header_label]:
- self.assertNotIn('\n', address)
- self.assertNotIn(',', address)
- self.assertEquals(4, len(mail.headers[header_label]))
-
- def test_that_body_understands_base64(self):
- body = u'bl\xe1'
- encoded_body = unicode(body.encode('utf-8').encode('base64'))
-
- fdoc, hdoc, bdoc = test_helper.leap_mail()
- parts = {'alternatives': []}
- parts['alternatives'].append({'content': encoded_body, 'headers': {'Content-Transfer-Encoding': 'base64'}})
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier, parts=parts)
-
- self.assertEquals(body, mail.text_plain_body)
-
- def test_that_body_understands_7bit(self):
- body = u'testtext'
- encoded_body = body
-
- fdoc, hdoc, bdoc = test_helper.leap_mail()
- parts = {'alternatives': []}
- parts['alternatives'].append({'content': encoded_body, 'headers': {'Content-Transfer-Encoding': '7bit'}})
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier, parts=parts)
-
- self.assertEquals(body, mail.text_plain_body)
-
- def test_that_body_understands_8bit(self):
- body = u'testtext'
- encoded_body = body
-
- fdoc, hdoc, bdoc = test_helper.leap_mail()
- parts = {'alternatives': []}
- parts['alternatives'].append({'content': encoded_body, 'headers': {'Content-Transfer-Encoding': '8bit'}})
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier, parts=parts)
-
- self.assertEquals(body, mail.text_plain_body)
-
- def test_bounced_mails_are_recognized(self):
- bounced_mail_hdoc = os.path.join(os.path.dirname(__file__), '..', 'fixtures', 'bounced_mail_hdoc.json')
- with open(bounced_mail_hdoc) as f:
- hdoc = json.loads(f.read())
-
- bounced_leap_mail = test_helper.leap_mail()
- bounced_leap_mail[1].content = hdoc
- bounced_mail = PixelatedMail.from_soledad(*bounced_leap_mail, soledad_querier=self.querier)
-
- not_bounced_leap_mail = test_helper.leap_mail()
- not_bounced_mail = PixelatedMail.from_soledad(*not_bounced_leap_mail, soledad_querier=self.querier)
-
- self.assertTrue(bounced_mail.bounced)
- self.assertIn('this_mail_was_bounced@domain.com', bounced_mail.bounced)
- self.assertIn("MAILER-DAEMON@domain.org (Mail Delivery System)", bounced_mail.bounced)
- self.assertFalse(not_bounced_mail.bounced)
-
- def test_ignore_transient_failures(self):
- """
- Persistent errors should start with 5.
- See: http://www.iana.org/assignments/smtp-enhanced-status-codes/smtp-enhanced-status-codes.xhtml
- """
- bounced_mail_hdoc = os.path.join(os.path.dirname(__file__), '..', 'fixtures', 'bounced_mail_hdoc.json')
- with open(bounced_mail_hdoc) as f:
- content = f.read()
- # Change status to 4.XXX.YYY (only the first number is relevant here)
- content = content.replace("5.1.1", "4.X.Y")
- hdoc = json.loads(content)
-
- temporary_bounced_leap_mail = test_helper.leap_mail()
- temporary_bounced_leap_mail[1].content = hdoc
- temporary_bounced_mail = PixelatedMail.from_soledad(*temporary_bounced_leap_mail, soledad_querier=self.querier)
-
- not_bounced_leap_mail = test_helper.leap_mail()
- not_bounced_mail = PixelatedMail.from_soledad(*not_bounced_leap_mail, soledad_querier=self.querier)
-
- self.assertFalse(temporary_bounced_mail.bounced)
- self.assertFalse(not_bounced_mail.bounced)
-
- def _create_bdoc(self, raw):
- class FakeBDoc:
- def __init__(self, raw):
- self.content = {'raw': raw}
- return FakeBDoc(raw)
-
- def test_encoding_special_character_on_header(self):
- subject = "=?UTF-8?Q?test_encoding_St=C3=A4ch?="
- email_from = "=?UTF-8?Q?St=C3=A4ch_<stach@pixelated-project.org>?="
- email_to = "=?utf-8?b?IsOEw7zDtiDDlsO8w6QiIDxmb2xrZXJAcGl4ZWxhdGVkLXByb2plY3Qub3Jn?=\n =?utf-8?b?PiwgRsO2bGtlciA8Zm9sa2VyQHBpeGVsYXRlZC1wcm9qZWN0Lm9yZz4=?="
-
- pixel_mail = PixelatedMail()
-
- self.assertEqual(pixel_mail._decode_header(subject), 'test encoding St\xc3\xa4ch')
- self.assertEqual(pixel_mail._decode_header(email_from), 'St\xc3\xa4ch <stach@pixelated-project.org>')
- self.assertEqual(pixel_mail._decode_header(email_to), '"\xc3\x84\xc3\xbc\xc3\xb6 \xc3\x96\xc3\xbc\xc3\xa4" <folker@pixelated-project.org>, F\xc3\xb6lker <folker@pixelated-project.org>')
- self.assertEqual(pixel_mail._decode_header(None), None)
-
- def test_headers_are_encoded_right(self):
- subject = "=?UTF-8?Q?test_encoding_St=C3=A4ch?="
- email_from = "=?UTF-8?Q?St=C3=A4ch_<stach@pixelated-project.org>?="
- email_to = "=?utf-8?b?IsOEw7zDtiDDlsO8w6QiIDxmb2xrZXJAcGl4ZWxhdGVkLXByb2plY3Qub3Jn?=\n =?utf-8?b?PiwgRsO2bGtlciA8Zm9sa2VyQHBpeGVsYXRlZC1wcm9qZWN0Lm9yZz4=?="
- email_cc = "=?UTF-8?Q?St=C3=A4ch_<stach@pixelated-project.org>?="
- email_bcc = "=?UTF-8?Q?St=C3=A4ch_<stach@pixelated-project.org>?="
-
- leap_mail = test_helper.leap_mail(extra_headers={'Subject': subject, 'From': email_from, 'To': email_to, 'Cc': email_cc, 'Bcc': email_bcc})
-
- mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier)
-
- self.assertEqual(str(mail.headers['Subject']), 'test encoding St\xc3\xa4ch')
- self.assertEqual(str(mail.headers['From']), 'St\xc3\xa4ch <stach@pixelated-project.org>')
- self.assertEqual(mail.headers['To'], ['"\xc3\x84\xc3\xbc\xc3\xb6 \xc3\x96\xc3\xbc\xc3\xa4" <folker@pixelated-project.org>', 'F\xc3\xb6lker <folker@pixelated-project.org>'])
- self.assertEqual(mail.headers['Cc'], ['St\xc3\xa4ch <stach@pixelated-project.org>'])
- self.assertEqual(mail.headers['Bcc'], ['St\xc3\xa4ch <stach@pixelated-project.org>'])
-
- mail.as_dict()
-
- def test_parse_UTF8_headers_with_CharsetAscii(self):
- leap_mail_from = u'"söme ümläuds" <lisa5@dev.pixelated-project.org>'
- leap_mail_to = u'"söme ümläuds" <lisa5@dev.pixelated-project.org>,\n"söme ümläuds" <lisa5@dev.pixelated-project.org>'
-
- leap_mail = test_helper.leap_mail(extra_headers={'From': leap_mail_from, 'Subject': "some subject", 'To': leap_mail_to})
-
- mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier)
-
- mail.headers['From'].encode('ascii')
- self.assertEqual(mail.headers['To'], ['"sme mluds" <lisa5@dev.pixelated-project.org>', '"sme mluds" <lisa5@dev.pixelated-project.org>'])
+import pkg_resources
+from twisted.internet import defer
def simple_mail_dict():
@@ -420,7 +62,7 @@ def multipart_mail_dict():
class InputMailTest(unittest.TestCase):
def test_to_mime_multipart_should_add_blank_fields(self):
- pixelated.support.date.iso_now = lambda: 'date now'
+ pixelated.support.date.mail_date_now = lambda: 'date now'
mail_dict = simple_mail_dict()
mail_dict['header']['to'] = ''
@@ -435,8 +77,23 @@ class InputMailTest(unittest.TestCase):
self.assertNotRegexpMatches(mime_multipart.as_string(), "\nCc: \n")
self.assertNotRegexpMatches(mime_multipart.as_string(), "\nSubject: \n")
+ def test_single_recipient(self):
+ mail_single_recipient = {
+ 'body': '',
+ 'header': {
+ 'to': ['to@pixelated.org'],
+ 'cc': [''],
+ 'bcc': [''],
+ 'subject': 'Oi'
+ }
+ }
+
+ result = InputMail.from_dict(mail_single_recipient).raw
+
+ self.assertRegexpMatches(result, 'To: to@pixelated.org')
+
def test_to_mime_multipart(self):
- pixelated.support.date.iso_now = lambda: 'date now'
+ pixelated.support.date.mail_date_now = lambda: 'date now'
mime_multipart = InputMail.from_dict(simple_mail_dict()).to_mime_multipart()
@@ -447,6 +104,16 @@ class InputMailTest(unittest.TestCase):
self.assertRegexpMatches(mime_multipart.as_string(), "\nSubject: Oi\n")
self.assertRegexpMatches(mime_multipart.as_string(), base64.b64encode(simple_mail_dict()['body']))
+ def test_to_mime_multipart_with_special_chars(self):
+ mail_dict = simple_mail_dict()
+ mail_dict['header']['to'] = u'"Älbert Übrö \xF0\x9F\x92\xA9" <äüö@example.mail>'
+ pixelated.support.date.mail_date_now = lambda: 'date now'
+
+ mime_multipart = InputMail.from_dict(mail_dict).to_mime_multipart()
+
+ expected_part_of_encoded_to = 'Iiwgw4QsIGwsIGIsIGUsIHIsIHQsICAsIMOcLCBiLCByLCDDtiwgICwgw7As'
+ self.assertRegexpMatches(mime_multipart.as_string(), expected_part_of_encoded_to)
+
def test_smtp_format(self):
InputMail.FROM_EMAIL_ADDRESS = 'pixelated@org'
diff --git a/service/test/unit/adapter/test_mail_service.py b/service/test/unit/adapter/test_mail_service.py
index f5e29b0c..6faf5140 100644
--- a/service/test/unit/adapter/test_mail_service.py
+++ b/service/test/unit/adapter/test_mail_service.py
@@ -14,33 +14,37 @@
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
from twisted.trial import unittest
-from pixelated.adapter.model.mail import InputMail, PixelatedMail
+from pixelated.adapter.mailstore.leap_mailstore import LeapMail
+from pixelated.adapter.model.mail import InputMail
+from pixelated.adapter.model.status import Status
from pixelated.adapter.services.mail_service import MailService
from test.support.test_helper import mail_dict, leap_mail
-from mockito import mock, unstub, when, verify, verifyNoMoreInteractions, any
-from twisted.internet.defer import Deferred
+from mockito import mock, unstub, when, verify, verifyNoMoreInteractions, any as ANY
+from twisted.internet import defer
class TestMailService(unittest.TestCase):
def setUp(self):
self.drafts = mock()
- self.querier = mock()
+ self.mail_store = mock()
self.mailboxes = mock()
- self.mailboxes.drafts = self.drafts
+
+ self.mailboxes.drafts = defer.succeed(self.drafts)
+
self.mailboxes.trash = mock()
self.mailboxes.sent = mock()
self.mail_sender = mock()
self.search_engine = mock()
- self.mail_service = MailService(self.mailboxes, self.mail_sender, self.querier, self.search_engine)
+ self.mail_service = MailService(self.mail_sender, self.mail_store, self.search_engine)
def tearDown(self):
unstub()
def test_send_mail(self):
- when(InputMail).from_dict(any()).thenReturn('inputmail')
- when(self.mail_sender).sendmail(any()).thenReturn(Deferred())
+ when(InputMail).from_dict(ANY()).thenReturn('inputmail')
+ when(self.mail_sender).sendmail(ANY()).thenReturn(defer.Deferred())
sent_deferred = self.mail_service.send_mail(mail_dict())
@@ -50,64 +54,110 @@ class TestMailService(unittest.TestCase):
return sent_deferred
+ @defer.inlineCallbacks
def test_send_mail_removes_draft(self):
- mail_ident = 'Some ident'
- mail = mail_dict()
- mail['ident'] = mail_ident
- when(InputMail).from_dict(any()).thenReturn('inputmail')
- deferred = Deferred()
- when(self.mail_sender).sendmail(any()).thenReturn(deferred)
+ mail = LeapMail('id', 'INBOX')
+ when(mail).raw = 'raw mail'
+ when(InputMail).from_dict(ANY()).thenReturn(mail)
+ when(self.mail_store).delete_mail('12').thenReturn(defer.succeed(None))
+ when(self.mail_store).add_mail('SENT', ANY()).thenReturn(mail)
- sent_deferred = self.mail_service.send_mail(mail)
+ deferred_success = defer.succeed(None)
+ when(self.mail_sender).sendmail(ANY()).thenReturn(deferred_success)
- verify(self.mail_sender).sendmail("inputmail")
+ yield self.mail_service.send_mail({'ident': '12'})
- def assert_removed_from_drafts(_):
- verify(self.drafts).remove(any())
+ verify(self.mail_sender).sendmail(mail)
+ verify(self.mail_store).add_mail('SENT', mail.raw)
+ verify(self.mail_store).delete_mail('12')
- sent_deferred.addCallback(assert_removed_from_drafts)
- sent_deferred.callback('Assume sending mail succeeded')
+ @defer.inlineCallbacks
+ def test_send_mail_marks_as_read(self):
+ mail = LeapMail('id', 'INBOX')
+ when(mail).raw = 'raw mail'
+ when(InputMail).from_dict(ANY()).thenReturn(mail)
+ when(self.mail_store).delete_mail('12').thenReturn(defer.succeed(None))
+ when(self.mail_sender).sendmail(ANY()).thenReturn(defer.succeed(None))
- return sent_deferred
+ sent_mail = LeapMail('id', 'INBOX')
+ add_mail_deferral = defer.succeed(sent_mail)
+ when(self.mail_store).add_mail('SENT', ANY()).thenReturn(add_mail_deferral)
- def test_send_mail_does_not_delete_draft_on_error(self):
- when(InputMail).from_dict(any()).thenReturn('inputmail')
- when(self.mail_sender).sendmail(any()).thenReturn(Deferred())
+ yield self.mail_service.send_mail({'ident': '12'})
- send_deferred = self.mail_service.send_mail(mail_dict())
+ self.assertIn(Status.SEEN, sent_mail.flags)
+ verify(self.mail_store).update_mail(sent_mail)
- verify(self.mail_sender).sendmail("inputmail")
+ @defer.inlineCallbacks
+ def test_send_mail_does_not_delete_draft_on_error(self):
+ when(InputMail).from_dict(ANY()).thenReturn('inputmail')
+
+ deferred_failure = defer.fail(Exception("Assume sending mail failed"))
+ when(self.mail_sender).sendmail(ANY()).thenReturn(deferred_failure)
- def assert_not_removed_from_drafts(_):
+ try:
+ yield self.mail_service.send_mail({'ident': '12'})
+ self.fail("send_mail is expected to raise if underlying call fails")
+ except:
+ verify(self.mail_sender).sendmail("inputmail")
verifyNoMoreInteractions(self.drafts)
- send_deferred.addErrback(assert_not_removed_from_drafts)
+ @defer.inlineCallbacks
+ def test_mark_as_read(self):
+ mail = LeapMail(1, 'INBOX')
+ when(self.mail_store).get_mail(1, include_body=True).thenReturn(mail)
+ yield self.mail_service.mark_as_read(1)
- send_deferred.errback(Exception('Assume sending mail failed'))
+ self.assertIn(Status.SEEN, mail.flags)
+ verify(self.mail_store).update_mail(mail)
- return send_deferred
+ @defer.inlineCallbacks
+ def test_mark_as_unread(self):
+ mail = LeapMail(1, 'INBOX')
+ mail.flags.add(Status.SEEN)
- def test_mark_as_read(self):
- mail = mock()
- when(self.mail_service).mail(any()).thenReturn(mail)
- self.mail_service.mark_as_read(1)
+ when(self.mail_store).get_mail(1, include_body=True).thenReturn(mail)
+ yield self.mail_service.mark_as_unread(1)
+
+ verify(self.mail_store).update_mail(mail)
- verify(mail).mark_as_read()
+ self.assertNotEqual(mail.status, Status.SEEN)
+ @defer.inlineCallbacks
def test_delete_mail(self):
- mail_to_delete = PixelatedMail.from_soledad(*leap_mail(), soledad_querier=None)
- when(self.mail_service).mail(1).thenReturn(mail_to_delete)
+ mail_to_delete = LeapMail(1, 'INBOX')
+ when(self.mail_store).get_mail(1, include_body=True).thenReturn(defer.succeed(mail_to_delete))
- self.mail_service.delete_mail(1)
+ yield self.mail_service.delete_mail(1)
- verify(self.mailboxes).move_to_trash(1)
+ verify(self.mail_store).move_mail_to_mailbox(1, 'TRASH')
+ @defer.inlineCallbacks
def test_recover_mail(self):
- mail_to_recover = PixelatedMail.from_soledad(*leap_mail(), soledad_querier=None)
+ mail_to_recover = LeapMail(1, 'TRASH')
when(self.mail_service).mail(1).thenReturn(mail_to_recover)
- when(self.mailboxes).move_to_inbox(1).thenReturn(mail_to_recover)
+ when(self.mail_store).move_mail_to_mailbox(1, 'INBOX').thenReturn(mail_to_recover)
+
+ yield self.mail_service.recover_mail(1)
+
+ verify(self.mail_store).move_mail_to_mailbox(1, 'INBOX')
+
+ @defer.inlineCallbacks
+ def test_get_attachment(self):
+ attachment_dict = {'content': bytearray('data'), 'content-type': 'text/plain'}
+ when(self.mail_store).get_mail_attachment('some attachment id').thenReturn(defer.succeed(attachment_dict))
+
+ attachment = yield self.mail_service.attachment('some attachment id')
+
+ self.assertEqual(attachment_dict, attachment)
+
+ @defer.inlineCallbacks
+ def test_update_tags_return_a_set_with_the_current_tags(self):
+ mail = LeapMail(1, 'INBOX', tags={'custom_1', 'custom_2'})
+ when(self.mail_store).get_mail(1, include_body=True).thenReturn(mail)
+ when(self.search_engine).tags(query='', skip_default_tags=True).thenReturn([])
- self.mail_service.recover_mail(1)
+ updated_mail = yield self.mail_service.update_tags(1, {'custom_1', 'custom_3'})
- verify(self.mailboxes).move_to_inbox(1)
- verify(self.search_engine).index_mail(mail_to_recover)
+ verify(self.mail_store).update_mail(mail)
+ self.assertEqual({'custom_1', 'custom_3'}, updated_mail.tags)
diff --git a/service/test/unit/adapter/test_mailbox.py b/service/test/unit/adapter/test_mailbox.py
deleted file mode 100644
index ed634648..00000000
--- a/service/test/unit/adapter/test_mailbox.py
+++ /dev/null
@@ -1,42 +0,0 @@
-#
-# Copyright (c) 2014 ThoughtWorks, Inc.
-#
-# Pixelated is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Pixelated is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-import unittest
-
-from pixelated.adapter.model.mail import PixelatedMail
-from pixelated.adapter.services.mailbox import Mailbox
-from mockito import mock, when, verify
-from test.support import test_helper
-
-
-class PixelatedMailboxTest(unittest.TestCase):
- def setUp(self):
- self.querier = mock()
- self.search_engine = mock()
- self.mailbox = Mailbox('INBOX', self.querier, self.search_engine)
-
- def test_remove_message_from_mailbox(self):
- mail = PixelatedMail.from_soledad(*test_helper.leap_mail(), soledad_querier=self.querier)
- when(self.querier).mail(1).thenReturn(mail)
-
- self.mailbox.remove(1)
-
- verify(self.querier).remove_mail(mail)
-
- def test_fresh_mailbox_checking_lastuid(self):
- when(self.querier).get_lastuid('INBOX').thenReturn(0)
- self.assertTrue(self.mailbox.fresh)
- when(self.querier).get_lastuid('INBOX').thenReturn(1)
- self.assertFalse(self.mailbox.fresh)
diff --git a/service/test/unit/adapter/test_mailbox_indexer_listener.py b/service/test/unit/adapter/test_mailbox_indexer_listener.py
index 71c9cd15..9ad3c94d 100644
--- a/service/test/unit/adapter/test_mailbox_indexer_listener.py
+++ b/service/test/unit/adapter/test_mailbox_indexer_listener.py
@@ -13,15 +13,18 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-import unittest
+from twisted.trial import unittest
-from mockito import mock, when, verify
+from mockito import mock, when, verify, any as ANY
from pixelated.adapter.listeners.mailbox_indexer_listener import MailboxIndexerListener
+from twisted.internet import defer
+
+from pixelated.adapter.listeners.mailbox_indexer_listener import logger
class MailboxListenerTest(unittest.TestCase):
def setUp(self):
- self.querier = mock()
+ self.mail_store = mock()
self.account = mock()
self.account.mailboxes = []
@@ -32,11 +35,11 @@ class MailboxListenerTest(unittest.TestCase):
mailbox.listeners = set()
when(mailbox).addListener = lambda x: mailbox.listeners.add(x)
- self.assertNotIn(MailboxIndexerListener('INBOX', self.querier), mailbox.listeners)
+ self.assertNotIn(MailboxIndexerListener('INBOX', self.mail_store), mailbox.listeners)
- MailboxIndexerListener.listen(self.account, 'INBOX', self.querier)
+ MailboxIndexerListener.listen(self.account, 'INBOX', self.mail_store)
- self.assertIn(MailboxIndexerListener('INBOX', self.querier), mailbox.listeners)
+ self.assertIn(MailboxIndexerListener('INBOX', self.mail_store), mailbox.listeners)
def test_reindex_missing_idents(self):
search_engine = mock()
@@ -44,11 +47,20 @@ class MailboxListenerTest(unittest.TestCase):
MailboxIndexerListener.SEARCH_ENGINE = search_engine
- listener = MailboxIndexerListener('INBOX', self.querier)
- when(self.querier).idents_by_mailbox('INBOX').thenReturn({'ident1', 'ident2', 'missing_ident'})
- self.querier.used_arguments = []
- self.querier.mails = lambda x: self.querier.used_arguments.append(x)
+ listener = MailboxIndexerListener('INBOX', self.mail_store)
+ when(self.mail_store).get_mailbox_mail_ids('INBOX').thenReturn({'ident1', 'ident2', 'missing_ident'})
+ self.mail_store.used_arguments = []
+ self.mail_store.get_mails = lambda x: self.mail_store.used_arguments.append(x)
listener.newMessages(10, 5)
- verify(self.querier, times=1).idents_by_mailbox('INBOX')
- self.assertIn({'missing_ident'}, self.querier.used_arguments)
+ verify(self.mail_store, times=1).get_mails('INBOX')
+ self.assertIn({'missing_ident'}, self.mail_store.used_arguments)
+
+ @defer.inlineCallbacks
+ def test_catches_exceptions_to_not_break_other_listeners(self):
+ when(logger).error(ANY()).thenReturn(None)
+ listener = MailboxIndexerListener('INBOX', self.mail_store)
+
+ yield listener.newMessages(1, 1)
+
+ verify(logger).error(ANY())
diff --git a/service/test/unit/adapter/test_mailboxes.py b/service/test/unit/adapter/test_mailboxes.py
deleted file mode 100644
index 6ff3849b..00000000
--- a/service/test/unit/adapter/test_mailboxes.py
+++ /dev/null
@@ -1,41 +0,0 @@
-#
-# Copyright (c) 2014 ThoughtWorks, Inc.
-#
-# Pixelated is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Pixelated is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-import unittest
-
-from pixelated.adapter.model.mail import PixelatedMail
-from pixelated.adapter.services.mailboxes import Mailboxes
-from mockito import mock, when, verify
-from test.support import test_helper
-from mock import MagicMock
-
-
-class PixelatedMailboxesTest(unittest.TestCase):
-
- def setUp(self):
- self.querier = mock()
- self.search_engine = mock()
- self.account = MagicMock()
- self.mailboxes = Mailboxes(self.account, self.querier, self.search_engine)
-
- def test_move_to_inbox(self):
- mail = PixelatedMail.from_soledad(*test_helper.leap_mail(), soledad_querier=self.querier)
- when(self.querier).mail(1).thenReturn(mail)
- when(mail).save().thenReturn(None)
-
- mail.set_mailbox('TRASH')
- recovered_mail = self.mailboxes.move_to_inbox(1)
- self.assertEquals('INBOX', recovered_mail.mailbox_name)
- verify(mail).save()
diff --git a/service/test/unit/adapter/test_soledad_querier.py b/service/test/unit/adapter/test_soledad_querier.py
deleted file mode 100644
index e5ea457d..00000000
--- a/service/test/unit/adapter/test_soledad_querier.py
+++ /dev/null
@@ -1,150 +0,0 @@
-#
-# Copyright (c) 2014 ThoughtWorks, Inc.
-#
-# Pixelated is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Pixelated is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-import unittest
-import json
-import base64
-import quopri
-
-from pixelated.adapter.soledad.soledad_querier import SoledadQuerier
-from mockito import mock, when, any
-import os
-
-
-class SoledadQuerierTest(unittest.TestCase):
-
- def test_extract_parts(self):
- soledad = mock()
- bdoc = mock()
- bdoc.content = {'raw': 'esse papo seu ta qualquer coisa'}
- when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc])
- multipart_attachment_file = os.path.join(os.path.dirname(__file__), '..', 'fixtures', 'multipart_attachment.json')
- with open(multipart_attachment_file) as f:
- hdoc = json.loads(f.read())
- querier = SoledadQuerier(soledad)
-
- parts = querier._extract_parts(hdoc)
-
- self.assertIn('alternatives', parts.keys())
- self.assertIn('attachments', parts.keys())
- self.assertEquals(2, len(parts['alternatives']))
- self.assertEquals(1, len(parts['attachments']))
-
- self.check_alternatives(parts)
- self.check_attachments(parts)
-
- def check_alternatives(self, parts):
- for alternative in parts['alternatives']:
- self.assertIn('headers', alternative)
- self.assertIn('content', alternative)
-
- def check_attachments(self, parts):
- for attachment in parts['attachments']:
- self.assertIn('headers', attachment)
- self.assertIn('ident', attachment)
- self.assertIn('name', attachment)
-
- def test_extract_part_without_headers(self):
- soledad = mock()
- bdoc = mock()
- bdoc.content = {'raw': 'esse papo seu ta qualquer coisa'}
- when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc])
- hdoc = {'multi': True, 'part_map': {'1': {'multi': False, 'phash': u'0400BEBACAFE'}}}
- querier = SoledadQuerier(soledad)
-
- parts = querier._extract_parts(hdoc)
-
- self.assertEquals(bdoc.content['raw'], parts['alternatives'][0]['content'])
-
- def test_extract_handles_missing_part_map(self):
- soledad = mock()
- hdoc = {u'multi': True,
- u'ctype': u'message/delivery-status',
- u'headers': [[u'Content-Description', u'Delivery report'], [u'Content-Type', u'message/delivery-status']],
- u'parts': 2,
- u'phash': None,
- u'size': 554}
- querier = SoledadQuerier(soledad)
-
- parts = querier._extract_parts(hdoc)
-
- self.assertEquals(0, len(parts['alternatives']))
- self.assertEquals(0, len(parts['attachments']))
-
- def test_attachment_base64(self):
- soledad = mock()
- bdoc = mock()
- bdoc.content = {'raw': base64.encodestring('esse papo seu ta qualquer coisa'), 'content-type': 'text/plain'}
- when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc])
- querier = SoledadQuerier(soledad)
-
- attachment = querier.attachment(u'0400BEBACAFE', 'base64')
-
- self.assertEquals('esse papo seu ta qualquer coisa', attachment['content'])
-
- def test_attachment_quoted_printable(self):
- soledad = mock()
- bdoc = mock()
- bdoc.content = {'raw': quopri.encodestring('esse papo seu ta qualquer coisa'), 'content-type': 'text/plain'}
- when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc])
- querier = SoledadQuerier(soledad)
-
- attachment = querier.attachment(u'0400BEBACAFE', 'quoted-printable')
-
- self.assertEquals('esse papo seu ta qualquer coisa', attachment['content'])
-
- def test_empty_or_null_queries_are_ignored(self):
- soledad = mock()
- when(soledad).get_from_index(any(), any(), any()).thenReturn(['nonempty', 'list'])
- querier = SoledadQuerier(soledad)
-
- test_parameters = ['', None]
-
- def call_with_bad_parameters(funct):
- for param in test_parameters:
- self.assertFalse(funct(param))
-
- call_with_bad_parameters(querier.get_all_flags_by_mbox)
- call_with_bad_parameters(querier.get_content_by_phash)
- call_with_bad_parameters(querier.get_flags_by_chash)
- call_with_bad_parameters(querier.get_header_by_chash)
- call_with_bad_parameters(querier.get_recent_by_mbox)
- call_with_bad_parameters(querier.idents_by_mailbox)
- call_with_bad_parameters(querier.get_mbox)
-
- def test_get_lastuid(self):
- soledad = mock()
- mbox = mock()
- mbox.content = {'lastuid': 0}
- when(soledad).get_from_index('by-type-and-mbox', 'mbox', 'INBOX').thenReturn([mbox])
- querier = SoledadQuerier(soledad)
-
- self.assertEquals(querier.get_lastuid(querier.get_mbox('INBOX')[0]), 0)
- mbox.content = {'lastuid': 1}
- self.assertEquals(querier.get_lastuid(querier.get_mbox('INBOX')[0]), 1)
-
- def test_create_mail_increments_uid(self):
- soledad = mock()
- mbox = mock()
- mail = mock()
- when(mail).get_for_save(next_uid=any(), mailbox='INBOX').thenReturn([])
- mbox.content = {'lastuid': 0}
- when(soledad).get_from_index('by-type-and-mbox', 'mbox', 'INBOX').thenReturn([mbox])
- querier = SoledadQuerier(soledad)
- when(querier).mail(any()).thenReturn([])
-
- self.assertEquals(querier.get_lastuid(querier.get_mbox('INBOX')[0]), 0)
- querier.create_mail(mail, 'INBOX')
- self.assertEquals(querier.get_lastuid(querier.get_mbox('INBOX')[0]), 1)
diff --git a/service/test/unit/bitmask_libraries/test_abstract_leap.py b/service/test/unit/bitmask_libraries/test_abstract_leap.py
index 64de09bc..521d9cd4 100644
--- a/service/test/unit/bitmask_libraries/test_abstract_leap.py
+++ b/service/test/unit/bitmask_libraries/test_abstract_leap.py
@@ -19,28 +19,33 @@ from uuid import uuid4
import os
from mock import Mock, MagicMock
+from pixelated.adapter.mailstore import MailStore
class AbstractLeapTest(unittest.TestCase):
- _uuid = str(uuid4())
- _session_id = str(uuid4())
- _token = str(uuid4())
- leap_home = os.path.join(tempfile.mkdtemp(), 'leap')
+ def setUp(self):
+ self._uuid = str(uuid4())
+ self._session_id = str(uuid4())
+ self._token = str(uuid4())
- config = Mock(leap_home=leap_home, bootstrap_ca_cert_bundle='/some/path/to/ca_cert', ca_cert_bundle='/some/path/to/provider_ca_cert', gpg_binary='/path/to/gpg')
- provider = Mock(config=config, server_name='some-server.test', domain='some-server.test',
- api_uri='https://api.some-server.test:4430', api_version='1')
- soledad = Mock()
- soledad_session = Mock(soledad=soledad)
- auth = Mock(username='test_user',
- api_server_name='some-server.test',
- uuid=_uuid,
- session_id=_session_id,
- token=_token)
+ self.leap_home = os.path.join(tempfile.mkdtemp(), 'leap')
- nicknym = MagicMock()
+ self.config = Mock(leap_home=self.leap_home, bootstrap_ca_cert_bundle='/some/path/to/ca_cert', ca_cert_bundle='/some/path/to/provider_ca_cert', gpg_binary='/path/to/gpg')
+ self.provider = Mock(config=self.config, server_name='some-server.test', domain='some-server.test',
+ api_uri='https://api.some-server.test:4430', api_version='1')
+ self.soledad = Mock()
+ self.soledad_session = Mock(soledad=self.soledad)
+ self.auth = Mock(username='test_user',
+ api_server_name='some-server.test',
+ uuid=self._uuid,
+ session_id=self._session_id,
+ token=self._token)
- soledad_account = MagicMock()
+ self.nicknym = MagicMock()
- mail_fetcher_mock = MagicMock()
+ self.soledad_account = MagicMock()
+
+ self.mail_fetcher_mock = MagicMock()
+
+ self.mail_store = MagicMock(spec=MailStore)
diff --git a/service/test/unit/bitmask_libraries/test_nicknym.py b/service/test/unit/bitmask_libraries/test_nicknym.py
index ca3b348d..dc4845d1 100644
--- a/service/test/unit/bitmask_libraries/test_nicknym.py
+++ b/service/test/unit/bitmask_libraries/test_nicknym.py
@@ -39,12 +39,12 @@ class NickNymTest(AbstractLeapTest):
'test_user@some-server.test',
'https://nicknym.some-server.test:6425/',
self.soledad,
- self.auth.token,
- '/some/path/to/provider_ca_cert',
- 'https://api.some-server.test:4430',
- '1',
- self.auth.uuid,
- '/path/to/gpg')
+ token=self.auth.token,
+ ca_cert_path='/some/path/to/provider_ca_cert',
+ api_uri='https://api.some-server.test:4430',
+ api_version='1',
+ uid=self.auth.uuid,
+ gpgbinary='/path/to/gpg')
@patch('pixelated.bitmask_libraries.nicknym.KeyManager')
def test_gen_key(self, keymanager_mock):
@@ -61,5 +61,5 @@ class NickNymTest(AbstractLeapTest):
# when/then
nicknym.generate_openpgp_key()
- keyman.get_key.assert_called_with('test_user@some-server.test', openpgp.OpenPGPKey, fetch_remote=False, private=True)
+ keyman.get_key.assert_called_with('test_user@some-server.test', openpgp.OpenPGPKey, private=True, fetch_remote=False)
keyman.gen_key.assert_called_with(openpgp.OpenPGPKey)
diff --git a/service/test/unit/bitmask_libraries/test_provider.py b/service/test/unit/bitmask_libraries/test_provider.py
index 1fe5a66d..df851203 100644
--- a/service/test/unit/bitmask_libraries/test_provider.py
+++ b/service/test/unit/bitmask_libraries/test_provider.py
@@ -188,9 +188,13 @@ class LeapProviderTest(AbstractLeapTest):
provider.fetch_valid_certificate()
def test_throw_exception_for_invalid_certificate(self):
+ expected_exception_message = 'Certificate fingerprints don\'t match! Expected [0123456789012345678901234567890123456789012345678901234567890123] but got [06e2300bdbc118c290eda0dc977c24080718f4eeca68c8b0ad431872a2baa22d]'
+
with HTTMock(provider_json_invalid_fingerprint_mock, ca_cert_mock, not_found_mock):
provider = LeapProvider('some-provider.test', self.config)
- self.assertRaises(Exception, provider.fetch_valid_certificate)
+ with self.assertRaises(Exception) as cm:
+ provider.fetch_valid_certificate()
+ self.assertEqual(expected_exception_message, cm.exception.message)
def test_that_bootstrap_cert_is_used_to_fetch_certificate(self):
session = MagicMock(wraps=requests.session())
diff --git a/service/test/unit/bitmask_libraries/test_session.py b/service/test/unit/bitmask_libraries/test_session.py
index 0c662ecb..e20f96f9 100644
--- a/service/test/unit/bitmask_libraries/test_session.py
+++ b/service/test/unit/bitmask_libraries/test_session.py
@@ -18,40 +18,29 @@ from mock import MagicMock
from pixelated.bitmask_libraries.session import LeapSession
from test_abstract_leap import AbstractLeapTest
+from twisted.internet import defer
class SessionTest(AbstractLeapTest):
def setUp(self):
- self.mail_fetcher_mock = MagicMock()
+ super(SessionTest, self).setUp()
self.smtp_mock = MagicMock()
- def tearDown(self):
- self.mail_fetcher_mock = MagicMock()
-
- def test_background_jobs_are_started(self):
- self.config.start_background_jobs = True
-
+ def test_background_jobs_are_started_during_initial_sync(self):
with patch('pixelated.bitmask_libraries.session.reactor.callFromThread', new=_execute_func) as _:
- self._create_session()
-
- self.mail_fetcher_mock.start_loop.assert_called_once_with()
-
- def test_background_jobs_are_not_started(self):
- self.config.start_background_jobs = False
-
- with patch('pixelated.bitmask_libraries.session.reactor.callFromThread', new=_execute_func) as _:
- self._create_session()
-
- self.assertFalse(self.mail_fetcher_mock.start_loop.called)
+ with patch('pixelated.bitmask_libraries.session.LeapSession._create_incoming_mail_fetcher') as mail_fetcher_mock:
+ session = self._create_session()
+ yield session.initial_sync()
+ mail_fetcher_mock.startService.assert_called_once_with()
def test_that_close_stops_background_jobs(self):
with patch('pixelated.bitmask_libraries.session.reactor.callFromThread', new=_execute_func) as _:
- session = self._create_session()
-
- session.close()
-
- self.mail_fetcher_mock.stop.assert_called_once_with()
+ with patch('pixelated.bitmask_libraries.session.LeapSession._create_incoming_mail_fetcher') as mail_fetcher_mock:
+ session = self._create_session()
+ yield session.initial_sync()
+ session.close()
+ mail_fetcher_mock.stopService.assert_called_once_with()
def test_that_sync_deferes_to_soledad(self):
session = self._create_session()
@@ -61,8 +50,7 @@ class SessionTest(AbstractLeapTest):
self.soledad_session.sync.assert_called_once_with()
def _create_session(self):
- return LeapSession(self.provider, self.auth, self.soledad_session, self.nicknym, self.soledad_account,
- self.mail_fetcher_mock, self.smtp_mock)
+ return LeapSession(self.provider, self.auth, self.mail_store, self.soledad_session, self.nicknym, self.smtp_mock)
def _execute_func(func):
diff --git a/service/test/unit/bitmask_libraries/test_smtp.py b/service/test/unit/bitmask_libraries/test_smtp.py
index ec51c56b..9481c488 100644
--- a/service/test/unit/bitmask_libraries/test_smtp.py
+++ b/service/test/unit/bitmask_libraries/test_smtp.py
@@ -42,6 +42,7 @@ class LeapSmtpTest(AbstractLeapTest):
keymanager = MagicMock()
def setUp(self):
+ super(LeapSmtpTest, self).setUp()
self.provider.fetch_smtp_json.return_value = {
'hosts': {
'leap-mx': {
diff --git a/service/test/unit/bitmask_libraries/test_soledad.py b/service/test/unit/bitmask_libraries/test_soledad.py
index a3a1094a..af2cfd0a 100644
--- a/service/test/unit/bitmask_libraries/test_soledad.py
+++ b/service/test/unit/bitmask_libraries/test_soledad.py
@@ -15,13 +15,15 @@
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
from mock import patch
from pixelated.bitmask_libraries.soledad import SoledadSession
+from pixelated.bitmask_libraries.certs import LeapCertificate
from test_abstract_leap import AbstractLeapTest
-@patch('pixelated.bitmask_libraries.soledad.Soledad')
class SoledadSessionTest(AbstractLeapTest):
def setUp(self):
+ super(SoledadSessionTest, self).setUp()
+
# given
self.provider.fetch_soledad_json.return_value = {'hosts': {
'couch1': {
@@ -31,39 +33,29 @@ class SoledadSessionTest(AbstractLeapTest):
}
}}
- @patch('pixelated.bitmask_libraries.soledad.Soledad.__init__')
- def test_that_soledad_is_created_with_required_params(self, soledad_mock, init_mock):
+ @patch('pixelated.bitmask_libraries.soledad.Soledad')
+ def test_that_soledad_is_created_with_required_params(self, soledad_mock):
+ soledad_mock.return_value = None
# when
SoledadSession(self.provider, 'any-passphrase', self.auth.token, self.auth.uuid)
# then
- init_mock.assert_called_with(self.auth.uuid, 'any-passphrase', '%s/soledad/%s.secret' % (self.leap_home, self.auth.uuid),
- '%s/soledad/%s.db' % (self.leap_home, self.auth.uuid),
- 'https://couch1.some-server.test:1234/user-%s' % self.auth.uuid,
- '/some/path/to/ca_cert', self.token, defer_encryption=False)
-
+ soledad_mock.assert_called_with(self.auth.uuid, passphrase=u'any-passphrase',
+ secrets_path='%s/soledad/%s.secret' % (self.leap_home, self.auth.uuid),
+ local_db_path='%s/soledad/%s.db' % (self.leap_home, self.auth.uuid),
+ server_url='https://couch1.some-server.test:1234/user-%s' % self.auth.uuid,
+ cert_file=LeapCertificate(self.provider).provider_api_cert,
+ shared_db=None,
+ auth_token=self.auth.token, defer_encryption=False)
+
+ @patch('pixelated.bitmask_libraries.soledad.Soledad')
def test_that_sync_is_called(self, soledad_mock):
- instance = soledad_mock.return_value
- instance.server_url = '/foo/bar'
- instance.need_sync.return_value = True
- soledad_session = SoledadSession(self.provider, 'any-passphrase', self.auth.token, self.auth.uuid)
-
- # when
- soledad_session.sync()
-
- # then
- instance.need_sync.assert_called_with('/foo/bar')
- instance.sync.assert_called_with()
+ instance = soledad_mock.return_value
+ instance.server_url = '/foo/bar'
+ soledad_session = SoledadSession(self.provider, 'any-passphrase', self.auth.token, self.auth.uuid)
- def test_that_sync_not_called_if_not_needed(self, mock):
- instance = mock.return_value
- instance.server_url = '/foo/bar'
- instance.need_sync.return_value = False
- soledad_session = SoledadSession(self.provider, 'any-passphrase', self.auth.token, self.auth.uuid)
-
- # when
- soledad_session.sync()
+ # when
+ soledad_session.sync()
- # then
- instance.need_sync.assert_called_with('/foo/bar')
- self.assertFalse(instance.sync.called)
+ # then
+ instance.sync.assert_called_with()
diff --git a/service/test/unit/config/test_register.py b/service/test/unit/config/test_register.py
index 8e1a71a4..08cf56f0 100644
--- a/service/test/unit/config/test_register.py
+++ b/service/test/unit/config/test_register.py
@@ -1,6 +1,6 @@
import unittest
-from pixelated.register import validate_username
+from pixelated.register import validate_username, validate_password
class TestRegister(unittest.TestCase):
@@ -13,6 +13,10 @@ class TestRegister(unittest.TestCase):
with self.assertRaises(ValueError):
validate_username('invalid@username')
+ def test_password_raises_error_if_shorter_than_8_characters(self):
+ with self.assertRaises(ValueError):
+ validate_password('short')
+
def test_username_pass_when_valid(self):
try:
validate_username('a.valid_username-123')
diff --git a/service/test/unit/config/test_site.py b/service/test/unit/config/test_site.py
new file mode 100644
index 00000000..1858bfaf
--- /dev/null
+++ b/service/test/unit/config/test_site.py
@@ -0,0 +1,28 @@
+import unittest
+from mockito import mock
+from pixelated.config.site import PixelatedSite
+from twisted.protocols.basic import LineReceiver
+
+
+class TestPixelatedSite(unittest.TestCase):
+ def test_add_csp_header_request(self):
+ request = self.create_request()
+ request.process()
+ headers = request.headers
+
+ header_value = "default-src 'self'; style-src 'self' 'unsafe-inline'"
+ self.assertEqual(headers.get("Content-Security-Policy"), header_value)
+ self.assertEqual(headers.get("X-Content-Security-Policy"), header_value)
+ self.assertEqual(headers.get("X-Webkit-CSP"), header_value)
+
+ def create_request(self):
+ channel = LineReceiver()
+ channel.site = PixelatedSite(mock())
+ request = PixelatedSite.requestFactory(channel=channel, queued=True)
+ request.method = "GET"
+ request.uri = "localhost"
+ request.clientproto = 'HTTP/1.1'
+ request.prepath = []
+ request.postpath = request.uri.split('/')[1:]
+ request.path = "/"
+ return request
diff --git a/service/test/unit/fixtures/__init__.py b/service/test/unit/fixtures/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/service/test/unit/fixtures/__init__.py
diff --git a/service/test/unit/fixtures/bounced_mail_hdoc.json b/service/test/unit/fixtures/bounced_mail_hdoc.json
deleted file mode 100644
index 2cc5997c..00000000
--- a/service/test/unit/fixtures/bounced_mail_hdoc.json
+++ /dev/null
@@ -1,218 +0,0 @@
-{
- "body": "3583150D422268B15A27553279767CD26D7AC20937E8BDA8DC55BA53CFF0E9B2",
- "chash": "E374570706F4263C533103893344FD6CBE62B9807ECCC082C159FECF60277656",
- "date": "Wed, 11 Feb 2015 20:49:14 +0100 (CET)",
- "headers": {
- "Auto-Submitted": "auto-replied",
- "Content-Type": "multipart/report; report-type=delivery-status;\n boundary=\"499F57FF7C.1423684154/domain.org\"",
- "Date": "Wed, 11 Feb 2015 20:49:14 +0100 (CET)",
- "Delivered-To": "412cab846cd0d6327f4505e9c4112c64@domain.org",
- "From": "MAILER-DAEMON@domain.org (Mail Delivery System)",
- "MIME-Version": "1.0",
- "Message-Id": "<20150211194914.12CB3804C7@domain.org>",
- "Received": "by domain.org (Postfix)\n id 12CB3804C7; Wed, 11 Feb 2015 20:49:14 +0100 (CET)",
- "Return-Path": "<>",
- "Subject": "Undelivered Mail Returned to Sender",
- "To": "cuzcuz2@domain.org",
- "X-Leap-Provenance": "Wed, 11 Feb 2015 19:49:14 -0000; pubkey=\"E2F104EE8B01F675\"",
- "X-Leap-Signature": "could not verify",
- "X-Original-To": "cuzcuz2@domain.org"
- },
- "msgid": null,
- "multi": true,
- "part_map": {
- "1": {
- "ctype": "text/plain",
- "headers": [
- [
- "Content-Description",
- "Notification"
- ],
- [
- "Content-Type",
- "text/plain; charset=us-ascii"
- ]
- ],
- "multi": false,
- "parts": 1,
- "phash": "3583150D422268B15A27553279767CD26D7AC20937E8BDA8DC55BA53CFF0E9B2",
- "size": 896
- },
- "2": {
- "ctype": "message/delivery-status",
- "headers": [
- [
- "Content-Description",
- "Delivery report"
- ],
- [
- "Content-Type",
- "message/delivery-status"
- ]
- ],
- "multi": true,
- "parts": 2,
- "phash": null,
- "size": 772
- },
- "3": {
- "ctype": "text/plain",
- "headers": [
- [
- "Reporting-MTA",
- "dns; domain.org"
- ],
- [
- "X-Postfix-Queue-ID",
- "499F57FF7C"
- ],
- [
- "X-Postfix-Sender",
- "rfc822; cuzcuz2@domain.org"
- ],
- [
- "Arrival-Date",
- "Wed, 11 Feb 2015 20:49:10 +0100 (CET)"
- ]
- ],
- "multi": false,
- "part_map": {
- "0": {
- "ctype": "text/plain",
- "headers": [
- [
- "Final-Recipient",
- "rfc822; this_mail_was_bounced@domain.com"
- ],
- [
- "Original-Recipient",
- "rfc822;this_mail_was_bounced@domain.com"
- ],
- [
- "Action",
- "failed"
- ],
- [
- "Status",
- "5.1.1"
- ],
- [
- "Remote-MTA",
- "dns; ASPMX.L.DOMAIN.com"
- ],
- [
- "Diagnostic-Code",
- "smtp; 550-5.1.1 The email account that you tried to reach does\n not exist. Please try 550-5.1.1 double-checking the\n recipient's email\n address for typos or 550-5.1.1 unnecessary spaces."
- ]
- ],
- "multi": false,
- "parts": 1,
- "phash": "E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855",
- "size": 497
- },
- "1": {
- "ctype": "message/rfc822",
- "headers": [
- [
- "Content-Description",
- "Undelivered Message"
- ],
- [
- "Content-Type",
- "message/rfc822"
- ]
- ],
- "multi": true,
- "parts": 1,
- "phash": null,
- "size": 2397
- },
- "2": {
- "headers": {
- "Content-Type": "multipart/signed; protocol=\"application/pgp-signature\";\n micalg=\"pgp-sha512\"; boundary=\"===============5178971458699783746==\"",
- "Date": "2015-02-11T16:48:54.245828-03:00",
- "From": "cuzcuz2@domain.org",
- "MIME-Version": "1.0",
- "Message-Id": "<20150211194857.19888.1605637474.0@host>",
- "OpenPGP": "id=E2F104EE8B01F675;\n url=\"https://domain.org/key/cuzcuz2\"; preference=\"signencrypt\"",
- "Received": "from 0.3.9-1-gc1f9c92 (unknown [127.0.0.1])\n (using TLSv1 with cipher ECDHE-RSA-AES256-SHA (256/256 bits))\n (Client CN \"UNLIMITEDdbe63unx9tfxa286ol3che4vx\",\n Issuer \"LEAP_Example Root CA (client certificates only!)\" (verified OK))\n by domain.org (Postfix) with ESMTPS id 499F57FF7C\n for <this_mail_was_bounced@domain.com>; Wed, 11 Feb 2015 20:49:10 +0100 (CET)",
- "Return-Path": "<cuzcuz2@domain.org>",
- "Subject": "volte",
- "To": "this_mail_was_bounced@domain.com"
- },
- "multi": true,
- "part_map": {
- "1": {
- "ctype": "multipart/mixed",
- "headers": [
- [
- "Received",
- "by bitmask.local from 127.0.0.1 with ESMTP ;\n Wed, 11 Feb 2015 16:48:55 -0300"
- ],
- [
- "Content-Type",
- "multipart/mixed; boundary=\"===============4044974129166450777==\""
- ],
- [
- "MIME-Version",
- "1.0"
- ]
- ],
- "multi": true,
- "parts": 1,
- "phash": null,
- "size": 370
- },
- "2": {
- "ctype": "text/plain",
- "headers": [
- [
- "Content-Type",
- "text/plain; charset=\"utf-8\""
- ],
- [
- "MIME-Version",
- "1.0"
- ],
- [
- "Content-Transfer-Encoding",
- "base64"
- ]
- ],
- "multi": false,
- "parts": 1,
- "phash": "231B2728DE0EF4968B1183F0DD7FA9C963A90996E64B9B7A9AB6936F0B1EADB7",
- "size": 100
- }
- }
- },
- "3": {
- "ctype": "application/pgp-signature",
- "headers": [
- [
- "Content-Type",
- "application/pgp-signature; name=\"signature.asc\""
- ],
- [
- "MIME-Version",
- "1.0"
- ],
- [
- "Content-Description",
- "OpenPGP Digital Signature"
- ]
- ],
- "multi": false,
- "parts": 1,
- "phash": "90A694453A671BA3144BA9147ACA4C4B41DF7E714377C584E802ED5469AB5365",
- "size": 929
- }
- },
- "parts": 1,
- "phash": "E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855",
- "size": 200
- }
- },
- "subject": "Undelivered Mail Returned to Sender",
- "type": "head"
-}
diff --git a/service/test/unit/fixtures/mailset/new/mbox00000000 b/service/test/unit/fixtures/mailset/new/mbox00000000
index 3d01c203..8c80257d 100644
--- a/service/test/unit/fixtures/mailset/new/mbox00000000
+++ b/service/test/unit/fixtures/mailset/new/mbox00000000
@@ -1,4 +1,4 @@
-From darby.senger@zemlak.biz
+From: darby.senger@zemlak.biz
Subject: Itaque consequatur repellendus provident sunt quia.
To: carmel@murazikortiz.name
X-TW-Pixelated-Tags: nite, macro, trash
diff --git a/service/test/unit/fixtures/mailset/new/mbox00000001 b/service/test/unit/fixtures/mailset/new/mbox00000001
index fc76bba2..ba563430 100644
--- a/service/test/unit/fixtures/mailset/new/mbox00000001
+++ b/service/test/unit/fixtures/mailset/new/mbox00000001
@@ -1,4 +1,4 @@
-From madeline.littel@sanfordruel.com
+From: madeline.littel@sanfordruel.com
Subject: Error illum dignissimos autem eos aspernatur.
To: phyllis@stiedemann.net
X-TW-Pixelated-Tags: instadaily, inspiration
diff --git a/service/test/unit/maintenance/test_commands.py b/service/test/unit/maintenance/test_commands.py
index f1bf6e45..52fe6ca2 100644
--- a/service/test/unit/maintenance/test_commands.py
+++ b/service/test/unit/maintenance/test_commands.py
@@ -18,13 +18,13 @@ import email
from pixelated.maintenance import delete_all_mails, load_mails
from pixelated.bitmask_libraries.session import LeapSession
-from leap.mail.imap.account import SoledadBackedAccount
-from leap.mail.imap.fields import WithMsgFields
+from pixelated.adapter.mailstore import MailStore
from leap.soledad.client import Soledad
from leap.soledad.common.document import SoledadDocument
from mock import MagicMock
-from os.path import join, dirname
-from twisted.internet import defer, reactor
+from os.path import join
+from twisted.internet import defer
+import pkg_resources
class TestCommands(unittest.TestCase):
@@ -32,10 +32,8 @@ class TestCommands(unittest.TestCase):
def setUp(self):
self.leap_session = MagicMock(spec=LeapSession)
self.soledad = MagicMock(spec=Soledad)
- self.account = MagicMock(spec=SoledadBackedAccount)
- self.mailbox = MagicMock()
- self.leap_session.account = self.account
- self.account.getMailbox.return_value = self.mailbox
+ self.mail_store = MagicMock(spec=MailStore)
+ self.leap_session.mail_store = self.mail_store
self.args = (self.leap_session, self.soledad)
@@ -77,23 +75,25 @@ class TestCommands(unittest.TestCase):
def test_load_mails_empty_path_list(self):
load_mails(self.args, [])
- self.assertFalse(self.mailbox.called)
+ self.assertFalse(self.mail_store.add_mailbox.called)
def test_load_mails_adds_mails(self):
# given
- mail_root = join(dirname(__file__), '..', 'fixtures', 'mailset')
- firstMailDeferred = defer.Deferred()
- secondMailDeferred = defer.Deferred()
- self.mailbox.addMessage.side_effect = [firstMailDeferred, secondMailDeferred]
+ mail_root = pkg_resources.resource_filename('test.unit.fixtures', 'mailset')
+ firstMailDeferred = defer.succeed(None)
+ secondMailDeferred = defer.succeed(None)
+ self.mail_store.add_mail.side_effect = [firstMailDeferred, secondMailDeferred]
+ self.mail_store.add_mailbox.return_value = defer.succeed(None)
# when
d = load_mails(self.args, [mail_root])
# then
def assert_mails_added(_):
- self.assertTrue(self.mailbox.addMessage.called)
- self.mailbox.addMessage.assert_any_call(self._mail_content(join(mail_root, 'new', 'mbox00000000')), flags=(WithMsgFields.RECENT_FLAG,), notify_on_disk=False)
- self.mailbox.addMessage.assert_any_call(self._mail_content(join(mail_root, 'new', 'mbox00000001')), flags=(WithMsgFields.RECENT_FLAG,), notify_on_disk=False)
+ self.assertTrue(self.mail_store.add_mail.called)
+ self.mail_store.add_mail.assert_any_call('INBOX', self._mail_content(join(mail_root, 'new', 'mbox00000000')))
+ self.mail_store.add_mail.assert_any_call('INBOX', self._mail_content(join(mail_root, 'new', 'mbox00000001')))
+ # TODO Should we check for flags?
def error_callack(err):
print err
@@ -102,10 +102,6 @@ class TestCommands(unittest.TestCase):
d.addCallback(assert_mails_added)
d.addErrback(error_callack)
- # trigger callbacks for both mails
- reactor.callLater(0, firstMailDeferred.callback, None)
- reactor.callLater(0, secondMailDeferred.callback, None)
-
return d
def _mail_content(self, mail_file):
diff --git a/service/test/unit/resources/test_feedback_resource.py b/service/test/unit/resources/test_feedback_resource.py
new file mode 100644
index 00000000..63e6efc4
--- /dev/null
+++ b/service/test/unit/resources/test_feedback_resource.py
@@ -0,0 +1,27 @@
+import json
+from mockito import verify, mock, when
+from twisted.trial import unittest
+from twisted.web.test.requesthelper import DummyRequest
+from pixelated.resources.feedback_resource import FeedbackResource
+from test.unit.resources import DummySite
+
+
+class TestFeedbackResource(unittest.TestCase):
+ def setUp(self):
+ self.feedback_service = mock()
+ self.web = DummySite(FeedbackResource(self.feedback_service))
+
+ def test_sends_feedback_to_leap_web(self):
+ request = DummyRequest(['/feedback'])
+ request.method = 'POST'
+ content = mock()
+ when(content).read().thenReturn(json.dumps({'feedback': 'Pixelated is awesome!'}))
+ request.content = content
+
+ d = self.web.get(request)
+
+ def assert_posted_feedback_to_leap_web(_):
+ verify(self.feedback_service).open_ticket('Pixelated is awesome!')
+
+ d.addCallback(assert_posted_feedback_to_leap_web)
+ return d
diff --git a/service/test/unit/resources/test_keys_resources.py b/service/test/unit/resources/test_keys_resources.py
index be79424b..1990efe8 100644
--- a/service/test/unit/resources/test_keys_resources.py
+++ b/service/test/unit/resources/test_keys_resources.py
@@ -1,8 +1,11 @@
+import json
+import ast
from mockito import mock, when
from leap.keymanager import OpenPGPKey, KeyNotFound
from pixelated.resources.keys_resource import KeysResource
import twisted.trial.unittest as unittest
from twisted.web.test.requesthelper import DummyRequest
+from twisted.internet import defer
from test.unit.resources import DummySite
@@ -15,7 +18,7 @@ class TestKeysResource(unittest.TestCase):
def test_returns_404_if_key_not_found(self):
request = DummyRequest(['/keys'])
request.addArg('search', 'some@inexistent.key')
- when(self.keymanager).get_key_from_cache('some@inexistent.key', OpenPGPKey).thenRaise(KeyNotFound())
+ when(self.keymanager).fetch_key('some@inexistent.key').thenReturn(defer.fail(KeyNotFound()))
d = self.web.get(request)
@@ -28,17 +31,30 @@ class TestKeysResource(unittest.TestCase):
def test_returns_the_key_as_json_if_found(self):
request = DummyRequest(['/keys'])
request.addArg('search', 'some@key')
- when(self.keymanager).get_key_from_cache('some@key', OpenPGPKey).thenReturn(OpenPGPKey('some@key'))
+ when(self.keymanager).fetch_key('some@key').thenReturn(defer.succeed(OpenPGPKey('some@key')))
d = self.web.get(request)
+ expected = {
+ "tags": ["keymanager-key"],
+ "fingerprint": '',
+ "private": False,
+ 'sign_used': False,
+ 'refreshed_at': 0,
+ "expiry_date": 0,
+ "address": 'some@key',
+ 'encr_used': False,
+ 'last_audited_at': 0,
+ 'key_data': '',
+ 'length': 0,
+ 'key_id': '',
+ 'validation': 'Weak_Chain',
+ 'type': 'OpenPGPKey',
+ }
+
def assert_response(_):
- self.assertEquals('"{\\"tags\\": [\\"keymanager-key\\"], \\"fingerprint\\": null, '
- '\\"private\\": null, \\"expiry_date\\": null, \\"address\\": '
- '\\"some@key\\", \\"last_audited_at\\": null, \\"key_data\\": null, '
- '\\"length\\": null, \\"key_id\\": null, \\"validation\\": null, '
- '\\"type\\": \\"<class \'leap.keymanager.openpgp.OpenPGPKey\'>\\", '
- '\\"first_seen_at\\": null}"', request.written[0])
+ actual = json.loads(ast.literal_eval(request.written[0]))
+ self.assertEquals(expected, actual)
d.addCallback(assert_response)
return d
@@ -46,7 +62,7 @@ class TestKeysResource(unittest.TestCase):
def test_returns_unauthorized_if_key_is_private(self):
request = DummyRequest(['/keys'])
request.addArg('search', 'some@key')
- when(self.keymanager).get_key_from_cache('some@key', OpenPGPKey).thenReturn(OpenPGPKey('some@key', private=True))
+ when(self.keymanager).fetch_key('some@key').thenReturn(defer.succeed(OpenPGPKey('some@key', private=True)))
d = self.web.get(request)
diff --git a/service/test/unit/support/test_encrypted_file_storage.py b/service/test/unit/support/test_encrypted_file_storage.py
index 2a6735c3..69b82f3d 100644
--- a/service/test/unit/support/test_encrypted_file_storage.py
+++ b/service/test/unit/support/test_encrypted_file_storage.py
@@ -25,9 +25,13 @@ class EncryptedFileStorageTest(unittest.TestCase):
self.key = '2\x06\xf87F:\xd2\xe2]w\xc9\x0c\xb8\x9b\x8e\xd3\x92\t\xabHu\xa6\xa3\x9a\x8d\xec\x0c\xab<8\xbb\x12\xfbP\xf2\x83"\xa1\xcf7\x92\xb0!\xfe\xebM\x80\x8a\x14\xe6\xf9xr\xf5#\x8f\x1bs\xb3#\x0e)a\xd8'
self.msg = 'this is a very, very secret binary message: \xbe\xba\xca\xfe'
self.path = os.path.join('tmp', 'search_test')
+ self._cleanup_path()
self.storage = EncryptedFileStorage(self.path, self.key)
def tearDown(self):
+ self._cleanup_path()
+
+ def _cleanup_path(self):
if os.path.exists(self.path):
shutil.rmtree(self.path)
diff --git a/service/test/unit/test_application.py b/service/test/unit/test_application.py
index b2799d4c..16317ee5 100644
--- a/service/test/unit/test_application.py
+++ b/service/test/unit/test_application.py
@@ -14,9 +14,11 @@ class ApplicationTest(unittest.TestCase):
self.sslcert = sslcert
self.home = 'leap_home'
+ @patch('leap.common.events.client')
@patch('pixelated.application.reactor')
@patch('pixelated.application.Services')
- def test_that_create_app_binds_to_tcp_port_if_no_ssl_options(self, services_mock, reactor_mock):
+ def test_that_create_app_binds_to_tcp_port_if_no_ssl_options(self, services_mock, reactor_mock, _):
+ # FIXME patch something closer, instead of leap.common
app_mock = MagicMock()
leap_session = MagicMock()
config = ApplicationTest.MockConfig(12345, '127.0.0.1', leap_session)
@@ -32,9 +34,11 @@ class ApplicationTest(unittest.TestCase):
d.addCallback(_assert)
return d
+ @patch('leap.common.events.client')
@patch('pixelated.application.reactor')
@patch('pixelated.application.Services')
- def test_that_create_app_binds_to_ssl_if_ssl_options(self, services_mock, reactor_mock):
+ def test_that_create_app_binds_to_ssl_if_ssl_options(self, services_mock, reactor_mock, _):
+ # FIXME patch something closer, instead of leap.common
app_mock = MagicMock()
leap_session = MagicMock()
pixelated.application._ssl_options = lambda x, y: 'options'
diff --git a/service/test/unit/test_welcome_mail.py b/service/test/unit/test_welcome_mail.py
new file mode 100644
index 00000000..7e9ab0c9
--- /dev/null
+++ b/service/test/unit/test_welcome_mail.py
@@ -0,0 +1,73 @@
+#
+# Copyright (c) 2014 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import unittest
+from mockito import verify, mock
+from mockito.matchers import Matcher
+from email import message_from_file
+from pixelated.config.leap import add_welcome_mail
+from pixelated.adapter.model.mail import InputMail
+
+
+class TestWelcomeMail(unittest.TestCase):
+
+ def test_add_welcome_mail(self):
+ mail_store = mock()
+ input_mail = self._get_welcome_mail()
+
+ add_welcome_mail(mail_store)
+ capture = WelcomeMailCapture()
+
+ verify(mail_store).add_mail('INBOX', capture)
+ capture.assert_mail(input_mail.raw)
+
+ def _get_welcome_mail(self):
+ current_path = os.path.dirname(os.path.abspath(__file__))
+ with open(os.path.join(current_path,
+ '..',
+ '..',
+ 'pixelated',
+ 'assets',
+ 'welcome.mail')) as mail_template_file:
+ mail_template = message_from_file(mail_template_file)
+
+ return InputMail.from_python_mail(mail_template)
+
+
+class WelcomeMailCapture(Matcher):
+
+ def matches(self, arg):
+ self.value = arg
+ return True
+
+ def assert_mail(self, mail):
+ captured_mail = self._format(self.value)
+ expected_mail = self._format(mail)
+ assert captured_mail == expected_mail
+
+ def _format(self, mail):
+ splitter = '\n'
+ arr = mail.split(splitter)
+ arr = self._remove_variable_value(arr)
+
+ return splitter.join(arr)
+
+ def _remove_variable_value(self, arr):
+ arr.pop(0)
+ arr.pop(6)
+ arr.pop(44)
+ return arr