Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3596E200D39 for ; Sat, 28 Oct 2017 01:48:01 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 34147160BF2; Fri, 27 Oct 2017 23:48:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D96BB160BF4 for ; Sat, 28 Oct 2017 01:47:58 +0200 (CEST) Received: (qmail 96620 invoked by uid 500); 27 Oct 2017 23:47:58 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 96611 invoked by uid 99); 27 Oct 2017 23:47:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Oct 2017 23:47:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DF2FFDFC35; Fri, 27 Oct 2017 23:47:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: inigoiri@apache.org To: common-commits@hadoop.apache.org Date: Fri, 27 Oct 2017 23:47:57 -0000 Message-Id: <6fe6fe9af461448197fd1c10afe17264@git.apache.org> In-Reply-To: <519e724fea38465f9299c3d23c661d30@git.apache.org> References: <519e724fea38465f9299c3d23c661d30@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] hadoop git commit: YARN-7276. Federation Router Web Service fixes. Contributed by Inigo Goiri. archived-at: Fri, 27 Oct 2017 23:48:01 -0000 YARN-7276. Federation Router Web Service fixes. Contributed by Inigo Goiri. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8be57070 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8be57070 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8be57070 Branch: refs/heads/trunk Commit: 8be5707067509b78bde5fcf60072ae988d5a9f32 Parents: d55a849 Author: Inigo Goiri Authored: Fri Oct 27 16:46:05 2017 -0700 Committer: Inigo Goiri Committed: Fri Oct 27 16:46:05 2017 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/server/router/Router.java | 6 + .../webapp/DefaultRequestInterceptorREST.java | 12 +- .../webapp/FederationInterceptorREST.java | 255 ++-- .../router/webapp/RouterWebServiceUtil.java | 59 +- .../server/router/webapp/RouterWebServices.java | 97 +- .../webapp/BaseRouterWebServicesTest.java | 400 ++---- .../yarn/server/router/webapp/JavaProcess.java | 15 +- .../webapp/TestRouterWebServicesREST.java | 1169 ++++++++++-------- 8 files changed, 1018 insertions(+), 995 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8be57070/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java index 121e534..76050d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; @@ -74,6 +75,8 @@ public class Router extends CompositeService { */ public static final int SHUTDOWN_HOOK_PRIORITY = 30; + private static final String METRICS_NAME = "Router"; + public Router() { super(Router.class.getName()); } @@ -95,6 +98,8 @@ public class Router extends CompositeService { webAppAddress = WebAppUtils.getWebAppBindURL(this.conf, YarnConfiguration.ROUTER_BIND_HOST, WebAppUtils.getRouterWebAppURLWithoutScheme(this.conf)); + // Metrics + DefaultMetricsSystem.initialize(METRICS_NAME); super.serviceInit(conf); } @@ -118,6 +123,7 @@ public class Router extends CompositeService { return; } super.serviceStop(); + DefaultMetricsSystem.shutdown(); } protected void shutDown() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/8be57070/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java index abd8ca6..72ed02f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java @@ -129,7 +129,9 @@ public class DefaultRequestInterceptorREST public NodesInfo getNodes(String states) { // states will be part of additionalParam Map additionalParam = new HashMap(); - additionalParam.put(RMWSConsts.STATES, new String[] {states}); + if (states != null && !states.isEmpty()) { + additionalParam.put(RMWSConsts.STATES, new String[] {states}); + } return RouterWebServiceUtil.genericForward(webAppAddress, null, NodesInfo.class, HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES, null, @@ -226,9 +228,11 @@ public class DefaultRequestInterceptorREST public LabelsToNodesInfo getLabelsToNodes(Set labels) throws IOException { // labels will be part of additionalParam - Map additionalParam = new HashMap(); - additionalParam.put(RMWSConsts.LABELS, - labels.toArray(new String[labels.size()])); + Map additionalParam = new HashMap<>(); + if (labels != null && !labels.isEmpty()) { + additionalParam.put(RMWSConsts.LABELS, + labels.toArray(new String[labels.size()])); + } return RouterWebServiceUtil.genericForward(webAppAddress, null, LabelsToNodesInfo.class, HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.LABEL_MAPPINGS, null, http://git-wip-us.apache.org/repos/asf/hadoop/blob/8be57070/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 5adcc62..6e67634 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.router.webapp; import java.io.IOException; +import java.security.Principal; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -26,12 +27,15 @@ import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletRequestWrapper; import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; @@ -48,6 +52,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -121,29 +126,33 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { @Override public void init(String user) { federationFacade = FederationStateStoreFacade.getInstance(); - rand = new Random(System.currentTimeMillis()); + rand = new Random(); final Configuration conf = this.getConf(); try { - policyFacade = new RouterPolicyFacade(conf, federationFacade, - this.federationFacade.getSubClusterResolver(), null); + SubClusterResolver subClusterResolver = + this.federationFacade.getSubClusterResolver(); + policyFacade = new RouterPolicyFacade( + conf, federationFacade, subClusterResolver, null); } catch (FederationPolicyInitializationException e) { - LOG.error(e.getMessage()); + throw new YarnRuntimeException(e); } - numSubmitRetries = - conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, - YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY); + numSubmitRetries = conf.getInt( + YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY); - interceptors = new HashMap(); + interceptors = new HashMap<>(); routerMetrics = RouterMetrics.getMetrics(); - threadpool = HadoopExecutors.newCachedThreadPool(new ThreadFactoryBuilder() - .setNameFormat("FederationInterceptorREST #%d").build()); - - returnPartialReport = - conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED, - YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED); + threadpool = HadoopExecutors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setNameFormat("FederationInterceptorREST #%d") + .build()); + + returnPartialReport = conf.getBoolean( + YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED, + YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED); } private SubClusterId getRandomActiveSubCluster( @@ -156,8 +165,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } List list = new ArrayList<>(activeSubclusters.keySet()); - FederationPolicyUtils.validateSubClusterAvailability(list, - blackListSubClusters); + FederationPolicyUtils.validateSubClusterAvailability( + list, blackListSubClusters); if (blackListSubClusters != null) { @@ -176,8 +185,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { if (interceptors.containsKey(subClusterId)) { return interceptors.get(subClusterId); } else { - LOG.error("The interceptor for SubCluster " + subClusterId - + " does not exist in the cache."); + LOG.error( + "The interceptor for SubCluster {} does not exist in the cache.", + subClusterId); return null; } } @@ -187,9 +197,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { final Configuration conf = this.getConf(); - String interceptorClassName = - conf.get(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS, - YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS); + String interceptorClassName = conf.get( + YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS, + YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS); DefaultRequestInterceptorREST interceptorInstance = null; try { Class interceptorClass = conf.getClassByName(interceptorClassName); @@ -210,7 +220,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { e); } - interceptorInstance.setWebAppAddress(webAppAddress); + interceptorInstance.setWebAppAddress("http://" + webAppAddress); interceptorInstance.setSubClusterId(subClusterId); interceptors.put(subClusterId, interceptorInstance); return interceptorInstance; @@ -272,8 +282,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { .entity(e.getLocalizedMessage()).build(); } - LOG.debug( - "getNewApplication try #" + i + " on SubCluster " + subClusterId); + LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId); DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(subClusterId, @@ -282,11 +291,12 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { try { response = interceptor.createNewApplication(hsr); } catch (Exception e) { - LOG.warn("Unable to create a new ApplicationId in SubCluster " - + subClusterId.getId(), e); + LOG.warn("Unable to create a new ApplicationId in SubCluster {}", + subClusterId.getId(), e); } - if (response != null && response.getStatus() == 200) { + if (response != null && + response.getStatus() == HttpServletResponse.SC_OK) { long stopTime = clock.getTime(); routerMetrics.succeededAppsCreated(stopTime - startTime); @@ -302,7 +312,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { String errMsg = "Fail to create a new application."; LOG.error(errMsg); routerMetrics.incrAppsFailedCreated(); - return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build(); + return Response + .status(Status.INTERNAL_SERVER_ERROR) + .entity(errMsg) + .build(); } /** @@ -381,7 +394,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { routerMetrics.incrAppsFailedSubmitted(); String errMsg = "Missing ApplicationSubmissionContextInfo or " + "applicationSubmissionContex information."; - return Response.status(Status.BAD_REQUEST).entity(errMsg).build(); + return Response + .status(Status.BAD_REQUEST) + .entity(errMsg) + .build(); } ApplicationId applicationId = null; @@ -389,7 +405,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { applicationId = ApplicationId.fromString(newApp.getApplicationId()); } catch (IllegalArgumentException e) { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()) + return Response + .status(Status.BAD_REQUEST) + .entity(e.getLocalizedMessage()) .build(); } @@ -405,11 +423,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { subClusterId = policyFacade.getHomeSubcluster(context, blacklist); } catch (YarnException e) { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.SERVICE_UNAVAILABLE) - .entity(e.getLocalizedMessage()).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(e.getLocalizedMessage()) + .build(); } - LOG.info("submitApplication appId" + applicationId + " try #" + i - + " on SubCluster " + subClusterId); + LOG.info("submitApplication appId {} try #{} on SubCluster {}", + applicationId, i, subClusterId); ApplicationHomeSubCluster appHomeSubCluster = ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); @@ -424,8 +444,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { routerMetrics.incrAppsFailedSubmitted(); String errMsg = "Unable to insert the ApplicationId " + applicationId + " into the FederationStateStore"; - return Response.status(Status.SERVICE_UNAVAILABLE) - .entity(errMsg + " " + e.getLocalizedMessage()).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(errMsg + " " + e.getLocalizedMessage()) + .build(); } } else { try { @@ -441,15 +463,19 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { federationFacade.getApplicationHomeSubCluster(applicationId); } catch (YarnException e1) { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.SERVICE_UNAVAILABLE) - .entity(e1.getLocalizedMessage()).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(e1.getLocalizedMessage()) + .build(); } if (subClusterId == subClusterIdInStateStore) { - LOG.info("Application " + applicationId - + " already submitted on SubCluster " + subClusterId); + LOG.info("Application {} already submitted on SubCluster {}", + applicationId, subClusterId); } else { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg) + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(errMsg) .build(); } } @@ -460,8 +486,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { subClusterInfo = federationFacade.getSubCluster(subClusterId); } catch (YarnException e) { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.SERVICE_UNAVAILABLE) - .entity(e.getLocalizedMessage()).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(e.getLocalizedMessage()) + .build(); } Response response = null; @@ -470,13 +498,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { subClusterInfo.getRMWebServiceAddress()).submitApplication(newApp, hsr); } catch (Exception e) { - LOG.warn("Unable to submit the application " + applicationId - + "to SubCluster " + subClusterId.getId(), e); + LOG.warn("Unable to submit the application {} to SubCluster {}", + applicationId, subClusterId.getId(), e); } - if (response != null && response.getStatus() == 202) { - LOG.info("Application " + context.getApplicationName() + " with appId " - + applicationId + " submitted on " + subClusterId); + if (response != null && + response.getStatus() == HttpServletResponse.SC_ACCEPTED) { + LOG.info("Application {} with appId {} submitted on {}", + context.getApplicationName(), applicationId, subClusterId); long stopTime = clock.getTime(); routerMetrics.succeededAppsSubmitted(stopTime - startTime); @@ -493,7 +522,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { String errMsg = "Application " + newApp.getApplicationName() + " with appId " + applicationId + " failed to be submitted."; LOG.error(errMsg); - return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(errMsg) + .build(); } /** @@ -541,9 +573,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { return null; } - AppInfo response = getOrCreateInterceptorForSubCluster(subClusterId, - subClusterInfo.getRMWebServiceAddress()).getApp(hsr, appId, - unselectedFields); + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + subClusterId, subClusterInfo.getRMWebServiceAddress()); + AppInfo response = interceptor.getApp(hsr, appId, unselectedFields); long stopTime = clock.getTime(); routerMetrics.succeededAppsRetrieved(stopTime - startTime); @@ -579,7 +612,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { applicationId = ApplicationId.fromString(appId); } catch (IllegalArgumentException e) { routerMetrics.incrAppsFailedKilled(); - return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()) + return Response + .status(Status.BAD_REQUEST) + .entity(e.getLocalizedMessage()) .build(); } @@ -591,7 +626,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { subClusterInfo = federationFacade.getSubCluster(subClusterId); } catch (YarnException e) { routerMetrics.incrAppsFailedKilled(); - return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()) + return Response + .status(Status.BAD_REQUEST) + .entity(e.getLocalizedMessage()) .build(); } @@ -642,26 +679,28 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } // Send the requests in parallel - - ExecutorCompletionService compSvc = - new ExecutorCompletionService(this.threadpool); + CompletionService compSvc = + new ExecutorCompletionService<>(this.threadpool); for (final SubClusterInfo info : subClustersActive.values()) { + // HttpServletRequest does not work with ExecutorCompletionService. + // Create a duplicate hsr. + final HttpServletRequest hsrCopy = clone(hsr); compSvc.submit(new Callable() { @Override public AppsInfo call() { DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster(info.getSubClusterId(), - info.getClientRMServiceAddress()); - AppsInfo rmApps = interceptor.getApps(hsr, stateQuery, statesQuery, - finalStatusQuery, userQuery, queueQuery, count, startedBegin, - startedEnd, finishBegin, finishEnd, applicationTypes, - applicationTags, unselectedFields); + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); + AppsInfo rmApps = interceptor.getApps(hsrCopy, stateQuery, + statesQuery, finalStatusQuery, userQuery, queueQuery, count, + startedBegin, startedEnd, finishBegin, finishEnd, + applicationTypes, applicationTags, unselectedFields); if (rmApps == null) { routerMetrics.incrMultipleAppsFailedRetrieved(); - LOG.error("Subcluster " + info.getSubClusterId() - + " failed to return appReport."); + LOG.error("Subcluster {} failed to return appReport.", + info.getSubClusterId()); return null; } return rmApps; @@ -670,8 +709,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } // Collect all the responses in parallel - - for (int i = 0; i < subClustersActive.values().size(); i++) { + for (int i = 0; i < subClustersActive.size(); i++) { try { Future future = compSvc.take(); AppsInfo appsResponse = future.get(); @@ -684,7 +722,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } } catch (Throwable e) { routerMetrics.incrMultipleAppsFailedRetrieved(); - LOG.warn("Failed to get application report ", e); + LOG.warn("Failed to get application report", e); } } @@ -693,9 +731,41 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } // Merge all the application reports got from all the available YARN RMs + return RouterWebServiceUtil.mergeAppsInfo( + apps.getApps(), returnPartialReport); + } - return RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), - returnPartialReport); + /** + * Get a copy of a HTTP request. This is for thread safety. + * @param hsr HTTP servlet request to copy. + * @return Copy of the HTTP request. + */ + private HttpServletRequestWrapper clone(final HttpServletRequest hsr) { + if (hsr == null) { + return null; + } + return new HttpServletRequestWrapper(hsr) { + public Map getParameterMap() { + return hsr.getParameterMap(); + } + public String getPathInfo() { + return hsr.getPathInfo(); + } + public String getRemoteUser() { + return hsr.getRemoteUser(); + } + public Principal getUserPrincipal() { + return hsr.getUserPrincipal(); + } + public String getHeader(String value) { + // we override only Accept + if (value.equals(HttpHeaders.ACCEPT)) { + return RouterWebServiceUtil.getMediaTypeFromHttpServletRequest( + hsr, AppsInfo.class); + } + return null; + } + }; } /** @@ -729,8 +799,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } // Send the requests in parallel - - ExecutorCompletionService compSvc = + CompletionService compSvc = new ExecutorCompletionService(this.threadpool); for (final SubClusterInfo info : subClustersActive.values()) { @@ -738,14 +807,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { @Override public NodeInfo call() { DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster(info.getSubClusterId(), - info.getClientRMServiceAddress()); + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); try { NodeInfo nodeInfo = interceptor.getNode(nodeId); return nodeInfo; } catch (Exception e) { - LOG.error("Subcluster " + info.getSubClusterId() - + " failed to return nodeInfo."); + LOG.error("Subcluster {} failed to return nodeInfo.", + info.getSubClusterId()); return null; } } @@ -754,7 +823,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Collect all the responses in parallel NodeInfo nodeInfo = null; - for (int i = 0; i < subClustersActive.values().size(); i++) { + for (int i = 0; i < subClustersActive.size(); i++) { try { Future future = compSvc.take(); NodeInfo nodeResponse = future.get(); @@ -763,8 +832,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { if (nodeResponse != null) { // Check if the node was already found in a different SubCluster and // it has an old health report - if (nodeInfo == null || nodeInfo.getLastHealthUpdate() < nodeResponse - .getLastHealthUpdate()) { + if (nodeInfo == null || nodeInfo.getLastHealthUpdate() < + nodeResponse.getLastHealthUpdate()) { nodeInfo = nodeResponse; } } @@ -806,13 +875,12 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { try { subClustersActive = federationFacade.getSubClusters(true); } catch (YarnException e) { - LOG.error(e.getMessage()); + LOG.error("Cannot get nodes: {}", e.getMessage()); return new NodesInfo(); } // Send the requests in parallel - - ExecutorCompletionService compSvc = + CompletionService compSvc = new ExecutorCompletionService(this.threadpool); for (final SubClusterInfo info : subClustersActive.values()) { @@ -820,14 +888,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { @Override public NodesInfo call() { DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster(info.getSubClusterId(), - info.getClientRMServiceAddress()); + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); try { NodesInfo nodesInfo = interceptor.getNodes(states); return nodesInfo; } catch (Exception e) { - LOG.error("Subcluster " + info.getSubClusterId() - + " failed to return nodesInfo."); + LOG.error("Subcluster {} failed to return nodesInfo.", + info.getSubClusterId()); return null; } } @@ -836,7 +904,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Collect all the responses in parallel - for (int i = 0; i < subClustersActive.values().size(); i++) { + for (int i = 0; i < subClustersActive.size(); i++) { try { Future future = compSvc.take(); NodesInfo nodesResponse = future.get(); @@ -870,8 +938,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } // Send the requests in parallel - - ExecutorCompletionService compSvc = + CompletionService compSvc = new ExecutorCompletionService(this.threadpool); for (final SubClusterInfo info : subClustersActive.values()) { @@ -879,14 +946,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { @Override public ClusterMetricsInfo call() { DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster(info.getSubClusterId(), - info.getClientRMServiceAddress()); + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); try { ClusterMetricsInfo metrics = interceptor.getClusterMetricsInfo(); return metrics; } catch (Exception e) { - LOG.error("Subcluster " + info.getSubClusterId() - + " failed to return Cluster Metrics."); + LOG.error("Subcluster {} failed to return Cluster Metrics.", + info.getSubClusterId()); return null; } } @@ -895,7 +962,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Collect all the responses in parallel - for (int i = 0; i < subClustersActive.values().size(); i++) { + for (int i = 0; i < subClustersActive.size(); i++) { try { Future future = compSvc.take(); ClusterMetricsInfo metricsResponse = future.get(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8be57070/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java index 76435f0..40bdbd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.router.webapp; +import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT; +import static javax.servlet.http.HttpServletResponse.SC_OK; + import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -28,6 +31,7 @@ import java.util.Map; import java.util.Map.Entry; import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; @@ -45,6 +49,8 @@ import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.ForbiddenException; import org.apache.hadoop.yarn.webapp.NotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.sun.jersey.api.ConflictException; import com.sun.jersey.api.client.Client; @@ -52,8 +58,6 @@ import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.WebResource.Builder; import com.sun.jersey.core.util.MultivaluedMapImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * The Router webservice util class. @@ -85,9 +89,11 @@ public final class RouterWebServiceUtil { * call in case the call has no servlet request * @return the retrieved entity from the REST call */ - protected static T genericForward(String webApp, HttpServletRequest hsr, - final Class returnType, HTTPMethods method, String targetPath, - Object formParam, Map additionalParam) { + protected static T genericForward( + final String webApp, final HttpServletRequest hsr, + final Class returnType, final HTTPMethods method, + final String targetPath, final Object formParam, + final Map additionalParam) { UserGroupInformation callerUGI = null; @@ -121,14 +127,22 @@ public final class RouterWebServiceUtil { ClientResponse response = RouterWebServiceUtil.invokeRMWebService( webApp, targetPath, method, - (hsr == null) ? null : hsr.getPathInfo(), paramMap, formParam); + (hsr == null) ? null : hsr.getPathInfo(), paramMap, formParam, + getMediaTypeFromHttpServletRequest(hsr, returnType)); if (Response.class.equals(returnType)) { return (T) RouterWebServiceUtil.clientResponseToResponse(response); } // YARN RM can answer with Status.OK or it throws an exception - if (response.getStatus() == 200) { + if (response.getStatus() == SC_OK) { return response.getEntity(returnType); } + if (response.getStatus() == SC_NO_CONTENT) { + try { + return returnType.getConstructor().newInstance(); + } catch (RuntimeException | ReflectiveOperationException e) { + LOG.error("Cannot create empty entity for {}", returnType, e); + } + } RouterWebServiceUtil.retrieveException(response); return null; } @@ -147,7 +161,7 @@ public final class RouterWebServiceUtil { */ private static ClientResponse invokeRMWebService(String webApp, String path, HTTPMethods method, String additionalPath, - Map queryParams, Object formParam) { + Map queryParams, Object formParam, String mediaType) { Client client = Client.create(); WebResource webResource = client.resource(webApp).path(path); @@ -168,14 +182,12 @@ public final class RouterWebServiceUtil { webResource = webResource.queryParams(paramMap); } - // I can forward the call in JSON or XML since the Router will convert it - // again in Object before send it back to the client Builder builder = null; if (formParam != null) { - builder = webResource.entity(formParam, MediaType.APPLICATION_XML); - builder = builder.accept(MediaType.APPLICATION_XML); + builder = webResource.entity(formParam, mediaType); + builder = builder.accept(mediaType); } else { - builder = webResource.accept(MediaType.APPLICATION_XML); + builder = webResource.accept(mediaType); } ClientResponse response = null; @@ -428,4 +440,25 @@ public final class RouterWebServiceUtil { + metricsResponse.getShutdownNodes()); } + /** + * Extract from HttpServletRequest the MediaType in output. + */ + protected static String getMediaTypeFromHttpServletRequest( + HttpServletRequest request, final Class returnType) { + if (request == null) { + // By default we return XML for REST call without HttpServletRequest + return MediaType.APPLICATION_XML; + } + // TODO + if (!returnType.equals(Response.class)) { + return MediaType.APPLICATION_XML; + } + String header = request.getHeader(HttpHeaders.ACCEPT); + if (header == null || header.equals("*")) { + // By default we return JSON + return MediaType.APPLICATION_JSON; + } + return header; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8be57070/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java index bbb83268..b327252 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java @@ -158,12 +158,19 @@ public class RouterWebServices implements RMWebServiceProtocol { } @VisibleForTesting - protected RequestInterceptorChainWrapper getInterceptorChain() { + protected RequestInterceptorChainWrapper getInterceptorChain( + final HttpServletRequest hsr) { String user = ""; + if (hsr != null) { + user = hsr.getRemoteUser(); + } try { - user = UserGroupInformation.getCurrentUser().getUserName(); + if (user == null || user.equals("")) { + // Yarn Router user + user = UserGroupInformation.getCurrentUser().getUserName(); + } } catch (IOException e) { - LOG.error("IOException " + e.getMessage()); + LOG.error("Cannot get user: {}", e.getMessage()); } if (!userPipelineMap.containsKey(user)) { initializePipeline(user); @@ -316,7 +323,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Override public ClusterInfo getClusterInfo() { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getClusterInfo(); } @@ -327,7 +334,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Override public ClusterMetricsInfo getClusterMetricsInfo() { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getClusterMetricsInfo(); } @@ -338,7 +345,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Override public SchedulerTypeInfo getSchedulerInfo() { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getSchedulerInfo(); } @@ -350,7 +357,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public String dumpSchedulerLogs(@FormParam(RMWSConsts.TIME) String time, @Context HttpServletRequest hsr) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().dumpSchedulerLogs(time, hsr); } @@ -361,7 +368,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Override public NodesInfo getNodes(@QueryParam(RMWSConsts.STATES) String states) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getNodes(states); } @@ -372,7 +379,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Override public NodeInfo getNode(@PathParam(RMWSConsts.NODEID) String nodeId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getNode(nodeId); } @@ -396,7 +403,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @QueryParam(RMWSConsts.APPLICATION_TAGS) Set applicationTags, @QueryParam(RMWSConsts.DESELECTS) Set unselectedFields) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getApps(hsr, stateQuery, statesQuery, finalStatusQuery, userQuery, queueQuery, count, startedBegin, startedEnd, finishBegin, finishEnd, applicationTypes, applicationTags, @@ -411,7 +418,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public ActivitiesInfo getActivities(@Context HttpServletRequest hsr, @QueryParam(RMWSConsts.NODEID) String nodeId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getActivities(hsr, nodeId); } @@ -424,7 +431,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @QueryParam(RMWSConsts.APP_ID) String appId, @QueryParam(RMWSConsts.MAX_TIME) String time) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppActivities(hsr, appId, time); } @@ -438,7 +445,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @QueryParam(RMWSConsts.STATES) Set stateQueries, @QueryParam(RMWSConsts.APPLICATION_TYPES) Set typeQueries) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppStatistics(hsr, stateQueries, typeQueries); } @@ -452,7 +459,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId, @QueryParam(RMWSConsts.DESELECTS) Set unselectedFields) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getApp(hsr, appId, unselectedFields); } @@ -464,7 +471,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public AppState getAppState(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppState(hsr, appId); } @@ -478,7 +485,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().updateAppState(targetState, hsr, appId); } @@ -491,7 +498,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public NodeToLabelsInfo getNodeToLabels(@Context HttpServletRequest hsr) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getNodeToLabels(hsr); } @@ -503,7 +510,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public LabelsToNodesInfo getLabelsToNodes( @QueryParam(RMWSConsts.LABELS) Set labels) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getLabelsToNodes(labels); } @@ -516,7 +523,7 @@ public class RouterWebServices implements RMWebServiceProtocol { final NodeToLabelsEntryList newNodeToLabels, @Context HttpServletRequest hsr) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().replaceLabelsOnNodes(newNodeToLabels, hsr); } @@ -531,7 +538,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Context HttpServletRequest hsr, @PathParam(RMWSConsts.NODEID) String nodeId) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().replaceLabelsOnNode(newNodeLabelsName, hsr, nodeId); } @@ -544,7 +551,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public NodeLabelsInfo getClusterNodeLabels(@Context HttpServletRequest hsr) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getClusterNodeLabels(hsr); } @@ -556,7 +563,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels, @Context HttpServletRequest hsr) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().addToClusterNodeLabels(newNodeLabels, hsr); } @@ -570,7 +577,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @QueryParam(RMWSConsts.LABELS) Set oldNodeLabels, @Context HttpServletRequest hsr) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor() .removeFromCluserNodeLabels(oldNodeLabels, hsr); } @@ -583,7 +590,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public NodeLabelsInfo getLabelsOnNode(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.NODEID) String nodeId) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getLabelsOnNode(hsr, nodeId); } @@ -595,7 +602,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public AppPriority getAppPriority(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppPriority(hsr, appId); } @@ -609,7 +616,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor() .updateApplicationPriority(targetPriority, hsr, appId); } @@ -622,7 +629,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public AppQueue getAppQueue(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppQueue(hsr, appId); } @@ -636,7 +643,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().updateAppQueue(targetQueue, hsr, appId); } @@ -649,7 +656,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public Response createNewApplication(@Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().createNewApplication(hsr); } @@ -662,7 +669,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().submitApplication(newApp, hsr); } @@ -675,7 +682,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException, Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().postDelegationToken(tokenData, hsr); } @@ -687,7 +694,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public Response postDelegationTokenExpiration(@Context HttpServletRequest hsr) throws AuthorizationException, IOException, Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().postDelegationTokenExpiration(hsr); } @@ -700,7 +707,7 @@ public class RouterWebServices implements RMWebServiceProtocol { throws AuthorizationException, IOException, InterruptedException, Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().cancelDelegationToken(hsr); } @@ -712,7 +719,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public Response createNewReservation(@Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().createNewReservation(hsr); } @@ -725,7 +732,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().submitReservation(resContext, hsr); } @@ -738,7 +745,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().updateReservation(resContext, hsr); } @@ -751,7 +758,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().deleteReservation(resContext, hsr); } @@ -768,7 +775,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @QueryParam(RMWSConsts.INCLUDE_RESOURCE) @DefaultValue(DEFAULT_INCLUDE_RESOURCE) boolean includeResourceAllocations, @Context HttpServletRequest hsr) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().listReservation(queue, reservationId, startTime, endTime, includeResourceAllocations, hsr); } @@ -782,7 +789,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId, @PathParam(RMWSConsts.TYPE) String type) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppTimeout(hsr, appId, type); } @@ -794,7 +801,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public AppTimeoutsInfo getAppTimeouts(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppTimeouts(hsr, appId); } @@ -808,7 +815,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().updateApplicationTimeout(appTimeout, hsr, appId); } @@ -821,7 +828,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public AppAttemptsInfo getAppAttempts(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppAttempts(hsr, appId); } @@ -834,7 +841,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId, @PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(req); return pipeline.getRootInterceptor().getAppAttempt(req, res, appId, appAttemptId); } @@ -848,7 +855,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId, @PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(req); return pipeline.getRootInterceptor().getContainers(req, res, appId, appAttemptId); } @@ -863,7 +870,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId, @PathParam(RMWSConsts.CONTAINERID) String containerId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(req); return pipeline.getRootInterceptor().getContainer(req, res, appId, appAttemptId, containerId); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8be57070/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java index 7d42084..9480850 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java @@ -20,15 +20,15 @@ package org.apache.hadoop.yarn.server.router.webapp; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import java.io.IOException; -import java.security.PrivilegedExceptionAction; +import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.Response; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -128,487 +128,263 @@ public abstract class BaseRouterWebServicesTest { protected ClusterInfo get(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ClusterInfo run() throws Exception { - return routerWebService.get(); - } - }); + // HSR is not used here + return routerWebService.get(); } protected ClusterInfo getClusterInfo(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ClusterInfo run() throws Exception { - return routerWebService.getClusterInfo(); - } - }); + // HSR is not used here + return routerWebService.getClusterInfo(); } protected ClusterMetricsInfo getClusterMetricsInfo(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ClusterMetricsInfo run() throws Exception { - return routerWebService.getClusterMetricsInfo(); - } - }); + // HSR is not used here + return routerWebService.getClusterMetricsInfo(); } protected SchedulerTypeInfo getSchedulerInfo(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public SchedulerTypeInfo run() throws Exception { - return routerWebService.getSchedulerInfo(); - } - }); + // HSR is not used here + return routerWebService.getSchedulerInfo(); } protected String dumpSchedulerLogs(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public String run() throws Exception { - return routerWebService.dumpSchedulerLogs(null, null); - } - }); + return routerWebService.dumpSchedulerLogs(null, + createHttpServletRequest(user)); } protected NodesInfo getNodes(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public NodesInfo run() throws Exception { - return routerWebService.getNodes(null); - } - }); + return routerWebService.getNodes(null); } protected NodeInfo getNode(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public NodeInfo run() throws Exception { - return routerWebService.getNode(null); - } - }); + return routerWebService.getNode(null); } protected AppsInfo getApps(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppsInfo run() throws Exception { - return routerWebService.getApps(null, null, null, null, null, null, - null, null, null, null, null, null, null, null); - } - }); + return routerWebService.getApps(createHttpServletRequest(user), null, null, + null, null, null, null, null, null, null, null, null, null, null); } protected ActivitiesInfo getActivities(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ActivitiesInfo run() throws Exception { - return routerWebService.getActivities(null, null); - } - }); + return routerWebService.getActivities( + createHttpServletRequest(user), null); } protected AppActivitiesInfo getAppActivities(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppActivitiesInfo run() throws Exception { - return routerWebService.getAppActivities(null, null, null); - } - }); + return routerWebService.getAppActivities( + createHttpServletRequest(user), null, null); } protected ApplicationStatisticsInfo getAppStatistics(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ApplicationStatisticsInfo run() throws Exception { - return routerWebService.getAppStatistics(null, null, null); - } - }); + return routerWebService.getAppStatistics( + createHttpServletRequest(user), null, null); } protected AppInfo getApp(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppInfo run() throws Exception { - return routerWebService.getApp(null, null, null); - } - }); + return routerWebService.getApp(createHttpServletRequest(user), null, null); } protected AppState getAppState(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppState run() throws Exception { - return routerWebService.getAppState(null, null); - } - }); + return routerWebService.getAppState(createHttpServletRequest(user), null); } protected Response updateAppState(String user) throws AuthorizationException, YarnException, InterruptedException, IOException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.updateAppState(null, null, null); - } - }); + return routerWebService.updateAppState( + null, createHttpServletRequest(user), null); } protected NodeToLabelsInfo getNodeToLabels(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public NodeToLabelsInfo run() throws Exception { - return routerWebService.getNodeToLabels(null); - } - }); + return routerWebService.getNodeToLabels(createHttpServletRequest(user)); } protected LabelsToNodesInfo getLabelsToNodes(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public LabelsToNodesInfo run() throws Exception { - return routerWebService.getLabelsToNodes(null); - } - }); + return routerWebService.getLabelsToNodes(null); } protected Response replaceLabelsOnNodes(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.replaceLabelsOnNodes(null, null); - } - }); + return routerWebService.replaceLabelsOnNodes( + null, createHttpServletRequest(user)); } protected Response replaceLabelsOnNode(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.replaceLabelsOnNode(null, null, null); - } - }); + return routerWebService.replaceLabelsOnNode( + null, createHttpServletRequest(user), null); } protected NodeLabelsInfo getClusterNodeLabels(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public NodeLabelsInfo run() throws Exception { - return routerWebService.getClusterNodeLabels(null); - } - }); + return routerWebService.getClusterNodeLabels( + createHttpServletRequest(user)); } protected Response addToClusterNodeLabels(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.addToClusterNodeLabels(null, null); - } - }); + return routerWebService.addToClusterNodeLabels( + null, createHttpServletRequest(user)); } protected Response removeFromCluserNodeLabels(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.removeFromCluserNodeLabels(null, null); - } - }); + return routerWebService.removeFromCluserNodeLabels( + null, createHttpServletRequest(user)); } protected NodeLabelsInfo getLabelsOnNode(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public NodeLabelsInfo run() throws Exception { - return routerWebService.getLabelsOnNode(null, null); - } - }); + return routerWebService.getLabelsOnNode( + createHttpServletRequest(user), null); } protected AppPriority getAppPriority(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppPriority run() throws Exception { - return routerWebService.getAppPriority(null, null); - } - }); + return routerWebService.getAppPriority( + createHttpServletRequest(user), null); } protected Response updateApplicationPriority(String user) throws AuthorizationException, YarnException, InterruptedException, IOException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.updateApplicationPriority(null, null, null); - } - }); + return routerWebService.updateApplicationPriority( + null, createHttpServletRequest(user), null); } protected AppQueue getAppQueue(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppQueue run() throws Exception { - return routerWebService.getAppQueue(null, null); - } - }); + return routerWebService.getAppQueue(createHttpServletRequest(user), null); } protected Response updateAppQueue(String user) throws AuthorizationException, YarnException, InterruptedException, IOException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.updateAppQueue(null, null, null); - } - }); + return routerWebService.updateAppQueue( + null, createHttpServletRequest(user), null); } protected Response createNewApplication(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.createNewApplication(null); - } - }); + return routerWebService.createNewApplication( + createHttpServletRequest(user)); } protected Response submitApplication(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.submitApplication(null, null); - } - }); + return routerWebService.submitApplication( + null, createHttpServletRequest(user)); } protected Response postDelegationToken(String user) throws AuthorizationException, IOException, InterruptedException, Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.postDelegationToken(null, null); - } - }); + return routerWebService.postDelegationToken( + null, createHttpServletRequest(user)); } protected Response postDelegationTokenExpiration(String user) throws AuthorizationException, IOException, Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.postDelegationTokenExpiration(null); - } - }); + return routerWebService.postDelegationTokenExpiration( + createHttpServletRequest(user)); } protected Response cancelDelegationToken(String user) throws AuthorizationException, IOException, InterruptedException, Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.cancelDelegationToken(null); - } - }); + return routerWebService.cancelDelegationToken( + createHttpServletRequest(user)); } protected Response createNewReservation(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.createNewReservation(null); - } - }); + return routerWebService.createNewReservation( + createHttpServletRequest(user)); } protected Response submitReservation(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.submitReservation(null, null); - } - }); + return routerWebService.submitReservation( + null, createHttpServletRequest(user)); } protected Response updateReservation(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.updateReservation(null, null); - } - }); + return routerWebService.updateReservation( + null, createHttpServletRequest(user)); } protected Response deleteReservation(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.deleteReservation(null, null); - } - }); + return routerWebService.deleteReservation( + null, createHttpServletRequest(user)); } protected Response listReservation(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.listReservation(null, null, 0, 0, false, - null); - } - }); + return routerWebService.listReservation( + null, null, 0, 0, false, createHttpServletRequest(user)); } protected AppTimeoutInfo getAppTimeout(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppTimeoutInfo run() throws Exception { - return routerWebService.getAppTimeout(null, null, null); - } - }); + return routerWebService.getAppTimeout( + createHttpServletRequest(user), null, null); } protected AppTimeoutsInfo getAppTimeouts(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppTimeoutsInfo run() throws Exception { - return routerWebService.getAppTimeouts(null, null); - } - }); + return routerWebService.getAppTimeouts( + createHttpServletRequest(user), null); } protected Response updateApplicationTimeout(String user) throws AuthorizationException, YarnException, InterruptedException, IOException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.updateApplicationTimeout(null, null, null); - } - }); + return routerWebService.updateApplicationTimeout( + null, createHttpServletRequest(user), null); } protected AppAttemptsInfo getAppAttempts(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppAttemptsInfo run() throws Exception { - return routerWebService.getAppAttempts(null, null); - } - }); + return routerWebService.getAppAttempts( + createHttpServletRequest(user), null); } protected AppAttemptInfo getAppAttempt(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppAttemptInfo run() throws Exception { - return routerWebService.getAppAttempt(null, null, null, null); - } - }); + return routerWebService.getAppAttempt( + createHttpServletRequest(user), null, null, null); } protected ContainersInfo getContainers(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ContainersInfo run() throws Exception { - return routerWebService.getContainers(null, null, null, null); - } - }); + return routerWebService.getContainers( + createHttpServletRequest(user), null, null, null); } protected ContainerInfo getContainer(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ContainerInfo run() throws Exception { - return routerWebService.getContainer(null, null, null, null, null); - } - }); + return routerWebService.getContainer( + createHttpServletRequest(user), null, null, null, null); } protected RequestInterceptorChainWrapper getInterceptorChain(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public RequestInterceptorChainWrapper run() throws Exception { - return routerWebService.getInterceptorChain(); - } - }); + HttpServletRequest request = createHttpServletRequest(user); + return routerWebService.getInterceptorChain(request); } + private HttpServletRequest createHttpServletRequest(String user) { + HttpServletRequest request = mock(HttpServletRequest.class); + when(request.getRemoteUser()).thenReturn(user); + return request; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8be57070/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java index d32013f..6c0938c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.router.webapp; import java.io.File; import java.io.IOException; +import java.util.List; /** * Helper class to start a new process. @@ -28,13 +29,23 @@ public class JavaProcess { private Process process = null; - public JavaProcess(Class klass) throws IOException, InterruptedException { + public JavaProcess(Class clazz) throws IOException, InterruptedException { + this(clazz, null); + } + + public JavaProcess(Class clazz, List addClasspaths) + throws IOException, InterruptedException { String javaHome = System.getProperty("java.home"); String javaBin = javaHome + File.separator + "bin" + File.separator + "java"; String classpath = System.getProperty("java.class.path"); classpath = classpath.concat("./src/test/resources"); - String className = klass.getCanonicalName(); + if (addClasspaths != null) { + for (String addClasspath : addClasspaths) { + classpath = classpath.concat(File.pathSeparatorChar + addClasspath); + } + } + String className = clazz.getCanonicalName(); ProcessBuilder builder = new ProcessBuilder(javaBin, "-cp", classpath, className); builder.inheritIO(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org