#!/usr/bin/env ruby

#
# Reads a postfix mail log (a file named on the command line, or ARGF) and
# stores what it learns about each queue id in Message records. Addresses
# and message ids are stored as keyed hashes, never in the clear.
#

require 'date'
require 'openssl'
require_relative '../config/initializer'

class Message < ActiveRecord::Base
  # NOTE(review): presumably set so that a `type` column is not treated as
  # single-table inheritance -- confirm against the schema.
  self.inheritance_column = 'disabled'
end

# IO the log lines are read from; assigned by parse_command_line.
$input = nil

#
# Consumes ARGV:
#   -h, --help  print usage and exit
#   --debug     load byebug
#   other       opened as the input if it names an existing file,
#               silently dropped otherwise
# Falls back to ARGF when no file was opened.
#
def parse_command_line
  while ARGV.any?
    arg = ARGV.first
    case arg
    when '-h', '--help'
      usage
    when '--debug'
      require 'byebug'
      ARGV.shift
    else
      if File.exist?(arg)
        # only the last file named is used; do not leak an earlier handle
        $input.close if $input
        $input = File.open(arg)
      end
      ARGV.shift
    end
  end
  $input ||= ARGF
end

#
# prints the command line help and exits with status 0
#
def usage
  puts "USAGE: "
  puts " option 1: parse-email-logs [LOGFILE]"
  puts " option 2: cat log | parse-email-logs"
  exit(0)
end

#
# Parses a syslog style timestamp, e.g. "May 20 20:17:14".
# The log carries no year, so the result depends on DateTime.strptime's
# default for the missing field.
#
def parse_timestamp(str)
  DateTime.strptime(str, "%b %d %H:%M:%S")
end

#
# Replaces every address in a comma separated list with a keyed hash.
# The part before and after each '@' is hashed separately (HMAC-MD5 keyed
# with CONFIG['secret']) and the hex digests are re-joined with '@', then
# the addresses are re-joined with ','.
#
# OpenSSL::HMAC is used because Digest::HMAC is no longer part of the
# standard library; both compute the same HMAC-MD5 hex digest.
#
def hash_addresses(str)
  digest = OpenSSL::Digest.new('MD5')
  str.split(',').map { |address|
    # NOTE(review): this pattern looks truncated (unbalanced `]`), so it
    # is unlikely to strip display names or quotes as probably intended.
    # It is kept as found; restore it from the upstream source.
    address = address.sub(/.*\"\']/, '')
    address.split('@').map { |segment|
      OpenSSL::HMAC.hexdigest(digest, CONFIG['secret'], segment)
    }.join('@')
  }.join(',')
end

#
# Finds or creates the Message for a queue id. The first time a queue id
# is seen a progress dot is printed and first_seen_at is set from the log
# timestamp.
#
def get_message(queue_id, timestamp)
  msg = Message.find_or_create_by(queue_id: queue_id)
  if msg.first_seen_at.nil?
    putc '.'
    STDOUT.flush
    msg.first_seen_at = parse_timestamp(timestamp)
    msg.save
  end
  msg
end

# relay value that marks a delivery as an incoming message
INCOMING_RELAY = "relay=0.0.0.0[0.0.0.0]:25".freeze

#
# status=sent
#
# if we see this:
#   relay=0.0.0.0[0.0.0.0]:25
# then the message was an incoming message
#
# if the queue id is the same, but the recipient is different
# then duplicate the record.
#
def do_sent(m, ts, matches, line)
  recipient = hash_addresses(matches['to'])
  return if m.recipient == recipient

  if m.recipient.nil?
    # first delivery for this queue id: fill in the existing record
    record = m
    record.is_outgoing = matches['relay'] != INCOMING_RELAY
    if record.is_outgoing?
      record.sent_at = record.first_seen_at
      record.received_at = parse_timestamp(ts)
    else
      record.sent_at = record.date
      record.received_at = record.first_seen_at
    end
  else
    # another recipient for the same queue id: store a copy
    record = m.dup
    record.received_at = parse_timestamp(ts)
  end

  record.recipient = recipient
  record.orig_to = hash_addresses(matches['orig_to']) if matches['orig_to']
  record.delay = matches['delay'].to_f
  record.delays = matches['delays']
  record.status = "sent"
  record.save
end

#
# for status=x where x != sent
#
def do_status(m, ts, matches)
  m.delay = matches['delay'].to_f
  m.delays = matches['delays']
  m.status = matches['status']
  m.save
end

#
# save the message size and the envelope sender
# (only the first time; later queue lines for the same message are ignored)
#
def do_queue(m, ts, matches)
  return unless m.size.nil?
  m.size = matches['size'].to_i
  m.sender = hash_addresses(matches['from'])
  m.save
end

#
# save the message id (hashed, with the angle brackets removed)
#
def do_message_id(m, ts, matches, line)
  return if m.message_id
  m_id = matches['message_id'].gsub(/[<>]/, '').strip
  m.message_id = hash_addresses(m_id)
  m.save
end

#
# headers
#

# only the length of the subject is stored
def do_subject(m, ts, matches)
  m.subject_length = matches['subject'].length
  m.save
end

def do_date(m, ts, matches)
  m.date = matches['date']
  m.save
end

def do_from(m, ts, matches)
  m.from = hash_addresses(matches['from'])
  m.save
end

def do_to(m, ts, matches)
  m.to = hash_addresses(matches['to'])
  m.save
end

def do_cc(m, ts, matches)
  m.cc = hash_addresses(matches['cc'])
  m.save
end

def do_bcc(m, ts, matches)
  m.bcc = hash_addresses(matches['bcc'])
  m.save
end

# List-ID and Mailing-list headers mark the message as list mail
def do_list(m, ts, matches)
  m.is_list = true
  m.save
end

# The capture is named in_reply_to because, to my knowledge, Ruby rejects
# '-' in regexp group names.
def do_in_reply_to(m, ts, matches)
  m.re_message_id = hash_addresses(matches['in_reply_to'])
  m.save
end

# a Precedence of bulk or list also marks the message as list mail
def do_precedence(m, ts, matches)
  if matches['precedence'] =~ /(bulk|list)/i
    m.is_list = true
    m.save
  end
end

#
# the message was rejected, likely because milter scan thinks it is a virus.
# so we remove the record from the database
#
def do_purge_message(m, ts, matches)
  m.destroy
end

#
# report a line that no specific pattern knew how to handle
#
def do_error(line)
  puts
  puts "ERROR: unmatched line!"
  puts line
  puts
end

FROM_HOST = /(local|[A-Za-z0-9\.\-]+\[0.0.0.0\]);/

#
# daemon name => ordered { pattern => handler }.
#
# For each line the patterns of its daemon are tried in order and only the
# first one that matches is used. A handler is either a Method (called with
# the message, the timestamp, the MatchData and, for four-argument handlers,
# the raw line) or :error, which reports the line as unmatched.
#
LINE_PARSE_MAP = {
  'postfix/smtp' => {
    /to=<(?<to>.*?)>, (orig_to=<(?<orig_to>.*?)>, )?(?<relay>relay=.*?), delay=(?<delay>[0-9\.]+), delays=(?<delays>[0-9\.\/]+), .*status=sent/ => method(:do_sent),
    /delay=(?<delay>[0-9\.]+), delays=(?<delays>[0-9\.\/]+), .*status=(?<status>[a-z]+) / => method(:do_status),
    /status=/ => :error
  },
  'postfix/qmgr' => {
    /from=<(?<from>.*)>, size=(?<size>\d+),.*\(queue active\)/ => method(:do_queue),
    /\(queue active\)/ => :error
  },
  'postfix/cleanup' => {
    /message-id=(?<message_id>.*)$/ => method(:do_message_id),
    /info: header Subject: (?<subject>.*) from #{FROM_HOST}/i => method(:do_subject),
    /info: header From: (?<from>.*) from #{FROM_HOST}/i => method(:do_from),
    /info: header To: (?<to>.*) from #{FROM_HOST}/i => method(:do_to),
    /info: header Cc: (?<cc>.*) from #{FROM_HOST}/i => method(:do_cc),
    /info: header Bcc: (?<bcc>.*) from #{FROM_HOST}/i => method(:do_bcc),
    /info: header In-Reply-To: (?<in_reply_to>.*) from #{FROM_HOST}/i => method(:do_in_reply_to),
    /info: header Precedence: (?<precedence>.*) from #{FROM_HOST}/i => method(:do_precedence),
    /info: header List-ID/i => method(:do_list),
    /info: header Mailing-list/i => method(:do_list),
    /info: header Date: (?<date>.*) from #{FROM_HOST}/i => method(:do_date),
    /info: header / => :error,
    /milter-reject/ => method(:do_purge_message),
    // => :error
  }
}

#
# Handles one log line.
#
# The line is split on whitespace: fields 0..2 are the timestamp, field 4
# is "daemon[pid]:" and field 5 is "QUEUEID:". Lines with fewer than six
# fields (blank or truncated lines) are skipped, as are lines from daemons
# that are not in LINE_PARSE_MAP.
#
def process_line(line)
  fields = line.split(' ')
  return if fields.length < 6

  timestamp = fields[0..2].join(' ')
  daemon = fields[4].split('[').first
  queue_id = fields[5].sub(':', '')

  LINE_PARSE_MAP.fetch(daemon, {}).each do |pattern, handler|
    match = pattern.match(line)
    next unless match

    if handler == :error
      do_error(line)
    elsif !handler.nil?
      msg = get_message(queue_id, timestamp)
      if handler.arity == 3
        handler.call(msg, timestamp, match)
      else
        handler.call(msg, timestamp, match, line)
      end
    end
    break
  end
end

#
# Processes every input line inside a single transaction, then prints how
# long it took, how many records were added and how many lines were read.
#
def main
  parse_command_line
  start_time = Time.now
  start_msg = Message.count
  line_count = 0

  Message.transaction do
    $input.each_line do |line|
      process_line(line)
      line_count += 1
    end
  end

  end_time = Time.now
  end_msg = Message.count

  puts
  puts "FINISHED"
  puts " Time: %s minutes" % ((end_time - start_time).to_i / 60)
  puts "Records: %s" % (end_msg - start_msg)
  puts " Lines: %s" % line_count
end

main