#!/usr/bin/env python

# Much of the coroutine stuff inspired and/or stolen from Chapter 6 of "Python
# Essential Reference, Fourth Edition."
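
# Usage (inferred from main() and parse_logfile() below):
#   loganalyze.py LOGFILE [LOGFILE ...]
# Each log file (plain or gzip-compressed) is filtered for 'request end'
# lines and parsed into memory; an interactive prompt then lets you run
# aggregation commands over the recorded time range.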

import os
import sys
import re
import subprocess
import shlex
import readline  # imported for its side effect: raw_input() gains line editing and history
import traceback
import math

from datetime import datetime, timedelta
from optparse import OptionParser, OptionValueError

# XXX: Debug
import pdb
from pprint import pprint as pp

class Bucket(object):
    def __init__(self, bottom, top, value):
        self.bottom = bottom
        self.top = top
        self.value = value

    def __repr__(self):
        return "<Bucket: bottom: %s, top: %s, value: %s>" % (self.bottom, self.top, self.value)

class Aggregator(object):
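    # Subclasses are used as callables, e.g. (a hypothetical usage sketch;
    # nothing in this file actually registers them in LogAnalyzer.aggr_map):
    #
    #     hits = HitsPerUrl(log_analyzer)
    #     per_url = hits(start=log_analyzer.start, end=log_analyzer.end)
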
    def __init__(self, log_analyzer):
        self._log_analyzer = log_analyzer

    def __call__(self, **kwargs):
        """
        Call the aggregator with keyword arguments (always including 'start'
        and 'end') and get back a dictionary of aggregated results.  A repeat
        call with identical arguments returns the cached result instead of
        re-scanning the log data.
        """
        # NOTE: kwargs always includes 'start' and 'end'
        for arg in kwargs.keys():
            if not hasattr(self, "_%s" % arg) or \
               not getattr(self, "_%s" % arg) == kwargs[arg]:
                break
        else:
            # We've already done the computation for this time period including all args, just
            # return what we've already done
            return self._cached_result

        self._cached_result = self._call(**kwargs)
        return self._cached_result

    def _call(self, **kwargs):
        for arg in kwargs.keys():
            setattr(self, "_%s" % arg, kwargs[arg])
        aggr = self._do_aggregation()
        return aggr


    def _do_aggregation(self):
        aggr = {}
        for datetimeobj, url, ms, ip in self._log_analyzer.log_data:
            if self._start <= datetimeobj <= self._end:
                aggr = self._aggregate(aggr, datetimeobj, url, ms, ip)
        return aggr

    def _total_seconds(self, timedeltaobj):
        return timedeltaobj.seconds + (timedeltaobj.days*24*3600)

class UrlAggregator(Aggregator):
    pass

class HitsPerUrl(UrlAggregator):
    # __call__ requires start, end keyword args
    def _aggregate(self, aggr, datetimeobj, url, ms, ip):
        hits_for_url = aggr.setdefault(url, 0)
        aggr[url] = hits_for_url + 1
        return aggr

class HitsPerUrlPerUnitTime(UrlAggregator):
    # __call__ requires start, end, step keyword args
    def _aggregate(self, aggr, datetimeobj, url, ms, ip):
        sub_aggr = aggr.setdefault(url, {'total_hits': 0})
        sub_aggr['total_hits'] += 1
        # Note that we are guaranteed that datetimeobj is between _start and _end
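        # e.g. with step=60 and an offset of 130 seconds into the range,
        # integer division gives multiplier 2, i.e. the bucket
        # [start + 120s, start + 179s].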
        seconds_offset_into_range = datetimeobj - self._start
        seconds_offset_into_range = self._total_seconds(seconds_offset_into_range)
        multiplier = seconds_offset_into_range / self._step
        bucket_start = self._start + timedelta(seconds=( self._step * (multiplier )))
        bucket_end   = self._start + timedelta(seconds=((self._step * (multiplier+1)) - 1))
        if bucket_end > self._end:
            bucket_end = self._end
        hits_for_bucket = sub_aggr.setdefault((bucket_start, bucket_end), 0)
        sub_aggr[(bucket_start, bucket_end)] = hits_for_bucket + 1
        return aggr

class HitsPerUrlByTimeToRenderLinear(UrlAggregator):
    # __call__ requires start, end, ms_step keyword args
    def _aggregate(self, aggr, datetimeobj, url, ms, ip):
        sub_aggr = aggr.setdefault(url, {'total_hits': 0})
        sub_aggr['total_hits'] += 1
        bucket_idx = ms / self._ms_step
        bucket_start = self._ms_step * bucket_idx
        bucket_end  = self._ms_step * (bucket_idx + 1)
        hits_for_bucket = sub_aggr.setdefault((bucket_start, bucket_end), 0)
        sub_aggr[(bucket_start, bucket_end)] = hits_for_bucket + 1
        return aggr

class IpAggregator(Aggregator):
    pass

class HitsByIP(IpAggregator):
    # __call__ requires start, end keyword args
    def _aggregate(self, aggr, datetimeobj, url, ms, ip):
        hits_for_ip = aggr.setdefault(ip, 0)
        aggr[ip] = hits_for_ip+1
        return aggr

class HitsByIPPerUnitTime(IpAggregator):
    # __call__ requires start, end, step keyword args
    def _aggregate(self, aggr, datetimeobj, url, ms, ip):
        sub_aggr = aggr.setdefault(ip, {'total_hits': 0})
        sub_aggr['total_hits'] += 1
        # Note that we are guaranteed that datetimeobj is between _start and _end
        seconds_offset_into_range = datetimeobj - self._start
        seconds_offset_into_range = self._total_seconds(seconds_offset_into_range)
        multiplier = seconds_offset_into_range / self._step
        bucket_start = self._start + timedelta(seconds=( self._step * (multiplier )))
        bucket_end   = self._start + timedelta(seconds=((self._step * (multiplier+1)) - 1))
        if bucket_end > self._end:
            bucket_end = self._end
        hits_for_bucket = sub_aggr.setdefault((bucket_start, bucket_end), 0)
        sub_aggr[(bucket_start, bucket_end)] = hits_for_bucket + 1
        return aggr

class HitsByIPByTimeToRenderLinear(IpAggregator):
    # __call__ requires start, end, ms_step keyword args
    def _aggregate(self, aggr, datetimeobj, url, ms, ip):
        sub_aggr = aggr.setdefault(ip, {'total_hits': 0})
        sub_aggr['total_hits'] += 1
        bucket_idx = ms / self._ms_step
        bucket_start = self._ms_step * bucket_idx
        bucket_end  = self._ms_step * (bucket_idx + 1)
        hits_for_bucket = sub_aggr.setdefault((bucket_start, bucket_end), 0)
        sub_aggr[(bucket_start, bucket_end)] = hits_for_bucket + 1
        return aggr

class LinearQuantizeSpeed(UrlAggregator, dict):
    # dict is mixed in because the per-URL bucket data is stored directly on
    # the instance via self.setdefault(), self[url] and self.clear() below.
    def __init__(self, log_analyzer):
        self._log_analyzer = log_analyzer
        self._start = None
        self._end = None

    def __call__(self, start, end, ms_step):
        self._ms_step = int(ms_step)

        if self._start == start and self._end == end:
            self.output()
        else:
            self.clear()
            self._start = start
            self._end = end
            for datetimeobj, url, ms, ip in self._log_analyzer.log_data:
                if start <= datetimeobj <= end:
                    self.update(datetimeobj, url, ms, ip)
            self.output()

    def update(self, datetimeobj, url, ms, ip):
        url_dict = self.setdefault(url, {'count': 0, 'buckets': []})
        url_dict['count'] += 1

        bucket_idx = ms / self._ms_step
        num_buckets = bucket_idx + 1
        bucket_list_len = len(url_dict['buckets'])
        if bucket_list_len < num_buckets:
            url_dict['buckets'].extend([0 for x in range(num_buckets-bucket_list_len)])

        url_dict['buckets'][bucket_idx] += 1

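    def _sort(self):
        # NOTE: _sort() is called by output() but is not defined elsewhere in
        # this file; this reconstruction is an assumption.  It orders URLs by
        # hit count, least-hit first, so output() can take the busiest five
        # off the end of the list.
        return sorted(self.keys(), key=lambda u: self[u]['count'])
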
    def output(self):
        url_list = self._sort()
        if url_list:
            # only want distributions for the top 5 urls
            for url in url_list[-5:]:
                print "%-60s\n" % (url)
                url_dict = self[url]
                for bucket_idx in range(len(url_dict['buckets'])):
                    print "%5d ms: %d" % ((bucket_idx+1)*self._ms_step, url_dict['buckets'][bucket_idx])
                print
                
class QuantizeSpeed(UrlAggregator):
    def __init__(self):
        pass

    def add(self, datetimeobj, url, ip, ms):
        pass

    def output(self):
        pass

class LogAnalyzer(object):

    month_map = {
        "Jan": 1,
        "Feb": 2,
        "Mar": 3,
        "Apr": 4,
        "May": 5,
        "Jun": 6,
        "Jul": 7,
        "Aug": 8,
        "Sep": 9,
        "Oct": 10,
        "Nov": 11,
        "Dec": 12
    }

    def __init__(self):
        self.log_data = []
        self.start = None
        self.end = None

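        # aggr_map maps an interactive command name to an aggregator callable
        # whose signature matches cmd_loop()'s "(start, end, *args)" call.  It
        # is left empty; a hypothetical wiring (not in the original source)
        # might look like:
        #
        #     self.aggr_map = {'lquantize': LinearQuantizeSpeed(self)}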
        self.aggr_map = {
        }

    def cmd_loop(self):
        while True:
            try:
                line = raw_input("\rstart:%s, end:%s >> " % (self.start, self.end))
                if not line or line.lstrip().rstrip() == '':
                    continue

                # Split the command word from its (possibly absent) arguments;
                # a one-argument split on a bare command would otherwise raise
                # ValueError.
                parts = line.split(None, 1)
                func = parts[0]
                args = parts[1].split() if len(parts) > 1 else []

                if func.startswith('\\'):
                    # internal command, not for an aggregator,
                    # i.e. setting start and end times
                    func = func[1:]
                    method = "_%s" % func
                    if hasattr(self, method):
                        getattr(self, method)(*args)
                        continue

                for cmd in self.aggr_map.keys():
                    if func.lower() == cmd:
                        self.aggr_map[cmd](self.start, self.end, *args)
                        break
                else:
                    print "Unknown Command: %s" % line

            except EOFError, e:
                print ""
                break

            except StandardError, e:
                traceback.print_exc()
                continue

    def parse_logfile(self, path):
        cmds = []
        if path.endswith(".gz"):
            cmds.append("gzip -dc %s" % path)
            cmds.append("grep 'request end'")
        else:
            cmds.append("grep 'request end' %s" % path)

        procs = []
        stdin = None
        for cmd in cmds:
            p = subprocess.Popen(shlex.split(cmd), stdin=stdin, stdout=subprocess.PIPE)
            stdin = p.stdout
            procs.append(p)

        # Read filtered log lines from the last process in the pipeline.
        for line in procs[-1].stdout:
            self._process_line(line)

        for p in procs:
            p.wait()

    def _process_line(self, line):
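        # The fixed offsets and field indexes below assume (reconstructed from
        # this code, not documented in the original source) 'request end'
        # lines of roughly this shape:
        #
        #   [DD/Mon/YYYY HH:MM:SS] ... <URL> <ms> ... <IP> ...
        #
        # i.e. a bracketed timestamp at the start of the line, with the URL,
        # the render time in milliseconds and the client IP in space-separated
        # fields 8, 9 and 11 (0-based).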
        year   = int(line[8:12])
        month  = self.month_map[line[4:7]]
        day    = int(line[1:3])
        hour   = int(line[13:15])
        minute = int(line[16:18])
        second = int(line[19:21])
        datetimeobj = datetime(year, month, day, hour, minute, second)

        line = self._fixup_urls(line)
        line_elements = line.split(' ')
        url = line_elements[8]
        ip = line_elements[11]
        ms = int(line_elements[9])

        # Record the data for later analysis
        self.log_data.append((datetimeobj, url, ms, ip))

        # Calculate the start of the time period for the logs we're dealing with
        if self.start:
            if datetimeobj < self.start:
                self.start = datetimeobj
        else:
            self.start = datetimeobj

        # Calculate the end of the time period for the logs we're dealing with
        if self.end:
            if datetimeobj > self.end:
                self.end = datetimeobj
        else:
            self.end = datetimeobj

    def _fixup_urls(self, line):
        line = re.sub('&uqid=jQuery[0-9_]+', '', line)
        line = re.sub('&ts=\d+', '', line)
        line = re.sub('&_=\d+', '', line)
        return line

    def _set(self, what, arg):
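        # Invoked from cmd_loop() for the internal "\set" command, e.g.
        # "\set start 2011-06-26-10:00:00" (timestamp layout inferred from the
        # '[-:]' split below: year-month-day-hour:minute:second).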
        if what == "start":
            self.start = datetime(*[ int(x) for x in re.split('[-:]', arg)])
        elif what == "end":
            self.end = datetime(*[int(x) for x in re.split('[-:]', arg)])
        else:
            pass
        

def main():
    log_analyzer = LogAnalyzer()
    for path in sys.argv[1:]:
        print "Parsing log file: %s  " % os.path.basename(path),
        start = datetime.now()
        log_analyzer.parse_logfile(path)    
        end = datetime.now()
        print "DONE (%s)" % (end - start)
    log_analyzer.cmd_loop()

if __name__ == "__main__":
#   import cProfile
#   cProfile.run('main()')
    main()

# vim: et sw=4 ts=4 softtabstop=4 foldmethod=expr tw=100