hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject svn commit: r1609255 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-client/ hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/ hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/h...
Date Wed, 09 Jul 2014 18:28:01 GMT
Author: jianhe
Date: Wed Jul  9 18:28:00 2014
New Revision: 1609255

URL: http://svn.apache.org/r1609255
Log:
Merge r1609254 from trunk. YARN-1366. Changed AMRMClient to re-register with RM and send outstanding
requests back to RM on work-preserving RM restart. Contributed by Rohith

Added:
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
      - copied unchanged from r1609254, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/core-site.xml
      - copied unchanged from r1609254, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/core-site.xml
Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1609255&r1=1609254&r2=1609255&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Jul  9 18:28:00 2014
@@ -57,6 +57,9 @@ Release 2.5.0 - UNRELEASED
     YARN-1367. Changed NM to not kill containers on NM resync if RM work-preserving
     restart is enabled. (Anubhav Dhoot via jianhe)
 
+    YARN-1366. Changed AMRMClient to re-register with RM and send outstanding requests
+    back to RM on work-preserving RM restart. (Rohith via jianhe)
+
   IMPROVEMENTS
 
     YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml?rev=1609255&r1=1609254&r2=1609255&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml Wed Jul  9 18:28:00 2014
@@ -123,6 +123,12 @@
   		<groupId>org.apache.hadoop</groupId>
   		<artifactId>hadoop-yarn-common</artifactId>
   	</dependency>
+    <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-common</artifactId>
+        <type>test-jar</type>
+        <scope>test</scope>
+    </dependency>
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java?rev=1609255&r1=1609254&r2=1609255&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java Wed Jul  9 18:28:00 2014
@@ -207,14 +207,21 @@ public abstract class AMRMClient<T exten
   
   /**
    * Request additional containers and receive new container allocations.
-   * Requests made via <code>addContainerRequest</code> are sent to the 
-   * <code>ResourceManager</code>. New containers assigned to the master are
-   * retrieved. Status of completed containers and node health updates are 
-   * also retrieved.
-   * This also doubles up as a heartbeat to the ResourceManager and must be 
-   * made periodically.
-   * The call may not always return any new allocations of containers.
-   * App should not make concurrent allocate requests. May cause request loss.
+   * Requests made via <code>addContainerRequest</code> are sent to the
+   * <code>ResourceManager</code>. New containers assigned to the master are
+   * retrieved. Status of completed containers and node health updates are also
+   * retrieved. This also doubles up as a heartbeat to the ResourceManager and
+   * must be made periodically. The call may not always return any new
+   * allocations of containers. App should not make concurrent allocate
+   * requests. May cause request loss.
+   * 
+   * <p>
+   * Note: If the user has not removed container requests that have already
+   * been satisfied, then the re-register may end up sending all of the
+   * container requests to the RM (including matched requests), which would
+   * mean the RM could end up allocating a lot of new containers to it.
+   * </p>
+   * 
    * @param progressIndicator Indicates progress made by the master
    * @return the response of the allocate request
    * @throws YarnException

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java?rev=1609255&r1=1609254&r2=1609255&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java Wed Jul  9 18:28:00 2014
@@ -234,8 +234,7 @@ extends AMRMClientAsync<T> {
           while (true) {
             try {
               responseQueue.put(response);
-              if (response.getAMCommand() == AMCommand.AM_RESYNC
-                  || response.getAMCommand() == AMCommand.AM_SHUTDOWN) {
+              if (response.getAMCommand() == AMCommand.AM_SHUTDOWN) {
                 return;
               }
               break;
@@ -280,7 +279,6 @@ extends AMRMClientAsync<T> {
 
           if (response.getAMCommand() != null) {
             switch(response.getAMCommand()) {
-            case AM_RESYNC:
             case AM_SHUTDOWN:
               handler.onShutdownRequest();
               LOG.info("Shutdown requested. Stopping callback.");

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java?rev=1609255&r1=1609254&r2=1609255&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java Wed Jul  9 18:28:00 2014
@@ -47,7 +47,9 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -59,6 +61,7 @@ import org.apache.hadoop.yarn.client.api
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.util.RackResolver;
@@ -77,10 +80,18 @@ public class AMRMClientImpl<T extends Co
   
   private int lastResponseId = 0;
 
+  protected String appHostName;
+  protected int appHostPort;
+  protected String appTrackingUrl;
+
   protected ApplicationMasterProtocol rmClient;
   protected Resource clusterAvailableResources;
   protected int clusterNodeCount;
   
+  // blacklistedNodes is required for keeping history of blacklisted nodes that
+  // are sent to RM. On RESYNC command from RM, blacklistedNodes are used to get
+  // current blacklisted nodes and send back to RM.
+  protected final Set<String> blacklistedNodes = new HashSet<String>();
   protected final Set<String> blacklistAdditions = new HashSet<String>();
   protected final Set<String> blacklistRemovals = new HashSet<String>();
   
@@ -150,6 +161,10 @@ public class AMRMClientImpl<T extends Co
   protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
       new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
   protected final Set<ContainerId> release = new TreeSet<ContainerId>();
+  // pendingRelease holds the history of release requests. A request is removed
+  // only once the RM reports the container as completed.
+  // How is it different from release? --> release is per allocate() request.
+  protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
   
   public AMRMClientImpl() {
     super(AMRMClientImpl.class.getName());
@@ -185,19 +200,27 @@ public class AMRMClientImpl<T extends Co
   public RegisterApplicationMasterResponse registerApplicationMaster(
       String appHostName, int appHostPort, String appTrackingUrl)
       throws YarnException, IOException {
+    this.appHostName = appHostName;
+    this.appHostPort = appHostPort;
+    this.appTrackingUrl = appTrackingUrl;
     Preconditions.checkArgument(appHostName != null,
         "The host name should not be null");
     Preconditions.checkArgument(appHostPort >= -1, "Port number of the host"
         + " should be any integers larger than or equal to -1");
-    // do this only once ???
+
+    return registerApplicationMaster();
+  }
+
+  private RegisterApplicationMasterResponse registerApplicationMaster()
+      throws YarnException, IOException {
     RegisterApplicationMasterRequest request =
-        RegisterApplicationMasterRequest.newInstance(appHostName, appHostPort,
-          appTrackingUrl);
+        RegisterApplicationMasterRequest.newInstance(this.appHostName,
+            this.appHostPort, this.appTrackingUrl);
     RegisterApplicationMasterResponse response =
         rmClient.registerApplicationMaster(request);
-
     synchronized (this) {
-      if(!response.getNMTokensFromPreviousAttempts().isEmpty()) {
+      lastResponseId = 0;
+      if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {
         populateNMTokens(response.getNMTokensFromPreviousAttempts());
       }
     }
@@ -249,6 +272,25 @@ public class AMRMClientImpl<T extends Co
       }
 
       allocateResponse = rmClient.allocate(allocateRequest);
+      if (isResyncCommand(allocateResponse)) {
+        LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
+            + " hence resyncing.");
+        synchronized (this) {
+          release.addAll(this.pendingRelease);
+          blacklistAdditions.addAll(this.blacklistedNodes);
+          for (Map<String, TreeMap<Resource, ResourceRequestInfo>> rr : remoteRequestsTable
+              .values()) {
+            for (Map<Resource, ResourceRequestInfo> capabalities : rr.values()) {
+              for (ResourceRequestInfo request : capabalities.values()) {
+                addResourceRequestToAsk(request.remoteRequest);
+              }
+            }
+          }
+        }
+        // re register with RM
+        registerApplicationMaster();
+        return allocate(progressIndicator);
+      }
 
       synchronized (this) {
         // update these on successful RPC
@@ -258,6 +300,11 @@ public class AMRMClientImpl<T extends Co
         if (!allocateResponse.getNMTokens().isEmpty()) {
           populateNMTokens(allocateResponse.getNMTokens());
         }
+        if (!pendingRelease.isEmpty()
+            && !allocateResponse.getCompletedContainersStatuses().isEmpty()) {
+          removePendingReleaseRequests(allocateResponse
+              .getCompletedContainersStatuses());
+        }
       }
     } finally {
       // TODO how to differentiate remote yarn exception vs error in rpc
@@ -288,6 +335,18 @@ public class AMRMClientImpl<T extends Co
     return allocateResponse;
   }
 
+  protected void removePendingReleaseRequests(
+      List<ContainerStatus> completedContainersStatuses) {
+    for (ContainerStatus containerStatus : completedContainersStatuses) {
+      pendingRelease.remove(containerStatus.getContainerId());
+    }
+  }
+
+  private boolean isResyncCommand(AllocateResponse allocateResponse) {
+    return allocateResponse.getAMCommand() != null
+        && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC;
+  }
+
   @Private
   @VisibleForTesting
   protected void populateNMTokens(List<NMToken> nmTokens) {
@@ -324,6 +383,12 @@ public class AMRMClientImpl<T extends Co
     } catch (InterruptedException e) {
       LOG.info("Interrupted while waiting for application"
           + " to be removed from RMStateStore");
+    } catch (ApplicationMasterNotRegisteredException e) {
+      LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
+          + " hence resyncing.");
+      // re register with RM
+      registerApplicationMaster();
+      unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
     }
   }
   
@@ -414,6 +479,7 @@ public class AMRMClientImpl<T extends Co
   public synchronized void releaseAssignedContainer(ContainerId containerId) {
     Preconditions.checkArgument(containerId != null,
         "ContainerId can not be null.");
+    pendingRelease.add(containerId);
     release.add(containerId);
   }
   
@@ -655,6 +721,7 @@ public class AMRMClientImpl<T extends Co
     
     if (blacklistAdditions != null) {
       this.blacklistAdditions.addAll(blacklistAdditions);
+      this.blacklistedNodes.addAll(blacklistAdditions);
       // if some resources are also in blacklistRemovals updated before, we 
       // should remove them here.
       this.blacklistRemovals.removeAll(blacklistAdditions);
@@ -662,6 +729,7 @@ public class AMRMClientImpl<T extends Co
     
     if (blacklistRemovals != null) {
       this.blacklistRemovals.addAll(blacklistRemovals);
+      this.blacklistedNodes.removeAll(blacklistRemovals);
       // if some resources are in blacklistAdditions before, we should remove
       // them here.
       this.blacklistAdditions.removeAll(blacklistRemovals);

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java?rev=1609255&r1=1609254&r2=1609255&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java Wed Jul  9 18:28:00 2014
@@ -203,39 +203,6 @@ public class TestAMRMClientAsync {
     Assert.assertTrue(callbackHandler.callbackCount == 0);
   }
 
-  @Test//(timeout=10000)
-  public void testAMRMClientAsyncReboot() throws Exception {
-    Configuration conf = new Configuration();
-    TestCallbackHandler callbackHandler = new TestCallbackHandler();
-    @SuppressWarnings("unchecked")
-    AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
-    
-    final AllocateResponse rebootResponse = createAllocateResponse(
-        new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
-    rebootResponse.setAMCommand(AMCommand.AM_RESYNC);
-    when(client.allocate(anyFloat())).thenReturn(rebootResponse);
-    
-    AMRMClientAsync<ContainerRequest> asyncClient = 
-        AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
-    asyncClient.init(conf);
-    asyncClient.start();
-    
-    synchronized (callbackHandler.notifier) {
-      asyncClient.registerApplicationMaster("localhost", 1234, null);
-      while(callbackHandler.reboot == false) {
-        try {
-          callbackHandler.notifier.wait();
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        }
-      }
-    }
-    
-    asyncClient.stop();
-    // stopping should have joined all threads and completed all callbacks
-    Assert.assertTrue(callbackHandler.callbackCount == 0);
-  }
-  
   @Test (timeout = 10000)
   public void testAMRMClientAsyncShutDown() throws Exception {
     Configuration conf = new Configuration();



Mime
View raw message