From 7b8c0d723e635f7a8cde3ceb4425426528ac8240 Mon Sep 17 00:00:00 2001 From: elijah Date: Thu, 26 May 2016 11:19:05 -0700 Subject: added support for processing message headers --- README.md | 68 ++++++++++++++++++ bin/parse-email-logs | 130 ++++++++++++++++++++++++++++++----- config/initializer.rb | 5 -- db/migrations/001_create_messages.rb | 7 +- 4 files changed, 185 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index dd59a16..abd0714 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,74 @@ USAGE rake reset cat postfix.log.1 | bin/parse-email-logs +DB NOTES +============================================ + +There is one record for each message delivery. This means that there might be +many records with duplicate queue_ids. Some messages never get successfully +delivered, and have status of 'deferred' or 'bounced'. + +A lot of messages never show up in the db, such as those rejected because they +had viruses or blocked by RBLs. + +Emails addresses are cleaned and then hashed using HMAC. For example: + + 1. Elijah + 2. elijah@riseup.net + 3. 31b8edad2227cc37ecead62bb14dcfe9@ff437a33d77574732ae1e09add6cfe49 + +The username and the domain parts are hashed separately. + +The fields: + +* id: sequence number. ignore it. + +* message_id: a hash of the actual message id in the headers. It might be empty + or missing. + +* queue_id: the id assigned to this delivery by postfix. messages with many + recipients might be spread across multiple queue_ids + +* first_seen_at: the first time a log entry with this queue_id appeared in the + logs. + +* date: the actual "Date" header. + +* sent_at: + * for incoming: the date header + * for outgoing: first_seen_at + +* received_at: + * for incoming: first_seen_at + * for outgoing: when the mx server logs status=sent + +* sender: the envelope sender, hashed + +* recipient: the envelope recipient, hashed. + +* from, to, cc, bcc: the addresses in respective headers, hashed. + +* message_size: the byte size of the entire message + +* spam_score: not currently gathered + +* subject_size: the number of characters in the "Subject" header + +* is_list: true if message was sent by a mailing list + +* is_outgoing: true if the message is outgoing + +* re_message_id: message id of another message that this message is in reply to + +* status: one of deferred, bounced, or sent. You can ignore all messages that + are not "sent". deferred messages might later get delivered, so we keep + these records when scanning the logs. + +* delay: I am not sure exactly what this is, but postfix logs it and it seems + interesting. + +* delays: again, not sure exactly what it is. + NOTES ============================================ diff --git a/bin/parse-email-logs b/bin/parse-email-logs index 2a24261..b0fd824 100755 --- a/bin/parse-email-logs +++ b/bin/parse-email-logs @@ -1,4 +1,4 @@ -#!/usr/bin/ruby +#!/usr/bin/env ruby require_relative '../config/initializer' @@ -9,14 +9,23 @@ end $input = nil def parse_command_line - if ARGV.grep(/-h/).any? - usage - end - if ARGV[0] && File.exist?(ARGV[0]) - $input = File.open(ARGV[0]) - else - $input = ARGF + while ARGV.any? do + case ARGV[0] + when '-h' then + usage + when '--help' then + usage + when '--debug' then + require 'byebug' + ARGV.shift + else + if ARGV[0] && File.exist?(ARGV[0]) + $input = File.open(ARGV[0]) + end + ARGV.shift + end end + $input ||= ARGF end def usage @@ -33,9 +42,9 @@ end def hash_addresses(str) str.split(',').map {|address| - address = address.sub(/<.*>/, '') + address = address.sub(/.*\"\']/, '') address.split('@').map {|segment| - segment #Digest::HMAC.hexdigest(segment, CONFIG['secret'], Digest::MD5) + Digest::HMAC.hexdigest(segment, CONFIG['secret'], Digest::MD5) }.join('@') }.join(',') end @@ -51,9 +60,16 @@ def get_message(queue_id, timestamp) end # -# if we see this, then it was incoming: "relay=0.0.0.0[0.0.0.0]:25" +# if we see this: +# relay=0.0.0.0[0.0.0.0]:25 +# then the message was an incoming message # -def do_sent(m, ts, matches) +# if the queue id is the same, but the recipient is different +# then duplicate the record. +# +def do_sent(m, ts, matches, line) + recipient = hash_addresses(matches["to"]) + return if m.recipient == recipient if m.recipient.nil? if matches['relay'] == "relay=0.0.0.0[0.0.0.0]:25" m.is_outgoing = false @@ -64,17 +80,37 @@ def do_sent(m, ts, matches) m.sent_at = m.first_seen_at m.received_at = parse_timestamp(ts) else - # sent_at will be set by the 'Date' header + m.sent_at = m.date m.received_at = m.first_seen_at end - m.recipient = hash_addresses(matches["to"]) + m.recipient = recipient m.orig_to = hash_addresses(matches["orig_to"]) if matches["orig_to"] + m.delay = matches['delay'].to_f + m.delays = matches['delays'] + m.status = "sent" m.save else - + new_m = m.dup + new_m.received_at = parse_timestamp(ts) + new_m.recipient = recipient + new_m.orig_to = hash_addresses(matches["orig_to"]) if matches["orig_to"] + new_m.delay = matches['delay'].to_f + new_m.delays = matches['delays'] + new_m.status = "sent" + new_m.save end end +# +# for status=x where x != sent +# +def do_status(m, ts, matches) + m.delay = matches['delay'].to_f + m.delays = matches['delays'] + m.status = matches['status'] + m.save +end + # # save the message size and the envelope sender # @@ -95,6 +131,48 @@ def do_message_id(m, ts, matches, line) m.save end +# +# headers +# +def do_subject(m, ts, matches) + m.subject_length = matches['subject'].length + m.save +end +def do_date(m, ts, matches) + m.date = matches['date'] + m.save +end +def do_from(m, ts, matches) + m.from = hash_addresses(matches['from']) + m.save +end +def do_to(m, ts, matches) + m.to = hash_addresses(matches['to']) + m.save +end +def do_cc(m, ts, matches) + m.cc = hash_addresses(matches['cc']) + m.save +end +def do_bcc(m, ts, matches) + m.bcc = hash_addresses(matches['bcc']) + m.save +end +def do_list(m, ts, matches) + m.is_list = true + m.save +end +def do_in_reply_to(m, ts, matches) + m.re_message_id = hash_addresses(matches['in-reply-to']) + m.save +end +def do_precedence(m, ts, matches) + if matches['precedence'] =~ /(bulk|list)/i + m.is_list = true + m.save + end +end + # # the message was rejected, likely because milter scan thinks it is a virus. # so we remove the record from the database @@ -104,14 +182,19 @@ def do_purge_message(m, ts, matches) end def do_error(line) + puts puts "ERROR: unmatched line!" - puts " " + line + puts line + puts end +FROM_HOST = /(local|[A-Za-z0-9\.\-]+\[0.0.0.0\]);/ + LINE_PARSE_MAP = { 'postfix/smtp' => { - /to=<(?.*?)>, (orig_to=<(?.*?)>, )?(?relay=.*?),.*status=sent/ => method(:do_sent), - /status=sent/ => :error + /to=<(?.*?)>, (orig_to=<(?.*?)>, )?(?relay=.*?), delay=(?[0-9\.]+), delays=(?[0-9\.\/]+), .*status=sent/ => method(:do_sent), + /delay=(?[0-9\.]+), delays=(?[0-9\.\/]+), .*status=(?[a-z]+) / => method(:do_status), + /status=/ => :error }, 'postfix/qmgr' => { /from=<(?.*)>, size=(?\d+),.*\(queue active\)/ => method(:do_queue), @@ -119,6 +202,17 @@ LINE_PARSE_MAP = { }, 'postfix/cleanup' => { /message-id=(?.*)$/ => method(:do_message_id), + /info: header Subject: (?.*) from #{FROM_HOST}/i => method(:do_subject), + /info: header From: (?.*) from #{FROM_HOST}/i => method(:do_from), + /info: header To: (?.*) from #{FROM_HOST}/i => method(:do_to), + /info: header Cc: (?.*) from #{FROM_HOST}/i => method(:do_cc), + /info: header Bcc: (?.*) from #{FROM_HOST}/i => method(:do_bcc), + /info: header In-Reply-To: (?.*) from #{FROM_HOST}/i => method(:do_in_reply_to), + /info: header Precedence: (?.*) from #{FROM_HOST}/i => method(:do_precedence), + /info: header List-ID/i => method(:do_list), + /info: header Mailing-list/i => method(:do_list), + /info: header Date: (?.*) from #{FROM_HOST}/i => method(:do_date), + /info: header / => :error, /milter-reject/ => method(:do_purge_message), // => :error } diff --git a/config/initializer.rb b/config/initializer.rb index c8e5cca..6819d0f 100644 --- a/config/initializer.rb +++ b/config/initializer.rb @@ -5,11 +5,6 @@ require 'yaml' require 'date' require 'digest/hmac' require 'active_record' -#require 'active_support' - -if ARGV.grep(/--debug/) - require 'byebug' -end CONFIG = YAML.load( File.read(File.join(ROOT, 'config','config.yml')) diff --git a/db/migrations/001_create_messages.rb b/db/migrations/001_create_messages.rb index 9ff04e8..a618ebe 100644 --- a/db/migrations/001_create_messages.rb +++ b/db/migrations/001_create_messages.rb @@ -19,8 +19,11 @@ class CreateMessages < ActiveRecord::Migration t.integer :spam_score t.integer :subject_length t.boolean :is_list, :default => false - t.boolean :is_outgoing, :default => nil - t.string :re_message_id, :default => nil + t.boolean :is_outgoing + t.string :re_message_id + t.string :status + t.float :delay + t.string :delays end add_index :messages, :message_id end -- cgit v1.2.3