hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1560605 - in /hama/trunk: examples/src/main/java/org/apache/hama/examples/ examples/src/test/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/
Date Thu, 23 Jan 2014 08:16:45 GMT
Author: edwardyoon
Date: Thu Jan 23 08:16:44 2014
New Revision: 1560605

URL: http://svn.apache.org/r1560605
Log:
Revert r1556691

Removed:
    hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java
Modified:
    hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1560605&r1=1560604&r2=1560605&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Thu Jan 23 08:16:44
2014
@@ -40,7 +40,6 @@ import org.apache.hama.graph.VertexInput
  * Real pagerank with dangling node contribution.
  */
 public class PageRank {
-  private static final String AVG_AGGREGATOR = "average.aggregator";
 
   public static class PageRankVertex extends
       Vertex<Text, NullWritable, DoubleWritable> {
@@ -72,11 +71,10 @@ public class PageRank {
         }
         double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
         setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
-        aggregate(AVG_AGGREGATOR, this.getValue());
       }
 
       // if we have not reached our global error yet, then proceed.
-      DoubleWritable globalError = (DoubleWritable) getAggregatedValue(AVG_AGGREGATOR);
+      DoubleWritable globalError = getLastAggregatedValue(0);
 
       if (globalError != null && this.getSuperstepCount() > 2
           && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
@@ -127,8 +125,8 @@ public class PageRank {
     }
 
     // error
-    pageJob.registerAggregator(AVG_AGGREGATOR, AverageAggregator.class);
-
+    pageJob.setAggregatorClass(AverageAggregator.class);
+    
     // Vertex reader
     pageJob.setVertexInputReaderClass(PagerankSeqReader.class);
 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java?rev=1560605&r1=1560604&r2=1560605&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java Thu Jan 23
08:16:44 2014
@@ -17,20 +17,22 @@
  */
 package org.apache.hama.graph;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+
+import com.google.common.base.Preconditions;
 
 /**
  * Runner class to do the tasks that need to be done if aggregation was
@@ -39,31 +41,117 @@ import org.apache.hama.bsp.BSPPeer;
  */
 @SuppressWarnings("rawtypes")
 public final class AggregationRunner<V extends WritableComparable, E extends Writable,
M extends Writable> {
-  private Map<String, Aggregator> Aggregators;
-  private Map<String, Writable> aggregatorResults;
-  private Set<String> aggregatorsUsed;
 
+  // multiple aggregator arrays
+  private Aggregator<M, Vertex<V, E, M>>[] aggregators;
+  private Set<Integer> skipAggregators;
+  private Writable[] globalAggregatorResult;
+  private IntWritable[] globalAggregatorIncrement;
+  private boolean[] isAbstractAggregator;
+  private String[] aggregatorClassNames;
+  private Text[] aggregatorValueFlag;
+  private Text[] aggregatorIncrementFlag;
+  // aggregator on the master side
+  private Aggregator<M, Vertex<V, E, M>>[] masterAggregator;
+
+  private boolean enabled = false;
   private Configuration conf;
-  private Text textWrap = new Text();
 
+  @SuppressWarnings("unchecked")
   public void setupAggregators(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
     this.conf = peer.getConfiguration();
-
-    this.aggregatorResults = new HashMap<String, Writable>(4);
-    this.Aggregators = new HashMap<String, Aggregator>(4);
-    this.aggregatorsUsed = new HashSet<String>(4);
-
-    String customAggregatorClasses = peer.getConfiguration().get(
+    String aggregatorClasses = peer.getConfiguration().get(
         GraphJob.AGGREGATOR_CLASS_ATTR);
+    this.skipAggregators = new HashSet<Integer>();
+    if (aggregatorClasses != null) {
+      enabled = true;
+      aggregatorClassNames = aggregatorClasses.split(";");
+      // init to the split size
+      aggregators = new Aggregator[aggregatorClassNames.length];
+      globalAggregatorResult = new Writable[aggregatorClassNames.length];
+      globalAggregatorIncrement = new IntWritable[aggregatorClassNames.length];
+      isAbstractAggregator = new boolean[aggregatorClassNames.length];
+      aggregatorValueFlag = new Text[aggregatorClassNames.length];
+      aggregatorIncrementFlag = new Text[aggregatorClassNames.length];
+      if (GraphJobRunner.isMasterTask(peer)) {
+        masterAggregator = new Aggregator[aggregatorClassNames.length];
+      }
+      for (int i = 0; i < aggregatorClassNames.length; i++) {
+        aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
+        aggregatorValueFlag[i] = new Text(
+            GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + i);
+        aggregatorIncrementFlag[i] = new Text(
+            GraphJobRunner.S_FLAG_AGGREGATOR_INCREMENT + ";" + i);
+        if (aggregators[i] instanceof AbstractAggregator) {
+          isAbstractAggregator[i] = true;
+        }
+        if (GraphJobRunner.isMasterTask(peer)) {
+          masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
+        }
+      }
+    }
+  }
 
-    if (customAggregatorClasses != null) {
-      String[] custAggrs = customAggregatorClasses.split(";");
+  /**
+   * Runs the aggregators by sending their values to the master task.
+   * @param changedVertexCnt 
+   */
+  public void sendAggregatorValues(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
+      int activeVertices, int changedVertexCnt) throws IOException {
+    // send msgCounts to the master task
+    MapWritable updatedCnt = new MapWritable();
+    updatedCnt.put(GraphJobRunner.FLAG_MESSAGE_COUNTS, new IntWritable(
+        activeVertices));
+    // send total number of vertices changes
+    updatedCnt.put(GraphJobRunner.FLAG_VERTEX_ALTER_COUNTER, new LongWritable(
+        changedVertexCnt));
+    // also send aggregated values to the master
+    if (aggregators != null) {
+      for (int i = 0; i < this.aggregators.length; i++) {
+        if (!this.skipAggregators.contains(i)) {
+          updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
+          if (isAbstractAggregator[i]) {
+            updatedCnt.put(aggregatorIncrementFlag[i],
+                ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
+                    .getTimesAggregated());
+          }          
+        }
+      }
+      for (int i = 0; i < aggregators.length; i++) {
+        if (!this.skipAggregators.contains(i)) {
+          // now create new aggregators for the next iteration
+          aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
+          if (GraphJobRunner.isMasterTask(peer)) {
+            masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
+          }          
+        }
+      }
+    }
+    peer.send(GraphJobRunner.getMasterTask(peer), new GraphJobMessage(
+        updatedCnt));
+  }
 
-      for (String aggr : custAggrs) {
-        String[] Name_AggrClass = aggr.split("@", 2);
-        this.Aggregators.put(Name_AggrClass[0],
-            getNewAggregator(Name_AggrClass[1]));
+  /**
+   * Aggregates the last value before computation and the value after the
+   * computation.
+   * 
+   * @param lastValue the value before compute().
+   * @param v the vertex.
+   */
+  public void aggregateVertex(M lastValue, Vertex<V, E, M> v) {
+    if (isEnabled()) {
+      for (int i = 0; i < this.aggregators.length; i++) {
+        if (!this.skipAggregators.contains(i)) {
+          Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
+          aggregator.aggregate(v, v.getValue());
+          if (isAbstractAggregator[i]) {
+            AbstractAggregator<M, Vertex<V, E, M>> intern = (AbstractAggregator<M,
Vertex<V, E, M>>) aggregator;
+            intern.aggregate(v, lastValue, v.getValue());
+            intern.aggregateInternal();
+          }          
+        }
       }
     }
   }
@@ -73,20 +161,26 @@ public final class AggregationRunner<V e
    * peer and updates the given map accordingly.
    */
   public void doMasterAggregation(MapWritable updatedCnt) {
-    // Get results only from used aggregators.
-    for (String name : this.aggregatorsUsed) {
-      updatedCnt.put(new Text(name), this.Aggregators.get(name).getValue());
-    }
-    this.aggregatorsUsed.clear();
-
-    // Reset all custom aggregators. TODO: Change the aggregation interface to
-    // include clean() method.
-    Map<String, Aggregator> tmp = new HashMap<String, Aggregator>(4);
-    for (Entry<String, Aggregator> e : this.Aggregators.entrySet()) {
-      String aggClass = e.getValue().getClass().getName();
-      tmp.put(e.getKey(), getNewAggregator(aggClass));
+    if (isEnabled()) {
+      // work through the master aggregators
+      for (int i = 0; i < masterAggregator.length; i++) {
+        if (!this.skipAggregators.contains(i)) {
+          Writable lastAggregatedValue = masterAggregator[i].getValue();
+          if (isAbstractAggregator[i]) {
+            final AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M,
Vertex<V, E, M>>) masterAggregator[i]);
+            final Writable finalizeAggregation = intern.finalizeAggregation();
+            if (intern.finalizeAggregation() != null) {
+              lastAggregatedValue = finalizeAggregation;
+            }
+            // this count is usually the times of active
+            // vertices in the graph
+            updatedCnt.put(aggregatorIncrementFlag[i],
+                intern.getTimesAggregated());
+          }
+          updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);   
+        }
+      }
     }
-    this.Aggregators = tmp;
   }
 
   /**
@@ -96,20 +190,13 @@ public final class AggregationRunner<V e
    *         we haven't seen any messages anymore.
    */
   public boolean receiveAggregatedValues(MapWritable updatedValues,
-      long iteration) {
-    // In every superstep, we create a new result collection as we don't save
-    // history.
-    // If a value is missing, the user will take a null result. By creating a
-    // new collection
-    // every time, we can reduce the network cost (because we send less
-    // information by skipping null values)
-    // But we are losing in GC.
-    this.aggregatorResults = new HashMap<String, Writable>(4);
-    for (String name : this.Aggregators.keySet()) {
-      this.textWrap.set(name);
-      this.aggregatorResults.put(name, updatedValues.get(textWrap));
+      long iteration) throws IOException, SyncException, InterruptedException {
+    // map is the first value that is in the queue
+    for (int i = 0; i < aggregators.length; i++) {
+        globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
+        globalAggregatorIncrement[i] = (IntWritable) updatedValues
+            .get(aggregatorIncrementFlag[i]);
     }
-
     IntWritable count = (IntWritable) updatedValues
         .get(GraphJobRunner.FLAG_MESSAGE_COUNTS);
     if (count != null && count.get() == Integer.MIN_VALUE) {
@@ -119,16 +206,47 @@ public final class AggregationRunner<V e
   }
 
   /**
-   * Method to let the custom master aggregator read messages from peers and
-   * aggregate a value.
+   * @return true if aggregators were defined. Normally used by the internal
+   *         stateful methods, outside shouldn't use it too extensively.
    */
-  @SuppressWarnings("unchecked")
-  public void masterAggregation(Text name, Writable value) {
-    String nameIdx = name.toString().split(";", 2)[1];
-    this.Aggregators.get(nameIdx).aggregate(null, value);
+  public boolean isEnabled() {
+    return enabled;
+  }
+
+  /**
+   * Method to let the master read messages from peers and aggregate a value.
+   */
+  public void masterReadAggregatedValue(Text textIndex, M value) {
+    int index = Integer.parseInt(textIndex.toString().split(";")[1]);
+    masterAggregator[index].aggregate(null, value);
+  }
 
-    // When it's time to send the values, we can see which aggregators are used.
-    this.aggregatorsUsed.add(nameIdx);
+  /**
+   * Method to let the master read messages from peers and aggregate the
+   * incremental value.
+   */
+  public void masterReadAggregatedIncrementalValue(Text textIndex, M value) {
+    int index = Integer.parseInt(textIndex.toString().split(";")[1]);
+    if (isAbstractAggregator[index]) {
+      ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[index])
+          .addTimesAggregated(((IntWritable) value).get());
+    }
+  }
+
+  /**
+   * This method adds an id of an aggregator that will be skipped in the current
+   * superstep.
+   */
+  public void addSkipAggregator(int index) {
+    this.skipAggregators.add(index);
+  }
+
+  /**
+   * This method adds an id of an aggregator that will be skipped in the current
+   * superstep.
+   */
+  void resetSkipAggregators() {
+    this.skipAggregators.clear();
   }
 
   @SuppressWarnings("unchecked")
@@ -143,7 +261,13 @@ public final class AggregationRunner<V e
         + " could not be found or instantiated!");
   }
 
-  public final Writable getAggregatedValue(String name) {
-    return this.aggregatorResults.get(name);
+  public final Writable getLastAggregatedValue(int index) {
+    return globalAggregatorResult[Preconditions.checkPositionIndex(index,
+        globalAggregatorResult.length)];
+  }
+
+  public final IntWritable getNumLastAggregatedVertices(int index) {
+    return globalAggregatorIncrement[Preconditions.checkPositionIndex(index,
+        globalAggregatorIncrement.length)];
   }
-}
+}
\ No newline at end of file

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1560605&r1=1560604&r2=1560605&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu Jan 23 08:16:44
2014
@@ -107,20 +107,23 @@ public class GraphJob extends BSPJob {
   }
 
   /**
-   * Custom aggregator registration. Add a custom aggregator that will aggregate
-   * massages sent from the user.
-   * 
-   * @param name identifies an aggregator
-   * @param aggregatorClass the aggregator class
+   * Set the aggregator for the job.
    */
-  @SuppressWarnings("rawtypes")
-  public void registerAggregator(String name,
-      Class<? extends Aggregator> aggregatorClass) {
-    String prevAggrs = this.conf.get(AGGREGATOR_CLASS_ATTR, "");
-
-    prevAggrs += name + "@" + aggregatorClass.getName() + ";";
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void setAggregatorClass(Class<? extends Aggregator> cls) {
+    this.setAggregatorClass(new Class[] { cls });
+  }
 
-    this.conf.set(AGGREGATOR_CLASS_ATTR, prevAggrs);
+  /**
+   * Sets multiple aggregators for the job.
+   */
+  @SuppressWarnings("rawtypes")
+  public void setAggregatorClass(Class<? extends Aggregator>... cls) {
+    String classNames = "";
+    for (Class<? extends Aggregator> cl : cls) {
+      classNames += cl.getName() + ";";
+    }
+    conf.set(AGGREGATOR_CLASS_ATTR, classNames);
   }
 
   /**

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1560605&r1=1560604&r2=1560605&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Jan 23 08:16:44
2014
@@ -63,11 +63,12 @@ public final class GraphJobRunner<V exte
   // make sure that these values don't collide with the vertex names
   public static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
   public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
-  public static final String S_FLAG_VERTEX_INCREASE = "hama.2";
-  public static final String S_FLAG_VERTEX_DECREASE = "hama.3";
-  public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.4";
-  public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.5";
-
+  public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
+  public static final String S_FLAG_VERTEX_INCREASE = "hama.3";
+  public static final String S_FLAG_VERTEX_DECREASE = "hama.4";
+  public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5";
+  public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6";
+  public static final String S_FLAG_AGGREGATOR_SKIP = "hama.7";
   public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS);
   public static final Text FLAG_VERTEX_INCREASE = new Text(
       S_FLAG_VERTEX_INCREASE);
@@ -178,10 +179,22 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
 
-    // This run only on master
-    if (isMasterTask(peer)) {
+    if (isMasterTask(peer) && iteration == 1) {
+      MapWritable updatedCnt = new MapWritable();
+      updatedCnt.put(
+          FLAG_VERTEX_TOTAL_VERTICES,
+          new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES)
+              .getCounter())));
+      // send the updates from the master tasks back to the slaves
+      for (String peerName : peer.getAllPeerNames()) {
+        peer.send(peerName, new GraphJobMessage(updatedCnt));
+      }
+    }
+
+    // this is only done in every second iteration
+    if (isMasterTask(peer) && iteration > 1) {
       MapWritable updatedCnt = new MapWritable();
-      // send total number of vertices
+      // send total number of vertices.
       updatedCnt.put(
           FLAG_VERTEX_TOTAL_VERTICES,
           new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES)
@@ -197,18 +210,26 @@ public final class GraphJobRunner<V exte
         peer.send(peerName, new GraphJobMessage(updatedCnt));
       }
     }
-
-    if (firstVertexMessage != null) {
-      peer.send(peer.getPeerName(), firstVertexMessage);
+    if (getAggregationRunner().isEnabled() && iteration > 1) {
+      // in case we need to sync, we need to replay the messages that already
+      // are added to the queue. This prevents loosing messages when using
+      // aggregators.
+      if (firstVertexMessage != null) {
+        peer.send(peer.getPeerName(), firstVertexMessage);
+      }
+      GraphJobMessage msg = null;
+      while ((msg = peer.getCurrentMessage()) != null) {
+        peer.send(peer.getPeerName(), msg);
+      }
+      // now sync
+      peer.sync();
+      // now the map message must be read that might be send from the master
+      updated = getAggregationRunner().receiveAggregatedValues(
+          peer.getCurrentMessage().getMap(), iteration);
+      // set the first vertex message back to the message it had before sync
+      firstVertexMessage = peer.getCurrentMessage();
     }
-
-    // now sync
-    peer.sync();
-    // now the map message must be read that might be send from the master
-    updated = getAggregationRunner().receiveAggregatedValues(
-        peer.getCurrentMessage().getMap(), iteration);
-    // set the first vertex message back to the message it had before sync
-    firstVertexMessage = peer.getCurrentMessage();
+    this.aggregationRunner.resetSkipAggregators();
     return firstVertexMessage;
   }
 
@@ -250,12 +271,14 @@ public final class GraphJobRunner<V exte
       }
 
       if (!vertex.isHalted()) {
+        M lastValue = vertex.getValue();
         if (iterable == null) {
           vertex.compute(Collections.<M> emptyList());
         } else {
           vertex.compute(iterable);
           currentMessage = iterable.getOverflowMessage();
         }
+        getAggregationRunner().aggregateVertex(lastValue, vertex);
         activeVertices++;
       }
 
@@ -265,7 +288,8 @@ public final class GraphJobRunner<V exte
     }
     vertices.finishSuperstep();
 
-    sendControllValues(activeVertices, this.changedVertexCnt);
+    getAggregationRunner().sendAggregatorValues(peer, activeVertices,
+        this.changedVertexCnt);
     iteration++;
   }
 
@@ -316,6 +340,7 @@ public final class GraphJobRunner<V exte
    * Seed the vertices first with their own values in compute. This is the first
    * superstep after the vertices have been loaded.
    */
+  @SuppressWarnings("unused")
   private void doInitialSuperstep(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
@@ -324,6 +349,7 @@ public final class GraphJobRunner<V exte
     IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
     while (skippingIterator.hasNext()) {
       Vertex<V, E, M> vertex = skippingIterator.next();
+      M lastValue = vertex.getValue();
 
       // Calls setup method.
       vertex.setup(conf);
@@ -331,7 +357,7 @@ public final class GraphJobRunner<V exte
       vertices.finishVertexComputation(vertex);
     }
     vertices.finishSuperstep();
-    sendControllValues(1, this.changedVertexCnt);
+    getAggregationRunner().sendAggregatorValues(peer, 1, this.changedVertexCnt);
     iteration++;
   }
 
@@ -551,10 +577,14 @@ public final class GraphJobRunner<V exte
             } else {
               globalUpdateCounts += ((IntWritable) e.getValue()).get();
             }
-
-          } else if (vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
-            this.getAggregationRunner().masterAggregation(vertexID,
-                e.getValue());
+          } else if (getAggregationRunner().isEnabled()
+              && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
+            getAggregationRunner().masterReadAggregatedValue(vertexID,
+                (M) e.getValue());
+          } else if (getAggregationRunner().isEnabled()
+              && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
+            getAggregationRunner().masterReadAggregatedIncrementalValue(
+                vertexID, (M) e.getValue());
           } else if (FLAG_VERTEX_INCREASE.equals(vertexID)) {
             dynamicAdditions = true;
             addVertex((Vertex<V, E, M>) e.getValue());
@@ -574,9 +604,11 @@ public final class GraphJobRunner<V exte
             }
           }
         }
+
       } else {
         throw new UnsupportedOperationException("Unknown message type: " + msg);
       }
+
     }
 
     // If we applied any changes to vertices, we need to call finishAdditions
@@ -620,20 +652,23 @@ public final class GraphJobRunner<V exte
   }
 
   /**
-   * Runs internal aggregators and send their values to the master task.
+   * Gets the last aggregated value at the given index. The index is dependend
+   * on how the aggregators were configured during job setup phase.
    * 
-   * @param activeVertices number of active vertices in this peer
-   * @param changedVertexCnt number of added/removed vertices in a superstep
+   * @return the value of the aggregator, or null if none was defined.
    */
-  private void sendControllValues(int activeVertices, int changedVertexCnt)
-      throws IOException {
-    // send msgCounts to the master task
-    MapWritable updatedCnt = new MapWritable();
-    updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(activeVertices));
-    // send total number of vertices changes
-    updatedCnt.put(FLAG_VERTEX_ALTER_COUNTER,
-        new LongWritable(changedVertexCnt));
-    peer.send(getMasterTask(peer), new GraphJobMessage(updatedCnt));
+  public final Writable getLastAggregatedValue(int index) {
+    return getAggregationRunner().getLastAggregatedValue(index);
+  }
+
+  /**
+   * Gets the last aggregated number of vertices at the given index. The index
+   * is dependend on how the aggregators were configured during job setup phase.
+   * 
+   * @return the value of the aggregator, or null if none was defined.
+   */
+  public final IntWritable getNumLastAggregatedVertices(int index) {
+    return getAggregationRunner().getNumLastAggregatedVertices(index);
   }
 
   /**

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1560605&r1=1560604&r2=1560605&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Thu Jan 23 08:16:44 2014
@@ -27,8 +27,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hama.HamaConfiguration;
@@ -194,6 +194,31 @@ public abstract class Vertex<V extends W
     return runner.getMaxIteration();
   }
 
+  /**
+   * Get the last aggregated value of the defined aggregator, null if nothing
+   * was configured or not returned a result. You have to supply an index, the
+   * index is defined by the order you set the aggregator classes in
+   * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
+   * so if you have a single aggregator you can retrieve it via
+   * {@link #getLastAggregatedValue}(0).
+   */
+  @SuppressWarnings("unchecked")
+  public M getLastAggregatedValue(int index) {
+    return (M) runner.getLastAggregatedValue(index);
+  }
+
+  /**
+   * Get the number of aggregated vertices in the last superstep. Or null if no
+   * aggregator is available.You have to supply an index, the index is defined
+   * by the order you set the aggregator classes in
+   * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
+   * so if you have a single aggregator you can retrieve it via
+   * {@link #getNumLastAggregatedVertices}(0).
+   */
+  public IntWritable getNumLastAggregatedVertices(int index) {
+    return runner.getNumLastAggregatedVertices(index);
+  }
+
   public int getNumPeers() {
     return runner.getPeer().getNumPeers();
   }
@@ -361,30 +386,6 @@ public abstract class Vertex<V extends W
 
   }
 
-  /**
-   * Provides a value to the specified aggregator.
-   * 
-   * @throws IOException
-   * 
-   * @param name identifies an aggregator
-   * @param value value to be aggregated
-   */
-  @Override
-  public void aggregate(String name, M value) throws IOException {
-    MapWritable msg = new MapWritable();
-    msg.put(new Text(GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + name),
-        value);
-
-    // Get master task peer.
-    String destPeer = GraphJobRunner.getMasterTask(this.getPeer());
-    runner.getPeer().send(destPeer, new GraphJobMessage(msg));
-  }
-
-  @Override
-  public Writable getAggregatedValue(String name) {
-    return this.runner.getAggregationRunner().getAggregatedValue(name);
-  }
-
   protected void setRunner(GraphJobRunner<V, E, M> runner) {
     this.runner = runner;
   }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1560605&r1=1560604&r2=1560605&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Thu Jan 23 08:16:44
2014
@@ -112,19 +112,4 @@ public interface VertexInterface<V exten
    */
   public M getValue();
 
-  /**
-   * Provides a value to the specified aggregator.
-   * 
-   * @throws IOException
-   * 
-   * @param name identifies a aggregator
-   * @param value value to be aggregated
-   */
-  public void aggregate(String name, M value) throws IOException;
-
-  /**
-   * Returns the value of the specified aggregator.
-   */
-  public Writable getAggregatedValue(String name);
-
 }



Mime
View raw message