Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 031F01890B for ; Thu, 12 Nov 2015 18:22:28 +0000 (UTC) Received: (qmail 50052 invoked by uid 500); 12 Nov 2015 18:22:17 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 49654 invoked by uid 500); 12 Nov 2015 18:22:17 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 44845 invoked by uid 99); 12 Nov 2015 18:22:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Nov 2015 18:22:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9B5F9E5E2F; Thu, 12 Nov 2015 18:22:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wheat9@apache.org To: common-commits@hadoop.apache.org Date: Thu, 12 Nov 2015 18:22:51 -0000 Message-Id: <1ff6d503d25741b78d8abc91d19f4b24@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [39/50] [abbrv] hadoop git commit: YARN-1510. Make NMClient support change container resources. (Meng Ding via wangda) YARN-1510. Make NMClient support change container resources. 
(Meng Ding via wangda) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c99925d6 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c99925d6 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c99925d6 Branch: refs/heads/HDFS-8707 Commit: c99925d6dd0235f0d27536f0bebd129e435688fb Parents: 493e8ae Author: Wangda Tan Authored: Tue Nov 10 11:45:46 2015 -0800 Committer: Wangda Tan Committed: Tue Nov 10 11:45:46 2015 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../distributedshell/ApplicationMaster.java | 12 +- .../apache/hadoop/yarn/client/api/NMClient.java | 16 ++ .../yarn/client/api/async/NMClientAsync.java | 154 ++++++++++++++++++- .../api/async/impl/NMClientAsyncImpl.java | 127 ++++++++++++++- .../yarn/client/api/impl/NMClientImpl.java | 35 ++++- .../api/async/impl/TestNMClientAsync.java | 93 ++++++++++- .../yarn/client/api/impl/TestNMClient.java | 31 +++- 8 files changed, 452 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c99925d6/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index edb1026..bad9757 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -244,6 +244,9 @@ Release 2.8.0 - UNRELEASED YARN-2729. Support script based NodeLabelsProvider Interface in Distributed Node Label Configuration Setup. (Naganarasimha G R via rohithsharmaks) + YARN-1510. Make NMClient support change container resources. + (Meng Ding via wangda) + IMPROVEMENTS YARN-644. 
Basic null check is not performed on passed in arguments before http://git-wip-us.apache.org/repos/asf/hadoop/blob/c99925d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 5d2d6c2..c1b9643 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -858,8 +858,7 @@ public class ApplicationMaster { } @VisibleForTesting - static class NMCallbackHandler - implements NMClientAsync.CallbackHandler { + static class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler { private ConcurrentMap containers = new ConcurrentHashMap(); @@ -908,6 +907,10 @@ public class ApplicationMaster { } @Override + public void onContainerResourceIncreased( + ContainerId containerId, Resource resource) {} + + @Override public void onStartContainerError(ContainerId containerId, Throwable t) { LOG.error("Failed to start Container " + containerId); containers.remove(containerId); @@ -926,6 +929,11 @@ public class ApplicationMaster { LOG.error("Failed to stop Container " + containerId); containers.remove(containerId); } + + @Override + public void 
onIncreaseContainerResourceError( + ContainerId containerId, Throwable t) {} + } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/c99925d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java index 08b911b..47270f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java @@ -87,6 +87,22 @@ public abstract class NMClient extends AbstractService { throws YarnException, IOException; /** + *

Increase the resource of a container.

+ * + *

The ApplicationMaster or other applications that use the + * client must provide the details of the container, including the Id and + * the target resource encapsulated in the updated container token via + * {@link Container}. + *

+ * + * @param container the container with updated token + * @throws YarnException + * @throws IOException + */ + public abstract void increaseContainerResource(Container container) + throws YarnException, IOException; + + /** *

Stop a started container.

* * @param containerId the Id of the started container http://git-wip-us.apache.org/repos/asf/hadoop/blob/c99925d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java index 5cb504d..8e90564 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; @@ -51,11 +52,16 @@ import com.google.common.annotations.VisibleForTesting; * *
  * {@code
- * class MyCallbackHandler implements NMClientAsync.CallbackHandler {
+ * class MyCallbackHandler extends NMClientAsync.AbstractCallbackHandler {
  *   public void onContainerStarted(ContainerId containerId,
  *       Map allServiceResponse) {
  *     [post process after the container is started, process the response]
  *   }
+
+ *   public void onContainerResourceIncreased(ContainerId containerId,
+ *       Resource resource) {
+ *     [post process after the container resource is increased]
+ *   }
  *
  *   public void onContainerStatusReceived(ContainerId containerId,
  *       ContainerStatus containerStatus) {
@@ -112,20 +118,57 @@ public abstract class NMClientAsync extends AbstractService {
   protected CallbackHandler callbackHandler;
 
   public static NMClientAsync createNMClientAsync(
+      AbstractCallbackHandler callbackHandler) {
+    return new NMClientAsyncImpl(callbackHandler);
+  }
+
+  protected NMClientAsync(AbstractCallbackHandler callbackHandler) {
+    this (NMClientAsync.class.getName(), callbackHandler);
+  }
+
+  protected NMClientAsync(
+      String name, AbstractCallbackHandler callbackHandler) {
+    this (name, new NMClientImpl(), callbackHandler);
+  }
+
+  protected NMClientAsync(String name, NMClient client,
+      AbstractCallbackHandler callbackHandler) {
+    super(name);
+    this.setClient(client);
+    this.setCallbackHandler(callbackHandler);
+  }
+
+  /**
+   * @deprecated Use {@link #createNMClientAsync(AbstractCallbackHandler)}
+   *             instead.
+   */
+  @Deprecated
+  public static NMClientAsync createNMClientAsync(
       CallbackHandler callbackHandler) {
     return new NMClientAsyncImpl(callbackHandler);
   }
-  
+
+  /**
+   * @deprecated Use {@link #NMClientAsync(AbstractCallbackHandler)}
+   *             instead.
+   */
+  @Deprecated
   protected NMClientAsync(CallbackHandler callbackHandler) {
     this (NMClientAsync.class.getName(), callbackHandler);
   }
 
+  /**
+   * @deprecated Use {@link #NMClientAsync(String, AbstractCallbackHandler)}
+   *             instead.
+   */
+  @Deprecated
   protected NMClientAsync(String name, CallbackHandler callbackHandler) {
     this (name, new NMClientImpl(), callbackHandler);
   }
 
   @Private
   @VisibleForTesting
+  @Deprecated
   protected NMClientAsync(String name, NMClient client,
       CallbackHandler callbackHandler) {
     super(name);
@@ -136,6 +179,8 @@ public abstract class NMClientAsync extends AbstractService {
   public abstract void startContainerAsync(
       Container container, ContainerLaunchContext containerLaunchContext);
 
+  public abstract void increaseContainerResourceAsync(Container container);
+
   public abstract void stopContainerAsync(
       ContainerId containerId, NodeId nodeId);
 
@@ -160,6 +205,110 @@ public abstract class NMClientAsync extends AbstractService {
 
   /**
    * 

+ * The callback abstract class. The callback functions need to be implemented + * by {@link NMClientAsync} users. The APIs are called when responses from + * NodeManager are available. + *

+ * + *

+ * Once a callback happens, the users can choose to act on it in a blocking or + * non-blocking manner. If the action on the callback is done in a blocking + * manner, some of the threads performing requests on NodeManagers may get + * blocked depending on how many threads in the pool are busy. + *

+ * + *

+ * The implementation of the callback functions should not throw + * unexpected exceptions. Otherwise, {@link NMClientAsync} will just + * catch, log and then ignore them. + *

+ */ + public abstract static class AbstractCallbackHandler + implements CallbackHandler { + /** + * The API is called when NodeManager responds to indicate its + * acceptance of the starting container request. + * + * @param containerId the Id of the container + * @param allServiceResponse a Map between the auxiliary service names and + * their outputs + */ + public abstract void onContainerStarted(ContainerId containerId, + Map allServiceResponse); + + /** + * The API is called when NodeManager responds with the status + * of the container. + * + * @param containerId the Id of the container + * @param containerStatus the status of the container + */ + public abstract void onContainerStatusReceived(ContainerId containerId, + ContainerStatus containerStatus); + + /** + * The API is called when NodeManager responds to indicate the + * container is stopped. + * + * @param containerId the Id of the container + */ + public abstract void onContainerStopped(ContainerId containerId); + + /** + * The API is called when an exception is raised in the process of + * starting a container. + * + * @param containerId the Id of the container + * @param t the raised exception + */ + public abstract void onStartContainerError( + ContainerId containerId, Throwable t); + + /** + * The API is called when NodeManager responds to indicate + * the container resource has been successfully increased. + * + * @param containerId the Id of the container + * @param resource the target resource of the container + */ + public abstract void onContainerResourceIncreased( + ContainerId containerId, Resource resource); + + /** + * The API is called when an exception is raised in the process of + * querying the status of a container. 
+ * + * @param containerId the Id of the container + * @param t the raised exception + */ + public abstract void onGetContainerStatusError( + ContainerId containerId, Throwable t); + + /** + * The API is called when an exception is raised in the process of + * increasing container resource. + * + * @param containerId the Id of the container + * @param t the raised exception + */ + public abstract void onIncreaseContainerResourceError( + ContainerId containerId, Throwable t); + + /** + * The API is called when an exception is raised in the process of + * stopping a container. + * + * @param containerId the Id of the container + * @param t the raised exception + */ + public abstract void onStopContainerError( + ContainerId containerId, Throwable t); + } + + /** + * @deprecated Use {@link NMClientAsync.AbstractCallbackHandler} instead. + * + *

* The callback interface needs to be implemented by {@link NMClientAsync} * users. The APIs are called when responses from NodeManager are * available. @@ -178,6 +327,7 @@ public abstract class NMClientAsync extends AbstractService { * catch, log and then ignore it. *

*/ + @Deprecated public static interface CallbackHandler { /** * The API is called when NodeManager responds to indicate its http://git-wip-us.apache.org/repos/asf/hadoop/blob/c99925d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java index 39682df..575ce13 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java @@ -82,16 +82,46 @@ public class NMClientAsyncImpl extends NMClientAsync { protected ConcurrentMap containers = new ConcurrentHashMap(); + public NMClientAsyncImpl(AbstractCallbackHandler callbackHandler) { + this(NMClientAsync.class.getName(), callbackHandler); + } + + public NMClientAsyncImpl( + String name, AbstractCallbackHandler callbackHandler) { + this(name, new NMClientImpl(), callbackHandler); + } + + @Private + @VisibleForTesting + protected NMClientAsyncImpl(String name, NMClient client, + AbstractCallbackHandler callbackHandler) { + super(name, client, callbackHandler); + this.client = client; + this.callbackHandler = callbackHandler; + } + + /** + * @deprecated Use {@link + * #NMClientAsyncImpl(NMClientAsync.AbstractCallbackHandler)} + * instead. + */ + @Deprecated public NMClientAsyncImpl(CallbackHandler callbackHandler) { this(NMClientAsync.class.getName(), callbackHandler); } + /** + * @deprecated Use {@link #NMClientAsyncImpl(String, + * NMClientAsync.AbstractCallbackHandler)} instead. 
+ */ + @Deprecated public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) { this(name, new NMClientImpl(), callbackHandler); } @Private @VisibleForTesting + @Deprecated protected NMClientAsyncImpl(String name, NMClient client, CallbackHandler callbackHandler) { super(name, client, callbackHandler); @@ -229,6 +259,29 @@ public class NMClientAsyncImpl extends NMClientAsync { } } + public void increaseContainerResourceAsync(Container container) { + if (!(callbackHandler instanceof AbstractCallbackHandler)) { + LOG.error("Callback handler does not implement container resource " + + "increase callback methods"); + return; + } + AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler; + if (containers.get(container.getId()) == null) { + handler.onIncreaseContainerResourceError( + container.getId(), + RPCUtil.getRemoteException( + "Container " + container.getId() + + " is neither started nor scheduled to start")); + } + try { + events.put(new IncreaseContainerResourceEvent(container)); + } catch (InterruptedException e) { + LOG.warn("Exception when scheduling the event of increasing resource of " + + "Container " + container.getId()); + handler.onIncreaseContainerResourceError(container.getId(), e); + } + } + public void stopContainerAsync(ContainerId containerId, NodeId nodeId) { if (containers.get(containerId) == null) { callbackHandler.onStopContainerError(containerId, @@ -276,7 +329,8 @@ public class NMClientAsyncImpl extends NMClientAsync { protected static enum ContainerEventType { START_CONTAINER, STOP_CONTAINER, - QUERY_CONTAINER + QUERY_CONTAINER, + INCREASE_CONTAINER_RESOURCE } protected static class ContainerEvent @@ -327,6 +381,21 @@ public class NMClientAsyncImpl extends NMClientAsync { } } + protected static class IncreaseContainerResourceEvent extends ContainerEvent { + private Container container; + + public IncreaseContainerResourceEvent(Container container) { + super(container.getId(), container.getNodeId(), + 
container.getContainerToken(), + ContainerEventType.INCREASE_CONTAINER_RESOURCE); + this.container = container; + } + + public Container getContainer() { + return container; + } + } + protected static class StatefulContainer implements EventHandler { @@ -344,7 +413,9 @@ public class NMClientAsyncImpl extends NMClientAsync { ContainerEventType.STOP_CONTAINER, new OutOfOrderTransition()) // Transitions from RUNNING state - // RUNNING -> RUNNING should be the invalid transition + .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, + ContainerEventType.INCREASE_CONTAINER_RESOURCE, + new IncreaseContainerResourceTransition()) .addTransition(ContainerState.RUNNING, EnumSet.of(ContainerState.DONE, ContainerState.FAILED), ContainerEventType.STOP_CONTAINER, @@ -353,12 +424,14 @@ public class NMClientAsyncImpl extends NMClientAsync { // Transition from DONE state .addTransition(ContainerState.DONE, ContainerState.DONE, EnumSet.of(ContainerEventType.START_CONTAINER, - ContainerEventType.STOP_CONTAINER)) + ContainerEventType.STOP_CONTAINER, + ContainerEventType.INCREASE_CONTAINER_RESOURCE)) // Transition from FAILED state .addTransition(ContainerState.FAILED, ContainerState.FAILED, EnumSet.of(ContainerEventType.START_CONTAINER, - ContainerEventType.STOP_CONTAINER)); + ContainerEventType.STOP_CONTAINER, + ContainerEventType.INCREASE_CONTAINER_RESOURCE)); protected static class StartContainerTransition implements MultipleArcTransition { + @Override + public void transition( + StatefulContainer container, ContainerEvent event) { + if (!(container.nmClientAsync.getCallbackHandler() + instanceof AbstractCallbackHandler)) { + LOG.error("Callback handler does not implement container resource " + + "increase callback methods"); + return; + } + AbstractCallbackHandler handler = + (AbstractCallbackHandler) container.nmClientAsync + .getCallbackHandler(); + try { + if (!(event instanceof IncreaseContainerResourceEvent)) { + throw new AssertionError("Unexpected event type. 
Expecting:" + + "IncreaseContainerResourceEvent. Got:" + event); + } + IncreaseContainerResourceEvent increaseEvent = + (IncreaseContainerResourceEvent) event; + container.nmClientAsync.getClient().increaseContainerResource( + increaseEvent.getContainer()); + try { + handler.onContainerResourceIncreased( + increaseEvent.getContainerId(), increaseEvent.getContainer() + .getResource()); + } catch (Throwable thr) { + // Don't process user created unchecked exception + LOG.info("Unchecked exception is thrown from " + + "onContainerResourceIncreased for Container " + + event.getContainerId(), thr); + } + } catch (Exception e) { + try { + handler.onIncreaseContainerResourceError(event.getContainerId(), e); + } catch (Throwable thr) { + // Don't process user created unchecked exception + LOG.info("Unchecked exception is thrown from " + + "onIncreaseContainerResourceError for Container " + + event.getContainerId(), thr); + } + } + } + } + protected static class StopContainerTransition implements MultipleArcTransition { http://git-wip-us.apache.org/repos/asf/hadoop/blob/c99925d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java index 3518f35..e047368 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java @@ -35,6 +35,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import 
org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -147,8 +149,7 @@ public class NMClientImpl extends NMClient { private ContainerState state; - public StartedContainer(ContainerId containerId, NodeId nodeId, - Token containerToken) { + public StartedContainer(ContainerId containerId, NodeId nodeId) { this.containerId = containerId; this.nodeId = nodeId; state = ContainerState.NEW; @@ -232,6 +233,34 @@ public class NMClientImpl extends NMClient { } @Override + public void increaseContainerResource(Container container) + throws YarnException, IOException { + ContainerManagementProtocolProxyData proxy = null; + try { + proxy = cmProxy.getProxy( + container.getNodeId().toString(), container.getId()); + List increaseTokens = new ArrayList<>(); + increaseTokens.add(container.getContainerToken()); + IncreaseContainersResourceRequest increaseRequest = + IncreaseContainersResourceRequest + .newInstance(increaseTokens); + IncreaseContainersResourceResponse response = + proxy.getContainerManagementProtocol() + .increaseContainersResource(increaseRequest); + if (response.getFailedRequests() != null + && response.getFailedRequests().containsKey(container.getId())) { + Throwable t = response.getFailedRequests().get(container.getId()) + .deSerialize(); + parseAndThrowException(t); + } + } finally { + if (proxy != null) { + cmProxy.mayBeCloseProxy(proxy); + } + } + } + + @Override public void stopContainer(ContainerId containerId, NodeId nodeId) throws YarnException, IOException { 
StartedContainer startedContainer = getStartedContainer(containerId); @@ -308,7 +337,7 @@ public class NMClientImpl extends NMClient { protected synchronized StartedContainer createStartedContainer( Container container) throws YarnException, IOException { StartedContainer startedContainer = new StartedContainer(container.getId(), - container.getNodeId(), container.getContainerToken()); + container.getNodeId()); return startedContainer; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c99925d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java index 6f9d41d..48f3431 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; +import org.apache.hadoop.yarn.api.records.Resource; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -50,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.NMClientAsync; -import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import 
org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -117,6 +117,10 @@ public class TestNMClientAsync { asyncClient.startContainerAsync(container, clc); } while (!((TestCallbackHandler1) asyncClient.getCallbackHandler()) + .isIncreaseResourceFailureCallsExecuted()) { + Thread.sleep(10); + } + while (!((TestCallbackHandler1) asyncClient.getCallbackHandler()) .isStopFailureCallsExecuted()) { Thread.sleep(10); } @@ -183,7 +187,7 @@ public class TestNMClientAsync { } private class TestCallbackHandler1 - implements NMClientAsync.CallbackHandler { + extends NMClientAsync.AbstractCallbackHandler { private boolean path = true; @@ -196,6 +200,10 @@ public class TestNMClientAsync { private AtomicInteger actualQueryFailure = new AtomicInteger(0); private AtomicInteger actualStopSuccess = new AtomicInteger(0); private AtomicInteger actualStopFailure = new AtomicInteger(0); + private AtomicInteger actualIncreaseResourceSuccess = + new AtomicInteger(0); + private AtomicInteger actualIncreaseResourceFailure = + new AtomicInteger(0); private AtomicIntegerArray actualStartSuccessArray; private AtomicIntegerArray actualStartFailureArray; @@ -203,6 +211,8 @@ public class TestNMClientAsync { private AtomicIntegerArray actualQueryFailureArray; private AtomicIntegerArray actualStopSuccessArray; private AtomicIntegerArray actualStopFailureArray; + private AtomicIntegerArray actualIncreaseResourceSuccessArray; + private AtomicIntegerArray actualIncreaseResourceFailureArray; private Set errorMsgs = Collections.synchronizedSet(new HashSet()); @@ -217,6 +227,10 @@ public class TestNMClientAsync { actualQueryFailureArray = new AtomicIntegerArray(expectedFailure); actualStopSuccessArray = new AtomicIntegerArray(expectedSuccess); actualStopFailureArray = new AtomicIntegerArray(expectedFailure); + actualIncreaseResourceSuccessArray = + new AtomicIntegerArray(expectedSuccess); + actualIncreaseResourceFailureArray = + new 
AtomicIntegerArray(expectedFailure); } @SuppressWarnings("deprecation") @@ -236,7 +250,11 @@ public class TestNMClientAsync { asyncClient.getContainerStatusAsync(containerId, nodeId); } else { // move on to the following failure tests - asyncClient.stopContainerAsync(containerId, nodeId); + // make sure we pass in the container with the same + // containerId + Container container = Container.newInstance( + containerId, nodeId, null, null, null, containerToken); + asyncClient.increaseContainerResourceAsync(container); } // Shouldn't crash the test thread @@ -255,7 +273,11 @@ public class TestNMClientAsync { actualQuerySuccess.addAndGet(1); actualQuerySuccessArray.set(containerId.getId(), 1); // move on to the following success tests - asyncClient.stopContainerAsync(containerId, nodeId); + // make sure we pass in the container with the same + // containerId + Container container = Container.newInstance( + containerId, nodeId, null, null, null, containerToken); + asyncClient.increaseContainerResourceAsync(container); // Shouldn't crash the test thread throw new RuntimeException("Ignorable Exception"); @@ -263,6 +285,23 @@ public class TestNMClientAsync { @SuppressWarnings("deprecation") @Override + public void onContainerResourceIncreased( + ContainerId containerId, Resource resource) { + if (containerId.getId() >= expectedSuccess) { + errorMsgs.add("Container " + containerId + + " should throw the exception onContainerResourceIncreased"); + return; + } + actualIncreaseResourceSuccess.addAndGet(1); + actualIncreaseResourceSuccessArray.set(containerId.getId(), 1); + // move on to the following success tests + asyncClient.stopContainerAsync(containerId, nodeId); + // throw a fake user exception, and shouldn't crash the test + throw new RuntimeException("Ignorable Exception"); + } + + @SuppressWarnings("deprecation") + @Override public void onContainerStopped(ContainerId containerId) { if (containerId.getId() >= expectedSuccess) { errorMsgs.add("Container " + containerId 
+ @@ -302,6 +341,26 @@ public class TestNMClientAsync { @SuppressWarnings("deprecation") @Override + public void onIncreaseContainerResourceError( + ContainerId containerId, Throwable t) { + if (containerId.getId() < expectedSuccess + expectedFailure) { + errorMsgs.add("Container " + containerId + + " shouldn't throw the exception onIncreaseContainerResourceError"); + return; + } + actualIncreaseResourceFailure.addAndGet(1); + actualIncreaseResourceFailureArray.set( + containerId.getId() - expectedSuccess - expectedFailure, 1); + // increase container resource error should NOT change the + // the container status to FAILED + // move on to the following failure tests + asyncClient.stopContainerAsync(containerId, nodeId); + // Shouldn't crash the test thread + throw new RuntimeException("Ignorable Exception"); + } + + @SuppressWarnings("deprecation") + @Override public void onStopContainerError(ContainerId containerId, Throwable t) { if (t instanceof RuntimeException) { errorMsgs.add("Unexpected throwable from callback functions should be" + @@ -345,10 +404,12 @@ public class TestNMClientAsync { boolean isAllSuccessCallsExecuted = actualStartSuccess.get() == expectedSuccess && actualQuerySuccess.get() == expectedSuccess && + actualIncreaseResourceSuccess.get() == expectedSuccess && actualStopSuccess.get() == expectedSuccess; if (isAllSuccessCallsExecuted) { assertAtomicIntegerArray(actualStartSuccessArray); assertAtomicIntegerArray(actualQuerySuccessArray); + assertAtomicIntegerArray(actualIncreaseResourceSuccessArray); assertAtomicIntegerArray(actualStopSuccessArray); } return isAllSuccessCallsExecuted; @@ -365,6 +426,15 @@ public class TestNMClientAsync { return isStartAndQueryFailureCallsExecuted; } + public boolean isIncreaseResourceFailureCallsExecuted() { + boolean isIncreaseResourceFailureCallsExecuted = + actualIncreaseResourceFailure.get() == expectedFailure; + if (isIncreaseResourceFailureCallsExecuted) { + 
assertAtomicIntegerArray(actualIncreaseResourceFailureArray); + } + return isIncreaseResourceFailureCallsExecuted; + } + public boolean isStopFailureCallsExecuted() { boolean isStopFailureCallsExecuted = actualStopFailure.get() == expectedFailure; @@ -392,6 +462,8 @@ public class TestNMClientAsync { when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class))).thenReturn( recordFactory.newRecordInstance(ContainerStatus.class)); + doNothing().when(client).increaseContainerResource( + any(Container.class)); doNothing().when(client).stopContainer(any(ContainerId.class), any(NodeId.class)); break; @@ -411,6 +483,8 @@ public class TestNMClientAsync { when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class))).thenReturn( recordFactory.newRecordInstance(ContainerStatus.class)); + doThrow(RPCUtil.getRemoteException("Increase Resource Exception")) + .when(client).increaseContainerResource(any(Container.class)); doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client) .stopContainer(any(ContainerId.class), any(NodeId.class)); } @@ -493,7 +567,7 @@ public class TestNMClientAsync { } private class TestCallbackHandler2 - implements NMClientAsync.CallbackHandler { + extends NMClientAsync.AbstractCallbackHandler { private CyclicBarrier barrierC; private AtomicBoolean exceptionOccurred = new AtomicBoolean(false); @@ -512,6 +586,10 @@ public class TestNMClientAsync { } @Override + public void onContainerResourceIncreased( + ContainerId containerId, Resource resource) {} + + @Override public void onContainerStopped(ContainerId containerId) { } @@ -537,9 +615,12 @@ public class TestNMClientAsync { } @Override + public void onIncreaseContainerResourceError( + ContainerId containerId, Throwable t) {} + + @Override public void onStopContainerError(ContainerId containerId, Throwable t) { } - } private Container mockContainer(int i) { 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c99925d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index 0d4a271..cd04130 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -210,10 +210,10 @@ public class TestNMClient { testContainerManagement(nmClient, allocateContainers(rmClient, 5)); rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, - null, null); + null, null); // don't stop the running containers stopNmClient(false); - assertFalse(nmClient.startedContainers. 
isEmpty()); + assertFalse(nmClient.startedContainers.isEmpty()); //now cleanup nmClient.cleanupRunningContainers(); assertEquals(0, nmClient.startedContainers.size()); @@ -298,6 +298,16 @@ public class TestNMClient { e.getMessage().contains("is not handled by this NodeManager")); } + // increaseContainerResource shouldn't be called before startContainer, + // otherwise, NodeManager cannot find the container + try { + nmClient.increaseContainerResource(container); + fail("Exception is expected"); + } catch (YarnException e) { + assertTrue("The thrown exception is not expected", + e.getMessage().contains("is not handled by this NodeManager")); + } + // stopContainer shouldn't be called before startContainer, // otherwise, an exception will be thrown try { @@ -332,6 +342,8 @@ public class TestNMClient { // NodeManager may still need some time to make the container started testGetContainerStatus(container, i, ContainerState.RUNNING, "", Arrays.asList(new Integer[] {-1000})); + // Test increase container API and make sure requests can reach NM + testIncreaseContainerResource(container); try { nmClient.stopContainer(container.getId(), container.getNodeId()); @@ -397,4 +409,19 @@ public class TestNMClient { } } + private void testIncreaseContainerResource(Container container) + throws YarnException, IOException { + try { + nmClient.increaseContainerResource(container); + } catch (YarnException e) { + // NM container will only be in LOCALIZED state, so expect the increase + // action to fail. + if (!e.getMessage().contains( + "can only be changed when a container is in RUNNING state")) { + throw (AssertionError) + (new AssertionError("Exception is not expected: " + e) + .initCause(e)); + } + } + } }