ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From avija...@apache.org
Subject ambari git commit: AMBARI-17249 : Storm metrics sink should include worker host and port to metric name when metrics are coming from SystemBolt (Jungtaek Lim via avijayan)
Date Thu, 30 Jun 2016 18:30:35 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk 1be5dfd92 -> 3ff0ddc66


 AMBARI-17249 : Storm metrics sink should include worker host and port to metric name when
metrics are coming from SystemBolt (Jungtaek Lim via avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3ff0ddc6
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3ff0ddc6
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3ff0ddc6

Branch: refs/heads/trunk
Commit: 3ff0ddc663c58ae044385ac18660478484ae4cb5
Parents: 1be5dfd
Author: Aravindan Vijayan <avijayan@hortonworks.com>
Authored: Thu Jun 30 11:30:23 2016 -0700
Committer: Aravindan Vijayan <avijayan@hortonworks.com>
Committed: Thu Jun 30 11:30:23 2016 -0700

----------------------------------------------------------------------
 .../sink/storm/StormTimelineMetricsSink.java    | 27 ++++++++++++++++----
 .../storm/StormTimelineMetricsSinkTest.java     | 18 +++++++++++++
 2 files changed, 40 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/3ff0ddc6/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index eb572b3..0ef09d6 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -40,6 +40,8 @@ import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCach
 import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
 
 public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer
{
+  public static final int SYSTEM_BOLT_TASK_ID = -1;
+
   private String collectorUri;
   private TimelineMetricsCache metricsCache;
   private String hostname;
@@ -127,9 +129,17 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink
implem
       List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
 
       for (DataPoint populatedDataPoint : populatedDataPoints) {
-        TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000,
-            taskInfo.srcComponentId, taskInfo.srcTaskId, taskInfo.srcWorkerHost, populatedDataPoint.name,
-            Double.valueOf(populatedDataPoint.value.toString()));
+        String metricName;
+        if (taskInfo.srcTaskId == SYSTEM_BOLT_TASK_ID) {
+          metricName = createMetricNameForSystemBolt(taskInfo, populatedDataPoint.name);
+        } else {
+          metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcTaskId, populatedDataPoint.name);
+        }
+
+        LOG.debug("populated datapoint: " + metricName + " = " + populatedDataPoint.value);
+
+        TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000, taskInfo.srcWorkerHost,
+            metricName, Double.valueOf(populatedDataPoint.value.toString()));
 
         // Put intermediate values into the cache until it is time to send
         metricsCache.putTimelineMetric(timelineMetric);
@@ -207,10 +217,10 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink
implem
     }
   }
 
-  private TimelineMetric createTimelineMetric(long currentTimeMillis, String componentId,
int taskId, String hostName,
+  private TimelineMetric createTimelineMetric(long currentTimeMillis, String hostName,
       String attributeName, Double attributeValue) {
     TimelineMetric timelineMetric = new TimelineMetric();
-    timelineMetric.setMetricName(createMetricName(componentId, taskId, attributeName));
+    timelineMetric.setMetricName(attributeName);
     timelineMetric.setHostName(hostName);
     timelineMetric.setAppId(topologyName);
     timelineMetric.setStartTime(currentTimeMillis);
@@ -226,6 +236,13 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink
implem
     return metricName.replace('_', '-');
   }
 
+  private String createMetricNameForSystemBolt(TaskInfo taskInfo, String attributeName) {
+    String metricName = taskInfo.srcComponentId + "." + taskInfo.srcWorkerHost + "." +
+        taskInfo.srcWorkerPort + "." + attributeName;
+    // since '._' is treat as special character (separator) so it should be replaced
+    return metricName.replace('_', '-');
+  }
+
   public void setMetricsCache(TimelineMetricsCache metricsCache) {
     this.metricsCache = metricsCache;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/3ff0ddc6/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
index c4b54b4..4ea7396 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.metrics2.sink.storm;
 
+import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.SYSTEM_BOLT_TASK_ID;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.createMockBuilder;
 import static org.easymock.EasyMock.createNiceMock;
@@ -71,6 +72,23 @@ public class StormTimelineMetricsSinkTest {
 
   @Test
   @Ignore // TODO: Fix for failover
+  public void testNumericMetricFromSystemBoltMetricSubmission() throws InterruptedException,
IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+    expect(timelineMetricsCache.getTimelineMetric("testComponent.localhost.1234.key1"))
+        .andReturn(new TimelineMetric()).once();
+    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    replay(timelineMetricsCache);
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", SYSTEM_BOLT_TASK_ID,
20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42)));
+    verify(timelineMetricsCache);
+  }
+
+  @Test
+  @Ignore // TODO: Fix for failover
   public void testMapMetricMetricSubmission() throws InterruptedException, IOException {
     StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
     TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);


Mime
View raw message