From 70c770635199cfa473608162ec7d31e030a11c5f Mon Sep 17 00:00:00 2001 From: Folker Bernitt Date: Wed, 28 Oct 2015 12:05:59 +0100 Subject: Add markov-generate to load-mails - Allows generating mails based on sample mails - use it with: pixelated-maintenance markov-generate --seed 21 --limit 10 --- service/pixelated/support/mail_generator.py | 150 ++++++++++++++++++++++++++++ service/pixelated/support/markov.py | 94 +++++++++++++++++ 2 files changed, 244 insertions(+) create mode 100644 service/pixelated/support/mail_generator.py create mode 100644 service/pixelated/support/markov.py (limited to 'service/pixelated/support') diff --git a/service/pixelated/support/mail_generator.py b/service/pixelated/support/mail_generator.py new file mode 100644 index 00000000..af8dd4cc --- /dev/null +++ b/service/pixelated/support/mail_generator.py @@ -0,0 +1,150 @@ +# +# Copyright (c) 2015 ThoughtWorks, Inc. +# +# Pixelated is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pixelated is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
from email.mime.text import MIMEText
from email.utils import formatdate
from random import Random
from pixelated.support.markov import MarkovGenerator
import re
from collections import Counter
import time

# Lines that start (after optional whitespace) with '>' or '-'.
_QUOTE_OR_DASH_LINE = re.compile(r'\s*[>-].*')
# Single-line reply attributions of the form "On ... wrote ...".
_ON_WROTE_LINE = re.compile(r'\s*On.*wrote.*')


def filter_two_line_on_wrote(lines):
    """Yield ``lines`` without reply attributions wrapped over two lines.

    A pair is recognised as a line starting with ``On`` that is directly
    followed by a line ending in ``wrote:``.  Both lines of such a pair are
    dropped, including when the pair is the very end of the input.  Every
    other line is yielded with surrounding whitespace stripped.

    :param lines: iterable of text lines (a list or a lazy iterator)
    :return: generator over the remaining, stripped lines
    """
    lines = list(lines)  # also accept lazy iterables such as filter objects
    last_index = len(lines) - 1
    skip_next = False

    for index, line in enumerate(lines):
        if skip_next:
            # This is the "... wrote:" half of a pair found one line earlier.
            skip_next = False
            continue

        starts_pair = (index < last_index and
                       line.startswith('On') and
                       lines[index + 1].endswith('wrote:'))
        if starts_pair:
            skip_next = True
        else:
            yield line.strip()


def filter_lines(text):
    """Reduce a mail body to one line of plain sentence material.

    Dropped are: blank lines, lines starting with ``>`` or ``-``, lines
    matching ``On ... wrote ...``, lines ending in ``writes:``, lines that
    consist of a single word, and two-line ``On ...`` / ``... wrote:``
    attributions.

    :param text: the mail body as text
    :return: the surviving lines, stripped and joined with single spaces
    """
    kept = [line for line in text.splitlines()
            if line.strip() and
            not _QUOTE_OR_DASH_LINE.match(line) and
            not _ON_WROTE_LINE.match(line) and
            not line.endswith('writes:') and
            ' ' in line.strip()]

    return ' '.join(filter_two_line_on_wrote(kept))


def _payload_as_text(part):
    """Return the transfer-decoded payload of ``part`` as text.

    On Python 2 ``get_payload(decode=True)`` returns ``str`` and is passed
    through unchanged.  On Python 3 it returns ``bytes``; these are decoded
    with the charset declared on the part, falling back to UTF-8 when none
    is declared or the declared one is unknown.  Undecodable bytes are
    replaced instead of raising.
    """
    payload = part.get_payload(decode=True)
    if isinstance(payload, bytes) and not isinstance(payload, str):
        charset = part.get_content_charset() or 'utf-8'
        try:
            payload = payload.decode(charset, 'replace')
        except LookupError:
            payload = payload.decode('utf-8', 'replace')
    return payload


def decode_multipart_mail_text(mail):
    """Return the first ``text/plain`` part of a multipart mail.

    The whole message tree is walked, so a text part nested inside another
    multipart container (e.g. multipart/alternative inside multipart/mixed)
    is found as well.

    :param mail: an ``email.message.Message``
    :return: the decoded text of the first ``text/plain`` part, or ``''``
        when the mail has none
    """
    for part in mail.walk():
        if part.get_content_type() == 'text/plain':
            return _payload_as_text(part)
    return ''


def search_for_tags(content):
    """Return up to ten frequent words of ``content`` to be used as tags.

    Only purely alphanumeric words longer than five characters are counted;
    counting is case-insensitive and the result is lower case.

    :param content: text to analyse
    :return: list of at most ten words, most common first
    """
    candidates = (word.lower() for word in content.split()
                  if word.isalnum() and len(word) > 5)

    return [tag for tag, _count in Counter(candidates).most_common(10)]


def load_all_mails(mail_list):
    """Collect the subjects and the filtered bodies of all sample mails.

    :param mail_list: iterable of ``email.message.Message`` objects
    :return: tuple ``(subjects, mail_bodies)``; ``subjects`` is a set of the
        ``Subject`` header values (``None`` for mails without one),
        ``mail_bodies`` a list with one filtered body per mail
    :raises Exception: for a non-multipart mail that is not ``text/plain``;
        the message is the offending content type
    """
    subjects = set()
    mail_bodies = []

    for mail in mail_list:
        subjects.add(mail['Subject'])
        if mail.is_multipart():
            text = decode_multipart_mail_text(mail)
        elif mail.get_content_type() == 'text/plain':
            text = _payload_as_text(mail)
        else:
            raise Exception(mail.get_content_type())
        mail_bodies.append(filter_lines(text))

    return subjects, mail_bodies


class MailGenerator(object):
    """Generates random mails whose text is modelled on sample mails.

    Subject and body are produced by two ``MarkovGenerator`` instances fed
    with the subjects and bodies of the sample mails.  All random decisions
    go through one ``random.Random`` instance, so a seeded instance can be
    passed in.
    """

    # Local parts used for the sender address; the receiver is excluded.
    NAMES = ['alice', 'bob', 'eve']

    def __init__(self, receiver, domain_name, sample_mail_list, random=None):
        """
        :param receiver: local part of the ``To`` address
        :param domain_name: domain used for both ``To`` and ``From``
        :param sample_mail_list: iterable of ``email.message.Message``
            objects to learn from
        :param random: optional ``random.Random`` instance; a fresh one is
            created when omitted
        """
        self._random = random if random else Random()
        self._receiver = receiver
        self._domain_name = domain_name
        self._subjects, self._bodies = load_all_mails(sample_mail_list)

        self._potential_tags = search_for_tags(' '.join(self._bodies))
        self._subject_markov = MarkovGenerator(self._subjects, random=self._random)
        self._body_markov = MarkovGenerator(self._bodies, random=self._random, add_paragraph_on_empty_chain=True)

    def generate_mail(self):
        """Return a new ``MIMEText`` mail with generated body and headers."""
        body = self._body_markov.generate(150)
        mail = MIMEText(body)

        mail['Subject'] = self._subject_markov.generate(8)
        mail['To'] = '%s@%s' % (self._receiver, self._domain_name)
        mail['From'] = self._random_from()
        mail['Date'] = self._random_date()
        mail['X-Tags'] = self._random_tags()
        mail['X-Leap-Encryption'] = self._random_encryption_state()
        mail['X-Leap-Signature'] = self._random_signature_state()

        return mail

    def _random_date(self):
        """Return an RFC 2822 date string from within the last ten days."""
        now = int(time.time())
        ten_days = 60 * 60 * 24 * 10
        mail_time = self._random.randint(now - ten_days, now)

        return formatdate(mail_time)

    def _random_encryption_state(self):
        """Return a random value for the ``X-Leap-Encryption`` header."""
        return self._random.choice(['true', 'decrypted'])

    def _random_signature_state(self):
        """Return a random value for the ``X-Leap-Signature`` header."""
        return self._random.choice(['could not verify', 'valid'])

    def _random_from(self):
        """Return a sender address whose name differs from the receiver."""
        senders = [name for name in MailGenerator.NAMES if name != self._receiver]
        name = self._random.choice(senders)

        return '%s@%s' % (name, self._domain_name)

    def _random_tags(self):
        """Return a space separated, sorted selection of potential tags.

        A tag is drawn while ``random()`` exceeds a barrier that starts at
        0.5 and rises by 0.15 per draw, so at most four draws happen.  The
        result is empty when no potential tags are known.  Tags are sorted
        so that a seeded generator yields the same header on every run.
        """
        barrier = 0.5
        tags = set()
        while self._potential_tags and self._random.random() > barrier:
            tags.add(self._random.choice(self._potential_tags))
            barrier += 0.15

        return ' '.join(sorted(tags))


# Patch header of the next file, kept from the original patch text:
# diff --git a/service/pixelated/support/markov.py b/service/pixelated/support/markov.py
# new file mode 100644
# index 00000000..8f7c0ef3
# --- /dev/null
# +++
# b/service/pixelated/support/markov.py @@ -0,0 +1,94 @@
#
# Copyright (c) 2015 ThoughtWorks, Inc.
#
# Pixelated is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pixelated is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.

from random import Random

NEW_PARAGRAPH = '\n\n'


class MarkovGenerator(object):
    """Word based Markov chain text generator.

    The chain maps every pair of consecutive words seen in the input texts
    to the set of words that followed that pair.  Generation always starts
    at a pair whose first word begins with an upper case letter.
    """

    def __init__(self, texts, random=None, add_paragraph_on_empty_chain=False):
        """
        :param texts: iterable of input texts; ``None`` entries are skipped
        :param random: optional ``random.Random`` instance; a fresh one is
            created when omitted
        :param add_paragraph_on_empty_chain: when true, ``NEW_PARAGRAPH`` is
            emitted whenever generation hits a dead end and restarts
        :raises ValueError: if a text has fewer than three words
        """
        self._markov_chain = {}
        self._random = random if random else Random()
        self._add_paragraph_on_empty_chain = add_paragraph_on_empty_chain

        for text in texts:
            if text is not None:
                self._extend_chain_with(text)

    def add(self, text):
        """Extend the chain with ``text``.

        :raises ValueError: if ``text`` has fewer than three words
        """
        self._extend_chain_with(text)

    @staticmethod
    def _triplet_generator(words):
        """Yield ``((word, next_word), word_after)`` for every word triplet.

        The length check runs when the generator is first advanced.

        :raises ValueError: if ``words`` has fewer than three entries
        """
        if len(words) < 3:
            raise ValueError('Expected input with at least three words')

        for first, second, third in zip(words, words[1:], words[2:]):
            yield ((first, second), third)

    def _extend_chain_with(self, input_text):
        """Split ``input_text`` on whitespace and record all its triplets."""
        words = input_text.split()

        for key, value in self._triplet_generator(words):
            self._markov_chain.setdefault(key, set()).add(value)

    def _generate_chain(self, length):
        """Yield ``length`` words by walking the chain.

        When the current pair has no known successor a new seed pair is
        picked.  With ``add_paragraph_on_empty_chain`` set, ``NEW_PARAGRAPH``
        is yielded at that point in addition to the ``length`` words.
        """
        word, next_word = self._find_good_seed()
        new_seed = False

        for _ in range(length):
            yield word

            if new_seed:
                word, next_word = self._find_good_seed()
                if self._add_paragraph_on_empty_chain:
                    yield NEW_PARAGRAPH
                new_seed = False
            else:
                prev_word, word = word, next_word

                try:
                    next_word = self._random_next_word(prev_word, word)
                except KeyError:
                    # Dead end: emit `word` on the next round, then restart.
                    new_seed = True

    def _random_next_word(self, prev_word, word):
        """Return a random successor of the pair ``(prev_word, word)``.

        Successors are sorted before choosing so that the result depends
        only on the random generator and not on set iteration order.

        :raises KeyError: if the pair is not part of the chain
        """
        return self._random.choice(sorted(self._markov_chain[(prev_word, word)]))

    def _find_good_seed(self):
        """Return a random pair whose first word starts with a capital.

        The choice is made among all qualifying pairs, so a seed is found
        whenever one exists.  Candidates are sorted so that the result
        depends only on the random generator and not on dict order.

        :raises ValueError: if no pair starts with an upper case letter
        """
        candidates = sorted(pair for pair in self._markov_chain
                            if pair[0][0].isupper())

        if not candidates:
            raise ValueError('Not able find start word with captial letter')

        return self._random.choice(candidates)

    def generate(self, length):
        """Return ``length`` generated words joined by single spaces.

        :raises ValueError: if the chain is empty or has no pair starting
            with an upper case letter
        """
        if not self._markov_chain:
            raise ValueError('Expected at least three words input')
        return ' '.join(self._generate_chain(length))


# -- cgit v1.2.3