hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1381252 - in /hama/trunk: CHANGES.txt graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Date Wed, 05 Sep 2012 17:12:01 GMT
Author: tjungblut
Date: Wed Sep  5 17:12:01 2012
New Revision: 1381252

URL: http://svn.apache.org/viewvc?rev=1381252&view=rev
Log:
[HAMA-635]: Number of vertices value is inconsistent among tasks by Yuesheng Hu


Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1381252&r1=1381251&r2=1381252&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Sep  5 17:12:01 2012
@@ -6,6 +6,7 @@ Release 0.6 (unreleased changes)
 
   BUG FIXES
 
+   HAMA-635: Number of vertices value is inconsistent among tasks (Yuesheng Hu via tjungblut)
    HAMA-633: Fix CI Failures (tjungblut)
    HAMA-631: Add "commons-httpclient-3.1.jar" (Paul Gyuho Song via edwardyoon)
    HAMA-608: LocalRunner should honor the configured queues (tjungblut)

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1381252&r1=1381251&r2=1381252&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Wed Sep  5 17:12:01
2012
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -38,6 +39,7 @@ public final class GraphJobMessage imple
   public static final int VERTEX_FLAG = 0x02;
   public static final int REPAIR_FLAG = 0x04;
   public static final int PARTITION_FLAG = 0x08;
+  public static final int VERTICES_SIZE_FLAG = 0x10;
 
   // staticly defined because it is process-wide information, therefore in caps
   // considered as a constant
@@ -52,6 +54,7 @@ public final class GraphJobMessage imple
   private Writable vertexId;
   private Writable vertexValue;
   private Vertex<?, ?, ?> vertex;
+  private IntWritable vertices_size;
 
   public GraphJobMessage() {
   }
@@ -77,6 +80,11 @@ public final class GraphJobMessage imple
     this.vertex = vertex;
   }
 
+  public GraphJobMessage(IntWritable size) {
+    this.flag = VERTICES_SIZE_FLAG;
+    this.vertices_size = size;
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeByte(this.flag);
@@ -108,6 +116,8 @@ public final class GraphJobMessage imple
           out.writeBoolean(false);
         }
       }
+    } else if (isVerticesSizeMessage()) {
+      vertices_size.write(out);
     } else {
       vertexId.write(out);
     }
@@ -153,6 +163,9 @@ public final class GraphJobMessage imple
             new Edge<Writable, Writable>(edgeVertexID, destination, edgeValue));
       }
       this.vertex = vertex;
+    } else if (isVerticesSizeMessage()) {
+      vertices_size = new IntWritable();
+      vertices_size.readFields(in);
     } else {
       vertexId = ReflectionUtils.newInstance(VERTEX_ID_CLASS, null);
       vertexId.readFields(in);
@@ -175,6 +188,10 @@ public final class GraphJobMessage imple
     return vertex;
   }
 
+  public IntWritable getVerticesSize() {
+    return vertices_size;
+  }
+
   public boolean isMapMessage() {
     return flag == MAP_FLAG;
   }
@@ -191,6 +208,10 @@ public final class GraphJobMessage imple
     return flag == PARTITION_FLAG;
   }
 
+  public boolean isVerticesSizeMessage() {
+    return flag == VERTICES_SIZE_FLAG;
+  }
+
   @Override
   public String toString() {
     return "GraphJobMessage [flag=" + flag + ", map=" + map + ", vertexId="

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1381252&r1=1381251&r2=1381252&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Wed Sep  5 17:12:01
2012
@@ -87,7 +87,7 @@ public final class GraphJobRunner<V exte
   private boolean updated = true;
   private int globalUpdateCounts = 0;
 
-  private long numberVertices;
+  private long numberVertices = 0;
   // -1 is deactivated
   private int maxIteration = -1;
   private long iteration;
@@ -169,7 +169,19 @@ public final class GraphJobRunner<V exte
             VertexInputReader.class), conf);
 
     loadVertices(peer, repairNeeded, runtimePartitioning, partitioner, reader);
-    numberVertices = vertices.size() * peer.getNumPeers();
+   
+    for (String peerName : peer.getAllPeerNames()) {
+      peer.send(peerName, new GraphJobMessage(new IntWritable(vertices.size())));
+    }
+    
+    peer.sync();
+
+    GraphJobMessage msg = null;
+    while ((msg = peer.getCurrentMessage()) != null) {
+      if (msg.isVerticesSizeMessage()) {
+        numberVertices += msg.getVerticesSize().get();
+      }
+    }
     // TODO refactor this to a single step
     for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
       LinkedList<M> msgIterator = new LinkedList<M>();



Mime
View raw message