loganalyze / loganalyze.py

#!/usr/bin/env python

# Much of the coroutine stuff inspired and/or stolen from Chapter 6 of "Python
# Essential Reference, Fourth Edition."

import sys
import re
import subprocess

from datetime import datetime, timedelta
from optparse import OptionParser, OptionValueError

def coroutine(func):
    """
    A function to wrap coroutines, advancing them to their first yield
    statement automatically so they are ready to have data .send()'d to
    them.
    """
    def wrapper(*args, **kwargs):
        gen = func(*args, **kwargs)
        gen.next()
        return gen
    return wrapper
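
# A minimal illustration of the priming behaviour (hypothetical sink, not
# part of the pipeline):
#
#   @coroutine
#   def echo():
#       while True:
#           print (yield)
#
#   e = echo()      # the decorator has already advanced to the first yield,
#   e.send("hi")    # so we can .send() immediately, no explicit e.next()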

@coroutine
def open_and_filter(target):
    while True:
        path = (yield)

        # Build each stage of the pipeline as an argument list (avoids any
        # shell quoting problems with paths containing spaces)
        cmds = []
        if path.endswith(".gz"):
            cmds.append(["gzip", "-dc", path])
            cmds.append(["grep", "request end"])
        else:
            cmds.append(["grep", "request end", path])

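        # Chain the processes together like a shell pipeline: each one reads
        # its stdin from the previous one's stdout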
        procs = []
        stdin = None
        for cmd in cmds:
            p = subprocess.Popen(cmd, stdin=stdin, stdout=subprocess.PIPE)
            stdin = p.stdout
            procs.append(p)

        # Read from the last process in the chain, stripping the newline
        for line in procs[-1].stdout:
            target.send(line[:-1])

        for p in procs:
            p.wait()

@coroutine
def filter_time(target, start, end):
    """
    Expects to receive lines in the format that our internal appservers use,
    filters them to ensure they are within the given date range, inclusive,
    then passes the line and the associated datetime object on to target.

    @start: datetime object representing the start of the range
    @end:   datetime object representing the end of the range
    """
    month_map = {
            "Jan": 1,
            "Feb": 2,
            "Mar": 3,
            "Apr": 4,
            "May": 5,
            "Jun": 6,
            "Jul": 7,
            "Aug": 8,
            "Sep": 9,
            "Oct": 10,
            "Nov": 11,
            "Dec": 12
    }
    while True:
        line = (yield)
        # Using slicing instead of regexes because it is faster
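        # The slice offsets assume an Apache-style timestamp prefix such as
        #   [23/Aug/2011:13:02:11 ...
        # (hypothetical sample; the exact appserver format is internal)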
        year   = int(line[8:12])
        month  = month_map[line[4:7]]
        day    = int(line[1:3])
        hour   = int(line[13:15])
        minute = int(line[16:18])
        second = int(line[19:21])

        datetimeobj = datetime(year, month, day, hour, minute, second)
        # Apply each bound independently so a lone --start or --end still
        # filters
        if start is not None and datetimeobj < start:
            continue
        if end is not None and datetimeobj > end:
            continue
        target.send((datetimeobj, line))

@coroutine
def filter_url_fixups(target):
    while True:
        datetimeobj, line = (yield)
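        # Strip cache-busting query parameters so that otherwise identical
        # requests aggregate together, e.g. (hypothetical)
        #   /foo?a=1&_=1314108131  ->  /foo?a=1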
        line = re.sub(r'&uqid=jQuery[0-9_]+', '', line)
        line = re.sub(r'&ts=\d+', '', line)
        line = re.sub(r'&_=\d+', '', line)
        target.send((datetimeobj, line))

@coroutine
def dispatch(*aggrs):
    while True:
        datetimeobj, line = (yield)
        line_elements = line.split(' ')
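        # Whitespace-separated field offsets (zero-based) per the internal
        # appserver format: field 8 is the URL, field 9 the request time in
        # milliseconds, field 11 the client IP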
        url = line_elements[8]
        ip = line_elements[11]
        ms = int(line_elements[9])
        for aggr in aggrs:
            aggr.add(datetimeobj, url, ip, ms)
        
@coroutine
def printer():
    while True:
        line = (yield)
        print line

def date_validate(option, opt_str, value, parser):
    """
    Function to validate date options via optparse
    """
    if re.match(r'^\d{4}-\d{2}-\d{2}(?::\d{2}:\d{2}:\d{2})?$', value):
        setattr(parser.values, option.dest, value)
    else:
        raise OptionValueError("Date must be in the form YYYY-mm-dd[:HH:MM:SS]")

class UrlAggregator(dict):
    """
    Base class mapping url -> stats dict.  Subclasses implement add() and
    output().
    """
    def _sort(self):
        # Return urls ordered from least to most frequently hit
        return sorted(self, key=lambda url: self[url]['count'])

class TopUrl(UrlAggregator):
    def __init__(self, num_urls):
        self._num_urls = int(num_urls)

    def add(self, datetimeobj, url, ip, ms):
        url_dict = self.setdefault(url, {'count': 0})
        url_dict['count'] += 1
        
    def output(self):
        url_list = self._sort()
        if url_list:
            # only print the top self._num_urls urls
            for url in url_list[-self._num_urls:]:
                print "%6d %-40s" % (self[url]['count'], url)

class LinearQuantizeTimeOfDay(UrlAggregator):
    def __init__(self, period, start, end):
        self._time_period = end - start
        self._period = int(period)
        total_seconds = self._time_period.days * 24 * 3600 + self._time_period.seconds
        # Integer division: any partial trailing bucket is dropped
        self._num_buckets = total_seconds / self._period
        self._start = start
        self._end = end

    def add(self, datetimeobj, url, ip, ms):
        url_dict = self.setdefault(url, {'count': 0,
                                         'buckets': [0] * self._num_buckets})
        url_dict['count'] += 1
        # Compute the bucket directly rather than scanning every bucket
        delta = datetimeobj - self._start
        bucket_idx = (delta.days * 24 * 3600 + delta.seconds) / self._period
        if 0 <= bucket_idx < self._num_buckets:
            url_dict['buckets'][bucket_idx] += 1

    def output(self):
        url_list = self._sort()
        if url_list:
            # only want distributions for the top 5 urls
            for url in url_list[-5:]:
                print "%-60s %6d\n" % (url, self[url]['count'])
                for bucket_idx in range(self._num_buckets):
                    bucket = self[url]['buckets'][bucket_idx]
                    bucket_start = self._start + timedelta(seconds=self._period*bucket_idx)
                    bucket_end = self._start + timedelta(seconds=(self._period*(bucket_idx+1))-1)
                    print "\t%s - %s: %d" % (bucket_start, bucket_end, bucket)
                print

class LinearQuantizeSpeed(UrlAggregator):
    def __init__(self, ms_step):
        self._ms_step = int(ms_step)

    def add(self, datetimeobj, url, ip, ms):
        url_dict = self.setdefault(url, {'count': 0, 'buckets': []})
        url_dict['count'] += 1

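        # Buckets grow on demand: a request of `ms` milliseconds lands in
        # bucket ms / step; extend the list with zeros if this is the
        # slowest request seen so far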
        bucket_idx = ms / self._ms_step
        num_buckets = bucket_idx + 1
        bucket_list_len = len(url_dict['buckets'])
        if bucket_list_len < num_buckets:
            url_dict['buckets'].extend([0 for x in range(num_buckets-bucket_list_len)])

        url_dict['buckets'][bucket_idx] += 1

    def output(self):
        url_list = self._sort()
        if url_list:
            # only want distributions for the top 5 urls
            for url in url_list[-5:]:
                print "%-60s\n" % (url)
                url_dict = self[url]
                for bucket_idx in range(len(url_dict['buckets'])):
                    print "%5d ms: %d" % ((bucket_idx+1)*self._ms_step, url_dict['buckets'][bucket_idx])
                print
                
class QuantizeSpeed(UrlAggregator):
    # Placeholder aggregator, not yet implemented
    def __init__(self):
        pass

    def add(self, datetimeobj, url, ip, ms):
        pass

    def output(self):
        pass

def main():
    # parse date options
    parser = OptionParser()
    parser.add_option("-s", "--start",
                      dest="start", action="callback",
                      callback=date_validate, type="string",
                      help="Start datetime for filtering (YYYY-mm-dd:HH:MM:SS)")
    parser.add_option("-e", "--end",
                      dest="end", action="callback",
                      callback=date_validate, type="string",
                      help="End datetime for filtering (YYYY-mm-dd:HH:MM:SS)")

    # args should be a list of the files we want to inspect
    options, args = parser.parse_args()

    def parse_datetime(value):
        # "YYYY-mm-dd[:HH:MM:SS]", already validated by date_validate
        parts = value.split('-')
        parts = parts[:-1] + parts[-1].split(':')
        return datetime(*[int(x) for x in parts])

    start = parse_datetime(options.start) if options.start else None
    end = parse_datetime(options.end) if options.end else None


    aggregators = []

    # Every operation takes one numeric argument, followed by the log files
    if len(args) < 3:
        print >> sys.stderr, ("Usage: %s [-s START] [-e END] "
                              "(topurls N | lquantize-timeofday SECONDS | "
                              "lquantize-speed MS) LOGFILE..." % sys.argv[0])
        sys.exit(1)

    operation = args.pop(0)
    if operation == "topurls":
        aggregators.append(TopUrl(args.pop(0)))
    elif operation == "lquantize-timeofday":
        if start is None or end is None:
            print >> sys.stderr, "lquantize-timeofday requires both --start and --end"
            sys.exit(1)
        aggregators.append(LinearQuantizeTimeOfDay(args.pop(0), start, end))
    elif operation == "lquantize-speed":
        aggregators.append(LinearQuantizeSpeed(args.pop(0)))
    else:
        print >> sys.stderr, "Unknown operation: %s" % operation
        sys.exit(1)

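    # Build the processing pipeline from the outside in; each stage is a
    # primed coroutine that .send()s its output to the next:
    #   open_and_filter -> filter_time -> filter_url_fixups -> dispatch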
    o = open_and_filter(
         filter_time(
          filter_url_fixups(
           dispatch(*aggregators)), start, end))

    # start feeding the pipeline with data
    for path in args:
        o.send(path)

    for aggr in aggregators:
        aggr.output()

if __name__ == "__main__":
    main()
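
# Example invocations (hypothetical log file names):
#   ./loganalyze.py topurls 10 access.log access.log.1.gz
#   ./loganalyze.py -s 2011-08-23:00:00:00 -e 2011-08-23:23:59:59 \
#       lquantize-timeofday 3600 access.log
#   ./loganalyze.py lquantize-speed 100 access.log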

# vim: et sw=4 ts=4 softtabstop=4 foldmethod=expr tw=100