# dan mackinlay — bubble-economy
#
# bubble-economy / utils.py

#!/usr/bin/env python
# encoding: utf-8

"""
Created by dan mackinlay on 2010-10-18.
Copyright (c) 2010 __MyCompanyName__. All rights reserved.
"""
# from decorator import decorator
import numpy as np
import subprocess
import shlex
from pprint import pprint
from inspect import getargvalues, stack
from pprint import pprint
from collections import defaultdict
import random

class Bunch(dict):
    """picklable struct class from http://code.activestate.com/recipes/52308/

    Allows convenient attribute-style access of data to avoid
    square-brace-itis: ``b.x`` is the same entry as ``b['x']``.
    """
    def __init__(self, **kw):
        dict.__init__(self, kw)
        # alias the attribute namespace to the dict itself, so attribute
        # access and item access hit the same storage
        self.__dict__ = self
    def __getstate__(self):
        # the dict content *is* the whole state
        return self
    def __setstate__(self, state):
        # restore the pickled content, then re-link the attribute namespace
        # (the original discarded `state`, losing data on unpickle)
        self.update(state)
        self.__dict__ = self
    def __str__(self):
        state = ["%s=%r" % (attribute, value)
                 for (attribute, value)
                 in self.__dict__.items()]
        return '\n'.join(state)

def eps(arr):
    """return a suitable value of epsilon for the float type of the given
    array (the smallest representable increment at 1.0)."""
    return np.finfo(arr.dtype).eps
def run_process(plain_old_command_string):
    """run a command given in a string and return the output

    NOTE(review): the original return expression was truncated by the
    scrape; this reconstruction splits the string with shlex (no shell
    injection) and captures stdout via communicate(). Returns bytes on
    Python 3 — confirm against callers such as get_code_revision.
    """
    proc = subprocess.Popen(
        shlex.split(plain_old_command_string),
        stdout=subprocess.PIPE)
    return proc.communicate()[0]

def get_code_revision():
    """Return the revision id (node hash) of the mercurial tip."""
    hg_command = "hg tip --template {node}"
    return run_process(hg_command)

def fract(a):
    """Return the fractional part of `a` (elementwise for arrays)."""
    unit = 1.0
    return np.remainder(a, unit)
def pp(val):
    """handy pretty printer: pprint `val` and pass it through unchanged.

    NOTE(review): the original body only returned `val` without printing,
    despite the docstring (and pprint being imported at file top); this
    restores the evident intent.
    """
    pprint(val)
    return val
# #memoisation business from the decorator documentation
# # http://micheles.googlecode.com/hg/decorator/documentation.html
# def _memoize(func, *args, **kw):
#     if kw: # frozenset is used to ensure hashability
#         key = args, frozenset(kw.iteritems())
#     else:
#         key = args
#     cache = func.cache # attributed added by memoize
#     if key in cache:
#         return cache[key]
#     else:
#         cache[key] = result = func(*args, **kw)
#         return result
# def memoize(f):
#     f.cache = {}
#     return decorator(_memoize, f)

def get_arguments():
    """Returns tuple containing dictionary of calling function's
    named arguments and a list of calling function's unnamed
    positional arguments.
    Thanks Kelley Yancey http://kbyanc.blogspot.com/2007/07/python-aggregating-function-arguments.html
    """
    # getargvalues(frame)[-3:] is (varargs name, keywords name, locals dict)
    posname, kwname, args = getargvalues(stack()[1][0])[-3:]
    # pull the *args tuple out of the locals (empty list if none declared)
    posargs = args.pop(posname, [])
    # fold the **kwargs dict into the named arguments
    args.update(args.pop(kwname, {}))
    return args, posargs

def get_kw_arguments():
    """Returns a dictionary of calling function's
    named arguments.
    Based on get_arguments, above
    """
    # getargvalues(frame)[-3:] is (varargs name, keywords name, locals dict)
    posname, kwname, args = getargvalues(stack()[1][0])[-3:]
    # discard the *args tuple; we only want named arguments
    posargs = args.pop(posname, [])
    # fold the **kwargs dict into the named arguments
    args.update(args.pop(kwname, {}))
    return args

def deep_dict_get(d, path, destructive=False):
    """takes a dict and a list and gets the contents of dict with that many
    nested addresses down as are in the address. If destructive is set,
    additionally remove the value from its parent.

    Oh, i can't explain. shut up and watch."""
    d_ = d  # parent of the value most recently descended into
    p = None
    for p in path:
        try:
            d_ = d
            d = d[p]
        except KeyError:
            raise KeyError(
              'invalid key "%s" in path %s' % (
                p, repr(path)))
        except Exception as exc:
            # re-raise the same exception class with the path location attached
            raise exc.__class__(
              '"%s" at key %s in path %s' % (
                str(exc), p, repr(path)))
    if destructive and path:
        # remove the leaf from its immediate parent
        del d_[p]
    return d

def deep_dict_set(d, path, val):
    """Opposite of deep_dict_get: store `val` in `d` at the nested key
    sequence `path`, creating intermediate dicts as needed."""
    # we use the first element a lot, so give it a name
    p = path[0]
    if len(path) == 1:
        d[p] = val
    else:
        # descend, creating the intermediate dict if absent
        # (the original lost the `else:` line in extraction)
        d.setdefault(p, {})
        deep_dict_set(d[p], path[1:], val)

def make_iterable(a):
    """Wrap `a` in a list unless it is already iterable."""
    if not hasattr(a, '__iter__'):
        return [a]
    return a

def addict(dicts, path=None, destructive=False, collatabletest=None):
    """add dictionaries in a useful way - if nested members are scalar, keep
    both in a list. if they are lists, concatenate the lists.

    NOTE(review): the scraped source lost the tail of the signature and
    several call lines; parameter order and the forwarding of `destructive`
    through the recursion are reconstructed — confirm against callers.
    """
    if collatabletest is None:
        collatabletest = lambda s: isinstance(s,
          (float, int, np.ndarray, list))
    if path is None:
        path = []
    if dicts:
        # use the first dict as the prototype for the key structure
        proto_d = deep_dict_get(dicts[0], path)
    else:  # empty list
        return {}
    summed = defaultdict(list)
    for k, v in proto_d.items():
        local_path = path + [k]
        if collatabletest(v):
            # collate the corresponding leaf from every dict
            for each_d in dicts:
                summed[k].append(deep_dict_get(
                    each_d, local_path, destructive=destructive))
        elif isinstance(v, dict):
            # recurse into nested sub-dicts
            summed[k] = addict(
                dicts, local_path, destructive=destructive,
                collatabletest=collatabletest)
        # a raw dict leaking into a collation means the inputs disagreed
        # about structure
        if any(isinstance(i, dict) for i in summed[k]):
            raise TypeError("dict adding failed")
    return dict(summed)
def edge_pairs_to_vertex_list(edge_pairs):
    """convert [[0,1], [0,2], [1,2]] lists into [0,1,2] lists. Assumes that
    vertex numbers are non-negative reals.

    `edge_pairs` is expected to be a 2-column numpy integer array (it is
    copied and indexed 2-D below) whose edges form a single chain/cycle.
    """
    unused_edges = edge_pairs.copy()  # we're going to overwrite here.
    vertex_list = -np.ones(len(unused_edges), dtype=edge_pairs.dtype)
    vertex_list[0] = unused_edges[0][0]
    unused_edges[0, :] = -1  # -1 is our sentinel value
    # `xrange` in the original is Python-2-only; `range` is the equivalent
    for i in range(edge_pairs.shape[0] - 1):
        to_find = vertex_list[i]
        # first remaining edge that contains the current vertex
        next_edge_i = (unused_edges == to_find).nonzero()[0][0]
        a, b = unused_edges[next_edge_i, :]
        # step to the other endpoint of that edge
        next_vertex = b if a == to_find else a
        vertex_list[i + 1] = next_vertex
        unused_edges[next_edge_i, :] = -1  # mark edge as consumed
    return vertex_list

class Rng(object):
    """A wrapper around the `random` module that keeps a private stream.

    Python's RNGs use scurrilous globals. sometimes you need an RNG keeping
    private state.
    This is not optimised in any way and probably slams your CPU.
    This will need to be made more modular if we want to use numpy to do
    stuff, since numpy calls its state methods [gs]et_state instead of
    [gs]etstate. Grrr.
    Attribute access should be memoised if this is going to be called often.
    This is about as thread-safe as storing your trousers in aviation fuel."""
    def __init__(self, seed=None, *args, **kwargs):
        super(Rng, self).__init__(*args, **kwargs)
        # NOTE(review): the scrape lost lines here; seeding on construction
        # is required so _my_state/_world_state exist before use.
        self.seed(seed)
    def seed(self, seed):
        """Seed the private stream, leaving the global stream as it was.

        NOTE(review): reconstructed — save the global state, seed, capture
        that as our private state, then restore the global state.
        """
        self._world_state = random.getstate()
        random.seed(seed)
        self._my_state = random.getstate()
        random.setstate(self._world_state)
    def __getattr__(self, name):
        """Return a state-swapping proxy wrapper if `name` is a callable on
        the `random` module; otherwise return the attribute directly."""
        if name.startswith('_'):
            # private lookups must not recurse into the random module
            return object.__getattribute__(self, name)
        att = getattr(random, name)
        if callable(att):
            def _statefullcall(*args, **kwargs):
                # swap our private state in, call, swap it back out,
                # restoring the global stream even if the call raises
                self._world_state = random.getstate()
                random.setstate(self._my_state)
                try:
                    ret = att(*args, **kwargs)
                    self._my_state = random.getstate()
                finally:
                    random.setstate(self._world_state)
                return ret
            return _statefullcall
        else:
            return att

def hashint(i):
    """use glibc's LCG coefficients to map an integer into a different,
    pseudo-random-looking 32-bit integer."""
    return np.uint32((i*1103515245+12345) % 4294967296)

def minify(dictish):
    """destructively transform a (nested) dict of lists into one of arrays of
    the 32 bits size. memory saving."""
    if isinstance(dictish, np.ndarray):
        # downcast wider arrays to their 32-bit counterparts
        if dictish.dtype.kind == 'i':
            return dictish.astype(np.int32)
        elif dictish.dtype.kind == 'f':
            return dictish.astype(np.float32)
        return dictish
    if isinstance(dictish, list):
        if all(isinstance(item, int) for item in dictish):
            v = np.asarray(dictish, dtype='int32')
        elif all(isinstance(item, (float, int)) for item in dictish):
            v = np.asarray(dictish, dtype='float32')
        else:
            # mixed/other content: let numpy choose the dtype
            # (the original lost this `else:` line in extraction)
            v = np.asarray(dictish)
        return v
    elif isinstance(dictish, dict):
        # recurse, replacing values in place (iteritems() is Python-2-only)
        for k, v in dictish.items():
            dictish[k] = minify(v)
    # else: Custom class? don't touch it.
    return dictish

def d(**kwargs):
    """Build a dict from a keyword bundle — spares the eyeballs from
    curly-brace-and-quote punctuation."""
    return dict(kwargs)
def nested_to_path_dict(n_dict):
    """Flatten a nested dict into a single-level dict keyed by dotted paths."""
    leaf_tuples = _nested_to_path_dict_helper([], n_dict)
    return {".".join(key_chunks): val for key_chunks, val in leaf_tuples}

def _nested_to_path_dict_helper(path_so_far, sub_dict):
    """Yield (key_path_list, leaf_value) pairs for every leaf of a nested dict."""
    # iteritems() is Python-2-only; items() is the equivalent
    for p, val in sub_dict.items():
        # checking for 'items' rather than the Python-2-only 'iteritems',
        # which no Python 3 dict has (the old test never recursed on py3)
        if hasattr(val, 'items'):
            # is a dict, so recurse
            for tup in _nested_to_path_dict_helper(
                path_so_far + [p], val):
                yield tup
        else:
            # just return the vals otherwise
            # (the original lost this `else:` line in extraction)
            yield path_so_far + [p], val

def path_to_nested_dict(p_dict):
    """Inverse of nested_to_path_dict: expand dotted-path keys back into a
    nested dict via deep_dict_set."""
    n_dict = {}
    # iteritems() is Python-2-only; items() is the equivalent
    for rpath, val in p_dict.items():
        deep_dict_set(n_dict, rpath.split('.'), val)
    return n_dict