Source

draft-ann / bp.py

Full commit
#!/usr/bin/env python
#-*- coding:utf-8 -*-

from collections import namedtuple

import numpy as np
import matplotlib.pyplot as plt


NeuronNumber = namedtuple("NeuronNumber", ["input", "hidden", "output"])
LearningRate = namedtuple("LearningRate", ["alpha", "beta", "gamma"])


def sigmoid(x):
    """Return the logistic function ``1 / (1 + exp(-x))`` of ``x``.

    Works element-wise on NumPy arrays as well as on scalars.
    """
    return 1 / (1 + np.exp(-x))


class BackPropagationNetwork(object):
    """Three-layer feed-forward network trained by back propagation.

    Weights and thresholds are drawn uniformly from [-1, 1).  Every
    parameter remembers its previous update so that `train` can add a
    momentum term scaled by ``learning_rate.gamma``.
    """

    def __init__(self, num, learning_rate):
        """Initialize a neuron network.

        :param num: named tuple `NeuronNumber`, the numbers of neuron nodes
                    in the input, hidden and output layers.
        :param learning_rate: named tuple `LearningRate`; `alpha` scales the
                              hidden-to-output (and output threshold)
                              updates, `beta` the input-to-hidden (and
                              hidden threshold) updates, and `gamma` the
                              momentum term, i.e. the share of the previous
                              update that is re-applied.
        """
        self.num = num
        self.learning_rate = learning_rate

        # weight matrices, uniformly random in [-1, 1)
        self.input_to_hidden = np.random.rand(num.input, num.hidden) * 2 - 1
        self.hidden_to_output = np.random.rand(num.hidden, num.output) * 2 - 1

        # thresholds (row vectors added to each layer's weighted input)
        self.hidden_thresholds = np.random.rand(1, num.hidden) * 2 - 1
        self.output_thresholds = np.random.rand(1, num.output) * 2 - 1

        # previous update of every parameter, kept for the momentum term
        self.input_to_hidden_error = np.zeros(self.input_to_hidden.shape)
        self.hidden_to_output_error = np.zeros(self.hidden_to_output.shape)
        self.hidden_thresholds_error = np.zeros(self.hidden_thresholds.shape)
        self.output_thresholds_error = np.zeros(self.output_thresholds.shape)

    def train(self, data, desired_result):
        """Run one pass over the samples, updating the network per sample.

        This is a generator: validation and training only happen while it
        is being iterated.  The caller's `data` array is not modified.

        :param data: sample array of shape (records, num.input).  Each
                     column is rescaled to [-1, 1] using that column's own
                     minimum and maximum; a constant column maps to -1.
        :param desired_result: target array of shape
                               (records, num.output), one row per sample.
        :returns: generator yielding, after every sample, the error
                  accumulated so far in this pass divided by the total
                  number of samples.
        :raises AssertionError: if the array shapes do not match the
                                network or each other.
        """
        # validate input np.array
        num_of_records, num_of_input_nodes = data.shape
        num_of_result_records, num_of_output_nodes = desired_result.shape
        assert num_of_input_nodes == self.num.input, \
            "data must have one column per input node"
        assert num_of_output_nodes == self.num.output, \
            "desired_result must have one column per output node"
        assert num_of_records == num_of_result_records, \
            "data and desired_result must have the same number of rows"

        # normalize every column to the interval [-1, 1]
        min_values = data.min(axis=0)
        value_span = data.max(axis=0) - min_values
        # A constant column has a zero span; dividing by it would turn the
        # column (and then every weight) into NaN, so divide by 1 instead.
        value_span = np.where(value_span == 0, 1, value_span)
        data = ((data - min_values) / value_span - 0.5) * 2

        # iterate training network
        alpha, beta, gamma = self.learning_rate
        evaluate_error = 0
        for item, desired_result_item in zip(data, desired_result):
            # forward propagation on a (1, num.input) row vector
            item = item.reshape(1, -1)
            hidden_activation = sigmoid(item.dot(self.input_to_hidden) +
                                        self.hidden_thresholds)
            output_activation = sigmoid(hidden_activation.dot(
                                        self.hidden_to_output) +
                                        self.output_thresholds)

            # Back propagation.  Both deltas are computed before any
            # weight changes so the hidden delta uses the old weights.
            output_error = (output_activation * (1 - output_activation) *
                            (desired_result_item - output_activation))
            hidden_error = (hidden_activation * (1 - hidden_activation) *
                            output_error.dot(self.hidden_to_output.T))

            # adjust weights from hidden layer to output layer
            self.hidden_to_output_error = (
                alpha * hidden_activation.T.dot(output_error) +
                gamma * self.hidden_to_output_error)
            self.hidden_to_output += self.hidden_to_output_error

            # adjust weights from input layer to hidden layer, with the
            # same momentum term as every other parameter
            self.input_to_hidden_error = (
                beta * item.T.dot(hidden_error) +
                gamma * self.input_to_hidden_error)
            self.input_to_hidden += self.input_to_hidden_error

            # adjust thresholds
            self.hidden_thresholds_error = (beta * hidden_error + gamma *
                                            self.hidden_thresholds_error)
            self.hidden_thresholds += self.hidden_thresholds_error

            self.output_thresholds_error = (alpha * output_error + gamma *
                                            self.output_thresholds_error)
            self.output_thresholds += self.output_thresholds_error

            # NOTE: the reported error is half the squared output delta
            # (which includes the sigmoid derivative), not the raw
            # difference between desired and actual output.
            evaluate_error += (output_error ** 2).sum() * 0.5

            yield evaluate_error / len(data)

    def train_until(self, error_less_than, *args, **kwargs):
        """Train the network repeatedly until error less than a threshold.

        This is a wrap of `BackPropagationNetwork.train` method, only add a
        argument `error_less_than`.  Every value produced by every pass is
        yielded; a new pass is started as long as the last value of the
        previous pass is still greater than `error_less_than`.  There is no
        upper bound on the number of passes.

        :param error_less_than: stop once a full pass ends with an error
                                that is not greater than this value.
        :param args: positional arguments forwarded to `train`.
        :param kwargs: keyword arguments forwarded to `train`.
        :returns: generator over the error values of all passes.
        """
        # `np.inf` is the spelling available in every NumPy release.
        error = np.inf
        while error > error_less_than:
            for error in self.train(*args, **kwargs):
                yield error

# -------------
# Training Case
# -------------

def load_and_preprocess_data(filepath):
    """Load flower records from a whitespace-separated text file.

    Every non-blank line is split on whitespace.  The first field is
    discarded and the remaining five are converted with `int` and yielded
    as a ``flower(typeid, a, b, c, d)`` named tuple.  Blank lines are
    skipped.

    :param filepath: path of the text file to read.
    :returns: generator of `flower` named tuples, one per data line.
    :raises ValueError: if a kept field is not an integer literal.
    :raises TypeError: if a line does not hold exactly five fields after
                       the discarded one.
    """
    flower = namedtuple("flower", ["typeid", "a", "b", "c", "d"])
    # Read everything up front so the file is already closed while the
    # caller consumes the generator.
    with open(filepath, "r") as datafile:
        lines = datafile.readlines()
    for line in lines:
        # split() without an argument tolerates tabs and repeated spaces
        fields = line.split()
        if not fields:
            continue  # blank line, e.g. a trailing newline at end of file
        yield flower(*(int(field) for field in fields[1:]))


def main():
    """Train the demo network on ``./data.dat`` and plot its error curve.

    The first column of the loaded records is used as the class id and the
    remaining columns as input features.  Only the first 115 records are
    used for training, and every tenth error value is plotted.
    """
    # 4 input, 3 hidden and 3 output nodes
    network = BackPropagationNetwork(
        NeuronNumber(input=4, hidden=3, output=3),
        LearningRate(0.1, 0.1, 0.85))

    # load the records and separate class ids from features
    records = np.array(list(load_and_preprocess_data("./data.dat")),
                       dtype=np.double)
    labels = records[:, 0]
    features = records[:, 1:]
    # one-hot encode the class id over the three output nodes
    targets = (labels[:, np.newaxis] == np.arange(3)).astype(np.double)

    # train until the error of a full pass drops to the threshold
    error_curve = list(network.train_until(
        error_less_than=0.0005,
        data=features[:115, :],
        desired_result=targets[:115, :]))

    # plot every tenth error value against its training step
    steps = np.arange(len(error_curve), step=10)
    sampled_errors = np.array(error_curve[::10])
    plt.figure(figsize=(8, 4))
    plt.plot(steps, sampled_errors, label="error (x)", color="blue")
    plt.xlabel("training times")
    plt.ylabel("error")
    plt.title("BP Demo")
    plt.legend()
    plt.show()


# Run the training demo only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()