Joe Kington committed 6e3ffad

Initial Commit

Files changed (5)

+import numpy as np
+
+PAW_CODE_LUT = {0:'LF', 1:'RH', 2:'RF', 3:'LH'}
+INV_PAW_CODE_LUT = {'LF':0, 'RH':1, 'RF':2, 'LH':3}
+STD_NUMX, STD_NUMY = 20, 20
+
+def find_paws(data):
+    """Detects and isolates contiguous regions in the input array"""
+    from scipy.ndimage import uniform_filter, label, find_objects
+    from scipy.ndimage.morphology import binary_fill_holes
+
+    data = uniform_filter(data, 4)
+    # Threshold the blurred data (this needs to be a bit > 0 due to the blur)
+    thresh = data > (0.05 * data.max())
+    # Fill any interior holes in the paws to get cleaner regions...
+    filled = binary_fill_holes(thresh)
+    # Label each contiguous paw
+    coded_paws, num_paws = label(filled)
+    # Isolate the extent of each paw
+    data_slices = find_objects(coded_paws)
+    return data_slices
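A minimal usage sketch (editorial, not part of the commit): the input is a 3D array of pressure readings with shape (nx, ny, ntimes), as the loader later in this changeset produces, and each returned element is a tuple of three slice objects giving the spatial extent and duration of one impact.

    import numpy as np
    pressure = np.zeros((64, 64, 40), dtype=np.float32)
    # One synthetic "impact": a noisy block of pressure readings
    pressure[10:20, 30:42, 5:25] = np.random.random((10, 12, 20))
    for xslice, yslice, tslice in find_paws(pressure):
        print xslice, yslice, tslice    # extent in x, y, and time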
+
+def standardize_paw(paw):
+    """Standardizes a pawprint onto a STD_NUMYxSTD_NUMX grid. Returns a 1D, 
+    flattened version of the paw data resample onto this grid."""
+    from scipy.ndimage import map_coordinates
+    paw = paw.sum(axis=2)
+    ny, nx = paw.shape
+
+    # Trim off any "blank" edges around the paw...
+    mask = paw > 0.01 * paw.max()
+    y, x = np.mgrid[:ny, :nx]
+    ymin, ymax = y[mask].min(), y[mask].max() 
+    xmin, xmax = x[mask].min(), x[mask].max() 
+
+    # Make a 20x20 grid to resample the paw pressure values onto
+    xi = np.linspace(xmin, xmax, STD_NUMX)
+    yi = np.linspace(ymin, ymax, STD_NUMY)
+    xi, yi = np.meshgrid(xi, yi)
+
+    # Resample the values onto the 20x20 grid
+    coords = np.vstack([yi.flatten(), xi.flatten()])
+    zi = map_coordinates(paw, coords)
+
+    # Rescale the pressure values
+    zi -= zi.min()
+    zi /= zi.max()
+    zi -= zi.mean() #<- Helps distinguish front from hind paws...
+    return zi.flatten()
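How the two functions above chain together (a sketch, reusing the hypothetical `pressure` array from the previous example):

    slices = find_paws(pressure)
    first_paw = pressure[slices[0]]            # still 3D: (x, y, time)
    vector = standardize_paw(first_paw)        # summed over time, resampled
    assert vector.size == STD_NUMX * STD_NUMY  # 400 values per pawprint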
+
+def pattern_classification(data_slices):   
+    """Attempts to classify a sequence of paw impacts based on their spatial
+    and temporal pattern alone. Input "data_slices" must be sorted by initial
+    contact time!"""
+    #TODO: Add check to see if the paw before the last impact
+    #      landed off the sensor...
+
+    # Get the centroid for each paw impact...
+    paw_coords = []
+    for x,y,z in data_slices:
+        paw_coords.append([(item.stop + item.start) / 2.0 for item in (x,y)])
+    paw_coords = np.array(paw_coords)
+
+    # Make a vector between each successive impact...
+    dx, dy = np.diff(paw_coords, axis=0).T
+
+    #-- Group paws -------------------------------------------
+    paw_number = np.arange(len(paw_coords))
+
+    # Did we miss the hind paw impact after the first 
+    # front paw impact? If so, first dx will be positive...
+    if dx[0] > 0: 
+        paw_number[1:] += 1
+
+    # Are we starting with the left or right front paw...
+    # We assume we're starting with the left, and check dy[0].
+    # If dy[0] > 0 (i.e. the next paw impacts to the left), then
+    # it's actually the right front paw, instead of the left.
+    if dy[0] > 0: # Right front paw impact...
+        paw_number += 2
+            
+    paw_codes = paw_number % 4
+    paw_labels = [PAW_CODE_LUT[code] for code in paw_codes]
+
+    problems = paw_pattern_problems(paw_labels, dx, dy)
+    return paw_labels, problems
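The numbering trick above encodes the assumed walking gait: paw_number % 4 cycles through LF (0), RH (1), RF (2), LH (3), and the two shifts merely pick the phase of that cycle. A worked sketch (editorial, with hypothetical centroids for a dog walking in the +x direction, starting on the left front paw):

    def fake_slice(cx, cy, t):
        """Build an (xslice, yslice, timeslice) tuple around a centroid."""
        return (slice(cx - 2, cx + 2), slice(cy - 2, cy + 2), slice(t, t + 10))

    data_slices = [fake_slice(20, 10, 0),   # LF
                   fake_slice(14, 6, 4),    # RH: behind (-dx) and right (-dy)
                   fake_slice(24, 6, 8),    # RF: ahead of the hind paw (+dx)
                   fake_slice(18, 10, 12)]  # LH
    labels, problems = pattern_classification(data_slices)
    # labels == ['LF', 'RH', 'RF', 'LH'], problems == False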
+
+def paw_pattern_problems(paw_labels, dx, dy):
+    """Check whether or not the label sequence "paw_labels" conforms to our
+    expected spatial pattern of paw impacts."""
+    # Check for problems... (This could be written a _lot_ more cleanly...)
+    problems = False
+    last = paw_labels[0]
+    for paw, delta_y, delta_x in zip(paw_labels[1:], dy, dx):
+        # Going from a left paw to a right, dy should be negative
+        if last.startswith('L') and paw.startswith('R') and (delta_y > 0):
+            problems = True
+            break
+        # Going from a right paw to a left, dy should be positive
+        if last.startswith('R') and paw.startswith('L') and (delta_y < 0):
+            problems = True
+            break
+        # Going from a front paw to a hind paw, dx should be negative
+        if last.endswith('F') and paw.endswith('H') and (delta_x > 0):
+            problems = True
+            break
+        # Going from a hind paw to a front paw, dx should be positive
+        if last.endswith('H') and paw.endswith('F') and (delta_x < 0):
+            problems = True
+            break
+        last = paw
+    return problems
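Conversely, a sequence that breaks the expected spatial pattern is flagged (sketch, reusing fake_slice from above):

    bad = [fake_slice(20, 10, 0),   # LF
           fake_slice(14, 6, 4),    # RH
           fake_slice(10, 6, 8)]    # labeled RF, but it lands *behind* the RH
    labels, problems = pattern_classification(bad)
    # problems == True: a hind-to-front step moved backwards (dx < 0)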
+
+class ClassificationSystem(object):
+    """Holds the data and methods necessary to project a paw using an
+    eigenimage-based method. The basis vectors, etc. are stored on disk
+    as numpy saved arrays (.npy)."""
+    def __init__(self):
+        try:
+            # Try to load the appropriate data files
+            self.load_data()
+        except IOError:
+            # If the data files aren't there, just issue a warning...
+            # (presumably this is a first run, and they need to be built)
+            import warnings
+            warnings.warn('Classification data files not found!',
+                          RuntimeWarning)
+
+    def __call__(self, paw):
+        """Calls self.classify"""
+        return self.classify(paw)
+
+    def load_data(self):
+        # This is a separate method so that the data may be reloaded on the
+        # fly if it's changed...
+        import os
+        basedir = os.path.dirname(__file__)
+        load = lambda filename: np.load(os.path.join(basedir, filename))
+        self.template_paws = load('template_paws.npy')
+        self.average_paw = load('average_paw.npy')
+        self.basis_stds = load('basis_stds.npy')
+        self.basis_vecs = load('basis_vecs.npy')
+
+    def classify(self, paw):
+        """Classifies a (standardized) pawprint based on how close its eigenpaw
+        score is to known eigenpaw scores of the paws for each leg. Returns a code
+        of "LF", "LH", "RF", or "RH" for the left front, left hind, etc paws."""
+        # Subtract the average paw (known a-priori from the training dataset).
+        # Operate on a copy so the caller's (possibly cached) array isn't
+        # modified in place.
+        paw = paw - self.average_paw
+        # Project the paw into eigenpaw-space
+        scores = paw.dot(self.basis_vecs) 
+        # "whiten" the score so that all dimensions are equally important
+        scores /= self.basis_stds
+        # Select which template paw is the closest to the given paw...
+        diff = self.template_paws - scores
+        diff *= diff
+        diff = np.sqrt(diff.sum(axis=1))
+        # Return the label (e.g. "LF") of the closest template paw.
+        return PAW_CODE_LUT[diff.argmin()]
+
+# Create an instance of ClassificationSystem, so that you can just call
+# analysis.classify(paw)
+classify = ClassificationSystem()
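With the saved .npy files in place, the module reads as a function call (sketch; `pawprint` stands for a hypothetical 3D paw slice, and note that classify expects a *standardized* pawprint):

    import analysis
    vector = analysis.standardize_paw(pawprint)
    label = analysis.classify(vector)    # -> 'LF', 'RH', 'RF', or 'LH'

The next file is the training script that builds those .npy files from measurements the trapezoid-pattern algorithm is confident about.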
+import numpy as np
+
+import data
+import analysis
+
+def main():
+    # Read through all data and select measurements where the trapezoid
+    # pattern algorithm thinks it correctly classified the paw impacts,
+    # then use these data to build up a set of "eigenpaws" (basis_vecs)
+    paw_data, average_paw, codes, basis_vecs = make_training_dataset()
+
+    # Project the standardized paw data into the space specified by the
+    # basis_vecs (i.e. project them into eigenpaw-space)
+    paw_scores = (paw_data - average_paw).dot(basis_vecs)
+
+    # In order for the distance classification to be equally sensitive to
+    # all dimensions of the score vector, we need to rescale things by the
+    # standard deviation in each dimension. 
+    basis_stds = paw_scores.std(axis=0)
+    paw_scores /= basis_stds
+
+    # Group the paw scores by code and determine what the average score vector
+    # looks like for each leg (i.e. left front, right hind, etc)
+    template_paws = []
+    for i in range(4):
+        leg_data = paw_scores[codes == i]
+        template_paws.append(leg_data.mean(axis=0))
+    template_paws = np.vstack(template_paws)
+
+    #-- Save the key data that we need to do this same classification
+    # on any given paw....
+    np.save('average_paw.npy', average_paw)
+    np.save('basis_stds.npy', basis_stds)
+    np.save('basis_vecs.npy', basis_vecs)
+    np.save('template_paws.npy', template_paws)
+
+    #- Optionally, we can save the raw training data for later analysis...
+    np.save('training_paw_data.npy', paw_data)
+    np.save('training_paw_codes.npy', codes)
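A toy illustration (editorial, with made-up numbers) of why the whitening step above matters: without it, whichever score dimension has the largest natural spread would dominate the Euclidean distance used in classify().

    import numpy as np
    scores = np.array([[120., 1.2], [-80., -0.8], [-40., -0.4]])
    stds = scores.std(axis=0)    # ~[86.4, 0.86]: dimension 0 dominates
    whitened = scores / stds     # both dimensions now contribute equally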
+
+def make_training_dataset():
+    paw_data, codes = [], []
+    for run in data.measurements:
+        print 'Processing: ', run.dog.name, run.name
+        result = process_measurement_for_training(run)
+        if result is not None:
+            print '    Using this measurement in the training dataset'
+            paw_data.extend(result[0])
+            codes.extend(result[1])
+    paw_data = np.vstack(paw_data)
+    codes = np.array(codes)
+    num_basis_vecs = 50
+    average_paw = paw_data.mean(axis=0)
+    basis_vecs = make_eigenpaws(paw_data - average_paw, num_basis_vecs)
+    return paw_data, average_paw, codes, basis_vecs
+
+def process_measurement_for_training(run):
+    data_slices = [impact.data_slice for impact in run.impacts]
+    labels, problems = analysis.pattern_classification(data_slices)
+    paws, codes = [], []
+    if not problems:
+        # Always discard the last impact, as we can't ID it reliably
+        for impact, label in zip(run.impacts[:-1], labels[:-1]):
+            # Discard impacts that touch the edge of the sensor pad
+            if not touches_edges(impact, run.data): 
+                paws.append(impact.standardized_pawprint)
+                codes.append(analysis.INV_PAW_CODE_LUT[label])
+        return paws, codes
+    else:
+        return None
+
+def touches_edges(impact, data):
+    nx, ny, nt = data.shape
+    # The largest valid index is nx - 1 (resp. ny - 1), so compare against that
+    xtouch = (impact.x.min() == 0) or (impact.x.max() == nx - 1)
+    ytouch = (impact.y.min() == 0) or (impact.y.max() == ny - 1)
+    return xtouch or ytouch
+
+def make_eigenpaws(paw_data, num_basis_vecs=50):
+    """Creates a set of eigenpaws based on paw_data.
+    paw_data is a numdata by numdimensions matrix of all of the observations.
+    The mean (average_paw) must have been removed from paw_data prior to 
+    calling this function."""
+    # Determine the eigenvectors of the covariance matrix of the data.
+    # (eigh is used because the covariance matrix is symmetric; eig can
+    # return spuriously complex results due to floating-point error.)
+    cov = np.cov(paw_data.T)
+    eigvals, eigvecs = np.linalg.eigh(cov)
+
+    # Sort the eigenvectors by ascending eigenvalue (largest is last)
+    eig_idx = np.argsort(eigvals)
+    sorted_eigvecs = eigvecs[:, eig_idx]
+
+    # Now choose a cutoff number of eigenvectors to use 
+    basis_vecs = sorted_eigvecs[:,-num_basis_vecs:]
+
+    return basis_vecs
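An equivalent basis can be obtained without forming the large covariance matrix, via the SVD of the mean-removed data (an editorial sketch, not the commit's method). The rows of vt are the covariance eigenvectors up to sign, ordered by *descending* eigenvalue, so the column order differs from make_eigenpaws but the spanned subspace is the same.

    def make_eigenpaws_svd(paw_data, num_basis_vecs=50):
        """Sketch: PCA basis via SVD of the mean-removed data matrix."""
        _, _, vt = np.linalg.svd(paw_data, full_matrices=False)
        return vt[:num_basis_vecs].T    # shape: (num_dims, num_basis_vecs)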
+
+if __name__ == '__main__':
+    main()
Binary file added.
+import os
+import numpy as np
+import h5py
+
+import analysis
+
+basedir = os.path.dirname(__file__)
+data = h5py.File(os.path.join(basedir, 'data.hdf5'), 'r')
+
+class Once(object):
+    """
+    A decorator class that ensures that properties are only evaluated once.
+    From: <http://code.activestate.com/recipes/363602-lazy-property-evaluation/>
+    """
+    def __init__(self, calculate_function):
+        self._calculate = calculate_function
+
+    def __get__(self, obj, _=None):
+        if obj is None:
+            return self
+        value = self._calculate(obj)
+        setattr(obj, self._calculate.func_name, value)
+        return value
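How the descriptor behaves (sketch): the first access runs the wrapped function and stores the result as a plain instance attribute of the same name. Since Once defines no __set__, that instance attribute then shadows the descriptor, so later accesses skip the computation entirely.

    class Example(object):
        @Once
        def expensive(self):
            print 'computing...'    # runs only on the first access
            return 42

    e = Example()
    first = e.expensive     # prints 'computing...', returns 42
    second = e.expensive    # cached: returns 42 with no recomputation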
+
+class Dog(object):
+    def __init__(self, group):
+        self.group = group
+        self.name = group.attrs['name']
+
+    @property
+    def measurements(self):
+        for measurement in self.group.values():
+            yield Measurement(measurement)
+
+class Measurement(object):
+    def __init__(self, group):
+        # Wrap the parent group so that run.dog.name gives the dog's name
+        self.dog = Dog(group.parent)
+        self.group = group
+        self.name = group.attrs['name']
+        self.date = group.attrs['date']
+
+    @Once
+    def data(self):
+        return self.group['pressure'][:]
+
+    @Once
+    def time(self):
+        return self.group['time'][:]
+
+    @Once
+    def impacts(self):
+        data, time = self.data, self.time
+        # Find paw impacts in data
+        data_slices = analysis.find_paws(data)
+        # Sort slices by initial time of impact
+        data_slices.sort(key=lambda s: s[-1].start)
+        return [Impact(datslice, data, time) for datslice in data_slices]
+
+class Impact(object):
+    def __init__(self, slices, data, time):
+        self.data_slice = slices
+        self.xslice, self.yslice, self.timeslice = self.data_slice
+        self.pawprint = data[slices]
+        self.time = time[self.timeslice]
+        self.x = np.ogrid[self.xslice]
+        self.y = np.ogrid[self.yslice]
+        self.duration = self.time.ptp()
+
+    @Once
+    def standardized_pawprint(self):
+        return analysis.standardize_paw(self.pawprint)
+
+    @Once
+    def paw(self):
+        return analysis.classify(self.standardized_pawprint)
+
+dogs = (Dog(item) for item in data.values())
+measurements = (run for dog in dogs for run in dog.measurements)
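Putting the layer together (sketch; assumes the classification .npy files exist, since impact.paw calls analysis.classify): the module-level generators walk every dog, measurement, and impact lazily, with the expensive work deferred until the Once-cached properties are first touched. Note they are one-shot generators, so rebuild them to iterate a second time.

    import data
    for run in data.measurements:
        for impact in run.impacts:
            print run.dog.name, impact.paw, impact.duration

The last file below builds the HDF5 database from the raw ASCII exports.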
+import os
+import numpy as np
+import h5py
+
+DATADIR = os.path.join(
+            os.path.dirname(__file__),
+            'data')
+def main():
+    h5file = h5py.File('junk.hdf5', 'w')
+
+    for dirname in os.listdir(DATADIR):
+        dogname = dirname.replace('_', ' ')
+        print dogname
+
+        dog_group = h5file.create_group(dogname)
+        dog_group.attrs['name'] = dogname
+        print dog_group.attrs['name']
+
+        for filename in os.listdir(os.path.join(DATADIR, dirname)):
+            datafile = Datafile(os.path.join(DATADIR, dirname, filename))
+            meas_name, datestring, _ = filename.split(' - ')
+            print filename
+
+            time, data = datafile.load()
+            measurement_group = dog_group.create_group(meas_name)
+            measurement_group.attrs['name'] = meas_name
+            measurement_group.attrs['date'] = datestring
+            measurement_group.create_dataset('pressure', data=data, compression='gzip')
+            measurement_group.create_dataset('time', data=time)
+
+    h5file.close()
+
+class Datafile(object):
+    """
+    Reads in the results of a single measurement.
+    Expects an ascii file of timesteps formatted similar to this:
+
+    Frame 0 (0.00 ms)
+    0.0 0.0 0.0
+    0.0 0.0 0.0
+
+    Frame 1 (0.53 ms)
+    0.0 0.0 0.0
+    0.0 0.0 0.0
+    ...
+    """
+    def __init__(self, filename):
+        self.filename = filename
+
+    def __iter__(self):
+        """Iterates over timesteps. Yields a time and a pressure array."""
+        def read_frame(infile):
+            """Reads a frame from the infile."""
+            frame_header = infile.next().strip().split()
+            time = float(frame_header[-2][1:])
+            data = []
+            while True:
+                line = infile.next().strip().split()
+                if line == []:
+                    break
+                data.append(line)
+            return time, np.array(data, dtype=np.float32)
+
+        with open(self.filename) as infile:
+            while True:
+                yield read_frame(infile)
+
+    def load(self):
+        """Reads all data in the datafile. Returns an array of times for each
+        slice, and a 3D array of pressure data with shape (nx, ny, ntimes)."""
+        times, dataslices = [], []
+        for time, data in self:
+            times.append(time)
+            dataslices.append(data)
+        return np.array(times, dtype=np.float32), np.dstack(dataslices)
+
+
+if __name__ == '__main__':
+    main()
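A usage sketch for the Datafile reader (the filename is hypothetical, following the "name - date - rest" convention main() expects):

    df = Datafile('Trial - 01-01-2010 - 1.txt')    # hypothetical filename
    times, pressure = df.load()
    # times:    (ntimes,) float32 array of frame timestamps in ms
    # pressure: (nx, ny, ntimes) float32 array of sensor readings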