hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1682802 - in /hama/trunk/graph/src/main/java/org/apache/hama/graph: GraphJobRunner.java MapVerticesInfo.java
Date Sun, 31 May 2015 21:50:59 GMT
Author: edwardyoon
Date: Sun May 31 21:50:59 2015
New Revision: 1682802

URL: http://svn.apache.org/r1682802
Log:
HAMA-959: Change to atomic counter from sync counter in MapVerticesInfo

Modified:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1682802&r1=1682801&r2=1682802&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Sun May 31 21:50:59
2015
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -108,6 +109,10 @@ public final class GraphJobRunner<V exte
   private int maxIteration = -1;
   private long iteration = 0;
 
+  // global counter for thread exceptions
+  // TODO find more graceful way to handle thread exceptions.
+  private AtomicInteger errorCount = new AtomicInteger(0);
+  
   private AggregationRunner<V, E, M> aggregationRunner;
   private VertexOutputWriter<Writable, Writable, V, E, M> vertexOutputWriter;
   private Combiner<Writable> combiner;
@@ -116,6 +121,10 @@ public final class GraphJobRunner<V exte
 
   private RejectedExecutionHandler retryHandler = new RetryRejectedExecutionHandler();
 
+  // Below maps are used for grouping messages into single GraphJobMessage, based on vertex
ID.
+  private final ConcurrentHashMap<Integer, GraphJobMessage> partitionMessages = new
ConcurrentHashMap<Integer, GraphJobMessage>();
+  private final ConcurrentHashMap<V, GraphJobMessage> vertexMessages = new ConcurrentHashMap<V,
GraphJobMessage>();
+
   @Override
   public final void setup(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
@@ -242,7 +251,7 @@ public final class GraphJobRunner<V exte
   private void doSuperstep(GraphJobMessage currentMessage,
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
-    this.errorCount = 0;
+    this.errorCount.set(0);
     long startTime = System.currentTimeMillis();
 
     this.changedVertexCnt = 0;
@@ -269,7 +278,7 @@ public final class GraphJobRunner<V exte
       throw new IOException(e);
     }
 
-    if (errorCount > 0) {
+    if (errorCount.get() > 0) {
       throw new IOException("there were " + errorCount
           + " exceptions during compute vertices.");
     }
@@ -305,7 +314,7 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     this.changedVertexCnt = 0;
-    this.errorCount = 0;
+    this.errorCount.set(0);
     vertices.startSuperstep();
 
     ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
@@ -324,7 +333,7 @@ public final class GraphJobRunner<V exte
       throw new IOException(e);
     }
 
-    if (errorCount > 0) {
+    if (errorCount.get() > 0) {
       throw new IOException("there were " + errorCount
           + " exceptions during compute vertices.");
     }
@@ -334,10 +343,8 @@ public final class GraphJobRunner<V exte
     finishSuperstep();
   }
 
-  private int errorCount = 0;
-
-  public synchronized void incrementErrorCount() {
-    errorCount++;
+  public void incrementErrorCount() {
+    errorCount.incrementAndGet();
   }
 
   class ComputeRunnable implements Runnable {
@@ -430,8 +437,6 @@ public final class GraphJobRunner<V exte
     EDGE_VALUE_CLASS = edgeValueClass;
   }
 
-  private final ConcurrentHashMap<Integer, GraphJobMessage> messages = new ConcurrentHashMap<Integer,
GraphJobMessage>();
-
   /**
    * Loads vertices into memory of each peer.
    */
@@ -441,7 +446,7 @@ public final class GraphJobRunner<V exte
       throws IOException, SyncException, InterruptedException {
 
     for (int i = 0; i < peer.getNumPeers(); i++) {
-      messages.put(i, new GraphJobMessage());
+      partitionMessages.put(i, new GraphJobMessage());
     }
 
     VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable,
Writable, V, E, M>) ReflectionUtils
@@ -479,7 +484,7 @@ public final class GraphJobRunner<V exte
     executor.awaitTermination(60, TimeUnit.SECONDS);
 
     Iterator<Entry<Integer, GraphJobMessage>> it;
-    it = messages.entrySet().iterator();
+    it = partitionMessages.entrySet().iterator();
     while (it.hasNext()) {
       Entry<Integer, GraphJobMessage> e = it.next();
       it.remove();
@@ -547,7 +552,7 @@ public final class GraphJobRunner<V exte
         if (peer.getPeerIndex() == partition) {
           addVertex(vertex);
         } else {
-          messages.get(partition).add(WritableUtils.serialize(vertex));
+          partitionMessages.get(partition).add(WritableUtils.serialize(vertex));
         }
       } catch (Exception e) {
         throw new RuntimeException(e);
@@ -690,20 +695,18 @@ public final class GraphJobRunner<V exte
     vertices.finishAdditions();
   }
 
-  private final ConcurrentHashMap<V, GraphJobMessage> storage = new ConcurrentHashMap<V,
GraphJobMessage>();
-
   public void sendMessage(V vertexID, byte[] msg) throws IOException {
-    if (!storage.containsKey(vertexID)) {
+    if (!vertexMessages.containsKey(vertexID)) {
       // To save bit memory we don't set vertexID twice
-      storage.putIfAbsent(vertexID, new GraphJobMessage());
+      vertexMessages.putIfAbsent(vertexID, new GraphJobMessage());
     }
-    storage.get(vertexID).add(msg);
+    vertexMessages.get(vertexID).add(msg);
   }
 
   public void finishSuperstep() throws IOException {
     vertices.finishSuperstep();
 
-    Iterator<Entry<V, GraphJobMessage>> it = storage.entrySet().iterator();
+    Iterator<Entry<V, GraphJobMessage>> it = vertexMessages.entrySet().iterator();
     while (it.hasNext()) {
       Entry<V, GraphJobMessage> e = it.next();
       it.remove();

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java?rev=1682802&r1=1682801&r2=1682802&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java Sun May 31 21:50:59
2015
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -44,7 +45,7 @@ public final class MapVerticesInfo<V ext
 
   private GraphJobRunner<V, E, M> runner;
 
-  private int activeVertices = 0;
+  private AtomicInteger activeVertices = new AtomicInteger(0);
 
   @Override
   public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
@@ -132,8 +133,8 @@ public final class MapVerticesInfo<V ext
     vertices.put(vertex.getVertexID(), WritableUtils.serialize(vertex));
   }
 
-  public synchronized void incrementCount() {
-    activeVertices++;
+  public void incrementCount() {
+    activeVertices.incrementAndGet();
   }
 
   @Override
@@ -150,10 +151,10 @@ public final class MapVerticesInfo<V ext
 
   @Override
   public void finishSuperstep() throws IOException {
-    activeVertices = 0;
+    activeVertices.set(0);
   }
 
   public int getActiveVerticesNum() {
-    return activeVertices;
+    return activeVertices.get();
   }
 }



Mime
View raw message