hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wan...@apache.org
Subject [01/42] hadoop git commit: YARN-5603. Metrics for Federation StateStore. (Ellen Hui via asuresh) [Forced Update!]
Date Tue, 22 Aug 2017 23:15:35 GMT
Repository: hadoop
Updated Branches:
  refs/heads/YARN-3926 d84143733 -> b78f1b730 (forced update)


YARN-5603. Metrics for Federation StateStore. (Ellen Hui via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/75abc9a8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/75abc9a8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/75abc9a8

Branch: refs/heads/YARN-3926
Commit: 75abc9a8e2cf1c7d2c574ede720df59421512be3
Parents: b6bfb2f
Author: Arun Suresh <asuresh@apache.org>
Authored: Mon Aug 21 22:43:08 2017 -0700
Committer: Arun Suresh <asuresh@apache.org>
Committed: Mon Aug 21 22:43:08 2017 -0700

----------------------------------------------------------------------
 .../store/impl/SQLFederationStateStore.java     |  79 ++++++++
 .../FederationStateStoreClientMetrics.java      | 184 +++++++++++++++++++
 .../federation/store/metrics/package-info.java  |  17 ++
 .../TestFederationStateStoreClientMetrics.java  | 146 +++++++++++++++
 4 files changed, 426 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/75abc9a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
index 63d8e42..533f9c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
+import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
 import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
@@ -72,6 +73,8 @@ import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembership
 import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
 import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
 import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,6 +140,7 @@ public class SQLFederationStateStore implements FederationStateStore {
   private String url;
   private int maximumPoolSize;
   private HikariDataSource dataSource = null;
+  private final Clock clock = new MonotonicClock();
 
   @Override
   public void init(Configuration conf) throws YarnException {
@@ -203,7 +207,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(9, java.sql.Types.INTEGER);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.executeUpdate();
+      long stopTime = clock.getTime();
 
       // Check the ROWCOUNT value, if it is equal to 0 it means the call
       // did not add a new subcluster into FederationStateStore
@@ -222,8 +228,11 @@ public class SQLFederationStateStore implements FederationStateStore
{
 
       LOG.info(
           "Registered the SubCluster " + subClusterId + " into the StateStore");
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
 
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to register the SubCluster " + subClusterId
               + " into the StateStore",
@@ -260,7 +269,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(3, java.sql.Types.INTEGER);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.executeUpdate();
+      long stopTime = clock.getTime();
 
       // Check the ROWCOUNT value, if it is equal to 0 it means the call
       // did not deregister the subcluster into FederationStateStore
@@ -278,8 +289,11 @@ public class SQLFederationStateStore implements FederationStateStore
{
 
       LOG.info("Deregistered the SubCluster " + subClusterId + " state to "
           + state.toString());
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
 
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to deregister the sub-cluster " + subClusterId + " state to "
               + state.toString(),
@@ -317,7 +331,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.executeUpdate();
+      long stopTime = clock.getTime();
 
       // Check the ROWCOUNT value, if it is equal to 0 it means the call
       // did not update the subcluster into FederationStateStore
@@ -336,8 +352,11 @@ public class SQLFederationStateStore implements FederationStateStore
{
 
       LOG.info("Heartbeated the StateStore for the specified SubCluster "
           + subClusterId);
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
 
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to heartbeat the StateStore for the specified SubCluster "
               + subClusterId,
@@ -378,7 +397,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(9, java.sql.Types.VARCHAR);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.execute();
+      long stopTime = clock.getTime();
 
       String amRMAddress = cstmt.getString(2);
       String clientRMAddress = cstmt.getString(3);
@@ -403,6 +424,9 @@ public class SQLFederationStateStore implements FederationStateStore {
           clientRMAddress, rmAdminAddress, webAppAddress, lastHeartBeat, state,
           lastStartTime, capability);
 
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
+
       // Check if the output it is a valid subcluster
       try {
         FederationMembershipStateStoreInputValidator
@@ -417,6 +441,7 @@ public class SQLFederationStateStore implements FederationStateStore {
             + subClusterInfo.toString());
       }
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to obtain the SubCluster information for " + subClusterId, e);
     } finally {
@@ -439,7 +464,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTERS);
 
       // Execute the query
+      long startTime = clock.getTime();
       rs = cstmt.executeQuery();
+      long stopTime = clock.getTime();
 
       while (rs.next()) {
 
@@ -459,6 +486,10 @@ public class SQLFederationStateStore implements FederationStateStore
{
             amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress,
             lastHeartBeat, state, lastStartTime, capability);
 
+        FederationStateStoreClientMetrics
+            .succeededStateStoreCall(stopTime - startTime);
+
+
         // Check if the output it is a valid subcluster
         try {
           FederationMembershipStateStoreInputValidator
@@ -477,6 +508,7 @@ public class SQLFederationStateStore implements FederationStateStore {
       }
 
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to obtain the information for all the SubClusters ", e);
     } finally {
@@ -513,11 +545,16 @@ public class SQLFederationStateStore implements FederationStateStore
{
       cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.executeUpdate();
+      long stopTime = clock.getTime();
 
       subClusterHome = cstmt.getString(3);
       SubClusterId subClusterIdHome = SubClusterId.newInstance(subClusterHome);
 
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
+
       // For failover reason, we check the returned SubClusterId.
       // If it is equal to the subclusterId we sent, the call added the new
       // application into FederationStateStore. If the call returns a different
@@ -554,6 +591,7 @@ public class SQLFederationStateStore implements FederationStateStore {
       }
 
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils
           .logAndThrowRetriableException(LOG,
               "Unable to insert the newly generated application "
@@ -592,7 +630,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(3, java.sql.Types.INTEGER);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.executeUpdate();
+      long stopTime = clock.getTime();
 
       // Check the ROWCOUNT value, if it is equal to 0 it means the call
       // did not update the application into FederationStateStore
@@ -611,8 +651,11 @@ public class SQLFederationStateStore implements FederationStateStore
{
       LOG.info(
           "Update the SubCluster to {} for application {} in the StateStore",
           subClusterId, appId);
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
 
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils
           .logAndThrowRetriableException(LOG,
               "Unable to update the application "
@@ -645,7 +688,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(2, java.sql.Types.VARCHAR);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.execute();
+      long stopTime = clock.getTime();
 
       if (cstmt.getString(2) != null) {
         homeRM = SubClusterId.newInstance(cstmt.getString(2));
@@ -659,7 +704,12 @@ public class SQLFederationStateStore implements FederationStateStore
{
         LOG.debug("Got the information about the specified application  "
             + request.getApplicationId() + ". The AM is running in " + homeRM);
       }
+
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
+
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to obtain the application information "
               + "for the specified application " + request.getApplicationId(),
@@ -688,7 +738,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt = conn.prepareCall(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);
 
       // Execute the query
+      long startTime = clock.getTime();
       rs = cstmt.executeQuery();
+      long stopTime = clock.getTime();
 
       while (rs.next()) {
 
@@ -701,7 +753,11 @@ public class SQLFederationStateStore implements FederationStateStore
{
             SubClusterId.newInstance(homeSubCluster)));
       }
 
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
+
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to obtain the information for all the applications ", e);
     } finally {
@@ -731,7 +787,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(2, java.sql.Types.INTEGER);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.executeUpdate();
+      long stopTime = clock.getTime();
 
       // Check the ROWCOUNT value, if it is equal to 0 it means the call
       // did not delete the application from FederationStateStore
@@ -750,8 +808,11 @@ public class SQLFederationStateStore implements FederationStateStore
{
 
       LOG.info("Delete from the StateStore the application: {}",
           request.getApplicationId());
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
 
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to delete the application " + request.getApplicationId(), e);
     } finally {
@@ -782,7 +843,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(3, java.sql.Types.VARBINARY);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.executeUpdate();
+      long stopTime = clock.getTime();
 
       // Check if the output it is a valid policy
       if (cstmt.getString(2) != null && cstmt.getBytes(3) != null) {
@@ -798,7 +861,11 @@ public class SQLFederationStateStore implements FederationStateStore
{
         return null;
       }
 
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
+
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to select the policy for the queue :" + request.getQueue(),
           e);
@@ -833,7 +900,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.executeUpdate();
+      long stopTime = clock.getTime();
 
       // Check the ROWCOUNT value, if it is equal to 0 it means the call
       // did not add a new policy into FederationStateStore
@@ -852,8 +921,11 @@ public class SQLFederationStateStore implements FederationStateStore
{
 
       LOG.info("Insert into the state store the policy for the queue: "
           + policyConf.getQueue());
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
 
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to insert the newly generated policy for the queue :"
               + policyConf.getQueue(),
@@ -880,7 +952,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt = conn.prepareCall(CALL_SP_GET_POLICIES_CONFIGURATIONS);
 
       // Execute the query
+      long startTime = clock.getTime();
       rs = cstmt.executeQuery();
+      long stopTime = clock.getTime();
 
       while (rs.next()) {
 
@@ -894,7 +968,12 @@ public class SQLFederationStateStore implements FederationStateStore
{
                 ByteBuffer.wrap(policyInfo));
         policyConfigurations.add(subClusterPolicyConfiguration);
       }
+
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
+
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to obtain the policy information for all the queues.", e);
     } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/75abc9a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java
new file mode 100644
index 0000000..27b46cd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.metrics;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Performance metrics for FederationStateStore implementations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+@Metrics(about = "Performance and usage metrics for Federation StateStore",
+         context = "fedr")
+public final class FederationStateStoreClientMetrics implements MetricsSource {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(FederationStateStoreClientMetrics.class);
+
+  private static final MetricsRegistry REGISTRY =
+      new MetricsRegistry("FederationStateStoreClientMetrics");
+  private final static Method[] STATESTORE_API_METHODS =
+      FederationStateStore.class.getMethods();
+
+  // Map method names to counter objects
+  private static final Map<String, MutableCounterLong> API_TO_FAILED_CALLS =
+      new HashMap<String, MutableCounterLong>();
+  private static final Map<String, MutableRate> API_TO_SUCCESSFUL_CALLS =
+      new HashMap<String, MutableRate>();
+
+  // Provide quantile latency for each api call.
+  private static final Map<String, MutableQuantiles> API_TO_QUANTILE_METRICS =
+      new HashMap<String, MutableQuantiles>();
+
+  // Error string templates for logging calls from methods not in
+  // FederationStateStore API
+  private static final String UNKOWN_FAIL_ERROR_MSG =
+      "Not recording failed call for unknown FederationStateStore method {}";
+  private static final String UNKNOWN_SUCCESS_ERROR_MSG =
+      "Not recording successful call for unknown "
+          + "FederationStateStore method {}";
+
+  // Aggregate metrics are shared, and don't have to be looked up per call
+  @Metric("Total number of successful calls and latency(ms)")
+  private static MutableRate totalSucceededCalls;
+
+  @Metric("Total number of failed StateStore calls")
+  private static MutableCounterLong totalFailedCalls;
+
+  // This after the static members are initialized, or the constructor will
+  // throw a NullPointerException
+  private static final FederationStateStoreClientMetrics S_INSTANCE =
+      DefaultMetricsSystem.instance()
+          .register(new FederationStateStoreClientMetrics());
+
+  synchronized public static FederationStateStoreClientMetrics getInstance() {
+    return S_INSTANCE;
+  }
+
+  private FederationStateStoreClientMetrics() {
+    // Create the metrics for each method and put them into the map
+    for (Method m : STATESTORE_API_METHODS) {
+      String methodName = m.getName();
+      LOG.debug("Registering Federation StateStore Client metrics for {}",
+          methodName);
+
+      // This metric only records the number of failed calls; it does not
+      // capture latency information
+      API_TO_FAILED_CALLS.put(methodName,
+          REGISTRY.newCounter(methodName + "_numFailedCalls",
+              "# failed calls to " + methodName, 0L));
+
+      // This metric records both the number and average latency of successful
+      // calls.
+      API_TO_SUCCESSFUL_CALLS.put(methodName,
+          REGISTRY.newRate(methodName + "_successfulCalls",
+              "# successful calls and latency(ms) for" + methodName));
+
+      // This metric records the quantile-based latency of each successful call,
+      // re-sampled every 10 seconds.
+      API_TO_QUANTILE_METRICS.put(methodName,
+          REGISTRY.newQuantiles(methodName + "Latency",
+              "Quantile latency (ms) for " + methodName, "ops", "latency", 10));
+    }
+  }
+
+  public static void failedStateStoreCall() {
+    String methodName =
+        Thread.currentThread().getStackTrace()[2].getMethodName();
+    MutableCounterLong methodMetric = API_TO_FAILED_CALLS.get(methodName);
+    if (methodMetric == null) {
+      LOG.error(UNKOWN_FAIL_ERROR_MSG, methodName);
+      return;
+    }
+
+    totalFailedCalls.incr();
+    methodMetric.incr();
+  }
+
+  public static void succeededStateStoreCall(long duration) {
+    String methodName =
+        Thread.currentThread().getStackTrace()[2].getMethodName();
+    MutableRate methodMetric = API_TO_SUCCESSFUL_CALLS.get(methodName);
+    MutableQuantiles methodQuantileMetric =
+        API_TO_QUANTILE_METRICS.get(methodName);
+    if (methodMetric == null || methodQuantileMetric == null) {
+      LOG.error(UNKNOWN_SUCCESS_ERROR_MSG, methodName);
+      return;
+    }
+
+    totalSucceededCalls.add(duration);
+    methodMetric.add(duration);
+    methodQuantileMetric.add(duration);
+  }
+
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    REGISTRY.snapshot(collector.addRecord(REGISTRY.info()), all);
+  }
+
+  // Getters for unit testing
+  @VisibleForTesting
+  static long getNumFailedCallsForMethod(String methodName) {
+    return API_TO_FAILED_CALLS.get(methodName).value();
+  }
+
+  @VisibleForTesting
+  static long getNumSucceessfulCallsForMethod(String methodName) {
+    return API_TO_SUCCESSFUL_CALLS.get(methodName).lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  static double getLatencySucceessfulCallsForMethod(String methodName) {
+    return API_TO_SUCCESSFUL_CALLS.get(methodName).lastStat().mean();
+  }
+
+  @VisibleForTesting
+  static long getNumFailedCalls() {
+    return totalFailedCalls.value();
+  }
+
+  @VisibleForTesting
+  static long getNumSucceededCalls() {
+    return totalSucceededCalls.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  static double getLatencySucceededCalls() {
+    return totalSucceededCalls.lastStat().mean();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/75abc9a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/package-info.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/package-info.java
new file mode 100644
index 0000000..eb548f4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/package-info.java
@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.metrics;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/75abc9a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/metrics/TestFederationStateStoreClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/metrics/TestFederationStateStoreClientMetrics.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/metrics/TestFederationStateStoreClientMetrics.java
new file mode 100644
index 0000000..241d5e2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/metrics/TestFederationStateStoreClientMetrics.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.metrics;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unittests for {@link FederationStateStoreClientMetrics}.
+ *
+ */
+public class TestFederationStateStoreClientMetrics {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestFederationStateStoreClientMetrics.class);
+
+  private MockBadFederationStateStore badStateStore =
+      new MockBadFederationStateStore();
+  private MockGoodFederationStateStore goodStateStore =
+      new MockGoodFederationStateStore();
+
+  @Test
+  public void testAggregateMetricInit() {
+    LOG.info("Test: aggregate metrics are initialized correctly");
+
+    Assert.assertEquals(0,
+        FederationStateStoreClientMetrics.getNumSucceededCalls());
+    Assert.assertEquals(0,
+        FederationStateStoreClientMetrics.getNumFailedCalls());
+
+    LOG.info("Test: aggregate metrics are updated correctly");
+  }
+
+  @Test
+  public void testSuccessfulCalls() {
+    LOG.info("Test: Aggregate and method successful calls updated correctly");
+
+    long totalGoodBefore =
+        FederationStateStoreClientMetrics.getNumSucceededCalls();
+    long apiGoodBefore = FederationStateStoreClientMetrics
+        .getNumSucceessfulCallsForMethod("registerSubCluster");
+
+    goodStateStore.registerSubCluster(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        FederationStateStoreClientMetrics.getNumSucceededCalls());
+    Assert.assertEquals(100,
+        FederationStateStoreClientMetrics.getLatencySucceededCalls(), 0);
+    Assert.assertEquals(apiGoodBefore + 1,
+        FederationStateStoreClientMetrics.getNumSucceededCalls());
+    Assert.assertEquals(100, FederationStateStoreClientMetrics
+        .getLatencySucceessfulCallsForMethod("registerSubCluster"), 0);
+
+    LOG.info("Test: Running stats correctly calculated for 2 metrics");
+
+    goodStateStore.registerSubCluster(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        FederationStateStoreClientMetrics.getNumSucceededCalls());
+    Assert.assertEquals(150,
+        FederationStateStoreClientMetrics.getLatencySucceededCalls(), 0);
+    Assert.assertEquals(apiGoodBefore + 2,
+        FederationStateStoreClientMetrics.getNumSucceededCalls());
+    Assert.assertEquals(150, FederationStateStoreClientMetrics
+        .getLatencySucceessfulCallsForMethod("registerSubCluster"), 0);
+
+  }
+
+  @Test
+  public void testFailedCalls() {
+
+    long totalBadbefore = FederationStateStoreClientMetrics.getNumFailedCalls();
+    long apiBadBefore = FederationStateStoreClientMetrics
+        .getNumFailedCallsForMethod("registerSubCluster");
+
+    badStateStore.registerSubCluster();
+
+    LOG.info("Test: Aggregate and method failed calls updated correctly");
+    Assert.assertEquals(totalBadbefore + 1,
+        FederationStateStoreClientMetrics.getNumFailedCalls());
+    Assert.assertEquals(apiBadBefore + 1, FederationStateStoreClientMetrics
+        .getNumFailedCallsForMethod("registerSubCluster"));
+
+  }
+
+  @Test
+  public void testCallsUnknownMethod() {
+
+    long totalBadbefore = FederationStateStoreClientMetrics.getNumFailedCalls();
+    long apiBadBefore = FederationStateStoreClientMetrics
+        .getNumFailedCallsForMethod("registerSubCluster");
+    long totalGoodBefore =
+        FederationStateStoreClientMetrics.getNumSucceededCalls();
+    long apiGoodBefore = FederationStateStoreClientMetrics
+        .getNumSucceessfulCallsForMethod("registerSubCluster");
+
+    LOG.info("Calling Metrics class directly");
+    FederationStateStoreClientMetrics.failedStateStoreCall();
+    FederationStateStoreClientMetrics.succeededStateStoreCall(100);
+
+    LOG.info("Test: Aggregate and method calls did not update");
+    Assert.assertEquals(totalBadbefore,
+        FederationStateStoreClientMetrics.getNumFailedCalls());
+    Assert.assertEquals(apiBadBefore, FederationStateStoreClientMetrics
+        .getNumFailedCallsForMethod("registerSubCluster"));
+
+    Assert.assertEquals(totalGoodBefore,
+        FederationStateStoreClientMetrics.getNumSucceededCalls());
+    Assert.assertEquals(apiGoodBefore, FederationStateStoreClientMetrics
+        .getNumSucceessfulCallsForMethod("registerSubCluster"));
+
+  }
+
+  // Records failures for all calls
+  private class MockBadFederationStateStore {
+    public void registerSubCluster() {
+      LOG.info("Mocked: failed registerSubCluster call");
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+    }
+  }
+
+  // Records successes for all calls
+  private class MockGoodFederationStateStore {
+    public void registerSubCluster(long duration) {
+      LOG.info("Mocked: successful registerSubCluster call with duration {}",
+          duration);
+      FederationStateStoreClientMetrics.succeededStateStoreCall(duration);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message