ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From avija...@apache.org
Subject [3/3] ambari git commit: AMBARI-17784 : AMS Storm Sink: remove redundant information from kafka offset metrics on storm-kafka (Jungtaek Lim via avijayan)
Date Wed, 20 Jul 2016 17:54:11 GMT
AMBARI-17784 : AMS Storm Sink: remove redundant information from kafka offset metrics on storm-kafka
(Jungtaek Lim via avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2e873b6c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2e873b6c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2e873b6c

Branch: refs/heads/trunk
Commit: 2e873b6c4a4a7dae9c33f8df3e5f97ac61f73499
Parents: da2e677
Author: Aravindan Vijayan <avijayan@hortonworks.com>
Authored: Wed Jul 20 10:48:09 2016 -0700
Committer: Aravindan Vijayan <avijayan@hortonworks.com>
Committed: Wed Jul 20 10:48:09 2016 -0700

----------------------------------------------------------------------
 .../sink/storm/StormTimelineMetricsSink.java    | 36 +++++++++++++++++--
 .../storm/StormTimelineMetricsSinkTest.java     | 37 ++++++++++++++++++++
 .../sink/storm/StormTimelineMetricsSink.java    | 36 +++++++++++++++++--
 .../storm/StormTimelineMetricsSinkTest.java     | 37 ++++++++++++++++++++
 4 files changed, 142 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/2e873b6c/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 879cbfc..0d3b770 100644
--- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -47,6 +47,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink
implem
 
   public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId";
   public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "storm";
+  public static final String METRIC_NAME_PREFIX_KAFKA_OFFSET = "kafkaOffset.";
 
   private String collectorUri;
   private TimelineMetricsCache metricsCache;
@@ -138,8 +139,13 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink
implem
       List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
 
       for (DataPoint populatedDataPoint : populatedDataPoints) {
-        String metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcWorkerHost,
-            taskInfo.srcWorkerPort, taskInfo.srcTaskId, populatedDataPoint.name);
+        String metricName;
+        if (populatedDataPoint.name.startsWith(METRIC_NAME_PREFIX_KAFKA_OFFSET)) {
+          metricName = createKafkaOffsetMetricName(populatedDataPoint.name);
+        } else {
+          metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcWorkerHost,
+              taskInfo.srcWorkerPort, taskInfo.srcTaskId, populatedDataPoint.name);
+        }
 
         LOG.debug("populated datapoint: " + metricName + " = " + populatedDataPoint.value);
 
@@ -250,6 +256,32 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink
implem
     return metricName.replace('_', '-');
   }
 
+  private String createKafkaOffsetMetricName(String kafkaOffsetMetricName) {
+    // get rid of "kafkaOffset."
+    // <topic>/<metric name (starts with total)> or <topic>/partition_<partition_num>/<metricName>
+    String tempMetricName = kafkaOffsetMetricName.substring(METRIC_NAME_PREFIX_KAFKA_OFFSET.length());
+
+    String[] slashSplittedNames = tempMetricName.split("/");
+
+    if (slashSplittedNames.length == 1) {
+      // unknown metrics
+      throw new IllegalArgumentException("Unknown metrics for kafka offset metric: " + kafkaOffsetMetricName);
+    }
+
+    String topic = slashSplittedNames[0];
+    String metricName = "topology." + topologyName + ".kafka-topic." + topic;
+    if (slashSplittedNames.length > 2) {
+      // partition level
+      metricName = metricName + "." + slashSplittedNames[1] + "." + slashSplittedNames[2];
+    } else {
+      // topic level
+      metricName = metricName + "." + slashSplittedNames[1];
+    }
+
+    // since '._' is treated as a special character (separator), it should be replaced
+    return metricName.replace('_', '-');
+  }
+
   private void warnIfTopologyNameContainsWarnString(String name) {
     for (String warn : WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME) {
       if (name.contains(warn)) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/2e873b6c/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
index 2128e07..3b3e236 100644
--- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.metrics2.sink.storm;
 
+import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.METRIC_NAME_PREFIX_KAFKA_OFFSET;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.createMockBuilder;
 import static org.easymock.EasyMock.createNiceMock;
@@ -73,6 +74,42 @@ public class StormTimelineMetricsSinkTest {
 
   @Test
   @Ignore // TODO: Fix for failover
+  public void testTopicLevelKafkaOffsetMetricSubmission() throws InterruptedException, IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    stormTimelineMetricsSink.setTopologyName("topology1");
+    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+    expect(timelineMetricsCache.getTimelineMetric("topology.topology1.kafka-topic.topic1.totalLatestTimeOffset"))
+        .andReturn(new TimelineMetric()).once();
+    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    replay(timelineMetricsCache);
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint(METRIC_NAME_PREFIX_KAFKA_OFFSET + "topic1/totalLatestTimeOffset", 42)));
+    verify(timelineMetricsCache);
+  }
+
+  @Test
+  @Ignore // TODO: Fix for failover
+  public void testPartitionLevelKafkaOffsetMetricSubmission() throws InterruptedException, IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    stormTimelineMetricsSink.setTopologyName("topology1");
+    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+    expect(timelineMetricsCache.getTimelineMetric("topology.topology1.kafka-topic.topic1.partition-1.latestTimeOffset"))
+        .andReturn(new TimelineMetric()).once();
+    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    replay(timelineMetricsCache);
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint(METRIC_NAME_PREFIX_KAFKA_OFFSET + "topic1/partition_1/latestTimeOffset", 42)));
+    verify(timelineMetricsCache);
+  }
+
+  @Test
+  @Ignore // TODO: Fix for failover
   public void testMapMetricMetricSubmission() throws InterruptedException, IOException {
     StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
     stormTimelineMetricsSink.setTopologyName("topology1");

http://git-wip-us.apache.org/repos/asf/ambari/blob/2e873b6c/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 91f78bc..3a4289b 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -47,6 +47,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink
implem
 
   public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId";
   public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "storm";
+  public static final String METRIC_NAME_PREFIX_KAFKA_OFFSET = "kafkaOffset.";
 
   private String collectorUri;
   private TimelineMetricsCache metricsCache;
@@ -138,8 +139,13 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink
implem
       List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
 
       for (DataPoint populatedDataPoint : populatedDataPoints) {
-        String metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcWorkerHost,
-            taskInfo.srcWorkerPort, taskInfo.srcTaskId, populatedDataPoint.name);
+        String metricName;
+        if (populatedDataPoint.name.startsWith(METRIC_NAME_PREFIX_KAFKA_OFFSET)) {
+          metricName = createKafkaOffsetMetricName(populatedDataPoint.name);
+        } else {
+          metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcWorkerHost,
+              taskInfo.srcWorkerPort, taskInfo.srcTaskId, populatedDataPoint.name);
+        }
 
         LOG.debug("populated datapoint: " + metricName + " = " + populatedDataPoint.value);
 
@@ -250,6 +256,32 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink
implem
     return metricName.replace('_', '-');
   }
 
+  private String createKafkaOffsetMetricName(String kafkaOffsetMetricName) {
+    // get rid of "kafkaOffset."
+    // <topic>/<metric name (starts with total)> or <topic>/partition_<partition_num>/<metricName>
+    String tempMetricName = kafkaOffsetMetricName.substring(METRIC_NAME_PREFIX_KAFKA_OFFSET.length());
+
+    String[] slashSplittedNames = tempMetricName.split("/");
+
+    if (slashSplittedNames.length == 1) {
+      // unknown metrics
+      throw new IllegalArgumentException("Unknown metrics for kafka offset metric: " + kafkaOffsetMetricName);
+    }
+
+    String topic = slashSplittedNames[0];
+    String metricName = "topology." + topologyName + ".kafka-topic." + topic;
+    if (slashSplittedNames.length > 2) {
+      // partition level
+      metricName = metricName + "." + slashSplittedNames[1] + "." + slashSplittedNames[2];
+    } else {
+      // topic level
+      metricName = metricName + "." + slashSplittedNames[1];
+    }
+
+    // since '._' is treated as a special character (separator), it should be replaced
+    return metricName.replace('_', '-');
+  }
+
   private void warnIfTopologyNameContainsWarnString(String name) {
     for (String warn : WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME) {
       if (name.contains(warn)) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/2e873b6c/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
index efe3022..fadb00c 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.metrics2.sink.storm;
 
+import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.METRIC_NAME_PREFIX_KAFKA_OFFSET;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.createMockBuilder;
 import static org.easymock.EasyMock.createNiceMock;
@@ -73,6 +74,42 @@ public class StormTimelineMetricsSinkTest {
 
   @Test
   @Ignore // TODO: Fix for failover
+  public void testTopicLevelKafkaOffsetMetricSubmission() throws InterruptedException, IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    stormTimelineMetricsSink.setTopologyName("topology1");
+    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+    expect(timelineMetricsCache.getTimelineMetric("topology.topology1.kafka-topic.topic1.totalLatestTimeOffset"))
+        .andReturn(new TimelineMetric()).once();
+    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    replay(timelineMetricsCache);
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint(METRIC_NAME_PREFIX_KAFKA_OFFSET + "topic1/totalLatestTimeOffset", 42)));
+    verify(timelineMetricsCache);
+  }
+
+  @Test
+  @Ignore // TODO: Fix for failover
+  public void testPartitionLevelKafkaOffsetMetricSubmission() throws InterruptedException, IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    stormTimelineMetricsSink.setTopologyName("topology1");
+    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+    expect(timelineMetricsCache.getTimelineMetric("topology.topology1.kafka-topic.topic1.partition-1.latestTimeOffset"))
+        .andReturn(new TimelineMetric()).once();
+    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    replay(timelineMetricsCache);
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint(METRIC_NAME_PREFIX_KAFKA_OFFSET + "topic1/partition_1/latestTimeOffset", 42)));
+    verify(timelineMetricsCache);
+  }
+
+  @Test
+  @Ignore // TODO: Fix for failover
   public void testMapMetricMetricSubmission() throws InterruptedException, IOException {
     StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
     stormTimelineMetricsSink.setTopologyName("topology1");


Mime
View raw message