# bubble-economy / flock.py
# author: dan mackinlay

#!/usr/bin/env python
# encoding: utf-8
"""
Created by dan mackinlay on 2012-09-12.

TODO:

* Consider calculating MI between *relative* axial positions or velocities of pairs of particles over time,
  rather than branching histories, which also might work in the infinite-time limit.
* Parallelise in something natural, such as redis or 0MQ, or ruffus.
* store data in sqlite for ease of processing
* store intermediate trials in sqlite too
* calibrate pyentropy to work out why i get large positive information between deterministic variables.
"""
from __future__ import division

import numpy as np
import scipy as sp
from scipy.spatial import distance
import pyentropy
from random import sample
import itertools
from pprint import pprint

# we expect params to contain the following keys:
# n_agents, dt, noise, radius, steps
# (the assignment line and braces were lost in extraction; reconstructed here)
DEFAULT_TRIAL_PARAMS = {
    'n_agents': 100,
    'dt': 0.01,
    'noise': 0.2,
    'radius': 0.05,
    'discard_steps': 100,
    'branch_steps': 10,
    'n_branches': 100,
}
def swept_experiment(sweepables=None, base_params=DEFAULT_TRIAL_PARAMS, oversample=10, start_seed=10000, n_pairs=10):
    """Run trials over the cartesian product of the parameter values in
    *sweepables* (a dict mapping parameter name -> iterable of values).

    Each parameter combination is run *oversample* times; each run is a
    branched trial analysed into MI estimates over *n_pairs* pairs.
    Returns a list of (params, estimates) tuples.
    """
    sweepables = sweepables or {} #cast to empty dict
    swept_names = list(sweepables.keys())
    swept_iterables = [sweepables[name] for name in swept_names]
    estimates = []
    seed = start_seed
    n_vals = 1
    for iterable in swept_iterables:
        n_vals *= len(iterable)
    for param_i, swept_values in enumerate(itertools.product(*swept_iterables)):
        these_params = base_params.copy()
        these_params.update(zip(swept_names, swept_values))
        print("simulating")
        print("%0.2f %%" % (100.0*param_i/n_vals))
        for oversample_i in xrange(oversample):
            # NOTE(review): loop body reconstructed from truncated source —
            # run one branched trial, analyse it, and keep the results.
            trial = branched_trial(these_params, seed=seed)
            estimates.append((these_params.copy(), analyse_branched_trial(trial, n_pairs=n_pairs)))
            # FIX: was hash(str(5)), which is a constant and reseeded every
            # oversample run identically; hash the current seed instead.
            seed = abs(hash(str(seed))) #start seed in very different places next time
    return estimates

def branched_trial(base_params=DEFAULT_TRIAL_PARAMS, seed=1):
    """Run a sim, stop it after a number of iterations.
    Then run alternate versions with a given seed, keeping the results.

    Returns a list of ``n_branches`` SimStates, each evolved independently
    from the common post-burn-in state for ``branch_steps`` steps.
    """
    base_simstate = do_sim(basic_flock_factory(base_params), steps=base_params['discard_steps'], seed=seed)
    #the sim might decorate the params dict with additional stuff. mostly the seed, i suppose.
    base_params = base_simstate.params
    #since we are branching, we store the original seed in 'base_seed' and keep 'seed' for most recent seed
    # should seeds have a separate data structure for clarity?
    base_params['base_seed'] = seed
    return [
        do_sim(base_simstate, steps=base_params['branch_steps'], seed=seed+i+1)
        for i in xrange(base_params['n_branches'])
    ]

def do_sim(simstate, steps=100, seed=1):
    """Starting from the state in *simstate*, iterate the simulation *steps* times with different seeds.

    Returns a new SimState clone holding the evolved ``locs`` and ``vels``;
    the input *simstate* is left untouched (its params dict is copied by
    ``clone``, so branches cannot trample each other).
    """
    simstate = simstate.clone()
    params = simstate.params
    #update seed to reflect the most recent seed used. I suppose that's kind of variable overloading.
    #some cleaner method would be wise.
    params['seed'] = seed
    # FIX: the seed was recorded but never applied, so branches were not
    # reproducible from their recorded seeds; seed the global RNG that
    # random_unit_vectors draws from.
    np.random.seed(seed)
    locs = simstate.locs
    vels = simstate.vels
    for step in xrange(steps):
        #The algorithm: 
        #take neighbourhood sum, normalise, perturb, normalise.
        #Step 1: take neighbourhood sum, normalise:
        sp_neighbours = basic_calc_neighbours(locs, params)
        # this should give us the sum of all neighbouring velocities,
        # including the agent's own.
        vels = normalise_lengths(sp_neighbours * vels)
        #Step 2: perturb, normalise:
        vels = normalise_lengths(vels * (1-params['noise']) + random_unit_vectors(params) * params['noise'])
        #boundary conditions? toroidal: wrap positions back into the unit square.
        locs = fract(locs + params['dt'] * vels)
    simstate.locs = locs
    simstate.vels = vels
    return simstate

def analyse_branched_trial(branches, n_pairs=10):
    """Take a list of trials and analyse them.

    Returns an array of *n_pairs* mutual-information estimates, each between
    a randomly sampled pair of rows of the stacked velocity data.
    """
    #I'm punting I can ignore most of the state vector, e.g. locs.
    # stacked_vels = np.dstack([b.vels for b in branches])
    #In fact, do we really need multiple axes of info for vels? Nah.
    # One row per branch: the x-component of every agent's final velocity.
    stacked_vels = np.vstack([b.vels[:,0] for b in branches])
    # NOTE(review): shape[0] here is the number of *branches*, not agents
    # (each b.vels[:,0] contributes one row), so the sampling below pairs up
    # branch rows rather than particle columns. The name suggests the author
    # intended agent pairs — TODO confirm against original intent.
    n_agents = stacked_vels.shape[0]
    # Now, we estimate distributions of projected velocity in order to, in turn, estimate mean
    # mutual information.
    ests = np.zeros(n_pairs)
    for i in xrange(n_pairs):
        #this is not sliced efficiently. Should restack. Anyway...
        pair = stacked_vels[sample(xrange(n_agents), 2), :]
        ests[i] = ince_mi_dist_cont(pair[0], pair[1])   
    return ests

class SimState(object):
    """Container for mutable simulation state.

    Always duplicate via :meth:`clone` rather than by hand, so the mutable
    params dict is copied and branches cannot trample each other's state."""
    # the parameter dict for this run; replaced with a copy in __init__
    params = None
    # names of the state arrays that clone() carries across by reference
    statevars = ['vels', 'locs']

    def __init__(self, params):
        # defensive copy: callers may go on mutating their dict
        self.params = params.copy()

    def clone(self):
        """Return a branch-copy: state arrays are shared by reference, but
        the params dict is an independent copy."""
        duplicate = type(self)(self.params)
        for attr in self.statevars:
            setattr(duplicate, attr, getattr(self, attr))
        return duplicate

def basic_flock_factory(params):
    """Initialise a sim with basic random state.

    Positions are uniform on the unit square; velocities are random unit
    vectors. Both are (n_agents, 2) arrays."""
    n = params['n_agents']
    state = SimState(params=params)
    state.locs = np.random.uniform(size=(n, 2))
    state.vels = random_unit_vectors(params)
    return state

def normalise_lengths(row_vectors):
    """normalise the lengths of some stacked row vectors.

    Each row is scaled to unit Euclidean length. A zero-length row produces
    NaNs (division by zero) — callers feed random vectors, so this is
    vanishingly unlikely here.
    """
    return row_vectors / np.sqrt(np.sum(np.square(row_vectors), 1)).reshape(-1,1)

def random_isotropic_vectors(params):
    """generate a matrix of isotropically-distributed row vectors of RMS length 1"""
    # a spherically symmetric gaussian is isotropic in every dimension;
    # component std 2**-0.5 gives E[|row|^2] = 1 for 2-d rows
    return np.random.normal(loc=0., scale=2 ** -0.5, size=(params['n_agents'], 2))

def random_unit_vectors(params):
    """generate a matrix of isotropically-distributed unit row vectors"""
    raw = random_isotropic_vectors(params)
    return normalise_lengths(raw)

def distances(row_vectors):
    """construct a square symmetric pairwise distance matrix from stacked
    rows of observation vectors. nb - always casts to 64 bit floats
    (pdist computes in double precision).
    """
    # reconstructed from a truncated line: condensed pairwise distances,
    # expanded to the full square form with zero diagonal.
    return distance.squareform(
        distance.pdist(row_vectors)
    )

def basic_calc_neighbours(row_vectors, params):
    """Return a sparse CSR adjacency matrix marking agent pairs closer than
    params['radius']; the zero diagonal of the distance matrix means every
    agent neighbours itself, as do_sim's averaging step expects."""
    #some forgotten bit of profiling long ago suggested that this weird casting saved some CPU cycles
    # NOTE(review): the original expression was truncated mid-call; the exact
    # cast is reconstructed — a float adjacency so the sparse product sums
    # neighbour velocities. TODO confirm against the original repo.
    return sp.sparse.csr_matrix(
        (distances(row_vectors) < params['radius']).astype(np.float64)
    )

def fract(a):
    """shorthand for 'fractional part of'"""
    # np.mod is an alias of np.remainder: wraps values into [0, 1),
    # matching the sign of the divisor (so -0.25 -> 0.75)
    return np.mod(a, 1.0)

def ince_mi_dist_cont(X, Y, n_bins=None, **kwargs):
    """wraps pyentropy's grandiose object initialisation and quantisation
    and just tells us what the bloody MI is, with no intermediate fiddling
    about, from continuous input.

    If no n_bins is given, we choose the maximum statistically valid number
    by the cochrane criterion, and leave it be. This will give good results in
    all except particularly pathological cases. (bins will be (marginwise)
    approximately equal occupancy)
    """
    # kwargs.setdefault('method', 'qe')
    # kwargs.setdefault('sampling', 'kt')
    kwargs.setdefault('method', 'plugin')
    kwargs.setdefault('sampling', 'naive')
    X = np.asanyarray(X).ravel()
    Y = np.asanyarray(Y).ravel()
    if n_bins is None:
        n_bins = choose_n_bins(X.size)
    X, _ = pyentropy.quantise(X, n_bins, centers=False)
    Y, _ = pyentropy.quantise(Y, n_bins, centers=False)
    del(_) #Go away, extraneous data!
    ds = pyentropy.DiscreteSystem(
      X, (1, n_bins),
      Y, (1, n_bins)
    )
    # NOTE(review): reconstructed — DiscreteSystem.I() needs the entropies
    # calculated first; the method/sampling defaults above are consumed here.
    ds.calculate_entropies(calc=['HX', 'HXY'], **kwargs)
    return ds.I()

def choose_n_bins(n_samples, test=True):
    """according to Cellucci et al (2005), the maximal number of bins is given
    by requiring ~5 samples per bin, i.e. floor(sqrt(n_samples/5)).

    :param n_samples: number of scalar observations to be binned
    :param test: if True, raise ValueError when n_samples is too small to bin
      meaningfully (fewer than 20 samples, i.e. fewer than 2 bins)
    """
    if n_samples<20 and test:
        raise ValueError("%d is too small a number to bin" % n_samples)
    return int(float(n_samples/5.)**(0.5))