Wiki
Clone wikiSparkNeuralNets / Home
Scala / Spark Neural Network framework
See Overview for Architecture and Features.
Run Example
Performance
I've only tested this locally so far.
There's no explicit code optimisation in the framework, so it is not particularly performant. It is however fast enough to run comparisons against Tensorflow for full data sets. That said, although the data as been verified against Tensorflow for single minibatches, the learning rate is about 4 times slower when running multiple mini-batches. I think this may be improved if / when I add Batch Normalization.
Tensorflow
Configuration
This uses tf.train.AdamOptimizer with all parameters other than the learning rate set to their default values.
#!python def create_TF_run_config(X, learning_rate, iterations, optimizer): params_CNN = [tf_conv_layer('SAME', 1, 3, 4, 8), tf_pool_layer(8,8), tf_conv_layer('SAME', 1, 8, 2, 16), tf_pool_layer(4, 4)] params = tf_global_params(X, learning_rate, iterations, optimizer), params_CNN, tf_fully_connected_layer() return tf_assign_layer_values(params)
Execution:
#!python tf.reset_default_graph() np.random.seed(1) ops.reset_default_graph() params = create_TF_run_config(X_train, 0.009, 25, 'adam') costs = tf_train_and_predict(X_train,Y_train,X_test,Y_test, params) print(np.array(costs))
Results
Scala
Configuration
#!scala @Test def localData(): Unit = { val ts = System.currentTimeMillis() val (xs, ys) = DataSetLoader.getTrainingData() println(s"Loaded ${xs.size} training samples in ${System.currentTimeMillis() - ts}ms") val globalParams = Parameters() .setIterations(25) .setLearningRate(0.009) .setAdam(0.9, 0.999, 1e-08) .setSamplerSpec("random", 64) val cnnParams = CNNParameters(globalParams) .setCNNLayerParams(ConvolutionLayerParameters("same", 1, 3, 4, 8), PoolingLayerParameters(8, 8, true), ConvolutionLayerParameters("same", 1, 8, 2, 16), PoolingLayerParameters(4, 4, true)) .setFilterWeightsAndBiasesInitializer("xavier") .createCNNSampler(ys, xs) val fcnParams = Parameters(globalParams) .setLayerDims(64, 6) .setActivations("softmax") .setCostFunction("crossentropy") .setWeightInitializer("xavier") val (xst, yst) = DataSetLoader.getTestData() println(s"Loaded ${xst.size} test samples") val id = CNNFCNModel.run(cnnParams, fcnParams, xs, ys, xst, yst, DataSetLoader.PATH + "model/", "localData10") println(s"Run $id finished") }
Results
Test Example
- Generate and persist a small dataset
- Capture gradients and values for all Tensorflow trainable veriables within each iteration of the run
- Run an equivalent pure Numpy configuration to persist all intermediate layer outputs and verify the correctness of the Numpy run against the Tensorflow weights and gradients
- Within the Scala framework, run a TensorflowRegression
Python
Parameters used by Tensorflow and Numpy runs:
#!python samples = 1 iterations = 1 learning_rate = 0.009 optimizer = 'simple'
Create the configuration for the Tensorflow run:
#!python def create_TF_run_config(X): params_CNN = [tf_conv_layer('SAME', 1, 3, 4, 8), tf_pool_layer(8,8), tf_conv_layer('SAME', 1, 8, 2, 16), tf_pool_layer(4, 4)] params = tf_global_params(X, learning_rate, iterations, optimizer), params_CNN, tf_fully_connected_layer() return tf_assign_layer_values(params)```
#!python tf.reset_default_graph() np.random.seed(1) x = np.random.randn(samples,64,64,3) y = np.random.randn(samples,6) ops.reset_default_graph() params = create_TF_run_config(x) tf_train_test_data(x,y,params, record = True)
Create the configuration for the equivalent numpy run. The conv layer configuration differs from the Tensorflow as the padding for SAME is not derived but must be supplied.
#!python def get_np_params_and_data(): global_params, X, Y = load_global_params_and_data(samples, iterations, optimizer, learning_rate) params_CNN = [np_conv_layer(1, (1, 2, 1, 2)), np_pool_layer(8,8), np_conv_layer(1, (0, 1, 0, 1)), np_pool_layer(4, 4)] params_FCN = np_fully_connected_layer() params = np_assign_layer_values((global_params,params_CNN, params_FCN)) return params, X, Y
Execute the Numpy run and verify it's correctness:
#!python params, X, Y = get_np_params_and_data() np_train(X, Y, params, record=True) verify_updated_values(params)
Scala
Wrap default CNN Layer configurations to capture outputs, weights and gradients
#!scala val clp1 = ConvolutionLayerParameters("same", 1, 3, 4, 8) val clp1Debugger = new BackPropLinearLayerIOCapturer(1, clp1) val plp2 = PoolingLayerParameters(8, 8) val plp2Debugger = new PoolingIOCapturer(2, plp2) val clp3 = ConvolutionLayerParameters("same", 1, 8, 2, 16) val clp3FeedForwardDebugger = new FeedForwardLayerIOCapturer(3, clp3) val clp3BackPropDebugger = new BackPropLinearLayerIOCapturer(3, clp3) val clp3BackPropNonLinearDebugger = new BackPropNonLinearLayerIOCapturer(3, clp3) val plp4 = PoolingLayerParameters(4, 4) val plp4Debugger = new PoolingIOCapturer(4, plp4)
Using the dataset generated above together with the Tensorflow initial weights, configure the run:
#!scala val xs = TensorflowRegression.getH5MultiChannel("1_1_tf_simple_X.txt") val ys = TensorflowRegression.getH5DataSet("1_1_tf_simple_Y.txt") val globalParams = Parameters() .setIterations(1) .setLearningRate(0.009) .setOptimizer("simple") .setSamplerSpec("sequential", 1, 1) val filterWeightsAndBiases = TensorflowRegression.loadTestCNNWeights() val cnnParams = CNNParameters(globalParams) .setCNNLayerParams(clp1, plp2, clp3, plp4) .setFilterWeightsAndBiases(filterWeightsAndBiases) .createCNNSampler(ys, xs) val weightsAndBiases = TensorflowRegression.loadTestFCNWeights() val fcnParams = Parameters(globalParams) .setLayerDims(64, 6) .setActivations("softmax") .setCostFunction("crossentropyTF") .setWeightsAndBiases(Array((weightsAndBiases))) val fcnImpls = new FCNDataCaptureImplementation(fcnParams) fcnParams.setFCNImplementations(fcnImpls)
Execute the run, then verify data as required:
#!scala val cost = CNNFCNLinkedNetwork.train(cnnParams, fcnParams) assertEquals(-3.78975, cost, CNNTestUtils.DEFAULT_PRECISION) val z5 = fcnImpls.feedForwardOutputsAndActivations(0).last._1 val eZ5 = TensorflowRegression.getH5DataSet("1_1_np_simple_Z5_0.txt") MatrixTestUtils.assertMatricesEqual(eZ5, z5, CNNTestUtils.DEFAULT_PRECISION) val dW3 = clp3BackPropDebugger.dWeights(0) val edW3 = TensorflowRegression.getH5MultiChannel("1_1_np_simple_dW3_0.txt") CNNTestUtils.assertMCIEquals(edW3(0), dW3(0), CNNTestUtils.DEFAULT_PRECISION)
Etc
Updated