hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1560672 - in /hama/trunk: core/src/main/java/org/apache/hama/bsp/ examples/src/main/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/ graph/src/test/java/org/apache/hama/graph/ graph/src/test/java/org/apache/hama/gr...
Date Thu, 23 Jan 2014 12:50:13 GMT
Author: edwardyoon
Date: Thu Jan 23 12:50:13 2014
New Revision: 1560672

URL: http://svn.apache.org/r1560672
Log:
HAMA-838: Refactor aggregator.

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/SumAggregator.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Jan 23 12:50:13 2014
@@ -459,11 +459,13 @@ public final class BSPPeerImpl<K1, V1, K
   }
 
   public final void close() {
-    long combinedMessages = this.getCounter(PeerCounter.TOTAL_MESSAGES_SENT)
-        .getCounter()
-        - this.getCounter(PeerCounter.TOTAL_MESSAGES_RECEIVED).getCounter();
-    this.getCounter(PeerCounter.TOTAL_MESSAGES_COMBINED).increment(
-        combinedMessages);
+    if (conf.get(Constants.COMBINER_CLASS) != null) {
+      long combinedMessages = this.getCounter(PeerCounter.TOTAL_MESSAGES_SENT)
+          .getCounter()
+          - this.getCounter(PeerCounter.TOTAL_MESSAGES_RECEIVED).getCounter();
+      this.getCounter(PeerCounter.TOTAL_MESSAGES_COMBINED).increment(
+          combinedMessages);
+    }
 
     // there are many catches, because we want to close always every component
     // even if the one before failed.

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Thu Jan 23 12:50:13 2014
@@ -19,7 +19,6 @@ package org.apache.hama.examples;
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -71,13 +70,15 @@ public class PageRank {
         }
         double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
         setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
+        aggregate(0, this.getValue());
       }
 
       // if we have not reached our global error yet, then proceed.
-      DoubleWritable globalError = getLastAggregatedValue(0);
-
+      DoubleWritable globalError = getAggregatedValue(0);
+      
       if (globalError != null && this.getSuperstepCount() > 2
           && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
+        System.out.println(globalError);
         voteToHalt();
       } else {
         // in each superstep we are going to send a new rank to our neighbours
@@ -126,7 +127,7 @@ public class PageRank {
 
     // error
     pageJob.setAggregatorClass(AverageAggregator.class);
-    
+
     // Vertex reader
     pageJob.setVertexInputReaderClass(PagerankSeqReader.class);
 
@@ -153,7 +154,7 @@ public class PageRank {
     if (args.length < 2)
       printUsage();
 
-    HamaConfiguration conf = new HamaConfiguration(new Configuration());
+    HamaConfiguration conf = new HamaConfiguration();
     GraphJob pageJob = createJob(args, conf);
 
     long startTime = System.currentTimeMillis();

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java Thu Jan 23 12:50:13 2014
@@ -25,13 +25,12 @@ import org.apache.hadoop.io.DoubleWritab
  * (sums them up) them.
  */
 public class AbsDiffAggregator extends
-    AbstractAggregator<DoubleWritable, Vertex<?, ?, DoubleWritable>> {
+    AbstractAggregator<DoubleWritable> {
 
   double absoluteDifference = 0.0d;
 
   @Override
-  public void aggregate(Vertex<?, ?, DoubleWritable> v,
-      DoubleWritable oldValue, DoubleWritable newValue) {
+  public void aggregate(DoubleWritable oldValue, DoubleWritable newValue) {
     // make sure it's nullsafe
     if (oldValue != null) {
       absoluteDifference += Math.abs(oldValue.get() - newValue.get());
@@ -41,8 +40,7 @@ public class AbsDiffAggregator extends
   // when a master aggregates he aggregated values, he calls this, so let's just
   // sum up here.
   @Override
-  public void aggregate(Vertex<?, ?, DoubleWritable> vertex,
-      DoubleWritable value) {
+  public void aggregate(DoubleWritable value) {
     absoluteDifference += value.get();
   }
 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java Thu Jan 23 12:50:13 2014
@@ -26,8 +26,8 @@ import org.apache.hadoop.io.Writable;
  * For tracking cases it increments an internal counter on each call of
  * aggregate.
  */
-public abstract class AbstractAggregator<M extends Writable, VERTEX extends Vertex<?, ?, M>>
-    implements Aggregator<M, VERTEX> {
+public abstract class AbstractAggregator<M extends Writable>
+    implements Aggregator<M> {
 
   private int timesAggregated = 0;
 
@@ -52,7 +52,7 @@ public abstract class AbstractAggregator
    * this will always be null.
    */
   @Override
-  public void aggregate(VERTEX vertex, M value) {
+  public void aggregate(M value) {
 
   }
 
@@ -62,7 +62,7 @@ public abstract class AbstractAggregator
    * implementation in this class.Please make sure that you are null-checking
    * vertex, since on a master task this will always be null.
    */
-  public void aggregate(VERTEX vertex, M oldValue, M newValue) {
+  public void aggregate(M oldValue, M newValue) {
 
   }
 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java Thu Jan 23 12:50:13 2014
@@ -41,7 +41,7 @@ import com.google.common.base.Preconditi
 public final class AggregationRunner<V extends WritableComparable, E extends Writable, M extends Writable> {
 
   // multiple aggregator arrays
-  private Aggregator<M, Vertex<V, E, M>>[] aggregators;
+  private Aggregator<M>[] aggregators;
   private Writable[] globalAggregatorResult;
   private IntWritable[] globalAggregatorIncrement;
   private boolean[] isAbstractAggregator;
@@ -49,7 +49,7 @@ public final class AggregationRunner<V e
   private Text[] aggregatorValueFlag;
   private Text[] aggregatorIncrementFlag;
   // aggregator on the master side
-  private Aggregator<M, Vertex<V, E, M>>[] masterAggregator;
+  private Aggregator<M>[] masterAggregator;
 
   private boolean enabled = false;
   private Configuration conf;
@@ -110,8 +110,7 @@ public final class AggregationRunner<V e
         updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
         if (isAbstractAggregator[i]) {
           updatedCnt.put(aggregatorIncrementFlag[i],
-              ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
-                  .getTimesAggregated());
+              ((AbstractAggregator<M>) aggregators[i]).getTimesAggregated());
         }
       }
       for (int i = 0; i < aggregators.length; i++) {
@@ -133,16 +132,14 @@ public final class AggregationRunner<V e
    * @param lastValue the value before compute().
    * @param v the vertex.
    */
-  public void aggregateVertex(M lastValue, Vertex<V, E, M> v) {
+  public void aggregateVertex(int index, M lastValue, M value) {
     if (isEnabled()) {
-      for (int i = 0; i < this.aggregators.length; i++) {
-        Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
-        aggregator.aggregate(v, v.getValue());
-        if (isAbstractAggregator[i]) {
-          AbstractAggregator<M, Vertex<V, E, M>> intern = (AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
-          intern.aggregate(v, lastValue, v.getValue());
-          intern.aggregateInternal();
-        }
+      Aggregator<M> aggregator = this.aggregators[index];
+      aggregator.aggregate(value);
+      if (isAbstractAggregator[index]) {
+        AbstractAggregator<M> intern = (AbstractAggregator<M>) aggregator;
+        intern.aggregate(lastValue, value);
+        intern.aggregateInternal();
       }
     }
   }
@@ -157,7 +154,7 @@ public final class AggregationRunner<V e
       for (int i = 0; i < masterAggregator.length; i++) {
         Writable lastAggregatedValue = masterAggregator[i].getValue();
         if (isAbstractAggregator[i]) {
-          final AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]);
+          final AbstractAggregator<M> intern = ((AbstractAggregator<M>) masterAggregator[i]);
           final Writable finalizeAggregation = intern.finalizeAggregation();
           if (intern.finalizeAggregation() != null) {
             lastAggregatedValue = finalizeAggregation;
@@ -207,7 +204,7 @@ public final class AggregationRunner<V e
    */
   public void masterReadAggregatedValue(Text textIndex, M value) {
     int index = Integer.parseInt(textIndex.toString().split(";")[1]);
-    masterAggregator[index].aggregate(null, value);
+    masterAggregator[index].aggregate(value);
   }
 
   /**
@@ -217,15 +214,15 @@ public final class AggregationRunner<V e
   public void masterReadAggregatedIncrementalValue(Text textIndex, M value) {
     int index = Integer.parseInt(textIndex.toString().split(";")[1]);
     if (isAbstractAggregator[index]) {
-      ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[index])
+      ((AbstractAggregator<M>) masterAggregator[index])
           .addTimesAggregated(((IntWritable) value).get());
     }
   }
 
   @SuppressWarnings("unchecked")
-  private Aggregator<M, Vertex<V, E, M>> getNewAggregator(String clsName) {
+  private Aggregator<M> getNewAggregator(String clsName) {
     try {
-      return (Aggregator<M, Vertex<V, E, M>>) ReflectionUtils.newInstance(
+      return (Aggregator<M>) ReflectionUtils.newInstance(
           conf.getClassByName(clsName), conf);
     } catch (ClassNotFoundException e) {
       e.printStackTrace();

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java Thu Jan 23 12:50:13 2014
@@ -27,12 +27,12 @@ import org.apache.hadoop.io.Writable;
  * The result of an aggregator from the last superstep can be picked up by the
  * vertex itself via {@link Vertex}#getLastAggregatedValue();
  */
-public interface Aggregator<M extends Writable, VERTEX extends Vertex<?, ?, ?>> {
+public interface Aggregator<M extends Writable> {
 
   /**
    * Observes a new vertex value.
    */
-  public void aggregate(VERTEX vertex, M value);
+  public void aggregate(M value);
 
   /**
    * Gets a vertex value.

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu Jan 23 12:50:13 2014
@@ -46,7 +46,6 @@ public class GraphJob extends BSPJob {
 
   public final static String VERTEX_OUTPUT_WRITER_CLASS_ATTR = "hama.graph.vertex.output.writer.class";
   public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class";
-  public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR = "hama.vertex.message.combiner.class";
 
   /**
    * Creates a new Graph Job with the given configuration and an exampleClass.
@@ -164,7 +163,7 @@ public class GraphJob extends BSPJob {
   @Override
   public void setCombinerClass(Class<? extends Combiner<? extends Writable>> cls) {
     ensureState(JobState.DEFINE);
-    conf.setClass(VERTEX_MESSAGE_COMBINER_CLASS_ATTR, cls, Combiner.class);
+    conf.setClass(Constants.COMBINER_CLASS, cls, Combiner.class);
   }
 
   /**

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Jan 23 12:50:13 2014
@@ -78,7 +78,6 @@ public final class GraphJobRunner<V exte
   public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text(
       S_FLAG_VERTEX_TOTAL_VERTICES);
 
-  public static final String MESSAGE_COMBINER_CLASS_KEY = "hama.vertex.message.combiner.class";
   public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class";
 
   private HamaConfiguration conf;
@@ -269,14 +268,12 @@ public final class GraphJobRunner<V exte
       }
 
       if (!vertex.isHalted()) {
-        M lastValue = vertex.getValue();
         if (iterable == null) {
           vertex.compute(Collections.<M> emptyList());
         } else {
           vertex.compute(iterable);
           currentMessage = iterable.getOverflowMessage();
         }
-        getAggregationRunner().aggregateVertex(lastValue, vertex);
         activeVertices++;
       }
 
@@ -338,7 +335,6 @@ public final class GraphJobRunner<V exte
    * Seed the vertices first with their own values in compute. This is the first
    * superstep after the vertices have been loaded.
    */
-  @SuppressWarnings("unused")
   private void doInitialSuperstep(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
@@ -347,7 +343,6 @@ public final class GraphJobRunner<V exte
     IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
     while (skippingIterator.hasNext()) {
       Vertex<V, E, M> vertex = skippingIterator.next();
-      M lastValue = vertex.getValue();
 
       // Calls setup method.
       vertex.setup(conf);

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java Thu Jan 23 12:50:13 2014
@@ -20,12 +20,12 @@ package org.apache.hama.graph;
 import org.apache.hadoop.io.IntWritable;
 
 public class MaxAggregator extends
-    AbstractAggregator<IntWritable, Vertex<?, ?, IntWritable>> {
+    AbstractAggregator<IntWritable> {
 
   int max = Integer.MIN_VALUE;
 
   @Override
-  public void aggregate(Vertex<?, ?, IntWritable> vertex, IntWritable value) {
+  public void aggregate(IntWritable value) {
     if (value.get() > max) {
       max = value.get();
     }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java Thu Jan 23 12:50:13 2014
@@ -20,12 +20,12 @@ package org.apache.hama.graph;
 import org.apache.hadoop.io.IntWritable;
 
 public class MinAggregator extends
-    AbstractAggregator<IntWritable, Vertex<?, ?, IntWritable>> {
+    AbstractAggregator<IntWritable> {
 
   int min = Integer.MAX_VALUE;
 
   @Override
-  public void aggregate(Vertex<?, ?, IntWritable> vertex, IntWritable value) {
+  public void aggregate(IntWritable value) {
     if (value.get() < min) {
       min = value.get();
     }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java Thu Jan 23 12:50:13 2014
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.Constants;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.message.OutgoingMessageManager;
@@ -51,14 +52,13 @@ public class OutgoingVertexMessagesManag
   @SuppressWarnings("unchecked")
   @Override
   public void init(Configuration conf) {
-    if (!conf.getClass(GraphJobRunner.MESSAGE_COMBINER_CLASS_KEY,
-        Combiner.class).equals(Combiner.class)) {
-      LOG.debug("Combiner class: "
-          + conf.get(GraphJobRunner.MESSAGE_COMBINER_CLASS_KEY));
+    if (!conf.getClass(Constants.COMBINER_CLASS, Combiner.class).equals(
+        Combiner.class)) {
+      LOG.debug("Combiner class: " + conf.get(Constants.COMBINER_CLASS));
 
       combiner = (Combiner<Writable>) org.apache.hadoop.util.ReflectionUtils
-          .newInstance(conf.getClass("hama.vertex.message.combiner.class",
-              Combiner.class), conf);
+          .newInstance(conf.getClass(Constants.COMBINER_CLASS, Combiner.class),
+              conf);
     }
   }
 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/SumAggregator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/SumAggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/SumAggregator.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/SumAggregator.java Thu Jan 23 12:50:13 2014
@@ -23,13 +23,12 @@ import org.apache.hadoop.io.DoubleWritab
  * Sums all vertex values globally.
  */
 public class SumAggregator extends
-    AbstractAggregator<DoubleWritable, Vertex<?, ?, DoubleWritable>> {
+    AbstractAggregator<DoubleWritable> {
 
   double sum = 0.0d;
 
   @Override
-  public void aggregate(Vertex<?, ?, DoubleWritable> vertex,
-      DoubleWritable value) {
+  public void aggregate(DoubleWritable value) {
     sum += value.get();
   }
 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Thu Jan 23 12:50:13 2014
@@ -58,6 +58,7 @@ public abstract class Vertex<V extends W
   private transient GraphJobRunner<V, E, M> runner;
 
   private V vertexID;
+  private M oldValue;
   private M value;
   private List<Edge<V, E>> edges;
 
@@ -183,6 +184,7 @@ public abstract class Vertex<V extends W
 
   @Override
   public void setValue(M value) {
+    this.oldValue = this.value;
     this.value = value;
   }
 
@@ -194,31 +196,6 @@ public abstract class Vertex<V extends W
     return runner.getMaxIteration();
   }
 
-  /**
-   * Get the last aggregated value of the defined aggregator, null if nothing
-   * was configured or not returned a result. You have to supply an index, the
-   * index is defined by the order you set the aggregator classes in
-   * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
-   * so if you have a single aggregator you can retrieve it via
-   * {@link #getLastAggregatedValue}(0).
-   */
-  @SuppressWarnings("unchecked")
-  public M getLastAggregatedValue(int index) {
-    return (M) runner.getLastAggregatedValue(index);
-  }
-
-  /**
-   * Get the number of aggregated vertices in the last superstep. Or null if no
-   * aggregator is available.You have to supply an index, the index is defined
-   * by the order you set the aggregator classes in
-   * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
-   * so if you have a single aggregator you can retrieve it via
-   * {@link #getNumLastAggregatedVertices}(0).
-   */
-  public IntWritable getNumLastAggregatedVertices(int index) {
-    return runner.getNumLastAggregatedVertices(index);
-  }
-
   public int getNumPeers() {
     return runner.getPeer().getNumPeers();
   }
@@ -406,4 +383,35 @@ public abstract class Vertex<V extends W
     vertex.readFields(dis);
     return vertex;
   }
+  
+  @Override
+  public void aggregate(int index, M value) throws IOException {
+    this.runner.getAggregationRunner().aggregateVertex(index, oldValue, value);
+  }
+
+  /**
+   * Get the last aggregated value of the defined aggregator, null if nothing
+   * was configured or not returned a result. You have to supply an index, the
+   * index is defined by the order you set the aggregator classes in
+   * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
+   * so if you have a single aggregator you can retrieve it via
+   * {@link #getLastAggregatedValue}(0).
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public M getAggregatedValue(int index) {
+    return (M) runner.getLastAggregatedValue(index);
+  }
+  
+  /**
+   * Get the number of aggregated vertices in the last superstep. Or null if no
+   * aggregator is available.You have to supply an index, the index is defined
+   * by the order you set the aggregator classes in
+   * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
+   * so if you have a single aggregator you can retrieve it via
+   * {@link #getNumLastAggregatedVertices}(0).
+   */
+  public IntWritable getNumLastAggregatedVertices(int index) {
+    return runner.getNumLastAggregatedVertices(index);
+  }
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Thu Jan 23 12:50:13 2014
@@ -111,5 +111,20 @@ public interface VertexInterface<V exten
    * Gets the vertex value
    */
   public M getValue();
+  
+   /**
+    * Provides a value to the specified aggregator.
+    *
+    * @throws IOException
+    *
+    * @param name identifies a aggregator
+    * @param value value to be aggregated
+    */
+   public void aggregate(int index, M value) throws IOException;
+
+   /**
+    * Returns the value of the specified aggregator.
+    */
+   public Writable getAggregatedValue(int index);
 
 }

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java Thu Jan 23 12:50:13 2014
@@ -27,9 +27,9 @@ public class TestAbsDiffAggregator exten
   @Test
   public void testAggregator() {
     AbsDiffAggregator diff = new AbsDiffAggregator();
-    diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2));
-    diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2));
-    diff.aggregate(null, null, new DoubleWritable(5));
+    diff.aggregate(new DoubleWritable(5), new DoubleWritable(2));
+    diff.aggregate(new DoubleWritable(5), new DoubleWritable(2));
+    diff.aggregate(null, new DoubleWritable(5));
 
     // 0, because this is totally worthless for diffs
     assertEquals(0, diff.getTimesAggregated().get());

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java Thu Jan 23 12:50:13 2014
@@ -27,11 +27,11 @@ public class TestAverageAggregator exten
   @Test
   public void testAggregator() {
     AverageAggregator diff = new AverageAggregator();
-    diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2));
+    diff.aggregate(new DoubleWritable(5), new DoubleWritable(2));
     diff.aggregateInternal();
-    diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2));
+    diff.aggregate(new DoubleWritable(5), new DoubleWritable(2));
     diff.aggregateInternal();
-    diff.aggregate(null, null, new DoubleWritable(5));
+    diff.aggregate(null, new DoubleWritable(5));
     diff.aggregateInternal();
 
     assertEquals(3, diff.getTimesAggregated().get());

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java Thu Jan 23 12:50:13 2014
@@ -27,8 +27,8 @@ public class TestMinMaxAggregator extend
   @Test
   public void testMinAggregator() {
     MinAggregator diff = new MinAggregator();
-    diff.aggregate(null, new IntWritable(5));
-    diff.aggregate(null, new IntWritable(25));
+    diff.aggregate(new IntWritable(5));
+    diff.aggregate(new IntWritable(25));
     assertEquals(5, diff.getValue().get());
 
   }
@@ -36,8 +36,8 @@ public class TestMinMaxAggregator extend
   @Test
   public void testMaxAggregator() {
     MaxAggregator diff = new MaxAggregator();
-    diff.aggregate(null, new IntWritable(5));
-    diff.aggregate(null, new IntWritable(25));
+    diff.aggregate(new IntWritable(5));
+    diff.aggregate(new IntWritable(25));
     assertEquals(25, diff.getValue().get());
   }
 

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java Thu Jan 23 12:50:13 2014
@@ -27,8 +27,8 @@ public class TestSumAggregator extends T
   @Test
   public void testAggregator() {
     SumAggregator diff = new SumAggregator();
-    diff.aggregate(null, new DoubleWritable(5));
-    diff.aggregate(null, new DoubleWritable(5));
+    diff.aggregate(new DoubleWritable(5));
+    diff.aggregate(new DoubleWritable(5));
     assertEquals(10, (int) diff.getValue().get());
 
   }

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Thu Jan 23 12:50:13 2014
@@ -74,7 +74,7 @@ public class PageRank {
       }
 
       // if we have not reached our global error yet, then proceed.
-      DoubleWritable globalError = this.getLastAggregatedValue(0);
+      DoubleWritable globalError = this.getAggregatedValue(0);
       if (globalError != null && this.getSuperstepCount() > 2
           && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
         voteToHalt();



Mime
View raw message