Commits

Eric Larson committed 529e78a

Initial commit

  • Participants

Comments (0)

Files changed (2)

+from __future__ import with_statement
+
+import thread, threading
+import contextlib
+import random
+import time
+
+from Queue import Queue
+
+# taken from:
+#  http://jessenoller.com/2009/02/03/get-with-the-program-as-contextmanager-completely-different/
+class ThreadPool(object):
+    def __init__(self, workers, workerClass, pool=None):
+        self.myq = Queue()
+        self.workers = workers
+        self.workerClass = workerClass
+        self.pool = pool or []
+
+    def __enter__(self):
+        # On entering, start all the workers, who will block trying to
+        # get work off the queue
+        for i in range(self.workers):
+            self.pool.append(self.workerClass(self.myq))
+        for i in self.pool:
+            i.start()
+        return self.myq
+
+    def __exit__(self, type, value, traceback):
+        # Now, shut down the pool once all work is done
+        for i in self.pool:
+            self.myq.put('STOP')
+        for i in self.pool:
+            i.join()
+
+
+class Pool(object):
+    def __init__(self, factory, args=None, kwargs=None, cleanup=None, min=0):
+        self.pool = []
+        self.swimmers = {}
+        self.args = args or tuple()
+        self.kwargs = kwargs or {}
+        self.factory = factory
+        self.cleanup = cleanup
+        self._lock = threading.Semaphore()
+        for i in xrange(min):
+            self.pool.append(self._create())
+
+    def _create(self):
+        return self.factory(*self.args, **self.kwargs)
+
+    def _get(self):
+            return self.swimmers[id]
+
+    @contextlib.contextmanager
+    def get(self):
+        with self._lock:
+            id = thread.get_ident()
+            if id not in self.swimmers:
+                if self.pool:
+                    self.swimmers[id] = self.pool.pop()
+                else:
+                    self.swimmers[id] = self._create()
+            yield self.swimmers[id]
+            
+        with self._lock:
+            id = thread.get_ident()
+            if self.swimmers.get(id):
+                if self.cleanup:
+                    self.cleanup(self.swimmers[id])
+                else:
+                    self.pool.append(self.swimmers[id])
+                    del self.swimmers[id]
+
+
+class WithPool(object):
+    def __init__(self, *args, **kw):
+        self.close = False
+        if kw.get('close'):
+            self.close = close
+        self.pool = Pool(*args, **kw)
+
+    def __enter__(self):
+        return self.pool
+
+    def __exit__(self, *args, **kw):
+        if self.close:
+            for conn in pool.pool:
+                self.close(conn)
+
+
+class MockThread(threading.Thread):
+
+    def __init__(self, pool, name, indent=0):
+        threading.Thread.__init__(self)
+        self.pool = pool
+        self.name = name
+        self.indent = '\t'.join(['|' for i in xrange(0, indent)])
+
+    def m(self, *s):
+        print '%s %s' % (str(self.indent), ''.join(map(str, s)))
+
+    def run(self):
+        with self.pool.get() as conn:
+            waiting = random.randint(1, 3)
+            self.m('got connection')
+            self.m('using connection in ', self.name)
+            time.sleep(waiting)
+            conn(self.indent, ' hello world')
+        return
+
+
+class MockConn(object):
+    def __init__(self, name):
+        self.name = name
+    
+    def __call__(self, *args):
+        print ''.join(args)
+
+
+if __name__ == '__main__':
+    tp = Pool(MockConn, ['eric'], min=0)
+    workers = [MockThread(tp, x, x) for x in range(0, 10)]
+    for i, w in enumerate(workers):
+        w.start()
+from setuptools import setup, find_packages
+import sys, os
+
+setup(
+	name='Poodle',
+	version='0.1',
+	author='Eric Larson <eric@ionrock.org>',
+	packages=find_packages(exclude=['ez_setup', 'examples', 'tests']),
+)