hama-commits mailing list archives

From edwardy...@apache.org
Subject svn commit: r1490122 - in /hama/trunk: ./ ml/src/main/java/org/apache/hama/ml/perception/ ml/src/test/java/org/apache/hama/ml/perception/
Date Thu, 06 Jun 2013 04:06:26 GMT
Author: edwardyoon
Date: Thu Jun  6 04:06:26 2013
New Revision: 1490122

URL: http://svn.apache.org/r1490122
Log:
HAMA-681: Add Multi Layer Perceptron (Yexi Jiang via edwardyoon)

Added:
    hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/
    hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/CostFunction.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/CostFunctionFactory.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/LogisticCostFunction.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/MLPMessage.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/MultiLayerPerceptron.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/PerceptronTrainer.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/Sigmoid.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SmallMLPMessage.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SmallMLPTrainer.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SmallMultiLayerPerceptron.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SquaredError.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SquashingFunction.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SquashingFunctionFactory.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/Tanh.java
    hama/trunk/ml/src/test/java/org/apache/hama/ml/perception/
    hama/trunk/ml/src/test/java/org/apache/hama/ml/perception/TestSmallMLPMessage.java
    hama/trunk/ml/src/test/java/org/apache/hama/ml/perception/TestSmallMultiLayerPerceptron.java
Modified:
    hama/trunk/CHANGES.txt

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1490122&r1=1490121&r2=1490122&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Jun  6 04:06:26 2013
@@ -4,6 +4,7 @@ Release 0.7 (unreleased changes)
 
   NEW FEATURES
 
+   HAMA-681: Add Multi Layer Perceptron (Yexi Jiang via edwardyoon)
    HAMA-619: Hama Pipes (Martin Illecker via edwardyoon)
 
   BUG FIXES

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/CostFunction.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/CostFunction.java?rev=1490122&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/CostFunction.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/CostFunction.java Thu Jun  6 04:06:26 2013
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+/**
+ * The common base class for cost functions.
+ */
+public abstract class CostFunction {
+
+  /**
+   * Calculate the error between the target value and the actual value.
+   * 
+   * @param target The target value.
+   * @param actual The actual value.
+   * @return The cost for the given target and actual values.
+   */
+  public abstract double calculate(double target, double actual);
+
+  /**
+   * Calculate the partial derivative of the cost with respect to the actual
+   * value.
+   * 
+   * @param target The target value.
+   * @param actual The actual value.
+   * @return The partial derivative of the cost function.
+   */
+  public abstract double calculateDerivative(double target, double actual);
+
+}
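
As a usage sketch (illustrative, not part of this commit), a concrete cost
function only needs to implement the two methods above. The class below is
hypothetical and follows the sign convention of SquaredError, where
calculateDerivative returns the negative gradient with respect to the actual
value:

    package org.apache.hama.ml.perception;

    /** Hypothetical absolute-error cost: cost(t, y) = |t - y|. */
    public class AbsoluteError extends CostFunction {

      @Override
      public double calculate(double target, double actual) {
        return Math.abs(target - actual);
      }

      @Override
      public double calculateDerivative(double target, double actual) {
        // the negative gradient with respect to actual, as in SquaredError
        return Math.signum(target - actual);
      }
    }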

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/CostFunctionFactory.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/CostFunctionFactory.java?rev=1490122&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/CostFunctionFactory.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/CostFunctionFactory.java Thu Jun  6 04:06:26 2013
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+/**
+ * The cost function factory that generates the cost function by name.
+ */
+public class CostFunctionFactory {
+
+  /**
+   * Get the cost function according to the name. If no matched cost function is
+   * found, return the SquaredError by default.
+   * 
+   * @param name The name of the cost function.
+   * @return The cost function instance.
+   */
+  public static CostFunction getCostFunction(String name) {
+    if (name.equalsIgnoreCase("SquaredError")) {
+      return new SquaredError();
+    } else if (name.equalsIgnoreCase("LogisticError")) {
+      return new LogisticCostFunction();
+    }
+    return new SquaredError();
+  }
+}
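
For example (illustrative only), looking up a cost function by name; any
unrecognized name silently falls back to SquaredError:

    CostFunction cost = CostFunctionFactory.getCostFunction("SquaredError");
    double error = cost.calculate(1.0, 0.8);           // 0.5 * 0.2^2 = 0.02
    double slope = cost.calculateDerivative(1.0, 0.8); // 1.0 - 0.8 = 0.2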

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/LogisticCostFunction.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/LogisticCostFunction.java?rev=1490122&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/LogisticCostFunction.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/LogisticCostFunction.java Thu Jun  6 04:06:26 2013
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+/**
+ * The logistic cost function.
+ * 
+ * <pre>
+ * cost(t, y) = - t * log(y) - (1 - t) * log(1 - y),
+ * where t denotes the target value, y denotes the estimated value.
+ * </pre>
+ */
+public class LogisticCostFunction extends CostFunction {
+
+  @Override
+  public double calculate(double target, double actual) {
+    return -target * Math.log(actual) - (1 - target) * Math.log(1 - actual);
+  }
+
+  @Override
+  public double calculateDerivative(double target, double actual) {
+    double adjustedTarget = target;
+    double adjustedActual = actual;
+    // clamp the values away from 0 and 1 to avoid division by zero
+    if (adjustedActual == 1) {
+      adjustedActual = 0.999;
+    } else if (adjustedActual == 0) {
+      adjustedActual = 0.001;
+    }
+    if (adjustedTarget == 1) {
+      adjustedTarget = 0.999;
+    } else if (adjustedTarget == 0) {
+      adjustedTarget = 0.001;
+    }
+    // return the negative gradient with respect to the actual value, matching
+    // the sign convention of SquaredError.calculateDerivative that the
+    // trainer's weight updates assume
+    return adjustedTarget / adjustedActual - (1 - adjustedTarget)
+        / (1 - adjustedActual);
+  }
+
+}
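
A quick numeric check of the formulas above (values rounded; note that
calculateDerivative clamps target and actual away from 0 and 1 to avoid
division by zero):

    LogisticCostFunction logistic = new LogisticCostFunction();
    double cost = logistic.calculate(1.0, 0.8);
    // -1.0 * log(0.8) - 0.0 * log(0.2) = 0.2231
    double grad = logistic.calculateDerivative(1.0, 0.8);
    // target clamped to 0.999: 0.999 / 0.8 - 0.001 / 0.2 = 1.2438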

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/MLPMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/MLPMessage.java?rev=1490122&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/MLPMessage.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/MLPMessage.java Thu Jun  6 04:06:26 2013
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * MLPMessage is used to hold the parameters that need to be sent between
+ * tasks.
+ */
+public abstract class MLPMessage implements Writable {
+  protected boolean terminated;
+
+  public MLPMessage(boolean terminated) {
+    setTerminated(terminated);
+  }
+
+  public void setTerminated(boolean terminated) {
+    this.terminated = terminated;
+  }
+
+  public boolean isTerminated() {
+    return terminated;
+  }
+
+}

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/MultiLayerPerceptron.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/MultiLayerPerceptron.java?rev=1490122&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/MultiLayerPerceptron.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/MultiLayerPerceptron.java Thu Jun  6 04:06:26 2013
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.ml.math.DoubleVector;
+
+/**
+ * MultiLayerPerceptron defines the common behavior of all the concrete
+ * multi-layer perceptrons.
+ */
+public abstract class MultiLayerPerceptron {
+
+  /* The trainer for the model */
+  protected PerceptronTrainer trainer;
+  /* The file path that contains the model meta-data */
+  protected String modelPath;
+
+  /* Model meta-data */
+  protected String MLPType;
+  protected double learningRate;
+  protected boolean regularization;
+  protected double momentum;
+  protected int numberOfLayers;
+  protected String squashingFunctionName;
+  protected String costFunctionName;
+  protected int[] layerSizeArray;
+
+  protected CostFunction costFunction;
+  protected SquashingFunction squashingFunction;
+
+  /**
+   * Initialize the MLP.
+   * 
+   * @param learningRate A larger learning rate makes the MLP learn more
+   *          aggressively.
+   * @param regularization Turning on regularization makes the MLP less likely
+   *          to overfit.
+   * @param momentum Momentum makes historical weight adjustments contribute
+   *          to the current adjustment.
+   * @param squashingFunctionName The name of the squashing function.
+   * @param costFunctionName The name of the cost function.
+   * @param layerSizeArray The number of neurons for each layer. Note that the
+   *          actual size of each layer is one more than the specified size,
+   *          to account for the bias neuron.
+   */
+  public MultiLayerPerceptron(double learningRate, boolean regularization,
+      double momentum, String squashingFunctionName, String costFunctionName,
+      int[] layerSizeArray) {
+    this.learningRate = learningRate;
+    this.regularization = regularization;
+    this.momentum = momentum;
+    this.squashingFunctionName = squashingFunctionName;
+    this.costFunctionName = costFunctionName;
+    this.layerSizeArray = layerSizeArray;
+    this.numberOfLayers = this.layerSizeArray.length;
+
+    this.costFunction = CostFunctionFactory
+        .getCostFunction(this.costFunctionName);
+    this.squashingFunction = SquashingFunctionFactory
+        .getSquashingFunction(this.squashingFunctionName);
+  }
+
+  /**
+   * Initialize a multi-layer perceptron with an existing model.
+   * 
+   * @param modelPath The location of the existing model meta-data.
+   */
+  public MultiLayerPerceptron(String modelPath) {
+    this.modelPath = modelPath;
+  }
+
+  /**
+   * Train the model with the given data. This method invokes a perceptron
+   * training BSP task to train the model, then writes the model to modelPath.
+   * 
+   * @param dataInputPath The path of the data.
+   * @param trainingParams Extra parameters for training.
+   */
+  public abstract void train(Path dataInputPath,
+      Map<String, String> trainingParams) throws Exception;
+
+  /**
+   * Get the output based on the input instance and the learned model.
+   * 
+   * @param featureVector The feature of an instance to feed the perceptron.
+   * @return The output vector.
+   */
+  public abstract DoubleVector output(DoubleVector featureVector)
+      throws Exception;
+
+  /**
+   * Read the model meta-data from the specified location.
+   * 
+   * @throws IOException
+   */
+  protected abstract void readFromModel() throws IOException;
+
+  /**
+   * Write the model data to the specified location.
+   * 
+   * @param modelPath The location in the file system to store the model.
+   * @throws IOException
+   */
+  public abstract void writeModelToFile(String modelPath) throws IOException;
+
+  public String getModelPath() {
+    return modelPath;
+  }
+
+  public String getMLPType() {
+    return MLPType;
+  }
+
+  public double getLearningRate() {
+    return learningRate;
+  }
+
+  public boolean isRegularization() {
+    return regularization;
+  }
+
+  public double getMomentum() {
+    return momentum;
+  }
+
+  public int getNumberOfLayers() {
+    return numberOfLayers;
+  }
+
+  public String getSquashingFunctionName() {
+    return squashingFunctionName;
+  }
+
+  public String getCostFunctionName() {
+    return costFunctionName;
+  }
+
+  public int[] getLayerSizeArray() {
+    return layerSizeArray;
+  }
+
+}
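
A hypothetical training driver built on this API (paths, layer sizes and
parameter values are examples only; the input is expected to be a SequenceFile
of LongWritable keys and VectorWritable values):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.fs.Path;
    import org.apache.hama.ml.perception.MultiLayerPerceptron;
    import org.apache.hama.ml.perception.SmallMultiLayerPerceptron;

    public class TrainMLPExample {
      public static void main(String[] args) throws Exception {
        // 8 input features, one hidden layer of 16 neurons, 1 output neuron
        int[] layerSizeArray = { 8, 16, 1 };
        MultiLayerPerceptron mlp = new SmallMultiLayerPerceptron(0.1, false,
            0.9, "Sigmoid", "SquaredError", layerSizeArray);

        Map<String, String> trainingParams = new HashMap<String, String>();
        trainingParams.put("training.mode", "minibatch.gradient.descent");
        trainingParams.put("training.batch.size", "100");
        trainingParams.put("training.iteration", "10");
        trainingParams.put("tasks", "3");
        trainingParams.put("modelPath", "/tmp/mlp.model");

        mlp.train(new Path("/tmp/train.seq"), trainingParams);
      }
    }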

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/PerceptronTrainer.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/PerceptronTrainer.java?rev=1490122&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/PerceptronTrainer.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/PerceptronTrainer.java Thu Jun  6 04:06:26 2013
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.ml.writable.VectorWritable;
+
+/**
+ * The trainer that is used to train the perceptron with BSP. The trainer
+ * reads the training data and obtains the trained parameters of the model.
+ */
+public abstract class PerceptronTrainer extends
+    BSP<LongWritable, VectorWritable, NullWritable, NullWritable, MLPMessage> {
+
+  protected Configuration conf;
+  protected int maxIteration;
+  protected int batchSize;
+  protected String trainingMode;
+
+  @Override
+  public void setup(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, MLPMessage> peer)
+      throws IOException, SyncException, InterruptedException {
+    conf = peer.getConfiguration();
+    trainingMode = conf.get("training.mode");
+    batchSize = conf.getInt("training.batch.size", 100); // mini-batch by
+                                                         // default
+    this.extraSetup(peer);
+  }
+
+  /**
+   * Handle extra setup for sub-classes.
+   * 
+   * @param peer
+   * @throws IOException
+   * @throws SyncException
+   * @throws InterruptedException
+   */
+  protected void extraSetup(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, MLPMessage> peer)
+      throws IOException, SyncException, InterruptedException {
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public abstract void bsp(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, MLPMessage> peer)
+      throws IOException, SyncException, InterruptedException;
+
+  @Override
+  public void cleanup(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, MLPMessage> peer)
+      throws IOException {
+
+    this.extraCleanup(peer);
+  }
+
+  /**
+   * Handle extra cleanup for sub-classes.
+   * 
+   * @param peer
+   * @throws IOException
+   */
+  protected void extraCleanup(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, MLPMessage> peer)
+      throws IOException {
+  }
+
+}
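
A minimal hypothetical subclass, to show the lifecycle: setup() runs first and
populates conf, trainingMode and batchSize, then hands off to extraSetup();
subclasses only have to implement bsp():

    package org.apache.hama.ml.perception;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hama.bsp.BSPPeer;
    import org.apache.hama.bsp.sync.SyncException;
    import org.apache.hama.ml.writable.VectorWritable;

    public class NoOpTrainer extends PerceptronTrainer {
      @Override
      public void bsp(
          BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, MLPMessage> peer)
          throws IOException, SyncException, InterruptedException {
        peer.sync(); // a single superstep that trains nothing
      }
    }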

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/Sigmoid.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/Sigmoid.java?rev=1490122&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/Sigmoid.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/Sigmoid.java Thu Jun  6 04:06:26 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+
+/**
+ * The Sigmoid function
+ * 
+ * <pre>
+ * f(z) = 1 / (1 + e^{-z})
+ * </pre>
+ */
+public class Sigmoid extends SquashingFunction {
+
+  @Override
+  public double calculate(int index, double value) {
+    return 1.0 / (1 + Math.exp(-value));
+  }
+
+  @Override
+  public double calculateDerivative(double value) {
+    // 'value' is the sigmoid output y = f(z); f'(z) = y * (1 - y)
+    return value * (1 - value);
+  }
+}
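
Note that calculateDerivative takes the already-squashed output y = f(z)
rather than z, since f'(z) = f(z) * (1 - f(z)) for the sigmoid. For instance:

    Sigmoid sigmoid = new Sigmoid();
    double y = sigmoid.calculate(0, 0.0);       // f(0) = 0.5; the index is unused
    double dy = sigmoid.calculateDerivative(y); // 0.5 * (1 - 0.5) = 0.25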

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SmallMLPMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SmallMLPMessage.java?rev=1490122&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SmallMLPMessage.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SmallMLPMessage.java Thu Jun  6 04:06:26 2013
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hama.ml.math.DenseDoubleMatrix;
+import org.apache.hama.ml.writable.MatrixWritable;
+
+/**
+ * SmallMLPMessage is used to exchange information for the
+ * {@link SmallMultiLayerPerceptron}. It sends the whole set of parameter
+ * matrices from one task to another.
+ */
+public class SmallMLPMessage extends MLPMessage {
+
+  private int owner; // the ID of the task that created the message
+  private DenseDoubleMatrix[] weightUpdatedMatrices;
+  private int numOfMatrices;
+
+  public SmallMLPMessage(int owner, boolean terminated, DenseDoubleMatrix[] mat) {
+    super(terminated);
+    this.owner = owner;
+    this.weightUpdatedMatrices = mat;
+    this.numOfMatrices = this.weightUpdatedMatrices == null ? 0
+        : this.weightUpdatedMatrices.length;
+  }
+
+  /**
+   * Get the Id of the task that created the message.
+   * 
+   * @return The owner task Id.
+   */
+  public int getOwner() {
+    return owner;
+  }
+
+  /**
+   * Get the updated weight matrices.
+   * 
+   * @return The array of weight update matrices.
+   */
+  public DenseDoubleMatrix[] getWeightsUpdatedMatrices() {
+    return this.weightUpdatedMatrices;
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    this.owner = input.readInt();
+    this.terminated = input.readBoolean();
+    this.numOfMatrices = input.readInt();
+    this.weightUpdatedMatrices = new DenseDoubleMatrix[this.numOfMatrices];
+    for (int i = 0; i < this.numOfMatrices; ++i) {
+      this.weightUpdatedMatrices[i] = (DenseDoubleMatrix) MatrixWritable
+          .read(input);
+    }
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(this.owner);
+    output.writeBoolean(this.terminated);
+    output.writeInt(this.numOfMatrices);
+    for (int i = 0; i < this.numOfMatrices; ++i) {
+      MatrixWritable.write(this.weightUpdatedMatrices[i], output);
+    }
+  }
+
+}
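
A serialization round trip, sketched with Hadoop's in-memory buffers (class
name and matrix sizes are illustrative):

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hama.ml.math.DenseDoubleMatrix;
    import org.apache.hama.ml.perception.SmallMLPMessage;

    public class SmallMLPMessageRoundTrip {
      public static void main(String[] args) throws Exception {
        DenseDoubleMatrix[] matrices = { new DenseDoubleMatrix(2, 3) };
        SmallMLPMessage sent = new SmallMLPMessage(0, false, matrices);

        DataOutputBuffer out = new DataOutputBuffer();
        sent.write(out);

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        SmallMLPMessage received = new SmallMLPMessage(0, false, null);
        received.readFields(in);
        // received now carries the owner, flag and matrices that were sent
      }
    }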

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SmallMLPTrainer.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SmallMLPTrainer.java?rev=1490122&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SmallMLPTrainer.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SmallMLPTrainer.java Thu Jun  6 04:06:26 2013
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.BitSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.ml.math.DenseDoubleMatrix;
+import org.apache.hama.ml.writable.VectorWritable;
+
+/**
+ * The perceptron trainer for small scale MLP.
+ */
+public class SmallMLPTrainer extends PerceptronTrainer {
+
+  private static final Log LOG = LogFactory.getLog(SmallMLPTrainer.class);
+  /* used by master only, to check whether all slaves finish reading */
+  private BitSet statusSet;
+
+  private int numTrainingInstanceRead = 0;
+  /* Once the reader reaches EOF, the training procedure terminates */
+  private boolean terminateTraining = false;
+
+  private SmallMultiLayerPerceptron inMemoryPerceptron;
+
+  private int[] layerSizeArray;
+
+  @Override
+  protected void extraSetup(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, MLPMessage> peer) {
+
+    this.statusSet = new BitSet(peer.getConfiguration().getInt("tasks", 1));
+
+    String outputModelPath = conf.get("modelPath");
+    if (outputModelPath == null || outputModelPath.trim().length() == 0) {
+      throw new IllegalArgumentException("Please specify output model path.");
+    }
+
+    String modelPath = conf.get("existingModelPath");
+    // build model from scratch
+    if (modelPath == null || modelPath.trim().length() == 0) {
+      double learningRate = Double.parseDouble(conf.get("learningRate"));
+      boolean regularization = Boolean.parseBoolean(conf.get("regularization"));
+      double momentum = Double.parseDouble(conf.get("momentum"));
+      String squashingFunctionName = conf.get("squashingFunctionName");
+      String costFunctionName = conf.get("costFunctionName");
+      String[] layerSizeArrayStr = conf.get("layerSizeArray").trim().split(" ");
+      this.layerSizeArray = new int[layerSizeArrayStr.length];
+      for (int i = 0; i < this.layerSizeArray.length; ++i) {
+        this.layerSizeArray[i] = Integer.parseInt(layerSizeArrayStr[i]);
+      }
+
+      this.inMemoryPerceptron = new SmallMultiLayerPerceptron(learningRate,
+          regularization, momentum, squashingFunctionName, costFunctionName,
+          layerSizeArray);
+      LOG.info("Training model from scratch.");
+    } else { // read model from existing data
+      this.inMemoryPerceptron = new SmallMultiLayerPerceptron(modelPath);
+      LOG.info("Training with existing model.");
+    }
+
+  }
+
+  @Override
+  protected void extraCleanup(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, MLPMessage> peer) {
+    LOG.info(String.format("Task %d totally read %d records.\n",
+        peer.getPeerIndex(), this.numTrainingInstanceRead));
+    // master write learned model to disk
+    if (peer.getPeerIndex() == 0) {
+      try {
+        LOG.info(String.format("Master write learned model to %s\n",
+            conf.get("modelPath")));
+        this.inMemoryPerceptron.writeModelToFile(conf.get("modelPath"));
+      } catch (IOException e) {
+        LOG.error("Please set a correct model path.", e);
+      }
+    }
+  }
+
+  @Override
+  public void bsp(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, MLPMessage> peer)
+      throws IOException, SyncException, InterruptedException {
+    LOG.info("Start training...");
+    if (trainingMode.equalsIgnoreCase("minibatch.gradient.descent")) {
+      LOG.info("Training Mode: minibatch.gradient.descent");
+      trainByMinibatch(peer);
+    }
+
+    LOG.info(String.format("Task %d finished.", peer.getPeerIndex()));
+  }
+
+  /**
+   * Train the MLP with mini-batch gradient descent.
+   * 
+   * @param peer
+   * @throws IOException
+   * @throws SyncException
+   * @throws InterruptedException
+   */
+  private void trainByMinibatch(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, MLPMessage> peer)
+      throws IOException, SyncException, InterruptedException {
+
+    int maxIteration = conf.getInt("training.iteration", 1);
+    LOG.info("# of Training Iteration: " + maxIteration);
+
+    for (int i = 0; i < maxIteration; ++i) {
+      if (peer.getPeerIndex() == 0) {
+        LOG.info(String.format("Iteration [%d] begins...", i));
+      }
+      peer.reopenInput();
+      // reset status
+      if (peer.getPeerIndex() == 0) {
+        this.statusSet = new BitSet(peer.getConfiguration().getInt("tasks", 1));
+      }
+      this.terminateTraining = false;
+      peer.sync();
+      while (true) {
+        // each slave task updates weights according to training data
+        boolean terminate = updateWeights(peer);
+        peer.sync();
+
+        // master merges the updates
+        if (peer.getPeerIndex() == 0) {
+          mergeUpdate(peer);
+        }
+        peer.sync();
+
+        if (terminate) {
+          break;
+        }
+      }
+
+    }
+
+  }
+
+  /**
+   * Merge the updates from the slave tasks.
+   * 
+   * @param peer
+   * @throws IOException
+   */
+  private void mergeUpdate(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, MLPMessage> peer)
+      throws IOException {
+    // initialize the cache
+    DenseDoubleMatrix[] mergedUpdates = this.getZeroWeightMatrices();
+
+    int numOfPartitions = peer.getNumCurrentMessages();
+
+    // aggregates the weights update
+    while (peer.getNumCurrentMessages() > 0) {
+      SmallMLPMessage message = (SmallMLPMessage) peer.getCurrentMessage();
+      if (message.isTerminated()) {
+        this.statusSet.set(message.getOwner());
+      }
+
+      DenseDoubleMatrix[] weightUpdates = message.getWeightsUpdatedMatrices();
+      for (int m = 0; m < mergedUpdates.length; ++m) {
+        mergedUpdates[m] = (DenseDoubleMatrix) mergedUpdates[m]
+            .add(weightUpdates[m]);
+      }
+    }
+
+    if (numOfPartitions != 0) {
+      // calculate the global mean (the mean of batches from all slave tasks) of
+      // the weight updates
+      for (int m = 0; m < mergedUpdates.length; ++m) {
+        mergedUpdates[m] = (DenseDoubleMatrix) mergedUpdates[m]
+            .divide(numOfPartitions);
+      }
+
+      // check if all tasks have finished reading data
+      if (this.statusSet.cardinality() == conf.getInt("tasks", 1)) {
+        this.terminateTraining = true;
+      }
+
+      // update the weight matrices
+      this.inMemoryPerceptron.updateWeightMatrices(mergedUpdates);
+    }
+
+    // broadcast updated weight matrices
+    for (String peerName : peer.getAllPeerNames()) {
+      SmallMLPMessage msg = new SmallMLPMessage(peer.getPeerIndex(),
+          this.terminateTraining, this.inMemoryPerceptron.getWeightMatrices());
+      peer.send(peerName, msg);
+    }
+
+  }
+
+  /**
+   * Train the MLP with the training data.
+   * 
+   * @param peer
+   * @return Whether the training terminates.
+   * @throws IOException
+   */
+  private boolean updateWeights(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, MLPMessage> peer)
+      throws IOException {
+    // receive update message sent by master
+    if (peer.getNumCurrentMessages() > 0) {
+      SmallMLPMessage message = (SmallMLPMessage) peer.getCurrentMessage();
+      this.terminateTraining = message.isTerminated();
+      // each slave renew its weight matrices
+      this.inMemoryPerceptron.setWeightMatrices(message
+          .getWeightsUpdatedMatrices());
+      if (this.terminateTraining) {
+        return true;
+      }
+    }
+
+    // update weight according to training data
+    DenseDoubleMatrix[] weightUpdates = this.getZeroWeightMatrices();
+
+    int count = 0;
+    LongWritable recordId = new LongWritable();
+    VectorWritable trainingInstance = new VectorWritable();
+    boolean hasMore = false;
+    while (count < this.batchSize) {
+      hasMore = peer.readNext(recordId, trainingInstance);
+      if (!hasMore) { // stop at EOF, without training on a stale instance
+        break;
+      }
+
+      try {
+        DenseDoubleMatrix[] singleTrainingInstanceUpdates = this.inMemoryPerceptron
+            .trainByInstance(trainingInstance.getVector());
+        // aggregate the updates
+        for (int m = 0; m < weightUpdates.length; ++m) {
+          weightUpdates[m] = (DenseDoubleMatrix) weightUpdates[m]
+              .add(singleTrainingInstanceUpdates[m]);
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+
+      ++count;
+      ++numTrainingInstanceRead;
+    }
+
+    // calculate the local mean (the mean of the local batch) of weight updates
+    if (count > 0) {
+      for (int m = 0; m < weightUpdates.length; ++m) {
+        weightUpdates[m] = (DenseDoubleMatrix) weightUpdates[m].divide(count);
+      }
+    }
+
+    LOG.info(String.format("Task %d has read %d records.",
+        peer.getPeerIndex(), this.numTrainingInstanceRead));
+
+    // send the weight updates to master task
+    SmallMLPMessage message = new SmallMLPMessage(peer.getPeerIndex(),
+        !hasMore, weightUpdates);
+    peer.send(peer.getPeerName(0), message); // send status to master
+
+    return !hasMore;
+  }
+
+  /**
+   * Create zero-filled weight matrices, one for each pair of adjacent layers.
+   */
+  private DenseDoubleMatrix[] getZeroWeightMatrices() {
+    DenseDoubleMatrix[] weightUpdateCache = new DenseDoubleMatrix[this.layerSizeArray.length - 1];
+    // initialize weight matrix each layer
+    for (int i = 0; i < weightUpdateCache.length; ++i) {
+      weightUpdateCache[i] = new DenseDoubleMatrix(this.layerSizeArray[i] + 1,
+          this.layerSizeArray[i + 1]);
+    }
+    return weightUpdateCache;
+  }
+
+  /**
+   * Format the weights as a human-readable string.
+   * 
+   * @param mat The weight matrices.
+   * @return The string representation of the weights.
+   */
+  protected static String weightsToString(DenseDoubleMatrix[] mat) {
+    StringBuilder sb = new StringBuilder();
+
+    for (int i = 0; i < mat.length; ++i) {
+      sb.append(String.format("Matrix [%d]\n", i));
+      double[][] values = mat[i].getValues();
+      for (int d = 0; d < values.length; ++d) {
+        sb.append(Arrays.toString(values[d]));
+        sb.append('\n');
+      }
+      sb.append('\n');
+    }
+    return sb.toString();
+  }
+
+}
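
In isolation, the averaging that mergeUpdate performs over the per-task batch
updates looks like this (sizes and values illustrative):

    DenseDoubleMatrix u0 = new DenseDoubleMatrix(1, 1);
    u0.set(0, 0, 0.2); // update received from task 0
    DenseDoubleMatrix u1 = new DenseDoubleMatrix(1, 1);
    u1.set(0, 0, 0.4); // update received from task 1

    DenseDoubleMatrix merged = new DenseDoubleMatrix(1, 1); // all zeros
    merged = (DenseDoubleMatrix) merged.add(u0);
    merged = (DenseDoubleMatrix) merged.add(u1);
    merged = (DenseDoubleMatrix) merged.divide(2); // numOfPartitions = 2
    // merged.get(0, 0) is 0.3, the mean update applied to the model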

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SmallMultiLayerPerceptron.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SmallMultiLayerPerceptron.java?rev=1490122&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SmallMultiLayerPerceptron.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SmallMultiLayerPerceptron.java Thu Jun  6 04:06:26 2013
@@ -0,0 +1,465 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.ml.math.DenseDoubleMatrix;
+import org.apache.hama.ml.math.DenseDoubleVector;
+import org.apache.hama.ml.math.DoubleVector;
+import org.apache.hama.ml.writable.MatrixWritable;
+import org.apache.hama.ml.writable.VectorWritable;
+import org.mortbay.log.Log;
+
+/**
+ * SmallMultiLayerPerceptron is a kind of multilayer perceptron whose
+ * parameters can fit into the memory of a single machine. This kind of model
+ * can be trained and used more efficiently than the BigMultiLayerPerceptron,
+ * whose parameters are distributed across multiple machines.
+ * 
+ * In general, it is a multilayer perceptron that consists of one input layer,
+ * multiple hidden layers and one output layer.
+ * 
+ * The number of neurons in the input layer should be consistent with the
+ * number of features in the training instance. The number of neurons in the
+ * output layer should be consistent with the number of labels.
+ */
+public final class SmallMultiLayerPerceptron extends MultiLayerPerceptron
+    implements Writable {
+
+  /* The in-memory weight matrices */
+  private DenseDoubleMatrix[] weightMatrice;
+
+  /**
+   * {@inheritDoc}
+   */
+  public SmallMultiLayerPerceptron(double learningRate, boolean regularization,
+      double momentum, String squashingFunctionName, String costFunctionName,
+      int[] layerSizeArray) {
+    super(learningRate, regularization, momentum, squashingFunctionName,
+        costFunctionName, layerSizeArray);
+    this.MLPType = "SmallMLP";
+    initializeWeightMatrix();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public SmallMultiLayerPerceptron(String modelPath) {
+    super(modelPath);
+    if (modelPath != null) {
+      try {
+        this.readFromModel();
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Initialize the weight matrices using a uniform distribution. Each weight
+   * is initialized in the range (-0.5, 0.5).
+   */
+  private void initializeWeightMatrix() {
+    this.weightMatrice = new DenseDoubleMatrix[this.numberOfLayers - 1];
+    // each layer contains one bias neuron
+    Random rnd = new Random();
+    for (int i = 0; i < this.numberOfLayers - 1; ++i) {
+      // add weights for bias
+      this.weightMatrice[i] = new DenseDoubleMatrix(this.layerSizeArray[i] + 1,
+          this.layerSizeArray[i + 1]);
+      int rowCount = this.weightMatrice[i].getRowCount();
+      int colCount = this.weightMatrice[i].getColumnCount();
+      for (int row = 0; row < rowCount; ++row) {
+        for (int col = 0; col < colCount; ++col) {
+          this.weightMatrice[i].set(row, col, rnd.nextDouble() - 0.5);
+        }
+      }
+    }
+  }
+
+  @Override
+  /**
+   * {@inheritDoc}
+   * The model meta-data is stored in memory.
+   */
+  public DoubleVector output(DoubleVector featureVector) throws Exception {
+    List<double[]> outputCache = this.outputInternal(featureVector);
+    // the output of the last layer is the output of the MLP
+    return new DenseDoubleVector(outputCache.get(outputCache.size() - 1));
+  }
+
+  private List<double[]> outputInternal(DoubleVector featureVector)
+      throws Exception {
+
+    // store the output of the hidden layers and the output layer; each array
+    // stores the output of one layer
+    List<double[]> outputCache = new ArrayList<double[]>();
+
+    // start from the first hidden layer
+    double[] intermediateResults = new double[this.layerSizeArray[0] + 1];
+    if (intermediateResults.length - 1 != featureVector.getDimension()) {
+      throw new Exception(
+          "Input feature dimension incorrect! The dimension of input layer is "
+              + this.layerSizeArray[0]
+              + ", but the dimension of input feature is "
+              + featureVector.getDimension());
+    }
+
+    // fill with input features
+    intermediateResults[0] = 1.0; // bias
+    for (int i = 0; i < featureVector.getDimension(); ++i) {
+      intermediateResults[i + 1] = featureVector.get(i);
+    }
+    outputCache.add(intermediateResults);
+
+    // forward the intermediate results to next layer
+    for (int fromLayer = 0; fromLayer < this.numberOfLayers - 1; ++fromLayer) {
+      intermediateResults = forward(fromLayer, intermediateResults);
+      outputCache.add(intermediateResults);
+    }
+
+    return outputCache;
+  }
+
+  /**
+   * Calculate the intermediate results of layer fromLayer + 1.
+   * 
+   * @param fromLayer The index of the layer that forwards the intermediate
+   *          results.
+   * @param intermediateResult The output of layer fromLayer.
+   * @return The output of layer fromLayer + 1.
+   */
+  private double[] forward(int fromLayer, double[] intermediateResult) {
+    int toLayer = fromLayer + 1;
+    double[] results = null;
+    int offset = 0;
+
+    if (toLayer < this.layerSizeArray.length - 1) { // add bias if it is not
+                                                    // output layer
+      results = new double[this.layerSizeArray[toLayer] + 1];
+      offset = 1;
+      results[0] = 1.0; // the bias
+    } else {
+      results = new double[this.layerSizeArray[toLayer]]; // no bias
+    }
+
+    for (int neuronIdx = 0; neuronIdx < this.layerSizeArray[toLayer]; ++neuronIdx) {
+      // aggregate the results from previous layer
+      for (int prevNeuronIdx = 0; prevNeuronIdx < this.layerSizeArray[fromLayer] + 1; ++prevNeuronIdx) {
+        results[neuronIdx + offset] += this.weightMatrice[fromLayer].get(
+            prevNeuronIdx, neuronIdx) * intermediateResult[prevNeuronIdx];
+      }
+      // calculate via squashing function
+      results[neuronIdx + offset] = this.squashingFunction.calculate(0,
+          results[neuronIdx + offset]);
+    }
+
+    return results;
+  }
+
+  /**
+   * Get the updated weights using one training instance.
+   * 
+   * @param trainingInstance The trainingInstance is the concatenation of
+   *          feature vector and class label vector.
+   * @return The update of each weight.
+   * @throws Exception
+   */
+  DenseDoubleMatrix[] trainByInstance(DoubleVector trainingInstance)
+      throws Exception {
+    // initialize weight update matrices
+    DenseDoubleMatrix[] weightUpdateMatrices = new DenseDoubleMatrix[this.layerSizeArray.length - 1];
+    for (int m = 0; m < weightUpdateMatrices.length; ++m) {
+      weightUpdateMatrices[m] = new DenseDoubleMatrix(
+          this.layerSizeArray[m] + 1, this.layerSizeArray[m + 1]);
+    }
+
+    if (trainingInstance == null) {
+      return weightUpdateMatrices;
+    }
+
+    double[] trainingVec = trainingInstance.toArray();
+    double[] trainingFeature = Arrays.copyOfRange(trainingVec, 0,
+        this.layerSizeArray[0]);
+    double[] trainingLabels = Arrays.copyOfRange(trainingVec,
+        this.layerSizeArray[0], trainingVec.length);
+
+    DoubleVector trainingFeatureVec = new DenseDoubleVector(trainingFeature);
+    List<double[]> outputCache = this.outputInternal(trainingFeatureVec);
+
+    // calculate the delta of output layer
+    double[] delta = new double[this.layerSizeArray[this.layerSizeArray.length - 1]];
+    double[] outputLayerOutput = outputCache.get(outputCache.size() - 1);
+    double[] lastHiddenLayerOutput = outputCache.get(outputCache.size() - 2);
+
+    for (int j = 0; j < delta.length; ++j) {
+      delta[j] = this.squashingFunction
+          .calculateDerivative(outputLayerOutput[j])
+          * this.costFunction.calculateDerivative(trainingLabels[j],
+              outputLayerOutput[j]);
+
+      // calculate the weight update matrix between the last hidden layer and
+      // the output layer
+      for (int i = 0; i < this.layerSizeArray[this.layerSizeArray.length - 2] + 1; ++i) {
+        double updatedValue = this.learningRate * delta[j]
+            * lastHiddenLayerOutput[i];
+        weightUpdateMatrices[weightUpdateMatrices.length - 1].set(i, j,
+            updatedValue);
+      }
+    }
+
+    // calculate the delta for each hidden layer through back-propagation
+    for (int l = this.layerSizeArray.length - 2; l >= 1; --l) {
+      delta = backpropagate(l, delta, outputCache, weightUpdateMatrices);
+    }
+
+    return weightUpdateMatrices;
+  }
+
+  /**
+   * Back-propagate the errors from the next layer to the current layer. The
+   * weight update information is stored in weightUpdateMatrices, and the
+   * delta of the current layer is returned.
+   * 
+   * @param curLayerIdx The layer index of the current layer.
+   * @param nextLayerDelta The delta of the next layer.
+   * @param outputCache The cache of the output of all the layers.
+   * @param weightUpdateMatrices The weight update matrices.
+   * @return The delta of the current layer, which will be used in the next
+   *         iteration of back-propagation.
+   */
+  private double[] backpropagate(int curLayerIdx, double[] nextLayerDelta,
+      List<double[]> outputCache, DenseDoubleMatrix[] weightUpdateMatrices) {
+    int prevLayerIdx = curLayerIdx - 1;
+    double[] delta = new double[this.layerSizeArray[curLayerIdx]];
+    double[] curLayerOutput = outputCache.get(curLayerIdx);
+    double[] prevLayerOutput = outputCache.get(prevLayerIdx);
+
+    // for each neuron j in the current layer, calculate the delta
+    for (int j = 0; j < delta.length; ++j) {
+      // aggregate delta from next layer
+      for (int k = 0; k < nextLayerDelta.length; ++k) {
+        double weight = this.weightMatrice[curLayerIdx].get(j, k);
+        delta[j] += weight * nextLayerDelta[k];
+      }
+      delta[j] *= this.squashingFunction
+          .calculateDerivative(curLayerOutput[j + 1]);
+
+      // calculate the weight update matrix between the previous layer and the
+      // current layer
+      for (int i = 0; i < weightUpdateMatrices[prevLayerIdx].getRowCount(); ++i) {
+        double updatedValue = this.learningRate * delta[j] * prevLayerOutput[i];
+        weightUpdateMatrices[prevLayerIdx].set(i, j, updatedValue);
+      }
+    }
+
+    return delta;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc}
+   */
+  public void train(Path dataInputPath, Map<String, String> trainingParams)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    // create the BSP training job
+    Configuration conf = new Configuration();
+    for (Map.Entry<String, String> entry : trainingParams.entrySet()) {
+      conf.set(entry.getKey(), entry.getValue());
+    }
+
+    // put model related parameters
+    if (modelPath == null || modelPath.trim().length() == 0) { // build model
+                                                               // from scratch
+      conf.set("MLPType", this.MLPType);
+      conf.set("learningRate", "" + this.learningRate);
+      conf.set("regularization", "" + this.regularization);
+      conf.set("momentum", "" + this.momentum);
+      conf.set("squashingFunctionName", this.squashingFunctionName);
+      conf.set("costFunctionName", this.costFunctionName);
+      StringBuilder layerSizeArraySb = new StringBuilder();
+      for (int layerSize : this.layerSizeArray) {
+        layerSizeArraySb.append(layerSize);
+        layerSizeArraySb.append(' ');
+      }
+      conf.set("layerSizeArray", layerSizeArraySb.toString());
+    }
+
+    HamaConfiguration hamaConf = new HamaConfiguration(conf);
+
+    BSPJob job = new BSPJob(hamaConf, SmallMLPTrainer.class);
+    job.setJobName("Small scale MLP training");
+    job.setJarByClass(SmallMLPTrainer.class);
+    job.setBspClass(SmallMLPTrainer.class);
+    job.setInputPath(dataInputPath);
+    job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
+    job.setInputKeyClass(LongWritable.class);
+    job.setInputValueClass(VectorWritable.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(NullWritable.class);
+    job.setOutputFormat(org.apache.hama.bsp.NullOutputFormat.class);
+
+    int numTasks = conf.getInt("tasks", 1);
+    job.setNumBspTask(numTasks);
+    job.waitForCompletion(true);
+
+    // reload learned model
+    Log.info(String.format("Reload model from %s.",
+        trainingParams.get("modelPath")));
+    this.modelPath = trainingParams.get("modelPath");
+    this.readFromModel();
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    this.MLPType = WritableUtils.readString(input);
+    this.learningRate = input.readDouble();
+    this.regularization = input.readBoolean();
+    this.momentum = input.readDouble();
+    this.numberOfLayers = input.readInt();
+    this.squashingFunctionName = WritableUtils.readString(input);
+    this.costFunctionName = WritableUtils.readString(input);
+    // read the number of neurons for each layer
+    this.layerSizeArray = new int[this.numberOfLayers];
+    for (int i = 0; i < numberOfLayers; ++i) {
+      this.layerSizeArray[i] = input.readInt();
+    }
+    this.weightMatrice = new DenseDoubleMatrix[this.numberOfLayers - 1];
+    for (int i = 0; i < numberOfLayers - 1; ++i) {
+      this.weightMatrice[i] = (DenseDoubleMatrix) MatrixWritable.read(input);
+    }
+    this.squashingFunction = SquashingFunctionFactory
+        .getSquashingFunction(this.squashingFunctionName);
+    this.costFunction = CostFunctionFactory
+        .getCostFunction(this.costFunctionName);
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    WritableUtils.writeString(output, MLPType);
+    output.writeDouble(learningRate);
+    output.writeBoolean(regularization);
+    output.writeDouble(momentum);
+    output.writeInt(numberOfLayers);
+    WritableUtils.writeString(output, squashingFunctionName);
+    WritableUtils.writeString(output, costFunctionName);
+
+    // write the number of neurons for each layer
+    for (int i = 0; i < this.numberOfLayers; ++i) {
+      output.writeInt(this.layerSizeArray[i]);
+    }
+    for (int i = 0; i < numberOfLayers - 1; ++i) {
+      MatrixWritable matrixWritable = new MatrixWritable(this.weightMatrice[i]);
+      matrixWritable.write(output);
+    }
+  }
+
+  /**
+   * Read the model meta-data from the specified location.
+   * 
+   * @throws IOException
+   */
+  @Override
+  protected void readFromModel() throws IOException {
+    Configuration conf = new Configuration();
+    try {
+      URI uri = new URI(modelPath);
+      FileSystem fs = FileSystem.get(uri, conf);
+      FSDataInputStream is = new FSDataInputStream(fs.open(new Path(modelPath)));
+      this.readFields(is);
+    } catch (URISyntaxException e) {
+      throw new IOException("Invalid model path: " + modelPath, e);
+    }
+  }
+
+  /**
+   * Write the model to file.
+   * 
+   * @throws IOException
+   */
+  @Override
+  public void writeModelToFile(String modelPath) throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    FSDataOutputStream stream = fs.create(new Path(modelPath), true);
+    this.write(stream);
+    stream.close();
+  }
+
+  DenseDoubleMatrix[] getWeightMatrices() {
+    return this.weightMatrice;
+  }
+
+  void setWeightMatrices(DenseDoubleMatrix[] newMatrices) {
+    this.weightMatrice = newMatrices;
+  }
+
+  /**
+   * Update the weight matrices with the given updates.
+   * 
+   * @param updateMatrices The weight updates in matrix format.
+   */
+  void updateWeightMatrices(DenseDoubleMatrix[] updateMatrices) {
+    for (int m = 0; m < this.weightMatrice.length; ++m) {
+      this.weightMatrice[m] = (DenseDoubleMatrix) this.weightMatrice[m]
+          .add(updateMatrices[m]);
+    }
+  }
+
+  /**
+   * Format the weights as a human-readable string.
+   * 
+   * @param mat The weight matrices.
+   * @return The string representation of the weights.
+   */
+  static String weightsToString(DenseDoubleMatrix[] mat) {
+    StringBuilder sb = new StringBuilder();
+
+    for (int i = 0; i < mat.length; ++i) {
+      sb.append(String.format("Matrix [%d]\n", i));
+      double[][] values = mat[i].getValues();
+      for (int d = 0; d < values.length; ++d) {
+        sb.append(Arrays.toString(values[d]));
+        sb.append('\n');
+      }
+      sb.append('\n');
+    }
+    return sb.toString();
+  }
+
+}
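
A minimal feed-forward sketch on a freshly initialized 2-5-1 network, inside a
method that declares throws Exception (weights are random, so the value is
only structurally meaningful, a single sigmoid output in (0, 1)):

    int[] layerSizeArray = { 2, 5, 1 };
    SmallMultiLayerPerceptron mlp = new SmallMultiLayerPerceptron(0.1, false,
        0.9, "Sigmoid", "SquaredError", layerSizeArray);
    DoubleVector feature = new DenseDoubleVector(new double[] { 1.0, 0.0 });
    DoubleVector result = mlp.output(feature);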

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SquaredError.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SquaredError.java?rev=1490122&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SquaredError.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SquaredError.java Thu Jun  6 04:06:26 2013
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+/**
+ * The squared error cost function.
+ * 
+ * <pre>
+ * cost(t, y) = 0.5 * (t - y)^2
+ * </pre>
+ */
+public class SquaredError extends CostFunction {
+
+  @Override
+  /**
+   * {@inheritDoc}
+   */
+  public double calculate(double target, double actual) {
+    double diff = target - actual;
+    return 0.5 * diff * diff;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc}
+   */
+  public double calculateDerivative(double target, double actual) {
+    // the negative gradient of the cost with respect to the actual value
+    return target - actual;
+  }
+
+}
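
Numerically, with target 0.0 and actual 0.3:

    SquaredError squaredError = new SquaredError();
    double cost = squaredError.calculate(0.0, 0.3);           // 0.5 * 0.09 = 0.045
    double grad = squaredError.calculateDerivative(0.0, 0.3); // 0.0 - 0.3 = -0.3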

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SquashingFunction.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SquashingFunction.java?rev=1490122&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SquashingFunction.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SquashingFunction.java Thu Jun  6 04:06:26 2013
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+import org.apache.hama.ml.math.DoubleVectorFunction;
+
+/**
+ * The squashing (activation) function applied to the neurons.
+ */
+public abstract class SquashingFunction implements DoubleVectorFunction {
+
+  /**
+   * Calculates the squashed result for a vector element, given its index and
+   * value.
+   */
+  @Override
+  public abstract double calculate(int index, double value);
+
+  /**
+   * Calculate the derivative of the squashing function. Note that concrete
+   * implementations (e.g. {@link Tanh}) may express the derivative in terms
+   * of the function's output rather than its raw input.
+   * 
+   * @param value The value at which to evaluate the derivative.
+   * @return The derivative at the given value.
+   */
+  public abstract double calculateDerivative(double value);
+}
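
For illustration, a hypothetical subclass (not part of this commit) showing
the minimal contract, a linear pass-through:

    package org.apache.hama.ml.perception;

    /** Hypothetical example: the identity as a trivial squashing function. */
    public class Identity extends SquashingFunction {

      @Override
      public double calculate(int index, double value) {
        return value; // pass the activation through unchanged
      }

      @Override
      public double calculateDerivative(double value) {
        return 1.0; // d/dx x = 1
      }
    }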

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SquashingFunctionFactory.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SquashingFunctionFactory.java?rev=1490122&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SquashingFunctionFactory.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/SquashingFunctionFactory.java Thu Jun  6 04:06:26 2013
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+/**
+ * Get the squashing function according to the name.
+ */
+public class SquashingFunctionFactory {
+
+  /**
+   * Get the squashing function instance according to the name. If no matching
+   * squashing function is found, the sigmoid squashing function is returned
+   * by default.
+   * 
+   * @param name The name of the squashing function.
+   * @return The instance of the squashing function.
+   */
+  public static SquashingFunction getSquashingFunction(String name) {
+    if (name.equalsIgnoreCase("Sigmoid")) {
+      return new Sigmoid();
+    } else if (name.equalsIgnoreCase("Tanh")) {
+      return new Tanh();
+    }
+    return new Sigmoid();
+  }
+
+}
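
Lookup is case-insensitive, and any unrecognized name falls back to Sigmoid
(note that a null name would throw a NullPointerException):

    SquashingFunction f1 = SquashingFunctionFactory.getSquashingFunction("tanh");    // Tanh
    SquashingFunction f2 = SquashingFunctionFactory.getSquashingFunction("unknown"); // Sigmoid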

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/Tanh.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/Tanh.java?rev=1490122&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/Tanh.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/perception/Tanh.java Thu Jun  6 04:06:26 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+/**
+ * The hyperbolic tangent function. It is used as a squashing function in
+ * multi-layer perceptrons.
+ */
+public class Tanh extends SquashingFunction {
+
+  @Override
+  public double calculate(int index, double value) {
+    return Math.tanh(value);
+  }
+
+  @Override
+  public double calculateDerivative(double value) {
+    // Expects the already-squashed output y = tanh(x),
+    // since tanh'(x) = 1 - tanh(x)^2.
+    return 1 - value * value;
+  }
+
+}
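
As noted in calculateDerivative, the argument is the already-squashed output
rather than the raw activation, since tanh'(x) = 1 - tanh(x)^2. A small check:

    SquashingFunction tanh = new Tanh();
    double y = tanh.calculate(0, 0.5);       // tanh(0.5), about 0.4621
    double dy = tanh.calculateDerivative(y); // 1 - y*y, about 0.7864 = tanh'(0.5)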

Added: hama/trunk/ml/src/test/java/org/apache/hama/ml/perception/TestSmallMLPMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/test/java/org/apache/hama/ml/perception/TestSmallMLPMessage.java?rev=1490122&view=auto
==============================================================================
--- hama/trunk/ml/src/test/java/org/apache/hama/ml/perception/TestSmallMLPMessage.java (added)
+++ hama/trunk/ml/src/test/java/org/apache/hama/ml/perception/TestSmallMLPMessage.java Thu Jun  6 04:06:26 2013
@@ -0,0 +1,89 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.ml.math.DenseDoubleMatrix;
+import org.junit.Test;
+
+/**
+ * Test the functionalities of SmallMLPMessage.
+ */
+public class TestSmallMLPMessage {
+
+  @Test
+  public void testReadWrite() {
+    int owner = 101;
+    double[][] mat = { { 1, 2, 3 }, { 4, 5, 6 }, { 7, 8, 9 } };
+
+    double[][] mat2 = { { 10, 20 }, { 30, 40 }, { 50, 60 } };
+
+    double[][][] mats = { mat, mat2 };
+
+    DenseDoubleMatrix[] matrices = new DenseDoubleMatrix[] {
+        new DenseDoubleMatrix(mat), new DenseDoubleMatrix(mat2) };
+
+    SmallMLPMessage message = new SmallMLPMessage(owner, true, matrices);
+
+    Configuration conf = new Configuration();
+    String strPath = "/tmp/testSmallMLPMessage";
+    Path path = new Path(strPath);
+    try {
+      FileSystem fs = FileSystem.get(new URI(strPath), conf);
+      FSDataOutputStream out = fs.create(path, true);
+      message.write(out);
+      out.close();
+
+      FSDataInputStream in = fs.open(path);
+      SmallMLPMessage outMessage = new SmallMLPMessage(0, false, null);
+      outMessage.readFields(in);
+
+      assertEquals(owner, outMessage.getOwner());
+      DenseDoubleMatrix[] outMatrices = outMessage.getWeightsUpdatedMatrices();
+      // check each matrix
+      for (int i = 0; i < outMatrices.length; ++i) {
+        double[][] outMat = outMatrices[i].getValues();
+        for (int j = 0; j < outMat.length; ++j) {
+          assertArrayEquals(mats[i][j], outMat[j], 0.0001);
+        }
+      }
+
+      fs.delete(path, true);
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (URISyntaxException e) {
+      e.printStackTrace();
+    }
+
+  }
+}

Added: hama/trunk/ml/src/test/java/org/apache/hama/ml/perception/TestSmallMultiLayerPerceptron.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/test/java/org/apache/hama/ml/perception/TestSmallMultiLayerPerceptron.java?rev=1490122&view=auto
==============================================================================
--- hama/trunk/ml/src/test/java/org/apache/hama/ml/perception/TestSmallMultiLayerPerceptron.java (added)
+++ hama/trunk/ml/src/test/java/org/apache/hama/ml/perception/TestSmallMultiLayerPerceptron.java Thu Jun  6 04:06:26 2013
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.ml.math.DenseDoubleMatrix;
+import org.apache.hama.ml.math.DenseDoubleVector;
+import org.apache.hama.ml.math.DoubleMatrix;
+import org.apache.hama.ml.math.DoubleVector;
+import org.apache.hama.ml.writable.MatrixWritable;
+import org.apache.hama.ml.writable.VectorWritable;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestSmallMultiLayerPerceptron {
+
+  /**
+   * Write and read the parameters of the MLP.
+   */
+  @Test
+  public void testWriteReadMLP() {
+    String modelPath = "/tmp/sampleModel-testWriteReadMLP.data";
+    double learningRate = 0.5;
+    boolean regularization = false; // no regularization
+    double momentum = 0; // no momentum
+    String squashingFunctionName = "Sigmoid";
+    String costFunctionName = "SquaredError";
+    int[] layerSizeArray = new int[] { 3, 2, 2, 3 };
+    MultiLayerPerceptron mlp = new SmallMultiLayerPerceptron(learningRate,
+        regularization, momentum, squashingFunctionName, costFunctionName,
+        layerSizeArray);
+    try {
+      mlp.writeModelToFile(modelPath);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    try {
+      // read the meta-data
+      Configuration conf = new Configuration();
+      FileSystem fs = FileSystem.get(conf);
+      mlp = new SmallMultiLayerPerceptron(modelPath);
+      assertEquals("SmallMLP", mlp.getMLPType());
+      assertEquals(learningRate, mlp.getLearningRate(), 0.001);
+      assertEquals(regularization, mlp.isRegularization());
+      assertEquals(layerSizeArray.length, mlp.getNumberOfLayers());
+      assertEquals(momentum, mlp.getMomentum(), 0.001);
+      assertEquals(squashingFunctionName, mlp.getSquashingFunctionName());
+      assertEquals(costFunctionName, mlp.getCostFunctionName());
+      assertArrayEquals(layerSizeArray, mlp.getLayerSizeArray());
+      // delete test file
+      fs.delete(new Path(modelPath), true);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Test the output of an example MLP.
+   */
+  @Test
+  public void testOutput() {
+    // write the MLP meta-data manually
+    String modelPath = "/tmp/sampleModel-testOutput.data";
+    Configuration conf = new Configuration();
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      FSDataOutputStream output = fs.create(new Path(modelPath), true);
+
+      String MLPType = "SmallMLP";
+      double learningRate = 0.5;
+      boolean regularization = false;
+      double momentum = 0;
+      String squashingFunctionName = "Sigmoid";
+      String costFunctionName = "SquaredError";
+      int[] layerSizeArray = new int[] { 3, 2, 3, 3 };
+      int numberOfLayers = layerSizeArray.length;
+
+      WritableUtils.writeString(output, MLPType);
+      output.writeDouble(learningRate);
+      output.writeBoolean(regularization);
+      output.writeDouble(momentum);
+      output.writeInt(numberOfLayers);
+      WritableUtils.writeString(output, squashingFunctionName);
+      WritableUtils.writeString(output, costFunctionName);
+
+      // write the number of neurons for each layer
+      for (int i = 0; i < numberOfLayers; ++i) {
+        output.writeInt(layerSizeArray[i]);
+      }
+
+      double[][] matrix01 = { // 4 by 2
+          { 0.5, 0.2 }, { 0.1, 0.1 }, { 0.2, 0.5 }, { 0.1, 0.5 } };
+
+      double[][] matrix12 = { // 3 by 3
+          { 0.1, 0.2, 0.5 }, { 0.2, 0.5, 0.2 }, { 0.5, 0.5, 0.1 } };
+
+      double[][] matrix23 = { // 4 by 3
+          { 0.2, 0.5, 0.2 }, { 0.5, 0.1, 0.5 }, { 0.1, 0.2, 0.1 },
+          { 0.1, 0.2, 0.5 } };
+
+      DoubleMatrix[] matrices = { new DenseDoubleMatrix(matrix01),
+          new DenseDoubleMatrix(matrix12), new DenseDoubleMatrix(matrix23) };
+      for (DoubleMatrix mat : matrices) {
+        MatrixWritable.write(mat, output);
+      }
+      output.close();
+
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    // initialize the mlp with the existing model meta-data and get the output
+    MultiLayerPerceptron mlp = new SmallMultiLayerPerceptron(modelPath);
+    DoubleVector input = new DenseDoubleVector(new double[] { 1, 2, 3 });
+    try {
+      DoubleVector result = mlp.output(input);
+      assertArrayEquals(new double[] { 0.6636557, 0.7009963, 0.7213835 },
+          result.toArray(), 0.0001);
+    } catch (Exception e1) {
+      e1.printStackTrace();
+    }
+
+    // delete meta-data
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      fs.delete(new Path(modelPath), true);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+  }
+
+  /**
+   * Test the MLP on the XOR problem, training on one instance at a time.
+   */
+  @Test
+  public void testSingleInstanceTraining() {
+    // generate training data
+    DoubleVector[] trainingData = new DenseDoubleVector[] {
+        new DenseDoubleVector(new double[] { 0, 0, 0 }),
+        new DenseDoubleVector(new double[] { 0, 1, 1 }),
+        new DenseDoubleVector(new double[] { 1, 0, 1 }),
+        new DenseDoubleVector(new double[] { 1, 1, 0 }) };
+
+    // set parameters
+    double learningRate = 0.6;
+    boolean regularization = false; // no regularization
+    double momentum = 0; // no momentum
+    String squashingFunctionName = "Sigmoid";
+    String costFunctionName = "SquaredError";
+    int[] layerSizeArray = new int[] { 2, 5, 1 };
+    SmallMultiLayerPerceptron mlp = new SmallMultiLayerPerceptron(learningRate,
+        regularization, momentum, squashingFunctionName, costFunctionName,
+        layerSizeArray);
+
+    try {
+      // train on randomly drawn single instances
+      Random rnd = new Random();
+      for (int i = 0; i < 30000; ++i) {
+        DenseDoubleMatrix[] weightUpdates = mlp
+            .trainByInstance(trainingData[rnd.nextInt(4)]);
+        mlp.updateWeightMatrices(weightUpdates);
+      }
+
+      // System.out.printf("Weight matrices: %s\n",
+      // mlp.weightsToString(mlp.getWeightMatrices()));
+      for (int i = 0; i < trainingData.length; ++i) {
+        DenseDoubleVector testVec = (DenseDoubleVector) trainingData[i]
+            .slice(2);
+        assertEquals(trainingData[i].toArray()[2], mlp.output(testVec)
+            .toArray()[0], 0.2);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Test training the MLP on the XOR problem with mini-batch gradient descent.
+   */
+  @Test
+  public void testTrainingByXOR() {
+    // write in some training instances
+    Configuration conf = new Configuration();
+    String strDataPath = "/tmp/xor-training-by-xor";
+    Path dataPath = new Path(strDataPath);
+
+    // generate training data
+    DoubleVector[] trainingData = new DenseDoubleVector[] {
+        new DenseDoubleVector(new double[] { 0, 0, 0 }),
+        new DenseDoubleVector(new double[] { 0, 1, 1 }),
+        new DenseDoubleVector(new double[] { 1, 0, 1 }),
+        new DenseDoubleVector(new double[] { 1, 1, 0 }) };
+
+    try {
+      URI uri = new URI(strDataPath);
+      FileSystem fs = FileSystem.get(uri, conf);
+      fs.delete(dataPath, true);
+      if (!fs.exists(dataPath)) {
+        fs.createNewFile(dataPath);
+        SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
+            dataPath, LongWritable.class, VectorWritable.class);
+
+        for (int i = 0; i < 1000; ++i) {
+          VectorWritable vecWritable = new VectorWritable(trainingData[i % 4]);
+          writer.append(new LongWritable(i), vecWritable);
+        }
+        writer.close();
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+    // begin training
+    String modelPath = "/tmp/xorModel-training-by-xor.data";
+    double learningRate = 0.6;
+    boolean regularization = false; // no regularization
+    double momentum = 0; // no momentum
+    String squashingFunctionName = "Tanh";
+    String costFunctionName = "SquaredError";
+    int[] layerSizeArray = new int[] { 2, 5, 1 };
+    SmallMultiLayerPerceptron mlp = new SmallMultiLayerPerceptron(learningRate,
+        regularization, momentum, squashingFunctionName, costFunctionName,
+        layerSizeArray);
+
+    Map<String, String> trainingParams = new HashMap<String, String>();
+    trainingParams.put("training.iteration", "10000");
+    trainingParams.put("training.mode", "minibatch.gradient.descent");
+    trainingParams.put("training.batch.size", "100");
+    trainingParams.put("tasks", "3");
+    trainingParams.put("modelPath", modelPath);
+
+    try {
+      mlp.train(dataPath, trainingParams);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+    // test the model
+    for (int i = 0; i < trainingData.length; ++i) {
+      DenseDoubleVector testVec = (DenseDoubleVector) trainingData[i].slice(2);
+      try {
+        DenseDoubleVector actual = (DenseDoubleVector) mlp.output(testVec);
+        assertEquals(trainingData[i].toArray()[2], actual.get(0), 0.2);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+}


