Commits

Chris Perl committed c1c3f8d

Reworking everything to make it interactive; only topurls works at the moment, and not perfectly
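
A hypothetical session after this change (the script name, file names, counts, and timings are illustrative, not taken from the commit):

    $ python loganalyzer.py app.log app.log.1.gz
    Parsing log file: app.log   DONE (0:00:01.873206)
    Parsing log file: app.log.1.gz   DONE (0:00:04.106911)
    LogAnalyzer> topurls 3
        41 /status
       187 /api/search
       212 /index.html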

Comments (0)

Files changed (1)

 import re
 import subprocess
 import shlex
+import readline
+import os
+import sys
 
 from datetime import datetime, timedelta
 from optparse import OptionParser, OptionValueError
 import pdb
 from pprint import pprint as pp
 
-def coroutine(func):
-    """
-    A function to wrap coroutines, advancing them to their first yield
-    statement automatically so they are ready to have data .send()'d to
-    them.
-    """
-    def wrapper(*args, **kwargs):
-        gen = func(*args, **kwargs)
-        gen.next()
-        return gen
-    return wrapper
-
-@coroutine
-def open_and_filter(target):
-    while True:
-        path = (yield)
-
-        cmds = []
-        if path.endswith(".gz"):
-            cmds.append("gzip -dc %s" % path)
-            cmds.append("grep 'request end'")
-        else:
-            cmds.append("grep 'request end' %s" % path)
-
-        procs = []
-        stdin = None
-        for cmd in cmds:
-            p = subprocess.Popen(shlex.split(cmd), stdin=stdin, stdout=subprocess.PIPE)
-            stdin = p.stdout
-            procs.append(p)
-
-        for line in p.stdout:
-            target.send(line[:-1])
-
-        for p in procs:
-            p.wait()
-
-@coroutine
-def filter_time(target, start, end):
-    """
-    Expects to receive lines in the format that our internal appservers use,
-    filters them to ensure  they are within the given date range, inclusive,
-    then passes the line and the associated datetime object on to target.
-
-    @start: datetime object representing the start of the range
-    @end:   datetime object representing the end of the range
-    """
-    month_map = {
-            "Jan": 1,
-            "Feb": 2,
-            "Mar": 3,
-            "Apr": 4,
-            "May": 5,
-            "Jun": 6,
-            "Jul": 7,
-            "Aug": 8,
-            "Sep": 9,
-            "Oct": 10,
-            "Nov": 11,
-            "Dec": 12
-    }
-    while True:
-        line = (yield)
-        # Using slicing instead of regex's due to speed
-        year   = int(line[8:12])
-        month  = month_map[line[4:7]]
-        day    = int(line[1:3])
-        hour   = int(line[13:15])
-        minute = int(line[16:18])
-        second = int(line[19:21])
-
-        datetimeobj = datetime(year, month, day, hour, minute, second)
-        if start is not None and end is not None:
-            if datetimeobj >= start and datetimeobj <= end:
-                target.send((datetimeobj, line))
-        else:
-            target.send((datetimeobj, line))
-
-@coroutine
-def filter_url_fixups(target):
-    while True:
-        datetimeobj, line = (yield)
-        line = re.sub('&uqid=jQuery[0-9_]+', '', line)
-        line = re.sub('&ts=\d+', '', line)
-        line = re.sub('&_=\d+', '', line)
-        target.send((datetimeobj, line))
-
-@coroutine
-def dispatch(*aggrs):
-    while True:
-        datetimeobj, line = (yield)
-        line_elements = line.split(' ')
-        url = line_elements[8]
-        ip = line_elements[11]
-        ms = int(line_elements[9])
-        for aggr in aggrs:
-            aggr.add(datetimeobj, url, ip, ms)
-        
-@coroutine
-def printer():
-    while True:
-        line = (yield)
-        print line
-
-def date_validate(option, opt_str, value, parser):
-    """
-    Function to validate date options via optparse
-    """
-    if re.match('^\d{4}-\d{2}-\d{2}(?::\d{2}:\d{2}:\d{2})?$', value):
-        setattr(parser.values, option.dest, value)
-    else:
-        raise OptionValueError("Date must be in the form YYYY-mm-dd:HH:MM:SS")
-
 class UrlAggregator(dict):
     def _sort(self):
         url_list = self.keys()
         url_list.sort(cmp=lambda a, b: cmp(self[a]['count'], self[b]['count']))
         return url_list
 
-class TopUrl(UrlAggregator):
-    def __init__(self, num_urls):
-        self._num_urls = int(num_urls)
+class TopUrls(UrlAggregator):
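+    """
+    Implements the interactive 'topurls' command: counts requests per URL
+    within the given date range and prints the num_urls most-requested URLs.
+    """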
+    def __init__(self, log_analyzer):
+        self._log_analyzer = log_analyzer
+        self._start = None
+        self._end = None
 
-    def add(self, datetimeobj, url, ip, ms):
+    def __call__(self, start, end, num_urls):
+        num_urls = int(num_urls)
+        if self._start == start and self._end == end:
+            # Same range as the previous run; reuse the cached counts.
+            self.output(num_urls)
+        else:
+            self._start = start
+            self._end = end
+            # New range: drop the counts from any previous run before
+            # re-aggregating, so ranges don't pollute each other.
+            self.clear()
+            for datetimeobj, url, ms, ip in self._log_analyzer.log_data:
+                if start <= datetimeobj <= end:
+                    self.update(datetimeobj, url, ms, ip)
+            self.output(num_urls)
+
+    def update(self, datetimeobj, url, ms, ip):
         url_dict = self.setdefault(url, {'count': 0})
         url_dict['count'] += 1
         
-    def output(self):
+    def output(self, num_urls):
         url_list = self._sort()
         if url_list:
-            # only print the top self._num_urls urls
+            # only print the top num_urls urls
-            for url in url_list[-self._num_urls:]:
+            for url in url_list[-num_urls:]:
                 print "%6d %-40s" % (self[url]['count'], url)
 
 class LinearQuantizeTimeOfDay(UrlAggregator):
     def output(self):
         pass
 
+class LogAnalyzer(object):
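+    """
+    Parses 'request end' lines from appserver logs into memory, tracks the
+    overall time range they cover, and runs an interactive command loop
+    over the parsed data.
+    """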
+
+    month_map = {
+        "Jan": 1,
+        "Feb": 2,
+        "Mar": 3,
+        "Apr": 4,
+        "May": 5,
+        "Jun": 6,
+        "Jul": 7,
+        "Aug": 8,
+        "Sep": 9,
+        "Oct": 10,
+        "Nov": 11,
+        "Dec": 12
+    }
+
+    def __init__(self):
+        self.log_data = []
+        self.start = None
+        self.end = None
+
+        self.cmd_map = {
+            "topurls": TopUrls(self)
+        }
+
+    def cmd_loop(self):
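+        """
+        Minimal read-eval-print loop: dispatches each input line to the
+        matching command in cmd_map, until EOF (Ctrl-D) ends the session.
+        """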
+        while True:
+            try:
+                line = raw_input("\rLogAnalyzer> ")
+                if not line.strip():
+                    continue
+                # Split the command name from its arguments once, outside the
+                # dispatch loop; partition() also copes with commands given
+                # no arguments at all.
+                func, _, args = line.partition(' ')
+                for cmd in self.cmd_map.keys():
+                    if func.lower() == cmd:
+                        self.cmd_map[cmd](self.start, self.end, *args.split())
+                        break
+                else:
+                    print "Unknown Command: %s" % line
+            except EOFError:
+                break
+
+    def parse_logfile(self, path):
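+        """
+        Builds a small subprocess pipeline ('gzip -dc | grep' for compressed
+        logs, a plain grep otherwise) and feeds every matching line to
+        _process_line.
+        """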
+        cmds = []
+        if path.endswith(".gz"):
+            cmds.append("gzip -dc %s" % path)
+            cmds.append("grep 'request end'")
+        else:
+            cmds.append("grep 'request end' %s" % path)
+
+        procs = []
+        stdin = None
+        for cmd in cmds:
+            p = subprocess.Popen(shlex.split(cmd), stdin=stdin, stdout=subprocess.PIPE)
+            stdin = p.stdout
+            procs.append(p)
+
+        for line in p.stdout:
+            self._process_line(line)
+
+        for p in procs:
+            p.wait()
+
+    def _process_line(self, line):
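+        # Using slicing instead of regex's due to speed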
+        year   = int(line[8:12])
+        month  = self.month_map[line[4:7]]
+        day    = int(line[1:3])
+        hour   = int(line[13:15])
+        minute = int(line[16:18])
+        second = int(line[19:21])
+        datetimeobj = datetime(year, month, day, hour, minute, second)
+
+        line = self._fixup_urls(line)
+        line_elements = line.split(' ')
+        url = line_elements[8]
+        ip = line_elements[11]
+        ms = int(line_elements[9])
+
+        # Record the data for later analysis
+        self.log_data.append((datetimeobj, url, ms, ip))
+
+        # Calculate the start of the time period for the logs we're dealing with
+        if self.start:
+            if datetimeobj < self.start:
+                self.start = datetimeobj
+        else:
+            self.start = datetimeobj
+
+        # Calculate the end of the time period for the logs we're dealing with
+        if self.end:
+            if datetimeobj > self.end:
+                self.end = datetimeobj
+        else:
+            self.end = datetimeobj
+
+    def _fixup_urls(self, line):
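+        # Strip per-request cache-buster query parameters so otherwise
+        # identical URLs aggregate together.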
+        line = re.sub('&uqid=jQuery[0-9_]+', '', line)
+        line = re.sub('&ts=\d+', '', line)
+        line = re.sub('&_=\d+', '', line)
+        return line
+
 def main():
-    # parse date options
-    parser = OptionParser()
-    parser.add_option("-s", "--start",
-                      dest="start", action="callback",
-                      callback=date_validate, type="string",
-                      help="Start datetime for filtering (YYYY-mm-dd:HH:MM:SS)")
-    parser.add_option("-e", "--end",
-                      dest="end", action="callback",
-                      callback=date_validate, type="string",
-                      help="End datetime for filtering (YYYY-mm-dd:HH:MM:SS)")
-
-    # args should be a list of the files we want to inspect
-    options, args = parser.parse_args()
-
-    if options.start:
-        tmp = options.start.split('-')
-        tmp = tmp[:-1] + tmp[-1].split(':')
-        start = datetime(*[int(x) for x in tmp])
-    else:
-        start = None
-
-    if options.end:
-        tmp = options.end.split('-')
-        tmp = tmp[:-1] + tmp[-1].split(':')
-        end = datetime(*[int(x) for x in tmp])
-    else:
-        end = None
-
-
-    aggregators = []
-
-    operation = args.pop(0)
-    if operation == "topurls":
-        aggregators.append(TopUrl(args.pop(0)))
-    elif operation == "lquantize-timeofday":
-        aggregators.append(LinearQuantizeTimeOfDay(args.pop(0), start, end))
-    elif operation == "lquantize-speed":
-        aggregators.append(LinearQuantizeSpeed(args.pop(0)))
-    else:
-        print "Error, usage"
-        sys.exit(1)
-
-    o = open_and_filter(
-         filter_time(
-          filter_url_fixups(
-           dispatch(*aggregators)), start, end))
-
-    # start feeding the pipeline with data
-    for path in args:
-        o.send(path)
-
-    for aggr in aggregators:
-        aggr.output()
+    log_analyzer = LogAnalyzer()
+    for path in sys.argv[1:]:
+        print "Parsing log file: %s  " % os.path.basename(path),
+        start = datetime.now()
+        log_analyzer.parse_logfile(path)
+        end = datetime.now()
+        print "DONE (%s)" % (end - start)
+    log_analyzer.cmd_loop()
 
 if __name__ == "__main__":
-    import cProfile
-#    cProfile.run('main()')
+#   import cProfile
+#   cProfile.run('main()')
     main()
 
 # vim: et sw=4 ts=4 softtabstop=4 foldmethod=expr tw=100