hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1560606 - in /hama/trunk/graph/src/main/java/org/apache/hama/graph: AggregationRunner.java GraphJobRunner.java
Date Thu, 23 Jan 2014 08:31:03 GMT
Author: edwardyoon
Date: Thu Jan 23 08:31:02 2014
New Revision: 1560606

URL: http://svn.apache.org/r1560606
Log:
Remove skipiterator feature

Modified:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java?rev=1560606&r1=1560605&r2=1560606&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java Thu Jan 23 08:31:02 2014
@@ -18,8 +18,6 @@
 package org.apache.hama.graph;
 
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
@@ -44,7 +42,6 @@ public final class AggregationRunner<V e
 
   // multiple aggregator arrays
   private Aggregator<M, Vertex<V, E, M>>[] aggregators;
-  private Set<Integer> skipAggregators;
   private Writable[] globalAggregatorResult;
   private IntWritable[] globalAggregatorIncrement;
   private boolean[] isAbstractAggregator;
@@ -63,7 +60,6 @@ public final class AggregationRunner<V e
     this.conf = peer.getConfiguration();
     String aggregatorClasses = peer.getConfiguration().get(
         GraphJob.AGGREGATOR_CLASS_ATTR);
-    this.skipAggregators = new HashSet<Integer>();
     if (aggregatorClasses != null) {
       enabled = true;
       aggregatorClassNames = aggregatorClasses.split(";");
@@ -95,7 +91,8 @@ public final class AggregationRunner<V e
 
   /**
    * Runs the aggregators by sending their values to the master task.
-   * @param changedVertexCnt 
+   * 
+   * @param changedVertexCnt
    */
   public void sendAggregatorValues(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
@@ -110,22 +107,18 @@ public final class AggregationRunner<V e
     // also send aggregated values to the master
     if (aggregators != null) {
       for (int i = 0; i < this.aggregators.length; i++) {
-        if (!this.skipAggregators.contains(i)) {
-          updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
-          if (isAbstractAggregator[i]) {
-            updatedCnt.put(aggregatorIncrementFlag[i],
-                ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
-                    .getTimesAggregated());
-          }          
+        updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
+        if (isAbstractAggregator[i]) {
+          updatedCnt.put(aggregatorIncrementFlag[i],
+              ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
+                  .getTimesAggregated());
         }
       }
       for (int i = 0; i < aggregators.length; i++) {
-        if (!this.skipAggregators.contains(i)) {
-          // now create new aggregators for the next iteration
-          aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
-          if (GraphJobRunner.isMasterTask(peer)) {
-            masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
-          }          
+        // now create new aggregators for the next iteration
+        aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
+        if (GraphJobRunner.isMasterTask(peer)) {
+          masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
         }
       }
     }
@@ -143,14 +136,12 @@ public final class AggregationRunner<V e
   public void aggregateVertex(M lastValue, Vertex<V, E, M> v) {
     if (isEnabled()) {
       for (int i = 0; i < this.aggregators.length; i++) {
-        if (!this.skipAggregators.contains(i)) {
-          Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
-          aggregator.aggregate(v, v.getValue());
-          if (isAbstractAggregator[i]) {
-            AbstractAggregator<M, Vertex<V, E, M>> intern = (AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
-            intern.aggregate(v, lastValue, v.getValue());
-            intern.aggregateInternal();
-          }          
+        Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
+        aggregator.aggregate(v, v.getValue());
+        if (isAbstractAggregator[i]) {
+          AbstractAggregator<M, Vertex<V, E, M>> intern = (AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
+          intern.aggregate(v, lastValue, v.getValue());
+          intern.aggregateInternal();
         }
       }
     }
@@ -164,21 +155,19 @@ public final class AggregationRunner<V e
     if (isEnabled()) {
       // work through the master aggregators
       for (int i = 0; i < masterAggregator.length; i++) {
-        if (!this.skipAggregators.contains(i)) {
-          Writable lastAggregatedValue = masterAggregator[i].getValue();
-          if (isAbstractAggregator[i]) {
-            final AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]);
-            final Writable finalizeAggregation = intern.finalizeAggregation();
-            if (intern.finalizeAggregation() != null) {
-              lastAggregatedValue = finalizeAggregation;
-            }
-            // this count is usually the times of active
-            // vertices in the graph
-            updatedCnt.put(aggregatorIncrementFlag[i],
-                intern.getTimesAggregated());
+        Writable lastAggregatedValue = masterAggregator[i].getValue();
+        if (isAbstractAggregator[i]) {
+          final AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]);
+          final Writable finalizeAggregation = intern.finalizeAggregation();
+          if (intern.finalizeAggregation() != null) {
+            lastAggregatedValue = finalizeAggregation;
           }
-          updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);   
+          // this count is usually the times of active
+          // vertices in the graph
+          updatedCnt.put(aggregatorIncrementFlag[i],
+              intern.getTimesAggregated());
         }
+        updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);
       }
     }
   }
@@ -193,9 +182,9 @@ public final class AggregationRunner<V e
       long iteration) throws IOException, SyncException, InterruptedException {
     // map is the first value that is in the queue
     for (int i = 0; i < aggregators.length; i++) {
-        globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
-        globalAggregatorIncrement[i] = (IntWritable) updatedValues
-            .get(aggregatorIncrementFlag[i]);
+      globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
+      globalAggregatorIncrement[i] = (IntWritable) updatedValues
+          .get(aggregatorIncrementFlag[i]);
     }
     IntWritable count = (IntWritable) updatedValues
         .get(GraphJobRunner.FLAG_MESSAGE_COUNTS);
@@ -233,22 +222,6 @@ public final class AggregationRunner<V e
     }
   }
 
-  /**
-   * This method adds an id of an aggregator that will be skipped in the current
-   * superstep.
-   */
-  public void addSkipAggregator(int index) {
-    this.skipAggregators.add(index);
-  }
-
-  /**
-   * This method adds an id of an aggregator that will be skipped in the current
-   * superstep.
-   */
-  void resetSkipAggregators() {
-    this.skipAggregators.clear();
-  }
-
   @SuppressWarnings("unchecked")
   private Aggregator<M, Vertex<V, E, M>> getNewAggregator(String clsName) {
     try {
@@ -270,4 +243,4 @@ public final class AggregationRunner<V e
     return globalAggregatorIncrement[Preconditions.checkPositionIndex(index,
         globalAggregatorIncrement.length)];
   }
-}
\ No newline at end of file
+}

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1560606&r1=1560605&r2=1560606&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Jan 23 08:31:02 2014
@@ -68,7 +68,6 @@ public final class GraphJobRunner<V exte
   public static final String S_FLAG_VERTEX_DECREASE = "hama.4";
   public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5";
   public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6";
-  public static final String S_FLAG_AGGREGATOR_SKIP = "hama.7";
   public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS);
   public static final Text FLAG_VERTEX_INCREASE = new Text(
       S_FLAG_VERTEX_INCREASE);
@@ -229,7 +228,6 @@ public final class GraphJobRunner<V exte
       // set the first vertex message back to the message it had before sync
       firstVertexMessage = peer.getCurrentMessage();
     }
-    this.aggregationRunner.resetSkipAggregators();
     return firstVertexMessage;
   }
 
@@ -742,7 +740,7 @@ public final class GraphJobRunner<V exte
   /**
    * @return the aggregationRunner
    */
-  AggregationRunner<V, E, M> getAggregationRunner() {
+  public AggregationRunner<V, E, M> getAggregationRunner() {
     return aggregationRunner;
   }
 



Mime
View raw message