hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andro...@apache.org
Subject svn commit: r1561622 - in /hama/trunk: CHANGES.txt graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Date Mon, 27 Jan 2014 09:44:59 GMT
Author: andronat
Date: Mon Jan 27 09:44:58 2014
New Revision: 1561622

URL: http://svn.apache.org/r1561622
Log:
HAMA-860

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1561622&r1=1561621&r2=1561622&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Jan 27 09:44:58 2014
@@ -14,6 +14,7 @@ Release 0.7.0 (unreleased changes)
 
   BUG FIXES
 
+	 HAMA-860: Make aggregators start from the first superstep (Anastasis Andronidis)
    HAMA-857: Graph Combiners is wrongly implemented (edwardyoon)
    HAMA-845: The size() of Spilling Queue returns always numMessagesWritten (edwardyoon)
    HAMA-834: Fix KMeans example (Martin Illecker)

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1561622&r1=1561621&r2=1561622&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Mon Jan 27 09:44:58
2014
@@ -45,7 +45,7 @@ import org.apache.hama.util.ReflectionUt
 
 /**
  * Fully generic graph job runner.
- * 
+ *
  * @param <V> the id type of a vertex.
  * @param <E> the value type of an edge.
  * @param <M> the value type of a vertex.
@@ -177,20 +177,8 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
 
-    if (isMasterTask(peer) && iteration == 1) {
-      MapWritable updatedCnt = new MapWritable();
-      updatedCnt.put(
-          FLAG_VERTEX_TOTAL_VERTICES,
-          new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES)
-              .getCounter())));
-      // send the updates from the master tasks back to the slaves
-      for (String peerName : peer.getAllPeerNames()) {
-        peer.send(peerName, new GraphJobMessage(updatedCnt));
-      }
-    }
-
     // this is only done in every second iteration
-    if (isMasterTask(peer) && iteration > 1) {
+    if (isMasterTask(peer)) {
       MapWritable updatedCnt = new MapWritable();
       // send total number of vertices.
       updatedCnt.put(
@@ -208,7 +196,7 @@ public final class GraphJobRunner<V exte
         peer.send(peerName, new GraphJobMessage(updatedCnt));
       }
     }
-    if (getAggregationRunner().isEnabled() && iteration > 1) {
+    if (getAggregationRunner().isEnabled()) {
       // in case we need to sync, we need to replay the messages that already
       // are added to the queue. This prevents loosing messages when using
       // aggregators.
@@ -465,7 +453,7 @@ public final class GraphJobRunner<V exte
 
   /**
    * Add new vertex into memory of each peer.
-   * 
+   *
    * @throws IOException
    */
   private void addVertex(Vertex<V, E, M> vertex) throws IOException {
@@ -483,7 +471,7 @@ public final class GraphJobRunner<V exte
 
   /**
    * Remove vertex from this peer.
-   * 
+   *
    * @throws IOException
    */
   private void removeVertex(V vertexID) {
@@ -494,7 +482,7 @@ public final class GraphJobRunner<V exte
 
   /**
    * After all inserts are done, we must finalize the VertexInfo data structure.
-   * 
+   *
    * @throws IOException
    */
   private void finishAdditions() throws IOException {
@@ -505,7 +493,7 @@ public final class GraphJobRunner<V exte
 
   /**
    * After all inserts are done, we must finalize the VertexInfo data structure.
-   * 
+   *
    * @throws IOException
    */
   private void finishRemovals() throws IOException {
@@ -542,7 +530,7 @@ public final class GraphJobRunner<V exte
   /**
    * Parses the messages in every superstep and does actions according to flags
    * in the messages.
-   * 
+   *
    * @return the first vertex message, null if none received.
    */
   @SuppressWarnings("unchecked")
@@ -647,7 +635,7 @@ public final class GraphJobRunner<V exte
   /**
    * Gets the last aggregated value at the given index. The index is dependend
    * on how the aggregators were configured during job setup phase.
-   * 
+   *
    * @return the value of the aggregator, or null if none was defined.
    */
   public final Writable getLastAggregatedValue(int index) {
@@ -657,7 +645,7 @@ public final class GraphJobRunner<V exte
   /**
    * Gets the last aggregated number of vertices at the given index. The index
    * is dependend on how the aggregators were configured during job setup phase.
-   * 
+   *
    * @return the value of the aggregator, or null if none was defined.
    */
   public final IntWritable getNumLastAggregatedVertices(int index) {



Mime
View raw message