summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--service/pixelated/adapter/model/mail.py244
-rw-r--r--service/test/support/integration/app_test_client.py3
-rw-r--r--service/test/support/test_helper.py8
-rw-r--r--service/test/unit/adapter/test_mail.py338
-rw-r--r--service/test/unit/adapter/test_mail_service.py4
5 files changed, 4 insertions, 593 deletions
diff --git a/service/pixelated/adapter/model/mail.py b/service/pixelated/adapter/model/mail.py
index 524ca48d..31eb0654 100644
--- a/service/pixelated/adapter/model/mail.py
+++ b/service/pixelated/adapter/model/mail.py
@@ -245,13 +245,10 @@ class InputMail(Mail):
input_mail = InputMail()
input_mail.headers = {key.capitalize(): value for key, value in mail_dict.get('header', {}).items()}
- # XXX this is overriding the property in PixelatedMail
input_mail.headers['Date'] = date.iso_now()
- # XXX this is overriding the property in PixelatedMail
input_mail.body = mail_dict.get('body', '')
- # XXX this is overriding the property in the PixelatedMail
input_mail.tags = set(mail_dict.get('tags', []))
input_mail._status = set(mail_dict.get('status', []))
@@ -272,247 +269,6 @@ class InputMail(Mail):
return input_mail
-class PixelatedMail(Mail):
-
- @staticmethod
- def from_soledad(fdoc, hdoc, bdoc, parts=None):
- mail = PixelatedMail()
- mail.parts = parts
- mail.boundary = str(uuid4()).replace('-', '')
- mail.bdoc = bdoc
- mail.fdoc = fdoc
- mail.hdoc = hdoc
- mail._mime = None
- return mail
-
- def _decode_part(self, part):
- encoding = part['headers'].get('Content-Transfer-Encoding', '')
- content_type = self._parse_charset_header(part['headers'].get('Content-Type'))
-
- try:
- decoding_func = self._decoding_function_for_encoding(encoding)
- return self._decode_content_with_fallback(part['content'], decoding_func, content_type)
- except Exception:
- logger.error('Failed to decode mail part with:')
- logger.error('Content-Transfer-Encoding: %s' % encoding)
- logger.error('Content-Type: %s' % part['headers'].get('Content-Type'))
- raise
-
- def _decoding_function_for_encoding(self, encoding):
- decoding_map = {
- 'quoted-printable': lambda content, content_type: content.decode('quopri').decode(content_type),
- 'base64': lambda content, content_type: content.decode('base64').decode('utf-8'),
- '7bit': lambda content, content_type: content.encode(content_type),
- '8bit': lambda content, content_type: content.encode(content_type)
- }
- if encoding in decoding_map:
- return decoding_map[encoding]
- else:
- return decoding_map['8bit']
-
- def _decode_content_with_fallback(self, content, decode_func, content_type):
- try:
- return decode_func(content, content_type)
- # return content.encode(content_type)
- except ValueError:
- return content.encode('ascii', 'ignore')
-
- @property
- def alternatives(self):
- return self.parts.get('alternatives')
-
- @property
- def text_plain_body(self):
- if self.parts and len(self.alternatives) >= 1:
- return self._decode_part(self.alternatives[0])
- else:
- return self.bdoc.content['raw'] # plain
-
- @property
- def html_body(self):
- if self.parts and len(self.alternatives) > 1:
- html_parts = [e for e in self.alternatives if re.match('text/html', e['headers'].get('Content-Type', ''))]
- if len(html_parts):
- return self._decode_part(html_parts[0])
-
- @property
- def headers(self):
- _headers = {
- 'To': [],
- 'Cc': [],
- 'Bcc': []
- }
- hdoc_headers = self.hdoc.content['headers']
-
- for header in ['To', 'Cc', 'Bcc']:
- header_value = self._decode_header(hdoc_headers.get(header))
- if not header_value:
- continue
- _headers[header] = header_value if type(header_value) is list else header_value.split(',')
- _headers[header] = [head.strip() for head in compact(_headers[header])]
-
- for header in ['From', 'Subject']:
- _headers[header] = self._decode_header(hdoc_headers.get(header))
-
- try:
- _headers['Date'] = self._get_date()
- except Exception:
- _headers['Date'] = date.iso_now()
-
- if self.parts and len(self.parts['alternatives']) > 1:
- _headers['content_type'] = 'multipart/alternative; boundary="%s"' % self.boundary
- elif self.hdoc.content['headers'].get('Content-Type'):
- _headers['content_type'] = hdoc_headers.get('Content-Type')
-
- if hdoc_headers.get('Reply-To'):
- _headers['Reply-To'] = hdoc_headers.get('Reply-To')
-
- return _headers
-
- def _decode_header_with_fallback(self, entry):
- try:
- return decode_header(entry)[0][0]
- except Exception:
- return entry.encode('ascii', 'ignore')
-
- def _decode_header(self, header):
- if not header:
- return None
- if isinstance(header, list):
- return [self._decode_header_with_fallback(entry) for entry in header]
- else:
- return self._decode_header_with_fallback(header)
-
- def _get_date(self):
- date = self.hdoc.content.get('date', None)
- try:
- if not date:
- received = self.hdoc.content.get('received', None)
- if received:
- date = received.split(";")[-1].strip()
- else:
- # we can't get a date for this mail, so lets just use now
- logger.warning('Encountered a mail with missing date and received header fields. ID %s' % self.fdoc.content.get('uid', None))
- date = date.iso_now()
- return dateparser.parse(date).isoformat()
- except (ValueError, TypeError):
- date = date.iso_now()
- return dateparser.parse(date).isoformat()
-
- @property
- def security_casing(self):
- casing = {"imprints": [], "locks": []}
- casing["imprints"] = self.signature_information
- if self.encrypted == "true":
- casing["locks"] = [{"state": "valid"}]
- elif self.encrypted == "fail":
- casing["locks"] = [{"state": "failure"}]
- return casing
-
- @property
- def tags(self):
- _tags = self.fdoc.content.get('tags', '[]')
- return set(_tags) if type(_tags) is list or type(_tags) is set else set(json.loads(_tags))
-
- @property
- def ident(self):
- return self.fdoc.content.get('chash')
-
- @property
- def mailbox_name(self):
- # FIXME mbox is no longer available, instead we now have mbox_uuid
- return self.fdoc.content.get('mbox', 'INBOX')
-
- @property
- def is_recent(self):
- return Status('recent') in self.status
-
- @property
- def uid(self):
- return self.fdoc.content['uid']
-
- @property
- def flags(self):
- return self.fdoc.content['flags']
-
- def set_mailbox(self, mailbox_name):
- self.fdoc.content['mbox'] = mailbox_name
-
- def has_tag(self, tag):
- return tag in self.tags
-
- @property
- def signature_information(self):
- signature = self.hdoc.content["headers"].get("X-Leap-Signature", None)
- if signature is None or signature.startswith("could not verify"):
- return [{"state": "no_signature_information"}]
- else:
- if signature.startswith("valid"):
- return [{"state": "valid", "seal": {"validity": "valid"}}]
- else:
- return []
-
- @property
- def encrypted(self):
- return self.hdoc.content["headers"].get("X-Pixelated-encryption-status", "false")
-
- @property
- def bounced(self):
- content_type = self.hdoc.content["headers"].get("Content-Type", '')
- if re.compile('delivery-status').search(content_type):
- bounce_recipient = self._extract_bounced_address(self.hdoc.content)
- bounce_daemon = self.headers["From"]
- return [bounce_recipient, bounce_daemon] if bounce_recipient else False
-
- return False
-
- def _extract_bounced_address(self, part):
- part_header = dict(part.get('headers', {}))
- if 'Final-Recipient' in part_header:
- if self._bounce_permanent(part_header):
- return part_header['Final-Recipient'].split(';')[1].strip()
- else:
- return False
- elif 'part_map' in part:
- for subpart in part['part_map'].values():
- result = self._extract_bounced_address(subpart)
- if result:
- return result
- else:
- continue
- return False
-
- def _bounce_permanent(self, part_headers):
- status = part_headers.get('Status', '')
- return status.startswith('5')
-
- def as_dict(self):
- dict_mail = {'header': {k.lower(): v for k, v in self.headers.items()},
- 'ident': self.ident,
- 'tags': list(self.tags),
- 'status': list(self.status),
- 'security_casing': self.security_casing,
- 'textPlainBody': self.text_plain_body,
- 'htmlBody': self.html_body,
- 'mailbox': self.mailbox_name.lower(),
- 'attachments': self.parts['attachments'] if self.parts else []}
- dict_mail['replying'] = {'single': None, 'all': {'to-field': [], 'cc-field': []}}
-
- sender_mail = self.headers.get('Reply-To', self.headers.get('From'))
- # Issue #215: Fix for existing mails without any from address.
- if sender_mail is None:
- sender_mail = InputMail.FROM_EMAIL_ADDRESS
-
- recipients = [recipient for recipient in self.headers['To'] if recipient != InputMail.FROM_EMAIL_ADDRESS]
- recipients.append(sender_mail)
- ccs = [cc for cc in self.headers['Cc'] if cc != InputMail.FROM_EMAIL_ADDRESS]
-
- dict_mail['replying']['single'] = sender_mail
- dict_mail['replying']['all']['to-field'] = recipients
- dict_mail['replying']['all']['cc-field'] = ccs
- return dict_mail
-
-
def welcome_mail():
current_path = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(current_path, '..', '..', 'assets', 'welcome.mail')) as mail_template_file:
diff --git a/service/test/support/integration/app_test_client.py b/service/test/support/integration/app_test_client.py
index 0b3e71a2..3245e728 100644
--- a/service/test/support/integration/app_test_client.py
+++ b/service/test/support/integration/app_test_client.py
@@ -36,7 +36,6 @@ from pixelated.config.site import PixelatedSite
from pixelated.adapter.mailstore import LeapMailStore
from pixelated.adapter.mailstore.searchable_mailstore import SearchableMailStore
-from pixelated.adapter.model.mail import PixelatedMail
from pixelated.adapter.search import SearchEngine
from pixelated.adapter.services.draft_service import DraftService
from pixelated.adapter.services.mail_service import MailService
@@ -62,8 +61,6 @@ class AppTestClient(object):
self.cleanup = lambda: shutil.rmtree(soledad_test_folder)
- PixelatedMail.from_email_address = self.MAIL_ADDRESS
-
self.soledad = yield initialize_soledad(tempdir=soledad_test_folder)
self.keymanager = mock()
diff --git a/service/test/support/test_helper.py b/service/test/support/test_helper.py
index 8f2a5308..21f59d8f 100644
--- a/service/test/support/test_helper.py
+++ b/service/test/support/test_helper.py
@@ -17,7 +17,7 @@ from datetime import datetime
import io
from twisted.web.test.test_web import DummyRequest
-from pixelated.adapter.model.mail import InputMail, PixelatedMail
+from pixelated.adapter.model.mail import InputMail
LEAP_FLAGS = ['\\Seen',
@@ -69,12 +69,6 @@ def leap_mail(uid=0, flags=LEAP_FLAGS, headers=None, extra_headers={}, mbox_uuid
return (fdoc, hdoc, bdoc)
-def pixelated_mail(uid=0, flags=LEAP_FLAGS, headers=None, extra_headers={}, mbox_uuid='INBOX', body='body', chash='chash'):
- fdoc, hdoc, bdoc = leap_mail(uid, flags, headers, extra_headers, mbox_uuid, body, chash)
-
- return PixelatedMail.from_soledad(fdoc, hdoc, bdoc)
-
-
def input_mail():
mail = InputMail()
mail.fdoc = TestDoc({})
diff --git a/service/test/unit/adapter/test_mail.py b/service/test/unit/adapter/test_mail.py
index 4b0160ac..bd6ed25b 100644
--- a/service/test/unit/adapter/test_mail.py
+++ b/service/test/unit/adapter/test_mail.py
@@ -17,7 +17,7 @@
from twisted.trial import unittest
import pixelated.support.date
-from pixelated.adapter.model.mail import PixelatedMail, InputMail, HEADERS_KEY
+from pixelated.adapter.model.mail import InputMail, HEADERS_KEY
from mockito import mock, unstub, when
from test.support import test_helper
import dateutil.parser as dateparser
@@ -30,342 +30,6 @@ import pkg_resources
from twisted.internet import defer
-class TestPixelatedMail(unittest.TestCase):
- def tearDown(self):
- unstub()
-
- def test_parse_date_from_soledad_uses_date_header_if_available(self):
- leap_mail_date = 'Wed, 3 Sep 2014 12:36:17 -0300'
- leap_mail_date_in_iso_format = "2014-09-03T12:36:17-03:00"
-
- leap_mail = test_helper.leap_mail(headers={'date': leap_mail_date})
-
- mail = PixelatedMail.from_soledad(*leap_mail)
-
- self.assertEqual(str(mail.headers['Date']), leap_mail_date_in_iso_format)
-
- def test_parse_date_from_soledad_fallback_to_received_header_if_date_header_isnt_available(self):
- leap_mail_date = "Wed, 03 Sep 2014 13:11:15 -0300"
- leap_mail_date_in_iso_format = "2014-09-03T13:11:15-03:00"
- leap_mail_received_header = "by bitmask.local from 127.0.0.1 with ESMTP ;\n " + leap_mail_date
-
- leap_mail = test_helper.leap_mail(headers={'received': leap_mail_received_header})
-
- mail = PixelatedMail.from_soledad(*leap_mail)
-
- self.assertEqual(str(mail.headers['Date']), leap_mail_date_in_iso_format)
-
- def test_parse_date_from_soledad_fallback_to_now_if_neither_date_nor_received_header(self):
- leap_mail_date_in_iso_format = "2014-09-03T13:11:15-03:00"
-
- when(pixelated.support.date).iso_now().thenReturn(leap_mail_date_in_iso_format)
- fdoc, hdoc, bdoc = test_helper.leap_mail()
- del hdoc.content['date']
-
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc)
-
- self.assertEqual(str(mail.headers['Date']), leap_mail_date_in_iso_format)
-
- def test_use_datetime_now_as_fallback_for_invalid_date(self):
- leap_mail_date = u'söme däte'
- date_expected = "2014-09-03T13:11:15-03:00"
-
- when(pixelated.support.date).iso_now().thenReturn(date_expected)
- leap_mail = test_helper.leap_mail(headers={'date': leap_mail_date})
-
- mail = PixelatedMail.from_soledad(*leap_mail)
-
- self.assertEqual(str(mail.headers['Date']), date_expected)
-
- def test_fall_back_to_ascii_if_invalid_received_header(self):
- leap_mail_received_header = u"söme invalid received heäder\n"
- date_expected = "2014-09-03T13:11:15-03:00"
-
- when(pixelated.support.date).iso_now().thenReturn(date_expected)
- leap_mail = test_helper.leap_mail(headers={'received': leap_mail_received_header})
-
- mail = PixelatedMail.from_soledad(*leap_mail)
-
- self.assertEqual(mail.headers['Date'], date_expected)
-
- def test_get_for_save_adds_from(self):
- InputMail.FROM_EMAIL_ADDRESS = 'me@pixelated.org'
- headers = {'Subject': 'The subject',
- 'Date': str(datetime.now()),
- 'To': 'me@pixelated.org'}
-
- input_mail = InputMail()
- input_mail.headers = headers
-
- self.assertEqual('me@pixelated.org', input_mail.get_for_save(1, 'SENT')[1][HEADERS_KEY]['From'])
-
- def test_as_dict(self):
- headers = {'Subject': 'The subject',
- 'From': 'someone@pixelated.org',
- 'To': 'me@pixelated.org'}
- fdoc, hdoc, bdoc = test_helper.leap_mail(flags=['\\Recent'],
- extra_headers=headers)
-
- InputMail.FROM_EMAIL_ADDRESS = 'me@pixelated.org'
-
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc)
-
- _dict = mail.as_dict()
-
- self.maxDiff = None
-
- self.assertEquals(_dict, {'htmlBody': None,
- 'textPlainBody': 'body',
- 'header': {
- 'date': dateparser.parse(hdoc.content['date']).isoformat(),
- 'from': 'someone@pixelated.org',
- 'subject': 'The subject',
- 'to': ['me@pixelated.org'],
- 'cc': [],
- 'bcc': []
- },
- 'ident': 'chash',
- 'mailbox': 'inbox',
- 'security_casing': {'imprints': [{'state': 'no_signature_information'}], 'locks': []},
- 'status': ['recent'],
- 'tags': [],
- 'attachments': [],
- 'replying': {
- 'single': 'someone@pixelated.org',
- 'all': {
- 'to-field': ['someone@pixelated.org'],
- 'cc-field': []
- }
- }})
-
- def test_use_reply_to_address_for_replying(self):
- headers = {'Subject': 'The subject',
- 'From': 'someone@pixelated.org',
- 'Reply-To': 'reply-to-this-address@pixelated.org',
- 'To': 'me@pixelated.org, \nalice@pixelated.org'}
- fdoc, hdoc, bdoc = test_helper.leap_mail(flags=['\\Recent'],
- extra_headers=headers)
-
- InputMail.FROM_EMAIL_ADDRESS = 'me@pixelated.org'
-
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc)
-
- _dict = mail.as_dict()
-
- self.assertEquals(_dict['replying'], {'single': 'reply-to-this-address@pixelated.org',
- 'all': {
- 'to-field': ['alice@pixelated.org', 'reply-to-this-address@pixelated.org'],
- 'cc-field': []
- }})
-
- def test_alternatives_body(self):
- parts = {'alternatives': [], 'attachments': []}
- parts['alternatives'].append({'content': 'blablabla', 'headers': {'Content-Type': 'text/plain'}})
- parts['alternatives'].append({'content': '<p>blablabla</p>', 'headers': {'Content-Type': 'text/html'}})
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='blablabla'), parts=parts)
-
- self.assertRegexpMatches(mail.html_body, '^<p>blablabla</p>$')
- self.assertRegexpMatches(mail.text_plain_body, '^blablabla$')
-
- def test_html_is_none_if_multiple_alternatives_have_no_html_part(self):
- parts = {
- 'attachments': [],
- 'alternatives': [
- {'content': u'content', 'headers': {u'Content-Type': u'text/plain; charset=us-ascii'}},
- {'content': u'', 'headers': {u'Some info': u'info'}}]}
-
- mail = PixelatedMail.from_soledad(None, None, None, parts=parts)
- self.assertIsNone(mail.html_body)
-
- def test_percent_character_is_allowed_on_body(self):
- parts = {'alternatives': [], 'attachments': []}
- parts['alternatives'].append({'content': '100% happy with percentage symbol', 'headers': {'Content-Type': 'text/plain'}})
- parts['alternatives'].append({'content': '<p>100% happy with percentage symbol</p>', 'headers': {'Content-Type': 'text/html'}})
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw="100% happy with percentage symbol"), parts=parts)
-
- self.assertRegexpMatches(mail.text_plain_body, '([\s\S]*100%)')
- self.assertRegexpMatches(mail.html_body, '([\s\S]*100%)')
-
- def test_content_type_header_of_mail_part_is_used(self):
- plain_headers = {'Content-Type': 'text/plain; charset=iso-8859-1', 'Content-Transfer-Encoding': 'quoted-printable'}
- html_headers = {'Content-Type': 'text/html; charset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'}
- parts = {'alternatives': [{'content': 'H=E4llo', 'headers': plain_headers}, {'content': '<p>H=C3=A4llo</p>', 'headers': html_headers}]}
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='some raw body'), parts=parts)
-
- self.assertEqual(2, len(mail.alternatives))
- self.assertEquals(u'H\xe4llo', mail.text_plain_body)
- self.assertEquals(u'<p>H\xe4llo</p>', mail.html_body)
-
- def test_multi_line_content_type_header_is_supported(self):
- plain_headers = {'Content-Type': 'text/plain;\ncharset=iso-8859-1', 'Content-Transfer-Encoding': 'quoted-printable'}
- html_headers = {'Content-Type': 'text/html;\ncharset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'}
- parts = {'alternatives': [{'content': 'H=E4llo', 'headers': plain_headers}, {'content': '<p>H=C3=A4llo</p>', 'headers': html_headers}]}
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='some raw body'), parts=parts)
-
- self.assertEqual(2, len(mail.alternatives))
- self.assertEquals(u'H\xe4llo', mail.text_plain_body)
- self.assertEquals(u'<p>H\xe4llo</p>', mail.html_body)
-
- def test_broken_content_type_defaults_to_usascii(self):
- plain_headers = {'Content-Type': 'I lie to you', 'Content-Transfer-Encoding': 'quoted-printable'}
- html_headers = {'Content-Type': 'text/html;\ncharset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'}
- parts = {'alternatives': [{'content': 'H=E4llo', 'headers': plain_headers}, {'content': '<p>H=C3=A4llo</p>', 'headers': html_headers}]}
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='some raw body'), parts=parts)
-
- self.assertEquals(u'H=E4llo', mail.text_plain_body)
-
- def test_broken_encoding_defaults_to_8bit(self):
- plain_headers = {'Content-Type': 'text/plain;\ncharset=iso-8859-1', 'Content-Transfer-Encoding': 'I lie to you!'}
- html_headers = {'Content-Type': 'text/html;\ncharset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'}
- parts = {'alternatives': [{'content': 'H=E4llo', 'headers': plain_headers}, {'content': '<p>H=C3=A4llo</p>', 'headers': html_headers}]}
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='some raw body'), parts=parts)
-
- self.assertEquals(u'H=E4llo', mail.text_plain_body)
- self.assertEquals(u'<p>H\xe4llo</p>', mail.html_body)
-
- def test_clean_line_breaks_on_address_headers(self):
- many_recipients = 'One <one@mail.com>,\nTwo <two@mail.com>, Normal <normal@mail.com>,\nalone@mail.com'
- headers = {'Cc': many_recipients,
- 'Bcc': many_recipients,
- 'To': many_recipients}
- fdoc, hdoc, bdoc = test_helper.leap_mail(flags=['\\Recent'],
- extra_headers=headers)
-
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc)
-
- for header_label in ['To', 'Cc', 'Bcc']:
- for address in mail.headers[header_label]:
- self.assertNotIn('\n', address)
- self.assertNotIn(',', address)
- self.assertEquals(4, len(mail.headers[header_label]))
-
- def test_that_body_understands_base64(self):
- body = u'bl\xe1'
- encoded_body = unicode(body.encode('utf-8').encode('base64'))
-
- fdoc, hdoc, bdoc = test_helper.leap_mail()
- parts = {'alternatives': []}
- parts['alternatives'].append({'content': encoded_body, 'headers': {'Content-Transfer-Encoding': 'base64'}})
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, parts=parts)
-
- self.assertEquals(body, mail.text_plain_body)
-
- def test_that_body_understands_7bit(self):
- body = u'testtext'
- encoded_body = body
-
- fdoc, hdoc, bdoc = test_helper.leap_mail()
- parts = {'alternatives': []}
- parts['alternatives'].append({'content': encoded_body, 'headers': {'Content-Transfer-Encoding': '7bit'}})
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, parts=parts)
-
- self.assertEquals(body, mail.text_plain_body)
-
- def test_that_body_understands_8bit(self):
- body = u'testtext'
- encoded_body = body
-
- fdoc, hdoc, bdoc = test_helper.leap_mail()
- parts = {'alternatives': []}
- parts['alternatives'].append({'content': encoded_body, 'headers': {'Content-Transfer-Encoding': '8bit'}})
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, parts=parts)
-
- self.assertEquals(body, mail.text_plain_body)
-
- def test_bounced_mails_are_recognized(self):
- bounced_mail_hdoc = pkg_resources.resource_filename('test.unit.fixtures', 'bounced_mail_hdoc.json')
- with open(bounced_mail_hdoc) as f:
- hdoc = json.loads(f.read())
-
- bounced_leap_mail = test_helper.leap_mail()
- bounced_leap_mail[1].content = hdoc
- bounced_mail = PixelatedMail.from_soledad(*bounced_leap_mail)
-
- not_bounced_leap_mail = test_helper.leap_mail()
- not_bounced_mail = PixelatedMail.from_soledad(*not_bounced_leap_mail)
-
- self.assertTrue(bounced_mail.bounced)
- self.assertIn('this_mail_was_bounced@domain.com', bounced_mail.bounced)
- self.assertIn("MAILER-DAEMON@domain.org (Mail Delivery System)", bounced_mail.bounced)
- self.assertFalse(not_bounced_mail.bounced)
-
- def test_ignore_transient_failures(self):
- """
- Persistent errors should start with 5.
- See: http://www.iana.org/assignments/smtp-enhanced-status-codes/smtp-enhanced-status-codes.xhtml
- """
- bounced_mail_hdoc = pkg_resources.resource_filename('test.unit.fixtures', 'bounced_mail_hdoc.json')
- with open(bounced_mail_hdoc) as f:
- content = f.read()
- # Change status to 4.XXX.YYY (only the first number is relevant here)
- content = content.replace("5.1.1", "4.X.Y")
- hdoc = json.loads(content)
-
- temporary_bounced_leap_mail = test_helper.leap_mail()
- temporary_bounced_leap_mail[1].content = hdoc
- temporary_bounced_mail = PixelatedMail.from_soledad(*temporary_bounced_leap_mail)
-
- not_bounced_leap_mail = test_helper.leap_mail()
- not_bounced_mail = PixelatedMail.from_soledad(*not_bounced_leap_mail)
-
- self.assertFalse(temporary_bounced_mail.bounced)
- self.assertFalse(not_bounced_mail.bounced)
-
- def _create_bdoc(self, raw):
- class FakeBDoc:
- def __init__(self, raw):
- self.content = {'raw': raw}
- return FakeBDoc(raw)
-
- def test_encoding_special_character_on_header(self):
- subject = "=?UTF-8?Q?test_encoding_St=C3=A4ch?="
- email_from = "=?UTF-8?Q?St=C3=A4ch_<stach@pixelated-project.org>?="
- email_to = "=?utf-8?b?IsOEw7zDtiDDlsO8w6QiIDxmb2xrZXJAcGl4ZWxhdGVkLXByb2plY3Qub3Jn?=\n =?utf-8?b?PiwgRsO2bGtlciA8Zm9sa2VyQHBpeGVsYXRlZC1wcm9qZWN0Lm9yZz4=?="
-
- pixel_mail = PixelatedMail()
-
- self.assertEqual(pixel_mail._decode_header(subject), 'test encoding St\xc3\xa4ch')
- self.assertEqual(pixel_mail._decode_header(email_from), 'St\xc3\xa4ch <stach@pixelated-project.org>')
- self.assertEqual(pixel_mail._decode_header(email_to), '"\xc3\x84\xc3\xbc\xc3\xb6 \xc3\x96\xc3\xbc\xc3\xa4" <folker@pixelated-project.org>, F\xc3\xb6lker <folker@pixelated-project.org>')
- self.assertEqual(pixel_mail._decode_header(None), None)
-
- def test_headers_are_encoded_right(self):
- subject = "=?UTF-8?Q?test_encoding_St=C3=A4ch?="
- email_from = "=?UTF-8?Q?St=C3=A4ch_<stach@pixelated-project.org>?="
- email_to = "=?utf-8?b?IsOEw7zDtiDDlsO8w6QiIDxmb2xrZXJAcGl4ZWxhdGVkLXByb2plY3Qub3Jn?=\n =?utf-8?b?PiwgRsO2bGtlciA8Zm9sa2VyQHBpeGVsYXRlZC1wcm9qZWN0Lm9yZz4=?="
- email_cc = "=?UTF-8?Q?St=C3=A4ch_<stach@pixelated-project.org>?="
- email_bcc = "=?UTF-8?Q?St=C3=A4ch_<stach@pixelated-project.org>?="
-
- leap_mail = test_helper.leap_mail(extra_headers={'Subject': subject, 'From': email_from, 'To': email_to, 'Cc': email_cc, 'Bcc': email_bcc})
-
- mail = PixelatedMail.from_soledad(*leap_mail)
-
- self.assertEqual(str(mail.headers['Subject']), 'test encoding St\xc3\xa4ch')
- self.assertEqual(str(mail.headers['From']), 'St\xc3\xa4ch <stach@pixelated-project.org>')
- self.assertEqual(mail.headers['To'], ['"\xc3\x84\xc3\xbc\xc3\xb6 \xc3\x96\xc3\xbc\xc3\xa4" <folker@pixelated-project.org>', 'F\xc3\xb6lker <folker@pixelated-project.org>'])
- self.assertEqual(mail.headers['Cc'], ['St\xc3\xa4ch <stach@pixelated-project.org>'])
- self.assertEqual(mail.headers['Bcc'], ['St\xc3\xa4ch <stach@pixelated-project.org>'])
-
- mail.as_dict()
-
- def test_parse_UTF8_headers_with_CharsetAscii(self):
- leap_mail_from = u'"söme ümläuds" <lisa5@dev.pixelated-project.org>'
- leap_mail_to = u'"söme ümläuds" <lisa5@dev.pixelated-project.org>,\n"söme ümläuds" <lisa5@dev.pixelated-project.org>'
-
- leap_mail = test_helper.leap_mail(extra_headers={'From': leap_mail_from, 'Subject': "some subject", 'To': leap_mail_to})
-
- mail = PixelatedMail.from_soledad(*leap_mail)
-
- mail.headers['From'].encode('ascii')
- self.assertEqual(mail.headers['To'], ['"sme mluds" <lisa5@dev.pixelated-project.org>', '"sme mluds" <lisa5@dev.pixelated-project.org>'])
-
-
def simple_mail_dict():
return {
'body': 'Este \xe9 o corpo',
diff --git a/service/test/unit/adapter/test_mail_service.py b/service/test/unit/adapter/test_mail_service.py
index 5c035fc8..6faf5140 100644
--- a/service/test/unit/adapter/test_mail_service.py
+++ b/service/test/unit/adapter/test_mail_service.py
@@ -15,7 +15,7 @@
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
from twisted.trial import unittest
from pixelated.adapter.mailstore.leap_mailstore import LeapMail
-from pixelated.adapter.model.mail import InputMail, PixelatedMail
+from pixelated.adapter.model.mail import InputMail
from pixelated.adapter.model.status import Status
from pixelated.adapter.services.mail_service import MailService
@@ -134,7 +134,7 @@ class TestMailService(unittest.TestCase):
@defer.inlineCallbacks
def test_recover_mail(self):
- mail_to_recover = PixelatedMail.from_soledad(*leap_mail())
+ mail_to_recover = LeapMail(1, 'TRASH')
when(self.mail_service).mail(1).thenReturn(mail_to_recover)
when(self.mail_store).move_mail_to_mailbox(1, 'INBOX').thenReturn(mail_to_recover)