summaryrefslogtreecommitdiff
path: root/service/pixelated/support
diff options
context:
space:
mode:
authorFolker Bernitt <fbernitt@thoughtworks.com>2015-10-28 12:05:59 +0100
committerFolker Bernitt <fbernitt@thoughtworks.com>2015-10-28 12:08:41 +0100
commit70c770635199cfa473608162ec7d31e030a11c5f (patch)
tree5bb5d9ff72a6edabf756256ca30eed0a21e180a4 /service/pixelated/support
parent41b462e9b29d62dc197be6d8a633c1b9a46688cf (diff)
Add markov-generate to load-mails
- Allows generating mails based on a set of sample mails - use it with: pixelated-maintenance markov-generate --seed 21 --limit 10
Diffstat (limited to 'service/pixelated/support')
-rw-r--r--service/pixelated/support/mail_generator.py150
-rw-r--r--service/pixelated/support/markov.py94
2 files changed, 244 insertions, 0 deletions
diff --git a/service/pixelated/support/mail_generator.py b/service/pixelated/support/mail_generator.py
new file mode 100644
index 00000000..af8dd4cc
--- /dev/null
+++ b/service/pixelated/support/mail_generator.py
@@ -0,0 +1,150 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+
+
+from email.mime.text import MIMEText
+from email.utils import formatdate
+from random import Random
+from pixelated.support.markov import MarkovGenerator
+import re
+from collections import Counter
+import time
+
+
def filter_two_line_on_wrote(lines):
    """Yield the stripped lines, dropping two-line reply attributions.

    A reply attribution is a line starting with 'On' that is directly
    followed by a line ending with 'wrote:' (the attribution wrapped over
    two lines). Both lines of such a pair are dropped.

    :param lines: iterable of text lines
    :return: generator over the remaining lines, each one stripped
    """
    # Materialise once so any iterable (not only lists) can be indexed.
    lines = list(lines)
    last_index = len(lines) - 1
    skip_next = False

    for index, line in enumerate(lines):
        if skip_next:
            # Second half of an attribution pair, also when it is the
            # very last line of the input.
            skip_next = False
            continue

        if (index < last_index and line.startswith('On') and
                lines[index + 1].endswith('wrote:')):
            skip_next = True
            continue

        yield line.strip()
+
+
def filter_lines(text):
    """Reduce a mail body to a single line of prose.

    Lines are dropped when they
    - start (after optional whitespace) with '>' or '-',
    - are blank,
    - match 'On ... wrote ...' on a single line,
    - end with 'writes:',
    - contain no space once stripped (i.e. consist of a single word),
    - form a two-line 'On ...' / '... wrote:' pair.

    :param text: the mail body
    :return: the remaining lines joined with single spaces
    """
    # Raw strings: '\s' in a plain literal is an invalid escape sequence.
    quoted_or_dashed = re.compile(r'\s*[>-].*')
    single_line_on_wrote = re.compile(r'\s*On.*wrote.*')

    # Build a real list; the two-line filter needs to index into it.
    kept = []
    for line in text.splitlines():
        stripped = line.strip()
        if quoted_or_dashed.match(line):
            continue
        if not stripped:
            continue
        if single_line_on_wrote.match(line):
            continue
        if line.endswith('writes:'):
            continue
        if ' ' not in stripped:
            continue
        kept.append(line)

    return ' '.join(filter_two_line_on_wrote(kept))
+
+
def decode_multipart_mail_text(mail):
    """Return the decoded payload of the first text/plain part of a mail.

    The message tree is walked depth-first, so a text/plain part nested
    inside e.g. multipart/alternative is found as well.

    :param mail: an email message object
    :return: the decoded payload of the first text/plain part, or '' when
        the mail contains no such part
    """
    for part in mail.walk():
        if part.get_content_type() == 'text/plain':
            return part.get_payload(decode=True)
    return ''
+
+
def search_for_tags(content):
    """Return up to ten candidate tags found in a text.

    Candidates are the whitespace separated words that are purely
    alphanumeric and longer than five characters, lower-cased. The ten
    most frequent ones are returned, most frequent first.

    :param content: the text to search
    :return: list of tag strings (a real list, so it can be indexed and
        handed to random.choice)
    """
    candidates = [word.lower() for word in content.split()
                  if word.isalnum() and len(word) > 5]

    return [tag for tag, _count in Counter(candidates).most_common(10)]
+
+
def load_all_mails(mail_list):
    """Collect the subjects and the filtered bodies of the given mails.

    For multipart mails the text is taken from decode_multipart_mail_text,
    for text/plain mails from the decoded payload; either is passed
    through filter_lines.

    :param mail_list: iterable of email message objects
    :return: tuple (set of subjects, list of filtered body texts)
    :raises Exception: for a non-multipart mail whose content type is not
        text/plain; the exception message is that content type
    """
    collected_subjects = set()
    collected_bodies = []

    for message in mail_list:
        collected_subjects.add(message['Subject'])

        if message.is_multipart():
            raw_text = decode_multipart_mail_text(message)
        elif message.get_content_type() == 'text/plain':
            raw_text = message.get_payload(decode=True)
        else:
            raise Exception(message.get_content_type())

        collected_bodies.append(filter_lines(raw_text))

    return collected_subjects, collected_bodies
+
+
class MailGenerator(object):
    """Generate random mails whose subject and body are derived from samples.

    Subjects and bodies of the sample mails feed two Markov generators;
    sender, date, tags and the X-Leap-* headers are picked at random.
    """

    NAMES = ['alice', 'bob', 'eve']

    # The Markov generator works on word triplets and raises ValueError for
    # any text with fewer words, so shorter samples must be left out.
    _MIN_WORDS = 3

    def __init__(self, receiver, domain_name, sample_mail_list, random=None):
        """
        :param receiver: local part of the address the mails are sent to
        :param domain_name: domain used for sender and receiver addresses
        :param sample_mail_list: iterable of email message objects to learn from
        :param random: optional random.Random instance (e.g. a seeded one)
        """
        self._random = random if random else Random()
        self._receiver = receiver
        self._domain_name = domain_name

        subjects, bodies = load_all_mails(sample_mail_list)
        # Subjects arrive as a set; sort them so a seeded random yields the
        # same mails on every run.
        self._subjects = sorted(self._usable_texts(subjects))
        self._bodies = self._usable_texts(bodies)

        self._potential_tags = list(search_for_tags(' '.join(bodies)))
        self._subject_markov = MarkovGenerator(self._subjects, random=self._random)
        self._body_markov = MarkovGenerator(self._bodies, random=self._random, add_paragraph_on_empty_chain=True)

    @classmethod
    def _usable_texts(cls, texts):
        """Return the texts that are long enough to feed the Markov chain."""
        return [text for text in texts
                if text is not None and len(text.split()) >= cls._MIN_WORDS]

    def generate_mail(self):
        """Return a new MIMEText mail with random body and headers."""
        body = self._body_markov.generate(150)
        mail = MIMEText(body)

        mail['Subject'] = self._subject_markov.generate(8)
        mail['To'] = '%s@%s' % (self._receiver, self._domain_name)
        mail['From'] = self._random_from()
        mail['Date'] = self._random_date()
        mail['X-Tags'] = self._random_tags()
        mail['X-Leap-Encryption'] = self._random_encryption_state()
        mail['X-Leap-Signature'] = self._random_signature_state()

        return mail

    def _random_date(self):
        """Return a formatted date within the last ten days."""
        now = int(time.time())
        ten_days = 60 * 60 * 24 * 10
        mail_time = self._random.randint(now - ten_days, now)

        return formatdate(mail_time)

    def _random_encryption_state(self):
        return self._random.choice(['true', 'decrypted'])

    def _random_signature_state(self):
        return self._random.choice(['could not verify', 'valid'])

    def _random_from(self):
        """Return the address of a random sender other than the receiver."""
        senders = [name for name in MailGenerator.NAMES if name != self._receiver]
        name = self._random.choice(senders)

        return '%s@%s' % (name, self._domain_name)

    def _random_tags(self):
        """Return a space separated string of zero or more random tags."""
        if not self._potential_tags:
            # Nothing to choose from; random.choice would raise IndexError.
            return ''

        # The barrier rises with every tag, so the loop always terminates
        # and more tags become increasingly unlikely.
        barrier = 0.5
        tags = set()
        while self._random.random() > barrier:
            tags.add(self._random.choice(self._potential_tags))
            barrier += 0.15

        # Sorted, so the header does not depend on set iteration order.
        return ' '.join(sorted(tags))
diff --git a/service/pixelated/support/markov.py b/service/pixelated/support/markov.py
new file mode 100644
index 00000000..8f7c0ef3
--- /dev/null
+++ b/service/pixelated/support/markov.py
@@ -0,0 +1,94 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+
+from random import Random
+
+NEW_PARAGRAPH = '\n\n'
+
+
class MarkovGenerator(object):
    """Generate random text with a second order Markov chain over words.

    The chain maps every pair of consecutive words of the input texts to
    the set of words seen directly after that pair.
    """

    def __init__(self, texts, random=None, add_paragraph_on_empty_chain=False):
        """
        :param texts: iterable of texts to learn from; None entries are skipped
        :param random: optional random.Random instance (e.g. a seeded one)
        :param add_paragraph_on_empty_chain: emit a paragraph break whenever
            generation has to restart from a new seed
        :raises ValueError: if a text has fewer than three words
        """
        self._markov_chain = {}
        # Keys starting with a capital letter, in order of first appearance.
        self._seed_pairs = []
        self._random = random if random else Random()
        self._add_paragraph_on_empty_chain = add_paragraph_on_empty_chain

        for text in texts:
            if text is not None:
                self._extend_chain_with(text)

    def add(self, text):
        """Extend the chain with another text."""
        self._extend_chain_with(text)

    @staticmethod
    def _triplet_generator(words):
        """Yield ((word, next_word), word_after_next) for all word triplets."""
        if len(words) < 3:
            raise ValueError('Expected input with at least three words')

        for i in range(len(words) - 2):
            yield ((words[i], words[i + 1]), words[i + 2])

    def _extend_chain_with(self, input_text):
        words = input_text.split()

        for key, value in self._triplet_generator(words):
            if key in self._markov_chain:
                self._markov_chain[key].add(value)
            else:
                self._markov_chain[key] = {value}
                if key[0][0].isupper():
                    self._seed_pairs.append(key)

    def _generate_chain(self, length):
        """Yield `length` words, plus paragraph breaks if those are enabled."""
        word, next_word = self._find_good_seed()
        new_seed = False

        for _ in range(length):
            yield word

            if new_seed:
                # The word just yielded ended a chain; restart from a seed.
                word, next_word = self._find_good_seed()
                if self._add_paragraph_on_empty_chain:
                    yield NEW_PARAGRAPH
                new_seed = False
            else:
                prev_word, word = word, next_word

                try:
                    next_word = self._random_next_word(prev_word, word)
                except KeyError:
                    # No successor known for this pair: yield `word` in the
                    # next round, then pick a new seed.
                    new_seed = True

    def _random_next_word(self, prev_word, word):
        # Sorted, so a seeded random does not depend on set iteration order.
        return self._random.choice(sorted(self._markov_chain[(prev_word, word)]))

    def _find_good_seed(self):
        """Return a random word pair whose first word starts with a capital.

        :raises ValueError: if the chain has no such pair
        """
        if not self._seed_pairs:
            raise ValueError('Not able find start word with captial letter')

        return self._random.choice(self._seed_pairs)

    def generate(self, length):
        """Return a random text of `length` words joined by spaces.

        :raises ValueError: if the chain is empty or has no pair starting
            with a capital letter
        """
        if not self._markov_chain:
            raise ValueError('Expected at least three words input')
        return ' '.join(self._generate_chain(length))