Snippets
Created by
Jesse Almanrode
last modified
# coding=utf-8
""" A Python module for creating thread pools.
"""
# Imports
from __future__ import print_function
from builtins import range
from queue import Empty, Queue
from types import FunctionType
import sys
import threading
import time

# Private variables
__author__ = 'Jesse Almanrode'


class ThreadPool(object):
    """ Create a pool of threads that can process an iterable

    :param size: Maximum number of threads to spawn. A size of 0 (or less) runs the work
        in the calling thread instead of spawning threads.
    :param blocking: When True the map methods wait for the work and return the results;
        when False they return immediately and join() must be called to collect results.
    :return: <ThreadPool> Object
    """

    def __init__(self, size=10, blocking=True):
        self.size = int(size)
        self.results = None
        self.blocking = blocking
        self._inqueue = None
        self._outqueue = None
        self._threads = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # A non-blocking pool may still have workers running when the with-block ends,
        # so wait for them here -- but only if a map was actually started.
        if self.blocking is False and self._outqueue is not None:
            self.__join()
        # Never return the result list: a truthy return value from __exit__ would
        # silently swallow any exception raised inside the with-block.
        return False

    def map_extended(self, target, iterable, args=(), kwargs=None):
        """ Run function on every item in iterable. If extra args or kwargs are passed
        they are passed to function.

        Results are collected in completion order, which is only guaranteed to match
        the input order when a single worker (or the calling thread) does the work.

        :param target: Function to run
        :param iterable: List, tuple or Queue of items
        :param args: Extra args to pass
        :param kwargs: Extra kwargs to pass
        :return: List of results from target(for i in iterable) when the pool is
            blocking, otherwise None (call join() to collect the results)
        :raises TypeError: If any argument is of an unsupported type
        """
        if kwargs is None:
            kwargs = {}
        if isinstance(target, FunctionType) is False:
            raise TypeError('target must be of type: ' + str(FunctionType))
        if isinstance(iterable, (list, tuple, Queue)) is False:
            raise TypeError('iterable must be of type: ' + str(list) + ', ' + str(tuple) + ', or ' + str(Queue))
        if isinstance(args, (tuple, list)) is False:
            raise TypeError('args must be of type: ' + str(list) + ' or ' + str(tuple))
        if isinstance(kwargs, dict) is False:
            raise TypeError('kwargs must be of instance: ' + str(dict))

        if isinstance(iterable, Queue):
            inqueue = iterable
        else:
            inqueue = Queue(maxsize=len(iterable))
            for item in iterable:
                inqueue.put(item)

        self._inqueue = inqueue
        # Unbounded so workers can never block on put(); also avoids calling len()
        # on a Queue input, which does not support it.
        self._outqueue = Queue()
        self.results = list()
        self._threads = []

        if self.size <= 0:
            # No threads requested: process everything in the calling thread
            self._worker(target, inqueue, self._outqueue, *args, **kwargs)
        else:
            # Never spawn more threads than there are items, but keep self.size intact
            # so a small input does not permanently shrink the pool.
            thread_count = min(self.size, inqueue.qsize())
            worker_args = [target, inqueue, self._outqueue]
            worker_args.extend(args)
            for _ in range(thread_count):
                thread = threading.Thread(target=self._worker, args=worker_args, kwargs=kwargs)
                thread.daemon = True
                thread.start()
                self._threads.append(thread)
        if self.blocking:
            return self.__join()

    def join(self):
        """ Wait for non-blocking threads to complete.

        If a worker thread died because target raised, the remaining workers still
        finish and the results gathered so far are returned.

        :return: List of results from target(for i in iterable), or the current value
            of results if no map has been started yet
        """
        if self._outqueue is None:
            return self.results
        # Joining the threads (rather than polling the input queue) returns as soon
        # as the work is done and cannot hang if a worker died early.
        for thread in self._threads:
            thread.join()
        while self._outqueue.empty() is False:
            self.results.append(self._outqueue.get())
        return self.results

    def map(self, target, iterable):
        """ Run function on every item in iterable in a pool of threads

        :param target: Function to run
        :param iterable: List, tuple or Queue of items
        :return: List of results from target(for i in iterable)
        """
        return self.__map_extended(target, iterable)

    @staticmethod
    def _worker(target, inqueue, outqueue, *args, **kwargs):
        """ Private worker to remove requirement for working with Queues in defined functions

        Pulls items from inqueue until it is empty and puts target(item, *args, **kwargs)
        on outqueue.

        :return: The outqueue that results were written to
        """
        while True:
            try:
                # Non-blocking get: another worker may have taken the last item
                # between an empty() check and a blocking get(), which would hang.
                task = inqueue.get_nowait()
            except Empty:
                break
            try:
                outqueue.put(target(task, *args, **kwargs))
            finally:
                # Always account for the item, even if target raised
                inqueue.task_done()
        return outqueue

    # Name-mangled aliases: internal calls always reach this class's implementations
    # even when a subclass overrides map_extended() or join().
    __map_extended = map_extended
    __join = join
Comments (0)
You can clone a snippet to your computer for local editing. Learn more.