Chris Perl avatar Chris Perl committed cd56ae7

Initial commit

Comments (0)

Files changed (1)

+#!/usr/bin/env python
+
+# Much of the coroutine stuff inspired and/or stolen from Chapter 6 of "Python
+# Essential Reference, Fourth Edition."
+
+import os
+import sys
+import re
+import subprocess
+import shlex
+
+from datetime import datetime, timedelta
+from optparse import OptionParser, OptionValueError
+
+# XXX: Debug
+import pdb
+from pprint import pprint as pp
+
def coroutine(func):
    """
    Decorator wrapping a generator-based coroutine, advancing it to its
    first yield statement automatically so it is immediately ready to
    have data .send()'d to it.
    """
    def wrapper(*args, **kwargs):
        gen = func(*args, **kwargs)
        # next() builtin (available since Python 2.6) works on both
        # Python 2 and 3; gen.next() exists only on Python 2.
        next(gen)
        return gen
    return wrapper
+
@coroutine
def open_and_filter(target):
    """
    Coroutine: receives log file paths via .send(), runs a pipeline over
    each file ("gzip -dc | grep 'request end'" for .gz files, a plain grep
    otherwise), and sends every matching line -- trailing newline
    stripped -- on to target.
    """
    while True:
        path = (yield)

        # NOTE(review): shlex.split of the interpolated command means a
        # path containing whitespace will be mis-split -- TODO confirm
        # paths are always space-free.
        cmds = []
        if path.endswith(".gz"):
            cmds.append("gzip -dc %s" % path)
            cmds.append("grep 'request end'")
        else:
            cmds.append("grep 'request end' %s" % path)

        procs = []
        stdin = None
        for cmd in cmds:
            p = subprocess.Popen(shlex.split(cmd), stdin=stdin,
                                 stdout=subprocess.PIPE)
            if stdin is not None:
                # Close our copy of the upstream pipe so only the
                # downstream process holds it; otherwise the upstream
                # process never sees SIGPIPE if the reader exits early.
                stdin.close()
            stdin = p.stdout
            procs.append(p)

        # Read from the last process in the pipeline explicitly rather
        # than relying on the loop variable leaking out of the for loop.
        for line in procs[-1].stdout:
            target.send(line[:-1])

        for p in procs:
            p.wait()
+
@coroutine
def filter_time(target, start, end):
    """
    Expects to receive lines in the format that our internal appservers use,
    filters them to ensure they are within the given date range, inclusive,
    then passes the line and the associated datetime object on to target.

    @start: datetime object for the start of the range, or None for no
            lower bound
    @end:   datetime object for the end of the range, or None for no
            upper bound
    """
    month_map = {
            "Jan": 1,
            "Feb": 2,
            "Mar": 3,
            "Apr": 4,
            "May": 5,
            "Jun": 6,
            "Jul": 7,
            "Aug": 8,
            "Sep": 9,
            "Oct": 10,
            "Nov": 11,
            "Dec": 12
    }
    while True:
        line = (yield)
        # Using slicing instead of regex's due to speed.  Assumes each line
        # starts with a timestamp shaped like "[02/Jan/2012:13:45:59" --
        # TODO confirm against the appserver log format.
        year   = int(line[8:12])
        month  = month_map[line[4:7]]
        day    = int(line[1:3])
        hour   = int(line[13:15])
        minute = int(line[16:18])
        second = int(line[19:21])

        datetimeobj = datetime(year, month, day, hour, minute, second)
        # Apply each bound independently.  Previously filtering only
        # happened when BOTH bounds were supplied, so giving just -s or
        # just -e silently disabled filtering altogether.
        if start is not None and datetimeobj < start:
            continue
        if end is not None and datetimeobj > end:
            continue
        target.send((datetimeobj, line))
+
@coroutine
def filter_url_fixups(target):
    """
    Coroutine: strips per-request cache-buster query parameters (jQuery
    JSONP uqid, ts, _) out of each line so identical requests aggregate
    together, then passes (datetimeobj, line) on to target.
    """
    # Compile once, outside the loop -- these run against every line.
    # Raw strings keep the regex escapes out of string-escape territory.
    uqid_re = re.compile(r'&uqid=jQuery[0-9_]+')
    ts_re = re.compile(r'&ts=\d+')
    underscore_re = re.compile(r'&_=\d+')
    while True:
        datetimeobj, line = (yield)
        line = uqid_re.sub('', line)
        line = ts_re.sub('', line)
        line = underscore_re.sub('', line)
        target.send((datetimeobj, line))
+
@coroutine
def dispatch(*aggrs):
    """
    Coroutine: pulls the url, client ip and request duration (ms) out of
    each line and hands them to every aggregator in aggrs.
    """
    while True:
        datetimeobj, line = (yield)
        fields = line.split(' ')
        # assumes field layout: url at index 8, duration(ms) at index 9,
        # ip at index 11 -- TODO confirm against the appserver log format
        url = fields[8]
        ms = int(fields[9])
        ip = fields[11]
        for aggregator in aggrs:
            aggregator.add(datetimeobj, url, ip, ms)
+        
@coroutine
def printer():
    """
    Debugging sink coroutine: prints every item sent to it.
    """
    while True:
        line = (yield)
        # Single-argument print(x) behaves identically on Python 2 (a
        # parenthesized expression) and Python 3 (the function).
        print(line)
+
def date_validate(option, opt_str, value, parser):
    """
    optparse callback validating the -s/-e date options.

    Accepts YYYY-mm-dd with an optional :HH:MM:SS suffix; stores the raw
    string on parser.values under option.dest, or raises OptionValueError.
    """
    # Raw string so the \d escapes are taken by the regex engine, not the
    # string literal parser.
    if re.match(r'^\d{4}-\d{2}-\d{2}(?::\d{2}:\d{2}:\d{2})?$', value):
        setattr(parser.values, option.dest, value)
    else:
        raise OptionValueError("Date must be in the form YYYY-mm-dd:HH:MM:SS")
+
class UrlAggregator(dict):
    """
    Base class for the aggregators: a dict mapping url -> per-url stats
    dict, where every stats dict carries at least a 'count' key.
    """

    def _sort(self):
        """Return the urls sorted by ascending hit count."""
        # key= replaces the old cmp= form (removed in Python 3); the
        # ordering is identical -- ascending 'count', stable for ties.
        return sorted(self, key=lambda url: self[url]['count'])
+
class TopUrl(UrlAggregator):
    """
    Aggregator that counts hits per url and prints the busiest num_urls
    urls on output().
    """

    def __init__(self, num_urls):
        # How many urls to show; int() so a string straight off argv works.
        self._num_urls = int(num_urls)

    def add(self, datetimeobj, url, ip, ms):
        """Record one hit for url (the other fields are unused here)."""
        url_dict = self.setdefault(url, {'count': 0})
        url_dict['count'] += 1

    def output(self):
        """Print the top self._num_urls urls by hit count, busiest last."""
        url_list = self._sort()
        if url_list:
            # _sort() is ascending, so the busiest urls sit at the tail;
            # only print the top self._num_urls of them.
            for url in url_list[-self._num_urls:]:
                print("%6d %-40s" % (self[url]['count'], url))
+
class LinearQuantizeTimeOfDay(UrlAggregator):
    """
    Aggregator that counts hits per url and, for the 5 busiest urls, shows
    how the hits distribute over fixed-width time buckets between start
    and end.

    @period: bucket width in seconds
    @start:  datetime for the start of the range (must not be None)
    @end:    datetime for the end of the range (must not be None)
    """

    def __init__(self, period, start, end):
        self._period = int(period)
        span = end - start
        # Floor division: under Python 3 a bare / would produce a float
        # here and break every range(self._num_buckets) below.
        self._num_buckets = (span.seconds + span.days * 24 * 3600) // self._period
        self._start = start
        self._end = end

    def add(self, datetimeobj, url, ip, ms):
        """Record one hit for url in the bucket containing datetimeobj."""
        url_dict = self.setdefault(url, {'count': 0,
                                         'buckets': [0] * self._num_buckets})
        url_dict['count'] += 1
        # Compute the bucket index directly instead of scanning every
        # bucket per line: O(1) instead of O(num_buckets), and equivalent
        # since bucket i covers exactly [start + i*period,
        # start + (i+1)*period - 1] at second resolution.
        offset = datetimeobj - self._start
        bucket_idx = (offset.seconds + offset.days * 24 * 3600) // self._period
        if 0 <= bucket_idx < self._num_buckets:
            url_dict['buckets'][bucket_idx] += 1

    def output(self):
        """Print per-time-bucket hit counts for the 5 busiest urls."""
        url_list = self._sort()
        if url_list:
            # only want distributions for the top 5 urls
            for url in url_list[-5:]:
                print("%-60s %6d\n" % (url, self[url]['count']))
                for bucket_idx in range(self._num_buckets):
                    bucket = self[url]['buckets'][bucket_idx]
                    bucket_start = self._start + timedelta(seconds=self._period * bucket_idx)
                    bucket_end = self._start + timedelta(seconds=(self._period * (bucket_idx + 1)) - 1)
                    print("\t%s - %s: %d" % (bucket_start, bucket_end, bucket))
                print("")
+
class LinearQuantizeSpeed(UrlAggregator):
    """
    Aggregator that counts hits per url and, for the 5 busiest urls, shows
    the distribution of request durations in fixed-width millisecond
    buckets (the bucket list grows on demand).

    @ms_step: bucket width in milliseconds
    """

    def __init__(self, ms_step):
        self._ms_step = int(ms_step)

    def add(self, datetimeobj, url, ip, ms):
        """Record one hit of ms milliseconds for url."""
        url_dict = self.setdefault(url, {'count': 0, 'buckets': []})
        url_dict['count'] += 1

        # Floor division: under Python 3 a bare / would yield a float
        # index and break the list indexing below.
        bucket_idx = ms // self._ms_step

        # Grow the bucket list just enough to make bucket_idx addressable.
        shortfall = (bucket_idx + 1) - len(url_dict['buckets'])
        if shortfall > 0:
            url_dict['buckets'].extend([0] * shortfall)

        url_dict['buckets'][bucket_idx] += 1

    def output(self):
        """Print per-duration-bucket hit counts for the 5 busiest urls."""
        url_list = self._sort()
        if url_list:
            # only want distributions for the top 5 urls
            for url in url_list[-5:]:
                print("%-60s\n" % (url))
                url_dict = self[url]
                for bucket_idx in range(len(url_dict['buckets'])):
                    print("%5d ms: %d" % ((bucket_idx + 1) * self._ms_step,
                                          url_dict['buckets'][bucket_idx]))
                print("")
+                
class QuantizeSpeed(UrlAggregator):
    """
    Unimplemented placeholder aggregator -- never instantiated by main().
    Presumably intended as a power-of-two duration quantization to
    complement LinearQuantizeSpeed -- TODO confirm or remove.
    """

    def __init__(self):
        pass

    def add(self, datetimeobj, url, ip, ms):
        # Matches the aggregator interface (see dispatch()); does nothing.
        pass

    def output(self):
        pass
+
def main():
    """
    Command-line entry point.

    Usage: prog [-s START] [-e END] OPERATION OP_ARG FILE [FILE ...]

    OPERATION is one of topurls, lquantize-timeofday, lquantize-speed;
    OP_ARG is that operation's single argument (url count, bucket seconds
    or bucket milliseconds respectively); the remaining args are the log
    files to feed through the pipeline.
    """
    # parse date options
    parser = OptionParser()
    parser.add_option("-s", "--start",
                      dest="start", action="callback",
                      callback=date_validate, type="string",
                      help="Start datetime for filtering (YYYY-mm-dd:HH:MM:SS)")
    parser.add_option("-e", "--end",
                      dest="end", action="callback",
                      callback=date_validate, type="string",
                      help="End datetime for filtering (YYYY-mm-dd:HH:MM:SS)")

    # args should be a list of the files we want to inspect
    options, args = parser.parse_args()

    def _parse_datetime(value):
        # "YYYY-mm-dd[:HH:MM:SS]" (pre-validated by date_validate) ->
        # datetime.  The date part is '-'-separated, the optional time
        # part ':'-separated.
        parts = value.split('-')
        parts = parts[:-1] + parts[-1].split(':')
        return datetime(*[int(x) for x in parts])

    start = _parse_datetime(options.start) if options.start else None
    end = _parse_datetime(options.end) if options.end else None

    # Need at least an operation and its argument; previously an empty
    # command line died with a bare IndexError from args.pop(0).
    if len(args) < 2:
        print("Error, usage")
        sys.exit(1)

    aggregators = []

    operation = args.pop(0)
    if operation == "topurls":
        aggregators.append(TopUrl(args.pop(0)))
    elif operation == "lquantize-timeofday":
        aggregators.append(LinearQuantizeTimeOfDay(args.pop(0), start, end))
    elif operation == "lquantize-speed":
        aggregators.append(LinearQuantizeSpeed(args.pop(0)))
    else:
        print("Error, usage")
        sys.exit(1)

    # Build the coroutine pipeline, innermost (the aggregators) first.
    o = open_and_filter(
         filter_time(
          filter_url_fixups(
           dispatch(*aggregators)), start, end))

    # start feeding the pipeline with data
    for path in args:
        o.send(path)

    for aggr in aggregators:
        aggr.output()
+
if __name__ == "__main__":
    # To profile instead of running directly, swap the call below for:
    #     import cProfile; cProfile.run('main()')
    main()
+
+# vim: et sw=4 ts=4 softtabstop=4 foldmethod=expr tw=100
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.