horn-dev mailing list archives

From edwardy...@apache.org
Subject incubator-horn git commit: HORN-24: Add neuron object manager
Date Tue, 24 May 2016 12:49:59 GMT
Repository: incubator-horn
Updated Branches:
  refs/heads/master ca560628c -> b5386349d


HORN-24: Add neuron object manager


Project: http://git-wip-us.apache.org/repos/asf/incubator-horn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-horn/commit/b5386349
Tree: http://git-wip-us.apache.org/repos/asf/incubator-horn/tree/b5386349
Diff: http://git-wip-us.apache.org/repos/asf/incubator-horn/diff/b5386349

Branch: refs/heads/master
Commit: b5386349d28979cb753a36b85f35865394954986
Parents: ca56062
Author: Edward J. Yoon <edwardyoon@apache.org>
Authored: Sat May 21 22:01:04 2016 +0900
Committer: Edward J. Yoon <edwardyoon@apache.org>
Committed: Tue May 24 21:46:50 2016 +0900

----------------------------------------------------------------------
 README.md                                       |   6 +-
 conf/horn-env.sh                                |   2 +-
 conf/log4j.properties                           |  86 +++++++++
 .../horn/core/AbstractLayeredNeuralNetwork.java |   1 -
 .../apache/horn/core/LayeredNeuralNetwork.java  | 174 ++++++++++---------
 .../horn/core/LayeredNeuralNetworkTrainer.java  |  42 +++--
 src/main/java/org/apache/horn/core/Neuron.java  |   4 +
 7 files changed, 213 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/b5386349/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index b797783..5fd125e 100644
--- a/README.md
+++ b/README.md
@@ -41,7 +41,7 @@ Then, we measure the margin of error of the output and adjust the weights accord
 ```
 The advantage of this programming model is that it is easy and intuitive to use.
 
-Also, Apache Horn provides a simplified and intuitive configuration interface. To create a neural network job and submit it to an existing Hadoop or Hama cluster, we just add each layer with its properties, such as the squashing function and neuron class. The example below configures a 4-layer neural network with 500 neurons in the hidden layers for training on the MNIST dataset:
+Also, Apache Horn provides a simplified and intuitive configuration interface. To create a neural network job and submit it to an existing Hadoop or Hama cluster, we just add each layer with its properties, such as the squashing function and neuron class. The example below configures a 2-layer neural network with 100 neurons in the hidden layer for training on the MNIST dataset:
 ```Java
   HornJob job = new HornJob(conf, MultiLayerPerceptron.class);
   job.setLearningRate(learningRate);
@@ -65,10 +65,10 @@ Download a MNIST training and label datasets, and convert into a HDFS sequence f
 Then, train it with the following command (in this example, we used η 0.01, α 0.9, λ 0.0005, 100 hidden units, and minibatch 10):
 ```
  % bin/horn jar horn-0.x.0.jar MultiLayerPerceptron /tmp/model /tmp/mnist.seq \
-   0.01 0.9 0.00075 784 100 10 10 12000
+   0.01 0.9 0.0005 784 100 10 10 12000
 ```
 
-With this default example, you'll reach over 95% accuracy. The local-mode multithread-based parallel synchronous SGD will take around 1 hour to train.
+With this default example, you'll reach over 95% accuracy. The local-mode parallel synchronous SGD based on multithreading will take around 30 minutes to 1 hour to train.
 
 ## High Scalability
 
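For readers skimming the README hunk above: only the first two lines of the configuration example are visible in this diff. The sketch below fills in what the described interface implies for the 784-100-10 MNIST case. The momentum/regularization setters, the layer-building calls (`inputLayer`, `addLayer`, `outputLayer`, `setCostFunction`), and the `Sigmoid` class are assumptions inferred from the README prose, not confirmed by this commit; `StandardNeuron`, `SoftMax`, and `CrossEntropy` do appear in the project's imports.

```Java
  // Hypothetical sketch of a 784-100-10 MLP job for MNIST. Only the first
  // two lines are from the commit; the remaining method names are assumed
  // from the README's description of adding layers with a squashing
  // function and a neuron class.
  HornJob job = new HornJob(conf, MultiLayerPerceptron.class);
  job.setLearningRate(0.01);            // η from the README example
  job.setMomentumWeight(0.9);           // α
  job.setRegularizationWeight(0.0005);  // λ

  job.inputLayer(784);                                       // MNIST pixels
  job.addLayer(100, Sigmoid.class, StandardNeuron.class);    // hidden layer
  job.outputLayer(10, SoftMax.class, StandardNeuron.class);  // 10 digit classes
  job.setCostFunction(CrossEntropy.class);
```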

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/b5386349/conf/horn-env.sh
----------------------------------------------------------------------
diff --git a/conf/horn-env.sh b/conf/horn-env.sh
index ca7ed32..26d190f 100644
--- a/conf/horn-env.sh
+++ b/conf/horn-env.sh
@@ -22,5 +22,5 @@
 # Set environment variables here.
 
 # The java implementation to use.  Required.
-export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home
+export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/
 

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/b5386349/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/conf/log4j.properties b/conf/log4j.properties
new file mode 100644
index 0000000..8e3b877
--- /dev/null
+++ b/conf/log4j.properties
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Define some default values that can be overridden by system properties
+hama.root.logger=INFO,console
+hama.log.dir=.
+hama.log.file=hama.log
+
+# Define the root logger to the system property "hama.root.logger".
+log4j.rootLogger=${hama.root.logger}
+
+# Logging Threshold
+log4j.threshhold=ALL
+
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hama.log.dir}/${hama.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hama.tasklog.taskid=null
+hama.tasklog.noKeepSplits=4
+hama.tasklog.totalLogFileSize=100
+hama.tasklog.purgeLogSplits=true
+hama.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hama.bsp.TaskLogAppender
+log4j.appender.TLA.taskId=${hama.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hama.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.apache.avro=ERROR
+log4j.logger.org.apache.hama=INFO
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this 
+#
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+#log4j.logger.org.apache.hadoop.dfs=DEBUG
+#log4j.logger.org.apache.hama=DEBUG
+#log4j.logger.org.apache.zookeeper=DEBUG

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/b5386349/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java b/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java
index b82ad41..5ec57a2 100644
--- a/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java
+++ b/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java
@@ -33,7 +33,6 @@ import org.apache.horn.core.Constants.TrainingMethod;
 import org.apache.horn.funcs.CategoricalCrossEntropy;
 import org.apache.horn.funcs.CrossEntropy;
 import org.apache.horn.funcs.FunctionFactory;
-import org.mortbay.log.Log;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/b5386349/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java b/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java
index d4f2f3e..d33726e 100644
--- a/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java
+++ b/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.Constants;
 import org.apache.hama.commons.io.MatrixWritable;
 import org.apache.hama.commons.io.VectorWritable;
 import org.apache.hama.commons.math.DenseDoubleMatrix;
@@ -45,7 +46,9 @@ import org.apache.hama.commons.math.DoubleVector;
 import org.apache.hama.util.ReflectionUtils;
 import org.apache.horn.core.Constants.LearningStyle;
 import org.apache.horn.core.Constants.TrainingMethod;
+import org.apache.horn.examples.MultiLayerPerceptron.StandardNeuron;
 import org.apache.horn.funcs.FunctionFactory;
+import org.apache.horn.funcs.IdentityFunction;
 import org.apache.horn.funcs.SoftMax;
 
 import com.google.common.base.Preconditions;
@@ -80,6 +83,8 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
 
   protected int finalLayerIdx;
 
+  private List<Neuron[]> neurons = new ArrayList<Neuron[]>();
+
   public LayeredNeuralNetwork() {
     this.layerSizeList = Lists.newArrayList();
     this.weightMatrixList = Lists.newArrayList();
@@ -90,6 +95,33 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
 
   public LayeredNeuralNetwork(HamaConfiguration conf, String modelPath) {
     super(conf, modelPath);
+
+    // initialize neuron objects
+    for (int i = 0; i < layerSizeList.size(); i++) {
+      int numOfNeurons = layerSizeList.get(i);
+      Class neuronClass;
+      if (i == 0)
+        neuronClass = Neuron.class;
+      else
+        neuronClass = neuronClassList.get(i - 1);
+
+      Neuron[] tmp = new Neuron[numOfNeurons];
+      for (int j = 0; j < numOfNeurons; j++) {
+        Neuron n = newNeuronInstance(StandardNeuron.class);
+        if (i > 0)
+          n.setSquashingFunction(squashingFunctionList.get(i - 1));
+        else
+          n.setSquashingFunction(new IdentityFunction());
+
+        n.setLayerIndex(i);
+
+        n.setLearningRate(this.learningRate);
+        n.setMomentumWeight(this.momentumWeight);
+        tmp[j] = n;
+      }
+
+      neurons.add(tmp);
+    }
   }
 
   @Override
@@ -234,6 +266,8 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
   public void readFields(DataInput input) throws IOException {
     super.readFields(input);
 
+    this.finalLayerIdx = input.readInt();
+
     // read neuron classes
     int neuronClasses = input.readInt();
     this.neuronClassList = Lists.newArrayList();
@@ -273,6 +307,8 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
   public void write(DataOutput output) throws IOException {
     super.write(output);
 
+    output.writeInt(finalLayerIdx);
+
     // write neuron classes
     output.writeInt(this.neuronClassList.size());
     for (Class<? extends Neuron> clazz : this.neuronClassList) {
@@ -321,11 +357,8 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
       instanceWithBias.set(i, transformedInstance.get(i - 1));
     }
 
-    List<DoubleVector> outputCache = getOutputInternal(instanceWithBias);
     // return the output of the last layer
-    DoubleVector result = outputCache.get(outputCache.size() - 1);
-    // remove bias
-    return result.sliceUnsafe(1, result.getDimension() - 1);
+    return getOutputInternal(instanceWithBias);
   }
 
   /**
@@ -335,16 +368,24 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
    * @param instanceWithBias The instance contains the features.
    * @return Cached output of each layer.
    */
-  public List<DoubleVector> getOutputInternal(DoubleVector instanceWithBias) {
-    List<DoubleVector> outputCache = new ArrayList<DoubleVector>();
-    // fill with instance
-    DoubleVector intermediateOutput = instanceWithBias;
-    outputCache.add(intermediateOutput);
+  public DoubleVector getOutputInternal(DoubleVector instanceWithBias) {
+    // sets the output of input layer
+    Neuron[] inputLayer = neurons.get(0);
+    for (int i = 0; i < inputLayer.length; i++) {
+      inputLayer[i].setOutput(instanceWithBias.get(i));
+    }
 
     for (int i = 0; i < this.layerSizeList.size() - 1; ++i) {
-      forward(i, outputCache);
+      forward(i);
     }
-    return outputCache;
+
+    DoubleVector output = new DenseDoubleVector(
+        neurons.get(this.finalLayerIdx).length);
+    for (int i = 0; i < output.getDimension(); i++) {
+      output.set(i, neurons.get(this.finalLayerIdx)[i].getOutput());
+    }
+
+    return output;
   }
 
   /**
@@ -360,38 +401,30 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
    * Forward the calculation for one layer.
    * 
    * @param fromLayer The index of the previous layer.
-   * @param intermediateOutput The intermediateOutput of previous layer.
-   * @return a new vector with the result of the operation.
    */
-  protected void forward(int fromLayer, List<DoubleVector> outputCache) {
-    DoubleVector previousOutput = outputCache.get(fromLayer * 2); // skip
-                                                                  // intermediate
-                                                                  // output
-
+  protected void forward(int fromLayer) {
+    int curLayerIdx = fromLayer + 1;
     DoubleMatrix weightMatrix = this.weightMatrixList.get(fromLayer);
 
-    // LOG.info("intermediate: " + intermediateOutput.toString());
-    // DoubleVector vec = weightMatrix.multiplyVectorUnsafe(intermediateOutput);
-    // vec = vec.applyToElements(this.squashingFunctionList.get(fromLayer));
-
     DoubleFunction squashingFunction = getSquashingFunction(fromLayer);
-
     DoubleVector vec = new DenseDoubleVector(weightMatrix.getRowCount());
 
     for (int row = 0; row < weightMatrix.getRowCount(); row++) {
       List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
       for (int col = 0; col < weightMatrix.getColumnCount(); col++) {
         msgs.add(new Synapse<DoubleWritable, DoubleWritable>(
-            new DoubleWritable(previousOutput.get(col)), new DoubleWritable(
-                weightMatrix.get(row, col))));
+            new DoubleWritable(neurons.get(fromLayer)[col].getOutput()),
+            new DoubleWritable(weightMatrix.get(row, col))));
       }
-      Iterable<Synapse<DoubleWritable, DoubleWritable>> iterable = msgs;
-      Neuron n = newNeuronInstance(this.neuronClassList.get(fromLayer));
-      n.setSquashingFunction(squashingFunction);
-      n.setLayerIndex(fromLayer);
+
+      Neuron n;
+      if (curLayerIdx == finalLayerIdx)
+        n = neurons.get(curLayerIdx)[row];
+      else
+        n = neurons.get(curLayerIdx)[row + 1];
 
       try {
-        n.forward(iterable);
+        n.forward(msgs);
       } catch (IOException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
@@ -404,24 +437,21 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
       IntermediateOutput interlayer = (IntermediateOutput) ReflectionUtils
           .newInstance(SoftMax.SoftMaxOutputComputer.class);
       try {
-        outputCache.add(vec);
         vec = interlayer.interlayer(vec);
+
+        for (int i = 0; i < vec.getDimension(); i++) {
+          neurons.get(curLayerIdx)[i].setOutput(vec.get(i));
+        }
+
       } catch (IOException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
       }
-    } else {
-      outputCache.add(null);
     }
 
     // add bias
-    DoubleVector vecWithBias = new DenseDoubleVector(vec.getDimension() + 1);
-    vecWithBias.set(0, 1);
-    for (int i = 0; i < vec.getDimension(); ++i) {
-      vecWithBias.set(i + 1, vec.get(i));
-    }
-
-    outputCache.add(vecWithBias);
+    if (curLayerIdx != finalLayerIdx)
+      neurons.get(curLayerIdx)[0].setOutput(1);
   }
 
   /**
@@ -482,15 +512,13 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
       labels = transformedVector.deepCopy();
     }
 
-    List<DoubleVector> internalResults = this.getOutputInternal(inputInstance);
-    DoubleVector output = internalResults.get(internalResults.size() - 1);
+    DoubleVector output = this.getOutputInternal(inputInstance);
 
     // get the training error
-    calculateTrainingError(labels,
-        output.deepCopy().sliceUnsafe(1, output.getDimension() - 1));
+    calculateTrainingError(labels, output);
 
     if (this.trainingMethod.equals(TrainingMethod.GRADIENT_DESCENT)) {
-      return this.trainByInstanceGradientDescent(labels, internalResults);
+      return this.trainByInstanceGradientDescent(labels);
     } else {
       throw new IllegalArgumentException(
           String.format("Training method is not supported."));
@@ -504,10 +532,7 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
    * @param trainingInstance
    * @return The weight update matrices.
    */
-  private DoubleMatrix[] trainByInstanceGradientDescent(DoubleVector labels,
-      List<DoubleVector> internalResults) {
-
-    DoubleVector output = internalResults.get(internalResults.size() - 1);
+  private DoubleMatrix[] trainByInstanceGradientDescent(DoubleVector labels) {
 
     // initialize weight update matrices
     DenseDoubleMatrix[] weightUpdateMatrices = new DenseDoubleMatrix[this.weightMatrixList
@@ -526,26 +551,25 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
         .get(this.weightMatrixList.size() - 1);
 
     for (int i = 0; i < deltaVec.getDimension(); ++i) {
+      double finalOut = neurons.get(finalLayerIdx)[i].getOutput();
       double costFuncDerivative = this.costFunction.applyDerivative(
-          labels.get(i), output.get(i + 1));
+          labels.get(i), finalOut);
       // add regularization
       costFuncDerivative += this.regularizationWeight
           * lastWeightMatrix.getRowVector(i).sum();
 
       if (!squashingFunction.getFunctionName().equalsIgnoreCase(
           SoftMax.class.getSimpleName())) {
-        costFuncDerivative *= squashingFunction.applyDerivative(output
-            .get(i + 1));
+        costFuncDerivative *= squashingFunction.applyDerivative(finalOut);
       }
 
+      neurons.get(finalLayerIdx)[i].setDelta(costFuncDerivative);
       deltaVec.set(i, costFuncDerivative);
     }
 
     // start from previous layer of output layer
     for (int layer = this.layerSizeList.size() - 2; layer >= 0; --layer) {
-      output = internalResults.get(layer * 2); // skip intermediate output
-      deltaVec = backpropagate(layer, deltaVec, output,
-          weightUpdateMatrices[layer]);
+      backpropagate(layer, weightUpdateMatrices[layer]);
     }
 
     this.setPrevWeightMatrices(weightUpdateMatrices);
@@ -558,53 +582,40 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
    * delta of the prevLayer would be returned.
    * 
    * @param layer Index of current layer.
-   * @param internalOutput Internal output of current layer.
-   * @param deltaVec Delta of next layer.
-   * @return the squashing function of the specified position.
    */
-  private DoubleVector backpropagate(int curLayerIdx,
-      DoubleVector nextLayerDelta, DoubleVector curLayerOutput,
+  private void backpropagate(int curLayerIdx,
+  // DoubleVector nextLayerDelta, DoubleVector curLayerOutput,
       DenseDoubleMatrix weightUpdateMatrix) {
 
     // get layer related information
-    DoubleFunction squashingFunction = this.squashingFunctionList
-        .get(curLayerIdx);
     DoubleMatrix weightMatrix = this.weightMatrixList.get(curLayerIdx);
     DoubleMatrix prevWeightMatrix = this.prevWeightUpdatesList.get(curLayerIdx);
 
-    // next layer is not output layer, remove the delta of bias neuron
-    if (curLayerIdx != this.layerSizeList.size() - 2) {
-      nextLayerDelta = nextLayerDelta.slice(1,
-          nextLayerDelta.getDimension() - 1);
-    }
-
     DoubleVector deltaVector = new DenseDoubleVector(
         weightMatrix.getColumnCount());
 
     for (int row = 0; row < weightMatrix.getColumnCount(); ++row) {
-      Neuron n = newNeuronInstance(this.neuronClassList.get(curLayerIdx));
-      // calls setup method
-      n.setLearningRate(this.learningRate);
-      n.setMomentumWeight(this.momentumWeight);
-      n.setLayerIndex(curLayerIdx);
-
-      n.setSquashingFunction(squashingFunction);
-      n.setOutput(curLayerOutput.get(row));
+      Neuron n = neurons.get(curLayerIdx)[row];
+      n.setWeightVector(weightMatrix.getRowCount());
 
       List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
 
-      n.setWeightVector(weightMatrix.getRowCount());
-
       for (int col = 0; col < weightMatrix.getRowCount(); ++col) {
+        double deltaOfNextLayer;
+        if (curLayerIdx + 1 == this.finalLayerIdx)
+          deltaOfNextLayer = neurons.get(curLayerIdx + 1)[col].getDelta();
+        else
+          deltaOfNextLayer = neurons.get(curLayerIdx + 1)[col + 1].getDelta();
+
         msgs.add(new Synapse<DoubleWritable, DoubleWritable>(
-            new DoubleWritable(nextLayerDelta.get(col)), new DoubleWritable(
+            new DoubleWritable(deltaOfNextLayer), new DoubleWritable(
                 weightMatrix.get(col, row)), new DoubleWritable(
                 prevWeightMatrix.get(col, row))));
       }
 
       Iterable<Synapse<DoubleWritable, DoubleWritable>> iterable = msgs;
       try {
-        n.backward(iterable);
+        n.backward(msgs);
       } catch (IOException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
@@ -615,7 +626,6 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
       deltaVector.set(row, n.getDelta());
     }
 
-    return deltaVector;
   }
 
   @Override
@@ -642,6 +652,8 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
     job.setJarByClass(LayeredNeuralNetworkTrainer.class);
     job.setBspClass(LayeredNeuralNetworkTrainer.class);
 
+    job.getConfiguration().setInt(Constants.ADDITIONAL_BSP_TASKS, 1);
+    
     job.setInputPath(new Path(conf.get("training.input.path")));
     job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
     job.setInputKeyClass(LongWritable.class);

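The substance of HORN-24 in this file is that neuron objects are now created once in the constructor, held in the per-layer `List<Neuron[]>` field, and reused by `forward()` and `backpropagate()` instead of being re-instantiated for every row of the weight matrix. A minimal, self-contained sketch of that per-layer pool pattern follows; the class and field names are illustrative stand-ins, not the project's API.

```Java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: a per-layer pool of neuron objects, built once
// and reused on every forward/backward pass, mirroring the List<Neuron[]>
// field added to LayeredNeuralNetwork in this commit.
public class NeuronPool {

  // Stand-in for org.apache.horn.core.Neuron; holds the mutable per-pass state.
  static class SimpleNeuron {
    double output;
    double delta;
    int layerIndex;
  }

  private final List<SimpleNeuron[]> layers = new ArrayList<SimpleNeuron[]>();

  public NeuronPool(int[] layerSizes) {
    for (int i = 0; i < layerSizes.length; i++) {
      SimpleNeuron[] layer = new SimpleNeuron[layerSizes[i]];
      for (int j = 0; j < layer.length; j++) {
        layer[j] = new SimpleNeuron();
        layer[j].layerIndex = i; // fixed once; only output/delta change per pass
      }
      layers.add(layer);
    }
  }

  public SimpleNeuron get(int layer, int index) {
    return layers.get(layer)[index];
  }
}
```

During a pass, a forward step would read `get(fromLayer, col).output` and write results into the next layer's neurons, analogous to what the real code now does with `neurons.get(...)`.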
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/b5386349/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java b/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java
index 350200f..275dd75 100644
--- a/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java
+++ b/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java
@@ -96,7 +96,8 @@ public final class LayeredNeuralNetworkTrainer
       try {
         LOG.info(String.format("End of training, number of iterations: %d.",
             this.iterations));
-        LOG.info(String.format("Write model back to %s", inMemoryModel.getModelPath()));
+        LOG.info(String.format("Write model back to %s",
+            inMemoryModel.getModelPath()));
         this.inMemoryModel.writeModelToFile();
       } catch (IOException e) {
         e.printStackTrace();
@@ -121,15 +122,21 @@ public final class LayeredNeuralNetworkTrainer
 
     while (this.iterations++ < maxIterations) {
       // each groom calculate the matrices updates according to local data
-      calculateUpdates(peer);
-      peer.sync();
-
-      // master merge the updates model
-      if (peer.getPeerIndex() == 0) {
-        mergeUpdates(peer);
+      if (peer.getPeerIndex() != peer.getNumPeers() - 1) {
+        calculateUpdates(peer);
+      } else {
+        // doing summation received updates
+        if (peer.getSuperstepCount() > 0) {
+          // and broadcasts previous updated weights
+          mergeUpdates(peer);
+        }
       }
+      
       peer.sync();
-      if (this.isConverge) {
+      
+      if(isConverge) {
+        if(peer.getPeerIndex() == peer.getNumPeers() - 1)
+          peer.sync();
         break;
       }
     }
@@ -189,7 +196,7 @@ public final class LayeredNeuralNetworkTrainer
         .getPrevMatricesUpdates();
     ParameterMessage outMessage = new ParameterMessage(avgTrainingError, false,
         weightUpdates, prevWeightUpdates);
-    peer.send(peer.getPeerName(0), outMessage);
+    peer.send(peer.getPeerName(peer.getNumPeers() - 1), outMessage);
   }
 
   /**
@@ -202,9 +209,9 @@ public final class LayeredNeuralNetworkTrainer
       BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer)
       throws IOException {
     int numMessages = peer.getNumCurrentMessages();
-    boolean isConverge = false;
+    boolean converge = false;
     if (numMessages == 0) { // converges
-      isConverge = true;
+      converge = true;
       return;
     }
 
@@ -239,10 +246,11 @@ public final class LayeredNeuralNetworkTrainer
     this.inMemoryModel.setPrevWeightMatrices(prevMatricesUpdates);
 
     // check convergence
-    if (iterations % convergenceCheckInterval == 0) {
+    if (peer.getSuperstepCount() > 0
+        && iterations % convergenceCheckInterval == 0) {
       if (prevAvgTrainingError < curAvgTrainingError) {
         // error cannot decrease any more
-        isConverge = true;
+        converge = true;
       }
       // update
       prevAvgTrainingError = curAvgTrainingError;
@@ -251,13 +259,15 @@ public final class LayeredNeuralNetworkTrainer
       curAvgTrainingError = 0;
     }
     curAvgTrainingError += avgTrainingError / convergenceCheckInterval;
-
+    this.isConverge = converge;
+    
     // broadcast updated weight matrices
     for (String peerName : peer.getAllPeerNames()) {
-      ParameterMessage msg = new ParameterMessage(0, isConverge,
+      ParameterMessage msg = new ParameterMessage(0, converge,
           this.inMemoryModel.getWeightMatrices(),
           this.inMemoryModel.getPrevMatricesUpdates());
-      peer.send(peerName, msg);
+      if (!peer.getPeerName().equals(peerName))
+        peer.send(peerName, msg);
     }
   }
 

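In the trainer, the commit reserves one extra BSP task (via `Constants.ADDITIONAL_BSP_TASKS`) and turns the last peer into a dedicated merger: workers compute local updates and send them to that peer, which averages them and broadcasts the new weights, parameter-server style. Below is a condensed restatement of the new superstep loop, with peer and message types elided; it introduces no API beyond what the hunks above already show.

```Java
  // Condensed restatement of the BSP loop in LayeredNeuralNetworkTrainer;
  // "master" is the extra task requested via Constants.ADDITIONAL_BSP_TASKS.
  while (iterations++ < maxIterations) {
    boolean isMaster = (peer.getPeerIndex() == peer.getNumPeers() - 1);

    if (!isMaster) {
      calculateUpdates(peer);            // local SGD; updates sent to the master
    } else if (peer.getSuperstepCount() > 0) {
      mergeUpdates(peer);                // average updates, broadcast new weights
    }

    peer.sync();

    if (isConverge) {
      if (isMaster)
        peer.sync();                     // extra barrier on the master, as in the hunk
      break;
    }
  }
```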
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/b5386349/src/main/java/org/apache/horn/core/Neuron.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/Neuron.java b/src/main/java/org/apache/horn/core/Neuron.java
index af18c79..1c0f475 100644
--- a/src/main/java/org/apache/horn/core/Neuron.java
+++ b/src/main/java/org/apache/horn/core/Neuron.java
@@ -62,6 +62,10 @@ public abstract class Neuron<M extends Writable> implements Writable, NeuronInte
     this.delta = gradient;
   }
 
+  public void setDelta(double delta) {
+    this.delta = delta;
+  }
+  
   public double getDelta() {
     return delta;
   }

