Snippets
Created by
Jesse Almanrode
last modified
# coding=utf-8
""" A Python module for creating thread pools.
"""
# Imports
from __future__ import print_function
from builtins import range
from queue import Queue
from types import FunctionType
import sys
import threading
import time
# Private variables
# Module-level metadata: name of the module's author.
__author__ = 'Jesse Almanrode'
class ThreadPool(object):
    """ Create a pool of threads that can process an iterable

    Results are collected in the order the workers finish, which is not
    necessarily the order of the input items when more than one thread runs.

    :param size: Maximum number of threads to spawn. A size of 0 (or less)
        runs the work synchronously in the calling thread.
    :param blocking: When True the map methods wait for all work to finish and
        return the results. When False they return None immediately and the
        results are gathered by join() (also called when leaving a ``with``
        block).
    :return: <ThreadPool> Object
    """

    def __init__(self, size=10, blocking=True):
        self.size = int(size)
        self.results = None  # list of results once a map call has been made
        self.blocking = blocking
        self._inqueue = None  # pending work items
        self._outqueue = None  # results produced by the workers

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """ Wait for outstanding non-blocking work when leaving a ``with`` block.

        :return: False, so an exception raised inside the block always propagates
        """
        if self.blocking is False:
            self.__join()
        # A truthy return value from __exit__ suppresses the active exception,
        # so the (possibly non-empty) result list must never be returned here.
        return False

    def map_extended(self, target, iterable, args=(), kwargs=None):
        """ Run function on every item in iterable. If extra args or kwargs are passed they are passed to function.

        :param target: Function to run; called as target(item, *args, **kwargs)
        :param iterable: List, tuple or Queue of items to process
        :param args: Extra args to pass (list or tuple)
        :param kwargs: Extra kwargs to pass (dict); None means no extra kwargs
        :return: List of results (in completion order) when the pool is
            blocking, otherwise None
        :raises TypeError: If any argument is of an unsupported type
        """
        # Validate everything up front so a bad argument cannot leave a
        # half-initialised queue behind on the instance.
        if isinstance(target, FunctionType) is False:
            raise TypeError('target must be of type: ' + str(FunctionType))
        if isinstance(iterable, (list, tuple, Queue)) is False:
            raise TypeError('iterable must be of type: ' + str(list) + ', ' + str(tuple) + ', or ' + str(Queue))
        if isinstance(args, (tuple, list)) is False:
            raise TypeError('args must be of type: ' + str(list) + ' or ' + str(tuple))
        if kwargs is None:
            kwargs = {}
        elif isinstance(kwargs, dict) is False:
            raise TypeError('kwargs must be of instance: ' + str(dict))

        if isinstance(iterable, Queue):
            inqueue = iterable
        else:
            inqueue = Queue(maxsize=len(iterable))
            for item in iterable:
                inqueue.put(item)

        self._inqueue = inqueue
        # Unbounded: a Queue input has no len(), and a bounded output queue
        # could block workers if more items arrive than were first counted.
        self._outqueue = Queue()
        self.results = list()

        if self.size <= 0:
            # No threads requested: process everything in the calling thread.
            self._worker(target, inqueue, self._outqueue, *args, **kwargs)
        else:
            # Never start more threads than there are items, but keep
            # self.size untouched so later calls still get the full pool.
            thread_count = min(self.size, inqueue.qsize())
            worker_args = (target, inqueue, self._outqueue) + tuple(args)
            for _ in range(thread_count):
                thread = threading.Thread(target=self._worker, args=worker_args, kwargs=kwargs)
                thread.daemon = True
                thread.start()

        if self.blocking:
            return self.__join()
        return None

    def join(self):
        """ Wait for non-blocking threads to complete.

        NOTE: if target raises inside a worker thread that thread stops; should
        every worker stop while items remain queued, this call will not return.

        :return: List of results collected so far (in completion order), or
            None if no map call has been made on this pool
        """
        if self._inqueue is None:
            # Nothing has been submitted yet, so there is nothing to wait for.
            return self.results
        # Queue.join() blocks until every queued item was marked task_done().
        self._inqueue.join()
        while self._outqueue.empty() is False:
            self.results.append(self._outqueue.get())
        return self.results

    def map(self, target, iterable):
        """ Run function on every item in iterable in a pool of threads

        :param target: Function to run
        :param iterable: List, tuple or Queue of items to process
        :return: List of results (in completion order) when the pool is
            blocking, otherwise None
        """
        return self.__map_extended(target, iterable)

    @staticmethod
    def _worker(target, inqueue, outqueue, *args, **kwargs):
        """ Private worker to remove requirement for working with Queues in defined functions

        Pulls items from inqueue until it is empty, puts target(item, *args,
        **kwargs) on outqueue and marks each item as done.

        :return: The outqueue that was passed in
        """
        from queue import Empty

        while True:
            # A non-blocking get avoids the race where another worker takes the
            # last item between an empty() check and a blocking get().
            try:
                task = inqueue.get_nowait()
            except Empty:
                break
            try:
                outqueue.put(target(task, *args, **kwargs))
            finally:
                # Mark the item done even if target raised, so Queue.join()
                # is not left waiting on an item that will never complete.
                inqueue.task_done()
        return outqueue

    # Name-mangled aliases: the class's own calls keep using these
    # implementations even if a subclass overrides map_extended() or join().
    __map_extended = map_extended
    __join = join
|
Comments (0)
You can clone a snippet to your computer for local editing. Learn more.