# bubble-economy / flock.py

 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 #!/usr/bin/env python # encoding: utf-8 """ flock.py Created by dan mackinlay on 2012-09-12. TODO: * Consider calculating MI between *relative* axial positions or velocities of pairs of particles over time, rather than branching histories, which also might work in the infinite-time limit. * Parallelise in something natural, such as redis or 0MQ, or ruffus. Probably ruffus + sqlite is easiest. 
""" from __future__ import division import numpy as np import scipy as sp from scipy.spatial import distance import pyentropy from random import sample import itertools # we expect params to contain the following keys: # n_agents, dt, noise, radius, steps DEFAULT_TRIAL_PARAMS = { 'n_agents': 100, 'dt': 0.01, 'noise': 0.2, 'radius': 0.05, 'discard_steps': 100, 'branch_steps': 1, 'n_branches': 100, } def swept_experiment(sweepables=None, base_params=DEFAULT_TRIAL_PARAMS, oversample=1, start_seed=10000, n_pairs=10): sweepables = sweepables or {} #cast to empty dict swept_names = sweepables.keys() swept_iterables = [sweepables[name] for name in swept_names] estimates = [] seed = start_seed for swept_values in itertools.product(*swept_iterables): these_params = base_params.copy() these_params.update(zip(swept_names, swept_values)) for sample_i in xrange(oversample): trial = branched_trial( these_params, seed=seed ) estimates.append(( trial[0].params, analyse_branched_trial( trial, n_pairs=n_pairs ) )) seed = abs(hash(str(5))) #start seed in very different places next time return estimates def branched_trial(base_params=DEFAULT_TRIAL_PARAMS, seed=1): """ Run a sim, stop it after a number of iterations. Then run alternate versions with a given seed, keeping the results. """ base_simstate = do_sim(basic_flock_factory(base_params), steps=base_params['discard_steps'], seed=seed) #the sim might decorate the params dict with additional stuff. mostly the seed, i suppose. base_params = base_simstate.params #since we are branching, we store the original seed in 'base_seed' and keep 'seed' for most recent seed # should seeds have a separate data structure for clarity? base_params['base_seed'] = seed return [ do_sim(base_simstate, steps=base_params['branch_steps'], seed=seed+i+1) for i in xrange(base_params['n_branches']) ] def do_sim(simstate, steps=100, seed=1): """ Starting from the state in *simstate*, iterate the simulation *steps* times with different seeds. 
""" simstate = simstate.clone() params = simstate.params np.random.seed(seed) #update seed to reflect the most recent seed used. I suppose that's kind of variable overloading. #some cleaner method would be wise. params['seed'] = seed locs = simstate.locs vels = simstate.vels for step in xrange(steps): #The algorithm: #take neighbourhood sum, normalise, perturb, normalise. #Step 1: take neighbourhood sum, normalise: sp_neighbours = basic_calc_neighbours(locs, params) # this should give us the sum of all neighbouring velocities, # including the agent's own. vels = normalise_lengths(sp_neighbours * vels) #Step 2: perturb, normalise: vels = normalise_lengths(vels * (1-params['noise']) + random_unit_vectors(params) * params['noise']) #boundary conditions? locs = fract(locs + params['dt'] * vels) simstate.locs = locs simstate.vels = vels return simstate def analyse_branched_trial(branches, n_pairs=10): """Take a list of trials and analyse them.""" #I'm punting I can ignore most of the state vector, e.g. locs. # stacked_vels = np.dstack([b.vels for b in branches]) #In fact, do we really need multiple axes of info for vels? Nah. stacked_vels = np.vstack([b.vels[:,0] for b in branches]) n_agents = stacked_vels.shape[0] # Now, we estimate distributions of projected velocity in order to, in turn, estimate mean # mutual information. ests = np.zeros(n_pairs) for i in xrange(n_pairs): #this is not sliced efficiently. Should restack. Anyway... pair = stacked_vels[sample(xrange(n_agents), 2), :] ests[i] = ince_mi_dist_cont(pair[0], pair[1]) return ests class SimState(object): """A holder for some simulation state. 
Copy using the clone method to avoid that damn mutable state dictionary giving you problems.""" params = None statevars = ['vels', 'locs'] def __init__(self, params): self.params = params.copy() def clone(self): """make a new copy of me with arrays pointing to the same memory locations but with params dict copied so that branching thing works without trampling state.""" clone = self.__class__(self.params.copy()) for name in self.statevars: setattr(clone, name, getattr(self, name)) return clone def basic_flock_factory(params): """Initialise a sim with basic random state.""" sim = SimState(params=params) sim.locs = np.random.uniform(size=(params['n_agents'], 2)) sim.vels = random_unit_vectors(params) return sim def normalise_lengths(row_vectors): """normalise the lengths of some stacked row vectors. """ return row_vectors / np.sqrt(np.sum(np.square(row_vectors), 1)).reshape(-1,1) def random_isotropic_vectors(params): """general a matrix of isotropically-distributed row vectors of mean length 1""" #the normal distribution is isotropic in all dimensions return np.random.normal(0., 2 ** -0.5, (params['n_agents'], 2)) def random_unit_vectors(params): """general a matrix of isotropically-distributed unit row vectors""" return normalise_lengths(random_isotropic_vectors(params)) def distances(row_vectors): """construct an upper-triangular pairwise distance matrix from stacked rows of observation vectors. Take same args as pdist proper. nb - always casts to 64 bit floats.""" return distance.squareform( distance.pdist(row_vectors) ) def basic_calc_neighbours(row_vectors, params): #some forgotten bit of profiling long ago suggested that this weird casting saved some CPU cycles return sp.sparse.csr_matrix( (distances(row_vectors)