bubble-economy / flock.py

#!/usr/bin/env python
# encoding: utf-8
"""
flock.py

workhorse functions for doing flocking simulations

Created by dan mackinlay on 2012-09-12.
"""
from __future__ import division

import numpy as np
import scipy as sp
import scipy.sparse  # ensure sp.sparse is available; "import scipy" alone does not load the submodule
from scipy.spatial import distance
import pyentropy
from random import sample
import itertools
from pprint import pprint
from settings import OUTPUT_DB_URL

# we expect params to contain the following keys:
# n_agents, dt, noise, radius, discard_steps, branch_steps, n_branches
DEFAULT_TRIAL_PARAMS = {
    'n_agents': 100,
    'dt': 0.01,
    'noise': 0.2,
    'radius': 0.05,
    'discard_steps': 100,
    'branch_steps': 10,
    'n_branches': 100,
}
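
# Example: build a one-off parameter set by overriding the defaults above
# (the override values here are hypothetical):
#
#   params = dict(DEFAULT_TRIAL_PARAMS, noise=0.4, n_agents=200)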

def stashed_swept_experiment(db_url=OUTPUT_DB_URL, *args, **kwargs):
    import persistence
    session = persistence.get_db_session(db_url)
    for params, stat_vals in swept_experiment_iterator(*args, **kwargs):
        for stat_val in stat_vals:
            persistence.persist_stat(session, "mi", stat_val, params)
    return session

def swept_experiment(*args, **kwargs):
    return list(swept_experiment_iterator(*args, **kwargs))
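
# Example usage (a minimal sketch; the sweep values are hypothetical): sweep
# noise and radius over a small grid, oversampling each combination 5 times.
# Each collected item is a (params, mi_estimates) pair.
#
#   results = swept_experiment(
#       sweepables={'noise': [0.1, 0.3, 0.5], 'radius': [0.05, 0.1]},
#       oversample=5,
#       n_pairs=10,
#   )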
    
def swept_experiment_iterator(sweepables=None, base_params=DEFAULT_TRIAL_PARAMS, oversample=10, start_seed=10000, n_pairs=10):
    sweepables = sweepables or {} #cast to empty dict
    swept_names = sweepables.keys()
    swept_iterables = [sweepables[name] for name in swept_names]
    seed = start_seed
    n_vals = 1
    for iterable in swept_iterables:
        n_vals *= len(iterable)
    for param_i, swept_values in enumerate(itertools.product(*swept_iterables)):
        these_params = base_params.copy()
        these_params.update(zip(swept_names, swept_values))
        print "simulating"
        pprint(these_params)
        print "%0.2f %%" % (100.0*param_i/n_vals)
        for oversample_i in xrange(oversample):
            trial = branched_trial(
                these_params,
                seed=seed
            )
            yield (
                trial[0].params,
                analyse_branched_trial(
                    trial,
                    n_pairs=n_pairs
                )
            )
            seed = abs(hash(str(seed))) #start the seed in a different place next time around

def branched_trial(base_params=DEFAULT_TRIAL_PARAMS, seed=1):
    """
    Run a sim, stop it after a number of iterations.
    Then run alternate versions with a given seed, keeping the results.
    """
    base_simstate = do_sim(basic_flock_factory(base_params), steps=base_params['discard_steps'], seed=seed)
    #the sim might decorate the params dict with additional stuff. mostly the seed, i suppose.
    base_params = base_simstate.params
    #since we are branching, we store the original seed in 'base_seed' and keep 'seed' for most recent seed
    # should seeds have a separate data structure for clarity?
    base_params['base_seed'] = seed
    return [
        do_sim(base_simstate, steps=base_params['branch_steps'], seed=seed+i+1)
        for i in xrange(base_params['n_branches'])
    ]
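
# Example (a minimal sketch): a branched trial is a list of n_branches SimStates,
# all grown from the same post-transient state but under different seeds:
#
#   branches = branched_trial(DEFAULT_TRIAL_PARAMS, seed=42)
#   len(branches)             # -> DEFAULT_TRIAL_PARAMS['n_branches']
#   branches[0].vels.shape    # -> (DEFAULT_TRIAL_PARAMS['n_agents'], 2)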

def do_sim(simstate, steps=100, seed=1):
    """
    Starting from the state in *simstate*, iterate the simulation *steps* times with different seeds.
    """
    simstate = simstate.clone()
    params = simstate.params
    np.random.seed(seed)
    #update seed to reflect the most recent seed used. I suppose that's kind of variable overloading.
    #some cleaner method would be wise.
    params['seed'] = seed
    locs = simstate.locs
    vels = simstate.vels
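    # locs and vels are both (n_agents, 2) arrays: positions in the unit square
    # and unit heading vectors respectively.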
    for step in xrange(steps):
        #The algorithm: 
        #take neighbourhood sum, normalise, perturb, normalise.
        #Step 1: take neighbourhood sum, normalise:
        sp_neighbours = basic_calc_neighbours(locs, params)
        # this should give us the sum of all neighbouring velocities,
        # including the agent's own.
        vels = normalise_lengths(sp_neighbours * vels)
        #Step 2: perturb, normalise:
        vels = normalise_lengths(vels * (1-params['noise']) + random_unit_vectors(params) * params['noise'])
        #boundary conditions?
        locs = fract(locs + params['dt'] * vels)
    simstate.locs = locs
    simstate.vels = vels
    return simstate        

def analyse_branched_trial(branches, n_pairs=10):
    """Take a list of trials and analyse them."""
    #I'm betting that I can ignore most of the state vector, e.g. locs.
    # stacked_vels = np.dstack([b.vels for b in branches])
    #In fact, do we really need multiple axes of info for vels? Nah.
    stacked_vels = np.vstack([b.vels[:,0] for b in branches])
    n_agents = stacked_vels.shape[1]  # stacked_vels is (n_branches, n_agents); columns index agents
    # Now, we estimate distributions of projected velocity in order to, in turn, estimate mean
    # mutual information.
    ests = np.zeros(n_pairs)
    for i in xrange(n_pairs):
        #this is not sliced efficiently. Should restack. Anyway...
        pair_indices = sample(xrange(n_agents), 2)
        pair = stacked_vels[:, pair_indices].T
        ests[i] = ince_mi_dist_cont(pair[0], pair[1])   
    return ests

class SimState(object):
    """A holder for some simulation state.
    Copy using the clone method to avoid that damn mutable state dictionary giving you problems."""
    params = None
    statevars = ['vels', 'locs']
    def __init__(self, params):
        self.params = params.copy()        
    def clone(self):
        """make a new copy of me with arrays pointing to the same memory locations
        but with params dict copied so that branching thing works without trampling state."""
        clone = self.__class__(self.params.copy())
        for name in self.statevars:
            setattr(clone, name, getattr(self, name))
        return clone
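
# Example of the clone semantics (a minimal sketch): the clone shares the state
# arrays (do_sim rebinds rather than mutates them) but gets its own params dict:
#
#   sim = basic_flock_factory(DEFAULT_TRIAL_PARAMS)
#   branch = sim.clone()
#   branch.params['seed'] = 99    # leaves sim.params untouched
#   branch.locs is sim.locs       # -> True (arrays are shared, not copied)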

def basic_flock_factory(params):
    """Initialise a sim with basic random state."""
    sim = SimState(params=params)
    sim.locs = np.random.uniform(size=(params['n_agents'], 2))
    sim.vels = random_unit_vectors(params)
    return sim

def normalise_lengths(row_vectors):
    """normalise the lengths of some stacked row vectors.
    """
    return row_vectors / np.sqrt(np.sum(np.square(row_vectors), 1)).reshape(-1,1)

def random_isotropic_vectors(params):
    """general a matrix of isotropically-distributed row vectors of mean length 1"""
    #the normal distribution is isotropic in all dimensions
    return np.random.normal(0., 2 ** -0.5, (params['n_agents'], 2))

def random_unit_vectors(params):
    """general a matrix of isotropically-distributed unit row vectors"""
    return normalise_lengths(random_isotropic_vectors(params))

def distances(row_vectors):
    """construct an upper-triangular pairwise distance matrix from stacked
    rows of observation vectors. Take same args as pdist proper. nb - always
    casts to 64 bit floats."""
    return distance.squareform(
      distance.pdist(row_vectors)
    )

def basic_calc_neighbours(row_vectors, params):
    #some forgotten bit of profiling long ago suggested that this weird casting saved some CPU cycles
    return sp.sparse.csr_matrix(
      (distances(row_vectors)<params['radius']).astype('int8')
    )
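
# Note: the diagonal of the distance matrix is zero, so each agent counts as its
# own neighbour; multiplying the resulting sparse matrix by vels therefore sums
# each agent's velocity together with its neighbours', as assumed in do_sim.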

def fract(a):
    """shorthand for 'fractional part of'"""
    return np.remainder(a,1.0)
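
# Example: fract implements the periodic (toroidal) boundary condition used in
# do_sim, wrapping positions back into the unit square, e.g.
#
#   fract(np.array([1.2, -0.3, 0.5]))   # -> approximately [0.2, 0.7, 0.5]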

def ince_mi_dist_cont(X, Y, n_bins=None, **kwargs):
    """wraps pyentropy's grandiose object initialisation and quantisation
    and just tells us what the bloody MI is, with no intermediate fiddling
    about, from continuous input.
    
    If no n_bins is given, we choose the maximum statistically valid number
    by the Cochran criterion, and leave it be. This will give OK results in
    all except pathological cases: bins will have (marginwise)
    approximately equal occupancy.
    """
    # kwargs.setdefault('method', 'qe')
    # kwargs.setdefault('sampling', 'kt')
    kwargs.setdefault('method', 'plugin')
    kwargs.setdefault('sampling', 'naive')
    X = np.asanyarray(X).ravel()
    Y = np.asanyarray(Y).ravel()
    if n_bins is None:
        n_bins = choose_n_bins(X.size)
    X, X_bins = pyentropy.quantise(X, n_bins, centers=False)
    Y, Y_bins = pyentropy.quantise(Y, n_bins, centers=False)
    ds = pyentropy.DiscreteSystem(
      X, (1, n_bins),
      Y, (1, n_bins)
    )
    ds.calculate_entropies(**kwargs)
    I = ds.I()
    return I
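
# Example (a minimal sketch): MI between a signal and a noisy copy of itself
# should come out clearly positive, while MI between independent samples should
# sit near zero (up to estimator bias):
#
#   x = np.random.normal(size=500)
#   ince_mi_dist_cont(x, x + 0.1 * np.random.normal(size=500))   # well above 0
#   ince_mi_dist_cont(x, np.random.normal(size=500))             # close to 0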

def choose_n_bins(n_samples, test=True):
    """according to Cellucci et al (2005), the maximal number of bins is given
    thus"""
    if n_samples < 20 and test:
        raise ValueError("%d is too small a number to bin" % n_samples)
    return int((n_samples / 5.0) ** 0.5)
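

if __name__ == '__main__':
    # Minimal smoke test (a sketch only; the parameter values are hypothetical):
    # run one branched trial at reduced size and print the resulting mutual
    # information estimates.
    demo_params = dict(DEFAULT_TRIAL_PARAMS, n_agents=50, n_branches=50, discard_steps=50)
    demo_branches = branched_trial(demo_params, seed=1234)
    print analyse_branched_trial(demo_branches, n_pairs=5)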