TODO: * Consider calculating MI between *relative* axial positions or velocities of pairs of particles over time, rather than branching histories, which also might work in the infinite-time limit. * Parallelise in something natural, such as redis or 0MQ, or ruffus. Probably ruffus + sqlite is easiest. """ from __future__ import division import numpy as np import scipy as sp from scipy.spatial import distance import pyentropy from random import sample import itertools # we expect params to contain the following keys: # n_agents, dt, noise, radius, steps DEFAULT_TRIAL_PARAMS = { 'n_agents': 100, 'dt': 0.01, 'noise': 0.2, 'radius': 0.05, 'discard_steps': 100, 'branch_steps': 1, 'n_branches': 100, } def swept_experiment(sweepables=None, base_params=DEFAULT_TRIAL_PARAMS, oversample=1, start_seed=10000, n_pairs=10): sweepables = sweepables or {} #cast to empty dict swept_names = sweepables.keys() swept_iterables = [sweepables[name] for name in swept_names] estimates = [] seed = start_seed for swept_values in itertools.product(*swept_iterables): these_params = base_params.copy() these_params.update(zip(swept_names, swept_values)) for sample_i in xrange(oversample): trial = branched_trial( these_params, seed=seed ) estimates.append(( trial[0].params, analyse_branched_trial( trial, n_pairs=n_pairs ) )) seed = abs(hash(str(5))) #start seed in very different places next time return estimates def branched_trial(base_params=DEFAULT_TRIAL_PARAMS, seed=1): """ Run a sim, stop it after a number of iterations. Then run alternate versions with a given seed, keeping the results. """ base_simstate = do_sim(basic_flock_factory(base_params), steps=base_params['discard_steps'], seed=seed) #the sim might decorate the params dict with additional stuff. mostly the seed, i suppose. base_params = base_simstate.params #since we are branching, we store the original seed in 'base_seed' and keep 'seed' for most recent seed # should seeds have a separate data structure for clarity? base_params['base_seed'] = seed return [ do_sim(base_simstate, steps=base_params['branch_steps'], seed=seed+i+1) for i in xrange(base_params['n_branches']) ] def do_sim(simstate, steps=100, seed=1): """ Starting from the state in *simstate*, iterate the simulation *steps* times with different seeds. """ simstate = simstate.clone() params = simstate.params np.random.seed(seed) #update seed to reflect the most recent seed used. I suppose that's kind of variable overloading. #some cleaner method would be wise. params['seed'] = seed locs = simstate.locs vels = simstate.vels for step in xrange(steps): #The algorithm: #take neighbourhood sum, normalise, perturb, normalise. #Step 1: take neighbourhood sum, normalise: sp_neighbours = basic_calc_neighbours(locs, params) # this should give us the sum of all neighbouring velocities, # including the agent's own. vels = normalise_lengths(sp_neighbours * vels) #Step 2: perturb, normalise: vels = normalise_lengths(vels * (1-params['noise']) + random_unit_vectors(params) * params['noise']) #boundary conditions? locs = fract(locs + params['dt'] * vels) simstate.locs = locs simstate.vels = vels return simstate def analyse_branched_trial(branches, n_pairs=10): """Take a list of trials and analyse them.""" #I'm punting I can ignore most of the state vector, e.g. locs. # stacked_vels = np.dstack([b.vels for b in branches]) #In fact, do we really need multiple axes of info for vels? Nah. stacked_vels = np.vstack([b.vels[:,0] for b in branches]) n_agents = stacked_vels.shape[0] # Now, we estimate distributions of projected velocity in order to, in turn, estimate mean # mutual information. ests = np.zeros(n_pairs) for i in xrange(n_pairs): #this is not sliced efficiently. Should restack. Anyway... pair = stacked_vels[sample(xrange(n_agents), 2), :] ests[i] = ince_mi_dist_cont(pair[0], pair[1]) return ests class SimState(object): """A holder for some simulation state. Copy using the clone method to avoid that damn mutable state dictionary giving you problems.""" params = None statevars = ['vels', 'locs'] def __init__(self, params): self.params = params.copy() def clone(self): """make a new copy of me with arrays pointing to the same memory locations but with params dict copied so that branching thing works without trampling state.""" clone = self.__class__(self.params.copy()) for name in self.statevars: setattr(clone, name, getattr(self, name)) return clone def basic_flock_factory(params): """Initialise a sim with basic random state.""" sim = SimState(params=params) sim.locs = np.random.uniform(size=(params['n_agents'], 2)) sim.vels = random_unit_vectors(params) return sim def normalise_lengths(row_vectors): """normalise the lengths of some stacked row vectors. """ return row_vectors / np.sqrt(np.sum(np.square(row_vectors), 1)).reshape(-1,1) def random_isotropic_vectors(params): """general a matrix of isotropically-distributed row vectors of mean length 1""" #the normal distribution is isotropic in all dimensions return np.random.normal(0., 2 ** -0.5, (params['n_agents'], 2)) def random_unit_vectors(params): """general a matrix of isotropically-distributed unit row vectors""" return normalise_lengths(random_isotropic_vectors(params)) def distances(row_vectors): """construct an upper-triangular pairwise distance matrix from stacked rows of observation vectors. Take same args as pdist proper. nb - always casts to 64 bit floats.""" return distance.squareform( distance.pdist(row_vectors) ) def basic_calc_neighbours(row_vectors, params): #some forgotten bit of profiling long ago suggested that this weird casting saved some CPU cycles return sp.sparse.csr_matrix( (distances(row_vectors)