Commits

Andriy Kornatskyy  committed d0412e9

Introduced benchmark timer: intercepts a call in order to compute timing better.

  • Participants
  • Parent commits 0e749d0

Comments (0)

Files changed (1)

File src/wheezy/core/benchmark.py

 
 
 if PY2 and PY_MINOR < 6:  # pragma: nocover
-    # TODO:
-    timeit = lambda f, number: 1.0
+    import gc
+    from itertools import repeat
+    from time import time as default_timer
+
+    def timeit(f, number=1000000):
+        it = repeat(None, number)
+        e = gc.isenabled()
+        gc.disable()
+        try:
+            t0 = default_timer()
+            for i in it:
+                f()
+            t1 = default_timer()
+            return t1 - t0
+        finally:
+            if e:
+                gc.enable()
+
 else:  # pragma: nocover
     from timeit import timeit
+    from timeit import default_timer  # noqa
 
 
 class Benchmark(object):
         ...% ...rps ...% test_2
     """
 
-    def __init__(self, targets, number, warmup_number=None):
+    def __init__(self, targets, number, warmup_number=None, timer=None):
         """
             ``targets`` - a list of targets (callables) to be tested.
 
         self.targets = targets
         self.number = number
         self.warmup_number = warmup_number or max(int(number / 100), 10)
+        if timer is not None:
+            self.timer = timer
+            self.bench = self.bench_timer
 
     def bench(self, number):
         for target in self.targets:
             yield (target.__name__, timeit(target, number=number))
 
+    def bench_timer(self, number):
+        for target in self.targets:
+            self.timer.start()
+            timeit(target, number=number)
+            self.timer.stop()
+            yield (target.__name__, self.timer.timing)
+
     def run(self):
         # warm up
         list(self.bench(self.warmup_number))
             delta = base_relative / previous_relative - 1.0
             print("%7.1f%% %7drps %+5.1f%% %s" % (
                 base_relative * 100, rps, delta * 100, name))
+
+
+class Timer(object):
+    """ Intercept a call to given method in order to compute
+        timing.
+
+        >>> class A(object):
+        ...     def test(self):
+        ...         pass
+        >>> a = A()
+        >>> def test():
+        ...     a.test()
+        >>> b = Benchmark((test, ), 1000, timer=Timer(a, 'test'))
+        >>> b.report() # doctest: +ELLIPSIS
+        noname: 1 x 1000
+        baseline throughput change target
+          100.0% ...rps  +0.0% test
+    """
+
+    def __init__(self, target, name):
+        assert hasattr(target, name)
+        assert callable(getattr(target, name))
+        self.target = target
+        self.name = name
+
+    def start(self):
+        self.timing = 0.0
+        self.saved = saved = getattr(self.target, self.name)
+
+        def timing_wrapper(*args, **kwargs):
+            t0 = default_timer()
+            result = saved(*args, **kwargs)
+            t1 = default_timer()
+            self.timing += t1 - t0
+            return result
+        setattr(self.target, self.name, timing_wrapper)
+
+    def stop(self):
+        setattr(self.target, self.name, self.saved)