hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomm...@apache.org
Subject svn commit: r1394791 - /hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java
Date Fri, 05 Oct 2012 20:39:27 GMT
Author: tommaso
Date: Fri Oct  5 20:39:26 2012
New Revision: 1394791

URL: http://svn.apache.org/viewvc?rev=1394791&view=rev
Log:
[HAMA-651] - fixed format and added input reset on #getXSize

Modified:
    hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java

Modified: hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java?rev=1394791&r1=1394790&r2=1394791&view=diff
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java (original)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java Fri Oct  5 20:39:26 2012
@@ -17,8 +17,6 @@
  */
 package org.apache.hama.ml.regression;
 
-import java.io.IOException;
-
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hama.bsp.BSP;
@@ -31,155 +29,158 @@ import org.apache.hama.util.KeyValuePair
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 /**
  * A gradient descent (see <code>http://en.wikipedia.org/wiki/Gradient_descent</code>) BSP based abstract implementation.
  * Each extending class should implement the #hypothesis(DoubleVector theta, DoubleVector x) method for a specific
  */
 public abstract class GradientDescentBSP extends BSP<VectorWritable, DoubleWritable, NullWritable,
NullWritable, VectorWritable> {
 
-    private static final Logger log = LoggerFactory.getLogger(GradientDescentBSP.class);
-    static final String INITIAL_THETA_VALUES = "initial.theta.values";
-    static final String ALPHA = "alpha";
-
-    private boolean master;
-    private DoubleVector theta;
-
-    @Override
-    public void setup(BSPPeer<VectorWritable, DoubleWritable, NullWritable, NullWritable,
VectorWritable> peer) throws IOException, SyncException, InterruptedException {
-        master = peer.getPeerIndex() == peer.getNumPeers() / 2;
-    }
+  private static final Logger log = LoggerFactory.getLogger(GradientDescentBSP.class);
+  static final String INITIAL_THETA_VALUES = "initial.theta.values";
+  static final String ALPHA = "alpha";
+
+  private boolean master;
+  private DoubleVector theta;
 
-    @Override
-    public void bsp(BSPPeer<VectorWritable, DoubleWritable, NullWritable, NullWritable,
VectorWritable> peer) throws IOException, SyncException, InterruptedException {
+  @Override
+  public void setup(BSPPeer<VectorWritable, DoubleWritable, NullWritable, NullWritable,
VectorWritable> peer) throws IOException, SyncException, InterruptedException {
+    master = peer.getPeerIndex() == peer.getNumPeers() / 2;
+  }
 
-        while (true) {
+  @Override
+  public void bsp(BSPPeer<VectorWritable, DoubleWritable, NullWritable, NullWritable,
VectorWritable> peer) throws IOException, SyncException, InterruptedException {
 
-            getTheta(peer);
+    while (true) {
 
-            // first superstep : calculate cost function in parallel
+      getTheta(peer);
 
-            double localCost = 0d;
+      // first superstep : calculate cost function in parallel
 
-            int numRead = 0;
+      double localCost = 0d;
 
-            // read an input
-            KeyValuePair<VectorWritable, DoubleWritable> kvp;
-            while ((kvp = peer.readNext()) != null) {
+      int numRead = 0;
 
-                // calculate cost for given input
-                double y = kvp.getValue().get();
-                DoubleVector x = kvp.getKey().getVector();
-                double costForX = y * Math.log(hypothesis(theta, x)) + (1 - y) * Math.log(1
- hypothesis(theta, x));
+      // read an input
+      KeyValuePair<VectorWritable, DoubleWritable> kvp;
+      while ((kvp = peer.readNext()) != null) {
 
-                // adds to local cost
-                localCost += costForX;
-                numRead++;
-            }
+        // calculate cost for given input
+        double y = kvp.getValue().get();
+        DoubleVector x = kvp.getKey().getVector();
+        double costForX = y * Math.log(hypothesis(theta, x)) + (1 - y) * Math.log(1 - hypothesis(theta,
x));
 
-            // cost is sent and aggregated by each
-            double totalCost = localCost;
+        // adds to local cost
+        localCost += costForX;
+        numRead++;
+      }
 
-            for (String peerName : peer.getAllPeerNames()) {
-                peer.send(peerName, new VectorWritable(new DenseDoubleVector(new double[]{localCost,
numRead})));
-            }
-            peer.sync();
+      // cost is sent and aggregated by each
+      double totalCost = localCost;
 
-            // second superstep : cost calculation
+      for (String peerName : peer.getAllPeerNames()) {
+        peer.send(peerName, new VectorWritable(new DenseDoubleVector(new double[]{localCost,
numRead})));
+      }
+      peer.sync();
 
-            VectorWritable costResult;
-            while ((costResult = peer.getCurrentMessage()) != null) {
-                totalCost += costResult.getVector().get(0);
-                numRead += costResult.getVector().get(1);
-            }
+      // second superstep : cost calculation
 
-            totalCost = totalCost * (-1 / numRead);
-            if (log.isInfoEnabled()) {
-                log.info("cost is " + totalCost);
-            }
+      VectorWritable costResult;
+      while ((costResult = peer.getCurrentMessage()) != null) {
+        totalCost += costResult.getVector().get(0);
+        numRead += costResult.getVector().get(1);
+      }
 
-            peer.sync();
+      totalCost = totalCost * (-1 / numRead);
+      if (log.isInfoEnabled()) {
+        log.info("cost is " + totalCost);
+      }
 
-            peer.reopenInput();
+      peer.sync();
 
-            double[] thetaDelta = new double[theta.getLength()];
+      peer.reopenInput();
 
-            // second superstep : calculate partial derivatives in parallel
-            while ((kvp = peer.readNext()) != null) {
-                DoubleVector x = kvp.getKey().getVector();
-                double y = kvp.getValue().get();
-                double difference = hypothesis(theta, x) - y;
-                for (int j = 0; j < theta.getLength(); j++) {
-                    thetaDelta[j] += difference * x.get(j);
-                }
-            }
+      double[] thetaDelta = new double[theta.getLength()];
 
-            // send thetaDelta to the each peer
-            for (String peerName : peer.getAllPeerNames()) {
-                peer.send(peerName, new VectorWritable(new DenseDoubleVector(thetaDelta)));
-            }
+      // second superstep : calculate partial derivatives in parallel
+      while ((kvp = peer.readNext()) != null) {
+        DoubleVector x = kvp.getKey().getVector();
+        double y = kvp.getValue().get();
+        double difference = hypothesis(theta, x) - y;
+        for (int j = 0; j < theta.getLength(); j++) {
+          thetaDelta[j] += difference * x.get(j);
+        }
+      }
 
-            peer.sync();
+      // send thetaDelta to the each peer
+      for (String peerName : peer.getAllPeerNames()) {
+        peer.send(peerName, new VectorWritable(new DenseDoubleVector(thetaDelta)));
+      }
 
-            VectorWritable thetaDeltaSlice;
-            while ((thetaDeltaSlice = peer.getCurrentMessage()) != null) {
-                double[] newTheta = new double[theta.getLength()];
+      peer.sync();
 
-                for (int j = 0; j < theta.getLength(); j++) {
-                    newTheta[j] += thetaDeltaSlice.getVector().get(j);
-                }
+      VectorWritable thetaDeltaSlice;
+      while ((thetaDeltaSlice = peer.getCurrentMessage()) != null) {
+        double[] newTheta = new double[theta.getLength()];
 
-                for (int j = 0; j < theta.getLength(); j++) {
-                    newTheta[j] = theta.get(j) - newTheta[j] * peer.getConfiguration().getFloat(ALPHA,
0.3f);
-                }
+        for (int j = 0; j < theta.getLength(); j++) {
+          newTheta[j] += thetaDeltaSlice.getVector().get(j);
+        }
 
-                theta = new DenseDoubleVector(newTheta);
+        for (int j = 0; j < theta.getLength(); j++) {
+          newTheta[j] = theta.get(j) - newTheta[j] * peer.getConfiguration().getFloat(ALPHA,
0.3f);
+        }
 
-                if (log.isInfoEnabled()) {
-                    log.info("new theta for cost " + totalCost + " is " + theta.toArray().toString());
-                }
-            }
-            peer.sync();
+        theta = new DenseDoubleVector(newTheta);
 
-            // eventually break execution !?
-            if (totalCost == 0) {
-                // TODO change this as just 0 is too strict
-                break;
-            }
+        if (log.isInfoEnabled()) {
+          log.info("new theta for cost " + totalCost + " is " + theta.toArray().toString());
         }
+      }
+      peer.sync();
 
+      // eventually break execution !?
+      if (totalCost == 0) {
+        // TODO change this as just 0 is too strict
+        break;
+      }
     }
 
-    /**
-     * Applies the hypothesis given a set of parameters theta to a given input x
-     *
-     * @param theta the parameters vector
-     * @param x     the input
-     * @return a <code>double</code> number
-     */
-    public abstract double hypothesis(DoubleVector theta, DoubleVector x);
-
-
-    public void getTheta(BSPPeer<VectorWritable, DoubleWritable, NullWritable, NullWritable,
VectorWritable> peer) throws IOException, SyncException, InterruptedException {
-        if (master && theta == null) {
-            int size = getXSize(peer);
-            theta = new DenseDoubleVector(size, peer.getConfiguration().getInt(INITIAL_THETA_VALUES,
10));
-            for (String peerName : peer.getAllPeerNames()) {
-                peer.send(peerName, new VectorWritable(theta));
-            }
-            peer.sync();
-        } else {
-            peer.sync();
-            VectorWritable vectorWritable = peer.getCurrentMessage();
-            theta = vectorWritable.getVector();
-        }
+  }
+
+  /**
+   * Applies the hypothesis given a set of parameters theta to a given input x
+   *
+   * @param theta the parameters vector
+   * @param x     the input
+   * @return a <code>double</code> number
+   */
+  public abstract double hypothesis(DoubleVector theta, DoubleVector x);
+
+
+  public void getTheta(BSPPeer<VectorWritable, DoubleWritable, NullWritable, NullWritable,
VectorWritable> peer) throws IOException, SyncException, InterruptedException {
+    if (master && theta == null) {
+      int size = getXSize(peer);
+      theta = new DenseDoubleVector(size, peer.getConfiguration().getInt(INITIAL_THETA_VALUES,
10));
+      for (String peerName : peer.getAllPeerNames()) {
+        peer.send(peerName, new VectorWritable(theta));
+      }
+      peer.sync();
+    } else {
+      peer.sync();
+      VectorWritable vectorWritable = peer.getCurrentMessage();
+      theta = vectorWritable.getVector();
     }
+  }
 
-    private int getXSize(BSPPeer<VectorWritable, DoubleWritable, NullWritable, NullWritable,
VectorWritable> peer) throws IOException {
-        VectorWritable key = null;
-        peer.readNext(key, null);
-        if (key == null) {
-            throw new IOException("cannot read input vector size");
-        }
-        return key.getVector().getLength();
+  private int getXSize(BSPPeer<VectorWritable, DoubleWritable, NullWritable, NullWritable,
VectorWritable> peer) throws IOException {
+    VectorWritable key = null;
+    peer.readNext(key, null);
+    peer.reopenInput(); // reset input to start
+    if (key == null) {
+      throw new IOException("cannot read input vector size");
     }
+    return key.getVector().getLength();
+  }
 }



Mime
View raw message