Return-Path: X-Original-To: apmail-ambari-commits-archive@www.apache.org Delivered-To: apmail-ambari-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 404AC18738 for ; Wed, 4 Nov 2015 05:34:10 +0000 (UTC) Received: (qmail 20945 invoked by uid 500); 4 Nov 2015 05:34:10 -0000 Delivered-To: apmail-ambari-commits-archive@ambari.apache.org Received: (qmail 20917 invoked by uid 500); 4 Nov 2015 05:34:10 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 20907 invoked by uid 99); 4 Nov 2015 05:34:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Nov 2015 05:34:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C69D9E00C5; Wed, 4 Nov 2015 05:34:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: swagle@apache.org To: commits@ambari.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ambari git commit: AMBARI-13701. Introduce cluster wide MINUTE aggregator in Ambari Metrics service. (Aravindan Vijayan via swagle) Date: Wed, 4 Nov 2015 05:34:09 +0000 (UTC) Repository: ambari Updated Branches: refs/heads/trunk 6bc870f6c -> 4c3be3975 AMBARI-13701. Introduce cluster wide MINUTE aggregator in Ambari Metrics service. 
(Aravindan Vijayan via swagle) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4c3be397 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4c3be397 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4c3be397 Branch: refs/heads/trunk Commit: 4c3be3975ace624be966639c9575d5b38aa038c9 Parents: 6bc870f Author: Siddharth Wagle Authored: Tue Nov 3 21:33:52 2015 -0800 Committer: Siddharth Wagle Committed: Tue Nov 3 21:33:52 2015 -0800 ---------------------------------------------------------------------- .../timeline/HBaseTimelineMetricStore.java | 10 +- .../metrics/timeline/PhoenixHBaseAccessor.java | 33 ++- .../timeline/TimelineMetricConfiguration.java | 14 +- .../TimelineMetricAggregatorFactory.java | 79 +++++- .../TimelineMetricClusterAggregatorMinute.java | 248 ------------------- .../TimelineMetricClusterAggregatorSecond.java | 248 +++++++++++++++++++ .../timeline/query/PhoenixTransactSQL.java | 9 +- .../metrics/timeline/ITClusterAggregator.java | 92 ++++++- .../timeline/ITPhoenixHBaseAccessor.java | 7 +- .../cache/TimelineMetricCacheEntryFactory.java | 19 +- .../server/upgrade/UpgradeCatalog213.java | 22 ++ .../0.1.0/configuration/ams-site.xml | 54 +++- .../0.1.0/package/scripts/split_points.py | 2 +- .../server/upgrade/UpgradeCatalog213Test.java | 76 ++++++ 14 files changed, 601 insertions(+), 312 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/4c3be397/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index 17df629..aed5fed 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -76,7 +76,15 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin LOG.info("Using group by aggregators for aggregating host and cluster metrics."); } - // Start the cluster aggregator minute + // Start the cluster aggregator second + TimelineMetricAggregator secondClusterAggregator = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf); + if (!secondClusterAggregator.isDisabled()) { + Thread aggregatorThread = new Thread(secondClusterAggregator); + aggregatorThread.start(); + } + + // Start the minute cluster aggregator TimelineMetricAggregator minuteClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf); if (!minuteClusterAggregator.isDisabled()) { http://git-wip-us.apache.org/repos/asf/ambari/blob/4c3be397/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index 3ce30fd..be06650 100644 --- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -51,8 +51,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -63,6 +61,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_SECOND_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL; @@ -76,7 +75,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL; import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING; @@ -86,6 +85,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL; @@ -242,13 +242,14 @@ public class PhoenixHBaseAccessor { String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING); String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION); - String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400"); - String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800"); - String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000"); - String hostDailyTtl = metricsConf.get(HOST_DAILY_TABLE_TTL, "31536000"); - String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "2592000"); - String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000"); - String clusterDailyTtl = metricsConf.get(CLUSTER_DAILY_TABLE_TTL, "31536000"); + String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400"); //1 day + String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800"); //7 days + String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000"); //30 days + String hostDailyTtl = metricsConf.get(HOST_DAILY_TABLE_TTL, "31536000"); //1 year + String clusterSecTtl = metricsConf.get(CLUSTER_SECOND_TABLE_TTL, "2592000"); //7 days + String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "7776000"); //30 days + String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000"); //1 year + String clusterDailyTtl = metricsConf.get(CLUSTER_DAILY_TABLE_TTL, "63072000"); //2 years try { LOG.info("Initializing metrics schema..."); @@ -278,9 +279,11 @@ public class PhoenixHBaseAccessor { aggregateSql += getSplitPointsStr(splitPoints); } stmt.executeUpdate(aggregateSql); - stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL, + 
stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL, + METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, encoding, clusterHourTtl, compression)); + stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, encoding, clusterHourTtl, compression)); - stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL, + stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL, METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, encoding, clusterDailyTtl, compression)); //alter TTL options to update tables @@ -298,6 +301,9 @@ public class PhoenixHBaseAccessor { hostDailyTtl)); stmt.executeUpdate(String.format(ALTER_SQL, METRICS_CLUSTER_AGGREGATE_TABLE_NAME, + clusterSecTtl)); + stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, clusterMinTtl)); stmt.executeUpdate(String.format(ALTER_SQL, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, @@ -664,7 +670,8 @@ public class PhoenixHBaseAccessor { for (Function aggregateFunction : functions) { SingleValuedTimelineMetric metric; - if (condition.getPrecision() == Precision.HOURS + if (condition.getPrecision() == Precision.MINUTES + || condition.getPrecision() == Precision.HOURS || condition.getPrecision() == Precision.DAYS) { metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, false); } else { http://git-wip-us.apache.org/repos/asf/ambari/blob/4c3be397/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java index 30e42f2..fd51f3d 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java @@ -64,6 +64,9 @@ public class TimelineMetricConfiguration { public static final String HOST_HOUR_TABLE_TTL = "timeline.metrics.host.aggregator.hourly.ttl"; + public static final String CLUSTER_SECOND_TABLE_TTL = + "timeline.metrics.cluster.aggregator.second.ttl"; + public static final String CLUSTER_MINUTE_TABLE_TTL = "timeline.metrics.cluster.aggregator.minute.ttl"; @@ -74,7 +77,7 @@ public class TimelineMetricConfiguration { "timeline.metrics.cluster.aggregator.daily.ttl"; public static final String CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL = - "timeline.metrics.cluster.aggregator.minute.timeslice.interval"; + "timeline.metrics.cluster.aggregator.second.timeslice.interval"; public static final String AGGREGATOR_CHECKPOINT_DELAY = "timeline.metrics.service.checkpointDelay"; @@ -91,6 +94,9 @@ public class TimelineMetricConfiguration { public static final String HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL = "timeline.metrics.host.aggregator.daily.interval"; + public static final String CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL = + "timeline.metrics.cluster.aggregator.second.interval"; + public static final String CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL = "timeline.metrics.cluster.aggregator.minute.interval"; @@ -109,6 +115,9 @@ public class TimelineMetricConfiguration { public static final String HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER = "timeline.metrics.host.aggregator.daily.checkpointCutOffMultiplier"; + public static final String 
CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER = + "timeline.metrics.cluster.aggregator.second.checkpointCutOffMultiplier"; + public static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER = "timeline.metrics.cluster.aggregator.minute.checkpointCutOffMultiplier"; @@ -136,6 +145,9 @@ public class TimelineMetricConfiguration { public static final String HOST_AGGREGATOR_DAILY_DISABLED = "timeline.metrics.host.aggregator.hourly.disabled"; + public static final String CLUSTER_AGGREGATOR_SECOND_DISABLED = + "timeline.metrics.cluster.aggregator.second.disabled"; + public static final String CLUSTER_AGGREGATOR_MINUTE_DISABLED = "timeline.metrics.cluster.aggregator.minute.disabled"; http://git-wip-us.apache.org/repos/asf/ambari/blob/4c3be397/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java index f07918c..ba019fa 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java @@ -28,8 +28,11 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; @@ -49,6 +52,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; @@ -65,6 +69,8 @@ public class TimelineMetricAggregatorFactory { "timeline-metrics-host-aggregator-daily-checkpoint"; private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE = "timeline-metrics-cluster-aggregator-checkpoint"; + private static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_FILE = + "timeline-metrics-cluster-aggregator-minute-checkpoint"; private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE = "timeline-metrics-cluster-aggregator-hourly-checkpoint"; private static final String CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE = @@ -76,6 +82,7 @@ public class TimelineMetricAggregatorFactory { /** * Minute based aggregation for hosts. + * Interval : 5 mins */ public static TimelineMetricAggregator createTimelineMetricAggregatorMinute (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { @@ -119,6 +126,7 @@ public class TimelineMetricAggregatorFactory { /** * Hourly aggregation for hosts. + * Interval : 1 hour */ public static TimelineMetricAggregator createTimelineMetricAggregatorHourly (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { @@ -162,6 +170,7 @@ public class TimelineMetricAggregatorFactory { /** * Daily aggregation for hosts. 
+ * Interval : 1 day */ public static TimelineMetricAggregator createTimelineMetricAggregatorDaily (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { @@ -204,10 +213,12 @@ public class TimelineMetricAggregatorFactory { } /** - * Minute based aggregation for cluster. + * Second aggregation for cluster. + * Interval : 2 mins + * Timeslice : 30 sec */ - public static TimelineMetricAggregator createTimelineClusterAggregatorMinute( - PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + public static TimelineMetricAggregator createTimelineClusterAggregatorSecond( + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -216,20 +227,20 @@ public class TimelineMetricAggregatorFactory { CLUSTER_AGGREGATOR_CHECKPOINT_FILE); long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l)); + (CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120l)); long timeSliceIntervalMillis = SECONDS.toMillis(metricsConf.getInt - (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15)); + (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30)); int checkpointCutOffMultiplier = - metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2); + metricsConf.getInt(CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER, 2); String inputTableName = METRICS_RECORD_TABLE_NAME; String outputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; - String aggregatorDisabledParam = CLUSTER_AGGREGATOR_MINUTE_DISABLED; + String aggregatorDisabledParam = CLUSTER_AGGREGATOR_SECOND_DISABLED; - // Minute based aggregation have added responsibility of time slicing - return new TimelineMetricClusterAggregatorMinute( + // Second based aggregation have added responsibility of time slicing + return new TimelineMetricClusterAggregatorSecond( hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -243,7 +254,56 
@@ public class TimelineMetricAggregatorFactory { } /** + * Minute aggregation for cluster. + * Interval : 5 mins + */ + public static TimelineMetricAggregator createTimelineClusterAggregatorMinute( + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + + String checkpointLocation = FilenameUtils.concat(checkpointDir, + CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_FILE); + + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); + + int checkpointCutOffMultiplier = metricsConf.getInt + (CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2); + + String inputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; + String outputTableName = METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME; + String aggregatorDisabledParam = CLUSTER_AGGREGATOR_MINUTE_DISABLED; + + if (useGroupByAggregator(metricsConf)) { + return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator( + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + aggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l + ); + } + + return new TimelineMetricClusterAggregator( + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + aggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l + ); + } + + /** * Hourly aggregation for cluster. + * Interval : 1 hour */ public static TimelineMetricAggregator createTimelineClusterAggregatorHourly( PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { @@ -291,6 +351,7 @@ public class TimelineMetricAggregatorFactory { /** * Daily aggregation for cluster. 
+ * Interval : 1 day */ public static TimelineMetricAggregator createTimelineClusterAggregatorDaily( PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { http://git-wip-us.apache.org/repos/asf/ambari/blob/4c3be397/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java deleted file mode 100644 index 85bdbbc..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java +++ /dev/null @@ -1,248 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; -import java.io.IOException; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; - -/** - * Aggregates a metric across all hosts in the cluster. Reads metrics from - * the precision table and saves into the aggregate. 
- */ -public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggregator { - private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregatorMinute.class); - public Long timeSliceIntervalMillis; - private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true); - // Aggregator to perform app-level aggregates for host metrics - private final TimelineMetricAppAggregator appAggregator; - // 1 minute client side buffering adjustment - private final Long serverTimeShiftAdjustment; - - public TimelineMetricClusterAggregatorMinute(PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf, - String checkpointLocation, - Long sleepIntervalMillis, - Integer checkpointCutOffMultiplier, - String aggregatorDisabledParam, - String tableName, - String outputTableName, - Long nativeTimeRangeDelay, - Long timeSliceInterval) { - super(hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, - checkpointCutOffMultiplier, aggregatorDisabledParam, tableName, - outputTableName, nativeTimeRangeDelay); - - appAggregator = new TimelineMetricAppAggregator(metricsConf); - this.timeSliceIntervalMillis = timeSliceInterval; - this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000")); - } - - @Override - protected void aggregate(ResultSet rs, long startTime, long endTime) throws SQLException, IOException { - // Account for time shift due to client side buffering by shifting the - // timestamps with the difference between server time and series start time - List timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime); - // Initialize app aggregates for host metrics - appAggregator.init(); - Map aggregateClusterMetrics = - aggregateMetricsFromResultSet(rs, timeSlices); - - LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates."); - hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics); - appAggregator.cleanup(); - } 
- - @Override - protected Condition prepareMetricQueryCondition(long startTime, long endTime) { - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setNoLimit(); - condition.setFetchSize(resultsetFetchSize); - condition.setStatement(String.format(GET_METRIC_SQL, - PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), - METRICS_RECORD_TABLE_NAME)); - // Retaining order of the row-key avoids client side merge sort. - condition.addOrderByColumn("METRIC_NAME"); - condition.addOrderByColumn("HOSTNAME"); - condition.addOrderByColumn("SERVER_TIME"); - condition.addOrderByColumn("APP_ID"); - return condition; - } - - /** - * Return time slices to normalize the timeseries data. - */ - private List getTimeSlices(long startTime, long endTime) { - List timeSlices = new ArrayList(); - long sliceStartTime = startTime; - while (sliceStartTime < endTime) { - timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis }); - sliceStartTime += timeSliceIntervalMillis; - } - return timeSlices; - } - - private Map aggregateMetricsFromResultSet(ResultSet rs, List timeSlices) - throws SQLException, IOException { - Map aggregateClusterMetrics = - new HashMap(); - - TimelineMetric metric = null; - if (rs.next()) { - metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs); - - // Call slice after all rows for a host are read - while (rs.next()) { - TimelineMetric nextMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs); - // If rows belong to same host combine them before slicing. This - // avoids issues across rows that belong to same hosts but get - // counted as coming from different ones. 
- if (metric.equalsExceptTime(nextMetric)) { - metric.addMetricValues(nextMetric.getMetricValues()); - } else { - // Process the current metric - processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices); - metric = nextMetric; - } - } - } - // Process last metric - if (metric != null) { - processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices); - } - - // Add app level aggregates to save - aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics()); - return aggregateClusterMetrics; - } - - /** - * Slice metric values into interval specified by : - * timeline.metrics.cluster.aggregator.minute.timeslice.interval - * Normalize value by averaging them within the interval - */ - private void processAggregateClusterMetrics(Map aggregateClusterMetrics, - TimelineMetric metric, List timeSlices) { - // Create time slices - Map clusterMetrics = sliceFromTimelineMetric(metric, timeSlices); - - if (clusterMetrics != null && !clusterMetrics.isEmpty()) { - for (Map.Entry clusterMetricEntry : - clusterMetrics.entrySet()) { - - TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey(); - Double avgValue = clusterMetricEntry.getValue(); - - MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric); - - if (aggregate == null) { - aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue); - aggregateClusterMetrics.put(clusterMetric, aggregate); - } else { - aggregate.updateSum(avgValue); - aggregate.updateNumberOfHosts(1); - aggregate.updateMax(avgValue); - aggregate.updateMin(avgValue); - } - // Update app level aggregates - appAggregator.processTimelineClusterMetric(clusterMetric, metric.getHostName(), avgValue); - } - } - } - - private Map sliceFromTimelineMetric( - TimelineMetric timelineMetric, List timeSlices) { - - if (timelineMetric.getMetricValues().isEmpty()) { - return null; - } - - Map timelineClusterMetricMap = - new HashMap(); - - Long timeShift = 
timelineMetric.getTimestamp() - timelineMetric.getStartTime(); - if (timeShift < 0) { - LOG.debug("Invalid time shift found, possible discrepancy in clocks. " + - "timeShift = " + timeShift); - timeShift = 0l; - } - - for (Map.Entry metric : timelineMetric.getMetricValues().entrySet()) { - // TODO: investigate null values - pre filter - if (metric.getValue() == null) { - continue; - } - - Long timestamp = getSliceTimeForMetric(timeSlices, Long.parseLong(metric.getKey().toString())); - if (timestamp != -1) { - // Metric is within desired time range - TimelineClusterMetric clusterMetric = new TimelineClusterMetric( - timelineMetric.getMetricName(), - timelineMetric.getAppId(), - timelineMetric.getInstanceId(), - timestamp, - timelineMetric.getType()); - - // do a sum / count here to get average for all points in a slice - int count = 1; - Double sum; - if (!timelineClusterMetricMap.containsKey(clusterMetric)) { - sum = metric.getValue(); - } else { - count++; - Double oldValue = timelineClusterMetricMap.get(clusterMetric); - sum = oldValue + metric.getValue(); - } - timelineClusterMetricMap.put(clusterMetric, (sum / count)); - } - } - - return timelineClusterMetricMap; - } - - /** - * Return beginning of the time slice into which the metric fits. 
- */ - private Long getSliceTimeForMetric(List timeSlices, Long timestamp) { - for (Long[] timeSlice : timeSlices) { - if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) { - return timeSlice[0]; - } - } - return -1l; - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/4c3be397/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java new file mode 100644 index 0000000..1c7bf7f --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; + +/** + * Aggregates a metric across all hosts in the cluster. Reads metrics from + * the precision table and saves into the aggregate. 
+ */ +public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggregator { + private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregatorSecond.class); + public Long timeSliceIntervalMillis; + private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true); + // Aggregator to perform app-level aggregates for host metrics + private final TimelineMetricAppAggregator appAggregator; + // 1 minute client side buffering adjustment + private final Long serverTimeShiftAdjustment; + + public TimelineMetricClusterAggregatorSecond(PhoenixHBaseAccessor hBaseAccessor, + Configuration metricsConf, + String checkpointLocation, + Long sleepIntervalMillis, + Integer checkpointCutOffMultiplier, + String aggregatorDisabledParam, + String tableName, + String outputTableName, + Long nativeTimeRangeDelay, + Long timeSliceInterval) { + super(hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, + checkpointCutOffMultiplier, aggregatorDisabledParam, tableName, + outputTableName, nativeTimeRangeDelay); + + appAggregator = new TimelineMetricAppAggregator(metricsConf); + this.timeSliceIntervalMillis = timeSliceInterval; + this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000")); + } + + @Override + protected void aggregate(ResultSet rs, long startTime, long endTime) throws SQLException, IOException { + // Account for time shift due to client side buffering by shifting the + // timestamps with the difference between server time and series start time + List timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime); + // Initialize app aggregates for host metrics + appAggregator.init(); + Map aggregateClusterMetrics = + aggregateMetricsFromResultSet(rs, timeSlices); + + LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates."); + hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics); + appAggregator.cleanup(); + } 
+ + @Override + protected Condition prepareMetricQueryCondition(long startTime, long endTime) { + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime, null, null, true); + condition.setNoLimit(); + condition.setFetchSize(resultsetFetchSize); + condition.setStatement(String.format(GET_METRIC_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), + METRICS_RECORD_TABLE_NAME)); + // Retaining order of the row-key avoids client side merge sort. + condition.addOrderByColumn("METRIC_NAME"); + condition.addOrderByColumn("HOSTNAME"); + condition.addOrderByColumn("SERVER_TIME"); + condition.addOrderByColumn("APP_ID"); + return condition; + } + + /** + * Return time slices to normalize the timeseries data. + */ + private List getTimeSlices(long startTime, long endTime) { + List timeSlices = new ArrayList(); + long sliceStartTime = startTime; + while (sliceStartTime < endTime) { + timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis }); + sliceStartTime += timeSliceIntervalMillis; + } + return timeSlices; + } + + private Map aggregateMetricsFromResultSet(ResultSet rs, List timeSlices) + throws SQLException, IOException { + Map aggregateClusterMetrics = + new HashMap(); + + TimelineMetric metric = null; + if (rs.next()) { + metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs); + + // Call slice after all rows for a host are read + while (rs.next()) { + TimelineMetric nextMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs); + // If rows belong to same host combine them before slicing. This + // avoids issues across rows that belong to same hosts but get + // counted as coming from different ones. 
+ if (metric.equalsExceptTime(nextMetric)) { + metric.addMetricValues(nextMetric.getMetricValues()); + } else { + // Process the current metric + processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices); + metric = nextMetric; + } + } + } + // Process last metric + if (metric != null) { + processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices); + } + + // Add app level aggregates to save + aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics()); + return aggregateClusterMetrics; + } + + /** + * Slice metric values into interval specified by : + * timeline.metrics.cluster.aggregator.minute.timeslice.interval + * Normalize value by averaging them within the interval + */ + private void processAggregateClusterMetrics(Map aggregateClusterMetrics, + TimelineMetric metric, List timeSlices) { + // Create time slices + Map clusterMetrics = sliceFromTimelineMetric(metric, timeSlices); + + if (clusterMetrics != null && !clusterMetrics.isEmpty()) { + for (Map.Entry clusterMetricEntry : + clusterMetrics.entrySet()) { + + TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey(); + Double avgValue = clusterMetricEntry.getValue(); + + MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric); + + if (aggregate == null) { + aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue); + aggregateClusterMetrics.put(clusterMetric, aggregate); + } else { + aggregate.updateSum(avgValue); + aggregate.updateNumberOfHosts(1); + aggregate.updateMax(avgValue); + aggregate.updateMin(avgValue); + } + // Update app level aggregates + appAggregator.processTimelineClusterMetric(clusterMetric, metric.getHostName(), avgValue); + } + } + } + + private Map sliceFromTimelineMetric( + TimelineMetric timelineMetric, List timeSlices) { + + if (timelineMetric.getMetricValues().isEmpty()) { + return null; + } + + Map timelineClusterMetricMap = + new HashMap(); + + Long timeShift = 
timelineMetric.getTimestamp() - timelineMetric.getStartTime(); + if (timeShift < 0) { + LOG.debug("Invalid time shift found, possible discrepancy in clocks. " + + "timeShift = " + timeShift); + timeShift = 0l; + } + + for (Map.Entry metric : timelineMetric.getMetricValues().entrySet()) { + // TODO: investigate null values - pre filter + if (metric.getValue() == null) { + continue; + } + + Long timestamp = getSliceTimeForMetric(timeSlices, Long.parseLong(metric.getKey().toString())); + if (timestamp != -1) { + // Metric is within desired time range + TimelineClusterMetric clusterMetric = new TimelineClusterMetric( + timelineMetric.getMetricName(), + timelineMetric.getAppId(), + timelineMetric.getInstanceId(), + timestamp, + timelineMetric.getType()); + + // do a sum / count here to get average for all points in a slice + int count = 1; + Double sum; + if (!timelineClusterMetricMap.containsKey(clusterMetric)) { + sum = metric.getValue(); + } else { + count++; + Double oldValue = timelineClusterMetricMap.get(clusterMetric); + sum = oldValue + metric.getValue(); + } + timelineClusterMetricMap.put(clusterMetric, (sum / count)); + } + } + + return timelineClusterMetricMap; + } + + /** + * Return beginning of the time slice into which the metric fits. 
+ */ + private Long getSliceTimeForMetric(List timeSlices, Long timestamp) { + for (Long[] timeSlice : timeSlices) { + if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) { + return timeSlice[0]; + } + } + return -1l; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4c3be397/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java index 092c983..92d59e2 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java @@ -86,7 +86,7 @@ public class PhoenixTransactSQL { "TTL=%s, COMPRESSION='%s'"; // HOSTS_COUNT vs METRIC_COUNT - public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL = + public static final String CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL = "CREATE TABLE IF NOT EXISTS %s " + "(METRIC_NAME VARCHAR, " + "APP_ID VARCHAR, " + @@ -248,6 +248,8 @@ public class PhoenixTransactSQL { "METRIC_RECORD_DAILY"; public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME = "METRIC_AGGREGATE"; + public static final String METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME = + "METRIC_AGGREGATE_MINUTE"; public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME = "METRIC_AGGREGATE_HOURLY"; public static final String 
METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME = @@ -555,7 +557,10 @@ public class PhoenixTransactSQL { metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL; break; - //TODO : Include MINUTE case after introducing CLUSTER_AGGREGATOR_MINUTE + case MINUTES: + metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME; + queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL; + break; default: metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; queryStmt = GET_CLUSTER_AGGREGATE_SQL; http://git-wip-us.apache.org/repos/asf/ambari/blob/4c3be397/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java index cbf0233..4ddecdc 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java @@ -42,15 +42,14 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.TreeMap; import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; import static 
junit.framework.Assert.fail; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric; @@ -101,7 +100,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { public void testShouldAggregateClusterProperly() throws Exception { // GIVEN TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false)); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false)); TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); long startTime = System.currentTimeMillis(); @@ -111,7 +110,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { "disk_free", 1)); hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", "disk_free", 2)); - ctime += minute; + ctime += 2*minute; hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", "disk_free", 2)); hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", @@ -153,7 +152,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { public void testShouldAggregateClusterIgnoringInstance() throws Exception { // GIVEN TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false)); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false)); TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); long startTime = System.currentTimeMillis(); @@ -214,21 +213,21 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { if ("disk_free".equals(currentMetric.getMetricName())) { System.out.println("OUTPUT: " + currentMetric + " - " + 
currentHostAggregate); assertEquals(2, currentHostAggregate.getNumberOfHosts()); - assertEquals(5.0, currentHostAggregate.getSum()); + assertEquals(5.0, Math.floor(currentHostAggregate.getSum())); recordCount++; } else { fail("Unexpected entry"); } } - Assert.assertEquals(8, recordCount); + Assert.assertEquals(5, recordCount); } @Test public void testShouldAggregateDifferentMetricsOnClusterProperly() throws Exception { // GIVEN TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false)); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false)); TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); // here we put some metrics tha will be aggregated @@ -242,7 +241,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", "disk_used", 1)); - ctime += minute; + ctime += 2*minute; hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", "disk_free", 2)); hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", @@ -334,6 +333,73 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { } @Test + public void testShouldAggregateClusterOnMinuteProperly() throws Exception { + + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false)); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long second = 1000; + long minute = 60*second; + + Map records = + new HashMap(); + + records.put(createEmptyTimelineClusterMetric(ctime), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric(ctime += second), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric(ctime += second), + new MetricClusterAggregate(4.0, 2, 
0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric(ctime += second), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + + hdb.saveClusterAggregateRecords(records); + agg.doWork(startTime, ctime + second); + long oldCtime = ctime + second; + + //Next minute + ctime = startTime + minute; + + records.put(createEmptyTimelineClusterMetric(ctime), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric(ctime += second), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric(ctime += second), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric(ctime += second), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + + hdb.saveClusterAggregateRecords(records); + agg.doWork(oldCtime, ctime + second); + + ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_MINUTE"); + int count = 0; + long diff = 0 ; + while (rs.next()) { + assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME")); + assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); + assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); + if (count == 0) { + diff+=rs.getLong("SERVER_TIME"); + } else { + diff-=rs.getLong("SERVER_TIME"); + if (diff < 0) { + diff*=-1; + } + assertTrue(diff == minute); + } + count++; + } + + assertEquals("One hourly aggregated row expected ", 2, count); + } + + @Test public void testShouldAggregateClusterOnHourProperly() throws Exception { // GIVEN TimelineMetricAggregator agg = @@ -444,7 +510,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { Configuration conf = getConfigurationForTest(false); conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1"); TimelineMetricAggregator agg = - 
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, conf); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, conf); TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); long startTime = System.currentTimeMillis(); @@ -483,7 +549,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs); recordCount++; } - assertEquals(4, recordCount); + assertEquals(3, recordCount); assertNotNull(currentMetric); assertEquals("cpu_user", currentMetric.getMetricName()); assertEquals("app1", currentMetric.getAppId()); @@ -495,7 +561,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { @Test public void testClusterAggregateMetricNormalization() throws Exception { TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false)); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false)); TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); // Sample data @@ -565,7 +631,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { fail("Unexpected entry"); } } - Assert.assertEquals(9, recordCount); + Assert.assertEquals(5, recordCount); } @Test http://git-wip-us.apache.org/repos/asf/ambari/blob/4c3be397/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java index 89fee7c..5e7234c 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java @@ -48,7 +48,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createMetricHostAggregate; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { @@ -205,7 +204,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { public void testGetClusterMetricRecordsSeconds() throws Exception { // GIVEN TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration()); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, new Configuration()); long startTime = System.currentTimeMillis(); long ctime = startTime + 1; @@ -236,7 +235,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { TimelineMetric metric = timelineMetrics.getMetrics().get(0); assertEquals("disk_free", metric.getMetricName()); - assertEquals(8, metric.getMetricValues().size()); + 
assertEquals(5, metric.getMetricValues().size()); assertEquals(1.5, metric.getMetricValues().values().iterator().next(), 0.00001); } @@ -244,7 +243,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { public void testGetClusterMetricRecordLatestWithFunction() throws Exception { // GIVEN TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration()); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, new Configuration()); long startTime = System.currentTimeMillis(); long ctime = startTime + 1; http://git-wip-us.apache.org/repos/asf/ambari/blob/4c3be397/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java index 7c7db9f..9100afd 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java @@ -40,6 +40,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.SortedMap; import java.util.TreeMap; @Singleton @@ -250,24 +251,16 @@ public class TimelineMetricCacheEntryFactory implements UpdatingCacheEntryFactor Long requestedStartTime, Long requestedEndTime, boolean removeAll) { for (TimelineMetric existingMetric : existingMetrics.getMetrics()) { - if(removeAll) { + if (removeAll) { existingMetric.setMetricValues(new TreeMap()); } else { - Map existingMetricValues = 
existingMetric.getMetricValues(); + TreeMap existingMetricValues = existingMetric.getMetricValues(); LOG.trace("Existing metric: " + existingMetric.getMetricName() + " # " + existingMetricValues.size()); - Iterator> valueIterator = existingMetricValues.entrySet().iterator(); - - // Remove old values - // Assumption: All return value are millis - while (valueIterator.hasNext()) { - Map.Entry metricEntry = valueIterator.next(); - if (metricEntry.getKey() < requestedStartTime - || metricEntry.getKey() > requestedEndTime) { - valueIterator.remove(); - } - } + // Retain only the values that are within the [requestStartTime, requestedEndTime] window + existingMetricValues.headMap(requestedStartTime,false).clear(); + existingMetricValues.tailMap(requestedEndTime, false).clear(); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/4c3be397/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java index 895d24b..743273f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java @@ -85,6 +85,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog { private static final String KAFKA_BROKER = "kafka-broker"; private static final String AMS_ENV = "ams-env"; private static final String AMS_HBASE_ENV = "ams-hbase-env"; + private static final String AMS_SITE = "ams-site"; private static final String HBASE_ENV_CONFIG = "hbase-env"; private static final String HIVE_SITE_CONFIG = "hive-site"; private static final String RANGER_ENV_CONFIG = "ranger-env"; @@ -834,6 +835,27 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog { newProperties.put("content", 
updateAmsHbaseEnvContent(content)); updateConfigurationPropertiesForCluster(cluster, AMS_HBASE_ENV, newProperties, true, true); } + Config amsSite = cluster.getDesiredConfigByType(AMS_SITE); + if (amsSite != null) { + Map newProperties = new HashMap<>(); + + //Interval + newProperties.put("timeline.metrics.cluster.aggregator.second.interval",String.valueOf(120)); + newProperties.put("timeline.metrics.cluster.aggregator.minute.interval",String.valueOf(300)); + newProperties.put("timeline.metrics.host.aggregator.minute.interval",String.valueOf(300)); + + //ttl + newProperties.put("timeline.metrics.cluster.aggregator.second.ttl", String.valueOf(2592000)); + newProperties.put("timeline.metrics.cluster.aggregator.minute.ttl", String.valueOf(7776000)); + + //checkpoint + newProperties.put("timeline.metrics.cluster.aggregator.second.checkpointCutOffMultiplier", String.valueOf(2)); + + //disabled + newProperties.put("timeline.metrics.cluster.aggregator.second.disabled", String.valueOf(false)); + + updateConfigurationPropertiesForCluster(cluster, AMS_SITE, newProperties, true, true); + } } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/4c3be397/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml index 89b584b..c73a401 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml @@ -59,11 +59,11 @@ timeline.metrics.host.aggregator.minute.interval - 120 + 300 Minute host aggregator interval Time in seconds to sleep for the minute resolution host based - aggregator. Default resolution is 2 minutes. 
+ aggregator. Default resolution is 5 minutes. int @@ -111,10 +111,22 @@ timeline.metrics.cluster.aggregator.minute.interval - 120 + 300 Minute cluster aggregator interval Time in seconds to sleep for the minute resolution cluster wide + aggregator. Default resolution is 5 minutes. + + + int + + + + timeline.metrics.cluster.aggregator.second.interval + 120 + Second cluster aggregator interval + + Time in seconds to sleep for the second resolution cluster wide aggregator. Default resolution is 2 minutes. @@ -170,6 +182,19 @@ + timeline.metrics.cluster.aggregator.second.checkpointCutOffMultiplier + 2 + Second cluster aggregator checkpoint cutOff multiplier + + Multiplier value * interval = Max allowed checkpoint lag. Effectively + if aggregator checkpoint is greater than max allowed checkpoint delay, + the checkpoint will be discarded by the aggregator. + + + int + + + timeline.metrics.cluster.aggregator.minute.checkpointCutOffMultiplier 2 Minute cluster aggregator checkpoint cutOff multiplier @@ -238,11 +263,19 @@ - timeline.metrics.cluster.aggregator.minute.timeslice.interval + timeline.metrics.cluster.aggregator.second.disabled + false + Disable second cluster aggregator + + Disable cluster based second aggregations. + + + + timeline.metrics.cluster.aggregator.second.timeslice.interval 30 - Minute cluster aggregator timeslice interval + Second cluster aggregator timeslice interval - Lowest resolution of desired data for cluster level minute aggregates. + Lowest resolution of desired data for cluster level second aggregates. int @@ -270,9 +303,16 @@ - timeline.metrics.cluster.aggregator.minute.ttl + timeline.metrics.cluster.aggregator.second.ttl 2592000 + Cluster wide second resolution data purge interval. Default is 30 days. + + + + timeline.metrics.cluster.aggregator.minute.ttl + 7776000 + Cluster wide minute resolution data purge interval. Default is 90 days.
http://git-wip-us.apache.org/repos/asf/ambari/blob/4c3be397/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/split_points.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/split_points.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/split_points.py index cd9c844..fa4deaf 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/split_points.py +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/split_points.py @@ -27,7 +27,7 @@ import ast metric_filename_ext = '.txt' -# 5 regions for higher order aggregate tables -other_region_static_count = 5 +# 6 regions for higher order aggregate tables +other_region_static_count = 6 # Max equidistant points to return per service max_equidistant_points = 50 http://git-wip-us.apache.org/repos/asf/ambari/blob/4c3be397/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java index fff720a..61050cc 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java @@ -18,6 +18,8 @@ package org.apache.ambari.server.upgrade; +import com.google.common.collect.Maps; +import com.google.gson.Gson; import com.google.inject.AbstractModule; import com.google.inject.Binder; import com.google.inject.Guice; @@ -26,9 +28,15 @@ import com.google.inject.Module; import com.google.inject.Provider; import com.google.inject.persist.PersistService; import org.apache.ambari.server.AmbariException; import
org.apache.ambari.server.actionmanager.ActionManager; import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.controller.AmbariManagementControllerImpl; +import org.apache.ambari.server.controller.ConfigurationRequest; +import org.apache.ambari.server.controller.ConfigurationResponse; +import org.apache.ambari.server.controller.KerberosHelper; +import org.apache.ambari.server.controller.MaintenanceStateHelper; import org.apache.ambari.server.orm.DBAccessor; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; @@ -76,6 +84,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import static org.easymock.EasyMock.anyLong; import static org.easymock.EasyMock.anyObject; @@ -89,6 +98,8 @@ import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * {@link org.apache.ambari.server.upgrade.UpgradeCatalog213} unit tests. @@ -593,6 +604,71 @@ public class UpgradeCatalog213Test { } @Test + public void testAmsSiteUpdateConfigs() throws Exception{ + + Map oldPropertiesAmsSite = new HashMap() { + { + //Including only those properties that might be present in an older version. 
+ put("timeline.metrics.cluster.aggregator.minute.interval",String.valueOf(1000)); + put("timeline.metrics.host.aggregator.minute.interval",String.valueOf(1000)); + put("timeline.metrics.cluster.aggregator.minute.ttl", String.valueOf(1000)); + } + }; + Map newPropertiesAmsSite = new HashMap() { + { + put("timeline.metrics.cluster.aggregator.second.interval",String.valueOf(120)); + put("timeline.metrics.cluster.aggregator.minute.interval",String.valueOf(300)); + put("timeline.metrics.host.aggregator.minute.interval",String.valueOf(300)); + put("timeline.metrics.cluster.aggregator.second.ttl", String.valueOf(2592000)); + put("timeline.metrics.cluster.aggregator.minute.ttl", String.valueOf(7776000)); + put("timeline.metrics.cluster.aggregator.second.checkpointCutOffMultiplier", String.valueOf(2)); + put("timeline.metrics.cluster.aggregator.second.disabled", String.valueOf(false)); + } + }; + EasyMockSupport easyMockSupport = new EasyMockSupport(); + + Clusters clusters = easyMockSupport.createNiceMock(Clusters.class); + final Cluster cluster = easyMockSupport.createNiceMock(Cluster.class); + Config mockAmsSite = easyMockSupport.createNiceMock(Config.class); + + expect(clusters.getClusters()).andReturn(new HashMap() {{ + put("normal", cluster); + }}).once(); + expect(cluster.getDesiredConfigByType("ams-site")).andReturn(mockAmsSite).atLeastOnce(); + expect(mockAmsSite.getProperties()).andReturn(oldPropertiesAmsSite).times(1); + + Injector injector = easyMockSupport.createNiceMock(Injector.class); + expect(injector.getInstance(Gson.class)).andReturn(null).anyTimes(); + expect(injector.getInstance(MaintenanceStateHelper.class)).andReturn(null).anyTimes(); + expect(injector.getInstance(KerberosHelper.class)).andReturn(createNiceMock(KerberosHelper.class)).anyTimes(); + + replay(injector, clusters, mockAmsSite, cluster); + + AmbariManagementControllerImpl controller = createMockBuilder(AmbariManagementControllerImpl.class) + .addMockedMethod("createConfiguration") + 
.addMockedMethod("getClusters", new Class[] { }) + .withConstructor(createNiceMock(ActionManager.class), clusters, injector) + .createNiceMock(); + + Injector injector2 = easyMockSupport.createNiceMock(Injector.class); + Capture configurationRequestCapture = EasyMock.newCapture(); + ConfigurationResponse configurationResponseMock = easyMockSupport.createMock(ConfigurationResponse.class); + + expect(injector2.getInstance(AmbariManagementController.class)).andReturn(controller).anyTimes(); + expect(controller.getClusters()).andReturn(clusters).anyTimes(); + expect(controller.createConfiguration(capture(configurationRequestCapture))).andReturn(configurationResponseMock).once(); + + replay(controller, injector2, configurationResponseMock); + new UpgradeCatalog213(injector2).updateAMSConfigs(); + easyMockSupport.verifyAll(); + + ConfigurationRequest configurationRequest = configurationRequestCapture.getValue(); + Map updatedProperties = configurationRequest.getProperties(); + assertTrue(Maps.difference(newPropertiesAmsSite, updatedProperties).areEqual()); + + } + + @Test public void testUpdateAlertDefinitions() { EasyMockSupport easyMockSupport = new EasyMockSupport(); UpgradeCatalog213 upgradeCatalog213 = new UpgradeCatalog213(injector);