# PyRNN / pyrnn / elmannet.py

# -*- coding: utf-8 -*-
import numpy

class NetVars(object):
    """
    Variables for network structure
    """

    def make_grad_for(self, varnames):
        """
        make same shape `numpy.array` of variable in `varnames`
        named as `dEdXxx` if `xxx` is in `varnames`.
        """
        for vn in varnames:
            dEdVn = 'dEd' + vn[0].upper() + vn[1:]
            self.__dict__[dEdVn] = numpy.zeros_like(self.__getattribute__(vn))

    def make_update_for(self, varnames):
        """
        make same shape `numpy.array` of variable in `varnames`
        named as `dXxx` if `xxx` is in `varnames`.
        """
        for vn in varnames:
            dVn = 'd' + vn[0].upper() + vn[1:]
            self.__dict__[dVn] = numpy.zeros_like(self.__getattribute__(vn))

    def get_vars(self, varnames):
        """
        return list of variables
        """
        return [self.__getattribute__(vn) for vn in varnames]

class ElmanNet(object):
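    """
    Elman-type recurrent network with leaky-integrator context units
    (per-unit time constants `ec`).
    """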

    def __init__(self, num_o, num_c, num_i, steps):
        """
        num_o: number of output neurons
        num_i: number of input neurons
        num_c: number of context neurons
        steps: time steps
        """
        self.ns = NetVars()

        self.ns.time_step = numpy.arange(steps, dtype=int)
        self.ns.Eto = numpy.zeros((steps, num_o,), dtype=float)
        
        self.ns.to = numpy.zeros((steps, num_o,), dtype=float)
        self.ns.xo = numpy.zeros((steps, num_o,), dtype=float)
        self.ns.uo = numpy.zeros((steps, num_o,), dtype=float)
        self.ns.xc = numpy.zeros((steps, num_c,), dtype=float)
        self.ns.uc = numpy.zeros((steps, num_c,), dtype=float)
        self.ns.ti = numpy.zeros((steps, num_i,), dtype=float)
        self.ns.xi = numpy.zeros((steps, num_i,), dtype=float)

        self.ns.woc = numpy.zeros((num_o, num_c,), dtype=float)
        self.ns.wcc = numpy.zeros((num_c, num_c,), dtype=float)
        self.ns.wci = numpy.zeros((num_c, num_i,), dtype=float)

        self.ns.bo = numpy.zeros((num_o,), dtype=float)
        self.ns.bc = numpy.zeros((num_c,), dtype=float)
        self.ns.ec = numpy.ones((num_c,), dtype=float)

        self.ns.dXodUo = numpy.zeros((steps, num_o,), dtype=float)
        self.ns.dXcdUc = numpy.zeros((steps, num_c,), dtype=float)
        
        self.ns.make_grad_for(['uo','uc','woc','wcc','wci','bo','bc'])
        self.ns.make_update_for(['woc','wcc','wci','bo','bc'])
        self.ns.dUc0 = numpy.zeros((num_c,), dtype=float)

        self.fb_rate  = 1
        self.fb_delay = 1
        
        self.set_activation()
        self.momentum = 0
        self.learn_rate = 0.1

    def set_activation(self):
        self.avo = numpy.tanh
        self.avc = numpy.tanh

    def fptt(self):
        xo = self.ns.xo
        uo = self.ns.uo
        xc = self.ns.xc
        uc = self.ns.uc
        xi = self.ns.xi

        woc = self.ns.woc
        wcc = self.ns.wcc
        wci = self.ns.wci

        bo = self.ns.bo
        bc = self.ns.bc
        ec = self.ns.ec

        avo = self.avo
        avc = self.avc
        dot = numpy.dot

        xc[0] = avc(uc[0])
        for t in self.ns.time_step[1:]:
            uc[t] = (1-ec)*uc[t-1] + ec*(dot(wci,xi[t])+dot(wcc,xc[t-1])+bc)
            xc[t] = avc(uc[t])
            uo[t] = dot(woc,xc[t])+bo
            xo[t] = avo(uo[t])

    def fptt_with_fb(self):
        to = self.ns.to
        xo = self.ns.xo
        uo = self.ns.uo
        xc = self.ns.xc
        uc = self.ns.uc
        xi = self.ns.xi

        woc = self.ns.woc
        wcc = self.ns.wcc
        wci = self.ns.wci

        bo = self.ns.bo
        bc = self.ns.bc
        ec = self.ns.ec

        avo = self.avo
        avc = self.avc
        dot = numpy.dot

        fbr = self.fb_rate
        fbd = self.fb_delay

        xc[fbd-1] = avc(uc[fbd-1])
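        # Same forward pass as fptt(), but the input at step t is a mixture of
        # the network's own delayed output and the delayed teacher signal:
        #   xi[t] = fbr * xo[t-fbd] + (1 - fbr) * to[t-fbd]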
        for t in self.ns.time_step[fbd:]:
            xi[t] = fbr * xo[t-fbd] + (1 - fbr) * to[t-fbd]
            uc[t] = (1-ec)*uc[t-1] + ec*(dot(wci,xi[t])+dot(wcc,xc[t-1])+bc)
            xc[t] = avc(uc[t])
            uo[t] = dot(woc,xc[t])+bo
            xo[t] = avo(uo[t])

    def set_dXdU(self):
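        # Derivative of tanh: d tanh(u)/du = 1 - tanh(u)**2, evaluated at the
        # stored activations xo and xc.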
        self.ns.dXodUo[:] = 1 - self.ns.xo**2
        self.ns.dXcdUc[:] = 1 - self.ns.xc**2

    def set_E(self):
        self.set_dXdU()
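        # Squared-error loss E = (xo - to)**2 / 2 per unit and time step,
        # so dE/dUo = (xo - to) * dXo/dUo.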
        delta = self.ns.xo - self.ns.to
        self.ns.dEdUo = delta * self.ns.dXodUo
        self.ns.Eto   = delta**2 / 2.0

    def bptt(self):
        xc = self.ns.xc
        xi = self.ns.xi
        woc = self.ns.woc
        ec = self.ns.ec
        
        dEdUc  = self.ns.dEdUc
        dEdUo  = self.ns.dEdUo
        dEdWoc = self.ns.dEdWoc
        dEdWcc = self.ns.dEdWcc
        dEdWci = self.ns.dEdWci
        dEdBo  = self.ns.dEdBo
        dEdBc  = self.ns.dEdBc
        dXcdUc = self.ns.dXcdUc
        dUcdUc = self.dUcdUc
        
        dot = numpy.dot
        outer = numpy.outer
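        # Backpropagation through time, iterating from the last step down to
        # t = 1: dEdUc[t] collects the output error projected through Woc plus
        # the error propagated back from step t+1 via the Jacobian dUcdUc();
        # weight and bias gradients are accumulated over all steps.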

        for t in self.ns.time_step[:0:-1]:
            dEdUc[t] += dot(dEdUo[t],woc)*dXcdUc[t]
            dEdUc[t-1] += dot(dEdUc[t],dUcdUc(t-1))
            dEdWoc[:] += outer(dEdUo[t]   ,xc[t])
            dEdWcc[:] += outer(dEdUc[t]*ec,xc[t-1])
            dEdWci[:] += outer(dEdUc[t]*ec,xi[t])
            dEdBo[:] += dEdUo[t]
            dEdBc[:] += dEdUc[t]*ec

    def dUcdUc(self,t):
        "return matrix dUc'[t+1]/dUc[t]"
        ec = self.ns.ec
        wcc = self.ns.wcc
        dXcdUc = self.ns.dXcdUc
        dot = numpy.dot
        diag = numpy.diag
        return diag(1-ec) + dot(diag(ec),dot(wcc,diag(dXcdUc[t])))

    def reset_grad(self):
        for var in self.ns.get_vars(['dEdUc','dEdBo','dEdBc',
                                     'dEdWoc','dEdWcc','dEdWci',]):
            var.fill(0)

    def set_input(self):
        self.ns.xi[:] = self.ns.ti

    def set_learn_rate(self, _learn_rate):
        num = self.ns.to.shape[1]*self.ns.to.shape[0]
        self.learn_rate = float(_learn_rate)/num

    def change_net(self):
        mo = self.momentum
        lr = self.learn_rate
        list_V    = self.ns.get_vars(['bo','bc', 'woc','wcc','wci',])
        list_dV   = self.ns.get_vars(['dBo','dBc', 'dWoc','dWcc','dWci',])
        list_dEdV = self.ns.get_vars(['dEdBo','dEdBc','dEdWoc','dEdWcc','dEdWci',])
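        # Gradient descent with momentum: dV <- momentum * dV - learn_rate * dE/dV,
        # then V <- V + dV.  The initial context potential uc[0] is adapted the
        # same way through dUc0.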
        for (dV, dEdV) in zip(list_dV, list_dEdV):
            dV[:] = mo * dV - lr * dEdV
        for (V, dV) in zip(list_V, list_dV):
            V[:] += dV
        self.ns.dUc0[:] = mo * self.ns.dUc0 - lr * self.ns.dEdUc[0]
        self.ns.uc[0] += self.ns.dUc0
        
    def record(self):
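        # Summary statistics for monitoring training: mean loss, mean squared
        # error, and RMS magnitudes of the weights, biases and initial context
        # potential.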
        rms = lambda x: numpy.sqrt((x**2).mean())
        return numpy.array([
            self.ns.Eto[1:].mean(),
            ((self.ns.to[1:]-self.ns.xo[1:])**2).mean(),
            rms(self.ns.woc),
            rms(self.ns.wcc),
            rms(self.ns.wci),
            rms(self.ns.bo),
            rms(self.ns.bc),
            rms(self.ns.uc[0]),
            ])

    def randomize_param(self):
        num_o = self.ns.xo.shape[1]
        num_c = self.ns.xc.shape[1]
        num_i = self.ns.xi.shape[1]
        rnd = numpy.random.random_sample
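        # Weights and biases are drawn uniformly from (-1/num_c, 1/num_c);
        # the initial context potential uc[0] from (-1, 1).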
        self.ns.woc = (rnd((num_o,num_c))-0.5) * 2 /num_c
        self.ns.wcc = (rnd((num_c,num_c))-0.5) * 2 /num_c
        self.ns.wci = (rnd((num_c,num_i))-0.5) * 2 /num_c
        self.ns.bo = (rnd(num_o)-0.5) * 2 /num_c
        self.ns.bc = (rnd(num_c)-0.5) * 2 /num_c
        self.ns.uc[0] = (rnd(num_c)-0.5) * 2

class ElmanNetSigm(ElmanNet):
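    """
    Elman network with sigmoid output units; inherits the squared-error
    loss of `ElmanNet`.
    """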
    def set_activation(self):
        self.avo = lambda x: 1 / (1 + numpy.exp(-x))
        self.avc = numpy.tanh
    
    def set_dXdU(self):
        self.ns.dXodUo[:] = (1 - self.ns.xo)*self.ns.xo
        self.ns.dXcdUc[:] = 1 - self.ns.xc**2

@numpy.vectorize
def error_func_bern(t,x):
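    # Elementwise loss for a Bernoulli target t and sigmoid output x: the KL
    # divergence KL(Bernoulli(t) || Bernoulli(x)); the boundary cases t <= 0
    # and t >= 1 reduce to plain cross-entropy terms.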
    if t <= 0:
        return - numpy.log(1-x)
    elif t >= 1:
        return - numpy.log(x)
    else:
        return ( t * (numpy.log(t) - numpy.log(x)) +
                 (1-t) * (numpy.log(1-t) - numpy.log(1-x)) )

class ElmanNetBern(ElmanNet):
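    """
    Elman network with sigmoid output units trained with the Bernoulli
    cross-entropy loss `error_func_bern`.
    """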
    def set_activation(self):
        self.avo = lambda x: 1 / (1 + numpy.exp(-x))
        self.avc = numpy.tanh
    
    def set_dXdU(self):
        self.ns.dXodUo[:] = (1 - self.ns.xo)*self.ns.xo
        self.ns.dXcdUc[:] = 1 - self.ns.xc**2

    def set_E(self):
        self.set_dXdU()
        self.ns.dEdUo = self.ns.xo - self.ns.to
        self.ns.Eto = error_func_bern(self.ns.to, self.ns.xo)

def softmax(x):
    # Subtract the maximum before exponentiating to avoid overflow.
    e = numpy.exp(x - numpy.max(x))
    return e / numpy.sum(e)

@numpy.vectorize
def error_func_diri(t,x):
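    # Per-element contribution to the KL divergence KL(t || x) of the target
    # distribution t from the softmax output x; zero where t is zero.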
    if t <= 0:
        return 0.0
    return t*(numpy.log(t) - numpy.log(x))

class ElmanNetDiri(ElmanNet):
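    """
    Elman network with a softmax output layer trained with the KL-divergence
    loss `error_func_diri`.
    """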
    def set_activation(self):
        self.avo = softmax
        self.avc = numpy.tanh
    
    def set_dXdU(self):
        self.ns.dXcdUc[:] = 1 - self.ns.xc**2

    def set_E(self):
        self.set_dXdU()
        self.ns.dEdUo = self.ns.xo - self.ns.to
        self.ns.Eto = error_func_diri(self.ns.to, self.ns.xo)
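

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only: the task, network sizes and
# hyperparameters below are assumptions, not part of the original module).
# It trains an ElmanNet to predict the next sample of a sine/cosine sequence.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    steps = 20
    net = ElmanNet(num_o=2, num_c=5, num_i=2, steps=steps)
    net.randomize_param()
    net.set_learn_rate(0.5)
    net.momentum = 0.9

    # Teacher input ti and teacher output to: predict the next time step.
    phase = numpy.linspace(0, 2 * numpy.pi, steps)
    net.ns.ti[:] = numpy.column_stack([numpy.sin(phase), numpy.cos(phase)])
    net.ns.to[:] = numpy.roll(net.ns.ti, -1, axis=0)

    for epoch in range(200):
        net.set_input()   # copy teacher input into the input units
        net.fptt()        # forward pass through time
        net.set_E()       # loss and output-layer error
        net.reset_grad()  # clear accumulated gradients
        net.bptt()        # backpropagation through time
        net.change_net()  # momentum update of weights, biases and uc[0]

    print(net.record())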