hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xg...@apache.org
Subject hadoop git commit: YARN-5999. AMRMClientAsync will stop if any exceptions thrown on allocate call. Contributed by Jian He
Date Wed, 14 Dec 2016 23:06:43 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 85083567b -> 236dbe348


YARN-5999. AMRMClientAsync will stop if any exceptions thrown on allocate call. Contributed
by Jian He


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/236dbe34
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/236dbe34
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/236dbe34

Branch: refs/heads/branch-2
Commit: 236dbe3485e2bf58da6f244ea789cae18b5f4e2c
Parents: 8508356
Author: Xuan <xgong@apache.org>
Authored: Wed Dec 14 15:05:42 2016 -0800
Committer: Xuan <xgong@apache.org>
Committed: Wed Dec 14 15:05:42 2016 -0800

----------------------------------------------------------------------
 .../api/async/impl/AMRMClientAsyncImpl.java     | 31 ++++++++------------
 .../api/async/impl/TestAMRMClientAsync.java     |  2 +-
 2 files changed, 14 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/236dbe34/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index ae0ab9d..bba6993 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -60,14 +60,12 @@ extends AMRMClientAsync<T> {
   private final HeartbeatThread heartbeatThread;
   private final CallbackHandlerThread handlerThread;
 
-  private final BlockingQueue<AllocateResponse> responseQueue;
+  private final BlockingQueue<Object> responseQueue;
   
   private final Object unregisterHeartbeatLock = new Object();
   
   private volatile boolean keepRunning;
   private volatile float progress;
-  
-  private volatile Throwable savedException;
 
   /**
    *
@@ -87,7 +85,6 @@ extends AMRMClientAsync<T> {
     handlerThread = new CallbackHandlerThread();
     responseQueue = new LinkedBlockingQueue<>();
     keepRunning = true;
-    savedException = null;
   }
 
   /**
@@ -108,9 +105,8 @@ extends AMRMClientAsync<T> {
     super(client, intervalMs, callbackHandler);
     heartbeatThread = new HeartbeatThread();
     handlerThread = new CallbackHandlerThread();
-    responseQueue = new LinkedBlockingQueue<AllocateResponse>();
+    responseQueue = new LinkedBlockingQueue<Object>();
     keepRunning = true;
-    savedException = null;
   }
 
   @Override
@@ -262,7 +258,7 @@ extends AMRMClientAsync<T> {
     
     public void run() {
       while (true) {
-        AllocateResponse response = null;
+        Object response = null;
         // synchronization ensures we don't send heartbeats after unregistering
         synchronized (unregisterHeartbeatLock) {
           if (!keepRunning) {
@@ -277,10 +273,7 @@ extends AMRMClientAsync<T> {
             return;
           } catch (Throwable ex) {
             LOG.error("Exception on heartbeat", ex);
-            savedException = ex;
-            // interrupt handler thread in case it waiting on the queue
-            handlerThread.interrupt();
-            return;
+            response = ex;
           }
           if (response != null) {
             while (true) {
@@ -313,18 +306,20 @@ extends AMRMClientAsync<T> {
           return;
         }
         try {
-          AllocateResponse response;
-          if(savedException != null) {
-            LOG.error("Stopping callback due to: ", savedException);
-            handler.onError(savedException);
-            return;
-          }
+          Object object;
           try {
-            response = responseQueue.take();
+            object = responseQueue.take();
           } catch (InterruptedException ex) {
             LOG.info("Interrupted while waiting for queue", ex);
             continue;
           }
+          if (object instanceof Throwable) {
+            progress = handler.getProgress();
+            handler.onError((Throwable) object);
+            continue;
+          }
+
+          AllocateResponse response = (AllocateResponse) object;
           List<NodeReport> updatedNodes = response.getUpdatedNodes();
           if (!updatedNodes.isEmpty()) {
             handler.onNodesUpdated(updatedNodes);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/236dbe34/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
index dac82e4..ba38340 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
@@ -213,7 +213,7 @@ public class TestAMRMClientAsync {
     
     asyncClient.stop();
     // stopping should have joined all threads and completed all callbacks
-    Assert.assertTrue(callbackHandler.callbackCount == 0);
+    Assert.assertTrue(callbackHandler.callbackCount > 0);
   }
 
   @Test (timeout = 10000)


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message