
Source

bubble-economy / experiments.py

#!/usr/bin/env python
# encoding: utf-8
"""
experiments.py

Experiments are collections of trials, generated by a sampler over parameter settings.

Copyright (c) 2011 Dan MacKinlay. All rights reserved.
"""

# import multiprocessing
from traders import TraderSet, WicksTraderSet, \
    ErgodicWicksTraderSet, do_trade_with_args
import sampling
import numpy as np
import scipy as sp
import time
from collections import defaultdict
import cPickle as pickle
from utils import get_code_revision, get_kw_arguments, \
    deep_dict_get, addict, Bunch, minify, make_iterable, nested_to_path_dict
import os.path
from path import path
from settings import EXPERIMENT_OUTPUT_PATH
from warehouse import Box, Warehouse, digest
from local_exceptions import BubbleEconomyException, MethodologicallySuspect
from fsdict import FSDict
from dispatch import DispatcherFactory
import trader_stats

# cache the mquantiles fn so we only import the masked-array stats if necessary.
# (Why is the best quantile function in scipy the one for masked arrays?)
mquantiles = None

#Defined in settings file to prevent import loops
from settings import DEFAULT_SUMMARY_STATS, ALLOWED_SUMMARY_STATS


def perform_experiment(
        experiment_id,
        limit=5,
        repeat=5,
        sampler="BasicRandomSampler",
        processes=3,
        seed_offset=0,
        single_shot=False,
        picloud=False,
        command_line="",
        clobber=False,
        desc="",
        concurrency='none',
        **experiment_kwargs):
    if single_shot:
        limit = 1
        repeat = 1
    #fancy footwork to have mutable default arguments:
    experiment_kwargs.setdefault('stats_set', [])
    
    warehouse = get_trial_warehouse(experiment_id)
    code_revision = get_code_revision()
    
    metadata_fs = get_experiment_metadata_fs(experiment_id)
    experiment_meta_new = {
      "command_line": command_line,
      "code_revision": code_revision,
      "desc": desc
    }
    experiment_meta_from_file = metadata_fs.setdefault(
      experiment_id+".meta",
      experiment_meta_new
    )
    
    if experiment_meta_from_file != experiment_meta_new:
        if clobber:
            print """Overwriting experiment metadata.
              Old parameters: %s
              New parameters: %s""" % (
                str(experiment_meta_from_file),
                str(experiment_meta_new)
            )
            for k in list(metadata_fs): del(metadata_fs[k])
            metadata_fs.update(experiment_meta_new)
        else:
            raise MethodologicallySuspect(
              """
              Running the same experiment id with different parameters?
              Specify --clobber if you really want to do this.
              Old parameters: %s
              New parameters: %s""" % (
                str(experiment_meta_from_file),
                str(experiment_meta_new)
              )
            )
    dispatcher = DispatcherFactory(
      concurrency=concurrency,
      task=simulate_and_analyse_trial,
      warehouse=warehouse,
      identifier="bubble_economy/" + experiment_id,
      processes=processes
    )
    Sampler = getattr(sampling, sampler)
    
    sampler = Sampler(
      dispatcher=dispatcher,
      limit=limit,
      repeat=repeat,
      kwargs=experiment_kwargs,
      seed_offset=seed_offset,
    )

    experiments = sampler.sample()
    return experiments
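
# A minimal usage sketch (hypothetical values; the experiment id, the
# stats_set contents and the trader factory below are illustrative
# assumptions, not a fixed API):
#
#   perform_experiment(
#       "wicks_sweep",
#       limit=10,
#       repeat=5,
#       sampler="BasicRandomSampler",
#       concurrency='none',
#       trader_factory=WicksTraderSet,
#       stats_set=['prices'],
#   )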

def simulate_and_analyse_trial(
        box,
        trader_factory,
        stats_set,
        keep_raw_data=False,
        save_model_to_box=False,
        **trial_bunch):
    """Wraps both simulation and analysis phase for a single trial.
    
    Return a Box instance with the desired stats tacked on.
    """
    print "Simulating", str(trial_bunch)
    
    try:
        #run experiment
        ts = do_trade_with_args(
          trader_factory=trader_factory,
          **trial_bunch)
        #analyse it
        print "Analysing", str(trial_bunch)
        stats = do_stats(ts, stats_set)
    except BubbleEconomyException, e:
        print e
        #something not too major. ideally, we would log these.
        return None
    # get rid of the raw data unless asked to keep it; you probably only
    # need it when serialising the model.
    if not keep_raw_data:
        ts.prune_data()
    ts.pack_neighbours()
    if save_model_to_box:
        #do we wish to introspect the intermediate values?
        #beware, this can consume hundreds of megabytes of bandwidth
        #or minutes of pickling
        box.traderset = ts
        
    box.stats = stats
    return box
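
# Note: the returned Box carries the trial's stats dict; with
# save_model_to_box=True it also carries the full TraderSet, which can run to
# hundreds of megabytes once pickled (see the warning above).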

def reanalyse_experiment(
        experiment_id,
        stats_set,
        processes=3,
        picloud=False,
        command_line="",
        desc="",
        concurrency='none',
        **experiment_kwargs):
    # deferred import: stats sets are looked up here by name
    import trader_stats_sets
      
    warehouse = get_trial_warehouse(experiment_id)
    stats_set = getattr(trader_stats_sets, stats_set)
    
    metadata_fs = get_experiment_metadata_fs(experiment_id)
    experiment_meta_from_file = metadata_fs[experiment_id+".meta"]

    dispatcher = DispatcherFactory(
      concurrency=concurrency,
      task=reanalyse_trial,
      warehouse=warehouse,
      identifier="bubble_economy/" + experiment_id,
      processes=processes
    )
    # import pdb; pdb.set_trace()

    return [box.clear_cache() for box in
        dispatcher.batch_dispatch([
            dict(box=box, stats_set=stats_set)
            for box in warehouse.find()
        ])
    ]

def reanalyse_trial(
        box,
        stats_set,
        dry_run=False,
        **kwargs):
    """Do re-analysis (additional/replacement stats) on an existent box
    
    Return a Box instance with the desired stats tacked on.
    """
    print "Reanalysing", str(box._basename)
    print str(box._metadata)
    
    try:
        #run experiment
        ts = box.traderset
        #analyse it
        stats = getattr(box, 'stats', {})
        new_stats = do_stats(ts, stats_set)
        stats.update(new_stats)
        box.stats = stats
        del(new_stats)
        if not dry_run:
            box.store()
            box.clear_cache()
    except BubbleEconomyException, e:
        print e
        #something not too major. ideally, we would log these.
        return None        
    return box

def do_stats(traderset, stats_set, stats=None):
    """calculate stats in stats_set and append them to `stats` if they are not
    already there."""
    stats = stats or {}

    for name in stats_set:
        if not isinstance(name, basestring):
            #a tuple of (key, stats function name, kwargs dict)
            key, name, stats_args = name
        else:
            key = name
            stats_args = {}
        
        print "inferring", key
        if key in stats: continue
        stats_fn = getattr(trader_stats, name)
        stats[key] = minify(make_iterable(
          stats_fn(traderset, **stats_args)
        ))
    return stats
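
# Elements of stats_set are either a bare trader_stats function name, or a
# (key, function name, kwargs) triple for running a function under a
# different key or with extra arguments. A hypothetical example (the stat
# names here are assumptions, not necessarily present in trader_stats):
#
#   stats_set = [
#       'prices',
#       ('late_prices', 'prices', {'discard_steps': 100}),
#   ]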

def get_trial_warehouse(exp_name=None):
    return get_exp_warehouse(exp_name, 'trials')

def get_exp_warehouse(*parts):
    """convenience function to get a warehouse at the given subdir and
    ensure that said path exists"""
    parts = [EXPERIMENT_OUTPUT_PATH] + list(parts)
    fullpath = path(os.path.join(*[
        p for p in parts if p
    ]))
    fullpath.makedirs()
    return Warehouse(fullpath)

def get_experiment_metadata_fs(exp_name=None):
    "return an FSDict looking at the given directory."
    return get_exp_warehouse(exp_name, 'metadata').fs

def write_summary(experiment_name,
        cached=False,
        as_text=False,
        sum_stats=DEFAULT_SUMMARY_STATS,
        exclude=(
            'discard_steps', 
            'norm_periodic',
            'factory',
            'trader_factory',
            'keep_raw_data'),
        *args, **kwargs):
    
    #we only write TSV ATM
    if not as_text:
        return
    
    import csv
    exp = get_reduced_experiment(experiment_name,
        sum_stats=sum_stats,
        cached=cached)
        
    outdir = EXPERIMENT_OUTPUT_PATH/'text'
    outdir.makedirs()
    
    text_path = outdir/(experiment_name + '.tsv')
    # Let's index from 1 on the assumption we are going into R
    idx = 1
    param_headers = exp[0][0].keys()
    for f in exclude + ('stats_set',):
        try:
            param_headers.remove(f)
        except ValueError:
            pass
    all_headers = [None] + param_headers + ['stat_name', 'stat_value']
    
    flat_stats = nested_to_path_dict(exp[1])
    with text_path.open('wb') as text_file:
        text_writer = csv.DictWriter(text_file,
            fieldnames = all_headers,
            dialect='excel-tab')
        text_writer.writeheader()
        for stat_name, stat_val_list in flat_stats.iteritems():
            if stat_name.endswith('.____stats'): continue
            for trial_idx, stat_val in enumerate(stat_val_list):
                my_trial_meta = exp[0][trial_idx]
                d = dict([(key, my_trial_meta[key]) for key in param_headers])
                d[None] = idx
                d['stat_name'] = stat_name
                try:
                    for val in stat_val:
                        d['stat_value'] = val
                        print "writing", d
                        text_writer.writerow(d)
                        idx += 1
                except TypeError:
                    #not iterable. just write one row
                    d['stat_value'] = stat_val
                    print "writing", d
                    text_writer.writerow(d)
                    idx += 1
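
# The TSV comes out long-format, one row per (trial, statistic, value), with
# an unnamed 1-based index column for R. Illustrative rows only (the
# parameter columns depend on the experiment's kwargs):
#
#       n_traders   noise   stat_name       stat_value
#   1   100         0.1     prices.mean     1.02
#   2   100         0.1     prices.std_dev  0.31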

def get_reduced_experiment(exp_name, cached=True,
        sum_stats=DEFAULT_SUMMARY_STATS):
    outdir = EXPERIMENT_OUTPUT_PATH/'summaries'
    outdir.makedirs()
    cache_path = outdir/(exp_name + '.pickle')
    if cached and cache_path.exists():
        with cache_path.open('rb') as inz:
            return pickle.load(inz)
    
    #No luck loading from cache; calculate it and cache it now.
    trial_tuple = reduce_experiment(
        get_trial_warehouse(exp_name),
        sum_stats=sum_stats
    )
    with cache_path.open('wb') as outz:
        pickle.dump(trial_tuple, outz, protocol=2)
    return trial_tuple

def reduce_experiment(warehouse, sum_stats=DEFAULT_SUMMARY_STATS):
    """extract the experiment names and arguments from a large trial run.
    
    Currently this only looks at scalar measures in nested dicts.
    """
    
    trials = list(warehouse.find(
        None,
        cache=False))
    num_trials = len(trials)
    deseeded_trial_ids = defaultdict(list)
    
    #ignoring seed values, aggregate over all trials
    for i in xrange(num_trials):
        print "examining box", i
        trial_key_dict = trials[i]._metadata
        trial_key_dict.pop('seed')
        trial_key_flat = digest(trial_key_dict)
        deseeded_trial_ids[trial_key_flat].append(i)
    
    distinct_params = len(deseeded_trial_ids)
    
    #now for each set of repeated trials, collect the set together
    deseeded_trial_list = []
    
    for k in deseeded_trial_ids.keys():
        print "examining parameter key", k
        trial_ids = deseeded_trial_ids[k]
        these_trials = [trials[j] for j in trial_ids]
        trial_args = these_trials[0]._metadata
        trial_basenames = [b._basename for b in these_trials]
        print "examining trials %s" % str(trial_basenames)
        trial_statses = [b.stats for b in these_trials]
        deseeded_trial_list.append(
            (trial_args, trial_basenames, trial_statses))
        del(deseeded_trial_ids[k])
        del(these_trials)    
        del(trial_basenames)
        del(trial_statses)
    
    # now, walk that list again and evert the list of nested dicts of floats
    # into nested dicts of lists of floats
    everted_trial_basenames = []
    everted_trial_args = []
    everted_trial_stats = []
    
    for i in xrange(len(deseeded_trial_list)):
        print "examining bundled stats group", i
        trial_args, trial_basenames, trial_statses = deseeded_trial_list.pop()
        everted_trial_basenames.append(trial_basenames)
        everted_trial_args.append(trial_args)
        everted_trial_stats.append(addict(trial_statses, destructive=True))
    
    del(deseeded_trial_list)
    
    for i in xrange(len(everted_trial_stats)):
        print "collating bundled stats group", i
        everted_trial_stats[i] = seek_and_study(
            everted_trial_stats[i], sum_stats=sum_stats)
        
    stats_arrays = addict(everted_trial_stats)
    del(everted_trial_stats)
    
    return everted_trial_args, stats_arrays, everted_trial_basenames
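
# The three return values are aligned by parameter group: everted_trial_args[i]
# is the (seed-stripped) parameter dict for group i, everted_trial_basenames[i]
# lists the boxes that fed it, and stats_arrays is the nested dict of summary
# stats collected across all groups.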

def seek_and_study(d, sum_stats=DEFAULT_SUMMARY_STATS, *args, **kwargs):
    """walk a nested dict and turn any list or array in it into
    correctly-nested lists, or at least some some scalar summaries.
    Destructive op for memory saving reasons; this is biggish data.
    
    sum_stats can be a set including 'mean', 'std_dev', 'multiplicity',
    'raw' and 'quartiles'.
    
    This is basically a reduce phase, but I didn't start out aware i was
    writing a MapReduce, so it's clunky."""
    global mquantiles
    
    if 'quartiles' in sum_stats and mquantiles is None:
        from scipy.stats.mstats import mquantiles
    
    for k, v in d.iteritems():
        if isinstance(v, dict):
            d[k] = seek_and_study(v, 
                sum_stats=sum_stats, *args, **kwargs)
        elif isinstance(v, (list, np.ndarray)):
            summary = {}
            # if any([isinstance(i, dict) for i in v]):
                # print "eeeeek"
            try:
                v = np.asarray(v)
                if 'mean' in sum_stats:
                    summary['mean'] = v.mean()
                if 'std_dev' in sum_stats:
                    summary['std_dev'] = v.var(ddof=1)**0.5
                if 'multiplicity' in sum_stats:
                    summary['multiplicity'] = v.size
                if 'quartiles' in sum_stats:
                    quantiles = mquantiles(v,
                        [0, 0.25, 0.5, 0.75, 1])
                    summary['quartile.0'] = quantiles[0]
                    summary['quartile.25'] = quantiles[1]
                    summary['quartile.50'] = quantiles[2]
                    summary['quartile.75'] = quantiles[3]
                    summary['quartile.100'] = quantiles[4]
                if 'raw' in sum_stats:
                    summary['raw'] = v
                    
                #sentinel to ease avoiding this next time
                summary['____stats'] = None 
                d[k] = summary
            except TypeError:
                #Crap. Having trouble getting this one right.
                import pdb; pdb.set_trace()
                
        else: #string? object?
            pass
    return d
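
# Sketch of the transformation (illustrative key and values): an entry like
#   {'prices': [1.0, 2.0, 3.0]}
# becomes, with sum_stats including 'mean' and 'multiplicity',
#   {'prices': {'mean': 2.0, 'multiplicity': 3, '____stats': None}}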

def file_for_stat(exp_name, stat_name, headers):
    stats_files = file_for_stat.stats_files
    if stat_name in stats_files:
        return stats_files[stat_name]
    handle = open(
        str(path(EXPERIMENT_OUTPUT_PATH)/'text/%s.%s.csv') % (exp_name, stat_name),
        'w')
    stats_files[stat_name] = handle
    handle.write(",".join(headers)+"\n")
    return handle

file_for_stat.stats_files = {}

def _close_files_for_stat():
    for k in file_for_stat.stats_files.keys():
        file_for_stat.stats_files[k].close()
        del(file_for_stat.stats_files[k])
        
file_for_stat.close = _close_files_for_stat

del(_close_files_for_stat)