From 91e4481c450eb7eb928debc1cb7fa59bdb63dd7b Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 25 Jul 2017 11:40:11 -0400 Subject: [pkg] packaging and path changes - move all the pixelated python package under src/ - move the pixelated_www package under the leap namespace - allow to set globally the static folder - add hours and minutes to the timestamp in package version, to allow for several releases a day. --- service/src/pixelated/support/mail_generator.py | 154 ++++++++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 service/src/pixelated/support/mail_generator.py (limited to 'service/src/pixelated/support/mail_generator.py') diff --git a/service/src/pixelated/support/mail_generator.py b/service/src/pixelated/support/mail_generator.py new file mode 100644 index 00000000..e5232370 --- /dev/null +++ b/service/src/pixelated/support/mail_generator.py @@ -0,0 +1,154 @@ +# +# Copyright (c) 2015 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see . + + +from email.mime.text import MIMEText +from email.utils import formatdate +from random import Random +from pixelated.support.markov import MarkovGenerator +import re +from collections import Counter +import time + + +def filter_two_line_on_wrote(lines): + skip_next = False + if len(lines) > 0: + for i in xrange(len(lines) - 1): + if skip_next: + skip_next = False + continue + + if lines[i].startswith('On') and lines[i + 1].endswith('wrote:'): + skip_next = True + else: + yield lines[i].strip() + + yield lines[-1] + + +def filter_lines(text): + pattern = re.compile('\s*[>-].*') + wrote_pattern = re.compile('\s*On.*wrote.*') + + lines = text.splitlines() + + lines = filter(lambda line: not pattern.match(line), lines) + lines = filter(lambda line: not len(line.strip()) == 0, lines) + lines = filter(lambda line: not wrote_pattern.match(line), lines) + lines = filter(lambda line: not line.endswith('writes:'), lines) + lines = filter(lambda line: ' ' in line.strip(), lines) + + lines = filter_two_line_on_wrote(lines) + + return ' '.join(lines) + + +def decode_multipart_mail_text(mail): + for payload in mail.get_payload(): + if payload.get_content_type() == 'text/plain': + return payload.get_payload(decode=True) + return '' + + +def search_for_tags(content): + words = content.split() + + only_alnum = filter(lambda word: word.isalnum(), words) + only_longer = filter(lambda word: len(word) > 5, only_alnum) + lower_case = map(lambda word: word.lower(), only_longer) + + counter = Counter(lower_case) + potential_tags = counter.most_common(10) + + return map(lambda tag: tag[0], potential_tags) + + +def filter_too_short_texts(texts): + return [text for text in texts if text is not None and len(text.split()) >= 3] + + +def load_all_mails(mail_list): + subjects = set() + mail_bodies = [] + + for mail in mail_list: + subjects.add(mail['Subject']) + if mail.is_multipart(): + mail_bodies.append(filter_lines(decode_multipart_mail_text(mail))) + else: + if mail.get_content_type() == 'text/plain': + mail_bodies.append(filter_lines(mail.get_payload(decode=True))) + else: + raise Exception(mail.get_content_type()) + + return filter_too_short_texts(subjects), filter_too_short_texts(mail_bodies) + + +class MailGenerator(object): + + NAMES = ['alice', 'bob', 'eve'] + + def __init__(self, receiver, domain_name, sample_mail_list, random=None): + self._random = random if random else Random() + self._receiver = receiver + self._domain_name = domain_name + self._subjects, self._bodies = load_all_mails(sample_mail_list) + + self._potential_tags = search_for_tags(' '.join(self._bodies)) + self._subject_markov = MarkovGenerator(self._subjects, random=self._random) + self._body_markov = MarkovGenerator(self._bodies, random=self._random, add_paragraph_on_empty_chain=True) + + def generate_mail(self): + body = self._body_markov.generate(150) + mail = MIMEText(body) + + mail['Subject'] = self._subject_markov.generate(8) + mail['To'] = '%s@%s' % (self._receiver, self._domain_name) + mail['From'] = self._random_from() + mail['Date'] = self._random_date() + mail['X-Tags'] = self._random_tags() + mail['X-Leap-Encryption'] = self._random_encryption_state() + mail['X-Leap-Signature'] = self._random_signature_state() + + return mail + + def _random_date(self): + now = int(time.time()) + ten_days = 60 * 60 * 24 * 10 + mail_time = self._random.randint(now - ten_days, now) + + return formatdate(mail_time) + + def _random_encryption_state(self): + return self._random.choice(['true', 'decrypted']) + + def _random_signature_state(self): + return self._random.choice(['could not verify', 'valid']) + + def _random_from(self): + name = self._random.choice(filter(lambda name: name != self._receiver, MailGenerator.NAMES)) + + return '%s@%s' % (name, self._domain_name) + + def _random_tags(self): + barrier = 0.5 + tags = set() + while self._random.random() > barrier: + tags.add(self._random.choice(self._potential_tags)) + barrier += 0.15 + + return ' '.join(tags) -- cgit v1.2.3