# twoq / thingpipe / mapping.py

 ``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171``` ```# -*- coding: utf-8 -*- '''thingpipe mapping mixins''' from time import sleep from copy import deepcopy from threading import local from operator import methodcaller from itertools import starmap, repeat, product, combinations, permutations from stuf.six import keys, values, items from thingpipe.compat import imap, ichain, xrange class RepeatMixin(local): '''repetition mixin''' def combinations(self, n): ''' repeat every combination for `n` of incoming things @param n: number of repetitions ''' with self._context(): return self._xtend(combinations(self._iterable, n)) def copy(self): '''copy each incoming thing''' with self._context(): return self._xtend(imap(deepcopy, self._iterable)) def product(self, n=1): ''' nested for each loops repeated `n` times @param n: number of repetitions (default: 1) ''' with self._context(): return self._xtend(product(*self._iterable, repeat=n)) def permutations(self, n): ''' repeat every permutation for every `n` of incoming things @param n: length of thing to permutate ''' with self._context(): return self._xtend(permutations(self._iterable, n)) def range(self, start, stop=0, step=1): ''' put sequence of numbers in incoming things @param start: number to start with @param stop: number to stop with (default: 0) @param step: number of steps to advance per iteration (default: 1) ''' with self._context(): return self._xtend( xrange(start, stop, step) if stop else xrange(start) ) def repeat(self, n): ''' repeat incoming things `n` times @param n: number of times to repeat ''' with self._context(): return self._xtend(repeat(tuple(self._iterable), n)) def times(self, n=None): ''' repeat call with incoming things `n` times @param n: repeat call n times on incoming things (default: None) ''' with self._context(): if n is None: return self._xtend(starmap( self._call, repeat(list(self._iterable)), )) return self._xtend(starmap( self._call, repeat(list(self._iterable), n), )) class MapMixin(local): '''mapping mixin''' def each(self, wait=0): ''' invoke call with passed arguments, keywords in incoming things @param wait: time in seconds (default: 0) ''' call = self._call if wait: def delay_each(x, y, wait=0, caller=None): sleep(wait) return caller(*x, **y) de = delay_each call_ = lambda x, y: de(x, y, wait, call) else: call_ = lambda x, y: call(*x, **y) with self._context(): return self._xtend(starmap(call_, self._iterable)) def invoke(self, name, wait=0): ''' invoke method `name` on each incoming thing with passed arguments, keywords but return incoming thing instead if method returns `None` @param name: name of method @param wait: time in seconds (default: 0) ''' caller = methodcaller(name, *self._args, **self._kw) def invoke(thing): results = caller(thing) return thing if results is None else results if wait: def invoke(x, wait=0): sleep(wait) results = caller(x) return x if results is None else results with self._context(): return self._xtend(imap(invoke, self._iterable)) def items(self): '''invoke call on each mapping to get key, value pairs''' with self._context(): return self._xtend(starmap( self._call, ichain(imap(items, self._iterable)) )) def keys(self): '''invoke call on each mapping to get keys''' with self._context(): return self._xtend(starmap( self._call, ichain(imap(keys, self._iterable)) )) def map(self, wait=0): ''' invoke call on each incoming thing @param wait: time in seconds (default: 0) ''' call_ = self._call if wait: def delay_map(x, wait=None, caller=None): sleep(wait) return caller(x) call = self._call call_ = lambda x: delay_map(x, wait, call) with self._context(): return self._xtend(imap(call_, self._iterable)) def starmap(self): '''invoke call on each sequence of incoming things''' with self._context(): return self._xtend(starmap(self._call, self._iterable)) def values(self): '''invoke call on each mapping to get values''' with self._context(): return self._xtend(starmap( self._call, ichain(imap(values, self._iterable)) )) ```
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.