hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1556691 - in /hama/trunk: ./ examples/src/main/java/org/apache/hama/examples/ examples/src/test/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/ graph/src/test/java/org/apache/hama/graph/ graph/src/test/java/org/ap...
Date Thu, 09 Jan 2014 01:10:59 GMT
Author: edwardyoon
Date: Thu Jan  9 01:10:59 2014
New Revision: 1556691

URL: http://svn.apache.org/r1556691
Log:
HAMA-838: Refactor aggregators

Added:
    hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Jan  9 01:10:59 2014
@@ -20,6 +20,7 @@ Release 0.7.0 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-838: Refactor aggregators (Anastasis Andronidis)
    HAMA-783: Improve the InMemory verticesInfo implementations (edwardyoon)
    HAMA-829: Improve code and fix Javadoc warnings in org.apache.hama.pipes (Martin Illecker)
    HAMA-808: Hama Pipes Testcase (Martin Illecker)

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Thu Jan  9 01:10:59
2014
@@ -40,6 +40,7 @@ import org.apache.hama.graph.VertexInput
  * Real pagerank with dangling node contribution.
  */
 public class PageRank {
+  private static final String AVG_AGGREGATOR = "average.aggregator";
 
   public static class PageRankVertex extends
       Vertex<Text, NullWritable, DoubleWritable> {
@@ -63,29 +64,29 @@ public class PageRank {
     public void compute(Iterable<DoubleWritable> messages) throws IOException {
       // initialize this vertex to 1 / count of global vertices in this graph
       if (this.getSuperstepCount() == 0) {
-        this.setValue(new DoubleWritable(1.0 / this.getNumVertices()));
+        setValue(new DoubleWritable(1.0 / this.getNumVertices()));
       } else if (this.getSuperstepCount() >= 1) {
         double sum = 0;
         for (DoubleWritable msg : messages) {
           sum += msg.get();
         }
         double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
-        this.setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
+        setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
+        aggregate(AVG_AGGREGATOR, this.getValue());
       }
 
       // if we have not reached our global error yet, then proceed.
-      DoubleWritable globalError = getLastAggregatedValue(0);
+      DoubleWritable globalError = (DoubleWritable) getAggregatedValue(AVG_AGGREGATOR);
+
       if (globalError != null && this.getSuperstepCount() > 2
           && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
         voteToHalt();
-        return;
+      } else {
+        // in each superstep we are going to send a new rank to our neighbours
+        sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
+            / this.getEdges().size()));
       }
-
-      // in each superstep we are going to send a new rank to our neighbours
-      sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
-          / this.getEdges().size()));
     }
-
   }
 
   public static class PagerankSeqReader
@@ -126,7 +127,7 @@ public class PageRank {
     }
 
     // error
-    pageJob.setAggregatorClass(AverageAggregator.class);
+    pageJob.registerAggregator(AVG_AGGREGATOR, AverageAggregator.class);
 
     // Vertex reader
     pageJob.setVertexInputReaderClass(PagerankSeqReader.class);

Added: hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java?rev=1556691&view=auto
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java (added)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java Thu Jan
 9 01:10:59 2014
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.TextOutputFormat;
+import org.apache.hama.graph.GraphJob;
+import org.apache.hama.graph.SumAggregator;
+import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
+import org.junit.Test;
+
+/**
+ * Unit test for aggregators
+ */
+public class AggregatorsTest extends TestCase {
+  private static String OUTPUT = "/tmp/page-out";
+  private Configuration conf = new HamaConfiguration();
+  private FileSystem fs;
+
+  private void deleteTempDirs() {
+    try {
+      if (fs.exists(new Path(OUTPUT)))
+        fs.delete(new Path(OUTPUT), true);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private void verifyResult() throws IOException {
+    FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*"));
+    for (FileStatus fts : globStatus) {
+      BufferedReader reader = new BufferedReader(new InputStreamReader(
+          fs.open(fts.getPath())));
+      String line = null;
+
+      String[] results = { "6.0", "2.0", "3.0", "4.0" };
+
+      for (int i = 1; i < 5; i++) {
+        line = reader.readLine();
+        String[] split = line.split("\t");
+        assertTrue(split[0].equals(String.valueOf(i)));
+        assertTrue(split[1].equals(results[i - 1]));
+        System.out.println(split[0] + " : " + split[1]);
+      }
+    }
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    fs = FileSystem.get(conf);
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    try {
+      CustomAggregators
+          .main(new String[] { "src/test/resources/dg.txt", OUTPUT });
+      verifyResult();
+    } finally {
+      deleteTempDirs();
+    }
+  }
+
+  static class CustomAggregators {
+
+    public static class GraphTextReader
+        extends
+        VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
+
+      @Override
+      public boolean parseVertex(LongWritable key, Text value,
+          Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
+
+        vertex.setVertexID(value);
+        vertex
+            .setValue(new DoubleWritable(Double.parseDouble(value.toString())));
+
+        return true;
+      }
+    }
+
+    public static class GraphVertex extends
+        Vertex<Text, NullWritable, DoubleWritable> {
+
+      @Override
+      public void compute(Iterable<DoubleWritable> msgs) throws IOException {
+
+        // We will send 2 custom messages on superstep 2 and 4 only!
+        if (this.getSuperstepCount() == 2) {
+          this.aggregate("mySum", new DoubleWritable(1.0));
+        }
+
+        if (this.getSuperstepCount() == 4) {
+          this.aggregate("mySum", new DoubleWritable(2.0));
+        }
+
+        // We will get the first aggrigation result from our custom aggregator
+        // on superstep 3,
+        // and we will store the result only in vertex 4.
+        // This vertex should have value = 4.
+        if (this.getSuperstepCount() == 3
+            && this.getVertexID().toString().equals("4")) {
+          this.setValue((DoubleWritable) this.getAggregatedValue("mySum"));
+        }
+
+        // By setting vertex number 3 to halt, we will see a change on the
+        // aggregating results
+        // in both custom and global aggregators.
+        // This vertex should have value = 3.
+        if (this.getSuperstepCount() == 3
+            && this.getVertexID().toString().equals("3")) {
+          this.voteToHalt();
+        }
+
+        // This vertex should have value = 6 (3 vertices are working x 2 the
+        // custom value)
+        if (this.getSuperstepCount() == 5
+            && this.getVertexID().toString().equals("1")) {
+          this.setValue((DoubleWritable) this.getAggregatedValue("mySum"));
+        }
+
+        if (this.getSuperstepCount() == 6) {
+          this.voteToHalt();
+        }
+      }
+    }
+
+    public static void main(String[] args) throws IOException,
+        InterruptedException, ClassNotFoundException {
+      if (args.length != 2) {
+        printUsage();
+      }
+      HamaConfiguration conf = new HamaConfiguration(new Configuration());
+      GraphJob graphJob = createJob(args, conf);
+      long startTime = System.currentTimeMillis();
+      if (graphJob.waitForCompletion(true)) {
+        System.out.println("Job Finished in "
+            + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+      }
+    }
+
+    private static void printUsage() {
+      System.out.println("Usage: <input> <output>");
+      System.exit(-1);
+    }
+
+    private static GraphJob createJob(String[] args, HamaConfiguration conf)
+        throws IOException {
+      GraphJob graphJob = new GraphJob(conf, CustomAggregators.class);
+      graphJob.setJobName("Custom Aggregators");
+      graphJob.setVertexClass(GraphVertex.class);
+
+      graphJob.registerAggregator("mySum", SumAggregator.class);
+
+      graphJob.setInputPath(new Path(args[0]));
+      graphJob.setOutputPath(new Path(args[1]));
+
+      graphJob.setVertexIDClass(Text.class);
+      graphJob.setVertexValueClass(DoubleWritable.class);
+      graphJob.setEdgeValueClass(NullWritable.class);
+
+      graphJob.setInputFormat(TextInputFormat.class);
+
+      graphJob.setVertexInputReaderClass(GraphTextReader.class);
+      graphJob.setPartitioner(HashPartitioner.class);
+
+      graphJob.setOutputFormat(TextOutputFormat.class);
+      graphJob.setOutputKeyClass(Text.class);
+      graphJob.setOutputValueClass(DoubleWritable.class);
+
+      return graphJob;
+    }
+  }
+}

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java Thu Jan  9
01:10:59 2014
@@ -17,22 +17,20 @@
  */
 package org.apache.hama.graph;
 
-import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.sync.SyncException;
-
-import com.google.common.base.Preconditions;
 
 /**
  * Runner class to do the tasks that need to be done if aggregation was
@@ -41,117 +39,31 @@ import com.google.common.base.Preconditi
  */
 @SuppressWarnings("rawtypes")
 public final class AggregationRunner<V extends WritableComparable, E extends Writable,
M extends Writable> {
+  private Map<String, Aggregator> Aggregators;
+  private Map<String, Writable> aggregatorResults;
+  private Set<String> aggregatorsUsed;
 
-  // multiple aggregator arrays
-  private Aggregator<M, Vertex<V, E, M>>[] aggregators;
-  private Set<Integer> skipAggregators;
-  private Writable[] globalAggregatorResult;
-  private IntWritable[] globalAggregatorIncrement;
-  private boolean[] isAbstractAggregator;
-  private String[] aggregatorClassNames;
-  private Text[] aggregatorValueFlag;
-  private Text[] aggregatorIncrementFlag;
-  // aggregator on the master side
-  private Aggregator<M, Vertex<V, E, M>>[] masterAggregator;
-
-  private boolean enabled = false;
   private Configuration conf;
+  private Text textWrap = new Text();
 
-  @SuppressWarnings("unchecked")
   public void setupAggregators(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
     this.conf = peer.getConfiguration();
-    String aggregatorClasses = peer.getConfiguration().get(
+
+    this.aggregatorResults = new HashMap<String, Writable>(4);
+    this.Aggregators = new HashMap<String, Aggregator>(4);
+    this.aggregatorsUsed = new HashSet<String>(4);
+
+    String customAggregatorClasses = peer.getConfiguration().get(
         GraphJob.AGGREGATOR_CLASS_ATTR);
-    this.skipAggregators = new HashSet<Integer>();
-    if (aggregatorClasses != null) {
-      enabled = true;
-      aggregatorClassNames = aggregatorClasses.split(";");
-      // init to the split size
-      aggregators = new Aggregator[aggregatorClassNames.length];
-      globalAggregatorResult = new Writable[aggregatorClassNames.length];
-      globalAggregatorIncrement = new IntWritable[aggregatorClassNames.length];
-      isAbstractAggregator = new boolean[aggregatorClassNames.length];
-      aggregatorValueFlag = new Text[aggregatorClassNames.length];
-      aggregatorIncrementFlag = new Text[aggregatorClassNames.length];
-      if (GraphJobRunner.isMasterTask(peer)) {
-        masterAggregator = new Aggregator[aggregatorClassNames.length];
-      }
-      for (int i = 0; i < aggregatorClassNames.length; i++) {
-        aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
-        aggregatorValueFlag[i] = new Text(
-            GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + i);
-        aggregatorIncrementFlag[i] = new Text(
-            GraphJobRunner.S_FLAG_AGGREGATOR_INCREMENT + ";" + i);
-        if (aggregators[i] instanceof AbstractAggregator) {
-          isAbstractAggregator[i] = true;
-        }
-        if (GraphJobRunner.isMasterTask(peer)) {
-          masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
-        }
-      }
-    }
-  }
 
-  /**
-   * Runs the aggregators by sending their values to the master task.
-   * @param changedVertexCnt 
-   */
-  public void sendAggregatorValues(
-      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
-      int activeVertices, int changedVertexCnt) throws IOException {
-    // send msgCounts to the master task
-    MapWritable updatedCnt = new MapWritable();
-    updatedCnt.put(GraphJobRunner.FLAG_MESSAGE_COUNTS, new IntWritable(
-        activeVertices));
-    // send total number of vertices changes
-    updatedCnt.put(GraphJobRunner.FLAG_VERTEX_ALTER_COUNTER, new LongWritable(
-        changedVertexCnt));
-    // also send aggregated values to the master
-    if (aggregators != null) {
-      for (int i = 0; i < this.aggregators.length; i++) {
-        if (!this.skipAggregators.contains(i)) {
-          updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
-          if (isAbstractAggregator[i]) {
-            updatedCnt.put(aggregatorIncrementFlag[i],
-                ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
-                    .getTimesAggregated());
-          }          
-        }
-      }
-      for (int i = 0; i < aggregators.length; i++) {
-        if (!this.skipAggregators.contains(i)) {
-          // now create new aggregators for the next iteration
-          aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
-          if (GraphJobRunner.isMasterTask(peer)) {
-            masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
-          }          
-        }
-      }
-    }
-    peer.send(GraphJobRunner.getMasterTask(peer), new GraphJobMessage(
-        updatedCnt));
-  }
+    if (customAggregatorClasses != null) {
+      String[] custAggrs = customAggregatorClasses.split(";");
 
-  /**
-   * Aggregates the last value before computation and the value after the
-   * computation.
-   * 
-   * @param lastValue the value before compute().
-   * @param v the vertex.
-   */
-  public void aggregateVertex(M lastValue, Vertex<V, E, M> v) {
-    if (isEnabled()) {
-      for (int i = 0; i < this.aggregators.length; i++) {
-        if (!this.skipAggregators.contains(i)) {
-          Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
-          aggregator.aggregate(v, v.getValue());
-          if (isAbstractAggregator[i]) {
-            AbstractAggregator<M, Vertex<V, E, M>> intern = (AbstractAggregator<M,
Vertex<V, E, M>>) aggregator;
-            intern.aggregate(v, lastValue, v.getValue());
-            intern.aggregateInternal();
-          }          
-        }
+      for (String aggr : custAggrs) {
+        String[] Name_AggrClass = aggr.split("@", 2);
+        this.Aggregators.put(Name_AggrClass[0],
+            getNewAggregator(Name_AggrClass[1]));
       }
     }
   }
@@ -161,26 +73,20 @@ public final class AggregationRunner<V e
    * peer and updates the given map accordingly.
    */
   public void doMasterAggregation(MapWritable updatedCnt) {
-    if (isEnabled()) {
-      // work through the master aggregators
-      for (int i = 0; i < masterAggregator.length; i++) {
-        if (!this.skipAggregators.contains(i)) {
-          Writable lastAggregatedValue = masterAggregator[i].getValue();
-          if (isAbstractAggregator[i]) {
-            final AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M,
Vertex<V, E, M>>) masterAggregator[i]);
-            final Writable finalizeAggregation = intern.finalizeAggregation();
-            if (intern.finalizeAggregation() != null) {
-              lastAggregatedValue = finalizeAggregation;
-            }
-            // this count is usually the times of active
-            // vertices in the graph
-            updatedCnt.put(aggregatorIncrementFlag[i],
-                intern.getTimesAggregated());
-          }
-          updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);   
-        }
-      }
+    // Get results only from used aggregators.
+    for (String name : this.aggregatorsUsed) {
+      updatedCnt.put(new Text(name), this.Aggregators.get(name).getValue());
+    }
+    this.aggregatorsUsed.clear();
+
+    // Reset all custom aggregators. TODO: Change the aggregation interface to
+    // include clean() method.
+    Map<String, Aggregator> tmp = new HashMap<String, Aggregator>(4);
+    for (Entry<String, Aggregator> e : this.Aggregators.entrySet()) {
+      String aggClass = e.getValue().getClass().getName();
+      tmp.put(e.getKey(), getNewAggregator(aggClass));
     }
+    this.Aggregators = tmp;
   }
 
   /**
@@ -190,13 +96,20 @@ public final class AggregationRunner<V e
    *         we haven't seen any messages anymore.
    */
   public boolean receiveAggregatedValues(MapWritable updatedValues,
-      long iteration) throws IOException, SyncException, InterruptedException {
-    // map is the first value that is in the queue
-    for (int i = 0; i < aggregators.length; i++) {
-        globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
-        globalAggregatorIncrement[i] = (IntWritable) updatedValues
-            .get(aggregatorIncrementFlag[i]);
+      long iteration) {
+    // In every superstep, we create a new result collection as we don't save
+    // history.
+    // If a value is missing, the user will take a null result. By creating a
+    // new collection
+    // every time, we can reduce the network cost (because we send less
+    // information by skipping null values)
+    // But we are losing in GC.
+    this.aggregatorResults = new HashMap<String, Writable>(4);
+    for (String name : this.Aggregators.keySet()) {
+      this.textWrap.set(name);
+      this.aggregatorResults.put(name, updatedValues.get(textWrap));
     }
+
     IntWritable count = (IntWritable) updatedValues
         .get(GraphJobRunner.FLAG_MESSAGE_COUNTS);
     if (count != null && count.get() == Integer.MIN_VALUE) {
@@ -206,47 +119,16 @@ public final class AggregationRunner<V e
   }
 
   /**
-   * @return true if aggregators were defined. Normally used by the internal
-   *         stateful methods, outside shouldn't use it too extensively.
+   * Method to let the custom master aggregator read messages from peers and
+   * aggregate a value.
    */
-  public boolean isEnabled() {
-    return enabled;
-  }
-
-  /**
-   * Method to let the master read messages from peers and aggregate a value.
-   */
-  public void masterReadAggregatedValue(Text textIndex, M value) {
-    int index = Integer.parseInt(textIndex.toString().split(";")[1]);
-    masterAggregator[index].aggregate(null, value);
-  }
-
-  /**
-   * Method to let the master read messages from peers and aggregate the
-   * incremental value.
-   */
-  public void masterReadAggregatedIncrementalValue(Text textIndex, M value) {
-    int index = Integer.parseInt(textIndex.toString().split(";")[1]);
-    if (isAbstractAggregator[index]) {
-      ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[index])
-          .addTimesAggregated(((IntWritable) value).get());
-    }
-  }
+  @SuppressWarnings("unchecked")
+  public void masterAggregation(Text name, Writable value) {
+    String nameIdx = name.toString().split(";", 2)[1];
+    this.Aggregators.get(nameIdx).aggregate(null, value);
 
-  /**
-   * This method adds an id of an aggregator that will be skipped in the current
-   * superstep.
-   */
-  public void addSkipAggregator(int index) {
-    this.skipAggregators.add(index);
-  }
-
-  /**
-   * This method adds an id of an aggregator that will be skipped in the current
-   * superstep.
-   */
-  void resetSkipAggregators() {
-    this.skipAggregators.clear();
+    // When it's time to send the values, we can see which aggregators are used.
+    this.aggregatorsUsed.add(nameIdx);
   }
 
   @SuppressWarnings("unchecked")
@@ -261,13 +143,7 @@ public final class AggregationRunner<V e
         + " could not be found or instantiated!");
   }
 
-  public final Writable getLastAggregatedValue(int index) {
-    return globalAggregatorResult[Preconditions.checkPositionIndex(index,
-        globalAggregatorResult.length)];
-  }
-
-  public final IntWritable getNumLastAggregatedVertices(int index) {
-    return globalAggregatorIncrement[Preconditions.checkPositionIndex(index,
-        globalAggregatorIncrement.length)];
+  public final Writable getAggregatedValue(String name) {
+    return this.aggregatorResults.get(name);
   }
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu Jan  9 01:10:59
2014
@@ -102,23 +102,20 @@ public class GraphJob extends BSPJob {
   }
 
   /**
-   * Set the aggregator for the job.
-   */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  public void setAggregatorClass(Class<? extends Aggregator> cls) {
-    this.setAggregatorClass(new Class[] { cls });
-  }
-
-  /**
-   * Sets multiple aggregators for the job.
-   */
+  * Custom aggregator registration. Add a custom aggregator
+  * that will aggregate massages sent from the user.
+  *
+  * @param name identifies an aggregator
+  * @param aggregatorClass the aggregator class
+  */
   @SuppressWarnings("rawtypes")
-  public void setAggregatorClass(Class<? extends Aggregator>... cls) {
-    String classNames = "";
-    for (Class<? extends Aggregator> cl : cls) {
-      classNames += cl.getName() + ";";
-    }
-    conf.set(AGGREGATOR_CLASS_ATTR, classNames);
+  public void registerAggregator(String name, Class<? extends
+      Aggregator> aggregatorClass) {
+    String prevAggrs = this.conf.get(AGGREGATOR_CLASS_ATTR, "");
+
+    prevAggrs += name + "@" + aggregatorClass.getName() + ";";
+
+    this.conf.set(AGGREGATOR_CLASS_ATTR, prevAggrs);
   }
 
   /**

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Thu Jan  9 01:10:59
2014
@@ -92,7 +92,6 @@ public final class GraphJobMessage imple
     } else {
       vertexId.write(out);
     }
-
   }
 
   public void fastReadFields(DataInput in) throws IOException {
@@ -217,6 +216,7 @@ public final class GraphJobMessage imple
       buffer = new DataInputBuffer();
     }
 
+    @Override
     public synchronized int compare(byte[] b1, int s1, int l1, byte[] b2,
         int s2, int l2) {
       try {

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Jan  9 01:10:59
2014
@@ -64,12 +64,11 @@ public final class GraphJobRunner<V exte
   // make sure that these values don't collide with the vertex names
   public static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
   public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
-  public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
-  public static final String S_FLAG_VERTEX_INCREASE = "hama.3";
-  public static final String S_FLAG_VERTEX_DECREASE = "hama.4";
-  public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5";
-  public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6";
-  public static final String S_FLAG_AGGREGATOR_SKIP = "hama.7";
+  public static final String S_FLAG_VERTEX_INCREASE = "hama.2";
+  public static final String S_FLAG_VERTEX_DECREASE = "hama.3";
+  public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.4";
+  public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.5";
+
   public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS);
   public static final Text FLAG_VERTEX_INCREASE = new Text(
       S_FLAG_VERTEX_INCREASE);
@@ -79,8 +78,6 @@ public final class GraphJobRunner<V exte
       S_FLAG_VERTEX_ALTER_COUNTER);
   public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text(
       S_FLAG_VERTEX_TOTAL_VERTICES);
-  public static final Text FLAG_AGGREGATOR_SKIP = new Text(
-      S_FLAG_AGGREGATOR_SKIP);
 
   public static final String MESSAGE_COMBINER_CLASS_KEY = "hama.vertex.message.combiner.class";
   public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class";
@@ -183,22 +180,10 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
 
-    if (isMasterTask(peer) && iteration == 1) {
-      MapWritable updatedCnt = new MapWritable();
-      updatedCnt.put(
-          FLAG_VERTEX_TOTAL_VERTICES,
-          new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES)
-              .getCounter())));
-      // send the updates from the master tasks back to the slaves
-      for (String peerName : peer.getAllPeerNames()) {
-        peer.send(peerName, new GraphJobMessage(updatedCnt));
-      }
-    }
-
-    // this is only done in every second iteration
-    if (isMasterTask(peer) && iteration > 1) {
+    // This run only on master
+    if (isMasterTask(peer)) {
       MapWritable updatedCnt = new MapWritable();
-      // send total number of vertices.
+      // send total number of vertices
       updatedCnt.put(
           FLAG_VERTEX_TOTAL_VERTICES,
           new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES)
@@ -214,26 +199,26 @@ public final class GraphJobRunner<V exte
         peer.send(peerName, new GraphJobMessage(updatedCnt));
       }
     }
-    if (getAggregationRunner().isEnabled() && iteration > 1) {
-      // in case we need to sync, we need to replay the messages that already
-      // are added to the queue. This prevents loosing messages when using
-      // aggregators.
-      if (firstVertexMessage != null) {
-        peer.send(peer.getPeerName(), firstVertexMessage);
-      }
-      GraphJobMessage msg = null;
-      while ((msg = peer.getCurrentMessage()) != null) {
-        peer.send(peer.getPeerName(), msg);
-      }
-      // now sync
-      peer.sync();
-      // now the map message must be read that might be send from the master
-      updated = getAggregationRunner().receiveAggregatedValues(
-          peer.getCurrentMessage().getMap(), iteration);
-      // set the first vertex message back to the message it had before sync
-      firstVertexMessage = peer.getCurrentMessage();
+
+    // in case we need to sync, we need to replay the messages that already
+    // are added to the queue. This prevents loosing messages when using
+    // aggregators.
+    if (firstVertexMessage != null) {
+      peer.send(peer.getPeerName(), firstVertexMessage);
     }
-    this.aggregationRunner.resetSkipAggregators();
+
+    GraphJobMessage msg = null;
+    while ((msg = peer.getCurrentMessage()) != null) {
+      peer.send(peer.getPeerName(), msg);
+    }
+
+    // now sync
+    peer.sync();
+    // now the map message must be read that might be send from the master
+    updated = getAggregationRunner().receiveAggregatedValues(
+        peer.getCurrentMessage().getMap(), iteration);
+    // set the first vertex message back to the message it had before sync
+    firstVertexMessage = peer.getCurrentMessage();
     return firstVertexMessage;
   }
 
@@ -274,7 +259,6 @@ public final class GraphJobRunner<V exte
       }
 
       if (!vertex.isHalted()) {
-        M lastValue = vertex.getValue();
         if (iterable == null) {
           vertex.compute(Collections.<M> emptyList());
         } else {
@@ -286,7 +270,6 @@ public final class GraphJobRunner<V exte
           }
           currentMessage = iterable.getOverflowMessage();
         }
-        getAggregationRunner().aggregateVertex(lastValue, vertex);
         activeVertices++;
       }
 
@@ -296,8 +279,7 @@ public final class GraphJobRunner<V exte
     }
     vertices.finishSuperstep();
 
-    getAggregationRunner().sendAggregatorValues(peer, activeVertices,
-        this.changedVertexCnt);
+    sendControllValues(activeVertices, this.changedVertexCnt);
     iteration++;
   }
 
@@ -357,15 +339,13 @@ public final class GraphJobRunner<V exte
     while (skippingIterator.hasNext()) {
       Vertex<V, E, M> vertex = skippingIterator.next();
 
-      M lastValue = vertex.getValue();
       // Calls setup method.
       vertex.setup(conf);
       vertex.compute(Collections.singleton(vertex.getValue()));
-      getAggregationRunner().aggregateVertex(lastValue, vertex);
       vertices.finishVertexComputation(vertex);
     }
     vertices.finishSuperstep();
-    getAggregationRunner().sendAggregatorValues(peer, 1, this.changedVertexCnt);
+    sendControllValues(1, this.changedVertexCnt);
     iteration++;
   }
 
@@ -594,14 +574,10 @@ public final class GraphJobRunner<V exte
             } else {
               globalUpdateCounts += ((IntWritable) e.getValue()).get();
             }
-          } else if (getAggregationRunner().isEnabled()
-              && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
-            getAggregationRunner().masterReadAggregatedValue(vertexID,
-                (M) e.getValue());
-          } else if (getAggregationRunner().isEnabled()
-              && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
-            getAggregationRunner().masterReadAggregatedIncrementalValue(
-                vertexID, (M) e.getValue());
+
+          } else if (vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
+            this.getAggregationRunner().masterAggregation(vertexID,
+                e.getValue());
           } else if (FLAG_VERTEX_INCREASE.equals(vertexID)) {
             dynamicAdditions = true;
             addVertex((Vertex<V, E, M>) e.getValue());
@@ -619,21 +595,11 @@ public final class GraphJobRunner<V exte
                   "A message to increase vertex count is in a wrong place: "
                       + peer);
             }
-          } else if (FLAG_AGGREGATOR_SKIP.equals(vertexID)) {
-            if (isMasterTask(peer)) {
-              this.getAggregationRunner().addSkipAggregator(
-                  ((IntWritable) e.getValue()).get());
-            } else {
-              throw new UnsupportedOperationException(
-                  "A message to skip aggregators is in a wrong peer: " + peer);
-            }
           }
         }
-
       } else {
         throw new UnsupportedOperationException("Unknown message type: " + msg);
       }
-
     }
 
     // If we applied any changes to vertices, we need to call finishAdditions
@@ -677,23 +643,20 @@ public final class GraphJobRunner<V exte
   }
 
   /**
-   * Gets the last aggregated value at the given index. The index is dependend
-   * on how the aggregators were configured during job setup phase.
-   * 
-   * @return the value of the aggregator, or null if none was defined.
-   */
-  public final Writable getLastAggregatedValue(int index) {
-    return getAggregationRunner().getLastAggregatedValue(index);
-  }
-
-  /**
-   * Gets the last aggregated number of vertices at the given index. The index
-   * is dependend on how the aggregators were configured during job setup phase.
+   * Runs internal aggregators and send their values to the master task.
    * 
-   * @return the value of the aggregator, or null if none was defined.
+   * @param activeVertices number of active vertices in this peer
+   * @param changedVertexCnt number of added/removed vertices in a superstep
    */
-  public final IntWritable getNumLastAggregatedVertices(int index) {
-    return getAggregationRunner().getNumLastAggregatedVertices(index);
+  private void sendControllValues(int activeVertices, int changedVertexCnt)
+      throws IOException {
+    // send msgCounts to the master task
+    MapWritable updatedCnt = new MapWritable();
+    updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(activeVertices));
+    // send total number of vertices changes
+    updatedCnt.put(FLAG_VERTEX_ALTER_COUNTER,
+        new LongWritable(changedVertexCnt));
+    peer.send(getMasterTask(peer), new GraphJobMessage(updatedCnt));
   }
 
   /**

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java Thu Jan
 9 01:10:59 2014
@@ -82,6 +82,7 @@ public class OffHeapVerticesInfo<V exten
     vertices.dump();
   }
 
+  @Override
   public void addVertex(Vertex<V, E, M> vertex) {
     vertices.put(vertex.getVertexID(), vertex);
   }
@@ -108,6 +109,7 @@ public class OffHeapVerticesInfo<V exten
     vertices.clear();
   }
 
+  @Override
   public int size() {
     return (int) this.vertices.entries();
   }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Thu Jan  9 01:10:59 2014
@@ -27,8 +27,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hama.HamaConfiguration;
@@ -75,7 +75,7 @@ public abstract class Vertex<V extends W
   @Override
   public void setup(HamaConfiguration conf) {
   }
-  
+
   @Override
   public void sendMessage(Edge<V, E> e, M msg) throws IOException {
     runner.getPeer().send(getDestinationPeerName(e),
@@ -120,24 +120,26 @@ public abstract class Vertex<V extends W
   private void alterVertexCounter(int i) throws IOException {
     this.runner.setChangedVertexCnt(this.runner.getChangedVertexCnt() + i);
   }
-  
+
   @Override
-  public void addVertex(V vertexID, List<Edge<V, E>> edges, M value) throws IOException
{
+  public void addVertex(V vertexID, List<Edge<V, E>> edges, M value)
+      throws IOException {
     MapWritable msg = new MapWritable();
     // Create the new vertex.
-    Vertex<V, E, M> vertex = GraphJobRunner.<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+    Vertex<V, E, M> vertex = GraphJobRunner
+        .<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
     vertex.setEdges(edges);
     vertex.setValue(value);
     vertex.setVertexID(vertexID);
-    
+
     msg.put(GraphJobRunner.FLAG_VERTEX_INCREASE, vertex);
     // Find the proper partition to host the new vertex.
-    int partition = getPartitioner().getPartition(vertexID, value, 
+    int partition = getPartitioner().getPartition(vertexID, value,
         runner.getPeer().getNumPeers());
     String destPeer = runner.getPeer().getAllPeerNames()[partition];
-    
+
     runner.getPeer().send(destPeer, new GraphJobMessage(msg));
-    
+
     alterVertexCounter(1);
   }
 
@@ -145,11 +147,11 @@ public abstract class Vertex<V extends W
   public void remove() throws IOException {
     MapWritable msg = new MapWritable();
     msg.put(GraphJobRunner.FLAG_VERTEX_DECREASE, this.vertexID);
-    
+
     // Get master task peer.
     String destPeer = GraphJobRunner.getMasterTask(this.getPeer());
     runner.getPeer().send(destPeer, new GraphJobMessage(msg));
-    
+
     alterVertexCounter(-1);
   }
 
@@ -192,31 +194,6 @@ public abstract class Vertex<V extends W
     return runner.getMaxIteration();
   }
 
-  /**
-   * Get the last aggregated value of the defined aggregator, null if nothing
-   * was configured or not returned a result. You have to supply an index, the
-   * index is defined by the order you set the aggregator classes in
-   * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
-   * so if you have a single aggregator you can retrieve it via
-   * {@link #getLastAggregatedValue}(0).
-   */
-  @SuppressWarnings("unchecked")
-  public M getLastAggregatedValue(int index) {
-    return (M) runner.getLastAggregatedValue(index);
-  }
-
-  /**
-   * Get the number of aggregated vertices in the last superstep. Or null if no
-   * aggregator is available.You have to supply an index, the index is defined
-   * by the order you set the aggregator classes in
-   * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
-   * so if you have a single aggregator you can retrieve it via
-   * {@link #getNumLastAggregatedVertices}(0).
-   */
-  public IntWritable getNumLastAggregatedVertices(int index) {
-    return runner.getNumLastAggregatedVertices(index);
-  }
-
   public int getNumPeers() {
     return runner.getPeer().getNumPeers();
   }
@@ -245,21 +222,6 @@ public abstract class Vertex<V extends W
     this.votedToHalt = true;
   }
 
-  /**
-   * Disable an aggregator for the next superstep. The returning value of 
-   * the aggregator will be null.
-   */
-  public void skipAggregator(int index) throws IOException {
-    MapWritable msg = new MapWritable();
-    msg.put(GraphJobRunner.FLAG_AGGREGATOR_SKIP, new IntWritable(index));
-    
-    this.runner.getAggregationRunner().addSkipAggregator(index);
-    
-    // Get master task peer.
-    String destPeer = GraphJobRunner.getMasterTask(this.getPeer());
-    runner.getPeer().send(destPeer, new GraphJobMessage(msg));
-  }
-
   void setActive() {
     this.votedToHalt = false;
   }
@@ -314,7 +276,7 @@ public abstract class Vertex<V extends W
       }
       this.value.readFields(in);
     }
-    
+
     this.edges = new ArrayList<Edge<V, E>>();
     if (in.readBoolean()) {
       int num = in.readInt();
@@ -345,7 +307,7 @@ public abstract class Vertex<V extends W
       out.writeBoolean(true);
       vertexID.write(out);
     }
-    
+
     if (value == null) {
       out.writeBoolean(false);
     } else {
@@ -399,6 +361,30 @@ public abstract class Vertex<V extends W
 
   }
 
+  /**
+   * Provides a value to the specified aggregator.
+   * 
+   * @throws IOException
+   * 
+   * @param name identifies an aggregator
+   * @param value value to be aggregated
+   */
+  @Override
+  public void aggregate(String name, M value) throws IOException {
+    MapWritable msg = new MapWritable();
+    msg.put(new Text(GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + name),
+        value);
+
+    // Get master task peer.
+    String destPeer = GraphJobRunner.getMasterTask(this.getPeer());
+    runner.getPeer().send(destPeer, new GraphJobMessage(msg));
+  }
+
+  @Override
+  public Writable getAggregatedValue(String name) {
+    return this.runner.getAggregationRunner().getAggregatedValue(name);
+  }
+
   protected void setRunner(GraphJobRunner<V, E, M> runner) {
     this.runner = runner;
   }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Thu Jan  9 01:10:59
2014
@@ -112,4 +112,19 @@ public interface VertexInterface<V exten
    */
   public M getValue();
 
+  /**
+   * Provides a value to the specified aggregator.
+   * 
+   * @throws IOException
+   * 
+   * @param name identifies a aggregator
+   * @param value value to be aggregated
+   */
+  public void aggregate(String name, M value) throws IOException;
+
+  /**
+   * Returns the value of the specified aggregator.
+   */
+  public Writable getAggregatedValue(String name);
+
 }

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Thu Jan 
9 01:10:59 2014
@@ -44,7 +44,7 @@ import org.junit.Before;
 public class TestSubmitGraphJob extends TestBSPMasterGroomServer {
 
   String[] input = new String[] { "stackoverflow.com\tyahoo.com",
-      "facebook.com\ttwitter.com", 
+      "facebook.com\ttwitter.com",
       "facebook.com\tgoogle.com\tnasa.gov",
       "yahoo.com\tnasa.gov\tstackoverflow.com",
       "twitter.com\tgoogle.com\tfacebook.com",
@@ -56,6 +56,7 @@ public class TestSubmitGraphJob extends 
   @SuppressWarnings("rawtypes")
   private static final List<Class<? extends VerticesInfo>> vi = new ArrayList<Class<?
extends VerticesInfo>>();
 
+  @Override
   @Before
   public void setUp() throws Exception {
     super.setUp();
@@ -84,7 +85,7 @@ public class TestSubmitGraphJob extends 
     // set the defaults
     bsp.setMaxIteration(30);
 
-    bsp.setAggregatorClass(AverageAggregator.class);
+    bsp.registerAggregator("avg", AverageAggregator.class);
 
     bsp.setInputFormat(SequenceFileInputFormat.class);
     bsp.setInputKeyClass(Text.class);

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Thu Jan  9
01:10:59 2014
@@ -43,7 +43,7 @@ public class PageRank {
 
   public static class PageRankVertex extends
       Vertex<Text, NullWritable, DoubleWritable> {
-    
+
     static double DAMPING_FACTOR = 0.85;
     static double MAXIMUM_CONVERGENCE_ERROR = 0.001;
 
@@ -74,7 +74,7 @@ public class PageRank {
       }
 
       // if we have not reached our global error yet, then proceed.
-      DoubleWritable globalError = getLastAggregatedValue(0);
+      DoubleWritable globalError = (DoubleWritable) getAggregatedValue("avg");
       if (globalError != null && this.getSuperstepCount() > 2
           && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
         voteToHalt();
@@ -84,6 +84,8 @@ public class PageRank {
       // in each superstep we are going to send a new rank to our neighbours
       sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
           / this.getEdges().size()));
+
+      this.aggregate("avg", this.getValue());
     }
 
   }
@@ -126,7 +128,7 @@ public class PageRank {
     }
 
     // error
-    pageJob.setAggregatorClass(AverageAggregator.class);
+    pageJob.registerAggregator("avg", AverageAggregator.class);
 
     // Vertex reader
     pageJob.setVertexInputReaderClass(PagerankSeqReader.class);



Mime
View raw message