hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject [17/36] hadoop git commit: YARN-6923. Metrics for Federation Router. (Giovanni Matteo Fumarola via asuresh)
Date Wed, 23 Aug 2017 02:57:28 GMT
YARN-6923. Metrics for Federation Router. (Giovanni Matteo Fumarola via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ae8fb13b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ae8fb13b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ae8fb13b

Branch: refs/heads/YARN-5734
Commit: ae8fb13b312b30de50d65b5450b565d50d690e9e
Parents: 75abc9a
Author: Arun Suresh <asuresh@apache.org>
Authored: Mon Aug 21 22:50:24 2017 -0700
Committer: Arun Suresh <asuresh@apache.org>
Committed: Mon Aug 21 22:50:24 2017 -0700

----------------------------------------------------------------------
 .../yarn/server/router/RouterMetrics.java       | 203 +++++++++++++++
 .../clientrm/FederationClientInterceptor.java   |  37 ++-
 .../webapp/FederationInterceptorREST.java       | 116 +++++++--
 .../yarn/server/router/TestRouterMetrics.java   | 248 +++++++++++++++++++
 .../webapp/TestFederationInterceptorREST.java   |  12 +-
 5 files changed, 593 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8fb13b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
new file mode 100644
index 0000000..42361a3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.*;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+/**
+ * Maintains the Router Federation Interceptor statistics (failure gauges,
+ * success rates and latency quantiles) and exposes them as a metrics source.
+ */
+@InterfaceAudience.Private
+@Metrics(about = "Metrics for Router Federation Interceptor", context = "fedr")
+public final class RouterMetrics {
+
+  private static final MetricsInfo RECORD_INFO =
+      info("RouterMetrics", "Router Federation Interceptor");
+  private static AtomicBoolean isInitialized = new AtomicBoolean(false);
+
+  // Failure gauges, one per operation; this class only ever increments them.
+  @Metric("# of applications failed to be submitted")
+  private MutableGaugeInt numAppsFailedSubmitted;
+  @Metric("# of applications failed to be created")
+  private MutableGaugeInt numAppsFailedCreated;
+  @Metric("# of applications failed to be killed")
+  private MutableGaugeInt numAppsFailedKilled;
+  @Metric("# of application reports failed to be retrieved")
+  private MutableGaugeInt numAppsFailedRetrieved;
+
+  // Success rates, one per operation; each sample is a call duration.
+  @Metric("Total number of successful Submitted apps and latency(ms)")
+  private MutableRate totalSucceededAppsSubmitted;
+  @Metric("Total number of successful Killed apps and latency(ms)")
+  private MutableRate totalSucceededAppsKilled;
+  @Metric("Total number of successful Created apps and latency(ms)")
+  private MutableRate totalSucceededAppsCreated;
+  @Metric("Total number of successful Retrieved app reports and latency(ms)")
+  private MutableRate totalSucceededAppsRetrieved;
+
+  /**
+   * Latency quantiles, one per operation; created in the constructor.
+   */
+  private MutableQuantiles submitApplicationLatency;
+  private MutableQuantiles getNewApplicationLatency;
+  private MutableQuantiles killApplicationLatency;
+  private MutableQuantiles getApplicationReportLatency;
+
+  private static volatile RouterMetrics INSTANCE = null;
+  private static MetricsRegistry registry;
+
+  private RouterMetrics() {
+    registry = new MetricsRegistry(RECORD_INFO); // static, reassigned here
+    registry.tag(RECORD_INFO, "Router");
+    getNewApplicationLatency = registry.newQuantiles("getNewApplicationLatency",
+        "latency of get new application", "ops", "latency", 10);
+    submitApplicationLatency = registry.newQuantiles("submitApplicationLatency",
+        "latency of submit application", "ops", "latency", 10);
+    killApplicationLatency = registry.newQuantiles("killApplicationLatency",
+        "latency of kill application", "ops", "latency", 10);
+    getApplicationReportLatency =
+        registry.newQuantiles("getApplicationReportLatency",
+            "latency of get application report", "ops", "latency", 10);
+  }
+
+  public static RouterMetrics getMetrics() {
+    if (!isInitialized.get()) {
+      synchronized (RouterMetrics.class) {
+        if (INSTANCE == null) { // re-check under the class lock
+          INSTANCE = DefaultMetricsSystem.instance().register("RouterMetrics",
+              "Metrics for the Yarn Router", new RouterMetrics());
+          isInitialized.set(true);
+        }
+      }
+    }
+    return INSTANCE;
+  }
+
+  @VisibleForTesting
+  synchronized static void destroy() {
+    isInitialized.set(false);
+    INSTANCE = null; // the next getMetrics() call builds a new instance
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededAppsCreated() {
+    return totalSucceededAppsCreated.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededAppsSubmitted() {
+    return totalSucceededAppsSubmitted.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededAppsKilled() {
+    return totalSucceededAppsKilled.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededAppsRetrieved() {
+    return totalSucceededAppsRetrieved.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededAppsCreated() {
+    return totalSucceededAppsCreated.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededAppsSubmitted() {
+    return totalSucceededAppsSubmitted.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededAppsKilled() {
+    return totalSucceededAppsKilled.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededGetAppReport() {
+    return totalSucceededAppsRetrieved.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public int getAppsFailedCreated() {
+    return numAppsFailedCreated.value();
+  }
+
+  @VisibleForTesting
+  public int getAppsFailedSubmitted() {
+    return numAppsFailedSubmitted.value();
+  }
+
+  @VisibleForTesting
+  public int getAppsFailedKilled() {
+    return numAppsFailedKilled.value();
+  }
+
+  @VisibleForTesting
+  public int getAppsFailedRetrieved() {
+    return numAppsFailedRetrieved.value();
+  }
+
+  public void succeededAppsCreated(long duration) {
+    totalSucceededAppsCreated.add(duration);
+    getNewApplicationLatency.add(duration);
+  }
+
+  public void succeededAppsSubmitted(long duration) {
+    totalSucceededAppsSubmitted.add(duration);
+    submitApplicationLatency.add(duration);
+  }
+
+  public void succeededAppsKilled(long duration) {
+    totalSucceededAppsKilled.add(duration);
+    killApplicationLatency.add(duration);
+  }
+
+  public void succeededAppsRetrieved(long duration) {
+    totalSucceededAppsRetrieved.add(duration);
+    getApplicationReportLatency.add(duration);
+  }
+
+  public void incrAppsFailedCreated() {
+    numAppsFailedCreated.incr();
+  }
+
+  public void incrAppsFailedSubmitted() {
+    numAppsFailedSubmitted.incr();
+  }
+
+  public void incrAppsFailedKilled() {
+    numAppsFailedKilled.incr();
+  }
+
+  public void incrAppsFailedRetrieved() {
+    numAppsFailedRetrieved.incr();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8fb13b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index 7268ebd..3a36eec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -98,7 +98,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSub
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.router.RouterMetrics;
 import org.apache.hadoop.yarn.server.router.RouterServerUtil;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -130,6 +133,8 @@ public class FederationClientInterceptor
   private FederationStateStoreFacade federationFacade;
   private Random rand;
   private RouterPolicyFacade policyFacade;
+  private RouterMetrics routerMetrics;
+  private final Clock clock = new MonotonicClock();
 
   @Override
   public void init(String userName) {
@@ -153,7 +158,7 @@ public class FederationClientInterceptor
 
     clientRMProxies =
         new ConcurrentHashMap<SubClusterId, ApplicationClientProtocol>();
-
+    routerMetrics = RouterMetrics.getMetrics();
   }
 
   @Override
@@ -220,6 +225,9 @@ public class FederationClientInterceptor
   @Override
   public GetNewApplicationResponse getNewApplication(
       GetNewApplicationRequest request) throws YarnException, IOException {
+
+    long startTime = clock.getTime();
+
     Map<SubClusterId, SubClusterInfo> subClustersActive =
         federationFacade.getSubClusters(true);
 
@@ -238,6 +246,9 @@ public class FederationClientInterceptor
       }
 
       if (response != null) {
+
+        long stopTime = clock.getTime();
+        routerMetrics.succeededAppsCreated(stopTime - startTime);
         return response;
       } else {
         // Empty response from the ResourceManager.
@@ -247,6 +258,7 @@ public class FederationClientInterceptor
 
     }
 
+    routerMetrics.incrAppsFailedCreated();
     String errMsg = "Fail to create a new application.";
     LOG.error(errMsg);
     throw new YarnException(errMsg);
@@ -320,9 +332,13 @@ public class FederationClientInterceptor
   @Override
   public SubmitApplicationResponse submitApplication(
       SubmitApplicationRequest request) throws YarnException, IOException {
+
+    long startTime = clock.getTime();
+
     if (request == null || request.getApplicationSubmissionContext() == null
         || request.getApplicationSubmissionContext()
             .getApplicationId() == null) {
+      routerMetrics.incrAppsFailedSubmitted();
       RouterServerUtil
           .logAndThrowException("Missing submitApplication request or "
               + "applicationSubmissionContex information.", null);
@@ -350,6 +366,7 @@ public class FederationClientInterceptor
           subClusterId =
               federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
         } catch (YarnException e) {
+          routerMetrics.incrAppsFailedSubmitted();
           String message = "Unable to insert the ApplicationId " + applicationId
               + " into the FederationStateStore";
           RouterServerUtil.logAndThrowException(message, e);
@@ -368,6 +385,7 @@ public class FederationClientInterceptor
             LOG.info("Application " + applicationId
                 + " already submitted on SubCluster " + subClusterId);
           } else {
+            routerMetrics.incrAppsFailedSubmitted();
             RouterServerUtil.logAndThrowException(message, e);
           }
         }
@@ -388,6 +406,8 @@ public class FederationClientInterceptor
         LOG.info("Application "
             + request.getApplicationSubmissionContext().getApplicationName()
             + " with appId " + applicationId + " submitted on " + subClusterId);
+        long stopTime = clock.getTime();
+        routerMetrics.succeededAppsSubmitted(stopTime - startTime);
         return response;
       } else {
         // Empty response from the ResourceManager.
@@ -396,6 +416,7 @@ public class FederationClientInterceptor
       }
     }
 
+    routerMetrics.incrAppsFailedSubmitted();
     String errMsg = "Application "
         + request.getApplicationSubmissionContext().getApplicationName()
         + " with appId " + applicationId + " failed to be submitted.";
@@ -423,7 +444,10 @@ public class FederationClientInterceptor
   public KillApplicationResponse forceKillApplication(
       KillApplicationRequest request) throws YarnException, IOException {
 
+    long startTime = clock.getTime();
+
     if (request == null || request.getApplicationId() == null) {
+      routerMetrics.incrAppsFailedKilled();
       RouterServerUtil.logAndThrowException(
           "Missing forceKillApplication request or ApplicationId.", null);
     }
@@ -434,6 +458,7 @@ public class FederationClientInterceptor
       subClusterId = federationFacade
           .getApplicationHomeSubCluster(request.getApplicationId());
     } catch (YarnException e) {
+      routerMetrics.incrAppsFailedKilled();
       RouterServerUtil.logAndThrowException("Application " + applicationId
           + " does not exist in FederationStateStore", e);
     }
@@ -447,6 +472,7 @@ public class FederationClientInterceptor
           + subClusterId);
       response = clientRMProxy.forceKillApplication(request);
     } catch (Exception e) {
+      routerMetrics.incrAppsFailedKilled();
       LOG.error("Unable to kill the application report for "
           + request.getApplicationId() + "to SubCluster "
           + subClusterId.getId(), e);
@@ -458,6 +484,8 @@ public class FederationClientInterceptor
           + applicationId + " to SubCluster " + subClusterId.getId());
     }
 
+    long stopTime = clock.getTime();
+    routerMetrics.succeededAppsKilled(stopTime - startTime);
     return response;
   }
 
@@ -481,7 +509,10 @@ public class FederationClientInterceptor
   public GetApplicationReportResponse getApplicationReport(
       GetApplicationReportRequest request) throws YarnException, IOException {
 
+    long startTime = clock.getTime();
+
     if (request == null || request.getApplicationId() == null) {
+      routerMetrics.incrAppsFailedRetrieved();
       RouterServerUtil.logAndThrowException(
           "Missing getApplicationReport request or applicationId information.",
           null);
@@ -493,6 +524,7 @@ public class FederationClientInterceptor
       subClusterId = federationFacade
           .getApplicationHomeSubCluster(request.getApplicationId());
     } catch (YarnException e) {
+      routerMetrics.incrAppsFailedRetrieved();
       RouterServerUtil
           .logAndThrowException("Application " + request.getApplicationId()
               + " does not exist in FederationStateStore", e);
@@ -505,6 +537,7 @@ public class FederationClientInterceptor
     try {
       response = clientRMProxy.getApplicationReport(request);
     } catch (Exception e) {
+      routerMetrics.incrAppsFailedRetrieved();
       LOG.error("Unable to get the application report for "
           + request.getApplicationId() + "to SubCluster "
           + subClusterId.getId(), e);
@@ -517,6 +550,8 @@ public class FederationClientInterceptor
           + subClusterId.getId());
     }
 
+    long stopTime = clock.getTime();
+    routerMetrics.succeededAppsRetrieved(stopTime - startTime);
     return response;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8fb13b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index 8ecc19d..4c7d4b1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -18,7 +18,19 @@
 
 package org.apache.hadoop.yarn.server.router.webapp;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -36,20 +48,42 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
-import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
+import org.apache.hadoop.yarn.server.router.RouterMetrics;
 import org.apache.hadoop.yarn.server.router.RouterServerUtil;
 import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import java.io.IOException;
-import java.util.*;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Extends the {@code AbstractRESTRequestInterceptor} class and provides an
@@ -66,6 +100,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor
{
   private FederationStateStoreFacade federationFacade;
   private Random rand;
   private RouterPolicyFacade policyFacade;
+  private RouterMetrics routerMetrics;
+  private final Clock clock = new MonotonicClock();
 
   private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
 
@@ -88,6 +124,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor
{
             YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
 
     interceptors = new HashMap<SubClusterId, DefaultRequestInterceptorREST>();
+    routerMetrics = RouterMetrics.getMetrics();
   }
 
   private SubClusterId getRandomActiveSubCluster(
@@ -191,10 +228,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor
{
   @Override
   public Response createNewApplication(HttpServletRequest hsr)
       throws AuthorizationException, IOException, InterruptedException {
+
+    long startTime = clock.getTime();
+
     Map<SubClusterId, SubClusterInfo> subClustersActive;
     try {
       subClustersActive = federationFacade.getSubClusters(true);
     } catch (YarnException e) {
+      routerMetrics.incrAppsFailedCreated();
       return Response.status(Status.INTERNAL_SERVER_ERROR)
           .entity(e.getLocalizedMessage()).build();
     }
@@ -207,6 +248,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor
{
       try {
         subClusterId = getRandomActiveSubCluster(subClustersActive, blacklist);
       } catch (YarnException e) {
+        routerMetrics.incrAppsFailedCreated();
         return Response.status(Status.SERVICE_UNAVAILABLE)
             .entity(e.getLocalizedMessage()).build();
       }
@@ -226,6 +268,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor
{
       }
 
       if (response != null && response.getStatus() == 200) {
+
+        long stopTime = clock.getTime();
+        routerMetrics.succeededAppsCreated(stopTime - startTime);
+
         return response;
       } else {
         // Empty response from the ResourceManager.
@@ -236,6 +282,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor
{
 
     String errMsg = "Fail to create a new application.";
     LOG.error(errMsg);
+    routerMetrics.incrAppsFailedCreated();
     return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
   }
 
@@ -308,7 +355,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor
{
   public Response submitApplication(ApplicationSubmissionContextInfo newApp,
       HttpServletRequest hsr)
       throws AuthorizationException, IOException, InterruptedException {
+
+    long startTime = clock.getTime();
+
     if (newApp == null || newApp.getApplicationId() == null) {
+      routerMetrics.incrAppsFailedSubmitted();
       String errMsg = "Missing ApplicationSubmissionContextInfo or "
           + "applicationSubmissionContex information.";
       return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
@@ -318,6 +369,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor
{
     try {
       applicationId = ApplicationId.fromString(newApp.getApplicationId());
     } catch (IllegalArgumentException e) {
+      routerMetrics.incrAppsFailedSubmitted();
       return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
           .build();
     }
@@ -333,6 +385,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor
{
       try {
         subClusterId = policyFacade.getHomeSubcluster(context, blacklist);
       } catch (YarnException e) {
+        routerMetrics.incrAppsFailedSubmitted();
         return Response.status(Status.SERVICE_UNAVAILABLE)
             .entity(e.getLocalizedMessage()).build();
       }
@@ -349,6 +402,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor
{
           subClusterId =
               federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
         } catch (YarnException e) {
+          routerMetrics.incrAppsFailedSubmitted();
           String errMsg = "Unable to insert the ApplicationId " + applicationId
               + " into the FederationStateStore";
           return Response.status(Status.SERVICE_UNAVAILABLE)
@@ -367,6 +421,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor
{
             subClusterIdInStateStore =
                 federationFacade.getApplicationHomeSubCluster(applicationId);
           } catch (YarnException e1) {
+            routerMetrics.incrAppsFailedSubmitted();
             return Response.status(Status.SERVICE_UNAVAILABLE)
                 .entity(e1.getLocalizedMessage()).build();
           }
@@ -374,6 +429,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor
{
             LOG.info("Application " + applicationId
                 + " already submitted on SubCluster " + subClusterId);
           } else {
+            routerMetrics.incrAppsFailedSubmitted();
             return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg)
                 .build();
           }
@@ -384,6 +440,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor
{
       try {
         subClusterInfo = federationFacade.getSubCluster(subClusterId);
       } catch (YarnException e) {
+        routerMetrics.incrAppsFailedSubmitted();
         return Response.status(Status.SERVICE_UNAVAILABLE)
             .entity(e.getLocalizedMessage()).build();
       }
@@ -401,6 +458,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor
{
       if (response != null && response.getStatus() == 202) {
         LOG.info("Application " + context.getApplicationName() + " with appId "
             + applicationId + " submitted on " + subClusterId);
+
+        long stopTime = clock.getTime();
+        routerMetrics.succeededAppsSubmitted(stopTime - startTime);
+
         return response;
       } else {
         // Empty response from the ResourceManager.
@@ -409,6 +470,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor
{
       }
     }
 
+    routerMetrics.incrAppsFailedSubmitted();
     String errMsg = "Application " + newApp.getApplicationName()
         + " with appId " + applicationId + " failed to be submitted.";
     LOG.error(errMsg);
@@ -435,10 +497,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor
{
   public AppInfo getApp(HttpServletRequest hsr, String appId,
       Set<String> unselectedFields) {
 
+    long startTime = clock.getTime();
+
     ApplicationId applicationId = null;
     try {
       applicationId = ApplicationId.fromString(appId);
     } catch (IllegalArgumentException e) {
+      routerMetrics.incrAppsFailedRetrieved();
       return null;
     }
 
@@ -448,16 +513,23 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor
{
       subClusterId =
           federationFacade.getApplicationHomeSubCluster(applicationId);
       if (subClusterId == null) {
+        routerMetrics.incrAppsFailedRetrieved();
         return null;
       }
       subClusterInfo = federationFacade.getSubCluster(subClusterId);
     } catch (YarnException e) {
+      routerMetrics.incrAppsFailedRetrieved();
       return null;
     }
 
-    return getOrCreateInterceptorForSubCluster(subClusterId,
+    AppInfo response = getOrCreateInterceptorForSubCluster(subClusterId,
         subClusterInfo.getRMWebServiceAddress()).getApp(hsr, appId,
             unselectedFields);
+
+    long stopTime = clock.getTime();
+    routerMetrics.succeededAppsRetrieved(stopTime - startTime);
+
+    return response;
   }
 
   /**
@@ -481,23 +553,37 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor
{
       String appId) throws AuthorizationException, YarnException,
       InterruptedException, IOException {
 
+    long startTime = clock.getTime();
+
     ApplicationId applicationId = null;
     try {
       applicationId = ApplicationId.fromString(appId);
     } catch (IllegalArgumentException e) {
+      routerMetrics.incrAppsFailedKilled();
       return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
           .build();
     }
 
-    SubClusterId subClusterId =
-        federationFacade.getApplicationHomeSubCluster(applicationId);
-
-    SubClusterInfo subClusterInfo =
-        federationFacade.getSubCluster(subClusterId);
+    SubClusterInfo subClusterInfo = null;
+    SubClusterId subClusterId = null;
+    try {
+      subClusterId =
+          federationFacade.getApplicationHomeSubCluster(applicationId);
+      subClusterInfo = federationFacade.getSubCluster(subClusterId);
+    } catch (YarnException e) {
+      routerMetrics.incrAppsFailedKilled();
+      return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
+          .build();
+    }
 
-    return getOrCreateInterceptorForSubCluster(subClusterId,
+    Response response = getOrCreateInterceptorForSubCluster(subClusterId,
         subClusterInfo.getRMWebServiceAddress()).updateAppState(targetState,
             hsr, appId);
+
+    long stopTime = clock.getTime(); // kill path, so record a kill success
+    routerMetrics.succeededAppsKilled(stopTime - startTime);
+
+    return response;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8fb13b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
new file mode 100644
index 0000000..3cdafd8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class validates the correctness of Router Federation Interceptor
+ * Metrics.
+ */
+public class TestRouterMetrics {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestRouterMetrics.class);
+
+  // All the operations in the bad subcluster fail.
+  private MockBadSubCluster badSubCluster = new MockBadSubCluster();
+  // All the operations in the good subcluster succeed.
+  private MockGoodSubCluster goodSubCluster = new MockGoodSubCluster();
+
+  private static RouterMetrics metrics = RouterMetrics.getMetrics();
+
+  @BeforeClass
+  public static void init() {
+
+    LOG.info("Test: aggregate metrics are initialized correctly");
+
+    Assert.assertEquals(0, metrics.getNumSucceededAppsCreated());
+    Assert.assertEquals(0, metrics.getNumSucceededAppsSubmitted());
+    Assert.assertEquals(0, metrics.getNumSucceededAppsKilled());
+    Assert.assertEquals(0, metrics.getNumSucceededAppsRetrieved());
+
+    Assert.assertEquals(0, metrics.getAppsFailedCreated());
+    Assert.assertEquals(0, metrics.getAppsFailedSubmitted());
+    Assert.assertEquals(0, metrics.getAppsFailedKilled());
+    Assert.assertEquals(0, metrics.getAppsFailedRetrieved());
+
+    LOG.info("Test: aggregate metrics are updated correctly");
+  }
+
+  /**
+   * This test validates the correctness of the metric: Created Apps
+   * successfully.
+   */
+  @Test
+  public void testSucceededAppsCreated() {
+
+    long totalGoodBefore = metrics.getNumSucceededAppsCreated();
+
+    goodSubCluster.getNewApplication(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededAppsCreated());
+    Assert.assertEquals(100, metrics.getLatencySucceededAppsCreated(), 0);
+
+    goodSubCluster.getNewApplication(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededAppsCreated());
+    Assert.assertEquals(150, metrics.getLatencySucceededAppsCreated(), 0);
+  }
+
+  /**
+   * This test validates the correctness of the metric: Failed to create Apps.
+   */
+  @Test
+  public void testAppsFailedCreated() {
+
+    long totalBadbefore = metrics.getAppsFailedCreated();
+
+    badSubCluster.getNewApplication();
+
+    Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedCreated());
+  }
+
+  /**
+   * This test validates the correctness of the metric: Submitted Apps
+   * successfully.
+   */
+  @Test
+  public void testSucceededAppsSubmitted() {
+
+    long totalGoodBefore = metrics.getNumSucceededAppsSubmitted();
+
+    goodSubCluster.submitApplication(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededAppsSubmitted());
+    Assert.assertEquals(100, metrics.getLatencySucceededAppsSubmitted(), 0);
+
+    goodSubCluster.submitApplication(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededAppsSubmitted());
+    Assert.assertEquals(150, metrics.getLatencySucceededAppsSubmitted(), 0);
+  }
+
+  /**
+   * This test validates the correctness of the metric: Failed to submit Apps.
+   */
+  @Test
+  public void testAppsFailedSubmitted() {
+
+    long totalBadbefore = metrics.getAppsFailedSubmitted();
+
+    badSubCluster.submitApplication();
+
+    Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedSubmitted());
+  }
+
+  /**
+   * This test validates the correctness of the metric: Killed Apps
+   * successfully.
+   */
+  @Test
+  public void testSucceededAppsKilled() {
+
+    long totalGoodBefore = metrics.getNumSucceededAppsKilled();
+
+    goodSubCluster.forceKillApplication(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededAppsKilled());
+    Assert.assertEquals(100, metrics.getLatencySucceededAppsKilled(), 0);
+
+    goodSubCluster.forceKillApplication(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededAppsKilled());
+    Assert.assertEquals(150, metrics.getLatencySucceededAppsKilled(), 0);
+  }
+
+  /**
+   * This test validates the correctness of the metric: Failed to kill Apps.
+   */
+  @Test
+  public void testAppsFailedKilled() {
+
+    long totalBadbefore = metrics.getAppsFailedKilled();
+
+    badSubCluster.forceKillApplication();
+
+    Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedKilled());
+  }
+
+  /**
+   * This test validates the correctness of the metric: Retrieved Apps
+   * successfully.
+   */
+  @Test
+  public void testSucceededAppsReport() {
+
+    long totalGoodBefore = metrics.getNumSucceededAppsRetrieved();
+
+    goodSubCluster.getApplicationReport(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededAppsRetrieved());
+    Assert.assertEquals(100, metrics.getLatencySucceededGetAppReport(), 0);
+
+    goodSubCluster.getApplicationReport(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededAppsRetrieved());
+    Assert.assertEquals(150, metrics.getLatencySucceededGetAppReport(), 0);
+  }
+
+  /**
+   * This test validates the correctness of the metric: Failed to retrieve Apps.
+   */
+  @Test
+  public void testAppsReportFailed() {
+
+    long totalBadbefore = metrics.getAppsFailedRetrieved();
+
+    badSubCluster.getApplicationReport();
+
+    Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedRetrieved());
+  }
+
+  // Records failures for all calls
+  private class MockBadSubCluster {
+    public void getNewApplication() {
+      LOG.info("Mocked: failed getNewApplication call");
+      metrics.incrAppsFailedCreated();
+    }
+
+    public void submitApplication() {
+      LOG.info("Mocked: failed submitApplication call");
+      metrics.incrAppsFailedSubmitted();
+    }
+
+    public void forceKillApplication() {
+      LOG.info("Mocked: failed forceKillApplication call");
+      metrics.incrAppsFailedKilled();
+    }
+
+    public void getApplicationReport() {
+      LOG.info("Mocked: failed getApplicationReport call");
+      metrics.incrAppsFailedRetrieved();
+    }
+  }
+
+  // Records successes for all calls
+  private class MockGoodSubCluster {
+    public void getNewApplication(long duration) {
+      LOG.info("Mocked: successful getNewApplication call with duration {}",
+          duration);
+      metrics.succeededAppsCreated(duration);
+    }
+
+    public void submitApplication(long duration) {
+      LOG.info("Mocked: successful submitApplication call with duration {}",
+          duration);
+      metrics.succeededAppsSubmitted(duration);
+    }
+
+    public void forceKillApplication(long duration) {
+      LOG.info("Mocked: successful forceKillApplication call with duration {}",
+          duration);
+      metrics.succeededAppsKilled(duration);
+    }
+
+    public void getApplicationReport(long duration) {
+      LOG.info("Mocked: successful getApplicationReport call with duration {}",
+          duration);
+      metrics.succeededAppsRetrieved(duration);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8fb13b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
index d918149..fb6cdd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
@@ -276,13 +276,11 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     ApplicationId appId =
         ApplicationId.newInstance(System.currentTimeMillis(), 1);
     AppState appState = new AppState("KILLED");
-    try {
-      interceptor.updateAppState(appState, null, appId.toString());
-      Assert.fail();
-    } catch (YarnException e) {
-      Assert.assertTrue(
-          e.getMessage().equals("Application " + appId + " does not exist"));
-    }
+
+    Response response =
+        interceptor.updateAppState(appState, null, appId.toString());
+    Assert.assertEquals(BAD_REQUEST, response.getStatus());
+
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message