hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From su...@apache.org
Subject [44/50] [abbrv] hadoop git commit: YARN-5411. Create a proxy chain for ApplicationClientProtocol in the Router. (Giovanni Matteo Fumarola via Subru).
Date Mon, 08 May 2017 23:54:12 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f4191c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
new file mode 100644
index 0000000..12b933b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Extends the AbstractClientRequestInterceptor class and provides an
+ * implementation that simply forwards the client requests to the cluster
+ * resource manager.
+ */
+public class DefaultClientRequestInterceptor
+    extends AbstractClientRequestInterceptor {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DefaultClientRequestInterceptor.class);
+  private ApplicationClientProtocol clientRMProxy;
+  private UserGroupInformation user = null;
+
+  @Override
+  public void init(String userName) {
+    super.init(userName);
+    try {
+      // Do not create a proxy user if user name matches the user name on
+      // current UGI
+      if (userName.equalsIgnoreCase(
+          UserGroupInformation.getCurrentUser().getUserName())) {
+        user = UserGroupInformation.getCurrentUser();
+      } else {
+        user = UserGroupInformation.createProxyUser(userName,
+            UserGroupInformation.getCurrentUser());
+      }
+
+      final Configuration conf = this.getConf();
+
+      clientRMProxy =
+          user.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() {
+            @Override
+            public ApplicationClientProtocol run() throws Exception {
+              return ClientRMProxy.createRMProxy(conf,
+                  ApplicationClientProtocol.class);
+            }
+          });
+    } catch (IOException e) {
+      String message = "Error while creating Router ClientRM Service for user:";
+      if (user != null) {
+        message += ", user: " + user;
+      }
+
+      LOG.info(message);
+      throw new YarnRuntimeException(message, e);
+    } catch (Exception e) {
+      throw new YarnRuntimeException(e);
+    }
+  }
+
+  @Override
+  public void setNextInterceptor(ClientRequestInterceptor next) {
+    throw new YarnRuntimeException(
+        "setNextInterceptor is being called on DefaultRequestInterceptor,"
+            + "which should be the last one in the chain "
+            + "Check if the interceptor pipeline configuration is correct");
+  }
+
+  @Override
+  public GetNewApplicationResponse getNewApplication(
+      GetNewApplicationRequest request) throws YarnException, IOException {
+    return clientRMProxy.getNewApplication(request);
+  }
+
+  @Override
+  public SubmitApplicationResponse submitApplication(
+      SubmitApplicationRequest request) throws YarnException, IOException {
+    return clientRMProxy.submitApplication(request);
+  }
+
+  @Override
+  public KillApplicationResponse forceKillApplication(
+      KillApplicationRequest request) throws YarnException, IOException {
+    return clientRMProxy.forceKillApplication(request);
+  }
+
+  @Override
+  public GetClusterMetricsResponse getClusterMetrics(
+      GetClusterMetricsRequest request) throws YarnException, IOException {
+    return clientRMProxy.getClusterMetrics(request);
+  }
+
+  @Override
+  public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
+      throws YarnException, IOException {
+    return clientRMProxy.getClusterNodes(request);
+  }
+
+  @Override
+  public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+      throws YarnException, IOException {
+    return clientRMProxy.getQueueInfo(request);
+  }
+
+  @Override
+  public GetQueueUserAclsInfoResponse getQueueUserAcls(
+      GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
+    return clientRMProxy.getQueueUserAcls(request);
+  }
+
+  @Override
+  public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+      MoveApplicationAcrossQueuesRequest request)
+      throws YarnException, IOException {
+    return clientRMProxy.moveApplicationAcrossQueues(request);
+  }
+
+  @Override
+  public GetNewReservationResponse getNewReservation(
+      GetNewReservationRequest request) throws YarnException, IOException {
+    return clientRMProxy.getNewReservation(request);
+  }
+
+  @Override
+  public ReservationSubmissionResponse submitReservation(
+      ReservationSubmissionRequest request) throws YarnException, IOException {
+    return clientRMProxy.submitReservation(request);
+  }
+
+  @Override
+  public ReservationListResponse listReservations(
+      ReservationListRequest request) throws YarnException, IOException {
+    return clientRMProxy.listReservations(request);
+  }
+
+  @Override
+  public ReservationUpdateResponse updateReservation(
+      ReservationUpdateRequest request) throws YarnException, IOException {
+    return clientRMProxy.updateReservation(request);
+  }
+
+  @Override
+  public ReservationDeleteResponse deleteReservation(
+      ReservationDeleteRequest request) throws YarnException, IOException {
+    return clientRMProxy.deleteReservation(request);
+  }
+
+  @Override
+  public GetNodesToLabelsResponse getNodeToLabels(
+      GetNodesToLabelsRequest request) throws YarnException, IOException {
+    return clientRMProxy.getNodeToLabels(request);
+  }
+
+  @Override
+  public GetLabelsToNodesResponse getLabelsToNodes(
+      GetLabelsToNodesRequest request) throws YarnException, IOException {
+    return clientRMProxy.getLabelsToNodes(request);
+  }
+
+  @Override
+  public GetClusterNodeLabelsResponse getClusterNodeLabels(
+      GetClusterNodeLabelsRequest request) throws YarnException, IOException {
+    return clientRMProxy.getClusterNodeLabels(request);
+  }
+
+  @Override
+  public GetApplicationReportResponse getApplicationReport(
+      GetApplicationReportRequest request) throws YarnException, IOException {
+    return clientRMProxy.getApplicationReport(request);
+  }
+
+  @Override
+  public GetApplicationsResponse getApplications(GetApplicationsRequest request)
+      throws YarnException, IOException {
+    return clientRMProxy.getApplications(request);
+  }
+
+  @Override
+  public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+      GetApplicationAttemptReportRequest request)
+      throws YarnException, IOException {
+    return clientRMProxy.getApplicationAttemptReport(request);
+  }
+
+  @Override
+  public GetApplicationAttemptsResponse getApplicationAttempts(
+      GetApplicationAttemptsRequest request) throws YarnException, IOException {
+    return clientRMProxy.getApplicationAttempts(request);
+  }
+
+  @Override
+  public GetContainerReportResponse getContainerReport(
+      GetContainerReportRequest request) throws YarnException, IOException {
+    return clientRMProxy.getContainerReport(request);
+  }
+
+  @Override
+  public GetContainersResponse getContainers(GetContainersRequest request)
+      throws YarnException, IOException {
+    return clientRMProxy.getContainers(request);
+  }
+
+  @Override
+  public GetDelegationTokenResponse getDelegationToken(
+      GetDelegationTokenRequest request) throws YarnException, IOException {
+    return clientRMProxy.getDelegationToken(request);
+  }
+
+  @Override
+  public RenewDelegationTokenResponse renewDelegationToken(
+      RenewDelegationTokenRequest request) throws YarnException, IOException {
+    return clientRMProxy.renewDelegationToken(request);
+  }
+
+  @Override
+  public CancelDelegationTokenResponse cancelDelegationToken(
+      CancelDelegationTokenRequest request) throws YarnException, IOException {
+    return clientRMProxy.cancelDelegationToken(request);
+  }
+
+  @Override
+  public FailApplicationAttemptResponse failApplicationAttempt(
+      FailApplicationAttemptRequest request) throws YarnException, IOException {
+    return clientRMProxy.failApplicationAttempt(request);
+  }
+
+  @Override
+  public UpdateApplicationPriorityResponse updateApplicationPriority(
+      UpdateApplicationPriorityRequest request)
+      throws YarnException, IOException {
+    return clientRMProxy.updateApplicationPriority(request);
+  }
+
+  @Override
+  public SignalContainerResponse signalToContainer(
+      SignalContainerRequest request) throws YarnException, IOException {
+    return clientRMProxy.signalToContainer(request);
+  }
+
+  @Override
+  public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
+      UpdateApplicationTimeoutsRequest request)
+      throws YarnException, IOException {
+    return clientRMProxy.updateApplicationTimeouts(request);
+  }
+
+  @VisibleForTesting
+  public void setRMClient(ApplicationClientProtocol clientRM) {
+    this.clientRMProxy = clientRM;
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f4191c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
new file mode 100644
index 0000000..00016dd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
@@ -0,0 +1,544 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.LRUCacheHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * RouterClientRMService is a service that runs on each router that can be used
+ * to intercept and inspect ApplicationClientProtocol messages from client to
+ * the cluster resource manager. It listens for ApplicationClientProtocol
+ * messages from the client and creates a request intercepting pipeline
+ * instance for each client. The pipeline is a chain of interceptor instances
+ * that can inspect and modify the request/response as needed. The main
+ * difference with AMRMProxyService is the protocol they implement.
+ */
+public class RouterClientRMService extends AbstractService
+    implements ApplicationClientProtocol {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterClientRMService.class);
+
+  private Server server;
+  private InetSocketAddress listenerEndpoint;
+
+  // For each user we store an interceptors' pipeline.
+  // For performance reasons an LRU cache keeps the most recently used
+  // pipelines in memory and evicts the oldest ones.
+  private Map<String, RequestInterceptorChainWrapper> userPipelineMap;
+
+  /**
+   * Creates the service, passing this class's fully qualified name to the
+   * parent constructor.
+   */
+  public RouterClientRMService() {
+    super(RouterClientRMService.class.getName());
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    LOG.info("Starting Router ClientRMService");
+    Configuration conf = getConfig();
+    YarnRPC rpc = YarnRPC.create(conf);
+    UserGroupInformation.setConfiguration(conf);
+
+    this.listenerEndpoint =
+        conf.getSocketAddr(YarnConfiguration.ROUTER_CLIENTRM_ADDRESS,
+            YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_ADDRESS,
+            YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PORT);
+
+    int maxCacheSize =
+        conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE,
+            YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE);
+    this.userPipelineMap = Collections.synchronizedMap(
+        new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
+            maxCacheSize, true));
+
+    Configuration serverConf = new Configuration(conf);
+
+    int numWorkerThreads =
+        serverConf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT);
+
+    this.server = rpc.getServer(ApplicationClientProtocol.class, this,
+        listenerEndpoint, serverConf, null, numWorkerThreads);
+
+    this.server.start();
+    LOG.info("Router ClientRMService listening on address: "
+        + this.server.getListenerAddress());
+    super.serviceStart();
+  }
+
+  /**
+   * Stops the RPC server, if one was created, and drops all cached pipelines.
+   *
+   * @throws Exception if stopping the parent service fails
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    LOG.info("Stopping Router ClientRMService");
+    if (this.server != null) {
+      this.server.stop();
+    }
+    // The cache is only assigned in serviceStart(), so it is still null when
+    // the service is stopped without having started or after an early
+    // start-up failure.
+    if (this.userPipelineMap != null) {
+      this.userPipelineMap.clear();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Returns the comma separated intercepter class names from the configuration.
+   *
+   * @param conf
+   * @return the intercepter class names as an instance of ArrayList
+   */
+  private List<String> getInterceptorClassNames(Configuration conf) {
+    String configuredInterceptorClassNames =
+        conf.get(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
+            YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS);
+
+    List<String> interceptorClassNames = new ArrayList<String>();
+    Collection<String> tempList =
+        StringUtils.getStringCollection(configuredInterceptorClassNames);
+    for (String item : tempList) {
+      interceptorClassNames.add(item.trim());
+    }
+
+    return interceptorClassNames;
+  }
+
+  @Override
+  public GetNewApplicationResponse getNewApplication(
+      GetNewApplicationRequest request) throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .getNewApplication(request);
+  }
+
+  @Override
+  public SubmitApplicationResponse submitApplication(
+      SubmitApplicationRequest request) throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .submitApplication(request);
+  }
+
+  @Override
+  public KillApplicationResponse forceKillApplication(
+      KillApplicationRequest request) throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .forceKillApplication(request);
+  }
+
+  @Override
+  public GetClusterMetricsResponse getClusterMetrics(
+      GetClusterMetricsRequest request) throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .getClusterMetrics(request);
+  }
+
+  @Override
+  public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
+      throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .getClusterNodes(request);
+  }
+
+  @Override
+  public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+      throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .getQueueInfo(request);
+  }
+
+  @Override
+  public GetQueueUserAclsInfoResponse getQueueUserAcls(
+      GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .getQueueUserAcls(request);
+  }
+
+  @Override
+  public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+      MoveApplicationAcrossQueuesRequest request)
+      throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .moveApplicationAcrossQueues(request);
+  }
+
+  @Override
+  public GetNewReservationResponse getNewReservation(
+      GetNewReservationRequest request) throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .getNewReservation(request);
+  }
+
+  @Override
+  public ReservationSubmissionResponse submitReservation(
+      ReservationSubmissionRequest request) throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .submitReservation(request);
+  }
+
+  @Override
+  public ReservationListResponse listReservations(
+      ReservationListRequest request) throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .listReservations(request);
+  }
+
+  @Override
+  public ReservationUpdateResponse updateReservation(
+      ReservationUpdateRequest request) throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .updateReservation(request);
+  }
+
+  @Override
+  public ReservationDeleteResponse deleteReservation(
+      ReservationDeleteRequest request) throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .deleteReservation(request);
+  }
+
+  @Override
+  public GetNodesToLabelsResponse getNodeToLabels(
+      GetNodesToLabelsRequest request) throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .getNodeToLabels(request);
+  }
+
+  @Override
+  public GetLabelsToNodesResponse getLabelsToNodes(
+      GetLabelsToNodesRequest request) throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .getLabelsToNodes(request);
+  }
+
+  @Override
+  public GetClusterNodeLabelsResponse getClusterNodeLabels(
+      GetClusterNodeLabelsRequest request) throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .getClusterNodeLabels(request);
+  }
+
+  @Override
+  public GetApplicationReportResponse getApplicationReport(
+      GetApplicationReportRequest request) throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .getApplicationReport(request);
+  }
+
+  @Override
+  public GetApplicationsResponse getApplications(GetApplicationsRequest request)
+      throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .getApplications(request);
+  }
+
+  @Override
+  public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+      GetApplicationAttemptReportRequest request)
+      throws YarnException, IOException {
+    // Hand the request to the root interceptor of the chain returned by
+    // getInterceptorChain().
+    return getInterceptorChain().getRootInterceptor()
+        .getApplicationAttemptReport(request);
+  }
+
+  @Override
+  public GetApplicationAttemptsResponse getApplicationAttempts(
+      GetApplicationAttemptsRequest request) throws YarnException, IOException {
+    // Hand the request to the head of the calling user's interceptor chain.
+    return getInterceptorChain().getRootInterceptor()
+        .getApplicationAttempts(request);
+  }
+
+  @Override
+  public GetContainerReportResponse getContainerReport(
+      GetContainerReportRequest request) throws YarnException, IOException {
+    // Hand the request to the head of the calling user's interceptor chain.
+    return getInterceptorChain().getRootInterceptor()
+        .getContainerReport(request);
+  }
+
+  @Override
+  public GetContainersResponse getContainers(GetContainersRequest request)
+      throws YarnException, IOException {
+    // Hand the request to the head of the calling user's interceptor chain.
+    return getInterceptorChain().getRootInterceptor()
+        .getContainers(request);
+  }
+
+  @Override
+  public GetDelegationTokenResponse getDelegationToken(
+      GetDelegationTokenRequest request) throws YarnException, IOException {
+    // Hand the request to the head of the calling user's interceptor chain.
+    return getInterceptorChain().getRootInterceptor()
+        .getDelegationToken(request);
+  }
+
+  @Override
+  public RenewDelegationTokenResponse renewDelegationToken(
+      RenewDelegationTokenRequest request) throws YarnException, IOException {
+    // Hand the request to the head of the calling user's interceptor chain.
+    return getInterceptorChain().getRootInterceptor()
+        .renewDelegationToken(request);
+  }
+
+  @Override
+  public CancelDelegationTokenResponse cancelDelegationToken(
+      CancelDelegationTokenRequest request) throws YarnException, IOException {
+    // Hand the request to the head of the calling user's interceptor chain.
+    return getInterceptorChain().getRootInterceptor()
+        .cancelDelegationToken(request);
+  }
+
+  @Override
+  public FailApplicationAttemptResponse failApplicationAttempt(
+      FailApplicationAttemptRequest request) throws YarnException, IOException {
+    // Hand the request to the head of the calling user's interceptor chain.
+    return getInterceptorChain().getRootInterceptor()
+        .failApplicationAttempt(request);
+  }
+
+  @Override
+  public UpdateApplicationPriorityResponse updateApplicationPriority(
+      UpdateApplicationPriorityRequest request)
+      throws YarnException, IOException {
+    // Hand the request to the head of the calling user's interceptor chain.
+    return getInterceptorChain().getRootInterceptor()
+        .updateApplicationPriority(request);
+  }
+
+  @Override
+  public SignalContainerResponse signalToContainer(
+      SignalContainerRequest request) throws YarnException, IOException {
+    // Hand the request to the head of the calling user's interceptor chain.
+    return getInterceptorChain().getRootInterceptor()
+        .signalToContainer(request);
+  }
+
+  @Override
+  public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
+      UpdateApplicationTimeoutsRequest request)
+      throws YarnException, IOException {
+    // Hand the request to the head of the calling user's interceptor chain.
+    return getInterceptorChain().getRootInterceptor()
+        .updateApplicationTimeouts(request);
+  }
+
+  /**
+   * Returns the interceptor pipeline registered for the current user, creating
+   * it on first use.
+   *
+   * @return the pipeline wrapper found in the pipeline map for the current user
+   * @throws IOException propagated from
+   *           {@code UserGroupInformation.getCurrentUser()}
+   */
+  private RequestInterceptorChainWrapper getInterceptorChain()
+      throws IOException {
+    String user = UserGroupInformation.getCurrentUser().getUserName();
+    // Look the pipeline up once. A containsKey() check followed by a separate
+    // get() may observe two different states of the map when requests are
+    // served concurrently. The map never stores null values, so a null result
+    // means that no pipeline is registered for the user yet.
+    RequestInterceptorChainWrapper pipeline = userPipelineMap.get(user);
+    if (pipeline == null) {
+      initializePipeline(user);
+      pipeline = userPipelineMap.get(user);
+    }
+    return pipeline;
+  }
+
+  /**
+   * Exposes the map that holds the request intercepter chain of every user.
+   *
+   * @return the map from user name to request intercepter chain wrapper; the
+   *         internal map itself is returned, not a copy
+   */
+  @VisibleForTesting
+  protected Map<String, RequestInterceptorChainWrapper> getPipelines() {
+    return userPipelineMap;
+  }
+
+  /**
+   * Instantiates every configured request intercepter, links the instances
+   * together in configuration order and returns the first one.
+   *
+   * @return the first intercepter of the newly built chain
+   */
+  @VisibleForTesting
+  protected ClientRequestInterceptor createRequestInterceptorChain() {
+    Configuration conf = getConfig();
+
+    ClientRequestInterceptor head = null;
+    ClientRequestInterceptor tail = null;
+    for (String className : getInterceptorClassNames(conf)) {
+      Class<?> clazz;
+      try {
+        clazz = conf.getClassByName(className);
+      } catch (ClassNotFoundException e) {
+        throw new YarnRuntimeException(
+            "Could not instantiate ApplicationClientRequestInterceptor: "
+                + className,
+            e);
+      }
+
+      // Reject classes that do not implement the intercepter contract.
+      if (!ClientRequestInterceptor.class.isAssignableFrom(clazz)) {
+        throw new YarnRuntimeException(
+            "Class: " + className + " not instance of "
+                + ClientRequestInterceptor.class.getCanonicalName());
+      }
+
+      ClientRequestInterceptor next =
+          (ClientRequestInterceptor) ReflectionUtils.newInstance(clazz, conf);
+      if (head == null) {
+        // The first instance becomes the entry point of the chain.
+        head = next;
+      } else {
+        tail.setNextInterceptor(next);
+      }
+      tail = next;
+    }
+
+    if (head == null) {
+      throw new YarnRuntimeException(
+          "RequestInterceptor pipeline is not configured in the system");
+    }
+    return head;
+  }
+
+  /**
+   * Initializes the request intercepter pipeline for the specified user and
+   * registers it in the pipeline map. Does nothing, apart from logging, when a
+   * pipeline is already registered for the user.
+   *
+   * @param user the name of the user the pipeline is created for
+   */
+  private void initializePipeline(String user) {
+    synchronized (this.userPipelineMap) {
+      if (this.userPipelineMap.containsKey(user)) {
+        LOG.info("Request to start an already existing user: {}"
+            + " was received, so ignoring.", user);
+        return;
+      }
+
+      LOG.info("Initializing request processing pipeline for application "
+          + "for the user: {}", user);
+
+      // Build and initialize the chain BEFORE publishing the wrapper in the
+      // map. Registering an empty wrapper first would let a concurrent request
+      // of the same user fetch it while its root interceptor is still null.
+      // The price is that the lock is held during chain initialization, so
+      // pipeline creation for different users is serialized.
+      ClientRequestInterceptor interceptorChain =
+          this.createRequestInterceptorChain();
+      interceptorChain.init(user);
+
+      RequestInterceptorChainWrapper chainWrapper =
+          new RequestInterceptorChainWrapper();
+      chainWrapper.init(interceptorChain);
+
+      // If anything above throws, nothing has been registered yet, so there is
+      // no map entry to roll back and the exception simply propagates.
+      this.userPipelineMap.put(user, chainWrapper);
+    }
+  }
+
+  /**
+   * Private structure for encapsulating RequestInterceptor and user instances.
+   *
+   */
+  @Private
+  public static class RequestInterceptorChainWrapper {
+    private ClientRequestInterceptor rootInterceptor;
+
+    /**
+     * Initializes the wrapper with the specified parameters.
+     *
+     * @param interceptor the first interceptor in the pipeline
+     */
+    public synchronized void init(ClientRequestInterceptor interceptor) {
+      this.rootInterceptor = interceptor;
+    }
+
+    /**
+     * Gets the root request intercepter.
+     *
+     * @return the root request intercepter
+     */
+    public synchronized ClientRequestInterceptor getRootInterceptor() {
+      return rootInterceptor;
+    }
+
+    /**
+     * Shutdown the chain of interceptors when the object is destroyed.
+     */
+    @Override
+    protected void finalize() {
+      rootInterceptor.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f4191c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/package-info.java
new file mode 100644
index 0000000..7d1dadd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Router ClientRM Proxy Service package. **/
+package org.apache.hadoop.yarn.server.router.clientrm;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f4191c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouter.java
deleted file mode 100644
index a31d6b9..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouter.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.router;
-
-/**
- * Test class for YARN Router.
- */
-public class TestRouter {
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f4191c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
new file mode 100644
index 0000000..a283a62
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
@@ -0,0 +1,574 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+/**
+ * Base class for all the RouterClientRMService test cases. It provides utility
+ * methods that can be used by the concrete test case classes.
+ *
+ */
+public abstract class BaseRouterClientRMTest {
+
+  /**
+   * The RouterClientRMService instance that will be used by all the test cases.
+   * It is created in setUp() and stopped and cleared in tearDown().
+   */
+  private MockRouterClientRMService clientrmService;
+  /**
+   * Thread pool used for asynchronous operations.
+   */
+  private static ExecutorService threadpool = Executors.newCachedThreadPool();
+  // Configuration created in setUp() and passed to the dispatcher and to the
+  // router service when they are initialized.
+  private Configuration conf;
+  // Dispatcher that is initialized and started in setUp() and stopped in
+  // tearDown().
+  private AsyncDispatcher dispatcher;
+
+  // Value that setUp() stores under
+  // YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE.
+  public final static int TEST_MAX_CACHE_SIZE = 10;
+
+  /** Returns the service under test after asserting that it is not null. */
+  protected MockRouterClientRMService getRouterClientRMService() {
+    MockRouterClientRMService service = this.clientrmService;
+    Assert.assertNotNull(service);
+    return service;
+  }
+
+  /**
+   * Builds the test configuration, starts the dispatcher and starts the router
+   * client RM service used by the test cases.
+   */
+  @Before
+  public void setUp() {
+    this.conf = new YarnConfiguration();
+
+    // Create a request intercepter pipeline for testing: three pass-through
+    // intercepters followed by the mock intercepter as the last element.
+    String passThrough = PassThroughClientRequestInterceptor.class.getName();
+    String pipeline = passThrough + "," + passThrough + "," + passThrough + ","
+        + MockClientRequestInterceptor.class.getName();
+    this.conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
+        pipeline);
+
+    this.conf.setInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE,
+        TEST_MAX_CACHE_SIZE);
+
+    AsyncDispatcher asyncDispatcher = new AsyncDispatcher();
+    this.dispatcher = asyncDispatcher;
+    asyncDispatcher.init(conf);
+    asyncDispatcher.start();
+
+    this.clientrmService = createAndStartRouterClientRMService();
+  }
+
+  @After
+  public void tearDown() {
+    if (clientrmService != null) {
+      clientrmService.stop();
+      clientrmService = null;
+    }
+    if (this.dispatcher != null) {
+      this.dispatcher.stop();
+    }
+  }
+
+  /** Returns the static thread pool of this test base class. */
+  protected ExecutorService getThreadPool() {
+    return BaseRouterClientRMTest.threadpool;
+  }
+
+  /**
+   * Creates a new mock router service, initializes it with the test
+   * configuration and starts it.
+   *
+   * @return the started service
+   */
+  protected MockRouterClientRMService createAndStartRouterClientRMService() {
+    MockRouterClientRMService routerService = new MockRouterClientRMService();
+    routerService.init(conf);
+    routerService.start();
+    return routerService;
+  }
+
+  /** Subclass of RouterClientRMService that adds no members of its own. */
+  protected static class MockRouterClientRMService
+      extends RouterClientRMService {
+    public MockRouterClientRMService() {
+      // The no-argument superclass constructor is invoked implicitly.
+    }
+  }
+
+  /** Calls getNewApplication on the router service as the given user. */
+  protected GetNewApplicationResponse getNewApplication(String user)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation caller = UserGroupInformation.createRemoteUser(user);
+    return caller.doAs(
+        new PrivilegedExceptionAction<GetNewApplicationResponse>() {
+          @Override
+          public GetNewApplicationResponse run() throws Exception {
+            GetNewApplicationRequest req =
+                GetNewApplicationRequest.newInstance();
+            return getRouterClientRMService().getNewApplication(req);
+          }
+        });
+  }
+
+  /** Calls submitApplication for the given id as the given user. */
+  protected SubmitApplicationResponse submitApplication(
+      final ApplicationId appId, String user)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation caller = UserGroupInformation.createRemoteUser(user);
+    return caller.doAs(
+        new PrivilegedExceptionAction<SubmitApplicationResponse>() {
+          @Override
+          public SubmitApplicationResponse run() throws Exception {
+            ApplicationSubmissionContext submissionContext =
+                ApplicationSubmissionContext.newInstance(appId, "", "", null,
+                    null, false, false, -1, null, null);
+            SubmitApplicationRequest req =
+                SubmitApplicationRequest.newInstance(submissionContext);
+            return getRouterClientRMService().submitApplication(req);
+          }
+        });
+  }
+
+  /** Calls forceKillApplication for the given id as the given user. */
+  protected KillApplicationResponse forceKillApplication(
+      final ApplicationId appId, String user)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation caller = UserGroupInformation.createRemoteUser(user);
+    return caller.doAs(
+        new PrivilegedExceptionAction<KillApplicationResponse>() {
+          @Override
+          public KillApplicationResponse run() throws Exception {
+            KillApplicationRequest req =
+                KillApplicationRequest.newInstance(appId);
+            return getRouterClientRMService().forceKillApplication(req);
+          }
+        });
+  }
+
+  /** Calls getClusterMetrics on the router service as the given user. */
+  protected GetClusterMetricsResponse getClusterMetrics(String user)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation caller = UserGroupInformation.createRemoteUser(user);
+    return caller.doAs(
+        new PrivilegedExceptionAction<GetClusterMetricsResponse>() {
+          @Override
+          public GetClusterMetricsResponse run() throws Exception {
+            GetClusterMetricsRequest req =
+                GetClusterMetricsRequest.newInstance();
+            return getRouterClientRMService().getClusterMetrics(req);
+          }
+        });
+  }
+
+  /** Calls getClusterNodes on the router service as the given user. */
+  protected GetClusterNodesResponse getClusterNodes(String user)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation caller = UserGroupInformation.createRemoteUser(user);
+    return caller.doAs(
+        new PrivilegedExceptionAction<GetClusterNodesResponse>() {
+          @Override
+          public GetClusterNodesResponse run() throws Exception {
+            GetClusterNodesRequest req = GetClusterNodesRequest.newInstance();
+            return getRouterClientRMService().getClusterNodes(req);
+          }
+        });
+  }
+
+  /** Calls getQueueInfo for the queue named "default" as the given user. */
+  protected GetQueueInfoResponse getQueueInfo(String user)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation caller = UserGroupInformation.createRemoteUser(user);
+    return caller.doAs(
+        new PrivilegedExceptionAction<GetQueueInfoResponse>() {
+          @Override
+          public GetQueueInfoResponse run() throws Exception {
+            GetQueueInfoRequest req =
+                GetQueueInfoRequest.newInstance("default", false, false, false);
+            return getRouterClientRMService().getQueueInfo(req);
+          }
+        });
+  }
+
+  /** Calls getQueueUserAcls on the router service as the given user. */
+  protected GetQueueUserAclsInfoResponse getQueueUserAcls(String user)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation caller = UserGroupInformation.createRemoteUser(user);
+    return caller.doAs(
+        new PrivilegedExceptionAction<GetQueueUserAclsInfoResponse>() {
+          @Override
+          public GetQueueUserAclsInfoResponse run() throws Exception {
+            GetQueueUserAclsInfoRequest req =
+                GetQueueUserAclsInfoRequest.newInstance();
+            return getRouterClientRMService().getQueueUserAcls(req);
+          }
+        });
+  }
+
+  /**
+   * Calls moveApplicationAcrossQueues as the given user, asking to move the
+   * given application to the queue named "newQueue".
+   */
+  protected MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+      String user, final ApplicationId appId)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation caller = UserGroupInformation.createRemoteUser(user);
+    return caller.doAs(
+        new PrivilegedExceptionAction<MoveApplicationAcrossQueuesResponse>() {
+          @Override
+          public MoveApplicationAcrossQueuesResponse run() throws Exception {
+            MoveApplicationAcrossQueuesRequest req =
+                MoveApplicationAcrossQueuesRequest.newInstance(appId,
+                    "newQueue");
+            return getRouterClientRMService().moveApplicationAcrossQueues(req);
+          }
+        });
+  }
+
+  /** Calls getNewReservation on the router service as the given user. */
+  public GetNewReservationResponse getNewReservation(String user)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation caller = UserGroupInformation.createRemoteUser(user);
+    return caller.doAs(
+        new PrivilegedExceptionAction<GetNewReservationResponse>() {
+          @Override
+          public GetNewReservationResponse run() throws Exception {
+            GetNewReservationRequest req =
+                GetNewReservationRequest.newInstance();
+            return getRouterClientRMService().getNewReservation(req);
+          }
+        });
+  }
+
+  /**
+   * Calls submitReservation as the given user with a simple reservation
+   * request that starts at the current UTC clock time.
+   */
+  protected ReservationSubmissionResponse submitReservation(String user,
+      final ReservationId reservationId)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation caller = UserGroupInformation.createRemoteUser(user);
+    return caller.doAs(
+        new PrivilegedExceptionAction<ReservationSubmissionResponse>() {
+          @Override
+          public ReservationSubmissionResponse run() throws Exception {
+            Clock utcClock = new UTCClock();
+            long arrival = utcClock.getTime();
+            long duration = 60000;
+            // The deadline leaves 5% of slack on top of the duration.
+            long deadline = (long) (arrival + 1.05 * duration);
+
+            ReservationSubmissionRequest req = createSimpleReservationRequest(1,
+                arrival, deadline, duration, reservationId);
+            return getRouterClientRMService().submitReservation(req);
+          }
+        });
+  }
+
+  /**
+   * Calls updateReservation as the given user with the definition of a simple
+   * reservation request that starts at the current UTC clock time.
+   */
+  protected ReservationUpdateResponse updateReservation(String user,
+      final ReservationId reservationId)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation caller = UserGroupInformation.createRemoteUser(user);
+    return caller.doAs(
+        new PrivilegedExceptionAction<ReservationUpdateResponse>() {
+          @Override
+          public ReservationUpdateResponse run() throws Exception {
+            Clock utcClock = new UTCClock();
+            long arrival = utcClock.getTime();
+            long duration = 60000;
+            // The deadline leaves 5% of slack on top of the duration.
+            long deadline = (long) (arrival + 1.05 * duration);
+            ReservationSubmissionRequest submission =
+                createSimpleReservationRequest(1, arrival, deadline, duration,
+                    reservationId);
+            ReservationDefinition definition =
+                submission.getReservationDefinition();
+
+            ReservationUpdateRequest req =
+                ReservationUpdateRequest.newInstance(definition, reservationId);
+            return getRouterClientRMService().updateReservation(req);
+          }
+        });
+  }
+
+  /** Calls deleteReservation for the given id as the given user. */
+  protected ReservationDeleteResponse deleteReservation(String user,
+      final ReservationId reservationId)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation caller = UserGroupInformation.createRemoteUser(user);
+    return caller.doAs(
+        new PrivilegedExceptionAction<ReservationDeleteResponse>() {
+          @Override
+          public ReservationDeleteResponse run() throws Exception {
+            ReservationDeleteRequest req =
+                ReservationDeleteRequest.newInstance(reservationId);
+            return getRouterClientRMService().deleteReservation(req);
+          }
+        });
+  }
+
+  /** Calls getNodeToLabels on the router service as the given user. */
+  protected GetNodesToLabelsResponse getNodeToLabels(String user)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation caller = UserGroupInformation.createRemoteUser(user);
+    return caller.doAs(
+        new PrivilegedExceptionAction<GetNodesToLabelsResponse>() {
+          @Override
+          public GetNodesToLabelsResponse run() throws Exception {
+            GetNodesToLabelsRequest req = GetNodesToLabelsRequest.newInstance();
+            return getRouterClientRMService().getNodeToLabels(req);
+          }
+        });
+  }
+
+  /** Calls getLabelsToNodes on the router service as the given user. */
+  protected GetLabelsToNodesResponse getLabelsToNodes(String user)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation caller = UserGroupInformation.createRemoteUser(user);
+    return caller.doAs(
+        new PrivilegedExceptionAction<GetLabelsToNodesResponse>() {
+          @Override
+          public GetLabelsToNodesResponse run() throws Exception {
+            GetLabelsToNodesRequest req = GetLabelsToNodesRequest.newInstance();
+            return getRouterClientRMService().getLabelsToNodes(req);
+          }
+        });
+  }
+
+  /** Calls getClusterNodeLabels on the router service as the given user. */
+  protected GetClusterNodeLabelsResponse getClusterNodeLabels(String user)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation caller = UserGroupInformation.createRemoteUser(user);
+    return caller.doAs(
+        new PrivilegedExceptionAction<GetClusterNodeLabelsResponse>() {
+          @Override
+          public GetClusterNodeLabelsResponse run() throws Exception {
+            GetClusterNodeLabelsRequest req =
+                GetClusterNodeLabelsRequest.newInstance();
+            return getRouterClientRMService().getClusterNodeLabels(req);
+          }
+        });
+  }
+
+  /** Calls getApplicationReport for the given id as the given user. */
+  protected GetApplicationReportResponse getApplicationReport(String user,
+      final ApplicationId appId)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation caller = UserGroupInformation.createRemoteUser(user);
+    return caller.doAs(
+        new PrivilegedExceptionAction<GetApplicationReportResponse>() {
+          @Override
+          public GetApplicationReportResponse run() throws Exception {
+            GetApplicationReportRequest req =
+                GetApplicationReportRequest.newInstance(appId);
+            return getRouterClientRMService().getApplicationReport(req);
+          }
+        });
+  }
+
+  /**
+   * Sends a {@code getApplications} request to the router client RM service
+   * from inside a {@code doAs} of a remote-user UGI created for {@code user}.
+   *
+   * @param user name used to create the remote user the call runs as
+   * @return the response handed back by the router client RM service
+   */
+  protected GetApplicationsResponse getApplications(String user)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+    PrivilegedExceptionAction<GetApplicationsResponse> action =
+        new PrivilegedExceptionAction<GetApplicationsResponse>() {
+          @Override
+          public GetApplicationsResponse run() throws Exception {
+            GetApplicationsRequest request =
+                GetApplicationsRequest.newInstance();
+            return getRouterClientRMService().getApplications(request);
+          }
+        };
+    return ugi.doAs(action);
+  }
+
+  /**
+   * Sends a {@code getApplicationAttemptReport} request for
+   * {@code appAttemptId} to the router client RM service from inside a
+   * {@code doAs} of a remote-user UGI created for {@code user}.
+   *
+   * @param user name used to create the remote user the call runs as
+   * @param appAttemptId application attempt id placed in the request
+   * @return the response handed back by the router client RM service
+   */
+  protected GetApplicationAttemptReportResponse getApplicationAttemptReport(
+      String user, final ApplicationAttemptId appAttemptId)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+    PrivilegedExceptionAction<GetApplicationAttemptReportResponse> action =
+        new PrivilegedExceptionAction<GetApplicationAttemptReportResponse>() {
+          @Override
+          public GetApplicationAttemptReportResponse run() throws Exception {
+            GetApplicationAttemptReportRequest request =
+                GetApplicationAttemptReportRequest.newInstance(appAttemptId);
+            return getRouterClientRMService()
+                .getApplicationAttemptReport(request);
+          }
+        };
+    return ugi.doAs(action);
+  }
+
+  /**
+   * Sends a {@code getApplicationAttempts} request for {@code applicationId}
+   * to the router client RM service from inside a {@code doAs} of a
+   * remote-user UGI created for {@code user}.
+   *
+   * @param user name used to create the remote user the call runs as
+   * @param applicationId application id placed in the request
+   * @return the response handed back by the router client RM service
+   */
+  protected GetApplicationAttemptsResponse getApplicationAttempts(String user,
+      final ApplicationId applicationId)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+    PrivilegedExceptionAction<GetApplicationAttemptsResponse> action =
+        new PrivilegedExceptionAction<GetApplicationAttemptsResponse>() {
+          @Override
+          public GetApplicationAttemptsResponse run() throws Exception {
+            GetApplicationAttemptsRequest request =
+                GetApplicationAttemptsRequest.newInstance(applicationId);
+            return getRouterClientRMService().getApplicationAttempts(request);
+          }
+        };
+    return ugi.doAs(action);
+  }
+
+  /**
+   * Sends a {@code getContainerReport} request for {@code containerId} to the
+   * router client RM service from inside a {@code doAs} of a remote-user UGI
+   * created for {@code user}.
+   *
+   * @param user name used to create the remote user the call runs as
+   * @param containerId container id placed in the request
+   * @return the response handed back by the router client RM service
+   */
+  protected GetContainerReportResponse getContainerReport(String user,
+      final ContainerId containerId)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+    PrivilegedExceptionAction<GetContainerReportResponse> action =
+        new PrivilegedExceptionAction<GetContainerReportResponse>() {
+          @Override
+          public GetContainerReportResponse run() throws Exception {
+            GetContainerReportRequest request =
+                GetContainerReportRequest.newInstance(containerId);
+            return getRouterClientRMService().getContainerReport(request);
+          }
+        };
+    return ugi.doAs(action);
+  }
+
+  /**
+   * Sends a {@code getContainers} request for {@code appAttemptId} to the
+   * router client RM service from inside a {@code doAs} of a remote-user UGI
+   * created for {@code user}.
+   *
+   * @param user name used to create the remote user the call runs as
+   * @param appAttemptId application attempt id placed in the request
+   * @return the response handed back by the router client RM service
+   */
+  protected GetContainersResponse getContainers(String user,
+      final ApplicationAttemptId appAttemptId)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+    PrivilegedExceptionAction<GetContainersResponse> action =
+        new PrivilegedExceptionAction<GetContainersResponse>() {
+          @Override
+          public GetContainersResponse run() throws Exception {
+            GetContainersRequest request =
+                GetContainersRequest.newInstance(appAttemptId);
+            return getRouterClientRMService().getContainers(request);
+          }
+        };
+    return ugi.doAs(action);
+  }
+
+  /**
+   * Sends a {@code getDelegationToken} request to the router client RM
+   * service from inside a {@code doAs} of a remote-user UGI created for
+   * {@code user}. The same {@code user} string is also the argument used to
+   * build the request.
+   *
+   * @param user name used both to create the remote user and to build the
+   *          request
+   * @return the response handed back by the router client RM service
+   */
+  protected GetDelegationTokenResponse getDelegationToken(final String user)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+    PrivilegedExceptionAction<GetDelegationTokenResponse> action =
+        new PrivilegedExceptionAction<GetDelegationTokenResponse>() {
+          @Override
+          public GetDelegationTokenResponse run() throws Exception {
+            GetDelegationTokenRequest request =
+                GetDelegationTokenRequest.newInstance(user);
+            return getRouterClientRMService().getDelegationToken(request);
+          }
+        };
+    return ugi.doAs(action);
+  }
+
+  /**
+   * Sends a {@code renewDelegationToken} request for {@code token} to the
+   * router client RM service from inside a {@code doAs} of a remote-user UGI
+   * created for {@code user}.
+   *
+   * @param user name used to create the remote user the call runs as
+   * @param token token placed in the renew request
+   * @return the response handed back by the router client RM service
+   */
+  protected RenewDelegationTokenResponse renewDelegationToken(String user,
+      final Token token)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+    PrivilegedExceptionAction<RenewDelegationTokenResponse> action =
+        new PrivilegedExceptionAction<RenewDelegationTokenResponse>() {
+          @Override
+          public RenewDelegationTokenResponse run() throws Exception {
+            RenewDelegationTokenRequest request =
+                RenewDelegationTokenRequest.newInstance(token);
+            return getRouterClientRMService().renewDelegationToken(request);
+          }
+        };
+    return ugi.doAs(action);
+  }
+
+  /**
+   * Sends a {@code cancelDelegationToken} request for {@code token} to the
+   * router client RM service from inside a {@code doAs} of a remote-user UGI
+   * created for {@code user}.
+   *
+   * @param user name used to create the remote user the call runs as
+   * @param token token placed in the cancel request
+   * @return the response handed back by the router client RM service
+   */
+  protected CancelDelegationTokenResponse cancelDelegationToken(String user,
+      final Token token)
+      throws YarnException, IOException, InterruptedException {
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+    PrivilegedExceptionAction<CancelDelegationTokenResponse> action =
+        new PrivilegedExceptionAction<CancelDelegationTokenResponse>() {
+          @Override
+          public CancelDelegationTokenResponse run() throws Exception {
+            CancelDelegationTokenRequest request =
+                CancelDelegationTokenRequest.newInstance(token);
+            return getRouterClientRMService().cancelDelegationToken(request);
+          }
+        };
+    return ugi.doAs(action);
+  }
+
+  /**
+   * Builds a reservation submission request that carries a single atomic ask.
+   *
+   * @param numContainers container count passed to the single ask
+   * @param arrival arrival value of the reservation definition
+   * @param deadline deadline value of the reservation definition
+   * @param duration duration passed to the single ask
+   * @param reservationId reservation id attached to the submission request
+   * @return the assembled submission request
+   */
+  private ReservationSubmissionRequest createSimpleReservationRequest(
+      int numContainers, long arrival, long deadline, long duration,
+      ReservationId reservationId) {
+    // the one and only ask of this reservation
+    Resource capability = Resource.newInstance(1024, 1);
+    ReservationRequest ask = ReservationRequest.newInstance(capability,
+        numContainers, 1, duration);
+    ReservationRequests asks = ReservationRequests.newInstance(
+        Collections.singletonList(ask), ReservationRequestInterpreter.R_ALL);
+    ReservationDefinition definition = ReservationDefinition.newInstance(
+        arrival, deadline, asks, "testRouterClientRMService#reservation");
+    return ReservationSubmissionRequest.newInstance(definition, "dedicated",
+        reservationId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f4191c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/MockClientRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/MockClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/MockClientRequestInterceptor.java
new file mode 100644
index 0000000..b38703f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/MockClientRequestInterceptor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
+
+/**
+ * Test double for the ClientRequestInterceptor chain: a
+ * {@link DefaultClientRequestInterceptor} whose RM client is replaced by a
+ * {@link MockResourceManagerFacade}.
+ */
+public class MockClientRequestInterceptor
+    extends DefaultClientRequestInterceptor {
+
+  /**
+   * Installs a {@link MockResourceManagerFacade} as the RM client. The facade
+   * is built from a new {@link YarnConfiguration} created from this
+   * interceptor's configuration. The parent {@code init} is not invoked.
+   *
+   * @param user not used by this implementation
+   */
+  @Override
+  public void init(String user) {
+    MockResourceManagerFacade mockRM = new MockResourceManagerFacade(
+        new YarnConfiguration(super.getConf()), 0);
+    super.setRMClient(mockRM);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f4191c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java
new file mode 100644
index 0000000..c403bd5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Mock interceptor whose every operation simply hands the request to the next
+ * interceptor in the chain and returns whatever that interceptor returns.
+ */
+public class PassThroughClientRequestInterceptor
+    extends AbstractClientRequestInterceptor {
+
+  // ---- application submission and termination ----
+
+  @Override
+  public GetNewApplicationResponse getNewApplication(
+      GetNewApplicationRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().getNewApplication(request);
+  }
+
+  @Override
+  public SubmitApplicationResponse submitApplication(
+      SubmitApplicationRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().submitApplication(request);
+  }
+
+  @Override
+  public KillApplicationResponse forceKillApplication(
+      KillApplicationRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().forceKillApplication(request);
+  }
+
+  // ---- cluster and queue information ----
+
+  @Override
+  public GetClusterMetricsResponse getClusterMetrics(
+      GetClusterMetricsRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().getClusterMetrics(request);
+  }
+
+  @Override
+  public GetClusterNodesResponse getClusterNodes(
+      GetClusterNodesRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().getClusterNodes(request);
+  }
+
+  @Override
+  public GetQueueInfoResponse getQueueInfo(
+      GetQueueInfoRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().getQueueInfo(request);
+  }
+
+  @Override
+  public GetQueueUserAclsInfoResponse getQueueUserAcls(
+      GetQueueUserAclsInfoRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().getQueueUserAcls(request);
+  }
+
+  @Override
+  public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+      MoveApplicationAcrossQueuesRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().moveApplicationAcrossQueues(request);
+  }
+
+  // ---- reservations ----
+
+  @Override
+  public GetNewReservationResponse getNewReservation(
+      GetNewReservationRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().getNewReservation(request);
+  }
+
+  @Override
+  public ReservationSubmissionResponse submitReservation(
+      ReservationSubmissionRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().submitReservation(request);
+  }
+
+  @Override
+  public ReservationListResponse listReservations(
+      ReservationListRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().listReservations(request);
+  }
+
+  @Override
+  public ReservationUpdateResponse updateReservation(
+      ReservationUpdateRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().updateReservation(request);
+  }
+
+  @Override
+  public ReservationDeleteResponse deleteReservation(
+      ReservationDeleteRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().deleteReservation(request);
+  }
+
+  // ---- node labels ----
+
+  @Override
+  public GetNodesToLabelsResponse getNodeToLabels(
+      GetNodesToLabelsRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().getNodeToLabels(request);
+  }
+
+  @Override
+  public GetLabelsToNodesResponse getLabelsToNodes(
+      GetLabelsToNodesRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().getLabelsToNodes(request);
+  }
+
+  @Override
+  public GetClusterNodeLabelsResponse getClusterNodeLabels(
+      GetClusterNodeLabelsRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().getClusterNodeLabels(request);
+  }
+
+  // ---- application, attempt and container reports ----
+
+  @Override
+  public GetApplicationReportResponse getApplicationReport(
+      GetApplicationReportRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().getApplicationReport(request);
+  }
+
+  @Override
+  public GetApplicationsResponse getApplications(
+      GetApplicationsRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().getApplications(request);
+  }
+
+  @Override
+  public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+      GetApplicationAttemptReportRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().getApplicationAttemptReport(request);
+  }
+
+  @Override
+  public GetApplicationAttemptsResponse getApplicationAttempts(
+      GetApplicationAttemptsRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().getApplicationAttempts(request);
+  }
+
+  @Override
+  public GetContainerReportResponse getContainerReport(
+      GetContainerReportRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().getContainerReport(request);
+  }
+
+  @Override
+  public GetContainersResponse getContainers(
+      GetContainersRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().getContainers(request);
+  }
+
+  // ---- delegation tokens ----
+
+  @Override
+  public GetDelegationTokenResponse getDelegationToken(
+      GetDelegationTokenRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().getDelegationToken(request);
+  }
+
+  @Override
+  public RenewDelegationTokenResponse renewDelegationToken(
+      RenewDelegationTokenRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().renewDelegationToken(request);
+  }
+
+  @Override
+  public CancelDelegationTokenResponse cancelDelegationToken(
+      CancelDelegationTokenRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().cancelDelegationToken(request);
+  }
+
+  // ---- running application management ----
+
+  @Override
+  public FailApplicationAttemptResponse failApplicationAttempt(
+      FailApplicationAttemptRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().failApplicationAttempt(request);
+  }
+
+  @Override
+  public UpdateApplicationPriorityResponse updateApplicationPriority(
+      UpdateApplicationPriorityRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().updateApplicationPriority(request);
+  }
+
+  @Override
+  public SignalContainerResponse signalToContainer(
+      SignalContainerRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().signalToContainer(request);
+  }
+
+  @Override
+  public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
+      UpdateApplicationTimeoutsRequest request)
+      throws YarnException, IOException {
+    return this.getNextInterceptor().updateApplicationTimeouts(request);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message