summaryrefslogtreecommitdiff
path: root/service/pixelated/adapter
diff options
context:
space:
mode:
authorBruno Wagner <bwagner@riseup.net>2015-09-10 15:51:14 -0300
committerBruno Wagner <bwagner@riseup.net>2015-09-10 15:51:14 -0300
commit8d2dac75d667818ea28abc0c25f45e4ce4b3f6c8 (patch)
treed36d21cf6a09997dd8f9e54c795afc91a76872fb /service/pixelated/adapter
parent6f708c2b53c8faa2bc3d73891413a44d33044237 (diff)
Removed PixelatedMail class leftovers
We removed Pixelated mail tests and adapted the remaining tests to use Leap mail instead of it. Completely removed that now
Diffstat (limited to 'service/pixelated/adapter')
-rw-r--r--service/pixelated/adapter/model/mail.py244
1 files changed, 0 insertions, 244 deletions
diff --git a/service/pixelated/adapter/model/mail.py b/service/pixelated/adapter/model/mail.py
index 524ca48d..31eb0654 100644
--- a/service/pixelated/adapter/model/mail.py
+++ b/service/pixelated/adapter/model/mail.py
@@ -245,13 +245,10 @@ class InputMail(Mail):
input_mail = InputMail()
input_mail.headers = {key.capitalize(): value for key, value in mail_dict.get('header', {}).items()}
- # XXX this is overriding the property in PixelatedMail
input_mail.headers['Date'] = date.iso_now()
- # XXX this is overriding the property in PixelatedMail
input_mail.body = mail_dict.get('body', '')
- # XXX this is overriding the property in the PixelatedMail
input_mail.tags = set(mail_dict.get('tags', []))
input_mail._status = set(mail_dict.get('status', []))
@@ -272,247 +269,6 @@ class InputMail(Mail):
return input_mail
-class PixelatedMail(Mail):
-
- @staticmethod
- def from_soledad(fdoc, hdoc, bdoc, parts=None):
- mail = PixelatedMail()
- mail.parts = parts
- mail.boundary = str(uuid4()).replace('-', '')
- mail.bdoc = bdoc
- mail.fdoc = fdoc
- mail.hdoc = hdoc
- mail._mime = None
- return mail
-
- def _decode_part(self, part):
- encoding = part['headers'].get('Content-Transfer-Encoding', '')
- content_type = self._parse_charset_header(part['headers'].get('Content-Type'))
-
- try:
- decoding_func = self._decoding_function_for_encoding(encoding)
- return self._decode_content_with_fallback(part['content'], decoding_func, content_type)
- except Exception:
- logger.error('Failed to decode mail part with:')
- logger.error('Content-Transfer-Encoding: %s' % encoding)
- logger.error('Content-Type: %s' % part['headers'].get('Content-Type'))
- raise
-
- def _decoding_function_for_encoding(self, encoding):
- decoding_map = {
- 'quoted-printable': lambda content, content_type: content.decode('quopri').decode(content_type),
- 'base64': lambda content, content_type: content.decode('base64').decode('utf-8'),
- '7bit': lambda content, content_type: content.encode(content_type),
- '8bit': lambda content, content_type: content.encode(content_type)
- }
- if encoding in decoding_map:
- return decoding_map[encoding]
- else:
- return decoding_map['8bit']
-
- def _decode_content_with_fallback(self, content, decode_func, content_type):
- try:
- return decode_func(content, content_type)
- # return content.encode(content_type)
- except ValueError:
- return content.encode('ascii', 'ignore')
-
- @property
- def alternatives(self):
- return self.parts.get('alternatives')
-
- @property
- def text_plain_body(self):
- if self.parts and len(self.alternatives) >= 1:
- return self._decode_part(self.alternatives[0])
- else:
- return self.bdoc.content['raw'] # plain
-
- @property
- def html_body(self):
- if self.parts and len(self.alternatives) > 1:
- html_parts = [e for e in self.alternatives if re.match('text/html', e['headers'].get('Content-Type', ''))]
- if len(html_parts):
- return self._decode_part(html_parts[0])
-
- @property
- def headers(self):
- _headers = {
- 'To': [],
- 'Cc': [],
- 'Bcc': []
- }
- hdoc_headers = self.hdoc.content['headers']
-
- for header in ['To', 'Cc', 'Bcc']:
- header_value = self._decode_header(hdoc_headers.get(header))
- if not header_value:
- continue
- _headers[header] = header_value if type(header_value) is list else header_value.split(',')
- _headers[header] = [head.strip() for head in compact(_headers[header])]
-
- for header in ['From', 'Subject']:
- _headers[header] = self._decode_header(hdoc_headers.get(header))
-
- try:
- _headers['Date'] = self._get_date()
- except Exception:
- _headers['Date'] = date.iso_now()
-
- if self.parts and len(self.parts['alternatives']) > 1:
- _headers['content_type'] = 'multipart/alternative; boundary="%s"' % self.boundary
- elif self.hdoc.content['headers'].get('Content-Type'):
- _headers['content_type'] = hdoc_headers.get('Content-Type')
-
- if hdoc_headers.get('Reply-To'):
- _headers['Reply-To'] = hdoc_headers.get('Reply-To')
-
- return _headers
-
- def _decode_header_with_fallback(self, entry):
- try:
- return decode_header(entry)[0][0]
- except Exception:
- return entry.encode('ascii', 'ignore')
-
- def _decode_header(self, header):
- if not header:
- return None
- if isinstance(header, list):
- return [self._decode_header_with_fallback(entry) for entry in header]
- else:
- return self._decode_header_with_fallback(header)
-
- def _get_date(self):
- date = self.hdoc.content.get('date', None)
- try:
- if not date:
- received = self.hdoc.content.get('received', None)
- if received:
- date = received.split(";")[-1].strip()
- else:
- # we can't get a date for this mail, so lets just use now
- logger.warning('Encountered a mail with missing date and received header fields. ID %s' % self.fdoc.content.get('uid', None))
- date = date.iso_now()
- return dateparser.parse(date).isoformat()
- except (ValueError, TypeError):
- date = date.iso_now()
- return dateparser.parse(date).isoformat()
-
- @property
- def security_casing(self):
- casing = {"imprints": [], "locks": []}
- casing["imprints"] = self.signature_information
- if self.encrypted == "true":
- casing["locks"] = [{"state": "valid"}]
- elif self.encrypted == "fail":
- casing["locks"] = [{"state": "failure"}]
- return casing
-
- @property
- def tags(self):
- _tags = self.fdoc.content.get('tags', '[]')
- return set(_tags) if type(_tags) is list or type(_tags) is set else set(json.loads(_tags))
-
- @property
- def ident(self):
- return self.fdoc.content.get('chash')
-
- @property
- def mailbox_name(self):
- # FIXME mbox is no longer available, instead we now have mbox_uuid
- return self.fdoc.content.get('mbox', 'INBOX')
-
- @property
- def is_recent(self):
- return Status('recent') in self.status
-
- @property
- def uid(self):
- return self.fdoc.content['uid']
-
- @property
- def flags(self):
- return self.fdoc.content['flags']
-
- def set_mailbox(self, mailbox_name):
- self.fdoc.content['mbox'] = mailbox_name
-
- def has_tag(self, tag):
- return tag in self.tags
-
- @property
- def signature_information(self):
- signature = self.hdoc.content["headers"].get("X-Leap-Signature", None)
- if signature is None or signature.startswith("could not verify"):
- return [{"state": "no_signature_information"}]
- else:
- if signature.startswith("valid"):
- return [{"state": "valid", "seal": {"validity": "valid"}}]
- else:
- return []
-
- @property
- def encrypted(self):
- return self.hdoc.content["headers"].get("X-Pixelated-encryption-status", "false")
-
- @property
- def bounced(self):
- content_type = self.hdoc.content["headers"].get("Content-Type", '')
- if re.compile('delivery-status').search(content_type):
- bounce_recipient = self._extract_bounced_address(self.hdoc.content)
- bounce_daemon = self.headers["From"]
- return [bounce_recipient, bounce_daemon] if bounce_recipient else False
-
- return False
-
- def _extract_bounced_address(self, part):
- part_header = dict(part.get('headers', {}))
- if 'Final-Recipient' in part_header:
- if self._bounce_permanent(part_header):
- return part_header['Final-Recipient'].split(';')[1].strip()
- else:
- return False
- elif 'part_map' in part:
- for subpart in part['part_map'].values():
- result = self._extract_bounced_address(subpart)
- if result:
- return result
- else:
- continue
- return False
-
- def _bounce_permanent(self, part_headers):
- status = part_headers.get('Status', '')
- return status.startswith('5')
-
- def as_dict(self):
- dict_mail = {'header': {k.lower(): v for k, v in self.headers.items()},
- 'ident': self.ident,
- 'tags': list(self.tags),
- 'status': list(self.status),
- 'security_casing': self.security_casing,
- 'textPlainBody': self.text_plain_body,
- 'htmlBody': self.html_body,
- 'mailbox': self.mailbox_name.lower(),
- 'attachments': self.parts['attachments'] if self.parts else []}
- dict_mail['replying'] = {'single': None, 'all': {'to-field': [], 'cc-field': []}}
-
- sender_mail = self.headers.get('Reply-To', self.headers.get('From'))
- # Issue #215: Fix for existing mails without any from address.
- if sender_mail is None:
- sender_mail = InputMail.FROM_EMAIL_ADDRESS
-
- recipients = [recipient for recipient in self.headers['To'] if recipient != InputMail.FROM_EMAIL_ADDRESS]
- recipients.append(sender_mail)
- ccs = [cc for cc in self.headers['Cc'] if cc != InputMail.FROM_EMAIL_ADDRESS]
-
- dict_mail['replying']['single'] = sender_mail
- dict_mail['replying']['all']['to-field'] = recipients
- dict_mail['replying']['all']['cc-field'] = ccs
- return dict_mail
-
-
def welcome_mail():
current_path = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(current_path, '..', '..', 'assets', 'welcome.mail')) as mail_template_file: