summaryrefslogtreecommitdiff
path: root/service/pixelated/adapter/model/mail.py
diff options
context:
space:
mode:
Diffstat (limited to 'service/pixelated/adapter/model/mail.py')
-rw-r--r--service/pixelated/adapter/model/mail.py244
1 files changed, 0 insertions, 244 deletions
diff --git a/service/pixelated/adapter/model/mail.py b/service/pixelated/adapter/model/mail.py
index 524ca48d..31eb0654 100644
--- a/service/pixelated/adapter/model/mail.py
+++ b/service/pixelated/adapter/model/mail.py
@@ -245,13 +245,10 @@ class InputMail(Mail):
input_mail = InputMail()
input_mail.headers = {key.capitalize(): value for key, value in mail_dict.get('header', {}).items()}
- # XXX this is overriding the property in PixelatedMail
input_mail.headers['Date'] = date.iso_now()
- # XXX this is overriding the property in PixelatedMail
input_mail.body = mail_dict.get('body', '')
- # XXX this is overriding the property in the PixelatedMail
input_mail.tags = set(mail_dict.get('tags', []))
input_mail._status = set(mail_dict.get('status', []))
@@ -272,247 +269,6 @@ class InputMail(Mail):
return input_mail
-class PixelatedMail(Mail):
-
- @staticmethod
- def from_soledad(fdoc, hdoc, bdoc, parts=None):
- mail = PixelatedMail()
- mail.parts = parts
- mail.boundary = str(uuid4()).replace('-', '')
- mail.bdoc = bdoc
- mail.fdoc = fdoc
- mail.hdoc = hdoc
- mail._mime = None
- return mail
-
- def _decode_part(self, part):
- encoding = part['headers'].get('Content-Transfer-Encoding', '')
- content_type = self._parse_charset_header(part['headers'].get('Content-Type'))
-
- try:
- decoding_func = self._decoding_function_for_encoding(encoding)
- return self._decode_content_with_fallback(part['content'], decoding_func, content_type)
- except Exception:
- logger.error('Failed to decode mail part with:')
- logger.error('Content-Transfer-Encoding: %s' % encoding)
- logger.error('Content-Type: %s' % part['headers'].get('Content-Type'))
- raise
-
- def _decoding_function_for_encoding(self, encoding):
- decoding_map = {
- 'quoted-printable': lambda content, content_type: content.decode('quopri').decode(content_type),
- 'base64': lambda content, content_type: content.decode('base64').decode('utf-8'),
- '7bit': lambda content, content_type: content.encode(content_type),
- '8bit': lambda content, content_type: content.encode(content_type)
- }
- if encoding in decoding_map:
- return decoding_map[encoding]
- else:
- return decoding_map['8bit']
-
- def _decode_content_with_fallback(self, content, decode_func, content_type):
- try:
- return decode_func(content, content_type)
- # return content.encode(content_type)
- except ValueError:
- return content.encode('ascii', 'ignore')
-
- @property
- def alternatives(self):
- return self.parts.get('alternatives')
-
- @property
- def text_plain_body(self):
- if self.parts and len(self.alternatives) >= 1:
- return self._decode_part(self.alternatives[0])
- else:
- return self.bdoc.content['raw'] # plain
-
- @property
- def html_body(self):
- if self.parts and len(self.alternatives) > 1:
- html_parts = [e for e in self.alternatives if re.match('text/html', e['headers'].get('Content-Type', ''))]
- if len(html_parts):
- return self._decode_part(html_parts[0])
-
- @property
- def headers(self):
- _headers = {
- 'To': [],
- 'Cc': [],
- 'Bcc': []
- }
- hdoc_headers = self.hdoc.content['headers']
-
- for header in ['To', 'Cc', 'Bcc']:
- header_value = self._decode_header(hdoc_headers.get(header))
- if not header_value:
- continue
- _headers[header] = header_value if type(header_value) is list else header_value.split(',')
- _headers[header] = [head.strip() for head in compact(_headers[header])]
-
- for header in ['From', 'Subject']:
- _headers[header] = self._decode_header(hdoc_headers.get(header))
-
- try:
- _headers['Date'] = self._get_date()
- except Exception:
- _headers['Date'] = date.iso_now()
-
- if self.parts and len(self.parts['alternatives']) > 1:
- _headers['content_type'] = 'multipart/alternative; boundary="%s"' % self.boundary
- elif self.hdoc.content['headers'].get('Content-Type'):
- _headers['content_type'] = hdoc_headers.get('Content-Type')
-
- if hdoc_headers.get('Reply-To'):
- _headers['Reply-To'] = hdoc_headers.get('Reply-To')
-
- return _headers
-
- def _decode_header_with_fallback(self, entry):
- try:
- return decode_header(entry)[0][0]
- except Exception:
- return entry.encode('ascii', 'ignore')
-
- def _decode_header(self, header):
- if not header:
- return None
- if isinstance(header, list):
- return [self._decode_header_with_fallback(entry) for entry in header]
- else:
- return self._decode_header_with_fallback(header)
-
- def _get_date(self):
- date = self.hdoc.content.get('date', None)
- try:
- if not date:
- received = self.hdoc.content.get('received', None)
- if received:
- date = received.split(";")[-1].strip()
- else:
- # we can't get a date for this mail, so lets just use now
- logger.warning('Encountered a mail with missing date and received header fields. ID %s' % self.fdoc.content.get('uid', None))
- date = date.iso_now()
- return dateparser.parse(date).isoformat()
- except (ValueError, TypeError):
- date = date.iso_now()
- return dateparser.parse(date).isoformat()
-
- @property
- def security_casing(self):
- casing = {"imprints": [], "locks": []}
- casing["imprints"] = self.signature_information
- if self.encrypted == "true":
- casing["locks"] = [{"state": "valid"}]
- elif self.encrypted == "fail":
- casing["locks"] = [{"state": "failure"}]
- return casing
-
- @property
- def tags(self):
- _tags = self.fdoc.content.get('tags', '[]')
- return set(_tags) if type(_tags) is list or type(_tags) is set else set(json.loads(_tags))
-
- @property
- def ident(self):
- return self.fdoc.content.get('chash')
-
- @property
- def mailbox_name(self):
- # FIXME mbox is no longer available, instead we now have mbox_uuid
- return self.fdoc.content.get('mbox', 'INBOX')
-
- @property
- def is_recent(self):
- return Status('recent') in self.status
-
- @property
- def uid(self):
- return self.fdoc.content['uid']
-
- @property
- def flags(self):
- return self.fdoc.content['flags']
-
- def set_mailbox(self, mailbox_name):
- self.fdoc.content['mbox'] = mailbox_name
-
- def has_tag(self, tag):
- return tag in self.tags
-
- @property
- def signature_information(self):
- signature = self.hdoc.content["headers"].get("X-Leap-Signature", None)
- if signature is None or signature.startswith("could not verify"):
- return [{"state": "no_signature_information"}]
- else:
- if signature.startswith("valid"):
- return [{"state": "valid", "seal": {"validity": "valid"}}]
- else:
- return []
-
- @property
- def encrypted(self):
- return self.hdoc.content["headers"].get("X-Pixelated-encryption-status", "false")
-
- @property
- def bounced(self):
- content_type = self.hdoc.content["headers"].get("Content-Type", '')
- if re.compile('delivery-status').search(content_type):
- bounce_recipient = self._extract_bounced_address(self.hdoc.content)
- bounce_daemon = self.headers["From"]
- return [bounce_recipient, bounce_daemon] if bounce_recipient else False
-
- return False
-
- def _extract_bounced_address(self, part):
- part_header = dict(part.get('headers', {}))
- if 'Final-Recipient' in part_header:
- if self._bounce_permanent(part_header):
- return part_header['Final-Recipient'].split(';')[1].strip()
- else:
- return False
- elif 'part_map' in part:
- for subpart in part['part_map'].values():
- result = self._extract_bounced_address(subpart)
- if result:
- return result
- else:
- continue
- return False
-
- def _bounce_permanent(self, part_headers):
- status = part_headers.get('Status', '')
- return status.startswith('5')
-
- def as_dict(self):
- dict_mail = {'header': {k.lower(): v for k, v in self.headers.items()},
- 'ident': self.ident,
- 'tags': list(self.tags),
- 'status': list(self.status),
- 'security_casing': self.security_casing,
- 'textPlainBody': self.text_plain_body,
- 'htmlBody': self.html_body,
- 'mailbox': self.mailbox_name.lower(),
- 'attachments': self.parts['attachments'] if self.parts else []}
- dict_mail['replying'] = {'single': None, 'all': {'to-field': [], 'cc-field': []}}
-
- sender_mail = self.headers.get('Reply-To', self.headers.get('From'))
- # Issue #215: Fix for existing mails without any from address.
- if sender_mail is None:
- sender_mail = InputMail.FROM_EMAIL_ADDRESS
-
- recipients = [recipient for recipient in self.headers['To'] if recipient != InputMail.FROM_EMAIL_ADDRESS]
- recipients.append(sender_mail)
- ccs = [cc for cc in self.headers['Cc'] if cc != InputMail.FROM_EMAIL_ADDRESS]
-
- dict_mail['replying']['single'] = sender_mail
- dict_mail['replying']['all']['to-field'] = recipients
- dict_mail['replying']['all']['cc-field'] = ccs
- return dict_mail
-
-
def welcome_mail():
current_path = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(current_path, '..', '..', 'assets', 'welcome.mail')) as mail_template_file: