summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorelijah <elijah@riseup.net>2016-05-26 11:19:05 -0700
committerelijah <elijah@riseup.net>2016-05-26 11:19:05 -0700
commit7b8c0d723e635f7a8cde3ceb4425426528ac8240 (patch)
treefb3589665d21feb8dcc386336be073a48f97dce9
parentc8550e3712ffed2fb9c93f6ca423b0a49849ad36 (diff)
added support for processing message headersHEADmaster
-rw-r--r--README.md68
-rwxr-xr-xbin/parse-email-logs130
-rw-r--r--config/initializer.rb5
-rw-r--r--db/migrations/001_create_messages.rb7
4 files changed, 185 insertions, 25 deletions
diff --git a/README.md b/README.md
index dd59a16..abd0714 100644
--- a/README.md
+++ b/README.md
@@ -15,6 +15,74 @@ USAGE
rake reset
cat postfix.log.1 | bin/parse-email-logs
+DB NOTES
+============================================
+
+There is one record for each message delivery. This means that there might be
+many records with duplicate queue_ids. Some messages never get successfully
+delivered, and have status of 'deferred' or 'bounced'.
+
+A lot of messages never show up in the db, such as those rejected because they
+had viruses or blocked by RBLs.
+
+Emails addresses are cleaned and then hashed using HMAC. For example:
+
+ 1. Elijah <elijah@riseup.net>
+ 2. elijah@riseup.net
+ 3. 31b8edad2227cc37ecead62bb14dcfe9@ff437a33d77574732ae1e09add6cfe49
+
+The username and the domain parts are hashed separately.
+
+The fields:
+
+* id: sequence number. ignore it.
+
+* message_id: a hash of the actual message id in the headers. It might be empty
+ or missing.
+
+* queue_id: the id assigned to this delivery by postfix. messages with many
+ recipients might be spread across multiple queue_ids
+
+* first_seen_at: the first time a log entry with this queue_id appeared in the
+ logs.
+
+* date: the actual "Date" header.
+
+* sent_at:
+ * for incoming: the date header
+ * for outgoing: first_seen_at
+
+* received_at:
+ * for incoming: first_seen_at
+ * for outgoing: when the mx server logs status=sent
+
+* sender: the envelope sender, hashed
+
+* recipient: the envelope recipient, hashed.
+
+* from, to, cc, bcc: the addresses in respective headers, hashed.
+
+* message_size: the byte size of the entire message
+
+* spam_score: not currently gathered
+
+* subject_size: the number of characters in the "Subject" header
+
+* is_list: true if message was sent by a mailing list
+
+* is_outgoing: true if the message is outgoing
+
+* re_message_id: message id of another message that this message is in reply to
+
+* status: one of deferred, bounced, or sent. You can ignore all messages that
+ are not "sent". deferred messages might later get delivered, so we keep
+ these records when scanning the logs.
+
+* delay: I am not sure exactly what this is, but postfix logs it and it seems
+ interesting.
+
+* delays: again, not sure exactly what it is.
+
NOTES
============================================
diff --git a/bin/parse-email-logs b/bin/parse-email-logs
index 2a24261..b0fd824 100755
--- a/bin/parse-email-logs
+++ b/bin/parse-email-logs
@@ -1,4 +1,4 @@
-#!/usr/bin/ruby
+#!/usr/bin/env ruby
require_relative '../config/initializer'
@@ -9,14 +9,23 @@ end
$input = nil
def parse_command_line
- if ARGV.grep(/-h/).any?
- usage
- end
- if ARGV[0] && File.exist?(ARGV[0])
- $input = File.open(ARGV[0])
- else
- $input = ARGF
+ while ARGV.any? do
+ case ARGV[0]
+ when '-h' then
+ usage
+ when '--help' then
+ usage
+ when '--debug' then
+ require 'byebug'
+ ARGV.shift
+ else
+ if ARGV[0] && File.exist?(ARGV[0])
+ $input = File.open(ARGV[0])
+ end
+ ARGV.shift
+ end
end
+ $input ||= ARGF
end
def usage
@@ -33,9 +42,9 @@ end
def hash_addresses(str)
str.split(',').map {|address|
- address = address.sub(/<.*>/, '')
+ address = address.sub(/.*</, '').sub(/[>\"\']/, '')
address.split('@').map {|segment|
- segment #Digest::HMAC.hexdigest(segment, CONFIG['secret'], Digest::MD5)
+ Digest::HMAC.hexdigest(segment, CONFIG['secret'], Digest::MD5)
}.join('@')
}.join(',')
end
@@ -51,9 +60,16 @@ def get_message(queue_id, timestamp)
end
#
-# if we see this, then it was incoming: "relay=0.0.0.0[0.0.0.0]:25"
+# if we see this:
+# relay=0.0.0.0[0.0.0.0]:25
+# then the message was an incoming message
#
-def do_sent(m, ts, matches)
+# if the queue id is the same, but the recipient is different
+# then duplicate the record.
+#
+def do_sent(m, ts, matches, line)
+ recipient = hash_addresses(matches["to"])
+ return if m.recipient == recipient
if m.recipient.nil?
if matches['relay'] == "relay=0.0.0.0[0.0.0.0]:25"
m.is_outgoing = false
@@ -64,18 +80,38 @@ def do_sent(m, ts, matches)
m.sent_at = m.first_seen_at
m.received_at = parse_timestamp(ts)
else
- # sent_at will be set by the 'Date' header
+ m.sent_at = m.date
m.received_at = m.first_seen_at
end
- m.recipient = hash_addresses(matches["to"])
+ m.recipient = recipient
m.orig_to = hash_addresses(matches["orig_to"]) if matches["orig_to"]
+ m.delay = matches['delay'].to_f
+ m.delays = matches['delays']
+ m.status = "sent"
m.save
else
-
+ new_m = m.dup
+ new_m.received_at = parse_timestamp(ts)
+ new_m.recipient = recipient
+ new_m.orig_to = hash_addresses(matches["orig_to"]) if matches["orig_to"]
+ new_m.delay = matches['delay'].to_f
+ new_m.delays = matches['delays']
+ new_m.status = "sent"
+ new_m.save
end
end
#
+# for status=x where x != sent
+#
+def do_status(m, ts, matches)
+ m.delay = matches['delay'].to_f
+ m.delays = matches['delays']
+ m.status = matches['status']
+ m.save
+end
+
+#
# save the message size and the envelope sender
#
def do_queue(m, ts, matches)
@@ -96,6 +132,48 @@ def do_message_id(m, ts, matches, line)
end
#
+# headers
+#
+def do_subject(m, ts, matches)
+ m.subject_length = matches['subject'].length
+ m.save
+end
+def do_date(m, ts, matches)
+ m.date = matches['date']
+ m.save
+end
+def do_from(m, ts, matches)
+ m.from = hash_addresses(matches['from'])
+ m.save
+end
+def do_to(m, ts, matches)
+ m.to = hash_addresses(matches['to'])
+ m.save
+end
+def do_cc(m, ts, matches)
+ m.cc = hash_addresses(matches['cc'])
+ m.save
+end
+def do_bcc(m, ts, matches)
+ m.bcc = hash_addresses(matches['bcc'])
+ m.save
+end
+def do_list(m, ts, matches)
+ m.is_list = true
+ m.save
+end
+def do_in_reply_to(m, ts, matches)
+ m.re_message_id = hash_addresses(matches['in-reply-to'])
+ m.save
+end
+def do_precedence(m, ts, matches)
+ if matches['precedence'] =~ /(bulk|list)/i
+ m.is_list = true
+ m.save
+ end
+end
+
+#
# the message was rejected, likely because milter scan thinks it is a virus.
# so we remove the record from the database
#
@@ -104,14 +182,19 @@ def do_purge_message(m, ts, matches)
end
def do_error(line)
+ puts
puts "ERROR: unmatched line!"
- puts " " + line
+ puts line
+ puts
end
+FROM_HOST = /(local|[A-Za-z0-9\.\-]+\[0.0.0.0\]);/
+
LINE_PARSE_MAP = {
'postfix/smtp' => {
- /to=<(?<to>.*?)>, (orig_to=<(?<orig_to>.*?)>, )?(?<relay>relay=.*?),.*status=sent/ => method(:do_sent),
- /status=sent/ => :error
+ /to=<(?<to>.*?)>, (orig_to=<(?<orig_to>.*?)>, )?(?<relay>relay=.*?), delay=(?<delay>[0-9\.]+), delays=(?<delays>[0-9\.\/]+), .*status=sent/ => method(:do_sent),
+ /delay=(?<delay>[0-9\.]+), delays=(?<delays>[0-9\.\/]+), .*status=(?<status>[a-z]+) / => method(:do_status),
+ /status=/ => :error
},
'postfix/qmgr' => {
/from=<(?<from>.*)>, size=(?<size>\d+),.*\(queue active\)/ => method(:do_queue),
@@ -119,6 +202,17 @@ LINE_PARSE_MAP = {
},
'postfix/cleanup' => {
/message-id=(?<message_id>.*)$/ => method(:do_message_id),
+ /info: header Subject: (?<subject>.*) from #{FROM_HOST}/i => method(:do_subject),
+ /info: header From: (?<from>.*) from #{FROM_HOST}/i => method(:do_from),
+ /info: header To: (?<to>.*) from #{FROM_HOST}/i => method(:do_to),
+ /info: header Cc: (?<cc>.*) from #{FROM_HOST}/i => method(:do_cc),
+ /info: header Bcc: (?<bcc>.*) from #{FROM_HOST}/i => method(:do_bcc),
+ /info: header In-Reply-To: (?<in-reply-to>.*) from #{FROM_HOST}/i => method(:do_in_reply_to),
+ /info: header Precedence: (?<precedence>.*) from #{FROM_HOST}/i => method(:do_precedence),
+ /info: header List-ID/i => method(:do_list),
+ /info: header Mailing-list/i => method(:do_list),
+ /info: header Date: (?<date>.*) from #{FROM_HOST}/i => method(:do_date),
+ /info: header / => :error,
/milter-reject/ => method(:do_purge_message),
// => :error
}
diff --git a/config/initializer.rb b/config/initializer.rb
index c8e5cca..6819d0f 100644
--- a/config/initializer.rb
+++ b/config/initializer.rb
@@ -5,11 +5,6 @@ require 'yaml'
require 'date'
require 'digest/hmac'
require 'active_record'
-#require 'active_support'
-
-if ARGV.grep(/--debug/)
- require 'byebug'
-end
CONFIG = YAML.load(
File.read(File.join(ROOT, 'config','config.yml'))
diff --git a/db/migrations/001_create_messages.rb b/db/migrations/001_create_messages.rb
index 9ff04e8..a618ebe 100644
--- a/db/migrations/001_create_messages.rb
+++ b/db/migrations/001_create_messages.rb
@@ -19,8 +19,11 @@ class CreateMessages < ActiveRecord::Migration
t.integer :spam_score
t.integer :subject_length
t.boolean :is_list, :default => false
- t.boolean :is_outgoing, :default => nil
- t.string :re_message_id, :default => nil
+ t.boolean :is_outgoing
+ t.string :re_message_id
+ t.string :status
+ t.float :delay
+ t.string :delays
end
add_index :messages, :message_id
end