Commits

Chris Perl committed f63c0ef

Work in progress, redoing the way this is set up to be able to unit test

  • Parent commits 3075fcd


Files changed (1)

File loganalyze.py

 import shlex
 import readline
 import traceback
+import math
 
 from datetime import datetime, timedelta
 from optparse import OptionParser, OptionValueError
 import pdb
 from pprint import pprint as pp
 
-class UrlAggregator(dict):
-    def _sort(self):
-        url_list = self.keys()
-        url_list = sorted(url_list, 
-                          cmp=lambda a,b: cmp(self[a]['count'], self[b]['count']))
-        return url_list
-
-class TopUrls(UrlAggregator):
+class Aggregator(object):
     def __init__(self, log_analyzer):
         self._log_analyzer = log_analyzer
-        self._start = None
-        self._end = None
 
-    def __call__(self, start, end, num_urls):
-        num_urls = int(num_urls)
-        if self._start == start and self._end == end: 
-            self.output(num_urls)
+    def __call__(self, *args):
+        """
+        Call the aggregator object, getting back a dictionary.
+        """
+        # NOTE: *args includes 'start' and 'end'
+        # Compare this call's arguments against the ones used to produce the
+        # cached result; any mismatch (or no previous call) forces a recompute.
+        cached_args = getattr(self, "_cached_args", ())
+        for i, arg in enumerate(args):
+            if i >= len(cached_args) or cached_args[i] != arg:
+                break
         else:
-            self.clear()
-            self._start = start
-            self._end = end
-            for datetimeobj, url, ms, ip in self._log_analyzer.log_data:
-                if start <= datetimeobj <= end:
-                    self.update(datetimeobj, url, ms, ip)
-            self.output(num_urls)
-
-    def update(self, datetimeobj, url, ms, ip):
-        url_dict = self.setdefault(url, {'count': 0})
-        url_dict['count'] += 1
-        
-    def output(self, num_urls):
-        url_list = self._sort()
-        if url_list:
-            # only print the top self._num_urls urls
-            for url in url_list[-num_urls:]:
-                print "%6d %-40s" % (self[url]['count'], url)
+            # We've already done this computation for this time period and
+            # these args; just return the cached result.
+            return self._cached_result
+
+        self._cached_args = args
+        self._cached_result = self._call(*args)
+        return self._cached_result
+
+    def _do_aggregation(self):
+        aggr = {}
+        for datetimeobj, url, ms, ip in self._log_analyzer.log_data:
+            if self._start <= datetimeobj <= self._end:
+                aggr = self._aggregate(aggr, datetimeobj, url, ms, ip)
+        return aggr
+
+    def _num_buckets_for_time_period_with_step(self, time_period, step):
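+        # e.g. a 3600 second period with a 600 second step yields 6 buckets;
+        # a partial trailing bucket still counts as a bucket (ceil).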
+        return int(math.ceil(float(time_period)/float(step)))
+
+    def _total_seconds(self, timedeltaobj):
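+        # Equivalent of timedelta.total_seconds() (only available in Python 2.7+),
+        # ignoring the microseconds component.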
+        return timedeltaobj.seconds + (timedeltaobj.days*24*3600)
+
+class UrlAggregator(Aggregator):
+    pass
+
+class HitsPerUrl(UrlAggregator):
+    def _call(self, start, end):
+        self._start = start
+        self._end = end
+        aggr = self._do_aggregation()
+        return aggr
+
+    def _aggregate(self, aggr, datetimeobj, url, ms, ip):
+        hits_for_url = aggr.setdefault(url, 0)
+        aggr[url] = hits_for_url+1
+        return aggr
+
+class HitsPerUrlPerUnitTime(UrlAggregator):
+    def _call(self, start, end, step):
+        self._start = start
+        self._end = end
+        self._step = int(step)  # step may arrive as a string from the command parser
+        self._time_period = self._end - self._start
+        aggr = self._do_aggregation()
+        return aggr
+
+    def _aggregate(self, aggr, datetimeobj, url, ms, ip):
+        # Each URL maps to a sub-dict holding its total hit count plus a
+        # per-bucket count keyed by (bucket_start, bucket_end).
+        sub_aggr_for_url = aggr.setdefault(url, {'total_hits': 0})
+        sub_aggr_for_url['total_hits'] += 1
+        # Find the fixed-width bucket this hit falls into: the offset into the
+        # range in whole seconds, integer-divided by the step size.
+        seconds_offset_into_range = datetimeobj - self._start
+        seconds_offset_into_range = self._total_seconds(seconds_offset_into_range)
+        multiplier = seconds_offset_into_range / self._step
+        bucket_start = self._start + timedelta(seconds=(self._step * multiplier))
+        bucket_end   = self._start + timedelta(seconds=(self._step * (multiplier + 1)) - 1)
+        hits_for_bucket = sub_aggr_for_url.setdefault((bucket_start, bucket_end), 0)
+        sub_aggr_for_url[(bucket_start, bucket_end)] = hits_for_bucket + 1
+        return aggr
+
+class IpAggregator(Aggregator):
+    pass
+
+class HitsPerIP(IpAggregator):
+    def _call(self, start, end):
+        self._start = start
+        self._end = end
+        aggr = self._do_aggregation()
+        return aggr
+
+    def _aggregate(self, aggr, datetimeobj, url, ms, ip):
+        hits_for_ip = aggr.setdefault(ip, 0)
+        aggr[ip] = hits_for_ip+1
+        return aggr
 
 class LinearQuantizeTimeOfDay(UrlAggregator):
     def __init__(self, log_analyzer):
                 print
 
 class LinearQuantizeSpeed(UrlAggregator):
-    def __init__(self, ms_step):
+    def __init__(self, log_analyzer):
+        self._log_analyzer = log_analyzer
+        self._start = None
+        self._end = None
+
+    def __call__(self, start, end, ms_step):
         self._ms_step = int(ms_step)
 
-    def add(self, datetimeobj, url, ip, ms):
+        if self._start == start and self._end == end:
+            self.output()
+        else:
+            self.clear()
+            self._start = start
+            self._end = end
+            for datetimeobj, url, ms, ip in self._log_analyzer.log_data:
+                if start <= datetimeobj <= end:
+                    self.update(datetimeobj, url, ms, ip)
+            self.output()
+
+    def update(self, datetimeobj, url, ms, ip):
         url_dict = self.setdefault(url, {'count': 0, 'buckets': []})
         url_dict['count'] += 1
 
         self.end = None
 
         self.aggr_map = {
-            "topurls": TopUrls(self),
-            "lquantize-timeofday": LinearQuantizeTimeOfDay(self),
         }
 
     def cmd_loop(self):
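
The point of the refactor is that the aggregators now return plain dictionaries instead of printing, which is what makes them unit-testable. A minimal sketch of what such a test could look like, assuming the new classes are importable from loganalyze and using a stub in place of LogAnalyzer that exposes the same log_data list of (datetime, url, ms, ip) tuples (the stub and test names below are illustrative, not part of this commit):

    import unittest
    from datetime import datetime

    from loganalyze import HitsPerUrl

    class StubLogAnalyzer(object):
        # Stands in for LogAnalyzer; the aggregators only read log_data.
        def __init__(self, log_data):
            self.log_data = log_data

    class HitsPerUrlTest(unittest.TestCase):
        def test_counts_hits_per_url_within_range(self):
            start = datetime(2011, 1, 1, 0, 0, 0)
            end   = datetime(2011, 1, 1, 1, 0, 0)
            log_data = [
                (datetime(2011, 1, 1, 0, 5, 0),  "/index.html", 12, "10.0.0.1"),
                (datetime(2011, 1, 1, 0, 10, 0), "/index.html", 15, "10.0.0.2"),
                (datetime(2011, 1, 1, 2, 0, 0),  "/late.html",   9, "10.0.0.3"),  # outside range
            ]
            aggr = HitsPerUrl(StubLogAnalyzer(log_data))
            self.assertEqual(aggr(start, end), {"/index.html": 2})

    if __name__ == "__main__":
        unittest.main()

Because Aggregator.__call__ caches on its arguments, calling the same aggregator twice with the same start and end returns the cached dictionary without rescanning log_data.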