Source

loganalyze / loganalyze.py

#!/usr/bin/env python

# Much of the coroutine stuff inspired and/or stolen from Chapter 6 of "Python
# Essential Reference, Fourth Edition."
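#
# Usage: loganalyze.py LOGFILE [LOGFILE ...]   (plain or gzipped logs)
#
# Parses the 'request end' lines out of each log file, then drops into an
# interactive prompt; the available commands map to the _do_* methods below
# (set, topurls, toptalkers).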

import os
import sys
import re
import subprocess
import shlex
import readline  # imported for its side effect: line editing in raw_input()
import traceback
import math

from datetime import datetime, timedelta
from optparse import OptionParser, OptParseError, OptionValueError

# XXX: Debug
import pdb
from pprint import pprint as pp

# NB: Bucket is defined but not used anywhere in this file; the aggregators
# below key their histogram buckets on plain (start, end) tuples instead.
class Bucket(object):
    def __init__(self, bottom, top, value):
        self.bottom = bottom
        self.top = top
        self.value = value

    def __repr__(self):
        return "<Bucket: bottom: %s, top: %s, value: %s>" % (self.bottom, self.top, self.value)

class Aggregator(object):
    def __init__(self, log_analyzer):
        self._log_analyzer = log_analyzer

    def __call__(self, **kwargs):
        """
        Call the aggregator object, getting back a dictionary.
        """
        # NOTE: kwargs always includes 'start' and 'end'
        for arg in kwargs.keys():
            if not hasattr(self, "_%s" % arg) or \
               not getattr(self, "_%s" % arg) == kwargs[arg]:
                break
        else:
            # We've already done the computation for this time period with
            # these exact args, so just return the cached result
            return self._cached_result

        self._cached_result = self._call(**kwargs)
        return self._cached_result

    def _call(self, **kwargs):
        for arg in kwargs.keys():
            setattr(self, "_%s" % arg, kwargs[arg])
        aggr = self._do_aggregation()
        return aggr


    def _do_aggregation(self):
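        # Walk every parsed log line, keep the ones inside [_start, _end],
        # and let the subclass's _aggregate() fold each into the result dict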
        aggr = {}
        for datetimeobj, url, ms, ip in self._log_analyzer.log_data:
            if self._start <= datetimeobj <= self._end:
                aggr = self._aggregate(aggr, datetimeobj, url, ms, ip)
        return aggr

    def _total_seconds(self, timedeltaobj):
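        # Equivalent to timedelta.total_seconds() (added in Python 2.7);
        # microseconds are ignored since we work at one-second resolution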
        return timedeltaobj.seconds + (timedeltaobj.days*24*3600)

class UrlAggregator(Aggregator):
    pass

class HitsPerUrl(UrlAggregator):
    # __call__ requires start, end keyword args
    def _aggregate(self, aggr, datetimeobj, url, ms, ip):
        hits_for_url = aggr.setdefault(url, 0)
        aggr[url] = hits_for_url + 1
        return aggr

class HitsPerUrlPerUnitTime(UrlAggregator):
    # __call__ requires start, end, step keyword args
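    # Example: with step=60, a hit 130 s after 'start' gives bucket index
    # 130 // 60 = 2, so it is counted in the bucket
    # (start + 120 s, start + 179 s).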
    def _aggregate(self, aggr, datetimeobj, url, ms, ip):
        sub_aggr = aggr.setdefault(url, {'total_hits': 0})
        sub_aggr['total_hits'] += 1
        # Note that we are guaranteed that datetimeobj is between _start and _end
        seconds_offset_into_range = datetimeobj - self._start
        seconds_offset_into_range = self._total_seconds(seconds_offset_into_range)
        # Floor division picks the step-sized bucket this hit falls into
        multiplier = seconds_offset_into_range // self._step
        bucket_start = self._start + timedelta(seconds=self._step * multiplier)
        bucket_end   = self._start + timedelta(seconds=(self._step * (multiplier + 1)) - 1)
        if bucket_end > self._end:
            bucket_end = self._end
        hits_for_bucket = sub_aggr.setdefault((bucket_start, bucket_end), 0)
        sub_aggr[(bucket_start, bucket_end)] = hits_for_bucket + 1
        return aggr

class HitsPerUrlByTimeToRenderLinear(UrlAggregator):
    # __call__ requires start, end, ms_step keyword args
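    # Example: with ms_step=100, a request that took 230 ms is counted in
    # the render-time bucket (200, 300).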
    def _aggregate(self, aggr, datetimeobj, url, ms, ip):
        sub_aggr = aggr.setdefault(url, {'total_hits': 0})
        sub_aggr['total_hits'] += 1
        # Floor division buckets the render time into ms_step-wide ranges
        bucket_idx = ms // self._ms_step
        bucket_start = self._ms_step * bucket_idx
        bucket_end   = self._ms_step * (bucket_idx + 1)
        hits_for_bucket = sub_aggr.setdefault((bucket_start, bucket_end), 0)
        sub_aggr[(bucket_start, bucket_end)] = hits_for_bucket + 1
        return aggr

class IpAggregator(Aggregator):
    pass

class HitsByIP(IpAggregator):
    # __call__ requires start, end keyword args
    def _aggregate(self, aggr, datetimeobj, url, ms, ip):
        hits_for_ip = aggr.setdefault(ip, 0)
        aggr[ip] = hits_for_ip+1
        return aggr

class HitsByIPPerUnitTime(IpAggregator):
    # __call__ requires start, end, step keyword args
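    # Same time bucketing as HitsPerUrlPerUnitTime, keyed by client IP
    # rather than by URL.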
    def _aggregate(self, aggr, datetimeobj, url, ms, ip):
        sub_aggr = aggr.setdefault(ip, {'total_hits': 0})
        sub_aggr['total_hits'] += 1
        # Note that we are guaranteed that datetimeobj is between _start and _end
        seconds_offset_into_range = datetimeobj - self._start
        seconds_offset_into_range = self._total_seconds(seconds_offset_into_range)
        # Floor division picks the step-sized bucket this hit falls into
        multiplier = seconds_offset_into_range // self._step
        bucket_start = self._start + timedelta(seconds=self._step * multiplier)
        bucket_end   = self._start + timedelta(seconds=(self._step * (multiplier + 1)) - 1)
        if bucket_end > self._end:
            bucket_end = self._end
        hits_for_bucket = sub_aggr.setdefault((bucket_start, bucket_end), 0)
        sub_aggr[(bucket_start, bucket_end)] = hits_for_bucket + 1
        return aggr

class HitsByIPByTimeToRenderLinear(IpAggregator):
    # __call__ requires start, end, ms_step keyword args
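    # Same render-time bucketing as HitsPerUrlByTimeToRenderLinear, keyed by
    # client IP rather than by URL.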
    def _aggregate(self, aggr, datetimeobj, url, ms, ip):
        sub_aggr = aggr.setdefault(ip, {'total_hits': 0})
        sub_aggr['total_hits'] += 1
        # Floor division buckets the render time into ms_step-wide ranges
        bucket_idx = ms // self._ms_step
        bucket_start = self._ms_step * bucket_idx
        bucket_end   = self._ms_step * (bucket_idx + 1)
        hits_for_bucket = sub_aggr.setdefault((bucket_start, bucket_end), 0)
        sub_aggr[(bucket_start, bucket_end)] = hits_for_bucket + 1
        return aggr

class LogAnalyzer(object):

    month_map = {
        "Jan": 1,
        "Feb": 2,
        "Mar": 3,
        "Apr": 4,
        "May": 5,
        "Jun": 6,
        "Jul": 7,
        "Aug": 8,
        "Sep": 9,
        "Oct": 10,
        "Nov": 11,
        "Dec": 12
    }

    def __init__(self):
        self.log_data = []
        self.start = None
        self.end = None
        self._setup_parser_do_set()
        self._setup_parser_do_topurls()
        self._setup_parser_do_toptalkers()

    def cmd_loop(self):
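        # A minimal REPL: each input line is split into a command and its
        # arguments, and the command is dispatched to the matching _do_<cmd>
        # method, if one exists.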
        while True:
            try:
                line = raw_input("\rstart:%s, end:%s >> " % (self.start, self.end))
                if not line.strip():
                    continue

                # partition() always yields three parts, so a bare command
                # with no arguments no longer breaks the unpack
                cmd, _, args = line.partition(' ')

                method_for_cmd = "_do_%s" % cmd.lower()
                if hasattr(self, method_for_cmd):
                    # TODO: figure out the kwargs stuff
                    getattr(self, method_for_cmd)(*args.split())
                else:
                    print "Unknown Command: %s" % line

            except EOFError, e:
                print ""
                break

            except OptParseError, e:
                print e

            except StandardError, e:
                traceback.print_exc()
                continue

    def parse_logfile(self, path):
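        # Pre-filter with grep so Python only ever sees the 'request end'
        # lines; gzipped logs get a "gzip -dc | grep" pipeline instead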
        cmds = []
        if path.endswith(".gz"):
            cmds.append("gzip -dc %s" % path)
            cmds.append("grep 'request end'")
        else:
            cmds.append("grep 'request end' %s" % path)

        procs = []
        stdin = None
        for cmd in cmds:
            p = subprocess.Popen(shlex.split(cmd), stdin=stdin, stdout=subprocess.PIPE)
            stdin = p.stdout
            procs.append(p)

        # Read from the stdout of the last process in the pipeline
        for line in procs[-1].stdout:
            self._process_line(line)

        for p in procs:
            p.wait()

    def _process_line(self, line):
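        # The timestamp is sliced out by fixed offset, which assumes lines
        # start with something like "[21/Jan/2011 13:45:30] ..."; fields 8,
        # 9 and 11 of the space-split line are assumed to be the URL, the
        # time-to-render in ms, and the client IP.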
        year   = int(line[8:12])
        month  = self.month_map[line[4:7]]
        day    = int(line[1:3])
        hour   = int(line[13:15])
        minute = int(line[16:18])
        second = int(line[19:21])
        datetimeobj = datetime(year, month, day, hour, minute, second)

        line = self._fixup_urls(line)
        line_elements = line.split(' ')
        url = line_elements[8]
        ip = line_elements[11]
        ms = int(line_elements[9])

        # Record the data for later analysis
        self.log_data.append((datetimeobj, url, ms, ip))

        # Calculate the start of the time period for the logs we're dealing with
        if self.start:
            if datetimeobj < self.start:
                self.start = datetimeobj
        else:
            self.start = datetimeobj

        # Calculate the end of the time period for the logs we're dealing with
        if self.end:
            if datetimeobj > self.end:
                self.end = datetimeobj
        else:
            self.end = datetimeobj

    def _fixup_urls(self, line):
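        # Strip volatile cache-busting query parameters (jQuery's uqid, ts,
        # _) so that otherwise-identical URLs aggregate into one bucket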
        line = re.sub(r'&uqid=jQuery[0-9_]+', '', line)
        line = re.sub(r'&ts=\d+', '', line)
        line = re.sub(r'&_=\d+', '', line)
        return line

    def _setup_parser_do_set(self):
        self._parser_do_set = OptionParser(usage="set [start|end] YYYY-mm-dd:HH:MM:SS")

    def _setup_parser_do_topurls(self):
        self._parser_do_topurls = OptionParser(usage="topurls [number of urls to display]")

    def _setup_parser_do_toptalkers(self):
        self._parser_do_toptalkers = OptionParser(usage="toptalkers [number of IPs to display]")

    def _do_set(self, *cmd_args):
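        # e.g. "set start 2011-01-21:00:00:00" narrows the analysis window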
        options, args = self._parser_do_set.parse_args(list(cmd_args))
        if len(args) != 2:
            raise OptionValueError("Wrong number of arguments for set command")
        what = args[0]
        if what not in ("start", "end"):
            raise OptionValueError("First argument to 'set' must be 'start' or 'end'")
        datetime_string = args[1]
        if not re.match(r'\d{4}-\d{2}-\d{2}:\d{2}:\d{2}:\d{2}', datetime_string):
            raise OptionValueError("Datetime must be specified as YYYY-mm-dd:HH:MM:SS")

        setattr(self, what, datetime(*[int(x) for x in re.split(r'[-:]', datetime_string)]))

    def _do_topurls_toptalkers_arg_checks(self, cmd, args):
        if len(args) != 1:
            raise OptionValueError("First and only argument to '%s' must be a number" % cmd)
        num = args[0]
        if not re.match(r'\d+$', num):
            raise OptionValueError("First and only argument to '%s' must be a number" % cmd)
        return int(num)

    def _do_topurls(self, *cmd_args):
        options, args = self._parser_do_topurls.parse_args(list(cmd_args))
        num_urls = self._do_topurls_toptalkers_arg_checks('topurls', args)

        # setup a singleton of this aggregator for use
        if not hasattr(self, '_HitsPerUrl'):
            self._HitsPerUrl = HitsPerUrl(self)
        data = self._HitsPerUrl(start=self.start, end=self.end)
        self._display_topurls(data, num_urls)

    def _display_topurls(self, data, num_urls):
        sorted_keys = sorted(data, key=data.get)
        for url in sorted_keys[-num_urls:]:
            print "%-100s %10d" % (url, data[url])

    def _do_toptalkers(self, *cmd_args):
        options, args = self._parser_do_toptalkers.parse_args(list(cmd_args))
        num_talkers = self._do_topurls_toptalkers_arg_checks('toptalkers', args)

        # setup a singleton of this aggregator for use
        if not hasattr(self, '_HitsByIP'):
            self._HitsByIP = HitsByIP(self)
        data = self._HitsByIP(start=self.start, end=self.end)
        self._display_toptalkers(data, num_talkers)

    def _display_toptalkers(self, data, num_talkers):
        sorted_keys = sorted(data, key=data.get)
        for ip in sorted_keys[-num_talkers:]:
            print "%-16s %10d" % (ip, data[ip])

def main():
    log_analyzer = LogAnalyzer()
    for path in sys.argv[1:]:
        print "Parsing log file: %s  " % os.path.basename(path),
        start = datetime.now()
        log_analyzer.parse_logfile(path)
        end = datetime.now()
        print "DONE (%s)" % (end - start)
    log_analyzer.cmd_loop()

if __name__ == "__main__":
#   import cProfile
#   cProfile.run('main()')
    main()

# vim: et sw=4 ts=4 softtabstop=4 foldmethod=expr tw=100