ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From swa...@apache.org
Subject ambari git commit: Capture HDFS metrics per RPC port number in AMS and Grafana. (swagle)
Date Fri, 04 Mar 2016 19:57:37 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk 211a48d56 -> f3659cce6


Capture HDFS metrics per RPC port number in AMS and Grafana. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f3659cce
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f3659cce
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f3659cce

Branch: refs/heads/trunk
Commit: f3659cce6946895bc8fdda0bfb1232643d49223a
Parents: 211a48d
Author: Siddharth Wagle <swagle@hortonworks.com>
Authored: Fri Mar 4 11:57:22 2016 -0800
Committer: Siddharth Wagle <swagle@hortonworks.com>
Committed: Fri Mar 4 11:57:28 2016 -0800

----------------------------------------------------------------------
 .../timeline/AbstractTimelineMetricsSink.java   |   6 +-
 .../timeline/HadoopTimelineMetricsSink.java     | 112 +++++++----
 .../timeline/HadoopTimelineMetricsSinkTest.java | 184 +++++++++++++++++--
 .../server/api/services/AmbariMetaInfo.java     |  96 ++++++----
 .../metrics/timeline/AMSPropertyProvider.java   |  27 ++-
 .../controller/utilities/PropertyHelper.java    |  77 ++++++++
 .../server/upgrade/UpgradeCatalog222.java       |  90 +++++++++
 .../common-services/HDFS/2.1.0.2.0/widgets.json |  58 ++++--
 .../2.0.6/hooks/before-START/scripts/params.py  |  57 ++++++
 .../templates/hadoop-metrics2.properties.j2     |  11 ++
 .../StackArtifactResourceProviderTest.java      |  50 ++++-
 .../utilities/PropertyHelperTest.java           |  14 ++
 .../server/upgrade/UpgradeCatalog222Test.java   |  89 +++++++--
 13 files changed, 756 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index b2810b7..28d3b9c 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -47,6 +47,9 @@ public abstract class AbstractTimelineMetricsSink {
   public static final String COLLECTOR_PROPERTY = "collector";
   public static final int DEFAULT_POST_TIMEOUT_SECONDS = 10;
   public static final String SKIP_COUNTER_TRANSFROMATION = "skipCounterDerivative";
+  public static final String RPC_METRIC_PREFIX = "metric.rpc";
+  public static final String RPC_METRIC_NAME_SUFFIX = "suffix";
+  public static final String RPC_METRIC_PORT_SUFFIX = "port";
 
   public static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
 
@@ -78,8 +81,7 @@ public abstract class AbstractTimelineMetricsSink {
     HttpURLConnection connection = null;
     try {
       if (connectUrl == null) {
-        throw new IOException("Unknown URL. " +
-          "Unable to connect to metrics collector.");
+        throw new IOException("Unknown URL. Unable to connect to metrics collector.");
       }
       String jsonData = mapper.writeValueAsString(metrics);
       connection = connectUrl.startsWith("https") ?

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index 6da9257..db8791f 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -17,31 +17,31 @@
  */
 package org.apache.hadoop.metrics2.sink.timeline;
 
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.commons.configuration.SubsetConfiguration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricType;
 import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsSink;
 import org.apache.hadoop.metrics2.MetricsTag;
-import org.apache.hadoop.metrics2.MetricType;
 import org.apache.hadoop.metrics2.impl.MsInfo;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.util.Servers;
 import org.apache.hadoop.net.DNS;
 
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink implements MetricsSink {
@@ -54,9 +54,13 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
   private static final String SERVICE_NAME_PREFIX = "serviceName-prefix";
   private static final String SERVICE_NAME = "serviceName";
   private int timeoutSeconds = 10;
+  private SubsetConfiguration conf;
+  // Cache the rpc port used and the suffix to use if the port tag is found
+  private Map<String, String> rpcPortSuffixes = new HashMap<>(10);
 
   @Override
   public void init(SubsetConfiguration conf) {
+    this.conf = conf;
     LOG.info("Initializing Timeline metrics sink.");
 
     // Take the hostname from the DNS class.
@@ -83,8 +87,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
     if (metricsServers == null || metricsServers.isEmpty()) {
       LOG.error("No Metric collector configured.");
     } else {
-      collectorUri = conf.getString(COLLECTOR_PROPERTY).trim()
-          + WS_V1_TIMELINE_METRICS;
+      collectorUri = conf.getString(COLLECTOR_PROPERTY).trim() + WS_V1_TIMELINE_METRICS;
       if (collectorUri.toLowerCase().startsWith("https://")) {
         String trustStorePath = conf.getString(SSL_KEYSTORE_PATH_PROPERTY).trim();
         String trustStoreType = conf.getString(SSL_KEYSTORE_TYPE_PROPERTY).trim();
@@ -109,27 +112,40 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
     Iterator<String> it = (Iterator<String>) conf.getKeys();
     while (it.hasNext()) {
       String propertyName = it.next();
-      if (propertyName != null && propertyName.startsWith(TAGS_FOR_PREFIX_PROPERTY_PREFIX)) {
-        String contextName = propertyName.substring(TAGS_FOR_PREFIX_PROPERTY_PREFIX.length());
-        String[] tags = conf.getStringArray(propertyName);
-        boolean useAllTags = false;
-        Set<String> set = null;
-        if (tags.length > 0) {
-          set = new HashSet<String>();
-          for (String tag : tags) {
-            tag = tag.trim();
-            useAllTags |= tag.equals("*");
-            if (tag.length() > 0) {
-              set.add(tag);
+      if (propertyName != null) {
+        if (propertyName.startsWith(TAGS_FOR_PREFIX_PROPERTY_PREFIX)) {
+          String contextName = propertyName.substring(TAGS_FOR_PREFIX_PROPERTY_PREFIX.length());
+          String[] tags = conf.getStringArray(propertyName);
+          boolean useAllTags = false;
+          Set<String> set = null;
+          if (tags.length > 0) {
+            set = new HashSet<String>();
+            for (String tag : tags) {
+              tag = tag.trim();
+              useAllTags |= tag.equals("*");
+              if (tag.length() > 0) {
+                set.add(tag);
+              }
+            }
+            if (useAllTags) {
+              set = null;
             }
           }
-          if (useAllTags) {
-            set = null;
-          }
+          useTagsMap.put(contextName, set);
+        }
+        // Customized RPC ports
+        if (propertyName.startsWith(RPC_METRIC_PREFIX)) {
+          // metric.rpc.client.port
+          int beginIdx = RPC_METRIC_PREFIX.length() + 1;
+          String suffixStr = propertyName.substring(beginIdx); // client.port
+          String configPrefix = suffixStr.substring(0, suffixStr.indexOf(".")); // client
+          rpcPortSuffixes.put(conf.getString(propertyName).trim(), configPrefix.trim());
         }
-        useTagsMap.put(contextName, set);
       }
     }
+    if (!rpcPortSuffixes.isEmpty()) {
+      LOG.info("RPC port properties configured: " + rpcPortSuffixes);
+    }
   }
 
   /**
@@ -172,14 +188,41 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
       StringBuilder sb = new StringBuilder();
       sb.append(contextName);
       sb.append('.');
-      sb.append(recordName);
+      // Similar to GangliaContext adding processName to distinguish jvm
+      // metrics for co-hosted daemons. We only do this for HBase since the
+      // appId is shared for Master and RS.
+      if (contextName.equals("jvm")) {
+        if (record.tags() != null) {
+          for (MetricsTag tag : record.tags()) {
+            if (tag.info().name().equalsIgnoreCase("processName") &&
+               (tag.value().equals("RegionServer") || tag.value().equals("Master"))) {
+              sb.append(tag.value());
+              sb.append('.');
+            }
+          }
+        }
+      }
 
+      sb.append(recordName);
       appendPrefix(record, sb);
-      sb.append(".");
+      sb.append('.');
+
+      // Add port tag for rpc metrics to distinguish rpc calls based on port
+      if (!rpcPortSuffixes.isEmpty() && contextName.contains("rpc")) {
+        if (record.tags() != null) {
+          for (MetricsTag tag : record.tags()) {
+            if (tag.info().name().equalsIgnoreCase("port") &&
+                rpcPortSuffixes.keySet().contains(tag.value())) {
+              sb.append(rpcPortSuffixes.get(tag.value()));
+              sb.append('.');
+            }
+          }
+        }
+      }
+
       int sbBaseLen = sb.length();
 
-      Collection<AbstractMetric> metrics =
-        (Collection<AbstractMetric>) record.metrics();
+      Collection<AbstractMetric> metrics = (Collection<AbstractMetric>) record.metrics();
 
       List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
       long startTime = record.timestamp();
@@ -247,4 +290,5 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
   public void flush() {
     // TODO: Buffering implementation
   }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
index 528384e..4a5abcc 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
@@ -18,6 +18,34 @@
 
 package org.apache.hadoop.metrics2.sink.timeline;
 
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricType;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_PROPERTY;
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.MAX_METRIC_ROW_CACHE_SIZE;
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.METRICS_SEND_INTERVAL;
@@ -31,28 +59,14 @@ import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 
-import java.io.OutputStream;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.commons.configuration.SubsetConfiguration;
-import org.apache.hadoop.metrics2.AbstractMetric;
-import org.apache.hadoop.metrics2.MetricType;
-import org.apache.hadoop.metrics2.MetricsRecord;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
 @RunWith(PowerMockRunner.class)
 public class HadoopTimelineMetricsSinkTest {
 
+  @Before
+  public void setup() {
+    Logger.getLogger("org.apache.hadoop.metrics2.sink.timeline").setLevel(Level.DEBUG);
+  }
+
   @Test
   @PrepareForTest({URL.class, OutputStream.class})
   public void testPutMetrics() throws Exception {
@@ -241,5 +255,137 @@ public class HadoopTimelineMetricsSinkTest {
     Assert.assertEquals(new Double(6.0), values.next());
   }
 
+  @Test
+  public void testRPCPortSuffixHandledCorrectly() throws Exception {
+    HadoopTimelineMetricsSink sink =
+      createMockBuilder(HadoopTimelineMetricsSink.class)
+        .withConstructor().addMockedMethod("appendPrefix")
+        .addMockedMethod("emitMetrics").createNiceMock();
+
+    SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class);
+    expect(conf.getString(eq("slave.host.name"))).andReturn("testhost").anyTimes();
+    expect(conf.getParent()).andReturn(null).anyTimes();
+    expect(conf.getPrefix()).andReturn("service").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PROPERTY))).andReturn("localhost:63188").anyTimes();
+    expect(conf.getString(eq("serviceName-prefix"), eq(""))).andReturn("").anyTimes();
+
+    expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
+    expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(10).anyTimes();
+
+    conf.setListDelimiter(eq(','));
+    expectLastCall().anyTimes();
+
+    Set<String> rpcPortSuffixes = new HashSet<String>() {{
+      add("metric.rpc.client.port");
+      add("metric.rpc.datanode.port");
+      add("metric.rpc.healthcheck.port");
+    }};
+
+    expect(conf.getKeys()).andReturn(rpcPortSuffixes.iterator());
+    expect(conf.getString("metric.rpc.client.port")).andReturn("8020");
+    expect(conf.getString("metric.rpc.datanode.port")).andReturn("8040");
+    expect(conf.getString("metric.rpc.healthcheck.port")).andReturn("8060");
 
+    AbstractMetric metric = createNiceMock(AbstractMetric.class);
+    expect(metric.name()).andReturn("rpc.metricName").anyTimes();
+    expect(metric.value()).andReturn(1.0).once();
+    expect(metric.value()).andReturn(2.0).once();
+    expect(metric.value()).andReturn(3.0).once();
+    expect(metric.value()).andReturn(4.0).once();
+    expect(metric.value()).andReturn(5.0).once();
+    expect(metric.value()).andReturn(6.0).once();
+
+    MetricsRecord record = createNiceMock(MetricsRecord.class);
+    expect(record.name()).andReturn("testMetric").anyTimes();
+    expect(record.context()).andReturn("rpc").anyTimes();
+    Collection<MetricsTag> tags1 = Collections.singletonList(
+      new MetricsTag(new MetricsInfo() {
+        @Override
+        public String name() {
+          return "port";
+        }
+
+        @Override
+        public String description() {
+          return null;
+        }
+      }, "8020")
+    );
+    Collection<MetricsTag> tags2 = Collections.singletonList(
+      new MetricsTag(new MetricsInfo() {
+        @Override
+        public String name() {
+          return "port";
+        }
+
+        @Override
+        public String description() {
+          return null;
+        }
+      }, "8040")
+    );
+    expect(record.tags()).andReturn(tags1).times(6);
+    expect(record.tags()).andReturn(tags2).times(6);
+
+    sink.appendPrefix(eq(record), (StringBuilder) anyObject());
+    expectLastCall().anyTimes().andStubAnswer(new IAnswer<Object>() {
+      @Override
+      public Object answer() throws Throwable {
+        return null;
+      }
+    });
+
+    final Long now = System.currentTimeMillis();
+    // TODO: Current implementation of cache needs > 1 elements to evict any
+    expect(record.timestamp()).andReturn(now).times(2);
+    expect(record.timestamp()).andReturn(now + 100l).times(2);
+    expect(record.timestamp()).andReturn(now + 200l).once();
+    expect(record.timestamp()).andReturn(now + 300l).once();
+
+    expect(record.metrics()).andReturn(Arrays.asList(metric)).anyTimes();
+
+    final List<TimelineMetrics> capturedMetrics = new ArrayList<TimelineMetrics>();
+    sink.emitMetrics((TimelineMetrics) anyObject());
+    expectLastCall().andStubAnswer(new IAnswer<Object>() {
+      @Override
+      public Object answer() throws Throwable {
+        capturedMetrics.add((TimelineMetrics) EasyMock.getCurrentArguments()[0]);
+        return null;
+      }
+    });
+
+    replay(conf, sink, record, metric);
+
+    sink.init(conf);
+
+    // time = t1
+    sink.putMetrics(record);
+    // time = t1
+    sink.putMetrics(record);
+    // time = t2
+    sink.putMetrics(record);
+    // Evict
+    // time = t2
+    sink.putMetrics(record);
+    // time = t3
+    sink.putMetrics(record);
+    // time = t4
+    sink.putMetrics(record);
+
+    verify(conf, sink, record, metric);
+
+    Assert.assertEquals(2, capturedMetrics.size());
+    Iterator<TimelineMetrics> metricsIterator = capturedMetrics.iterator();
+
+    // t1, t2
+    TimelineMetric timelineMetric1 = metricsIterator.next().getMetrics().get(0);
+    Assert.assertEquals(2, timelineMetric1.getMetricValues().size());
+    // Assert the tag added to the name
+    Assert.assertEquals("rpc.testMetric.client.rpc.metricName", timelineMetric1.getMetricName());
+    // t3, t4
+    TimelineMetric timelineMetric2 = metricsIterator.next().getMetrics().get(0);
+    Assert.assertEquals(2, timelineMetric2.getMetricValues().size());
+    // Assert the tag added to the name
+    Assert.assertEquals("rpc.testMetric.datanode.rpc.metricName", timelineMetric2.getMetricName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
index 81aced4..2b863d5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
@@ -27,6 +27,7 @@ import org.apache.ambari.server.ParentObjectNotFoundException;
 import org.apache.ambari.server.StackAccessException;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.RootServiceResponseFactory.Services;
+import org.apache.ambari.server.controller.utilities.PropertyHelper;
 import org.apache.ambari.server.customactions.ActionDefinition;
 import org.apache.ambari.server.customactions.ActionDefinitionManager;
 import org.apache.ambari.server.events.AlertDefinitionDisabledEvent;
@@ -335,7 +336,7 @@ public class AmbariMetaInfo {
 
     DependencyInfo foundDependency = null;
     List<DependencyInfo> componentDependencies = getComponentDependencies(
-        stackName, version, service, component);
+      stackName, version, service, component);
     Iterator<DependencyInfo> iter = componentDependencies.iterator();
     while (foundDependency == null && iter.hasNext()) {
       DependencyInfo dependency = iter.next();
@@ -363,7 +364,7 @@ public class AmbariMetaInfo {
     for (RepositoryInfo repo : repository) {
       if (!reposResult.containsKey(repo.getOsType())) {
         reposResult.put(repo.getOsType(),
-            new ArrayList<RepositoryInfo>());
+          new ArrayList<RepositoryInfo>());
       }
       reposResult.get(repo.getOsType()).add(repo);
     }
@@ -879,7 +880,7 @@ public class AmbariMetaInfo {
       try {
         map = gson.fromJson(new FileReader(svc.getMetricsFile()), type);
 
-        svc.setMetrics(updateComponentMetricMapWithAggregateFunctionIds(map));
+        svc.setMetrics(processMetricDefinition(map));
 
       } catch (Exception e) {
         LOG.error ("Could not read the metrics file", e);
@@ -892,43 +893,52 @@ public class AmbariMetaInfo {
 
   /**
    * Add aggregate function support for all stack defined metrics.
+   *
+   * Refactor Namenode RPC metrics for different kinds of ports.
    */
-  private Map<String, Map<String, List<MetricDefinition>>> updateComponentMetricMapWithAggregateFunctionIds(
-      Map<String, Map<String, List<MetricDefinition>>> metricMap) {
+  private Map<String, Map<String, List<MetricDefinition>>> processMetricDefinition(
+    Map<String, Map<String, List<MetricDefinition>>> metricMap) {
 
     if (!metricMap.isEmpty()) {
         // For every Component
-      for (Map<String, List<MetricDefinition>> componentMetricDef :  metricMap.values()) {
+      for (Map.Entry<String, Map<String, List<MetricDefinition>>> componentMetricDefEntry : metricMap.entrySet()) {
+        String componentName = componentMetricDefEntry.getKey();
         // For every Component / HostComponent category
-        for (Map.Entry<String, List<MetricDefinition>> metricDefEntry : componentMetricDef.entrySet()) {
-          // NOTE: Only Component aggregates supported for now.
-          if (metricDefEntry.getKey().equals(Component.name())) {
-            //For every metric definition
-            for (MetricDefinition metricDefinition : metricDefEntry.getValue()) {
-              // Metrics System metrics only
-              if (metricDefinition.getType().equals("ganglia")) {
-                // Create a new map for each category
-                for (Map<String, Metric> metricByCategory : metricDefinition.getMetricsByCategory().values()) {
-                  Map<String, Metric> newMetrics = new HashMap<String, Metric>();
-
-                  // For every function id
-                  for (String identifierToAdd : AGGREGATE_FUNCTION_IDENTIFIERS) {
-
-                    for (Map.Entry<String, Metric> metricEntry : metricByCategory.entrySet()) {
-                      String newMetricKey = metricEntry.getKey() + identifierToAdd;
-                      Metric currentMetric = metricEntry.getValue();
-                      Metric newMetric = new Metric(
-                        currentMetric.getName() + identifierToAdd,
-                        currentMetric.isPointInTime(),
-                        currentMetric.isTemporal(),
-                        currentMetric.isAmsHostMetric(),
-                        currentMetric.getUnit()
-                      );
-                      newMetrics.put(newMetricKey, newMetric);
+        for (Map.Entry<String, List<MetricDefinition>> metricDefEntry : componentMetricDefEntry.getValue().entrySet()) {
+          //For every metric definition
+          for (MetricDefinition metricDefinition : metricDefEntry.getValue()) {
+            // Metrics System metrics only
+            if (metricDefinition.getType().equals("ganglia")) {
+              for (Map.Entry<String, Map<String, Metric>> metricByCategory : metricDefinition.getMetricsByCategory().entrySet()) {
+                String category = metricByCategory.getKey();
+                Iterator<Map.Entry<String, Metric>> iterator = metricByCategory.getValue().entrySet().iterator();
+                Map<String, Metric> newMetricsToAdd = new HashMap<>();
+
+                while (iterator.hasNext()) {
+                  Map.Entry<String, Metric> metricEntry = iterator.next();
+                  // Process Namenode rpc metrics
+                  Map<String, Metric> replacementMetrics = PropertyHelper.processRpcMetricDefinition(
+                    componentName, metricEntry.getKey(), metricEntry.getValue());
+                  if (replacementMetrics != null) {
+                    iterator.remove(); // Remove current metric entry
+                    newMetricsToAdd.putAll(replacementMetrics);
+                    // Add aggregate functions for replacement metrics
+                    if (metricDefEntry.getKey().equals(Component.name())) {
+                      for (Map.Entry<String, Metric> replacementMetric : replacementMetrics.entrySet()) {
+                        newMetricsToAdd.putAll(getAggregateFunctionMetrics(replacementMetric.getKey(),
+                          replacementMetric.getValue()));
+                      }
+                    }
+                  } else {
+                    // NOTE: Only Component aggregates supported for now.
+                    if (metricDefEntry.getKey().equals(Component.name())) {
+                      Map<String, Metric> aggregateFunctionMetrics =
+                        getAggregateFunctionMetrics(metricEntry.getKey(), metricEntry.getValue());
+                      newMetricsToAdd.putAll(aggregateFunctionMetrics);
                     }
                   }
-                  metricByCategory.putAll(newMetrics);
                 }
+                metricByCategory.getValue().putAll(newMetricsToAdd);
               }
             }
           }
@@ -939,6 +949,24 @@ public class AmbariMetaInfo {
     return metricMap;
   }
 
+  private Map<String, Metric> getAggregateFunctionMetrics(String metricName, Metric currentMetric) {
+    Map<String, Metric> newMetrics = new HashMap<String, Metric>();
+    // For every function id
+    for (String identifierToAdd : AGGREGATE_FUNCTION_IDENTIFIERS) {
+      String newMetricKey = metricName + identifierToAdd;
+      Metric newMetric = new Metric(
+        currentMetric.getName() + identifierToAdd,
+        currentMetric.isPointInTime(),
+        currentMetric.isTemporal(),
+        currentMetric.isAmsHostMetric(),
+        currentMetric.getUnit()
+      );
+      newMetrics.put(newMetricKey, newMetric);
+    }
+
+    return newMetrics;
+  }
+
   /**
    * Gets the metrics for a Role (component).
    * @return the list of defined metrics.
@@ -947,8 +975,8 @@ public class AmbariMetaInfo {
       String serviceName, String componentName, String metricType)
       throws AmbariException {
 
-    Map<String, Map<String, List<MetricDefinition>>> map =
-      getServiceMetrics(stackName, stackVersion, serviceName);
+    Map<String, Map<String, List<MetricDefinition>>> map = getServiceMetrics(
+      stackName, stackVersion, serviceName);
 
     if (map != null && map.containsKey(componentName)) {
       if (map.get(componentName).containsKey(metricType)) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
index a674371..4bc9fd7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
@@ -69,6 +69,13 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
   private static AtomicInteger printSkipPopulateMsgHostCompCounter = new AtomicInteger(0);
   private static final Map<String, String> timelineAppIdCache = new ConcurrentHashMap<>(10);
 
+  private static final Map<String, String> JVM_PROCESS_NAMES = new HashMap<>(2);
+
+  static {
+    JVM_PROCESS_NAMES.put("HBASE_MASTER", "Master.");
+    JVM_PROCESS_NAMES.put("HBASE_REGIONSERVER", "RegionServer.");
+  }
+
   public AMSPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
                              URLStreamProvider streamProvider,
                              ComponentSSLConfiguration configuration,
@@ -411,6 +418,7 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
         if (metricsMap != null) {
           for (String propertyId : propertyIdSet) {
             if (propertyId != null) {
+//              propertyId = postProcessPropertyId(propertyId, getComponentName(resource));
               if (metricsMap.containsKey(propertyId)){
                 if (containsArguments(propertyId)) {
                   int i = 1;
@@ -630,7 +638,9 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
               requests.put(temporalInfo, metricsRequest);
             }
             metricsRequest.putResource(getComponentName(resource), resource);
-            metricsRequest.putPropertyId(propertyInfo.getPropertyId(), propertyId);
+            metricsRequest.putPropertyId(
+              preprocessPropertyId(propertyInfo.getPropertyId(), getComponentName(resource)),
+              propertyId);
             // If request is for a host metric we need to create multiple requests
             if (propertyInfo.isAmsHostMetric()) {
               metricsRequest.putHosComponentHostMetric(propertyInfo.getPropertyId());
@@ -643,6 +653,21 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
     return requestMap;
   }
 
+  /**
+   * Account for the processName added to the jvm metrics by the HadoopSink.
+   * E.g.: jvm.RegionServer.JvmMetrics.GcTimeMillis
+   *
+   */
+  private String preprocessPropertyId(String propertyId, String componentName) {
+    if (propertyId.startsWith("jvm") && JVM_PROCESS_NAMES.keySet().contains(componentName)) {
+      String newPropertyId = propertyId.replace("jvm.", "jvm." + JVM_PROCESS_NAMES.get(componentName));
+      LOG.debug("Pre-process: " + propertyId + ", to: " + newPropertyId);
+      return newPropertyId;
+    }
+
+    return propertyId;
+  }
+
   static URIBuilder getAMSUriBuilder(String hostname, int port, boolean httpsEnabled) {
     URIBuilder uriBuilder = new URIBuilder();
     uriBuilder.setScheme(httpsEnabled ? "https" : "http");

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java
index cefe953..41da279 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java
@@ -36,6 +36,7 @@ import org.apache.ambari.server.controller.spi.Request;
 import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.spi.SortRequest;
 import org.apache.ambari.server.controller.spi.TemporalInfo;
+import org.apache.ambari.server.state.stack.Metric;
 import org.apache.commons.lang.StringUtils;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
@@ -67,6 +68,9 @@ public class PropertyHelper {
   private static final Map<Resource.InternalType, Map<String, Map<String, PropertyInfo>>> SQLSERVER_PROPERTY_IDS = readPropertyProviderIds(SQLSERVER_PROPERTIES_FILE);
   private static final Map<Resource.InternalType, Map<Resource.Type, String>> KEY_PROPERTY_IDS = readKeyPropertyIds(KEY_PROPERTIES_FILE);
 
+  // Suffixes to add for Namenode rpc metrics prefixes
+  private static final Map<String, List<String>> RPC_METRIC_SUFFIXES = new HashMap<>();
+
   /**
    * Regular expression to check for replacement arguments (e.g. $1) in a property id.
    */
@@ -92,6 +96,11 @@ public class PropertyHelper {
    */
   private static final Pattern METRIC_CATEGORY_TOKENIZE_REGEX = Pattern.compile("/+(?=([^\"\\\\\\\\]*(\\\\\\\\.|\"([^\"\\\\\\\\]*\\\\\\\\.)*[^\"\\\\\\\\]*\"))*[^\"]*$)");
 
+  static {
+    RPC_METRIC_SUFFIXES.put("rpc.rpc", Arrays.asList("client", "datanode", "healthcheck"));
+    RPC_METRIC_SUFFIXES.put("rpcdetailed.rpcdetailed", Arrays.asList("client", "datanode", "healthcheck"));
+  }
+
   public static String getPropertyId(String category, String name) {
     String propertyId =  (category == null || category.isEmpty())? name :
         (name == null || name.isEmpty()) ? category : category + EXTERNAL_PATH_SEP + name;
@@ -626,4 +635,72 @@ public class PropertyHelper {
     }
     return false;
   }
+
+
+  /**
+   * Special handle rpc port tags added to metric names for HDFS Namenode
+   *
+   * Returns the replacement definitions
+   */
+  public static Map<String, org.apache.ambari.server.state.stack.Metric> processRpcMetricDefinition(
+      String componentName, String propertyId, org.apache.ambari.server.state.stack.Metric metric) {
+    Map<String, org.apache.ambari.server.state.stack.Metric> replacementMap = null;
+    if (componentName.equalsIgnoreCase("NAMENODE")) {
+      for (Map.Entry<String, List<String>> entry : RPC_METRIC_SUFFIXES.entrySet()) {
+        String prefix = entry.getKey();
+        if (metric.getName().startsWith(prefix)) {
+          replacementMap = new HashMap<>();
+          for (String suffix : entry.getValue()) {
+            org.apache.ambari.server.state.stack.Metric newMetric = new org.apache.ambari.server.state.stack.Metric(
+              insertTagInToMetricName(suffix, metric.getName(), prefix),
+              metric.isPointInTime(),
+              metric.isTemporal(),
+              metric.isAmsHostMetric(),
+              metric.getUnit()
+            );
+
+            replacementMap.put(insertTagInToMetricName(suffix, propertyId, prefix), newMetric);
+          }
+        }
+      }
+    }
+    return replacementMap;
+  }
+
+  /**
+   * Returns tag inserted metric name after the prefix.
+   * @param tag E.g.: client
+   * @param metricName : rpc.rpc.CallQueueLength Or metrics/rpc/CallQueueLen
+   * @param prefix : rpc.rpc
+   * @return rpc.rpc.client.CallQueueLength Or metrics/rpc/client/CallQueueLen
+   */
+  static String insertTagInToMetricName(String tag, String metricName, String prefix) {
+    String sepExpr = "\\.";
+    String seperator = ".";
+    if (metricName.indexOf(EXTERNAL_PATH_SEP) != -1) {
+      sepExpr = Character.toString(EXTERNAL_PATH_SEP);
+      seperator = sepExpr;
+    }
+    String prefixSep = prefix.contains(".") ? "\\." : "" + EXTERNAL_PATH_SEP;
+
+    // Remove separator if any
+    if (prefix.substring(prefix.length() - 1).equals(prefixSep)) {
+      prefix = prefix.substring(0, prefix.length() - 1);
+    }
+    int pos = prefix.split(prefixSep).length - 1;
+    String[] parts = metricName.split(sepExpr);
+    StringBuilder sb = new StringBuilder();
+
+    for (int i = 0; i < parts.length; i++) {
+      sb.append(parts[i]);
+      if (i < parts.length - 1) {
+        sb.append(seperator);
+      }
+      if (i == pos) { // append the tag
+        sb.append(tag);
+        sb.append(seperator);
+      }
+    }
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog222.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog222.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog222.java
index 8ea455d..78cf8bd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog222.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog222.java
@@ -18,25 +18,38 @@
 
 package org.apache.ambari.server.upgrade;
 
+import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
+import com.google.gson.reflect.TypeToken;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
 import org.apache.ambari.server.orm.dao.DaoUtils;
+import org.apache.ambari.server.orm.dao.WidgetDAO;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
+import org.apache.ambari.server.orm.entities.WidgetEntity;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
 import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceInfo;
 import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.StackInfo;
+import org.apache.ambari.server.state.stack.WidgetLayout;
+import org.apache.ambari.server.state.stack.WidgetLayoutInfo;
 import org.apache.ambari.server.utils.VersionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.FileReader;
+import java.lang.reflect.Type;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -80,6 +93,9 @@ public class UpgradeCatalog222 extends AbstractUpgradeCatalog {
   public static final String CLUSTER_HOUR_TABLE_TTL = "timeline.metrics.cluster.aggregator.hourly.ttl";
   public static final String CLUSTER_DAILY_TABLE_TTL = "timeline.metrics.cluster.aggregator.daily.ttl";
 
+  private static final String[] HDFS_WIDGETS_TO_UPDATE = new String[] {
+    "NameNode RPC", "NN Connection Load" };
+
 
   // ----- Constructors ------------------------------------------------------
 
@@ -133,6 +149,7 @@ public class UpgradeCatalog222 extends AbstractUpgradeCatalog {
     updateAMSConfigs();
     updateHiveConfig();
     updateHostRoleCommands();
+    updateHDFSWidgetDefinition();
   }
 
   protected void updateStormConfigs() throws  AmbariException {
@@ -336,6 +353,79 @@ public class UpgradeCatalog222 extends AbstractUpgradeCatalog {
     }
   }
 
+  protected void updateHDFSWidgetDefinition() throws AmbariException {
+    LOG.info("Updating HDFS widget definition.");
+    AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
+    AmbariMetaInfo ambariMetaInfo = injector.getInstance(AmbariMetaInfo.class);
+    Type widgetLayoutType = new TypeToken<Map<String, List<WidgetLayout>>>(){}.getType();
+    Gson gson = injector.getInstance(Gson.class);
+    WidgetDAO widgetDAO = injector.getInstance(WidgetDAO.class);
+
+    Clusters clusters = ambariManagementController.getClusters();
+
+    Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters);
+    for (final Cluster cluster : clusterMap.values()) {
+      long clusterID = cluster.getClusterId();
+
+      for (String widgetName : HDFS_WIDGETS_TO_UPDATE) {
+        List<WidgetEntity> widgetEntities = widgetDAO.findByName(clusterID,
+          widgetName, "ambari", "HDFS_SUMMARY");
+
+        if (widgetEntities != null) {
+          WidgetEntity entityToUpdate = null;
+          if (widgetEntities.size() > 1) {
+            LOG.info("Found more that 1 entity with name = "+ widgetName +
+              " for cluster = " + cluster.getClusterName() + ", skipping update.");
+          } else {
+            entityToUpdate = widgetEntities.iterator().next();
+          }
+          if (entityToUpdate != null) {
+            LOG.info("Updating widget: " + entityToUpdate.getWidgetName());
+            // Get the definition from widgets.json file
+            WidgetLayoutInfo targetWidgetLayoutInfo = null;
+            StackId stackId = cluster.getDesiredStackVersion();
+            Map<String, Object> widgetDescriptor = null;
+            StackInfo stackInfo = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion());
+            ServiceInfo serviceInfo = stackInfo.getService("HDFS");
+            File widgetDescriptorFile = serviceInfo.getWidgetsDescriptorFile();
+            if (widgetDescriptorFile != null && widgetDescriptorFile.exists()) {
+              try {
+                widgetDescriptor = gson.fromJson(new FileReader(widgetDescriptorFile), widgetLayoutType);
+              } catch (Exception ex) {
+                String msg = "Error loading widgets from file: " + widgetDescriptorFile;
+                LOG.error(msg, ex);
+                widgetDescriptor = null;
+              }
+            }
+            if (widgetDescriptor != null) {
+              LOG.debug("Loaded widget descriptor: " + widgetDescriptor);
+              for (Object artifact : widgetDescriptor.values()) {
+                List<WidgetLayout> widgetLayouts = (List<WidgetLayout>) artifact;
+                for (WidgetLayout widgetLayout : widgetLayouts) {
+                  if (widgetLayout.getLayoutName().equals("default_hdfs_dashboard")) {
+                    for (WidgetLayoutInfo layoutInfo : widgetLayout.getWidgetLayoutInfoList()) {
+                      if (layoutInfo.getWidgetName().equals(widgetName)) {
+                        targetWidgetLayoutInfo = layoutInfo;
+                      }
+                    }
+                  }
+                }
+              }
+            }
+            if (targetWidgetLayoutInfo != null) {
+              entityToUpdate.setMetrics(gson.toJson(targetWidgetLayoutInfo.getMetricsInfo()));
+              entityToUpdate.setWidgetValues(gson.toJson(targetWidgetLayoutInfo.getValues()));
+              widgetDAO.merge(entityToUpdate);
+            } else {
+              LOG.warn("Unable to find widget layout info for " + widgetName +
+                " in the stack: " + stackId);
+            }
+          }
+        }
+      }
+    }
+  }
+
   protected void updateHiveConfig() throws AmbariException {
     AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
     for (final Cluster cluster : getCheckedClusterMap(ambariManagementController.getClusters()).values()) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/widgets.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/widgets.json b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/widgets.json
index 7e93a6e..89aab13 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/widgets.json
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/widgets.json
@@ -74,8 +74,16 @@
           "is_visible": true,
           "metrics": [
             {
-              "name": "rpc.rpc.NumOpenConnections",
-              "metric_path": "metrics/rpc/NumOpenConnections",
+              "name": "rpc.rpc.client.NumOpenConnections",
+              "metric_path": "metrics/rpc/client/NumOpenConnections",
+              "category": "",
+              "service_name": "HDFS",
+              "component_name": "NAMENODE",
+              "host_component_criteria": "host_components/metrics/dfs/FSNamesystem/HAState=active"
+            },
+            {
+              "name": "rpc.rpc.datanode.NumOpenConnections",
+              "metric_path": "metrics/rpc/datanode/NumOpenConnections",
               "category": "",
               "service_name": "HDFS",
               "component_name": "NAMENODE",
@@ -84,8 +92,12 @@
           ],
           "values": [
             {
-              "name": "Open Connections",
-              "value": "${rpc.rpc.NumOpenConnections}"
+              "name": "Open Client Connections",
+              "value": "${rpc.rpc.client.NumOpenConnections}"
+            },
+            {
+              "name": "Open Datanode Connections",
+              "value": "${rpc.rpc.datanode.NumOpenConnections}"
             }
           ],
           "properties": {
@@ -216,15 +228,29 @@
           "is_visible": true,
           "metrics": [
             {
-              "name": "rpc.rpc.RpcQueueTimeAvgTime",
-              "metric_path": "metrics/rpc/RpcQueueTime_avg_time",
+              "name": "rpc.rpc.client.RpcQueueTimeAvgTime",
+              "metric_path": "metrics/rpc/client/RpcQueueTime_avg_time",
+              "service_name": "HDFS",
+              "component_name": "NAMENODE",
+              "host_component_criteria": "host_components/metrics/dfs/FSNamesystem/HAState=active"
+            },
+            {
+              "name": "rpc.rpc.client.RpcProcessingTimeAvgTime",
+              "metric_path": "metrics/rpc/client/RpcProcessingTime_avg_time",
               "service_name": "HDFS",
               "component_name": "NAMENODE",
               "host_component_criteria": "host_components/metrics/dfs/FSNamesystem/HAState=active"
             },
             {
-              "name": "rpc.rpc.RpcProcessingTimeAvgTime",
-              "metric_path": "metrics/rpc/RpcProcessingTime_avg_time",
+              "name": "rpc.rpc.datanode.RpcQueueTimeAvgTime",
+              "metric_path": "metrics/rpc/datanode/RpcQueueTime_avg_time",
+              "service_name": "HDFS",
+              "component_name": "NAMENODE",
+              "host_component_criteria": "host_components/metrics/dfs/FSNamesystem/HAState=active"
+            },
+            {
+              "name": "rpc.rpc.datanode.RpcProcessingTimeAvgTime",
+              "metric_path": "metrics/rpc/datanode/RpcProcessingTime_avg_time",
               "service_name": "HDFS",
               "component_name": "NAMENODE",
               "host_component_criteria": "host_components/metrics/dfs/FSNamesystem/HAState=active"
@@ -232,12 +258,20 @@
           ],
           "values": [
             {
-              "name": "RPC Queue Wait time",
-              "value": "${rpc.rpc.RpcQueueTimeAvgTime}"
+              "name": "Client RPC Queue Wait time",
+              "value": "${rpc.rpc.client.RpcQueueTimeAvgTime}"
+            },
+            {
+              "name": "Client RPC Processing time",
+              "value": "${rpc.rpc.client.RpcProcessingTimeAvgTime}"
+            },
+            {
+              "name": "Datanode RPC Queue Wait time",
+              "value": "${rpc.rpc.datanode.RpcQueueTimeAvgTime}"
             },
             {
-              "name": "RPC Processing time",
-              "value": "${rpc.rpc.RpcProcessingTimeAvgTime}"
+              "name": "Datanode RPC Processing time",
+              "value": "${rpc.rpc.datanode.RpcProcessingTimeAvgTime}"
             }
           ],
           "properties": {

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
index 4d129c7..069d1ae 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
@@ -236,6 +236,63 @@ smoke_user =  config['configurations']['cluster-env']['smokeuser']
 smoke_hdfs_user_dir = format("/user/{smoke_user}")
 smoke_hdfs_user_mode = 0770
 
+
+##### Namenode RPC ports - metrics config section start #####
+
+# Figure out the rpc ports for current namenode
+nn_rpc_client_port = None
+nn_rpc_dn_port = None
+nn_rpc_healthcheck_port = None
+
+namenode_id = None
+namenode_rpc = None
+
+dfs_ha_enabled = False
+dfs_ha_nameservices = default("/configurations/hdfs-site/dfs.nameservices", None)
+dfs_ha_namenode_ids = default(format("/configurations/hdfs-site/dfs.ha.namenodes.{dfs_ha_nameservices}"), None)
+
+dfs_ha_namemodes_ids_list = []
+other_namenode_id = None
+
+if dfs_ha_namenode_ids:
+ dfs_ha_namemodes_ids_list = dfs_ha_namenode_ids.split(",")
+ dfs_ha_namenode_ids_array_len = len(dfs_ha_namemodes_ids_list)
+ if dfs_ha_namenode_ids_array_len > 1:
+   dfs_ha_enabled = True
+
+if dfs_ha_enabled:
+ for nn_id in dfs_ha_namemodes_ids_list:
+   nn_host = config['configurations']['hdfs-site'][format('dfs.namenode.rpc-address.{dfs_ha_nameservices}.{nn_id}')]
+   if hostname in nn_host:
+     namenode_id = nn_id
+     namenode_rpc = nn_host
+   pass
+ pass
+else:
+ namenode_rpc = default('/configurations/hdfs-site/dfs.namenode.rpc-address', None)
+
+if namenode_rpc:
+ nn_rpc_client_port = namenode_rpc.split(':')[1].strip()
+
+if dfs_ha_enabled:
+ dfs_service_rpc_address = default(format('/configurations/hdfs-site/dfs.namenode.servicerpc-address.{namenode_id}'), None)
+ dfs_lifeline_rpc_address = default(format('/configurations/hdfs-site/dfs.namenode.lifeline.rpc-address.{namenode_id}'), None)
+else:
+ dfs_service_rpc_address = default('/configurations/hdfs-site/dfs.namenode.servicerpc-address', None)
+ dfs_lifeline_rpc_address = default(format('/configurations/hdfs-site/dfs.namenode.lifeline.rpc-address'), None)
+
+if dfs_service_rpc_address:
+ nn_rpc_dn_port = dfs_service_rpc_address.split(':')[1].strip()
+
+if dfs_lifeline_rpc_address:
+ nn_rpc_healthcheck_port = dfs_lifeline_rpc_address.split(':')[1].strip()
+
+is_nn_client_port_configured = False if nn_rpc_client_port is None else True
+is_nn_dn_port_configured = False if nn_rpc_dn_port is None else True
+is_nn_healthcheck_port_configured = False if nn_rpc_healthcheck_port is None else True
+
+##### end #####
+
 import functools
 #create partial functions with common arguments for every HdfsResource call
 #to create/delete/copyfromlocal hdfs directories/files we need to call params.HdfsResource in code

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2 b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
index f9c2164..47b504f 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
@@ -91,4 +91,15 @@ reducetask.sink.timeline.collector={{metric_collector_protocol}}://{{metric_coll
 
 resourcemanager.sink.timeline.tagsForPrefix.yarn=Queue
 
+{% if is_nn_client_port_configured %}
+# Namenode rpc ports customization
+namenode.sink.timeline.metric.rpc.client.port={{nn_rpc_client_port}}
+{% endif %}
+{% if is_nn_dn_port_configured %}
+namenode.sink.timeline.metric.rpc.datanode.port={{nn_rpc_dn_port}}
+{% endif %}
+{% if is_nn_healthcheck_port_configured %}
+namenode.sink.timeline.metric.rpc.healthcheck.port={{nn_rpc_healthcheck_port}}
+{% endif %}
+
 {% endif %}

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackArtifactResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackArtifactResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackArtifactResourceProviderTest.java
index f4c212c..fda5e79 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackArtifactResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackArtifactResourceProviderTest.java
@@ -118,8 +118,7 @@ public class StackArtifactResourceProviderTest {
     Map<String, Object> descriptor = propertyMap.get(ARTIFACT_DATA_PROPERTY_ID + "/HDFS/DATANODE");
     Assert.assertNotNull(descriptor);
     Assert.assertEquals(1, ((ArrayList) descriptor.get("Component")).size());
-    MetricDefinition md = (MetricDefinition) ((ArrayList) descriptor.get
-      ("Component")).iterator().next();
+    MetricDefinition md = (MetricDefinition) ((ArrayList) descriptor.get("Component")).iterator().next();
 
     Metric m1 = md.getMetrics().get("metrics/dfs/datanode/heartBeats_avg_time");
     Metric m2 = md.getMetrics().get("metrics/rpc/closeRegion_num_ops");
@@ -132,6 +131,53 @@ public class StackArtifactResourceProviderTest {
   }
 
   @Test
+  public void testGetMetricsDescriptorRpcForNamenode() throws Exception {
+    AmbariManagementController managementController = createNiceMock(AmbariManagementController.class);
+
+    expect(managementController.getAmbariMetaInfo()).andReturn(metaInfo).anyTimes();
+
+    replay(managementController);
+
+    StackArtifactResourceProvider resourceProvider = getStackArtifactResourceProvider(managementController);
+
+    Set<String> propertyIds = new HashSet<String>();
+    propertyIds.add(ARTIFACT_NAME_PROPERTY_ID);
+    propertyIds.add(STACK_NAME_PROPERTY_ID);
+    propertyIds.add(STACK_VERSION_PROPERTY_ID);
+    propertyIds.add(STACK_SERVICE_NAME_PROPERTY_ID);
+    propertyIds.add(ARTIFACT_DATA_PROPERTY_ID);
+
+    Request request = PropertyHelper.getReadRequest(propertyIds);
+
+    Predicate predicate = new PredicateBuilder().property
+      (ARTIFACT_NAME_PROPERTY_ID).equals("metrics_descriptor").and().property
+      (STACK_NAME_PROPERTY_ID).equals("OTHER").and().property
+      (STACK_VERSION_PROPERTY_ID).equals("1.0").and().property
+      (STACK_SERVICE_NAME_PROPERTY_ID).equals("HDFS").toPredicate();
+
+    Set<Resource> resources = resourceProvider.getResources(request, predicate);
+
+    Assert.assertEquals(1, resources.size());
+    Resource resource = resources.iterator().next();
+    Map<String, Map<String, Object>> propertyMap = resource.getPropertiesMap();
+    Map<String, Object> descriptor = propertyMap.get(ARTIFACT_DATA_PROPERTY_ID + "/HDFS/NAMENODE");
+    Assert.assertNotNull(descriptor);
+    Assert.assertEquals(2, ((ArrayList) descriptor.get("Component")).size());
+    MetricDefinition md = (MetricDefinition) ((ArrayList) descriptor.get("Component")).iterator().next();
+
+    Assert.assertEquals("rpcdetailed.rpcdetailed.client.BlockReceivedAndDeletedAvgTime",
+      md.getMetrics().get("metrics/rpcdetailed/client/blockReceived_avg_time").getName());
+
+    Assert.assertEquals("rpc.rpc.healthcheck.CallQueueLength",
+      md.getMetrics().get("metrics/rpc/healthcheck/callQueueLen").getName());
+
+    Assert.assertEquals("rpcdetailed.rpcdetailed.datanode.DeleteNumOps",
+      md.getMetrics().get("metrics/rpcdetailed/datanode/delete_num_ops").getName());
+
+    verify(managementController);
+  }
+
+  @Test
   @SuppressWarnings("unchecked")
   public void testGetWidgetDescriptorForService() throws Exception {
     AmbariManagementController managementController = createNiceMock(AmbariManagementController.class);

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/PropertyHelperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/PropertyHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/PropertyHelperTest.java
index d450177..2beb462 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/PropertyHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/PropertyHelperTest.java
@@ -302,6 +302,20 @@ public class PropertyHelperTest {
     }
   }
 
+  @Test
+  public void testInsertTagIntoMetricName() {
+    Assert.assertEquals("rpc.rpc.client.CallQueueLength",
+      PropertyHelper.insertTagInToMetricName("client", "rpc.rpc.CallQueueLength", "rpc.rpc"));
+
+    Assert.assertEquals("rpc.rpc.client.CallQueueLength",
+      PropertyHelper.insertTagInToMetricName("client", "rpc.rpc.CallQueueLength", "rpc.rpc."));
+
+    Assert.assertEquals("metrics/rpc/client/CallQueueLen",
+      PropertyHelper.insertTagInToMetricName("client", "metrics/rpc/CallQueueLen", "rpc.rpc"));
+
+    Assert.assertEquals("metrics/rpc/client/CallQueueLen",
+      PropertyHelper.insertTagInToMetricName("client", "metrics/rpc/CallQueueLen", "rpc.rpc."));
+  }
 
   // remove any replacement tokens (e.g. $1.replaceAll(\",q(\\d+)=\",\"/\").substring(1)) in the metric names
   private static Map<String, Map<String, PropertyInfo>> normalizeMetricNames(Map<String, Map<String, PropertyInfo>> gids) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog222Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog222Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog222Test.java
index 1d18206..f6dcb18 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog222Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog222Test.java
@@ -40,24 +40,34 @@ import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
 import org.apache.ambari.server.orm.dao.StackDAO;
+import org.apache.ambari.server.orm.dao.WidgetDAO;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.orm.entities.StackEntity;
+import org.apache.ambari.server.orm.entities.WidgetEntity;
+import org.apache.ambari.server.stack.StackManagerFactory;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
 import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceInfo;
 import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.StackInfo;
 import org.apache.ambari.server.state.stack.OsFamily;
+import org.apache.commons.io.FileUtils;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import javax.persistence.EntityManager;
+import java.io.File;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -83,7 +93,8 @@ public class UpgradeCatalog222Test {
   private UpgradeCatalogHelper upgradeCatalogHelper;
   private StackEntity desiredStackEntity;
 
-
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
   @Before
   public void init() {
@@ -114,16 +125,18 @@ public class UpgradeCatalog222Test {
     Method updateAMSConfigs = UpgradeCatalog222.class.getDeclaredMethod("updateAMSConfigs");
     Method updateHiveConfigs = UpgradeCatalog222.class.getDeclaredMethod("updateHiveConfig");
     Method updateHostRoleCommands = UpgradeCatalog222.class.getDeclaredMethod("updateHostRoleCommands");
+    Method updateHDFSWidget = UpgradeCatalog222.class.getDeclaredMethod("updateHDFSWidgetDefinition");
 
 
     UpgradeCatalog222 upgradeCatalog222 = createMockBuilder(UpgradeCatalog222.class)
-            .addMockedMethod(addNewConfigurationsFromXml)
-            .addMockedMethod(updateAlerts)
-            .addMockedMethod(updateStormConfigs)
-            .addMockedMethod(updateAMSConfigs)
-            .addMockedMethod(updateHiveConfigs)
-            .addMockedMethod(updateHostRoleCommands)
-            .createMock();
+      .addMockedMethod(addNewConfigurationsFromXml)
+      .addMockedMethod(updateAlerts)
+      .addMockedMethod(updateStormConfigs)
+      .addMockedMethod(updateAMSConfigs)
+      .addMockedMethod(updateHiveConfigs)
+      .addMockedMethod(updateHostRoleCommands)
+      .addMockedMethod(updateHDFSWidget)
+      .createMock();
 
     upgradeCatalog222.addNewConfigurationsFromXml();
     expectLastCall().once();
@@ -137,6 +150,8 @@ public class UpgradeCatalog222Test {
     expectLastCall().once();
     upgradeCatalog222.updateHiveConfig();
     expectLastCall().once();
+    upgradeCatalog222.updateHDFSWidgetDefinition();
+    expectLastCall().once();
 
     replay(upgradeCatalog222);
 
@@ -176,8 +191,8 @@ public class UpgradeCatalog222Test {
     expect(mockAlertDefinitionDAO.findByName(eq(clusterId), eq("yarn_app_timeline_server_webui")))
             .andReturn(mockATSWebAlert).atLeastOnce();
     expect(mockATSWebAlert.getSource()).andReturn("{\"uri\": {\n" +
-            "            \"http\": \"{{yarn-site/yarn.timeline-service.webapp.address}}/ws/v1/timeline\",\n" +
-            "            \"https\": \"{{yarn-site/yarn.timeline-service.webapp.https.address}}/ws/v1/timeline\" } }");
+      "            \"http\": \"{{yarn-site/yarn.timeline-service.webapp.address}}/ws/v1/timeline\",\n" +
+      "            \"https\": \"{{yarn-site/yarn.timeline-service.webapp.https.address}}/ws/v1/timeline\" } }");
 
     mockATSWebAlert.setSource("{\"uri\":{\"http\":\"{{yarn-site/yarn.timeline-service.webapp.address}}/ws/v1/timeline\",\"https\":\"{{yarn-site/yarn.timeline-service.webapp.https.address}}/ws/v1/timeline\"}}");
     expectLastCall().once();
@@ -255,7 +270,6 @@ public class UpgradeCatalog222Test {
     easyMockSupport.verifyAll();
   }
 
-
   @Test
   public void testAmsSiteUpdateConfigs() throws Exception{
 
@@ -330,7 +344,60 @@ public class UpgradeCatalog222Test {
 
     Map<String, String> updatedProperties = propertiesCapture.getValue();
     assertTrue(Maps.difference(newPropertiesAmsSite, updatedProperties).areEqual());
+  }
 
+  @Test
+  public void testHDFSWidgetUpdate() throws Exception {
+    final Clusters clusters = createNiceMock(Clusters.class);
+    final Cluster cluster = createNiceMock(Cluster.class);
+    final AmbariManagementController controller = createNiceMock(AmbariManagementController.class);
+    final Gson gson = new Gson();
+    final WidgetDAO widgetDAO = createNiceMock(WidgetDAO.class);
+    final AmbariMetaInfo metaInfo = createNiceMock(AmbariMetaInfo.class);
+    WidgetEntity widgetEntity = createNiceMock(WidgetEntity.class);
+    StackId stackId = new StackId("HDP", "2.0.0");
+    StackInfo stackInfo = createNiceMock(StackInfo.class);
+    ServiceInfo serviceInfo = createNiceMock(ServiceInfo.class);
+
+    String widgetStr = "{\"layouts\":[{\"layout_name\":\"default_hdfs_dashboard\",\"display_name\":\"Standard HDFS Dashboard\",\"section_name\":\"HDFS_SUMMARY\",\"widgetLayoutInfo\":[{\"widget_name\":\"NameNode RPC\",\"metrics\":[],\"values\":[]}]}]}";
+
+    File dataDirectory = temporaryFolder.newFolder();
+    File file = new File(dataDirectory, "hdfs_widget.json");
+    FileUtils.writeStringToFile(file, widgetStr);
+
+    final Injector mockInjector = Guice.createInjector(new AbstractModule() {
+      @Override
+      protected void configure() {
+        bind(EntityManager.class).toInstance(createNiceMock(EntityManager.class));
+        bind(AmbariManagementController.class).toInstance(controller);
+        bind(Clusters.class).toInstance(clusters);
+        bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
+        bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
+        bind(Gson.class).toInstance(gson);
+        bind(WidgetDAO.class).toInstance(widgetDAO);
+        bind(StackManagerFactory.class).toInstance(createNiceMock(StackManagerFactory.class));
+        bind(AmbariMetaInfo.class).toInstance(metaInfo);
+      }
+    });
+    expect(controller.getClusters()).andReturn(clusters).anyTimes();
+    expect(clusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{
+      put("normal", cluster);
+    }}).anyTimes();
+    expect(cluster.getClusterId()).andReturn(1L).anyTimes();
+    expect(widgetDAO.findByName(1L, "NameNode RPC", "ambari", "HDFS_SUMMARY"))
+      .andReturn(Collections.singletonList(widgetEntity));
+    expect(cluster.getDesiredStackVersion()).andReturn(stackId);
+    expect(metaInfo.getStack("HDP", "2.0.0")).andReturn(stackInfo);
+    expect(stackInfo.getService("HDFS")).andReturn(serviceInfo);
+    expect(serviceInfo.getWidgetsDescriptorFile()).andReturn(file);
+    expect(widgetDAO.merge(widgetEntity)).andReturn(null);
+    expect(widgetEntity.getWidgetName()).andReturn("Namenode RPC").anyTimes();
+
+    replay(clusters, cluster, controller, widgetDAO, metaInfo, widgetEntity, stackInfo, serviceInfo);
+
+    mockInjector.getInstance(UpgradeCatalog222.class).updateHDFSWidgetDefinition();
+
+    verify(clusters, cluster, controller, widgetDAO, widgetEntity, stackInfo, serviceInfo);
   }
 
   @Test


Mime
View raw message