hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From su...@apache.org
Subject hadoop git commit: YARN-6962. Add support for updateContainers when allocating using FederationInterceptor. (Botong Huang via Subru).
Date Thu, 28 Sep 2017 20:04:12 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 85d81ae58 -> ca669f9f8


YARN-6962. Add support for updateContainers when allocating using FederationInterceptor. (Botong Huang via Subru).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ca669f9f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ca669f9f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ca669f9f

Branch: refs/heads/trunk
Commit: ca669f9f8bc7abe5b7d4648c589aa1756bd336d1
Parents: 85d81ae5
Author: Subru Krishnan <subru@apache.org>
Authored: Thu Sep 28 13:04:03 2017 -0700
Committer: Subru Krishnan <subru@apache.org>
Committed: Thu Sep 28 13:04:03 2017 -0700

----------------------------------------------------------------------
 .../amrmproxy/FederationInterceptor.java        | 86 +++++++++++++-------
 .../amrmproxy/TestFederationInterceptor.java    | 54 ++++++++++++
 2 files changed, 111 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca669f9f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 28724aa..33cfca3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -540,30 +540,33 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       }
     }
 
-    if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
-        request.getResourceBlacklistRequest().getBlacklistAdditions())) {
-      for (String resourceName : request.getResourceBlacklistRequest()
-          .getBlacklistAdditions()) {
-        SubClusterId subClusterId = getSubClusterForNode(resourceName);
-        if (subClusterId != null) {
-          AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
-              subClusterId, request, requestMap);
-          newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
-              .add(resourceName);
+    if (request.getResourceBlacklistRequest() != null) {
+      if (!isNullOrEmpty(
+          request.getResourceBlacklistRequest().getBlacklistAdditions())) {
+        for (String resourceName : request.getResourceBlacklistRequest()
+            .getBlacklistAdditions()) {
+          SubClusterId subClusterId = getSubClusterForNode(resourceName);
+          if (subClusterId != null) {
+            AllocateRequest newRequest =
+                findOrCreateAllocateRequestForSubCluster(subClusterId, request,
+                    requestMap);
+            newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
+                .add(resourceName);
+          }
         }
       }
-    }
-
-    if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
-        request.getResourceBlacklistRequest().getBlacklistRemovals())) {
-      for (String resourceName : request.getResourceBlacklistRequest()
-          .getBlacklistRemovals()) {
-        SubClusterId subClusterId = getSubClusterForNode(resourceName);
-        if (subClusterId != null) {
-          AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
-              subClusterId, request, requestMap);
-          newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
-              .add(resourceName);
+      if (!isNullOrEmpty(
+          request.getResourceBlacklistRequest().getBlacklistRemovals())) {
+        for (String resourceName : request.getResourceBlacklistRequest()
+            .getBlacklistRemovals()) {
+          SubClusterId subClusterId = getSubClusterForNode(resourceName);
+          if (subClusterId != null) {
+            AllocateRequest newRequest =
+                findOrCreateAllocateRequestForSubCluster(subClusterId, request,
+                    requestMap);
+            newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
+                .add(resourceName);
+          }
         }
       }
     }
@@ -896,13 +899,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       }
     }
 
-    if (!isNullOrEmpty(otherResponse.getNMTokens())) {
-      if (!isNullOrEmpty(homeResponse.getNMTokens())) {
-        homeResponse.getNMTokens().addAll(otherResponse.getNMTokens());
-      } else {
-        homeResponse.setNMTokens(otherResponse.getNMTokens());
-      }
-    }
+    homeResponse.setNumClusterNodes(
+        homeResponse.getNumClusterNodes() + otherResponse.getNumClusterNodes());
 
     PreemptionMessage homePreempMessage = homeResponse.getPreemptionMessage();
     PreemptionMessage otherPreempMessage = otherResponse.getPreemptionMessage();
@@ -935,6 +933,31 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         spar1.getContainers().addAll(spar2.getContainers());
       }
     }
+
+    if (!isNullOrEmpty(otherResponse.getNMTokens())) {
+      if (!isNullOrEmpty(homeResponse.getNMTokens())) {
+        homeResponse.getNMTokens().addAll(otherResponse.getNMTokens());
+      } else {
+        homeResponse.setNMTokens(otherResponse.getNMTokens());
+      }
+    }
+
+    if (!isNullOrEmpty(otherResponse.getUpdatedContainers())) {
+      if (!isNullOrEmpty(homeResponse.getUpdatedContainers())) {
+        homeResponse.getUpdatedContainers()
+            .addAll(otherResponse.getUpdatedContainers());
+      } else {
+        homeResponse.setUpdatedContainers(otherResponse.getUpdatedContainers());
+      }
+    }
+
+    if (!isNullOrEmpty(otherResponse.getUpdateErrors())) {
+      if (!isNullOrEmpty(homeResponse.getUpdateErrors())) {
+        homeResponse.getUpdateErrors().addAll(otherResponse.getUpdateErrors());
+      } else {
+        homeResponse.setUpdateErrors(otherResponse.getUpdateErrors());
+      }
+    }
   }
 
   /**
@@ -1052,6 +1075,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     return this.uamPool.getAllUAMIds().size();
   }
 
+  @VisibleForTesting
+  public Map<SubClusterId, List<AllocateResponse>> getAsyncResponseSink() {
+    return this.asyncResponseSink;
+  }
+
   /**
    * Private structure for encapsulating SubClusterId and
    * RegisterApplicationMasterResponse instances.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca669f9f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 34b0741..3db0e35 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
@@ -36,8 +38,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerError;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -493,4 +502,49 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     } catch (YarnException e) {
     }
   }
+
+  @Test
+  public void testAllocateResponse() throws Exception {
+    interceptor.registerApplicationMaster(
+        RegisterApplicationMasterRequest.newInstance(null, 0, null));
+    AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
+
+    Map<SubClusterId, List<AllocateResponse>> asyncResponseSink =
+        interceptor.getAsyncResponseSink();
+
+    ContainerId cid = ContainerId.newContainerId(attemptId, 0);
+    ContainerStatus cStatus = Records.newRecord(ContainerStatus.class);
+    cStatus.setContainerId(cid);
+    Container container =
+        Container.newInstance(cid, null, null, null, null, null);
+
+    AllocateResponse response = Records.newRecord(AllocateResponse.class);
+    response.setAllocatedContainers(Collections.singletonList(container));
+    response.setCompletedContainersStatuses(Collections.singletonList(cStatus));
+    response.setUpdatedNodes(
+        Collections.singletonList(Records.newRecord(NodeReport.class)));
+    response.setNMTokens(
+        Collections.singletonList(Records.newRecord(NMToken.class)));
+    response.setUpdatedContainers(
+        Collections.singletonList(Records.newRecord(UpdatedContainer.class)));
+    response.setUpdateErrors(Collections
+        .singletonList(Records.newRecord(UpdateContainerError.class)));
+    response.setAvailableResources(Records.newRecord(Resource.class));
+    response.setPreemptionMessage(Records.newRecord(PreemptionMessage.class));
+
+    List<AllocateResponse> list = new ArrayList<>();
+    list.add(response);
+    asyncResponseSink.put(SubClusterId.newInstance("SC-1"), list);
+
+    response = interceptor.allocate(allocateRequest);
+
+    Assert.assertEquals(1, response.getAllocatedContainers().size());
+    Assert.assertNotNull(response.getAvailableResources());
+    Assert.assertEquals(1, response.getCompletedContainersStatuses().size());
+    Assert.assertEquals(1, response.getUpdatedNodes().size());
+    Assert.assertNotNull(response.getPreemptionMessage());
+    Assert.assertEquals(1, response.getNMTokens().size());
+    Assert.assertEquals(1, response.getUpdatedContainers().size());
+    Assert.assertEquals(1, response.getUpdateErrors().size());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message