Commits

Andriy Kornatskyy committed 9939606

Added lockout quota (the lockout counter is increased on each success).

Comments (0)

Files changed (2)

src/wheezy/caching/lockout.py

         """
         def guard_wrapper(ctx, *args, **kwargs):
             succeed = func(ctx, *args, **kwargs)
-            key_prefix = self.key_prefix
             if succeed:
-                keys = [key_prefix + c.key_func(ctx)
-                        for c in self.counters if c.reset]
-                key_prefix = 'lock:' + key_prefix
-                keys.extend([key_prefix + c.key_func(ctx)
-                             for c in self.counters if c.reset])
-                keys and self.cache.delete_multi(keys, 0, '', self.namespace)
+                self.reset(ctx)
             else:
-                for c in self.counters:
-                    key = key_prefix + c.key_func(ctx)
-                    max_try = self.cache.add(
-                        key, 1, c.period, self.namespace
-                    ) and 1 or self.cache.incr(key, 1, self.namespace)
-                    #print("%s ~ %d" % (key, max_try))
-                    if max_try >= c.count:
-                        self.cache.delete(key, 0, self.namespace)
-                        self.cache.add('lock:' + key, 1,
-                                       c.duration, self.namespace)
-                        c.alert and c.alert(ctx, self.name, c)
+                self.incr(ctx)
             return succeed
         return guard_wrapper
 
+    def quota(self, func):
+        """ A quota decorator is applied to a `func` which returns a
+            boolean indicating success or failure. Each success is a
+            subject to increase counter.
+        """
+        def quota_wrapper(ctx, *args, **kwargs):
+            succeed = func(ctx, *args, **kwargs)
+            if succeed:
+                self.incr(ctx)
+            return succeed
+        return quota_wrapper
+
     def forbid_locked(self, wrapped=None, action=None):
         """ A decorator that forbids access (by a call to `forbid_action`)
             to `func` once the counter threshold is reached (lock is set).
             return decorate
         else:
             return decorate(wrapped)
+
+    def reset(self, ctx):
+        """ Removes locks for counters that support reset.
+        """
+        key_prefix = self.key_prefix
+        keys = [key_prefix + c.key_func(ctx)
+                for c in self.counters if c.reset]
+        key_prefix = 'lock:' + key_prefix
+        keys.extend([key_prefix + c.key_func(ctx)
+                     for c in self.counters if c.reset])
+        keys and self.cache.delete_multi(keys, 0, '', self.namespace)
+
+    def incr(self, ctx):
+        """ Increments lockout counters for given context.
+        """
+        key_prefix = self.key_prefix
+        for c in self.counters:
+            key = key_prefix + c.key_func(ctx)
+            max_try = self.cache.add(
+                key, 1, c.period, self.namespace
+            ) and 1 or self.cache.incr(key, 1, self.namespace)
+            #print("%s ~ %d" % (key, max_try))
+            if max_try >= c.count:
+                self.cache.delete(key, 0, self.namespace)
+                self.cache.add('lock:' + key, 1,
+                               c.duration, self.namespace)
+                c.alert and c.alert(ctx, self.name, c)

src/wheezy/caching/tests/test_lockout.py

     def do_action(self):
         return self.action_result
 
+    @lockout.forbid_locked
+    def action3(self):
+        if self.do_action3():
+            return 'show ok'
+        else:
+            return 'show error'
+
+    @lockout.quota
+    def do_action3(self):
+        return self.action_result
+
 
 # region: test case
 
         for i in range(4):
             assert 'show error' == s.action2()
         assert 'show captcha' == s.action2()
+
+    def test_quota(self):
+        s = MyService()
+        s.user_id = 'u1q'
+        s.user_ip = 'ip1q'
+        for i in range(10):
+            assert 'show error' == s.action3()
+        assert [] == alerts
+        del alerts[:]
+
+        s.action_result = True
+        for i in range(4):
+            assert 'show ok' == s.action3()
+        assert ['ignore: action'] == alerts
+        assert 'forbidden' == s.action3(), 'lock by id/ip'