Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 065BC200CC5 for ; Mon, 26 Jun 2017 22:27:46 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 050A8160BF5; Mon, 26 Jun 2017 20:27:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7A611160BDA for ; Mon, 26 Jun 2017 22:27:44 +0200 (CEST) Received: (qmail 85327 invoked by uid 500); 26 Jun 2017 20:27:43 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 85314 invoked by uid 99); 26 Jun 2017 20:27:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 26 Jun 2017 20:27:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 353A8DFE22; Mon, 26 Jun 2017 20:27:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: subru@apache.org To: common-commits@hadoop.apache.org Date: Mon, 26 Jun 2017 20:27:43 -0000 Message-Id: <8e69b362d9844b08b0fee278a9cc6dde@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] hadoop git commit: YARN-3659. Federation: routing client invocations transparently to multiple RMs. (Giovanni Matteo Fumarola via Subru). archived-at: Mon, 26 Jun 2017 20:27:46 -0000 Repository: hadoop Updated Branches: refs/heads/YARN-2915 ff2f39aea -> 590d959ee http://git-wip-us.apache.org/repos/asf/hadoop/blob/590d959e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java new file mode 100644 index 0000000..87dfc95 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -0,0 +1,403 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extends the {@code BaseRouterClientRMTest} and overrides methods in order to + * use the {@code RouterClientRMService} pipeline test cases for testing the + * {@code FederationInterceptor} class. The tests for + * {@code RouterClientRMService} has been written cleverly so that it can be + * reused to validate different request intercepter chains. + */ +public class TestFederationClientInterceptor extends BaseRouterClientRMTest { + private static final Logger LOG = + LoggerFactory.getLogger(TestFederationClientInterceptor.class); + + private TestableFederationClientInterceptor interceptor; + private MemoryFederationStateStore stateStore; + private FederationStateStoreTestUtil stateStoreUtil; + private List subClusters; + + private String user = "test-user"; + + private final static int NUM_SUBCLUSTER = 4; + + @Override + public void setUp() { + super.setUpConfig(); + interceptor = new TestableFederationClientInterceptor(); + + stateStore = new MemoryFederationStateStore(); + stateStore.init(this.getConf()); + FederationStateStoreFacade.getInstance().reinitialize(stateStore, + getConf()); + stateStoreUtil = new FederationStateStoreTestUtil(stateStore); + + interceptor.setConf(this.getConf()); + interceptor.init(user); + + subClusters = new ArrayList(); + + try { + for (int i = 0; i < NUM_SUBCLUSTER; i++) { + SubClusterId sc = SubClusterId.newInstance(Integer.toString(i)); + stateStoreUtil.registerSubCluster(sc); + subClusters.add(sc); + } + } catch (YarnException e) { + LOG.error(e.getMessage()); + Assert.fail(); + } + + } + + @Override + public void tearDown() { + interceptor.shutdown(); + super.tearDown(); + } + + @Override + protected YarnConfiguration createConfiguration() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + String mockPassThroughInterceptorClass = + PassThroughClientRequestInterceptor.class.getName(); + + // Create a request intercepter pipeline for testing. The last one in the + // chain is the federation intercepter that calls the mock resource manager. + // The others in the chain will simply forward it to the next one in the + // chain + conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE, + mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + + "," + TestableFederationClientInterceptor.class.getName()); + + conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER, + UniformBroadcastPolicyManager.class.getName()); + + // Disable StateStoreFacade cache + conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); + + return conf; + } + + /** + * This test validates the correctness of GetNewApplication. The return + * ApplicationId has to belong to one of the SubCluster in the cluster. + */ + @Test + public void testGetNewApplication() + throws YarnException, IOException, InterruptedException { + System.out.println("Test FederationClientInterceptor: Get New Application"); + + GetNewApplicationRequest request = GetNewApplicationRequest.newInstance(); + GetNewApplicationResponse response = interceptor.getNewApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(response.getApplicationId()); + Assert.assertTrue( + response.getApplicationId().getClusterTimestamp() < NUM_SUBCLUSTER); + Assert.assertTrue(response.getApplicationId().getClusterTimestamp() >= 0); + } + + /** + * This test validates the correctness of SubmitApplication. The application + * has to be submitted to one of the SubCluster in the cluster. + */ + @Test + public void testSubmitApplication() + throws YarnException, IOException, InterruptedException { + System.out.println("Test FederationClientInterceptor: Submit Application"); + + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContext context = ApplicationSubmissionContext + .newInstance(appId, "", "", null, null, false, false, -1, null, null); + SubmitApplicationRequest request = + SubmitApplicationRequest.newInstance(context); + + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId); + Assert.assertNotNull(scIdResult); + Assert.assertTrue(subClusters.contains(scIdResult)); + } + + /** + * This test validates the correctness of SubmitApplication in case of + * multiple submission. The first retry has to be submitted to the same + * SubCluster of the first attempt. + */ + @Test + public void testSubmitApplicationMultipleSubmission() + throws YarnException, IOException, InterruptedException { + System.out.println( + "Test FederationClientInterceptor: Submit Application - Multiple"); + + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContext context = ApplicationSubmissionContext + .newInstance(appId, "", "", null, null, false, false, -1, null, null); + SubmitApplicationRequest request = + SubmitApplicationRequest.newInstance(context); + + // First attempt + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId); + Assert.assertNotNull(scIdResult); + + // First retry + response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + SubClusterId scIdResult2 = stateStoreUtil.queryApplicationHomeSC(appId); + Assert.assertNotNull(scIdResult2); + Assert.assertEquals(scIdResult, scIdResult); + } + + /** + * This test validates the correctness of SubmitApplication in case of empty + * request. + */ + @Test + public void testSubmitApplicationEmptyRequest() + throws YarnException, IOException, InterruptedException { + System.out.println( + "Test FederationClientInterceptor: Submit Application - Empty"); + try { + interceptor.submitApplication(null); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue( + e.getMessage().startsWith("Missing submitApplication request or " + + "applicationSubmissionContex information.")); + } + try { + interceptor.submitApplication(SubmitApplicationRequest.newInstance(null)); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue( + e.getMessage().startsWith("Missing submitApplication request or " + + "applicationSubmissionContex information.")); + } + try { + ApplicationSubmissionContext context = ApplicationSubmissionContext + .newInstance(null, "", "", null, null, false, false, -1, null, null); + SubmitApplicationRequest request = + SubmitApplicationRequest.newInstance(context); + interceptor.submitApplication(request); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue( + e.getMessage().startsWith("Missing submitApplication request or " + + "applicationSubmissionContex information.")); + } + } + + /** + * This test validates the correctness of ForceKillApplication in case the + * application exists in the cluster. + */ + @Test + public void testForceKillApplication() + throws YarnException, IOException, InterruptedException { + System.out + .println("Test FederationClientInterceptor: Force Kill Application"); + + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContext context = ApplicationSubmissionContext + .newInstance(appId, "", "", null, null, false, false, -1, null, null); + + SubmitApplicationRequest request = + SubmitApplicationRequest.newInstance(context); + // Submit the application we are going to kill later + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + KillApplicationRequest requestKill = + KillApplicationRequest.newInstance(appId); + KillApplicationResponse responseKill = + interceptor.forceKillApplication(requestKill); + Assert.assertNotNull(responseKill); + } + + /** + * This test validates the correctness of ForceKillApplication in case of + * application does not exist in StateStore. + */ + @Test + public void testForceKillApplicationNotExists() + throws YarnException, IOException, InterruptedException { + System.out.println("Test FederationClientInterceptor: " + + "Force Kill Application - Not Exists"); + + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + KillApplicationRequest requestKill = + KillApplicationRequest.newInstance(appId); + try { + interceptor.forceKillApplication(requestKill); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().equals( + "Application " + appId + " does not exist in FederationStateStore")); + } + } + + /** + * This test validates the correctness of ForceKillApplication in case of + * empty request. + */ + @Test + public void testForceKillApplicationEmptyRequest() + throws YarnException, IOException, InterruptedException { + System.out.println( + "Test FederationClientInterceptor: Force Kill Application - Empty"); + try { + interceptor.forceKillApplication(null); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().startsWith( + "Missing forceKillApplication request or ApplicationId.")); + } + try { + interceptor + .forceKillApplication(KillApplicationRequest.newInstance(null)); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().startsWith( + "Missing forceKillApplication request or ApplicationId.")); + } + } + + /** + * This test validates the correctness of GetApplicationReport in case the + * application exists in the cluster. + */ + @Test + public void testGetApplicationReport() + throws YarnException, IOException, InterruptedException { + System.out + .println("Test FederationClientInterceptor: Get Application Report"); + + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContext context = ApplicationSubmissionContext + .newInstance(appId, "", "", null, null, false, false, -1, null, null); + + SubmitApplicationRequest request = + SubmitApplicationRequest.newInstance(context); + // Submit the application we want the report later + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + GetApplicationReportRequest requestGet = + GetApplicationReportRequest.newInstance(appId); + + GetApplicationReportResponse responseGet = + interceptor.getApplicationReport(requestGet); + + Assert.assertNotNull(responseGet); + } + + /** + * This test validates the correctness of GetApplicationReport in case the + * application does not exist in StateStore. + */ + @Test + public void testGetApplicationNotExists() + throws YarnException, IOException, InterruptedException { + System.out.println( + "Test ApplicationClientProtocol: Get Application Report - Not Exists"); + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + GetApplicationReportRequest requestGet = + GetApplicationReportRequest.newInstance(appId); + try { + interceptor.getApplicationReport(requestGet); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().equals( + "Application " + appId + " does not exist in FederationStateStore")); + } + } + + /** + * This test validates the correctness of GetApplicationReport in case of + * empty request. + */ + @Test + public void testGetApplicationEmptyRequest() + throws YarnException, IOException, InterruptedException { + System.out.println( + "Test FederationClientInterceptor: Get Application Report - Empty"); + try { + interceptor.getApplicationReport(null); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue( + e.getMessage().startsWith("Missing getApplicationReport request or " + + "applicationId information.")); + } + try { + interceptor + .getApplicationReport(GetApplicationReportRequest.newInstance(null)); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue( + e.getMessage().startsWith("Missing getApplicationReport request or " + + "applicationId information.")); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/590d959e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java new file mode 100644 index 0000000..a655c16 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; +import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extends the {@code BaseRouterClientRMTest} and overrides methods in order to + * use the {@code RouterClientRMService} pipeline test cases for testing the + * {@code FederationInterceptor} class. The tests for + * {@code RouterClientRMService} has been written cleverly so that it can be + * reused to validate different request intercepter chains. + * + * It tests the case with SubClusters down and the Router logic of retries. We + * have 1 good SubCluster and 2 bad ones for all the tests. + */ +public class TestFederationClientInterceptorRetry + extends BaseRouterClientRMTest { + private static final Logger LOG = + LoggerFactory.getLogger(TestFederationClientInterceptorRetry.class); + + private TestableFederationClientInterceptor interceptor; + private MemoryFederationStateStore stateStore; + private FederationStateStoreTestUtil stateStoreUtil; + + private String user = "test-user"; + + // running and registered + private static SubClusterId good; + + // registered but not running + private static SubClusterId bad1; + private static SubClusterId bad2; + + private static List scs = new ArrayList(); + + @Override + public void setUp() { + super.setUpConfig(); + interceptor = new TestableFederationClientInterceptor(); + + stateStore = new MemoryFederationStateStore(); + stateStore.init(this.getConf()); + FederationStateStoreFacade.getInstance().reinitialize(stateStore, + getConf()); + stateStoreUtil = new FederationStateStoreTestUtil(stateStore); + + interceptor.setConf(this.getConf()); + interceptor.init(user); + + // Create SubClusters + good = SubClusterId.newInstance("0"); + bad1 = SubClusterId.newInstance("1"); + bad2 = SubClusterId.newInstance("2"); + scs.add(good); + scs.add(bad1); + scs.add(bad2); + + // The mock RM will not start in these SubClusters, this is done to simulate + // a SubCluster down + + interceptor.registerBadSubCluster(bad1); + interceptor.registerBadSubCluster(bad2); + } + + @Override + public void tearDown() { + interceptor.shutdown(); + super.tearDown(); + } + + private void setupCluster(List scsToRegister) + throws YarnException { + + try { + // Clean up the StateStore before every test + stateStoreUtil.deregisterAllSubClusters(); + + for (SubClusterId sc : scsToRegister) { + stateStoreUtil.registerSubCluster(sc); + } + } catch (YarnException e) { + LOG.error(e.getMessage()); + Assert.fail(); + } + } + + @Override + protected YarnConfiguration createConfiguration() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + String mockPassThroughInterceptorClass = + PassThroughClientRequestInterceptor.class.getName(); + + // Create a request intercepter pipeline for testing. The last one in the + // chain is the federation intercepter that calls the mock resource manager. + // The others in the chain will simply forward it to the next one in the + // chain + conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE, + mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + + "," + TestableFederationClientInterceptor.class.getName()); + + conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER, + UniformBroadcastPolicyManager.class.getName()); + + // Disable StateStoreFacade cache + conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); + + return conf; + } + + /** + * This test validates the correctness of GetNewApplication in case the + * cluster is composed of only 1 bad SubCluster. + */ + @Test + public void testGetNewApplicationOneBadSC() + throws YarnException, IOException, InterruptedException { + + System.out.println("Test getNewApplication with one bad SubCluster"); + setupCluster(Arrays.asList(bad2)); + + try { + interceptor.getNewApplication(GetNewApplicationRequest.newInstance()); + Assert.fail(); + } catch (Exception e) { + System.out.println(e.toString()); + Assert.assertTrue(e.getMessage() + .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE)); + } + } + + /** + * This test validates the correctness of GetNewApplication in case the + * cluster is composed of only 2 bad SubClusters. + */ + @Test + public void testGetNewApplicationTwoBadSCs() + throws YarnException, IOException, InterruptedException { + System.out.println("Test getNewApplication with two bad SubClusters"); + setupCluster(Arrays.asList(bad1, bad2)); + + try { + interceptor.getNewApplication(GetNewApplicationRequest.newInstance()); + Assert.fail(); + } catch (Exception e) { + System.out.println(e.toString()); + Assert.assertTrue(e.getMessage() + .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE)); + } + } + + /** + * This test validates the correctness of GetNewApplication in case the + * cluster is composed of only 1 bad SubCluster and 1 good one. + */ + @Test + public void testGetNewApplicationOneBadOneGood() + throws YarnException, IOException, InterruptedException { + System.out.println("Test getNewApplication with one bad, one good SC"); + setupCluster(Arrays.asList(good, bad2)); + GetNewApplicationResponse response = null; + try { + response = + interceptor.getNewApplication(GetNewApplicationRequest.newInstance()); + } catch (Exception e) { + Assert.fail(); + } + Assert.assertEquals(Integer.parseInt(good.getId()), + response.getApplicationId().getClusterTimestamp()); + } + + /** + * This test validates the correctness of SubmitApplication in case the + * cluster is composed of only 1 bad SubCluster. + */ + @Test + public void testSubmitApplicationOneBadSC() + throws YarnException, IOException, InterruptedException { + + System.out.println("Test submitApplication with one bad SubCluster"); + setupCluster(Arrays.asList(bad2)); + + final ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + + ApplicationSubmissionContext context = ApplicationSubmissionContext + .newInstance(appId, "", "", null, null, false, false, -1, null, null); + final SubmitApplicationRequest request = + SubmitApplicationRequest.newInstance(context); + try { + interceptor.submitApplication(request); + Assert.fail(); + } catch (Exception e) { + System.out.println(e.toString()); + Assert.assertTrue(e.getMessage() + .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE)); + } + } + + /** + * This test validates the correctness of SubmitApplication in case the + * cluster is composed of only 2 bad SubClusters. + */ + @Test + public void testSubmitApplicationTwoBadSCs() + throws YarnException, IOException, InterruptedException { + System.out.println("Test submitApplication with two bad SubClusters"); + setupCluster(Arrays.asList(bad1, bad2)); + + final ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + + ApplicationSubmissionContext context = ApplicationSubmissionContext + .newInstance(appId, "", "", null, null, false, false, -1, null, null); + final SubmitApplicationRequest request = + SubmitApplicationRequest.newInstance(context); + try { + interceptor.submitApplication(request); + Assert.fail(); + } catch (Exception e) { + System.out.println(e.toString()); + Assert.assertTrue(e.getMessage() + .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE)); + } + } + + /** + * This test validates the correctness of SubmitApplication in case the + * cluster is composed of only 1 bad SubCluster and a good one. + */ + @Test + public void testSubmitApplicationOneBadOneGood() + throws YarnException, IOException, InterruptedException { + System.out.println("Test submitApplication with one bad, one good SC"); + setupCluster(Arrays.asList(good, bad2)); + + final ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + + ApplicationSubmissionContext context = ApplicationSubmissionContext + .newInstance(appId, "", "", null, null, false, false, -1, null, null); + final SubmitApplicationRequest request = + SubmitApplicationRequest.newInstance(context); + try { + interceptor.submitApplication(request); + } catch (Exception e) { + Assert.fail(); + } + Assert.assertEquals(good, + stateStore + .getApplicationHomeSubCluster( + GetApplicationHomeSubClusterRequest.newInstance(appId)) + .getApplicationHomeSubCluster().getHomeSubCluster()); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/590d959e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java new file mode 100644 index 0000000..e4a1a42 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MockResourceManagerFacade; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; + +/** + * Extends the FederationClientInterceptor and overrides methods to provide a + * testable implementation of FederationClientInterceptor. + */ +public class TestableFederationClientInterceptor + extends FederationClientInterceptor { + + private ConcurrentHashMap mockRMs = + new ConcurrentHashMap<>(); + + private List badSubCluster = new ArrayList(); + + @Override + protected ApplicationClientProtocol getClientRMProxyForSubCluster( + SubClusterId subClusterId) throws YarnException { + + MockResourceManagerFacade mockRM = null; + synchronized (this) { + if (mockRMs.containsKey(subClusterId)) { + mockRM = mockRMs.get(subClusterId); + } else { + mockRM = new MockResourceManagerFacade(super.getConf(), 0, + Integer.parseInt(subClusterId.getId()), + !badSubCluster.contains(subClusterId)); + mockRMs.put(subClusterId, mockRM); + + } + return mockRM; + } + } + + /** + * For testing purpose, some subclusters has to be down to simulate particular + * scenarios as RM Failover, network issues. For this reason we keep track of + * these bad subclusters. This method make the subcluster unusable. + * + * @param badSC the subcluster to make unusable + */ + protected void registerBadSubCluster(SubClusterId badSC) { + badSubCluster.add(badSC); + if (mockRMs.contains(badSC)) { + mockRMs.get(badSC).setRunningMode(false); + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org