hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andro...@apache.org
Subject svn commit: r1529170 - in /hama/trunk: CHANGES.txt graph/src/main/java/org/apache/hama/graph/AggregationRunner.java graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java graph/src/main/java/org/apache/hama/graph/Vertex.java
Date Fri, 04 Oct 2013 14:29:15 GMT
Author: andronat
Date: Fri Oct  4 14:29:15 2013
New Revision: 1529170

URL: http://svn.apache.org/r1529170
Log:
HAMA-807: Make aggregators skip supersteps

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1529170&r1=1529169&r2=1529170&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Oct  4 14:29:15 2013
@@ -4,16 +4,17 @@ Release 0.6.3 (unreleased changes)
 
   NEW FEATURES
   
+   HAMA-807: Make aggregators skip supersteps (Anastasis Andronidis)
    HAMA-800: Hama Pipes Examples (Martin Illecker)
    HAMA-804: Create NeuralNetwork Example (Yexi Jiang)
    HAMA-795: Implement Autoencoder based on NeuralNetwork (Yexi Jiang)
    HAMA-767: Add vertex addition/removal APIs (Anastasis Andronidis via edwardyoon)
    HAMA-594: Semi-Clustering Algorithm Implementation (Renil Jeseph via edwardyoon)
-   HAMA-801: Snappy fails on Mac OS with JDK 1.7 (Anastasis Andronidis via tommaso)
 
   BUG FIXES
 
    HAMA-805: Problem initializing pipes in HamaStreaming (Martin Illecker)  
+   HAMA-801: Snappy fails on Mac OS with JDK 1.7 (Anastasis Andronidis via tommaso)
    HAMA-789: BspPeer launched fail because port is bound by others (Suraj Menon via edwardyoon)
    HAMA-791: Fix the problem that MultilayerPerceptron fails to learn a good hypothesis sometimes. (Yexi Jiang)
    HAMA-782: The arguments of DoubleVector.slice(int, int) method will mislead the user. (Yexi Jiang)

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java?rev=1529170&r1=1529169&r2=1529170&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java Fri Oct  4 14:29:15 2013
@@ -18,6 +18,8 @@
 package org.apache.hama.graph;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
@@ -42,6 +44,7 @@ public final class AggregationRunner<V e
 
   // multiple aggregator arrays
   private Aggregator<M, Vertex<V, E, M>>[] aggregators;
+  private Set<Integer> skipAggregators;
   private Writable[] globalAggregatorResult;
   private IntWritable[] globalAggregatorIncrement;
   private boolean[] isAbstractAggregator;
@@ -60,6 +63,7 @@ public final class AggregationRunner<V e
     this.conf = peer.getConfiguration();
     String aggregatorClasses = peer.getConfiguration().get(
         GraphJob.AGGREGATOR_CLASS_ATTR);
+    this.skipAggregators = new HashSet<Integer>();
     if (aggregatorClasses != null) {
       enabled = true;
       aggregatorClassNames = aggregatorClasses.split(";");
@@ -106,18 +110,22 @@ public final class AggregationRunner<V e
     // also send aggregated values to the master
     if (aggregators != null) {
       for (int i = 0; i < this.aggregators.length; i++) {
-        updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
-        if (isAbstractAggregator[i]) {
-          updatedCnt.put(aggregatorIncrementFlag[i],
-              ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
-                  .getTimesAggregated());
+        if (!this.skipAggregators.contains(i)) {
+          updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
+          if (isAbstractAggregator[i]) {
+            updatedCnt.put(aggregatorIncrementFlag[i],
+                ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
+                    .getTimesAggregated());
+          }          
         }
       }
       for (int i = 0; i < aggregators.length; i++) {
-        // now create new aggregators for the next iteration
-        aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
-        if (GraphJobRunner.isMasterTask(peer)) {
-          masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
+        if (!this.skipAggregators.contains(i)) {
+          // now create new aggregators for the next iteration
+          aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
+          if (GraphJobRunner.isMasterTask(peer)) {
+            masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
+          }          
         }
       }
     }
@@ -135,12 +143,14 @@ public final class AggregationRunner<V e
   public void aggregateVertex(M lastValue, Vertex<V, E, M> v) {
     if (isEnabled()) {
       for (int i = 0; i < this.aggregators.length; i++) {
-        Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
-        aggregator.aggregate(v, v.getValue());
-        if (isAbstractAggregator[i]) {
-          AbstractAggregator<M, Vertex<V, E, M>> intern = (AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
-          intern.aggregate(v, lastValue, v.getValue());
-          intern.aggregateInternal();
+        if (!this.skipAggregators.contains(i)) {
+          Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
+          aggregator.aggregate(v, v.getValue());
+          if (isAbstractAggregator[i]) {
+            AbstractAggregator<M, Vertex<V, E, M>> intern = (AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
+            intern.aggregate(v, lastValue, v.getValue());
+            intern.aggregateInternal();
+          }          
         }
       }
     }
@@ -154,19 +164,21 @@ public final class AggregationRunner<V e
     if (isEnabled()) {
       // work through the master aggregators
       for (int i = 0; i < masterAggregator.length; i++) {
-        Writable lastAggregatedValue = masterAggregator[i].getValue();
-        if (isAbstractAggregator[i]) {
-          final AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]);
-          final Writable finalizeAggregation = intern.finalizeAggregation();
-          if (intern.finalizeAggregation() != null) {
-            lastAggregatedValue = finalizeAggregation;
+        if (!this.skipAggregators.contains(i)) {
+          Writable lastAggregatedValue = masterAggregator[i].getValue();
+          if (isAbstractAggregator[i]) {
+            final AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]);
+            final Writable finalizeAggregation = intern.finalizeAggregation();
+            if (intern.finalizeAggregation() != null) {
+              lastAggregatedValue = finalizeAggregation;
+            }
+            // this count is usually the times of active
+            // vertices in the graph
+            updatedCnt.put(aggregatorIncrementFlag[i],
+                intern.getTimesAggregated());
           }
-          // this count is usually the times of active
-          // vertices in the graph
-          updatedCnt.put(aggregatorIncrementFlag[i],
-              intern.getTimesAggregated());
+          updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);   
         }
-        updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);
       }
     }
   }
@@ -181,9 +193,9 @@ public final class AggregationRunner<V e
       long iteration) throws IOException, SyncException, InterruptedException {
     // map is the first value that is in the queue
     for (int i = 0; i < aggregators.length; i++) {
-      globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
-      globalAggregatorIncrement[i] = (IntWritable) updatedValues
-          .get(aggregatorIncrementFlag[i]);
+        globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
+        globalAggregatorIncrement[i] = (IntWritable) updatedValues
+            .get(aggregatorIncrementFlag[i]);
     }
     IntWritable count = (IntWritable) updatedValues
         .get(GraphJobRunner.FLAG_MESSAGE_COUNTS);
@@ -221,6 +233,22 @@ public final class AggregationRunner<V e
     }
   }
 
+  /**
+   * This method adds an id of an aggregator that will be skipped in the current
+   * superstep.
+   */
+  public void addSkipAggregator(int index) {
+    this.skipAggregators.add(index);
+  }
+
+  /**
+   * This method clears the ids of all aggregators that were marked to be
+   * skipped.
+   */
+  void resetSkipAggregators() {
+    this.skipAggregators.clear();
+  }
+
   @SuppressWarnings("unchecked")
   private Aggregator<M, Vertex<V, E, M>> getNewAggregator(String clsName) {
     try {

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1529170&r1=1529169&r2=1529170&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Fri Oct  4 14:29:15 2013
@@ -68,11 +68,13 @@ public final class GraphJobRunner<V exte
   public static final String S_FLAG_VERTEX_DECREASE = "hama.4";
   public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5";
   public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6";
+  public static final String S_FLAG_AGGREGATOR_SKIP = "hama.7";
   public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS);
   public static final Text FLAG_VERTEX_INCREASE = new Text(S_FLAG_VERTEX_INCREASE);
   public static final Text FLAG_VERTEX_DECREASE = new Text(S_FLAG_VERTEX_DECREASE);
   public static final Text FLAG_VERTEX_ALTER_COUNTER = new Text(S_FLAG_VERTEX_ALTER_COUNTER);
   public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text(S_FLAG_VERTEX_TOTAL_VERTICES);
+  public static final Text FLAG_AGGREGATOR_SKIP = new Text(S_FLAG_AGGREGATOR_SKIP);
 
   public static final String MESSAGE_COMBINER_CLASS_KEY = "hama.vertex.message.combiner.class";
   public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class";
@@ -193,14 +195,14 @@ public final class GraphJobRunner<V exte
       if (globalUpdateCounts == 0) {
         updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE));
       } else {
-        aggregationRunner.doMasterAggregation(updatedCnt);
+        getAggregationRunner().doMasterAggregation(updatedCnt);
       }
       // send the updates from the master tasks back to the slaves
       for (String peerName : peer.getAllPeerNames()) {
         peer.send(peerName, new GraphJobMessage(updatedCnt));
       }
     }
-    if (aggregationRunner.isEnabled() && iteration > 1) {
+    if (getAggregationRunner().isEnabled() && iteration > 1) {
       // in case we need to sync, we need to replay the messages that already
       // are added to the queue. This prevents losing messages when using
       // aggregators.
@@ -214,11 +216,12 @@ public final class GraphJobRunner<V exte
       // now sync
       peer.sync();
       // now the map message must be read that might be send from the master
-      updated = aggregationRunner.receiveAggregatedValues(peer
+      updated = getAggregationRunner().receiveAggregatedValues(peer
           .getCurrentMessage().getMap(), iteration);
       // set the first vertex message back to the message it had before sync
       firstVertexMessage = peer.getCurrentMessage();
     }
+    this.aggregationRunner.resetSkipAggregators();
     return firstVertexMessage;
   }
 
@@ -267,7 +270,7 @@ public final class GraphJobRunner<V exte
           }
           currentMessage = iterable.getOverflowMessage();
         }
-        aggregationRunner.aggregateVertex(lastValue, vertex);
+        getAggregationRunner().aggregateVertex(lastValue, vertex);
         activeVertices++;
       }
 
@@ -277,7 +280,7 @@ public final class GraphJobRunner<V exte
     }
     vertices.finishSuperstep();
 
-    aggregationRunner.sendAggregatorValues(peer, activeVertices, this.changedVertexCnt);
+    getAggregationRunner().sendAggregatorValues(peer, activeVertices, this.changedVertexCnt);
     iteration++;
   }
 
@@ -338,11 +341,11 @@ public final class GraphJobRunner<V exte
       Vertex<V, E, M> vertex = skippingIterator.next();
       M lastValue = vertex.getValue();
       vertex.compute(Collections.singleton(vertex.getValue()));
-      aggregationRunner.aggregateVertex(lastValue, vertex);
+      getAggregationRunner().aggregateVertex(lastValue, vertex);
       vertices.finishVertexComputation(vertex);
     }
     vertices.finishSuperstep();
-    aggregationRunner.sendAggregatorValues(peer, 1, this.changedVertexCnt);
+    getAggregationRunner().sendAggregatorValues(peer, 1, this.changedVertexCnt);
     iteration++;
   }
 
@@ -376,8 +379,8 @@ public final class GraphJobRunner<V exte
     vertexOutputWriter = (VertexOutputWriter<Writable, Writable, V, E, M>) ReflectionUtils
         .newInstance(outputWriter);
 
-    aggregationRunner = new AggregationRunner<V, E, M>();
-    aggregationRunner.setupAggregators(peer);
+    setAggregationRunner(new AggregationRunner<V, E, M>());
+    getAggregationRunner().setupAggregators(peer);
 
     Class<? extends VerticesInfo<V, E, M>> verticesInfoClass = (Class<? extends VerticesInfo<V, E, M>>) conf.getClass("hama.graph.vertices.info", ListVerticesInfo.class, VerticesInfo.class);
     vertices = ReflectionUtils.newInstance(verticesInfoClass);
@@ -549,13 +552,13 @@ public final class GraphJobRunner<V exte
             } else {
               globalUpdateCounts += ((IntWritable) e.getValue()).get();
             }
-          } else if (aggregationRunner.isEnabled()
+          } else if (getAggregationRunner().isEnabled()
               && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
-            aggregationRunner.masterReadAggregatedValue(vertexID,
+            getAggregationRunner().masterReadAggregatedValue(vertexID,
                 (M) e.getValue());
-          } else if (aggregationRunner.isEnabled()
+          } else if (getAggregationRunner().isEnabled()
               && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
-            aggregationRunner.masterReadAggregatedIncrementalValue(vertexID,
+            getAggregationRunner().masterReadAggregatedIncrementalValue(vertexID,
                 (M) e.getValue());
           } else if (FLAG_VERTEX_INCREASE.equals(vertexID)) {
             dynamicAdditions = true;
@@ -571,6 +574,12 @@ public final class GraphJobRunner<V exte
             } else {
               throw new UnsupportedOperationException("A message to increase vertex count is in a wrong place: " + peer);
             }
+          } else if (FLAG_AGGREGATOR_SKIP.equals(vertexID)) {
+            if (isMasterTask(peer)) {
+              this.getAggregationRunner().addSkipAggregator(((IntWritable) e.getValue()).get());
+            } else {
+              throw new UnsupportedOperationException("A message to skip aggregators is in a wrong peer: " + peer);
+            }
           }
         }
 
@@ -626,7 +635,7 @@ public final class GraphJobRunner<V exte
    * @return the value of the aggregator, or null if none was defined.
    */
   public final Writable getLastAggregatedValue(int index) {
-    return aggregationRunner.getLastAggregatedValue(index);
+    return getAggregationRunner().getLastAggregatedValue(index);
   }
 
   /**
@@ -636,7 +645,7 @@ public final class GraphJobRunner<V exte
    * @return the value of the aggregator, or null if none was defined.
    */
   public final IntWritable getNumLastAggregatedVertices(int index) {
-    return aggregationRunner.getNumLastAggregatedVertices(index);
+    return getAggregationRunner().getNumLastAggregatedVertices(index);
   }
 
   /**
@@ -707,4 +716,18 @@ public final class GraphJobRunner<V exte
     this.changedVertexCnt = changedVertexCnt;
   }
 
+  /**
+   * @return the aggregationRunner
+   */
+  AggregationRunner<V, E, M> getAggregationRunner() {
+    return aggregationRunner;
+  }
+
+  /**
+   * @param aggregationRunner the aggregationRunner to set
+   */
+  void setAggregationRunner(AggregationRunner<V, E, M> aggregationRunner) {
+    this.aggregationRunner = aggregationRunner;
+  }
+
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1529170&r1=1529169&r2=1529170&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Fri Oct  4 14:29:15 2013
@@ -228,7 +228,7 @@ public abstract class Vertex<V extends W
    * @return the configured partitioner instance to message vertices.
    */
   public Partitioner<V, M> getPartitioner() {
-    return (Partitioner<V, M>) runner.getPartitioner();
+    return runner.getPartitioner();
   }
 
   @Override
@@ -241,6 +241,21 @@ public abstract class Vertex<V extends W
     this.votedToHalt = true;
   }
 
+  /**
+   * Disable an aggregator for the next superstep. The value returned by
+   * the aggregator will be null.
+   */
+  public void skipAggregator(int index) throws IOException {
+    MapWritable msg = new MapWritable();
+    msg.put(GraphJobRunner.FLAG_AGGREGATOR_SKIP, new IntWritable(index));
+    
+    this.runner.getAggregationRunner().addSkipAggregator(index);
+    
+    // Get master task peer.
+    String destPeer = GraphJobRunner.getMasterTask(this.getPeer());
+    runner.getPeer().send(destPeer, new GraphJobMessage(msg));
+  }
+
   void setActive() {
     this.votedToHalt = false;
   }



Mime
View raw message