hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1675208 - /hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Date Tue, 21 Apr 2015 23:20:12 GMT
Author: edwardyoon
Date: Tue Apr 21 23:20:11 2015
New Revision: 1675208

URL: http://svn.apache.org/r1675208
Log:
Add rejectexecutionhandler

Modified:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1675208&r1=1675207&r2=1675208&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue Apr 21 23:20:11 2015
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.logging.Log;
@@ -115,6 +116,8 @@ public final class GraphJobRunner<V exte
 
   private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
 
+  private RejectedExecutionHandler retryHandler = new RetryRejectedExecutionHandler();
+  
   @Override
   public final void setup(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
@@ -252,6 +255,7 @@ public final class GraphJobRunner<V exte
     ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
         .newCachedThreadPool();
     executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 1024));
+    executor.setRejectedExecutionHandler(retryHandler);
 
     long loopStartTime = System.currentTimeMillis();
     while (currentMessage != null) {
@@ -302,7 +306,8 @@ public final class GraphJobRunner<V exte
     ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
         .newCachedThreadPool();
     executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 1024));
-
+    executor.setRejectedExecutionHandler(retryHandler);
+    
     for (Vertex<V, E, M> v : vertices.getValues()) {
       Runnable worker = new ComputeRunnable(v);
       executor.execute(worker);
@@ -422,7 +427,8 @@ public final class GraphJobRunner<V exte
     ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
         .newCachedThreadPool();
     executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 1024));
-
+    executor.setRejectedExecutionHandler(retryHandler);
+    
     try {
       KeyValuePair<Writable, Writable> next = null;
       while ((next = peer.readNext()) != null) {
@@ -707,6 +713,20 @@ public final class GraphJobRunner<V exte
     };
   }
 
+  class RetryRejectedExecutionHandler implements RejectedExecutionHandler {
+
+    @Override
+    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        LOG.error(e);
+      }
+      executor.execute(r);
+    }
+
+  }
+
   /**
    * @return the destination peer name of the destination of the given directed
    *         edge.



Mime
View raw message