ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From avija...@apache.org
Subject ambari git commit: AMBARI-17445 : Storm metrics sink: expand metrics name to contain additional information (Jungtaek Lim via avijayan)
Date Wed, 06 Jul 2016 00:24:31 GMT
Repository: ambari
Updated Branches:
  refs/heads/branch-2.4 98d6c59dc -> 64ea518ad


AMBARI-17445 : Storm metrics sink: expand metrics name to contain additional information (Jungtaek
Lim via avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/64ea518a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/64ea518a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/64ea518a

Branch: refs/heads/branch-2.4
Commit: 64ea518ad2712454330c883d22eeeff77307f41d
Parents: 98d6c59
Author: Aravindan Vijayan <avijayan@hortonworks.com>
Authored: Tue Jul 5 17:24:17 2016 -0700
Committer: Aravindan Vijayan <avijayan@hortonworks.com>
Committed: Tue Jul 5 17:24:25 2016 -0700

----------------------------------------------------------------------
 .../sink/storm/StormTimelineMetricsSink.java    | 32 +++++++++++++----
 .../storm/StormTimelineMetricsSinkTest.java     | 10 ++++--
 .../sink/storm/StormTimelineMetricsSink.java    | 37 ++++++++++----------
 .../storm/StormTimelineMetricsSinkTest.java     | 28 ++++-----------
 4 files changed, 58 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/64ea518a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 89906d8..83c6b67 100644
--- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -40,11 +40,15 @@ import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCach
 import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
 
 public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
+  public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId";
+  public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "storm";
+
   private String collectorUri;
   private TimelineMetricsCache metricsCache;
   private String hostname;
   private int timeoutSeconds;
   private String topologyName;
+  private String applicationId;
 
   @Override
   protected String getCollectorUri() {
@@ -76,6 +80,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
         String.valueOf(MAX_RECS_PER_NAME_DEFAULT)));
     int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
         String.valueOf(MAX_EVICTION_TIME_MILLIS)));
+    applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID);
     metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
     collectorUri = configuration.getProperty(COLLECTOR_PROPERTY) + WS_V1_TIMELINE_METRICS;
     if (collectorUri.toLowerCase().startsWith("https://")) {
@@ -96,9 +101,13 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
       List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
 
       for (DataPoint populatedDataPoint : populatedDataPoints) {
+        String metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcWorkerHost,
+            taskInfo.srcWorkerPort, taskInfo.srcTaskId, populatedDataPoint.name);
+
+        LOG.debug("populated datapoint: " + metricName + " = " + populatedDataPoint.value);
+
         TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000,
-            taskInfo.srcComponentId, taskInfo.srcTaskId, taskInfo.srcWorkerHost, populatedDataPoint.name,
-            Double.valueOf(populatedDataPoint.value.toString()));
+            taskInfo.srcWorkerHost, metricName, Double.valueOf(populatedDataPoint.value.toString()));
 
         // Put intermediate values into the cache until it is time to send
         metricsCache.putTimelineMetric(timelineMetric);
@@ -128,6 +137,11 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     LOG.info("Stopping Storm Metrics Sink");
   }
 
+  // purpose just for testing
+  void setTopologyName(String topologyName) {
+    this.topologyName = topologyName;
+  }
+
   private String removeNonce(String topologyId) {
     return topologyId.substring(0, topologyId.substring(0, topologyId.lastIndexOf("-")).lastIndexOf("-"));
   }
@@ -176,12 +190,12 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     }
   }
 
-  private TimelineMetric createTimelineMetric(long currentTimeMillis, String componentId, int taskId, String hostName,
+  private TimelineMetric createTimelineMetric(long currentTimeMillis, String hostName,
       String attributeName, Double attributeValue) {
     TimelineMetric timelineMetric = new TimelineMetric();
-    timelineMetric.setMetricName(createMetricName(componentId, taskId, attributeName));
+    timelineMetric.setMetricName(attributeName);
     timelineMetric.setHostName(hostName);
-    timelineMetric.setAppId(topologyName);
+    timelineMetric.setAppId(applicationId);
     timelineMetric.setStartTime(currentTimeMillis);
     timelineMetric.setType(ClassUtils.getShortCanonicalName(
         attributeValue, "Number"));
@@ -189,8 +203,12 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     return timelineMetric;
   }
 
-  private String createMetricName(String componentId, int taskId, String attributeName) {
-    String metricName = componentId + "." + taskId + "." + attributeName;
+  private String createMetricName(String componentId, String workerHost, int workerPort, int taskId,
+      String attributeName) {
+    // <topology name>.<component name>.<worker host>.<worker port>.<task id>.<metric name>
+    String metricName = topologyName + "." + componentId + "." + workerHost + "." + workerPort +
+        "." + taskId + "." + attributeName;
+
     // since '._' is treat as special character (separator) so it should be replaced
     return metricName.replace('_', '-');
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/64ea518a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
index 557d088..e8f218c 100644
--- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -42,6 +42,7 @@ public class StormTimelineMetricsSinkTest {
   @Test
   public void testNonNumericMetricMetricExclusion() throws InterruptedException, IOException {
     StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    stormTimelineMetricsSink.setTopologyName("topology");
     TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
     stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
     replay(timelineMetricsCache);
@@ -54,8 +55,10 @@ public class StormTimelineMetricsSinkTest {
   @Test
   public void testNumericMetricMetricSubmission() throws InterruptedException, IOException {
     StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    stormTimelineMetricsSink.setTopologyName("topology");
     TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
-    expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1")).andReturn(new TimelineMetric()).once();
+    expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1"))
+        .andReturn(new TimelineMetric()).once();
     timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
     expectLastCall().once();
     stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
@@ -70,10 +73,11 @@ public class StormTimelineMetricsSinkTest {
   @Ignore // TODO: Fix for failover
   public void testMapMetricMetricSubmission() throws InterruptedException, IOException {
     StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    stormTimelineMetricsSink.setTopologyName("topology");
     TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
-    expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field1"))
+    expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1.field1"))
         .andReturn(new TimelineMetric()).once();
-    expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field2"))
+    expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1.field2"))
         .andReturn(new TimelineMetric()).once();
     timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
     expectLastCall().once();

http://git-wip-us.apache.org/repos/asf/ambari/blob/64ea518a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 951471a..f87779c 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -40,13 +40,15 @@ import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCach
 import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
 
 public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
-  public static final int SYSTEM_BOLT_TASK_ID = -1;
+  public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId";
+  public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "storm";
 
   private String collectorUri;
   private TimelineMetricsCache metricsCache;
   private String hostname;
   private int timeoutSeconds;
   private String topologyName;
+  private String applicationId;
 
   @Override
   protected String getCollectorUri() {
@@ -78,6 +80,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
         String.valueOf(MAX_RECS_PER_NAME_DEFAULT)));
     int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
         String.valueOf(MAX_EVICTION_TIME_MILLIS)));
+    applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID);
     metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
     collectorUri = configuration.getProperty(COLLECTOR_PROPERTY) + WS_V1_TIMELINE_METRICS;
     if (collectorUri.toLowerCase().startsWith("https://")) {
@@ -98,17 +101,13 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
       List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
 
       for (DataPoint populatedDataPoint : populatedDataPoints) {
-        String metricName;
-        if (taskInfo.srcTaskId == SYSTEM_BOLT_TASK_ID) {
-          metricName = createMetricNameForSystemBolt(taskInfo, populatedDataPoint.name);
-        } else {
-          metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcTaskId, populatedDataPoint.name);
-        }
+        String metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcWorkerHost,
+            taskInfo.srcWorkerPort, taskInfo.srcTaskId, populatedDataPoint.name);
 
         LOG.debug("populated datapoint: " + metricName + " = " + populatedDataPoint.value);
 
-        TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000, taskInfo.srcWorkerHost,
-            metricName, Double.valueOf(populatedDataPoint.value.toString()));
+        TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000,
+            taskInfo.srcWorkerHost, metricName, Double.valueOf(populatedDataPoint.value.toString()));
 
         // Put intermediate values into the cache until it is time to send
         metricsCache.putTimelineMetric(timelineMetric);
@@ -138,6 +137,11 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     LOG.info("Stopping Storm Metrics Sink");
   }
 
+  // purpose just for testing
+  void setTopologyName(String topologyName) {
+    this.topologyName = topologyName;
+  }
+
   private String removeNonce(String topologyId) {
     return topologyId.substring(0, topologyId.substring(0, topologyId.lastIndexOf("-")).lastIndexOf("-"));
   }
@@ -191,7 +195,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     TimelineMetric timelineMetric = new TimelineMetric();
     timelineMetric.setMetricName(attributeName);
     timelineMetric.setHostName(hostName);
-    timelineMetric.setAppId(topologyName);
+    timelineMetric.setAppId(applicationId);
     timelineMetric.setStartTime(currentTimeMillis);
     timelineMetric.setType(ClassUtils.getShortCanonicalName(
         attributeValue, "Number"));
@@ -199,15 +203,12 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     return timelineMetric;
   }
 
-  private String createMetricName(String componentId, int taskId, String attributeName) {
-    String metricName = componentId + "." + taskId + "." + attributeName;
-    // since '._' is treat as special character (separator) so it should be replaced
-    return metricName.replace('_', '-');
-  }
+  private String createMetricName(String componentId, String workerHost, int workerPort, int taskId,
+      String attributeName) {
+    // <topology name>.<component name>.<worker host>.<worker port>.<task id>.<metric name>
+    String metricName = topologyName + "." + componentId + "." + workerHost + "." + workerPort +
+        "." + taskId + "." + attributeName;
 
-  private String createMetricNameForSystemBolt(TaskInfo taskInfo, String attributeName) {
-    String metricName = taskInfo.srcComponentId + "." + taskInfo.srcWorkerHost + "." +
-        taskInfo.srcWorkerPort + "." + attributeName;
     // since '._' is treat as special character (separator) so it should be replaced
     return metricName.replace('_', '-');
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/64ea518a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
index 10ccab3..6fad6b3 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.metrics2.sink.storm;
 
-import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.SYSTEM_BOLT_TASK_ID;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.expect;
@@ -43,6 +42,7 @@ public class StormTimelineMetricsSinkTest {
   @Test
   public void testNonNumericMetricMetricExclusion() throws InterruptedException, IOException {
     StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    stormTimelineMetricsSink.setTopologyName("topology");
     TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
     stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
     replay(timelineMetricsCache);
@@ -55,31 +55,16 @@ public class StormTimelineMetricsSinkTest {
   @Test
   public void testNumericMetricMetricSubmission() throws InterruptedException, IOException {
     StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    stormTimelineMetricsSink.setTopologyName("topology");
     TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
-    expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1")).andReturn(new TimelineMetric()).once();
-    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
-    expectLastCall().once();
-    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
-    replay(timelineMetricsCache);
-    stormTimelineMetricsSink.handleDataPoints(
-        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
-        Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42)));
-    verify(timelineMetricsCache);
-  }
-
-  @Test
-  @Ignore // TODO: Fix for failover
-  public void testNumericMetricFromSystemBoltMetricSubmission() throws InterruptedException, IOException {
-    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
-    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
-    expect(timelineMetricsCache.getTimelineMetric("testComponent.localhost.1234.key1"))
+    expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1"))
         .andReturn(new TimelineMetric()).once();
     timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
     expectLastCall().once();
     stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
     replay(timelineMetricsCache);
     stormTimelineMetricsSink.handleDataPoints(
-        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", SYSTEM_BOLT_TASK_ID, 20000L, 60),
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
         Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42)));
     verify(timelineMetricsCache);
   }
@@ -88,10 +73,11 @@ public class StormTimelineMetricsSinkTest {
   @Ignore // TODO: Fix for failover
   public void testMapMetricMetricSubmission() throws InterruptedException, IOException {
     StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    stormTimelineMetricsSink.setTopologyName("topology");
     TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
-    expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field1"))
+    expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1.field1"))
         .andReturn(new TimelineMetric()).once();
-    expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field2"))
+    expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1.field2"))
         .andReturn(new TimelineMetric()).once();
     timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
     expectLastCall().once();


Mime
View raw message