Commits

Lynn Rees  committed 8bcb607

- cleanup

  • Participants
  • Parent commits cd590c5

Comments (0)

Files changed (2)

File knife/_mixins.py

 GroupBy = namedtuple('Group', 'keys groups')
 MinMax = namedtuple('MinMax', 'min max')
 TrueFalse = namedtuple('TrueFalse', 'true false')
-
-
-def slicer(x, y, iz=islice, nx=next):
-    return nx(iz(x, y, None))
+slicer = lambda x, y: next(islice(x, y, None))
 
 
 class _CmpMixin(local):
 
     @memoize
-    def _all(self, a=all, m=map):
+    def _all(self):
         # invoke worker on each item to yield truth
-        return self._append(a(m(self._test, self._iterable)))
+        return self._append(all(map(self._test, self._iterable)))
 
     @memoize
-    def _any(self, a=any, m=map):
+    def _any(self):
         # invoke worker on each item to yield truth
-        return self._append(a(m(self._test, self._iterable)))
+        return self._append(any(map(self._test, self._iterable)))
 
     @memoize
-    def _difference(self, symmetric, rz=reduce, se=set):
+    def _difference(self, symmetric):
         if symmetric:
-            test = lambda x, y: se(x).symmetric_difference(y)
+            test = lambda x, y: set(x).symmetric_difference(y)
         else:
-            test = lambda x, y: se(x).difference(y)
-        return self._xtend(rz(test, self._iterable))
+            test = lambda x, y: set(x).difference(y)
+        return self._xtend(reduce(test, self._iterable))
 
-    def _intersection(self, rz=reduce, se=set):
+    def _intersection(self):
         return self._xtend(
-            rz(lambda x, y: se(x).intersection(y), self._iterable)
+            reduce(lambda x, y: set(x).intersection(y), self._iterable)
         )
 
-    def _union(self, rz=reduce, se=set):
-        return self._xtend(rz(lambda x, y: se(x).union(y), self._iterable))
+    def _union(self):
+        return self._xtend(
+            reduce(lambda x, y: set(x).union(y), self._iterable)
+        )
 
     @memoize
-    def _unique(self, nx=next, se=set, S=StopIteration):
+    def _unique(self):
         def unique(key, iterable):
-            seen = se()
+            seen = set()
             seenadd = seen.add
             try:
                 while 1:
-                    element = key(nx(iterable))
+                    element = key(next(iterable))
                     if element not in seen:
                         yield element
                         seenadd(element)
-            except S:
+            except StopIteration:
                 pass
         return self._xtend(unique(self._identity, self._iterable))
 
 
 class _MathMixin(local):
 
-    def _average(self, cnt=count, su=sum, td=truediv, t=tee):
-        i1, i2 = t(self._iterable)
-        return self._append(td(su(i1, 0.0), cnt(i2)))
+    def _average(self):
+        i1, i2 = tee(self._iterable)
+        return self._append(truediv(sum(i1, 0.0), count(i2)))
 
-    def _count(self, R=Counter, T=Count):
-        cnt = R(self._iterable).most_common
+    def _count(self):
+        cnt = Counter(self._iterable).most_common
         commonality = cnt()
-        return self._append(T(
+        return self._append(Count(
             # least common
             commonality[:-2:-1][0][0],
             # most common (mode)
         ))
 
     @memoize
-    def _max(self, mx=max):
-        return self._append(mx(self._iterable, key=self._identity))
+    def _max(self):
+        return self._append(max(self._iterable, key=self._identity))
 
-    def _median(
-        self, t=tee, sd=sorted, td=truediv, i=int, cnt=count, z=slicer,
-    ):
-        i1, i2 = t(sd(self._iterable))
-        result = td(cnt(i1) - 1, 2)
-        pint = i(result)
+    def _median(self):
+        i1, i2 = tee(sorted(self._iterable))
+        result = truediv(count(i1) - 1, 2)
+        pint = int(result)
         if result % 2 == 0:
-            return self._append(z(i2, pint))
-        i3, i4 = t(i2)
-        return self._append(td(z(i3, pint) + z(i4, pint + 1), 2))
+            return self._append(slicer(i2, pint))
+        i3, i4 = tee(i2)
+        return self._append(
+            truediv(slicer(i3, pint) + slicer(i4, pint + 1), 2)
+        )
 
-    def _minmax(self, mn=min, mx=max, t=tee, MM=MinMax):
-        i1, i2 = t(self._iterable)
-        return self._append(MM(mn(i1), mx(i2)))
+    def _minmax(self):
+        i1, i2 = tee(self._iterable)
+        return self._append(MinMax(min(i1), max(i2)))
 
-    def _range(self, d=deque, sd=sorted, nx=next, t=tee):
-        i1, i2 = t(sd(self._iterable))
-        return self._append(d(i1, maxlen=1).pop() - nx(i2))
+    def _range(self):
+        i1, i2 = tee(sorted(self._iterable))
+        return self._append(deque(i1, maxlen=1).pop() - next(i2))
 
     @memoize
-    def _min(self, mn=min):
-        return self._append(mn(self._iterable, key=self._identity))
+    def _min(self):
+        return self._append(min(self._iterable, key=self._identity))
 
     @memoize
-    def _sum(self, start, floats, su=sum, fs=fsum):
+    def _sum(self, start, floats):
         return self._append(
-            fs(self._iterable) if floats else su(self._iterable, start)
+            fsum(self._iterable) if floats else sum(self._iterable, start)
         )
 
 
 class _OrderMixin(local):
 
     @memoize
-    def _group(
-        self, G=GroupBy, S=StopIteration, g=groupby, nx=next, sd=sorted,
-        tu=tuple,
-    ):
+    def _group(self):
         def grouper(call, iterable):
             try:
-                it = g(sd(iterable, key=call), call)
+                it = groupby(sorted(iterable, key=call), call)
                 while 1:
-                    k, v = nx(it)
-                    yield G(k, tu(v))
-            except S:
+                    k, v = next(it)
+                    yield GroupBy(k, tuple(v))
+            except StopIteration:
                 pass
         return self._xtend(grouper(self._identity, self._iterable))
 
-    def _reverse(self, S=StopIteration, nx=next, rv=reversed, tu=tuple):
-        return self._xtend(rv(tu(self._iterable)))
+    def _reverse(self):
+        return self._xtend(reversed(tuple(self._iterable)))
 
-    def _shuffle(self, l=list, sf=shuffle):
-        iterable = l(self._iterable)
-        sf(iterable)
+    def _shuffle(self):
+        iterable = list(self._iterable)
+        shuffle(iterable)
         return self._xtend(iterable)
 
     @memoize
-    def _sort(self, sd=sorted):
-        return self._xtend(sd(self._iterable, key=self._identity))
+    def _sort(self):
+        return self._xtend(sorted(self._iterable, key=self._identity))
 
 
 class _RepeatMixin(local):
 
-    def _combinate(self, n, cb=combinations):
-        return self._xtend(cb(self._iterable, n))
+    def _combinate(self, n):
+        return self._xtend(combinations(self._iterable, n))
 
-    def _copy(self, dc=deepcopy, m=map):
-        return self._xtend(m(dc, self._iterable))
+    def _copy(self):
+        return self._xtend(map(deepcopy, self._iterable))
 
-    def _permute(self, n, pm=permutations):
-        return self._xtend(pm(self._iterable, n))
+    def _permute(self, n):
+        return self._xtend(permutations(self._iterable, n))
 
     @memoize
-    def _repeat(self, n, use, rt=repeat, sm=starmap, tu=tuple):
+    def _repeat(self, n, use):
         call = self._identity
         if use:
-            return self._xtend(sm(call, rt(tu(self._iterable), n)))
-        return self._xtend(rt(tu(self._iterable), n))
+            return self._xtend(starmap(call, repeat(tuple(self._iterable), n)))
+        return self._xtend(repeat(tuple(self._iterable), n))
 
 
 class _MapMixin(local):
 
     @memoize
-    def _argmap(self, curr, sm=starmap):
+    def _argmap(self, curr):
         call = self._identity
         if curr:
             def argmap(*args):
                 return call(*(args + self._args))
-            return self._xtend(sm(argmap, self._iterable))
-        return self._xtend(sm(call, self._iterable))
+            return self._xtend(starmap(argmap, self._iterable))
+        return self._xtend(starmap(call, self._iterable))
 
     @memoize
-    def _invoke(self, name, m=map, mc=methodcaller):
-        def invoke(thing, caller=mc(name, *self._args, **self._kw)):
+    def _invoke(self, name):
+        def invoke(thing, caller=methodcaller(name, *self._args, **self._kw)):
             read = caller(thing)
             return thing if read is None else read
-        return self._xtend(m(invoke, self._iterable))
+        return self._xtend(map(invoke, self._iterable))
 
     @memoize
-    def _kwargmap(self, curr, sm=starmap):
+    def _kwargmap(self, curr):
         call = self._identity
         if curr:
             def kwargmap(*params):
                 return call(*(args + self._args), **kwargs)
         else:
             kwargmap = lambda x, y: call(*x, **y)
-        return self._xtend(sm(kwargmap, self._iterable))
+        return self._xtend(starmap(kwargmap, self._iterable))
 
     @memoize
-    def _map(self, m=map):
-        return self._xtend(m(self._identity, self._iterable))
+    def _map(self):
+        return self._xtend(map(self._identity, self._iterable))
 
     @memoize
-    def _mapping(
-        self, key, value, ky=keys, it=items, vl=values,
-        sm=starmap, m=map, ci=chain.from_iterable, aget=attrgetter,
-    ):
+    def _mapping(self, key, value):
         if key:
-            return self._xtend(m(self._identity, ci(m(ky, self._iterable))))
+            return self._xtend(map(
+                self._identity, chain.from_iterable(map(keys, self._iterable))
+            ))
         elif value:
-            return self._xtend(m(self._identity, ci(m(vl, self._iterable))))
+            return self._xtend(map(self._identity, chain.from_iterable(
+                map(values, self._iterable)
+            )))
         call = (lambda x, y: (x, y)) if self._worker is None else self._worker
-        return self._xtend(sm(call, ci(m(it, self._iterable))))
+        return self._xtend(starmap(
+            call, chain.from_iterable(map(items, self._iterable)))
+        )
 
 
 class _FilterMixin(local):
 
     @memoize
-    def _attrs(
-        self, names, A=AttributeError, S=StopIteration, ag=attrgetter, nx=next
-    ):
-        def attrs(iterable, get=ag(*names)):
+    def _attrs(self, names):
+        def attrs(iterable, ):
             try:
+                get = attrgetter(*names)
+                nx = next
                 while 1:
                     try:
                         yield get(nx(iterable))
-                    except A:
+                    except AttributeError:
                         pass
-            except S:
+            except StopIteration:
                 pass
         return self._xtend(attrs(self._iterable))
 
     @memoize
-    def _duality(
-        self, TF=TrueFalse, f=filter, ff=filterfalse, tu=tuple, t=tee,
-    ):
-        truth, false = t(self._iterable)
+    def _duality(self):
+        truth, false = tee(self._iterable)
         call = self._test
-        return self._append(TF(tu(f(call, truth)), tu(ff(call, false))))
+        return self._append(TrueFalse(
+            tuple(filter(call, truth)), tuple(filterfalse(call, false))
+        ))
 
     @memoize
-    def _filter(self, false, f=filter, ff=filterfalse):
-        return self._xtend((ff if false else f)(self._worker, self._iterable))
+    def _filter(self, false):
+        return self._xtend(
+            (filterfalse if false else filter)(self._worker, self._iterable)
+        )
 
     @memoize
-    def _items(
-        self, key, ig=itemgetter, I=IndexError, K=KeyError, T=TypeError,
-        nx=next, S=StopIteration,
-    ):
-        def itemz(iterable, get=ig(*key)):
+    def _items(self, keys):
+        def itemz(iterable):
             try:
+                get = itemgetter(*keys)
+                nx = next
                 while 1:
                     try:
                         yield get(nx(iterable))
-                    except (I, K, T):
+                    except (IndexError, KeyError, TypeError):
                         pass
-            except S:
+            except StopIteration:
                 pass
         return self._xtend(itemz(self._iterable))
 
     @memoize
-    def _traverse(
-        self, invert, CM=ChainMap, OD=OrderedDict, S=StopIteration, cn=chain,
-        d=dir, f=filter, ff=filterfalse, ga=getattr, gm=getmro, ic=isclass,
-        ii=isinstance, nx=next, se=set, sn=selfname, vz=vars, it=iter,
-    ):
+    def _traverse(self, invert):
         if self._worker is None:
             test = lambda x: not x[0].startswith('__')
         else:
             test = self._identity
-        ifilter = ff if invert else f
+        ifilter = filterfalse if invert else filter
         def members(iterable):  # @IgnorePep8
-            mro = gm(iterable)
-            names = it(d(iterable))
-            beenthere = se()
+            mro = getmro(iterable)
+            names = iter(dir(iterable))
+            beenthere = set()
             adder = beenthere.add
             try:
+                OD = OrderedDict
+                vz = vars
+                cn = chain
+                ga = getattr
+                ic = isclass
+                nx = next
                 while 1:
                     name = nx(names)
                     # yes, it's really supposed to be a tuple
                 pass
         def traverse(iterable):  # @IgnorePep8
             try:
-                iterable = it(iterable)
+                iterable = iter(iterable)
+                OD = OrderedDict
+                sn = selfname
+                nx = next
                 while 1:
                     iterator = nx(iterable)
-                    chaining = CM()
+                    chaining = ChainMap()
                     chaining['classname'] = sn(iterator)
                     cappend = chaining.maps.append
                     for k, v in ifilter(test, members(iterator)):
                 pass
         return self._xtend(traverse(self._iterable))
 
-    def _mro(self, ci=chain.from_iterable, map=map, getmro=getmro):
-        return self._xtend(ci(map(getmro, self._iterable)))
+    def _mro(self):
+        return self._xtend(chain.from_iterable(map(getmro, self._iterable)))
 
-    def _members(
-        self, invert, ci=chain.from_iterable, map=map, getmro=members,
-        f=filter, ff=filterfalse,
-    ):
+    def _members(self, invert):
         if self._worker is None:
             test = lambda x: not x[0].startswith('__')
         else:
             test = self._identity
-        ifilter = ff if invert else f
-        return self._xtend(ifilter(test, ci(map(members, self._iterable))))
+        ifilter = filterfalse if invert else filter
+        return self._xtend(ifilter(
+            test, chain.from_iterable(map(members, self._iterable))
+        ))
 
 
 class _ReduceMixin(local):
 
-    def _flatten(
-        self, A=AttributeError, S=StopIteration, T=TypeError, nx=next,
-        st=isstring,
-    ):
+    def _flatten(self):
         def flatten(iterable):
+            nx = next
+            st = isstring
             next_ = iterable.__iter__()
             try:
                 while 1:
                             # do recur over other things
                             for j in flatten(item):
                                 yield j
-                    except (A, T):
+                    except (AttributeError, TypeError):
                         # does not recur
                         yield item
-            except S:
+            except StopIteration:
                 pass
         return self._xtend(flatten(self._iterable))
 
-    def _merge(self, ci=chain.from_iterable):
-        return self._xtend(ci(self._iterable))
+    def _merge(self):
+        return self._xtend(chain.from_iterable(self._iterable))
 
     @memoize
-    def _reduce(self, initial, reverse, rz=reduce):
+    def _reduce(self, initial, reverse):
         call = self._identity
         if reverse:
             if initial is None:
                 return self._append(
-                    rz(lambda x, y: call(y, x), self._iterable)
+                    reduce(lambda x, y: call(y, x), self._iterable)
                 )
             return self._append(
-                rz(lambda x, y: call(y, x), self._iterable, initial)
+                reduce(lambda x, y: call(y, x), self._iterable, initial)
             )
         if initial is None:
-            return self._append(rz(call, self._iterable))
-        return self._append(rz(call, self._iterable, initial))
+            return self._append(reduce(call, self._iterable))
+        return self._append(reduce(call, self._iterable, initial))
 
     def _zip(self, zip_=zip_longest):
         return self._xtend(zip_(*self._iterable))
 class _SliceMixin(local):
 
     @memoize
-    def _at(self, n, default, iz=islice, nx=next):
-        return self._append(nx(iz(self._iterable, n, None), default))
+    def _at(self, n, default):
+        return self._append(next(islice(self._iterable, n, None), default))
 
     @memoize
-    def _choice(self, cnt=count, iz=islice, nx=next, rr=randrange, t=tee):
-        i1, i2 = t(self._iterable)
-        return self._append(iz(i1, rr(0, cnt(i2)), None))
+    def _choice(self):
+        i1, i2 = tee(self._iterable)
+        return self._append(islice(i1, randrange(0, count(i2)), None))
 
     @memoize
-    def _dice(self, n, fill, zl=zip_longest):
+    def _dice(self, n, fill):
         return self._xtend(
-            zl(fillvalue=fill, *[self._iterable.__iter__()] * n)
+            zip_longest(fillvalue=fill, *[self._iterable.__iter__()] * n)
         )
 
     @memoize
-    def _first(self, n=0, df=deferiter, iz=islice):
-        return self._xtend(iz(self._iterable, n) if n else df(self._iterable))
+    def _first(self, n=0):
+        return self._xtend(
+            islice(self._iterable, n) if n else deferiter(self._iterable),
+        )
 
-    def _initial(self, cnt=count, iz=islice, t=tee):
-        i1, i2 = t(self._iterable)
-        return self._xtend(iz(i1, cnt(i2) - 1))
+    def _initial(self):
+        i1, i2 = tee(self._iterable)
+        return self._xtend(islice(i1, count(i2) - 1))
 
     @memoize
-    def _last(self, n, cnt=count, d=deque, df=deferfunc, iz=islice, t=tee):
+    def _last(self, n):
         if n:
-            i1, i2 = t(self._iterable)
-            return self._xtend(iz(i1, cnt(i2) - n, None))
-        return self._xtend(df(d(self._iterable, maxlen=1).pop))
+            i1, i2 = tee(self._iterable)
+            return self._xtend(islice(i1, count(i2) - n, None))
+        return self._xtend(deferfunc(deque(self._iterable, maxlen=1).pop))
 
     def _rest(self, iz=islice):
         return self._xtend(iz(self._iterable, 1, None))
 
     @memoize
-    def _sample(self, n, t=tee, z=slice, rr=randrange, m=map, cnt=count):
-        i1, i2 = t(self._iterable)
-        length = cnt(i1)
-        return self._xtend(m(lambda x: z(x, rr(0, length)), t(i2, n)))
+    def _sample(self, n):
+        i1, i2 = tee(self._iterable)
+        length = count(i1)
+        return self._xtend(
+            map(lambda x: slice(x, randrange(0, length)), tee(i2, n))
+        )
 
-    def _slice(self, start, stop, step, iz=islice):
+    def _slice(self, start, stop, step):
         if stop and step:
-            return self._xtend(iz(self._iterable, start, stop, step))
+            return self._xtend(islice(self._iterable, start, stop, step))
         elif stop:
-            return self._xtend(iz(self._iterable, start, stop))
-        return self._xtend(iz(self._iterable, start))
+            return self._xtend(islice(self._iterable, start, stop))
+        return self._xtend(islice(self._iterable, start))

File knife/base.py

 '''base knife mixins'''
 
 from threading import local
+from functools import partial
 
 from stuf.six import tounicode, tobytes
 
         self._kw = kw
         return self
 
+    def partial(self, worker, *args, **kw):
+        self._worker = partial(worker, *args, **kw)
+        return self
+
     def pattern(self, pattern, type='parse', flags=0):
         '''
         Compile search `pattern` for use as :meth:`worker`.