From: swagle@apache.org
To: commits@ambari.apache.org
Reply-To: ambari-dev@ambari.apache.org
Subject: ambari git commit: AMBARI-11184. AMS: Incorrect value obtained for a datapoint in the metric data series queried from AMS. (swagle)
Date: Mon, 18 May 2015 20:15:17 +0000 (UTC)

Repository: ambari
Updated Branches:
  refs/heads/trunk 2015be59a -> 8b0c964a8


AMBARI-11184. AMS: Incorrect value obtained for a datapoint in the metric data series queried from AMS. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8b0c964a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8b0c964a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8b0c964a

Branch: refs/heads/trunk
Commit: 8b0c964a89ffd4d352d0deac0756923242a252e9
Parents: 2015be5
Author: Siddharth Wagle
Authored: Mon May 18 13:07:06 2015 -0700
Committer: Siddharth Wagle
Committed: Mon May 18 13:07:06 2015 -0700

----------------------------------------------------------------------
 .../metrics/timeline/PhoenixHBaseAccessor.java  |   8 +-
 .../timeline/TimelineMetricConfiguration.java   |   3 +
 .../aggregators/AbstractTimelineAggregator.java |   6 +-
 .../TimelineMetricClusterAggregatorMinute.java  | 131 +++++++++++++------
 .../aggregators/TimelineMetricReadHelper.java   |   4 +-
 .../timeline/AbstractMiniHBaseClusterTest.java  |  80 ++++++++++-
 .../metrics/timeline/ITClusterAggregator.java   | 130 ++++++++++++++----
 .../0.1.0/configuration/ams-site.xml            |   2 +-
 8 files changed, 290 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index b890171..bf1ae66 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -313,8 +313,7 @@ public class PhoenixHBaseAccessor {
     }
   }
 
-  public void insertMetricRecords(TimelineMetrics metrics)
-    throws SQLException, IOException {
+  public void insertMetricRecords(TimelineMetrics metrics) throws SQLException, IOException {
 
     List<TimelineMetric> timelineMetrics = metrics.getMetrics();
     if (timelineMetrics == null || timelineMetrics.isEmpty()) {
@@ -351,9 +350,8 @@ public class PhoenixHBaseAccessor {
       metricRecordStmt.setDouble(8, aggregates[0]);
       metricRecordStmt.setDouble(9, aggregates[1]);
       metricRecordStmt.setDouble(10, aggregates[2]);
-      metricRecordStmt.setLong(11, (long)aggregates[3]);
-      String json =
-        TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+      metricRecordStmt.setLong(11, (long) aggregates[3]);
+      String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
       metricRecordStmt.setString(12, json);
 
       try {

http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 0595c20..0461261 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -157,6 +157,9 @@ public class TimelineMetricConfiguration {
   public static final String CLUSTER_AGGREGATOR_APP_IDS =
     "timeline.metrics.service.cluster.aggregator.appIds";
 
+  public static final String SERVER_SIDE_TIMESIFT_ADJUSTMENT =
+    "timeline.metrics.service.cluster.aggregator.timeshift.adjustment";
+
   public static final String HOST_APP_ID = "HOST";
 
   private Configuration hbaseConf;
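Note: the minute cluster aggregator (a few files below) reads this new key with a 90000 ms default via Configuration.get(key, default). A standalone sketch of that lookup, for illustration only (the class name here is made up; the key string and default come from this commit):

import org.apache.hadoop.conf.Configuration;

public class TimeshiftAdjustmentExample {
  // Same key string the commit adds to TimelineMetricConfiguration.
  static final String SERVER_SIDE_TIMESIFT_ADJUSTMENT =
    "timeline.metrics.service.cluster.aggregator.timeshift.adjustment";

  public static void main(String[] args) {
    Configuration metricsConf = new Configuration();
    // Falls back to 90000 ms (1.5 minutes) when the property is not set,
    // mirroring the constructor change in TimelineMetricClusterAggregatorMinute.
    long serverTimeShiftAdjustment =
      Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000"));
    System.out.println("serverTimeShiftAdjustment = " + serverTimeShiftAdjustment + " ms");
  }
}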
http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index 415471d..37fb088 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -165,15 +165,15 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
       lastCheckPointTime = readCheckPoint();
       if (isLastCheckPointTooOld(lastCheckPointTime)) {
         LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
-          "lastCheckPointTime = " + lastCheckPointTime);
+          "lastCheckPointTime = " + new Date(lastCheckPointTime));
         lastCheckPointTime = -1;
       }
       if (lastCheckPointTime == -1) {
         // Assuming first run, save checkpoint and sleep.
         // Set checkpoint to 2 minutes in the past to allow the
         // agents/collectors to catch up
-        LOG.info("Saving checkpoint time on first run." +
-          (currentTime - checkpointDelayMillis));
+        LOG.info("Saving checkpoint time on first run. " +
+          new Date((currentTime - checkpointDelayMillis)));
         saveCheckPoint(currentTime - checkpointDelayMillis);
       }
     } catch (IOException io) {
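The hunk above only changes what gets logged (raw epoch millis are now rendered as java.util.Date). For orientation, a compact, self-contained sketch of the stale-checkpoint / first-run flow visible in the context lines; the 2-hour cutoff, the class name, and the in-memory stand-ins for readCheckPoint/saveCheckPoint are hypothetical:

import java.util.Date;

public class CheckpointExample {
  static final long checkpointDelayMillis = 2 * 60 * 1000;       // "2 minutes in the past", per the comment
  static final long checkpointCutOffMillis = 2 * 60 * 60 * 1000; // hypothetical staleness bound

  static long resolveCheckpoint(long lastCheckPointTime, long currentTime) {
    if (lastCheckPointTime != -1 && currentTime - lastCheckPointTime > checkpointCutOffMillis) {
      // Matches the LOG.warn branch: discard a checkpoint that is too old.
      System.out.println("Last Checkpoint is too old, discarding last checkpoint. " +
        "lastCheckPointTime = " + new Date(lastCheckPointTime));
      lastCheckPointTime = -1;
    }
    if (lastCheckPointTime == -1) {
      // First run: checkpoint slightly in the past so agents/collectors can catch up.
      lastCheckPointTime = currentTime - checkpointDelayMillis;
      System.out.println("Saving checkpoint time on first run. " + new Date(lastCheckPointTime));
    }
    return lastCheckPointTime;
  }

  public static void main(String[] args) {
    System.out.println(new Date(resolveCheckpoint(-1, System.currentTimeMillis())));
  }
}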
http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
index 293608e..9b51f98 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
@@ -33,9 +33,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
@@ -49,6 +48,8 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
   private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true);
   // Aggregator to perform app-level aggregates for host metrics
   private final TimelineMetricAppAggregator appAggregator;
+  // 1 minute client side buffering adjustment
+  private final Long serverTimeShiftAdjustment;
 
   public TimelineMetricClusterAggregatorMinute(PhoenixHBaseAccessor hBaseAccessor,
                                                Configuration metricsConf,
@@ -66,11 +67,14 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
     appAggregator = new TimelineMetricAppAggregator(metricsConf);
     this.timeSliceIntervalMillis = timeSliceInterval;
+    this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000"));
   }
 
   @Override
   protected void aggregate(ResultSet rs, long startTime, long endTime) throws SQLException, IOException {
-    List<Long[]> timeSlices = getTimeSlices(startTime, endTime);
+    // Account for time shift due to client side buffering by shifting the
+    // timestamps with the difference between server time and series start time
+    List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime);
     // Initialize app aggregates for host metrics
     appAggregator.init();
     Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
@@ -91,17 +95,20 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
       METRICS_RECORD_TABLE_NAME));
     condition.addOrderByColumn("METRIC_NAME");
+    condition.addOrderByColumn("HOSTNAME");
     condition.addOrderByColumn("APP_ID");
-    condition.addOrderByColumn("INSTANCE_ID");
     condition.addOrderByColumn("SERVER_TIME");
     return condition;
   }
 
+  /**
+   * Return time slices to normalize the timeseries data.
+   */
   private List<Long[]> getTimeSlices(long startTime, long endTime) {
     List<Long[]> timeSlices = new ArrayList<Long[]>();
     long sliceStartTime = startTime;
     while (sliceStartTime < endTime) {
-      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
+      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis });
       sliceStartTime += timeSliceIntervalMillis;
     }
     return timeSlices;
@@ -111,45 +118,72 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
     throws SQLException, IOException {
     Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
       new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
 
-    // Create time slices
-    while (rs.next()) {
-      TimelineMetric metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
-
-      Map<TimelineClusterMetric, Double> clusterMetrics =
-        sliceFromTimelineMetric(metric, timeSlices);
-
-      if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
-        for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
-            clusterMetrics.entrySet()) {
-
-          TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
-          Double avgValue = clusterMetricEntry.getValue();
-
-          MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
-
-          if (aggregate == null) {
-            aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
-            aggregateClusterMetrics.put(clusterMetric, aggregate);
-          } else {
-            aggregate.updateSum(avgValue);
-            aggregate.updateNumberOfHosts(1);
-            aggregate.updateMax(avgValue);
-            aggregate.updateMin(avgValue);
-          }
-          // Update app level aggregates
-          appAggregator.processTimelineClusterMetric(clusterMetric,
-            metric.getHostName(), avgValue);
+    TimelineMetric metric = null;
+    if (rs.next()) {
+      metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+
+      // Call slice after all rows for a host are read
+      while (rs.next()) {
+        TimelineMetric nextMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+        // If rows belong to same host combine them before slicing. This
+        // avoids issues across rows that belong to same hosts but get
+        // counted as coming from different ones.
+        if (metric.equalsExceptTime(nextMetric)) {
+          metric.addMetricValues(nextMetric.getMetricValues());
+        } else {
+          // Process the current metric
+          processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
+          metric = nextMetric;
         }
       }
     }
+    // Process last metric
+    if (metric != null) {
+      processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
+    }
+
     // Add app level aggregates to save
     aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
     return aggregateClusterMetrics;
   }
 
+  /**
+   * Slice metric values into interval specified by :
+   * timeline.metrics.cluster.aggregator.minute.timeslice.interval
+   * Normalize value by averaging them within the interval
+   */
+  private void processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
+                                              TimelineMetric metric, List<Long[]> timeSlices) {
+    // Create time slices
+    Map<TimelineClusterMetric, Double> clusterMetrics = sliceFromTimelineMetric(metric, timeSlices);
+
+    if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
+      for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
+          clusterMetrics.entrySet()) {
+
+        TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
+        Double avgValue = clusterMetricEntry.getValue();
+
+        MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
+
+        if (aggregate == null) {
+          aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
+          aggregateClusterMetrics.put(clusterMetric, aggregate);
+        } else {
+          aggregate.updateSum(avgValue);
+          aggregate.updateNumberOfHosts(1);
+          aggregate.updateMax(avgValue);
+          aggregate.updateMin(avgValue);
+        }
+        // Update app level aggregates
+        appAggregator.processTimelineClusterMetric(clusterMetric, metric.getHostName(), avgValue);
+      }
+    }
+  }
+
   private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
-        TimelineMetric timelineMetric, List<Long[]> timeSlices) {
+    TimelineMetric timelineMetric, List<Long[]> timeSlices) {
 
     if (timelineMetric.getMetricValues().isEmpty()) {
       return null;
@@ -158,13 +192,20 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
     Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
       new HashMap<TimelineClusterMetric, Double>();
 
+    Long timeShift = timelineMetric.getTimestamp() - timelineMetric.getStartTime();
+    if (timeShift < 0) {
+      LOG.debug("Invalid time shift found, possible discrepancy in clocks. " +
" + + "timeShift = " + timeShift); + timeShift = 0l; + } + for (Map.Entry metric : timelineMetric.getMetricValues().entrySet()) { // TODO: investigate null values - pre filter if (metric.getValue() == null) { continue; } - Long timestamp = getSliceTimeForMetric(timeSlices, - Long.parseLong(metric.getKey().toString())); + + Long timestamp = getSliceTimeForMetric(timeSlices, Long.parseLong(metric.getKey().toString())); if (timestamp != -1) { // Metric is within desired time range TimelineClusterMetric clusterMetric = new TimelineClusterMetric( @@ -173,12 +214,24 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre timelineMetric.getInstanceId(), timestamp, timelineMetric.getType()); + + // do a sum / count here to get average for all points in a slice + int count = 1; + Double sum; if (!timelineClusterMetricMap.containsKey(clusterMetric)) { - timelineClusterMetricMap.put(clusterMetric, metric.getValue()); + sum = metric.getValue(); } else { + count++; Double oldValue = timelineClusterMetricMap.get(clusterMetric); - Double newValue = (oldValue + metric.getValue()) / 2; - timelineClusterMetricMap.put(clusterMetric, newValue); + sum = oldValue + metric.getValue(); + } + timelineClusterMetricMap.put(clusterMetric, (sum / count)); + } else { + if (timelineMetric.getMetricName().equals("tserver.general.entries")) { + LOG.info("--- Fallen off: serverTs = " + timelineMetric.getTimestamp() + + ", timeShift: " + timeShift + + ", timestamp: " + Long.parseLong(metric.getKey().toString()) + + ", host = " + timelineMetric.getHostName()); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java index 398f4c3..573e09d 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java @@ -54,7 +54,9 @@ public class TimelineMetricReadHelper { TimelineMetric metric = new TimelineMetric(); metric.setMetricName(rs.getString("METRIC_NAME")); metric.setAppId(rs.getString("APP_ID")); - if (!ignoreInstance) metric.setInstanceId(rs.getString("INSTANCE_ID")); + if (!ignoreInstance) { + metric.setInstanceId(rs.getString("INSTANCE_ID")); + } metric.setHostName(rs.getString("HOSTNAME")); metric.setTimestamp(rs.getLong("SERVER_TIME")); metric.setStartTime(rs.getLong("START_TIME")); http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java ---------------------------------------------------------------------- diff --git 
http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
index 6cfaa2e..643e5cc 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
@@ -20,7 +20,11 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
@@ -30,16 +34,22 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.LOG;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -116,8 +126,7 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
 
   protected PhoenixHBaseAccessor createTestableHBaseAccessor() {
     Configuration metricsConf = new Configuration();
-    metricsConf.set(
-      TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");
+    metricsConf.set(TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");
 
     return new PhoenixHBaseAccessor(
@@ -136,4 +145,71 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
       }
     });
   }
+
+  protected void insertMetricRecords(Connection conn, TimelineMetrics metrics, long currentTime)
+    throws SQLException, IOException {
+
+    List<TimelineMetric> timelineMetrics = metrics.getMetrics();
+    if (timelineMetrics == null || timelineMetrics.isEmpty()) {
+      LOG.debug("Empty metrics insert request.");
+      return;
+    }
+
+    PreparedStatement metricRecordStmt = null;
+
+    try {
+      metricRecordStmt = conn.prepareStatement(String.format(
+        UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
+
+      for (TimelineMetric metric : timelineMetrics) {
+        metricRecordStmt.clearParameters();
+
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("host: " + metric.getHostName() + ", " +
+            "metricName = " + metric.getMetricName() + ", " +
+            "values: " + metric.getMetricValues());
+        }
+        double[] aggregates = AggregatorUtils.calculateAggregates(
+          metric.getMetricValues());
+
+        metricRecordStmt.setString(1, metric.getMetricName());
+        metricRecordStmt.setString(2, metric.getHostName());
+        metricRecordStmt.setString(3, metric.getAppId());
+        metricRecordStmt.setString(4, metric.getInstanceId());
+        metricRecordStmt.setLong(5, currentTime);
+        metricRecordStmt.setLong(6, metric.getStartTime());
+        metricRecordStmt.setString(7, metric.getType());
+        metricRecordStmt.setDouble(8, aggregates[0]);
+        metricRecordStmt.setDouble(9, aggregates[1]);
+        metricRecordStmt.setDouble(10, aggregates[2]);
+        metricRecordStmt.setLong(11, (long) aggregates[3]);
+        String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+        metricRecordStmt.setString(12, json);
+
+        try {
+          metricRecordStmt.executeUpdate();
+        } catch (SQLException sql) {
+          LOG.error(sql);
+        }
+      }
+
+      conn.commit();
+
+    } finally {
+      if (metricRecordStmt != null) {
+        try {
+          metricRecordStmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
index fb3bc30..fb3bd31 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
@@ -18,7 +18,10 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
@@ -37,8 +40,10 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static junit.framework.Assert.assertEquals;
@@ -48,8 +53,10 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
 
 public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
@@ -113,8 +120,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
       METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
 
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
-      (conn, condition);
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
     ResultSet rs = pstmt.executeQuery();
 
     int recordCount = 0;
@@ -144,7 +150,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
-    long minute = 60 * 1000;
+    long minute = 60 * 1000 * 2;
 
     /**
     * Here we have two nodes with two instances each:
     *             | local1 | local2 |
     * instance i1 |   1    |   2    |
     * instance i2 |   3    |   4    |
     *
     */
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+    // Four 1's at ctime - 100
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local1",
       "i1", "disk_free", 1));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+    // Four 2's at ctime - 100: different host
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local2",
       "i1", "disk_free", 2));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+    // Avoid overwrite
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local1",
       "i2", "disk_free", 3));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local2",
       "i2", "disk_free", 4));
+
     ctime += minute;
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+    // Four 1's at ctime + 2 min
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local1",
       "i1", "disk_free", 1));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+    // Four 1's at ctime + 2 min - different host
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local2",
       "i1", "disk_free", 3));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local1",
       "i2", "disk_free", 2));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local2",
       "i2", "disk_free", 4));
 
     // WHEN
     long endTime = ctime + minute;
-    boolean success = agg.doWork(startTime, endTime);
+    boolean success = agg.doWork(startTime - 1000, endTime + 1000);
 
     //THEN
     Condition condition = new DefaultCondition(null, null, null, null, startTime,
@@ -182,29 +194,26 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
       METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
 
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
-      (conn, condition);
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
     ResultSet rs = pstmt.executeQuery();
 
     int recordCount = 0;
     while (rs.next()) {
       TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
-//        PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
       MetricClusterAggregate currentHostAggregate =
         readHelper.getMetricClusterAggregateFromResultSet(rs);
 
       if ("disk_free".equals(currentMetric.getMetricName())) {
-        System.out.println("OUTPUT: " + currentMetric+" - " +
-          ""+currentHostAggregate);
-        assertEquals(4, currentHostAggregate.getNumberOfHosts());
-        assertEquals(4.0, currentHostAggregate.getMax());
-        assertEquals(1.0, currentHostAggregate.getMin());
-        assertEquals(10.0, currentHostAggregate.getSum());
+        System.out.println("OUTPUT: " + currentMetric + " - " + currentHostAggregate);
+        assertEquals(2, currentHostAggregate.getNumberOfHosts());
+        assertEquals(5.0, currentHostAggregate.getSum());
         recordCount++;
       } else {
         fail("Unexpected entry");
       }
     }
+
+    Assert.assertEquals(8, recordCount);
   }
 
   @Test
@@ -244,8 +253,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
       METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
 
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
-      (conn, condition);
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
     ResultSet rs = pstmt.executeQuery();
 
     int recordCount = 0;
@@ -476,6 +484,82 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     assertEquals(1.0d, currentHostAggregate.getSum());
   }
 
+  @Test
+  public void testClusterAggregateMetricNormalization() throws Exception {
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration());
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
+
+    // Sample data
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("yarn.ClusterMetrics.NumActiveNMs");
+    metric1.setAppId("resourcemanager");
+    metric1.setHostName("h1");
+    metric1.setStartTime(1431372311811l);
+    metric1.setMetricValues(new HashMap<Long, Double>() {{
+      put(1431372311811l, 1.0);
+      put(1431372321811l, 1.0);
+      put(1431372331811l, 1.0);
+      put(1431372341811l, 1.0);
+      put(1431372351811l, 1.0);
+      put(1431372361811l, 1.0);
+      put(1431372371810l, 1.0);
+    }});
+
+    TimelineMetric metric2 = new TimelineMetric();
+    metric2.setMetricName("yarn.ClusterMetrics.NumActiveNMs");
+    metric2.setAppId("resourcemanager");
+    metric2.setHostName("h1");
+    metric2.setStartTime(1431372381810l);
+    metric2.setMetricValues(new HashMap<Long, Double>() {{
+      put(1431372381810l, 1.0);
+      put(1431372391811l, 1.0);
+      put(1431372401811l, 1.0);
+      put(1431372411811l, 1.0);
+      put(1431372421811l, 1.0);
+      put(1431372431811l, 1.0);
+      put(1431372441810l, 1.0);
+    }});
+
+    TimelineMetrics metrics = new TimelineMetrics();
+    metrics.setMetrics(Collections.singletonList(metric1));
+    insertMetricRecords(conn, metrics, 1431372371810l);
+
+    metrics.setMetrics(Collections.singletonList(metric2));
+    insertMetricRecords(conn, metrics, 1431372441810l);
+
+    long startTime = 1431372055000l;
+    long endTime = 1431372655000l;
+
+    agg.doWork(startTime, endTime);
+
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    int recordCount = 0;
+    while (rs.next()) {
+      TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
+      MetricClusterAggregate currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs);
+
+      if ("yarn.ClusterMetrics.NumActiveNMs".equals(currentMetric.getMetricName())) {
+        assertEquals(1, currentHostAggregate.getNumberOfHosts());
+        assertEquals(1.0, currentHostAggregate.getMax());
+        assertEquals(1.0, currentHostAggregate.getMin());
+        assertEquals(1.0, currentHostAggregate.getSum());
+        recordCount++;
+      } else {
+        fail("Unexpected entry");
+      }
+    }
+    Assert.assertEquals(9, recordCount);
+  }
+
   private ResultSet executeQuery(String query) throws SQLException {
     Connection conn = getConnection(getUrl());
     Statement stmt = conn.createStatement();

http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
index fc52e5a..c716bea 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
@@ -198,7 +198,7 @@
   <property>
     <name>timeline.metrics.cluster.aggregator.minute.timeslice.interval</name>
-    <value>15</value>
+    <value>30</value>
     <description>Lowest resolution of desired data for cluster level minute aggregates.
     </description>
   </property>
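For scale context: the interval above is in seconds and becomes timeSliceIntervalMillis in the minute aggregator, so halving the resolution halves the number of normalized datapoints (and cluster-aggregate rows) produced per aggregation window. A back-of-the-envelope check, assuming a hypothetical 5-minute aggregation window (the window length itself is configured elsewhere):

public class SliceResolutionExample {
  public static void main(String[] args) {
    long timeSliceIntervalMillis = 30 * 1000;     // the new value above, in millis
    long aggregationWindowMillis = 5 * 60 * 1000; // assumed window length
    // Mirrors the getTimeSlices loop: fixed-width slices covering the window.
    long slices = aggregationWindowMillis / timeSliceIntervalMillis;
    System.out.println(slices + " slices per window at 30 s resolution");          // 10
    System.out.println((aggregationWindowMillis / (15 * 1000)) + " at the old 15 s"); // 20
  }
}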