Wiki

Clone wiki

SparkNeuralNets / Home

Scala / Spark Neural Network framework

See Overview for Architecture and Features.

Run Example

Performance

I've only tested this locally so far.

There's no explicit code optimisation in the framework, so it is not particularly performant. It is, however, fast enough to run comparisons against Tensorflow for full data sets. That said, although the data has been verified against Tensorflow for single mini-batches, the learning rate is about 4 times slower when running multiple mini-batches. I think this may be improved if / when I add Batch Normalization.

Tensorflow

Configuration

This uses tf.train.AdamOptimizer with all parameters other than the learning rate set to their default values.

#!python
def create_TF_run_config(X, learning_rate, iterations, optimizer):
    """Assemble the parameter structure for a Tensorflow run.

    The configuration is a 3-tuple of (global parameters, list of CNN layer
    descriptions, fully connected layer description), which is passed to
    ``tf_assign_layer_values`` and whose result is returned unchanged.

    The CNN part is conv -> pool -> conv -> pool. The numeric arguments mirror
    the Scala ``ConvolutionLayerParameters`` / ``PoolingLayerParameters`` values
    used elsewhere on this page; their individual meaning is defined by the
    ``tf_conv_layer`` / ``tf_pool_layer`` helpers (not shown here).
    """
    cnn_layers = [
        tf_conv_layer('SAME', 1, 3, 4, 8),
        tf_pool_layer(8, 8),
        tf_conv_layer('SAME', 1, 8, 2, 16),
        tf_pool_layer(4, 4),
    ]
    global_settings = tf_global_params(X, learning_rate, iterations, optimizer)
    fully_connected = tf_fully_connected_layer()
    return tf_assign_layer_values((global_settings, cnn_layers, fully_connected))

Execution:

#!python
# Start from a clean default graph and seed NumPy's RNG with a fixed value.
tf.reset_default_graph()
np.random.seed(1)

# NOTE(review): this second reset looks redundant with tf.reset_default_graph()
# above -- confirm that `ops` is TensorFlow's framework ops module.
ops.reset_default_graph()  
# Same hyper-parameters as the Scala run on this page: learning rate 0.009,
# 25 iterations, Adam optimizer.
params = create_TF_run_config(X_train, 0.009, 25, 'adam')
# Train, evaluate against the test set, and print the values returned as `costs`.
costs = tf_train_and_predict(X_train,Y_train,X_test,Y_test, params)
print(np.array(costs))

Results

tf_run_stats.png

Scala

Configuration

#!scala

  /**
    * End-to-end local run: loads the training and test sets through `DataSetLoader`,
    * configures the CNN and fully connected stages, and hands everything to
    * `CNNFCNModel.run`, printing progress along the way.
    *
    * The hyper-parameters match the Tensorflow configuration shown earlier on this
    * page (25 iterations, learning rate 0.009, Adam).
    */
  @Test def localData(): Unit = {
    // Time only the loading of the training data.
    val loadStartedAt = System.currentTimeMillis()
    val (xs, ys) = DataSetLoader.getTrainingData()
    println(s"Loaded ${xs.size} training samples in ${System.currentTimeMillis() - loadStartedAt}ms")

    // Settings shared by the CNN and FCN stages.
    // NOTE(review): the setAdam arguments are presumably beta1, beta2 and epsilon,
    // and 64 presumably the mini-batch size -- confirm against Parameters.
    val globalParams = Parameters()
      .setIterations(25)
      .setLearningRate(0.009)
      .setAdam(0.9, 0.999, 1e-08)
      .setSamplerSpec("random", 64)

    // Convolutional stage: conv -> pool -> conv -> pool, Xavier-initialised.
    val cnnParams = CNNParameters(globalParams)
      .setCNNLayerParams(
        ConvolutionLayerParameters("same", 1, 3, 4, 8),
        PoolingLayerParameters(8, 8, true),
        ConvolutionLayerParameters("same", 1, 8, 2, 16),
        PoolingLayerParameters(4, 4, true))
      .setFilterWeightsAndBiasesInitializer("xavier")
      .createCNNSampler(ys, xs)

    // Fully connected stage with softmax activation and cross-entropy cost.
    val fcnParams = Parameters(globalParams)
      .setLayerDims(64, 6)
      .setActivations("softmax")
      .setCostFunction("crossentropy")
      .setWeightInitializer("xavier")

    val (testXs, testYs) = DataSetLoader.getTestData()
    println(s"Loaded ${testXs.size} test samples")

    // Run the linked model; the returned id is only reported here.
    val modelDir = DataSetLoader.PATH + "model/"
    val runId = CNNFCNModel.run(cnnParams, fcnParams, xs, ys, testXs, testYs, modelDir, "localData10")
    println(s"Run $runId finished")
  }

Results

scala_run_stats.png

Test Example

  1. Generate and persist a small dataset
  2. Capture gradients and values for all Tensorflow trainable variables within each iteration of the run
  3. Run an equivalent pure Numpy configuration to persist all intermediate layer outputs and verify the correctness of the Numpy run against the Tensorflow weights and gradients
  4. Within the Scala framework, run a TensorflowRegression

Python

Parameters used by Tensorflow and Numpy runs:

#!python
# Shared settings for the Tensorflow and Numpy regression runs.
# They are read as module-level names by the configuration helpers on this page.

# Optimizer selector and its step size.
optimizer = 'simple'
learning_rate = 0.009

# A single sample trained for a single iteration.
iterations = 1
samples = 1

Create the configuration for the Tensorflow run:

#!python
def create_TF_run_config(X):
    """Assemble the parameter structure for the Tensorflow regression run.

    Reads the module-level ``learning_rate``, ``iterations`` and ``optimizer``
    settings, combines them with ``X`` into the global parameters, and returns
    the result of ``tf_assign_layer_values`` applied to the 3-tuple
    (global parameters, CNN layer list, fully connected layer).

    The CNN part is conv -> pool -> conv -> pool; the meaning of the numeric
    arguments is defined by the ``tf_conv_layer`` / ``tf_pool_layer`` helpers
    (not shown here).
    """
    params_CNN = [tf_conv_layer('SAME', 1, 3, 4, 8),
                  tf_pool_layer(8, 8),
                  tf_conv_layer('SAME', 1, 8, 2, 16),
                  tf_pool_layer(4, 4)]
    params = tf_global_params(X, learning_rate, iterations, optimizer), params_CNN, tf_fully_connected_layer()
    # A stray Markdown code fence used to trail this line, which made the
    # snippet a syntax error when copied; it has been removed.
    return tf_assign_layer_values(params)
Execute Tensorflow run and persist data:

#!python
# Start from a clean default graph and seed NumPy's RNG so the generated
# data set is the same on every run.
tf.reset_default_graph()
np.random.seed(1)
# Random inputs of shape (samples, 64, 64, 3) and targets of shape (samples, 6).
# x is drawn before y; with the fixed seed, this order determines the values.
x = np.random.randn(samples,64,64,3)
y = np.random.randn(samples,6)

# NOTE(review): this second reset looks redundant with tf.reset_default_graph()
# above -- confirm that `ops` is TensorFlow's framework ops module.
ops.reset_default_graph()  
params = create_TF_run_config(x)
# record = True presumably persists the data and captured values for the
# Numpy and Scala runs -- confirm against tf_train_test_data.
tf_train_test_data(x,y,params, record = True)

Create the configuration for the equivalent Numpy run. The conv layer configuration differs from the Tensorflow one because the padding for SAME is not derived automatically and must be supplied explicitly.

#!python
def get_np_params_and_data():
    """Build the Numpy run configuration and load its data.

    Loads the global parameters together with ``X`` and ``Y`` via
    ``load_global_params_and_data`` (using the module-level ``samples``,
    ``iterations``, ``optimizer`` and ``learning_rate``), describes the
    conv -> pool -> conv -> pool stack plus the fully connected layer, and
    passes the 3-tuple through ``np_assign_layer_values``.

    Unlike the Tensorflow configuration, each conv layer is given an explicit
    padding tuple in place of 'SAME'.
    NOTE(review): the order of the four padding values is defined by
    ``np_conv_layer`` -- confirm there.

    Returns:
        A tuple ``(params, X, Y)``.
    """
    loaded = load_global_params_and_data(samples, iterations, optimizer, learning_rate)
    global_settings, X, Y = loaded
    cnn_layers = [
        np_conv_layer(1, (1, 2, 1, 2)),
        np_pool_layer(8, 8),
        np_conv_layer(1, (0, 1, 0, 1)),
        np_pool_layer(4, 4),
    ]
    fully_connected = np_fully_connected_layer()
    layer_values = np_assign_layer_values((global_settings, cnn_layers, fully_connected))
    return layer_values, X, Y

Execute the Numpy run and verify its correctness:

#!python
# Build the Numpy configuration and load the data.
params, X, Y = get_np_params_and_data()
# Train with recording switched on, then check the updated values.
# NOTE(review): per the test outline on this page the check is against the
# Tensorflow weights and gradients -- confirm in verify_updated_values.
np_train(X, Y, params, record=True)
verify_updated_values(params)

Scala

Wrap default CNN Layer configurations to capture outputs, weights and gradients

#!scala
    // Layer 1 (convolution) with a back-prop capturer constructed for index 1.
    // NOTE(review): only the raw parameter objects (clp1, plp2, clp3, plp4) are handed to
    // setCNNLayerParams later on this page; presumably each capturer hooks itself into the
    // parameters it wraps on construction -- confirm.
    val clp1 = ConvolutionLayerParameters("same", 1, 3, 4, 8)
    val clp1Debugger = new BackPropLinearLayerIOCapturer(1, clp1)
    // Layer 2 (pooling) with its capturer.
    val plp2 = PoolingLayerParameters(8, 8)
    val plp2Debugger = new PoolingIOCapturer(2, plp2)

    // Layer 3 (convolution) gets three capturers: feed-forward, linear back-prop and
    // non-linear back-prop.
    val clp3 = ConvolutionLayerParameters("same", 1, 8, 2, 16)
    val clp3FeedForwardDebugger = new FeedForwardLayerIOCapturer(3, clp3)
    val clp3BackPropDebugger = new BackPropLinearLayerIOCapturer(3, clp3)
    val clp3BackPropNonLinearDebugger = new BackPropNonLinearLayerIOCapturer(3, clp3)

    // Layer 4 (pooling) with its capturer.
    val plp4 = PoolingLayerParameters(4, 4)
    val plp4Debugger = new PoolingIOCapturer(4, plp4)

Using the dataset generated above together with the Tensorflow initial weights, configure the run:

#!scala
    // Load the persisted inputs and targets of the Tensorflow run.
    // NOTE(review): the loader names mention H5 while the file names end in .txt -- confirm
    // the actual on-disk format.
    val xs = TensorflowRegression.getH5MultiChannel("1_1_tf_simple_X.txt")
    val ys = TensorflowRegression.getH5DataSet("1_1_tf_simple_Y.txt")

    // Same settings as the Python parameters on this page: 1 iteration, learning rate 0.009,
    // "simple" optimizer.
    // NOTE(review): the sampler arguments (1, 1) presumably correspond to the single
    // sample -- confirm against Parameters.setSamplerSpec.
    val globalParams = Parameters()
      .setIterations(1)
      .setLearningRate(0.009)
      .setOptimizer("simple")
      .setSamplerSpec("sequential", 1, 1)

    // CNN stage: the four layer parameter objects defined earlier, with filter weights and
    // biases loaded from the test data rather than produced by an initializer.
    val filterWeightsAndBiases = TensorflowRegression.loadTestCNNWeights()
    val cnnParams = CNNParameters(globalParams)
      .setCNNLayerParams(clp1,
        plp2,
        clp3,
        plp4)
      .setFilterWeightsAndBiases(filterWeightsAndBiases)
      .createCNNSampler(ys, xs)

    // FCN stage: softmax activation with the "crossentropyTF" cost and loaded weights.
    val weightsAndBiases = TensorflowRegression.loadTestFCNWeights()
    val fcnParams = Parameters(globalParams)
      .setLayerDims(64, 6)
      .setActivations("softmax")
      .setCostFunction("crossentropyTF")
      .setWeightsAndBiases(Array((weightsAndBiases)))
    // Register a data-capturing FCN implementation; the verification step later reads the
    // feed-forward outputs from fcnImpls.
    val fcnImpls = new FCNDataCaptureImplementation(fcnParams)
    fcnParams.setFCNImplementations(fcnImpls)

Execute the run, then verify data as required:

#!scala
    // Train the linked CNN + FCN network and compare the returned cost with the expected
    // value, within the default test precision.
    val cost = CNNFCNLinkedNetwork.train(cnnParams, fcnParams)
    assertEquals(-3.78975, cost, CNNTestUtils.DEFAULT_PRECISION)

    // Compare the first element of the last captured feed-forward entry (index 0) with the
    // Z5 values persisted by the Numpy run.
    val z5 = fcnImpls.feedForwardOutputsAndActivations(0).last._1
    val eZ5 = TensorflowRegression.getH5DataSet("1_1_np_simple_Z5_0.txt")
    MatrixTestUtils.assertMatricesEqual(eZ5, z5, CNNTestUtils.DEFAULT_PRECISION)


    // Compare the layer-3 weight gradients captured at index 0 with the Numpy dW3 values.
    // Only element 0 of each side is compared.
    val dW3 = clp3BackPropDebugger.dWeights(0)
    val edW3 = TensorflowRegression.getH5MultiChannel("1_1_np_simple_dW3_0.txt")
    CNNTestUtils.assertMCIEquals(edW3(0), dW3(0), CNNTestUtils.DEFAULT_PRECISION)

Etc

Updated