Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 11A2C200B4A for ; Wed, 6 Jul 2016 02:24:34 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 05F39160A6F; Wed, 6 Jul 2016 00:24:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F3C74160A60 for ; Wed, 6 Jul 2016 02:24:32 +0200 (CEST) Received: (qmail 65980 invoked by uid 500); 6 Jul 2016 00:24:32 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 65971 invoked by uid 99); 6 Jul 2016 00:24:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Jul 2016 00:24:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E34ACE01C1; Wed, 6 Jul 2016 00:24:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: avijayan@apache.org To: commits@ambari.apache.org Message-Id: <196fd341138e4b6d905f445f5d7ee9e2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ambari git commit: AMBARI-17445 : Storm metrics sink: expand metrics name to contain additional information (Jungtaek Lim via avijayan) Date: Wed, 6 Jul 2016 00:24:31 +0000 (UTC) archived-at: Wed, 06 Jul 2016 00:24:34 -0000 Repository: ambari Updated Branches: refs/heads/branch-2.4 98d6c59dc -> 64ea518ad AMBARI-17445 : Storm metrics sink: expand metrics name to contain additional information (Jungtaek Lim via avijayan) Project: 
http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/64ea518a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/64ea518a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/64ea518a Branch: refs/heads/branch-2.4 Commit: 64ea518ad2712454330c883d22eeeff77307f41d Parents: 98d6c59 Author: Aravindan Vijayan Authored: Tue Jul 5 17:24:17 2016 -0700 Committer: Aravindan Vijayan Committed: Tue Jul 5 17:24:25 2016 -0700 ---------------------------------------------------------------------- .../sink/storm/StormTimelineMetricsSink.java | 32 +++++++++++++---- .../storm/StormTimelineMetricsSinkTest.java | 10 ++++-- .../sink/storm/StormTimelineMetricsSink.java | 37 ++++++++++---------- .../storm/StormTimelineMetricsSinkTest.java | 28 ++++----------- 4 files changed, 58 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/64ea518a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java index 89906d8..83c6b67 100644 --- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java @@ -40,11 +40,15 @@ import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCach import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT; public class 
StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer { + public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId"; + public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "storm"; + private String collectorUri; private TimelineMetricsCache metricsCache; private String hostname; private int timeoutSeconds; private String topologyName; + private String applicationId; @Override protected String getCollectorUri() { @@ -76,6 +80,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem String.valueOf(MAX_RECS_PER_NAME_DEFAULT))); int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL, String.valueOf(MAX_EVICTION_TIME_MILLIS))); + applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID); metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval); collectorUri = configuration.getProperty(COLLECTOR_PROPERTY) + WS_V1_TIMELINE_METRICS; if (collectorUri.toLowerCase().startsWith("https://")) { @@ -96,9 +101,13 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem List populatedDataPoints = populateDataPoints(dataPoint); for (DataPoint populatedDataPoint : populatedDataPoints) { + String metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcWorkerHost, + taskInfo.srcWorkerPort, taskInfo.srcTaskId, populatedDataPoint.name); + + LOG.debug("populated datapoint: " + metricName + " = " + populatedDataPoint.value); + TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000, - taskInfo.srcComponentId, taskInfo.srcTaskId, taskInfo.srcWorkerHost, populatedDataPoint.name, - Double.valueOf(populatedDataPoint.value.toString())); + taskInfo.srcWorkerHost, metricName, Double.valueOf(populatedDataPoint.value.toString())); // Put intermediate values into the cache until it is time to send metricsCache.putTimelineMetric(timelineMetric); @@ -128,6 
+137,11 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem LOG.info("Stopping Storm Metrics Sink"); } + // purpose just for testing + void setTopologyName(String topologyName) { + this.topologyName = topologyName; + } + private String removeNonce(String topologyId) { return topologyId.substring(0, topologyId.substring(0, topologyId.lastIndexOf("-")).lastIndexOf("-")); } @@ -176,12 +190,12 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem } } - private TimelineMetric createTimelineMetric(long currentTimeMillis, String componentId, int taskId, String hostName, + private TimelineMetric createTimelineMetric(long currentTimeMillis, String hostName, String attributeName, Double attributeValue) { TimelineMetric timelineMetric = new TimelineMetric(); - timelineMetric.setMetricName(createMetricName(componentId, taskId, attributeName)); + timelineMetric.setMetricName(attributeName); timelineMetric.setHostName(hostName); - timelineMetric.setAppId(topologyName); + timelineMetric.setAppId(applicationId); timelineMetric.setStartTime(currentTimeMillis); timelineMetric.setType(ClassUtils.getShortCanonicalName( attributeValue, "Number")); @@ -189,8 +203,12 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem return timelineMetric; } - private String createMetricName(String componentId, int taskId, String attributeName) { - String metricName = componentId + "." + taskId + "." + attributeName; + private String createMetricName(String componentId, String workerHost, int workerPort, int taskId, + String attributeName) { + // <topologyName>.<componentId>.<workerHost>.<workerPort>.<taskId>.<attributeName> + String metricName = topologyName + "." + componentId + "." + workerHost + "." + workerPort + + "." + taskId + "." 
+ attributeName; + // since '._' is treat as special character (separator) so it should be replaced return metricName.replace('_', '-'); } http://git-wip-us.apache.org/repos/asf/ambari/blob/64ea518a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java index 557d088..e8f218c 100644 --- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java @@ -42,6 +42,7 @@ public class StormTimelineMetricsSinkTest { @Test public void testNonNumericMetricMetricExclusion() throws InterruptedException, IOException { StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); + stormTimelineMetricsSink.setTopologyName("topology"); TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache); replay(timelineMetricsCache); @@ -54,8 +55,10 @@ public class StormTimelineMetricsSinkTest { @Test public void testNumericMetricMetricSubmission() throws InterruptedException, IOException { StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); + stormTimelineMetricsSink.setTopologyName("topology"); TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); - expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1")).andReturn(new TimelineMetric()).once(); + 
expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1")) + .andReturn(new TimelineMetric()).once(); timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class)); expectLastCall().once(); stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache); @@ -70,10 +73,11 @@ public class StormTimelineMetricsSinkTest { @Ignore // TODO: Fix for failover public void testMapMetricMetricSubmission() throws InterruptedException, IOException { StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); + stormTimelineMetricsSink.setTopologyName("topology"); TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); - expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field1")) + expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1.field1")) .andReturn(new TimelineMetric()).once(); - expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field2")) + expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1.field2")) .andReturn(new TimelineMetric()).once(); timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class)); expectLastCall().once(); http://git-wip-us.apache.org/repos/asf/ambari/blob/64ea518a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java index 951471a..f87779c 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java +++ 
b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java @@ -40,13 +40,15 @@ import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCach import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT; public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer { - public static final int SYSTEM_BOLT_TASK_ID = -1; + public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId"; + public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "storm"; private String collectorUri; private TimelineMetricsCache metricsCache; private String hostname; private int timeoutSeconds; private String topologyName; + private String applicationId; @Override protected String getCollectorUri() { @@ -78,6 +80,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem String.valueOf(MAX_RECS_PER_NAME_DEFAULT))); int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL, String.valueOf(MAX_EVICTION_TIME_MILLIS))); + applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID); metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval); collectorUri = configuration.getProperty(COLLECTOR_PROPERTY) + WS_V1_TIMELINE_METRICS; if (collectorUri.toLowerCase().startsWith("https://")) { @@ -98,17 +101,13 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem List populatedDataPoints = populateDataPoints(dataPoint); for (DataPoint populatedDataPoint : populatedDataPoints) { - String metricName; - if (taskInfo.srcTaskId == SYSTEM_BOLT_TASK_ID) { - metricName = createMetricNameForSystemBolt(taskInfo, populatedDataPoint.name); - } else { - metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcTaskId, populatedDataPoint.name); - } + String metricName = 
createMetricName(taskInfo.srcComponentId, taskInfo.srcWorkerHost, + taskInfo.srcWorkerPort, taskInfo.srcTaskId, populatedDataPoint.name); LOG.debug("populated datapoint: " + metricName + " = " + populatedDataPoint.value); - TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000, taskInfo.srcWorkerHost, - metricName, Double.valueOf(populatedDataPoint.value.toString())); + TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000, + taskInfo.srcWorkerHost, metricName, Double.valueOf(populatedDataPoint.value.toString())); // Put intermediate values into the cache until it is time to send metricsCache.putTimelineMetric(timelineMetric); @@ -138,6 +137,11 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem LOG.info("Stopping Storm Metrics Sink"); } + // purpose just for testing + void setTopologyName(String topologyName) { + this.topologyName = topologyName; + } + private String removeNonce(String topologyId) { return topologyId.substring(0, topologyId.substring(0, topologyId.lastIndexOf("-")).lastIndexOf("-")); } @@ -191,7 +195,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem TimelineMetric timelineMetric = new TimelineMetric(); timelineMetric.setMetricName(attributeName); timelineMetric.setHostName(hostName); - timelineMetric.setAppId(topologyName); + timelineMetric.setAppId(applicationId); timelineMetric.setStartTime(currentTimeMillis); timelineMetric.setType(ClassUtils.getShortCanonicalName( attributeValue, "Number")); @@ -199,15 +203,12 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem return timelineMetric; } - private String createMetricName(String componentId, int taskId, String attributeName) { - String metricName = componentId + "." + taskId + "." 
+ attributeName; - // since '._' is treat as special character (separator) so it should be replaced - return metricName.replace('_', '-'); - } + private String createMetricName(String componentId, String workerHost, int workerPort, int taskId, + String attributeName) { + // <topologyName>.<componentId>.<workerHost>.<workerPort>.<taskId>.<attributeName> + String metricName = topologyName + "." + componentId + "." + workerHost + "." + workerPort + + "." + taskId + "." + attributeName; - private String createMetricNameForSystemBolt(TaskInfo taskInfo, String attributeName) { - String metricName = taskInfo.srcComponentId + "." + taskInfo.srcWorkerHost + "." + - taskInfo.srcWorkerPort + "." + attributeName; // since '._' is treat as special character (separator) so it should be replaced return metricName.replace('_', '-'); } http://git-wip-us.apache.org/repos/asf/ambari/blob/64ea518a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java index 10ccab3..6fad6b3 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java @@ -18,7 +18,6 @@ package org.apache.hadoop.metrics2.sink.storm; -import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.SYSTEM_BOLT_TASK_ID; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.createNiceMock; import static org.easymock.EasyMock.expect; @@ -43,6 +42,7 @@ public class StormTimelineMetricsSinkTest { @Test public void 
testNonNumericMetricMetricExclusion() throws InterruptedException, IOException { StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); + stormTimelineMetricsSink.setTopologyName("topology"); TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache); replay(timelineMetricsCache); @@ -55,31 +55,16 @@ public class StormTimelineMetricsSinkTest { @Test public void testNumericMetricMetricSubmission() throws InterruptedException, IOException { StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); + stormTimelineMetricsSink.setTopologyName("topology"); TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); - expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1")).andReturn(new TimelineMetric()).once(); - timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class)); - expectLastCall().once(); - stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache); - replay(timelineMetricsCache); - stormTimelineMetricsSink.handleDataPoints( - new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60), - Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42))); - verify(timelineMetricsCache); - } - - @Test - @Ignore // TODO: Fix for failover - public void testNumericMetricFromSystemBoltMetricSubmission() throws InterruptedException, IOException { - StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); - TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); - expect(timelineMetricsCache.getTimelineMetric("testComponent.localhost.1234.key1")) + expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1")) .andReturn(new TimelineMetric()).once(); timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class)); expectLastCall().once(); 
stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache); replay(timelineMetricsCache); stormTimelineMetricsSink.handleDataPoints( - new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", SYSTEM_BOLT_TASK_ID, 20000L, 60), + new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60), Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42))); verify(timelineMetricsCache); } @@ -88,10 +73,11 @@ public class StormTimelineMetricsSinkTest { @Ignore // TODO: Fix for failover public void testMapMetricMetricSubmission() throws InterruptedException, IOException { StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); + stormTimelineMetricsSink.setTopologyName("topology"); TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); - expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field1")) + expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1.field1")) .andReturn(new TimelineMetric()).once(); - expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field2")) + expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1.field2")) .andReturn(new TimelineMetric()).once(); timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class)); expectLastCall().once();