hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1341895 - in /incubator/hama/trunk: ./ examples/src/main/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/ graph/src/test/java/org/apache/hama/graph/ graph/src/test/java/org/apache/hama/graph/example/
Date Wed, 23 May 2012 15:28:17 GMT
Author: tjungblut
Date: Wed May 23 15:28:16 2012
New Revision: 1341895

URL: http://svn.apache.org/viewvc?rev=1341895&view=rev
Log:
[HAMA-579] Add multiple aggregators


Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
    incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed May 23 15:28:16 2012
@@ -18,7 +18,8 @@ Release 0.5 - April 10, 2012 
   BUG FIXES
 
   IMPROVEMENTS
-    
+
+    HAMA-579: Add multiple aggregators (tjungblut)
     HAMA-576: Improve sendMessages in Vertex (tjungblut)
     HAMA-575: Generify graph package (tjungblut)    
     HAMA-571: Provide graph repair function in GraphJobRunner (tjungblut)

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Wed
May 23 15:28:16 2012
@@ -75,7 +75,7 @@ public class PageRank {
       }
 
       // if we have not reached our global error yet, then proceed.
-      DoubleWritable globalError = getLastAggregatedValue();
+      DoubleWritable globalError = getLastAggregatedValue(0);
       if (globalError != null && this.getSuperstepCount() > 2
           && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
         return;

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
(original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
Wed May 23 15:28:16 2012
@@ -24,12 +24,14 @@ import org.apache.hadoop.io.DoubleWritab
  * after the compute, then calculates the difference and globally accumulates
  * (sums them up) them.
  */
-public class AbsDiffAggregator extends AbstractAggregator<DoubleWritable> {
+public class AbsDiffAggregator extends
+    AbstractAggregator<DoubleWritable, Vertex<?, DoubleWritable, ?>> {
 
   double absoluteDifference = 0.0d;
 
   @Override
-  public void aggregate(DoubleWritable oldValue, DoubleWritable newValue) {
+  public void aggregate(Vertex<?, DoubleWritable, ?> v,
+      DoubleWritable oldValue, DoubleWritable newValue) {
     // make sure it's nullsafe
     if (oldValue != null) {
       absoluteDifference += Math.abs(oldValue.get() - newValue.get());

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
(original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
Wed May 23 15:28:16 2012
@@ -26,8 +26,8 @@ import org.apache.hadoop.io.Writable;
  * For tracking cases it increments an internal counter on each call of
  * aggregate.
  */
-public abstract class AbstractAggregator<V extends Writable> implements
-    Aggregator<V> {
+public abstract class AbstractAggregator<V extends Writable, VERTEX extends Vertex<?,
V, ?>>
+    implements Aggregator<V, VERTEX> {
 
   private int timesAggregated = 0;
 
@@ -48,18 +48,21 @@ public abstract class AbstractAggregator
   /**
    * Observes a value of a vertex after the compute method. This is intended to
    * be overriden by the user and is just an empty implementation in this class.
+   * Please make sure that you are null-checking vertex, since on a master task
+   * this will always be null.
    */
   @Override
-  public void aggregate(V value) {
+  public void aggregate(VERTEX vertex, V value) {
 
   }
 
   /**
    * Observes the old value of a vertex and the new value at the same time. This
    * is intended to be overridden by the user and is just an empty
-   * implementation in this class.
+   * implementation in this class.Please make sure that you are null-checking
+   * vertex, since on a master task this will always be null.
    */
-  public void aggregate(V oldValue, V newValue) {
+  public void aggregate(VERTEX vertex, V oldValue, V newValue) {
 
   }
 

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java Wed May
23 15:28:16 2012
@@ -27,12 +27,12 @@ import org.apache.hadoop.io.Writable;
  * The result of an aggregator from the last superstep can be picked up by the
  * vertex itself via {@link Vertex}#getLastAggregatedValue();
  */
-public interface Aggregator<V extends Writable> {
+public interface Aggregator<V extends Writable, VERTEX extends Vertex<?, ?, ?>>
{
 
   /**
    * Observes a new vertex value.
    */
-  public void aggregate(V value);
+  public void aggregate(VERTEX vertex, V value);
 
   /**
    * Gets a vertex value.

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Wed May 23
15:28:16 2012
@@ -92,9 +92,21 @@ public class GraphJob extends BSPJob {
   /**
    * Set the aggregator for the job.
    */
-  public void setAggregatorClass(@SuppressWarnings("rawtypes")
-  Class<? extends Aggregator> cls) {
-    conf.setClass(AGGREGATOR_CLASS_ATTR, cls, Aggregator.class);
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void setAggregatorClass(Class<? extends Aggregator> cls) {
+    this.setAggregatorClass(new Class[] { cls });
+  }
+
+  /**
+   * Sets multiple aggregators for the job.
+   */
+  @SuppressWarnings("rawtypes")
+  public void setAggregatorClass(Class<? extends Aggregator>... cls) {
+    String classNames = "";
+    for (Class<? extends Aggregator> cl : cls) {
+      classNames += cl.getName() + ";";
+    }
+    conf.set(AGGREGATOR_CLASS_ATTR, classNames);
   }
 
   @SuppressWarnings("unchecked")

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Wed
May 23 15:28:16 2012
@@ -64,10 +64,6 @@ public final class GraphJobRunner<VERTEX
 
   private static final Text FLAG_MESSAGE_COUNTS = new Text(
       S_FLAG_MESSAGE_COUNTS);
-  private static final Text FLAG_AGGREGATOR_VALUE = new Text(
-      S_FLAG_AGGREGATOR_VALUE);
-  private static final Text FLAG_AGGREGATOR_INCREMENT = new Text(
-      S_FLAG_AGGREGATOR_INCREMENT);
 
   public static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
   public static final String GRAPH_REPAIR = "hama.graph.repair";
@@ -75,13 +71,16 @@ public final class GraphJobRunner<VERTEX
   private Configuration conf;
   private Combiner<VERTEX_VALUE> combiner;
 
-  private Aggregator<VERTEX_VALUE> aggregator;
-  private Writable globalAggregatorResult;
-  private IntWritable globalAggregatorIncrement;
-  private boolean isAbstractAggregator;
-
+  // multiple aggregator arrays
+  private Aggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>[]
aggregators;
+  private Writable[] globalAggregatorResult;
+  private IntWritable[] globalAggregatorIncrement;
+  private boolean[] isAbstractAggregator;
+  private String[] aggregatorClassNames;
+  private Text[] aggregatorValueFlag;
+  private Text[] aggregatorIncrementFlag;
   // aggregator on the master side
-  private Aggregator<VERTEX_VALUE> masterAggregator;
+  private Aggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>[]
masterAggregator;
 
   private Map<VERTEX_ID, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> vertices
= new HashMap<VERTEX_ID, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>();
 
@@ -141,17 +140,31 @@ public final class GraphJobRunner<VERTEX
           conf.getClass("hama.vertex.message.combiner.class", Combiner.class),
           conf);
     }
-
-    if (!conf.getClass("hama.graph.aggregator.class", Aggregator.class).equals(
-        Aggregator.class)) {
-      LOG.debug("Aggregator class: " + conf.get(MESSAGE_COMBINER_CLASS));
-
-      aggregator = getNewAggregator();
-      if (aggregator instanceof AbstractAggregator) {
-        isAbstractAggregator = true;
-      }
+    String aggregatorClasses = conf.get(GraphJob.AGGREGATOR_CLASS_ATTR);
+    if (aggregatorClasses != null) {
+      LOG.debug("Aggregator classes: " + aggregatorClasses);
+      aggregatorClassNames = aggregatorClasses.split(";");
+      // init to the split size
+      aggregators = new Aggregator[aggregatorClassNames.length];
+      globalAggregatorResult = new Writable[aggregatorClassNames.length];
+      globalAggregatorIncrement = new IntWritable[aggregatorClassNames.length];
+      isAbstractAggregator = new boolean[aggregatorClassNames.length];
+      aggregatorValueFlag = new Text[aggregatorClassNames.length];
+      aggregatorIncrementFlag = new Text[aggregatorClassNames.length];
       if (isMasterTask(peer)) {
-        masterAggregator = getNewAggregator();
+        masterAggregator = new Aggregator[aggregatorClassNames.length];
+      }
+      for (int i = 0; i < aggregatorClassNames.length; i++) {
+        aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
+        aggregatorValueFlag[i] = new Text(S_FLAG_AGGREGATOR_VALUE + ";" + i);
+        aggregatorIncrementFlag[i] = new Text(S_FLAG_AGGREGATOR_INCREMENT + ";"
+            + i);
+        if (aggregators[i] instanceof AbstractAggregator) {
+          isAbstractAggregator[i] = true;
+        }
+        if (isMasterTask(peer)) {
+          masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
+        }
       }
     }
 
@@ -165,15 +178,19 @@ public final class GraphJobRunner<VERTEX
       msgIterator.add(v.getValue());
       VERTEX_VALUE lastValue = v.getValue();
       v.compute(msgIterator.iterator());
-      if (aggregator != null) {
-        aggregator.aggregate(v.getValue());
-        if (isAbstractAggregator) {
-          AbstractAggregator<VERTEX_VALUE> intern = ((AbstractAggregator<VERTEX_VALUE>)
aggregator);
-          intern.aggregate(lastValue, v.getValue());
-          intern.aggregateInternal();
+      if (this.aggregators != null) {
+        for (int i = 0; i < this.aggregators.length; i++) {
+          Aggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>
aggregator = this.aggregators[i];
+          aggregator.aggregate(v, v.getValue());
+          if (isAbstractAggregator[i]) {
+            AbstractAggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>
intern = (AbstractAggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>)
aggregator;
+            intern.aggregate(v, lastValue, v.getValue());
+            intern.aggregateInternal();
+          }
         }
       }
     }
+    runAggregators(peer, 1);
     iteration++;
   }
 
@@ -200,20 +217,23 @@ public final class GraphJobRunner<VERTEX
           updatedCnt.put(FLAG_MESSAGE_COUNTS,
               new IntWritable(Integer.MIN_VALUE));
         } else {
-          if (aggregator != null) {
-            Writable lastAggregatedValue = masterAggregator.getValue();
-            if (isAbstractAggregator) {
-              final AbstractAggregator<VERTEX_VALUE> intern = ((AbstractAggregator<VERTEX_VALUE>)
aggregator);
-              final Writable finalizeAggregation = intern.finalizeAggregation();
-              if (intern.finalizeAggregation() != null) {
-                lastAggregatedValue = finalizeAggregation;
+          if (aggregators != null) {
+            for (int i = 0; i < masterAggregator.length; i++) {
+              Writable lastAggregatedValue = masterAggregator[i].getValue();
+              if (isAbstractAggregator[i]) {
+                final AbstractAggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE,
EDGE_VALUE_TYPE>> intern = ((AbstractAggregator<VERTEX_VALUE, Vertex<VERTEX_ID,
VERTEX_VALUE, EDGE_VALUE_TYPE>>) aggregators[i]);
+                final Writable finalizeAggregation = intern
+                    .finalizeAggregation();
+                if (intern.finalizeAggregation() != null) {
+                  lastAggregatedValue = finalizeAggregation;
+                }
+                // this count is usually the times of active
+                // vertices in the graph
+                updatedCnt.put(aggregatorIncrementFlag[i],
+                    intern.getTimesAggregated());
               }
-              // this count is usually the times of active
-              // vertices in the graph
-              updatedCnt.put(FLAG_AGGREGATOR_INCREMENT,
-                  intern.getTimesAggregated());
+              updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);
             }
-            updatedCnt.put(FLAG_AGGREGATOR_VALUE, lastAggregatedValue);
           }
         }
         for (String peerName : peer.getAllPeerNames()) {
@@ -222,17 +242,14 @@ public final class GraphJobRunner<VERTEX
       }
       // if we have an aggregator defined, we must make an additional sync
       // to have the updated values available on all our peers.
-      if (aggregator != null && iteration > 1) {
+      if (aggregators != null && iteration > 1) {
         peer.sync();
 
         MapWritable updatedValues = peer.getCurrentMessage().getMap();
-        globalAggregatorResult = updatedValues.get(FLAG_AGGREGATOR_VALUE);
-        globalAggregatorIncrement = (IntWritable) updatedValues
-            .get(FLAG_AGGREGATOR_INCREMENT);
-
-        aggregator = getNewAggregator();
-        if (isMasterTask(peer)) {
-          masterAggregator = getNewAggregator();
+        for (int i = 0; i < aggregators.length; i++) {
+          globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
+          globalAggregatorIncrement[i] = (IntWritable) updatedValues
+              .get(aggregatorIncrementFlag[i]);
         }
         IntWritable count = (IntWritable) updatedValues
             .get(FLAG_MESSAGE_COUNTS);
@@ -257,32 +274,54 @@ public final class GraphJobRunner<VERTEX
             .get(e.getKey());
         VERTEX_VALUE lastValue = vertex.getValue();
         vertex.compute(msgs.iterator());
-        if (aggregator != null) {
-          aggregator.aggregate(vertex.getValue());
-          if (isAbstractAggregator) {
-            AbstractAggregator<VERTEX_VALUE> intern = ((AbstractAggregator<VERTEX_VALUE>)
aggregator);
-            intern.aggregate(lastValue, vertex.getValue());
-            intern.aggregateInternal();
+        if (aggregators != null) {
+          if (this.aggregators != null) {
+            for (int i = 0; i < this.aggregators.length; i++) {
+              Aggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>
aggregator = this.aggregators[i];
+              aggregator.aggregate(vertex, vertex.getValue());
+              if (isAbstractAggregator[i]) {
+                AbstractAggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>
intern = ((AbstractAggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>)
aggregator);
+                intern.aggregate(vertex, lastValue, vertex.getValue());
+                intern.aggregateInternal();
+              }
+            }
           }
         }
         iterator.remove();
       }
 
-      // send msgCounts to the master task
-      MapWritable updatedCnt = new MapWritable();
-      updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(messagesSize));
-      // also send aggregated values to the master
-      if (aggregator != null) {
-        updatedCnt.put(FLAG_AGGREGATOR_VALUE, aggregator.getValue());
-        if (isAbstractAggregator) {
-          updatedCnt.put(FLAG_AGGREGATOR_INCREMENT,
-              ((AbstractAggregator<VERTEX_VALUE>) aggregator)
-                  .getTimesAggregated());
+      runAggregators(peer, messagesSize);
+      iteration++;
+    }
+  }
+
+  private void runAggregators(
+      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable,
Writable, GraphJobMessage> peer,
+      int messagesSize) throws IOException {
+    // send msgCounts to the master task
+    MapWritable updatedCnt = new MapWritable();
+    updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(messagesSize));
+    // also send aggregated values to the master
+    if (aggregators != null) {
+      for (int i = 0; i < this.aggregators.length; i++) {
+        updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
+        if (isAbstractAggregator[i]) {
+          updatedCnt
+              .put(
+                  aggregatorIncrementFlag[i],
+                  ((AbstractAggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE,
EDGE_VALUE_TYPE>>) aggregators[i])
+                      .getTimesAggregated());
+        }
+      }
+      for (int i = 0; i < aggregators.length; i++) {
+        // now create new aggregators for the next iteration
+        aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
+        if (isMasterTask(peer)) {
+          masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
         }
       }
-      peer.send(masterTask, new GraphJobMessage(updatedCnt));
-      iteration++;
     }
+    peer.send(masterTask, new GraphJobMessage(updatedCnt));
   }
 
   @SuppressWarnings("unchecked")
@@ -305,20 +344,23 @@ public final class GraphJobRunner<VERTEX
         msgs.add(value);
       } else if (msg.isMapMessage()) {
         for (Entry<Writable, Writable> e : msg.getMap().entrySet()) {
-          VERTEX_ID vertexID = (VERTEX_ID) e.getKey();
+          Text vertexID = (Text) e.getKey();
           if (FLAG_MESSAGE_COUNTS.equals(vertexID)) {
             if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) {
               updated = false;
             } else {
               globalUpdateCounts += ((IntWritable) e.getValue()).get();
             }
-          } else if (aggregator != null
-              && FLAG_AGGREGATOR_VALUE.equals(vertexID)) {
-            masterAggregator.aggregate((VERTEX_VALUE) e.getValue());
-          } else if (aggregator != null
-              && FLAG_AGGREGATOR_INCREMENT.equals(vertexID)) {
-            if (isAbstractAggregator) {
-              ((AbstractAggregator<VERTEX_VALUE>) masterAggregator)
+          } else if (aggregators != null
+              && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
+            int index = Integer.parseInt(vertexID.toString().split(";")[1]);
+            masterAggregator[index]
+                .aggregate(null, (VERTEX_VALUE) e.getValue());
+          } else if (aggregators != null
+              && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
+            int index = Integer.parseInt(vertexID.toString().split(";")[1]);
+            if (isAbstractAggregator[index]) {
+              ((AbstractAggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>)
masterAggregator[index])
                   .addTimesAggregated(((IntWritable) e.getValue()).get());
             }
           }
@@ -462,9 +504,16 @@ public final class GraphJobRunner<VERTEX
   }
 
   @SuppressWarnings("unchecked")
-  private Aggregator<VERTEX_VALUE> getNewAggregator() {
-    return (Aggregator<VERTEX_VALUE>) ReflectionUtils.newInstance(
-        conf.getClass("hama.graph.aggregator.class", Aggregator.class), conf);
+  private Aggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>
getNewAggregator(
+      String clsName) {
+    try {
+      return (Aggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>)
ReflectionUtils
+          .newInstance(conf.getClassByName(clsName), conf);
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+    }
+    throw new IllegalArgumentException("Aggregator class " + clsName
+        + " could not be found or instantiated!");
   }
 
   private boolean isMasterTask(
@@ -484,12 +533,12 @@ public final class GraphJobRunner<VERTEX
     return maxIteration;
   }
 
-  public final Writable getLastAggregatedValue() {
-    return globalAggregatorResult;
+  public final Writable getLastAggregatedValue(int index) {
+    return globalAggregatorResult[index];
   }
 
-  public final IntWritable getNumLastAggregatedVertices() {
-    return globalAggregatorIncrement;
+  public final IntWritable getNumLastAggregatedVertices(int index) {
+    return globalAggregatorIncrement[index];
   }
 
 }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java Wed
May 23 15:28:16 2012
@@ -19,12 +19,13 @@ package org.apache.hama.graph;
 
 import org.apache.hadoop.io.IntWritable;
 
-public class MaxAggregator extends AbstractAggregator<IntWritable> {
+public class MaxAggregator extends
+    AbstractAggregator<IntWritable, Vertex<?, IntWritable, ?>> {
 
   int max = Integer.MIN_VALUE;
 
   @Override
-  public void aggregate(IntWritable value) {
+  public void aggregate(Vertex<?, IntWritable, ?> vertex, IntWritable value) {
     if (value.get() > max) {
       max = value.get();
     }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java Wed
May 23 15:28:16 2012
@@ -19,12 +19,13 @@ package org.apache.hama.graph;
 
 import org.apache.hadoop.io.IntWritable;
 
-public class MinAggregator extends AbstractAggregator<IntWritable> {
+public class MinAggregator extends
+    AbstractAggregator<IntWritable, Vertex<?, IntWritable, ?>> {
 
   int min = Integer.MAX_VALUE;
 
   @Override
-  public void aggregate(IntWritable value) {
+  public void aggregate(Vertex<?, IntWritable, ?> vertex, IntWritable value) {
     if (value.get() < min) {
       min = value.get();
     }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Wed May 23
15:28:16 2012
@@ -92,19 +92,27 @@ public abstract class Vertex<ID_TYPE ext
 
   /**
    * Get the last aggregated value of the defined aggregator, null if nothing
-   * was configured or not returned a result.
+   * was configured or not returned a result. You have to supply an index, the
+   * index is defined by the order you set the aggregator classes in
+   * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
+   * so if you have a single aggregator you can retrieve it via
+   * {@link #getLastAggregatedValue}(0).
    */
   @SuppressWarnings("unchecked")
-  public MSG_TYPE getLastAggregatedValue() {
-    return (MSG_TYPE) runner.getLastAggregatedValue();
+  public MSG_TYPE getLastAggregatedValue(int index) {
+    return (MSG_TYPE) runner.getLastAggregatedValue(index);
   }
 
   /**
    * Get the number of aggregated vertices in the last superstep. Or null if no
-   * aggregator is available.
+   * aggregator is available.You have to supply an index, the index is defined
+   * by the order you set the aggregator classes in
+   * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
+   * so if you have a single aggregator you can retrieve it via
+   * {@link #getNumLastAggregatedVertices}(0).
    */
-  public IntWritable getNumLastAggregatedVertices() {
-    return runner.getNumLastAggregatedVertices();
+  public IntWritable getNumLastAggregatedVertices(int index) {
+    return runner.getNumLastAggregatedVertices(index);
   }
 
   public int getNumPeers() {

Modified: incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
(original)
+++ incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
Wed May 23 15:28:16 2012
@@ -75,6 +75,7 @@ public class TestSubmitGraphJob extends 
   private static String INPUT = "/tmp/pagerank-real-tmp.seq";
   private static String OUTPUT = "/tmp/pagerank-real-out";
 
+  @SuppressWarnings("unchecked")
   @Override
   public void testSubmitJob() throws Exception {
 
@@ -97,7 +98,7 @@ public class TestSubmitGraphJob extends 
     // we need to include a vertex in its adjacency list,
     // otherwise the pagerank result has a constant loss
     bsp.set("hama.graph.self.ref", "true");
-    bsp.setAggregatorClass(AverageAggregator.class);
+    bsp.setAggregatorClass(AverageAggregator.class, SumAggregator.class);
 
     bsp.setVertexIDClass(Text.class);
     bsp.setVertexValueClass(DoubleWritable.class);

Modified: incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (original)
+++ incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Wed
May 23 15:28:16 2012
@@ -64,10 +64,15 @@ public class PageRank {
         }
         double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
         this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum)));
+        if (this.getSuperstepCount() > 1) {
+          if(this.getLastAggregatedValue(1).get() < 0.99 || this.getLastAggregatedValue(1).get()
> 1.0){
+            throw new RuntimeException("Sum aggregator hasn't summed correctly! " + this.getLastAggregatedValue(1).get());
+          }
+        }
       }
 
       // if we have not reached our global error yet, then proceed.
-      DoubleWritable globalError = getLastAggregatedValue();
+      DoubleWritable globalError = getLastAggregatedValue(0);
       if (globalError != null && this.getSuperstepCount() > 2
           && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
         return;



Mime
View raw message