Return-Path: X-Original-To: apmail-ambari-commits-archive@www.apache.org Delivered-To: apmail-ambari-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 940B319401 for ; Fri, 4 Mar 2016 19:23:24 +0000 (UTC) Received: (qmail 83770 invoked by uid 500); 4 Mar 2016 19:23:24 -0000 Delivered-To: apmail-ambari-commits-archive@ambari.apache.org Received: (qmail 83739 invoked by uid 500); 4 Mar 2016 19:23:24 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 83730 invoked by uid 99); 4 Mar 2016 19:23:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2016 19:23:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4F97FE69FD; Fri, 4 Mar 2016 19:23:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: avijayan@apache.org To: commits@ambari.apache.org Message-Id: <69486d44dea14aba92d2471fc904fd3c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ambari git commit: AMBARI-15267 : Metrics aggregate times should be tied to aggregation period instead of AMS start time (avijayan) Date: Fri, 4 Mar 2016 19:23:24 +0000 (UTC) Repository: ambari Updated Branches: refs/heads/trunk 2c3bc30dc -> 8486be6aa AMBARI-15267 : Metrics aggregate times should be tied to aggregation period instead of AMS start time (avijayan) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8486be6a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8486be6a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8486be6a Branch: refs/heads/trunk Commit: 8486be6aa1a79a8c123e41d5267cddb8504c1dbc Parents: 2c3bc30 Author: Aravindan Vijayan Authored: Fri Mar 4 11:23:14 2016 -0800 Committer: Aravindan Vijayan Committed: Fri Mar 4 11:23:19 2016 -0800 ---------------------------------------------------------------------- .../timeline/HBaseTimelineMetricStore.java | 50 ++--- .../aggregators/AbstractTimelineAggregator.java | 117 +++++------ .../aggregators/TimelineClusterMetric.java | 4 + .../aggregators/TimelineMetricAggregator.java | 9 +- .../TimelineMetricClusterAggregator.java | 6 +- .../TimelineMetricClusterAggregatorSecond.java | 6 +- .../TimelineMetricHostAggregator.java | 6 +- .../v2/TimelineMetricClusterAggregator.java | 2 +- .../v2/TimelineMetricHostAggregator.java | 2 +- .../timeline/query/PhoenixTransactSQL.java | 4 +- .../AbstractTimelineAggregatorTest.java | 204 +++++-------------- 11 files changed, 157 insertions(+), 253 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/8486be6a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index 37e4796..f460292 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -48,6 +48,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES; public class HBaseTimelineMetricStore extends AbstractService implements TimelineMetricStore { @@ -92,58 +94,37 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin // Start the cluster aggregator second TimelineMetricAggregator secondClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf, metricMetadataManager); - if (!secondClusterAggregator.isDisabled()) { - Thread aggregatorThread = new Thread(secondClusterAggregator); - aggregatorThread.start(); - } + scheduleAggregatorThread(secondClusterAggregator, metricsConf); - // Start the minute cluster aggregator +// // Start the minute cluster aggregator TimelineMetricAggregator minuteClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf); - if (!minuteClusterAggregator.isDisabled()) { - Thread aggregatorThread = new Thread(minuteClusterAggregator); - aggregatorThread.start(); - } + scheduleAggregatorThread(minuteClusterAggregator, metricsConf); // Start the hourly cluster aggregator TimelineMetricAggregator hourlyClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf); - if (!hourlyClusterAggregator.isDisabled()) { - Thread aggregatorThread = new Thread(hourlyClusterAggregator); - aggregatorThread.start(); - } + scheduleAggregatorThread(hourlyClusterAggregator, metricsConf); // Start the daily cluster aggregator TimelineMetricAggregator dailyClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf); - if (!dailyClusterAggregator.isDisabled()) { - Thread aggregatorThread = new Thread(dailyClusterAggregator); - aggregatorThread.start(); - } + scheduleAggregatorThread(dailyClusterAggregator, metricsConf); // Start the minute host aggregator TimelineMetricAggregator minuteHostAggregator = TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf); - if (!minuteHostAggregator.isDisabled()) { - Thread minuteAggregatorThread = new Thread(minuteHostAggregator); - minuteAggregatorThread.start(); - } + scheduleAggregatorThread(minuteHostAggregator, metricsConf); // Start the hourly host aggregator TimelineMetricAggregator hourlyHostAggregator = TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf); - if (!hourlyHostAggregator.isDisabled()) { - Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator); - aggregatorHourlyThread.start(); - } + scheduleAggregatorThread(hourlyHostAggregator, metricsConf); // Start the daily host aggregator TimelineMetricAggregator dailyHostAggregator = TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf); - if (!dailyHostAggregator.isDisabled()) { - Thread aggregatorDailyThread = new Thread(dailyHostAggregator); - aggregatorDailyThread.start(); - } + scheduleAggregatorThread(dailyHostAggregator, metricsConf); if (!configuration.isTimelineMetricsServiceWatcherDisabled()) { int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay(); @@ -350,4 +331,15 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin public Map> getHostAppsMetadata() throws SQLException, IOException { return metricMetadataManager.getHostedAppsCache(); } + + private void scheduleAggregatorThread(TimelineMetricAggregator aggregator, + Configuration metricsConf) { + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + if (!aggregator.isDisabled()) { + executorService.scheduleAtFixedRate(aggregator, + SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120)), + aggregator.getSleepIntervalMillis(), + TimeUnit.MILLISECONDS); + } + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/8486be6a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java index ea7bc27..c5b9ba1 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java @@ -22,8 +22,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; import org.slf4j.LoggerFactory; import org.slf4j.Logger; import java.io.File; @@ -44,7 +42,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti public abstract class AbstractTimelineAggregator implements TimelineMetricAggregator { protected final PhoenixHBaseAccessor hBaseAccessor; protected final Logger LOG; - private Clock clock; protected final long checkpointDelayMillis; protected final Integer resultsetFetchSize; protected Configuration metricsConf; @@ -55,25 +52,20 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg protected String tableName; protected String outputTableName; protected Long nativeTimeRangeDelay; + protected Long lastAggregatedEndTime = -1l; + // Explicitly name aggregators for logging needs private final String aggregatorName; AbstractTimelineAggregator(String aggregatorName, PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf, Clock clk) { + Configuration metricsConf) { this.aggregatorName = aggregatorName; this.hBaseAccessor = hBaseAccessor; this.metricsConf = metricsConf; this.checkpointDelayMillis = SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120)); this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000); this.LOG = LoggerFactory.getLogger(aggregatorName); - this.clock = clk; - } - - AbstractTimelineAggregator(String aggregatorName, - PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf) { - this(aggregatorName, hBaseAccessor, metricsConf, new SystemClock()); } public AbstractTimelineAggregator(String aggregatorName, @@ -100,84 +92,68 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg public void run() { LOG.info("Started Timeline aggregator thread @ " + new Date()); Long SLEEP_INTERVAL = getSleepIntervalMillis(); - - while (true) { - long sleepTime = runOnce(SLEEP_INTERVAL); - - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - LOG.info("Sleep interrupted, continuing with aggregation."); - } - } + runOnce(SLEEP_INTERVAL); + this.lastAggregatedEndTime = this.lastAggregatedEndTime + SLEEP_INTERVAL; } /** * Access relaxed for tests */ - public long runOnce(Long SLEEP_INTERVAL) { - long currentTime = clock.getTime(); - long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime); - long sleepTime = SLEEP_INTERVAL; + public void runOnce(Long SLEEP_INTERVAL) { + long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(); if (lastCheckPointTime != -1) { LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: " - + ((clock.getTime() - lastCheckPointTime) / 1000) + + ((lastAggregatedEndTime - lastCheckPointTime) / 1000) + " seconds."); - long startTime = clock.getTime(); boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL); - long executionTime = clock.getTime() - startTime; - long delta = SLEEP_INTERVAL - executionTime; - - if (delta > 0) { - // Sleep for (configured sleep - time to execute task) - sleepTime = delta; - } else { - // No sleep because last run took too long to execute - LOG.info("Aggregator execution took too long, " + - "cancelling sleep. executionTime = " + executionTime); - sleepTime = 1; - } - - LOG.debug("Aggregator sleep interval = " + sleepTime); if (success) { try { - // Comment to bug fix: - // cannot just save lastCheckPointTime + SLEEP_INTERVAL, - // it has to be verified so it is not a time in the future - // checkpoint says what was aggregated, and there is no way - // the future metrics were aggregated! - saveCheckPoint(Math.min(currentTime, lastCheckPointTime + - SLEEP_INTERVAL)); + saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL); } catch (IOException io) { LOG.warn("Error saving checkpoint, restarting aggregation at " + "previous checkpoint."); } } } - - return sleepTime; } - private long readLastCheckpointSavingOnFirstRun(long currentTime) { + private long readLastCheckpointSavingOnFirstRun() { long lastCheckPointTime = -1; try { lastCheckPointTime = readCheckPoint(); + LOG.info("Last Checkpoint read : " + new Date(lastCheckPointTime)); + + if (lastAggregatedEndTime == -1l) { + lastAggregatedEndTime = getRoundedAggregateTimeMillis(getSleepIntervalMillis()); + } + if (isLastCheckPointTooOld(lastCheckPointTime)) { LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " + "lastCheckPointTime = " + new Date(lastCheckPointTime)); lastCheckPointTime = -1; } + + if (lastCheckPointTime > 0) { + lastCheckPointTime = getRoundedCheckPointTimeMillis(lastCheckPointTime, getSleepIntervalMillis()); + LOG.info("Rounded off checkpoint : " + new Date(lastCheckPointTime)); + } + + if (isLastCheckPointTooYoung(lastCheckPointTime)) { + LOG.info("Last checkpoint too recent for aggregation. Sleeping for 1 cycle."); + lastCheckPointTime = -1; + } + if (lastCheckPointTime == -1) { // Assuming first run, save checkpoint and sleep. - // Set checkpoint to 2 minutes in the past to allow the + // Set checkpoint to rounded time in the past to allow the // agents/collectors to catch up LOG.info("Saving checkpoint time on first run. " + - new Date((currentTime - checkpointDelayMillis))); - saveCheckPoint(currentTime - checkpointDelayMillis); + new Date((lastAggregatedEndTime))); + saveCheckPoint(lastAggregatedEndTime); } } catch (IOException io) { LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io); @@ -189,8 +165,12 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg // first checkpoint is saved checkpointDelayMillis in the past, // so here we also need to take it into account return checkpoint != -1 && - ((clock.getTime() - checkpoint - checkpointDelayMillis) > - getCheckpointCutOffIntervalMillis()); + ((lastAggregatedEndTime - checkpoint) > getCheckpointCutOffIntervalMillis()); + } + + private boolean isLastCheckPointTooYoung(long checkpoint) { + return checkpoint != -1 && + ((lastAggregatedEndTime <= checkpoint)); } protected long readCheckPoint() { @@ -227,7 +207,6 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg * @param startTime Sample start time * @param endTime Sample end time */ - @Override public boolean doWork(long startTime, long endTime) { LOG.info("Start aggregation cycle @ " + new Date() + ", " + "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime)); @@ -292,10 +271,14 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg protected abstract void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException; - protected Long getSleepIntervalMillis() { + public Long getSleepIntervalMillis() { return sleepIntervalMillis; } + public void setSleepIntervalMillis(Long sleepIntervalMillis) { + this.sleepIntervalMillis = sleepIntervalMillis; + } + protected Integer getCheckpointCutOffMultiplier() { return checkpointCutOffMultiplier; } @@ -324,4 +307,22 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg protected String getCheckpointLocation() { return checkpointLocation; } + + protected void setLastAggregatedEndTime(long lastAggregatedEndTime) { + this.lastAggregatedEndTime = lastAggregatedEndTime; + } + + protected long getLastAggregatedEndTime() { + return lastAggregatedEndTime; + } + + public static long getRoundedCheckPointTimeMillis(long referenceTime, long aggregatorPeriod) { + return referenceTime - (referenceTime % aggregatorPeriod); + } + + public static long getRoundedAggregateTimeMillis(long aggregatorPeriod) { + long currentTime = System.currentTimeMillis(); + return currentTime - (currentTime % aggregatorPeriod); + } + } http://git-wip-us.apache.org/repos/asf/ambari/blob/8486be6a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java index 3c30a6f..b7d9110 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java @@ -94,4 +94,8 @@ public class TimelineClusterMetric { ", timestamp=" + timestamp + '}'; } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/8486be6a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java index 96be48d..295db0e 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java @@ -31,4 +31,11 @@ public interface TimelineMetricAggregator extends Runnable { * @return true/false */ public boolean isDisabled(); -} + + /** + * Return aggregator Interval + * @return Interval in Millis + */ + public Long getSleepIntervalMillis(); + + } http://git-wip-us.apache.org/repos/asf/ambari/blob/8486be6a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java index 2fd9aa8..f90b01f 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java @@ -75,13 +75,13 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator @Override protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException { - Map hostAggregateMap = aggregateMetricsFromResultSet(rs); + Map hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime); LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); hBaseAccessor.saveClusterTimeAggregateRecords(hostAggregateMap, outputTableName); } - private Map aggregateMetricsFromResultSet(ResultSet rs) + private Map aggregateMetricsFromResultSet(ResultSet rs, long endTime) throws IOException, SQLException { TimelineClusterMetric existingMetric = null; @@ -100,6 +100,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator if (existingMetric == null) { // First row existingMetric = currentMetric; + currentMetric.setTimestamp(endTime); hostAggregate = new MetricHostAggregate(); hostAggregateMap.put(currentMetric, hostAggregate); } @@ -111,6 +112,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator } else { // Switched over to a new metric - save existing hostAggregate = new MetricHostAggregate(); + currentMetric.setTimestamp(endTime); updateAggregatesFromHost(hostAggregate, currentHostAggregate); hostAggregateMap.put(currentMetric, hostAggregate); existingMetric = currentMetric; http://git-wip-us.apache.org/repos/asf/ambari/blob/8486be6a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java index ec141e7..de90685 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java @@ -236,12 +236,12 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre } /** - * Return beginning of the time slice into which the metric fits. + * Return end of the time slice into which the metric fits. */ private Long getSliceTimeForMetric(List timeSlices, Long timestamp) { for (Long[] timeSlice : timeSlices) { - if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) { - return timeSlice[0]; + if (timestamp > timeSlice[0] && timestamp <= timeSlice[1]) { + return timeSlice[1]; } } return -1l; http://git-wip-us.apache.org/repos/asf/ambari/blob/8486be6a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java index b85cd6f..26e73b0 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java @@ -53,7 +53,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { @Override protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException { - Map hostAggregateMap = aggregateMetricsFromResultSet(rs); + Map hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime); LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, outputTableName); @@ -76,7 +76,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { return condition; } - private Map aggregateMetricsFromResultSet(ResultSet rs) + private Map aggregateMetricsFromResultSet(ResultSet rs, long endTime) throws IOException, SQLException { TimelineMetric existingMetric = null; MetricHostAggregate hostAggregate = null; @@ -92,6 +92,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { if (existingMetric == null) { // First row existingMetric = currentMetric; + currentMetric.setTimestamp(endTime); hostAggregate = new MetricHostAggregate(); hostAggregateMap.put(currentMetric, hostAggregate); } @@ -101,6 +102,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { hostAggregate.updateAggregates(currentHostAggregate); } else { // Switched over to a new metric - save existing - create new aggregate + currentMetric.setTimestamp(endTime); hostAggregate = new MetricHostAggregate(); hostAggregate.updateAggregates(currentHostAggregate); hostAggregateMap.put(currentMetric, hostAggregate); http://git-wip-us.apache.org/repos/asf/ambari/blob/8486be6a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java index 3b8406b..c056d79 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java @@ -69,7 +69,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator */ condition.setStatement(String.format(GET_AGGREGATED_APP_METRIC_GROUPBY_SQL, - getQueryHint(startTime), outputTableName, aggregateColumnName, tableName, + getQueryHint(startTime), outputTableName, endTime, aggregateColumnName, tableName, startTime, endTime)); if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/ambari/blob/8486be6a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java index ff32e43..118c695 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java @@ -60,7 +60,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { condition.setDoUpdate(true); condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL, - getQueryHint(startTime), outputTableName, tableName, startTime, endTime)); + getQueryHint(startTime), outputTableName, endTime, tableName, startTime, endTime)); if (LOG.isDebugEnabled()) { LOG.debug("Condition: " + condition.toString()); http://git-wip-us.apache.org/repos/asf/ambari/blob/8486be6a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java index 89e8c34..4f27167 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java @@ -259,7 +259,7 @@ public class PhoenixTransactSQL { public static final String GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL = "UPSERT %s " + "INTO %s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, UNITS, " + "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " + - "SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, MAX(SERVER_TIME), UNITS, " + + "SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, %s AS SERVER_TIME, UNITS, " + "SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " + "FROM %s WHERE SERVER_TIME >= %s AND SERVER_TIME < %s " + "GROUP BY METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, UNITS"; @@ -271,7 +271,7 @@ public class PhoenixTransactSQL { public static final String GET_AGGREGATED_APP_METRIC_GROUPBY_SQL = "UPSERT %s " + "INTO %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, UNITS, " + "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) SELECT METRIC_NAME, APP_ID, " + - "INSTANCE_ID, MAX(SERVER_TIME), UNITS, SUM(METRIC_SUM), SUM(%s), " + + "INSTANCE_ID, %s AS SERVER_TIME, UNITS, SUM(METRIC_SUM), SUM(%s), " + "MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE SERVER_TIME >= %s AND " + "SERVER_TIME < %s GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS"; http://git-wip-us.apache.org/repos/asf/ambari/blob/8486be6a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java index 2b29469..8f7320b 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java @@ -18,10 +18,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; -import org.apache.hadoop.yarn.util.Clock; import org.junit.Before; import org.junit.Test; import java.io.IOException; @@ -35,7 +32,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti public class AbstractTimelineAggregatorTest { private AbstractTimelineAggregator agg; - TestClock clock = new TestClock(); AtomicLong startTimeInDoWork; AtomicLong endTimeInDoWork; @@ -47,7 +43,7 @@ public class AbstractTimelineAggregatorTest { @Before public void setUp() throws Exception { - sleepIntervalMillis = 30000l; + sleepIntervalMillis = 2*2*30000l; //2 minutes checkpointCutOffMultiplier = 2; Configuration metricsConf = new Configuration(); @@ -59,7 +55,7 @@ public class AbstractTimelineAggregatorTest { checkPoint = new AtomicLong(-1); actualRuns = 0; - agg = new AbstractTimelineAggregator("TimelineAggregatorTest", null, metricsConf, clock) { + agg = new AbstractTimelineAggregator("TimelineAggregatorTest", null, metricsConf) { @Override public boolean doWork(long startTime, long endTime) { startTimeInDoWork.set(startTime); @@ -81,7 +77,7 @@ public class AbstractTimelineAggregatorTest { } @Override - protected Long getSleepIntervalMillis() { + public Long getSleepIntervalMillis() { return sleepIntervalMillis; } @@ -110,167 +106,67 @@ public class AbstractTimelineAggregatorTest { } }; - } @Test public void testDoWorkOnZeroDelay() throws Exception { - // starting at time 0; - clock.setTime(0); + long currentTime = System.currentTimeMillis(); + long roundedOffAggregatorTime = AbstractTimelineAggregator.getRoundedAggregateTimeMillis(currentTime); - long sleep = agg.runOnce(sleepIntervalMillis); + //Test first run of aggregator with no checkpoint + agg.setLastAggregatedEndTime(roundedOffAggregatorTime); + agg.runOnce(sleepIntervalMillis); assertEquals("startTime should be zero", 0, startTimeInDoWork.get()); assertEquals("endTime should be zero", 0, endTimeInDoWork.get()); - assertEquals(0, checkPoint.get()); - assertEquals(sleep, sleepIntervalMillis); + assertEquals(roundedOffAggregatorTime, checkPoint.get()); assertEquals("Do not aggregate on first run", 0, actualRuns); - // exactly one sleepInterval - clock.setTime(clock.getTime() + sleepIntervalMillis); - sleep = agg.runOnce(sleepIntervalMillis); - assertEquals("startTime", clock.getTime() - - sleepIntervalMillis, - startTimeInDoWork.get()); - assertEquals("endTime", clock.getTime(), - endTimeInDoWork.get()); - assertEquals(clock.getTime(), checkPoint.get()); - assertEquals(sleep, sleepIntervalMillis); - assertEquals(1, actualRuns); - - // exactly one sleepInterval - clock.setTime(clock.getTime() + sleepIntervalMillis); - sleep = agg.runOnce(sleepIntervalMillis); - assertEquals("startTime", clock.getTime() - - sleepIntervalMillis, - startTimeInDoWork.get()); - assertEquals("endTime", clock.getTime(), - endTimeInDoWork.get()); - assertEquals(clock.getTime(), checkPoint.get()); - assertEquals(sleep, sleepIntervalMillis); - assertEquals(2, actualRuns); - - // checkpointCutOffMultiplier x sleepInterval - should pass, - // it will aggregate only first part of the whole 2x interval - // and sleep as usual (don't we need to skip some sleep?) - // - // effectively checkpoint will be one interval in the past, - // so next run will - clock.setTime(clock.getTime() + (checkpointCutOffMultiplier * - sleepIntervalMillis)); - sleep = agg.runOnce(sleepIntervalMillis); - assertEquals("startTime after 2xinterval", clock.getTime() - - (checkpointCutOffMultiplier * sleepIntervalMillis), - startTimeInDoWork.get()); - assertEquals("endTime after 2xinterval", clock.getTime() - - sleepIntervalMillis, - endTimeInDoWork.get()); - assertEquals("checkpoint after 2xinterval", clock.getTime() - - sleepIntervalMillis, checkPoint.get()); - assertEquals(sleep, sleepIntervalMillis); - assertEquals(3, actualRuns); - - // exactly one sleepInterval after one that lagged by one whole interval, - // so it will do the previous one... and sleep as usual - // no way to keep up - clock.setTime(clock.getTime() + sleepIntervalMillis); - sleep = agg.runOnce(sleepIntervalMillis); - assertEquals("startTime ", clock.getTime() - - (checkpointCutOffMultiplier * sleepIntervalMillis), - startTimeInDoWork.get()); - assertEquals("endTime ", clock.getTime() - - sleepIntervalMillis, - endTimeInDoWork.get()); - assertEquals("checkpoint ", clock.getTime() - sleepIntervalMillis, - checkPoint.get()); - assertEquals(sleep, sleepIntervalMillis); - assertEquals(4, actualRuns); - - - // checkpointCutOffMultiplier x sleepInterval - in normal state should pass, - // but the clock lags too much, so this will not execute aggregation - // just update checkpoint to currentTime - clock.setTime(clock.getTime() + (checkpointCutOffMultiplier * - sleepIntervalMillis)); - sleep = agg.runOnce(sleepIntervalMillis); - assertEquals(4, actualRuns); - assertEquals("checkpoint after too much lag is reset to " + - "current clock time", - clock.getTime(), checkPoint.get()); - assertEquals(sleep, sleepIntervalMillis); - } - - @Test - public void testDoWorkOnInterruptedRuns() throws Exception { - // start at some non-zero arbitrarily selected time; - int startingTime = 10000; - - // 1. - clock.setTime(startingTime); - long timeOfFirstStep = clock.getTime(); - long sleep = agg.runOnce(sleepIntervalMillis); + //Test first run with Too Old checkpoint + checkPoint.set(currentTime - 5*60*1000); //Old checkpoint + agg.runOnce(sleepIntervalMillis); assertEquals("startTime should be zero", 0, startTimeInDoWork.get()); assertEquals("endTime should be zero", 0, endTimeInDoWork.get()); - assertEquals("do not aggregate on first run", 0, actualRuns); - assertEquals("first checkpoint set on current time", timeOfFirstStep, - checkPoint.get()); - assertEquals(sleep, sleepIntervalMillis); - - // 2. - // the doWork was fast, and sleep was interrupted (e.g. restart) - // Q: do we want to aggregate just part of the system? maybe we should - // sleep up to next cycle start!! - clock.setTime(timeOfFirstStep + 1); - long timeOfSecondStep = clock.getTime(); - sleep = agg.runOnce(sleepIntervalMillis); - assertEquals("startTime should be on previous checkpoint since it did not" + - " run yet", - timeOfFirstStep, startTimeInDoWork.get()); - - assertEquals("endTime can be start + interval", - startingTime + sleepIntervalMillis, - endTimeInDoWork.get()); - assertEquals("should aggregate", 1, actualRuns); - assertEquals("checkpoint here should be set to min(endTime,currentTime), " + - "it is currentTime in our scenario", - timeOfSecondStep, checkPoint.get()); - - assertEquals(sleep, sleepIntervalMillis); - - //3. - // and again not a full sleep passed, so only small part was aggregated - clock.setTime(startingTime + 2); - long timeOfThirdStep = clock.getTime(); + assertEquals(roundedOffAggregatorTime, checkPoint.get()); + assertEquals("Do not aggregate on first run", 0, actualRuns); - sleep = agg.runOnce(sleepIntervalMillis); - // startTime and endTime are both be in the future, makes no sens, - // query will not work!! - assertEquals("startTime should be previous checkpoint", - timeOfSecondStep, startTimeInDoWork.get()); + //Test first run with too "recent" checkpoint + currentTime = System.currentTimeMillis(); + checkPoint.set(currentTime - 30000); + agg.setLastAggregatedEndTime(-1l); + agg.setSleepIntervalMillis(sleepIntervalMillis); + agg.runOnce(sleepIntervalMillis); + assertEquals("startTime should be zero", 0, startTimeInDoWork.get()); + assertEquals("endTime should be zero", 0, endTimeInDoWork.get()); + assertEquals(agg.getLastAggregatedEndTime(), checkPoint.get()); + assertEquals("Do not aggregate on first run", 0, actualRuns); - assertEquals("endTime can be start + interval", - timeOfSecondStep + sleepIntervalMillis, - endTimeInDoWork.get()); - assertEquals("should aggregate", 2, actualRuns); - assertEquals("checkpoint here should be set to min(endTime,currentTime), " + - "it is currentTime in our scenario", - timeOfThirdStep, + //Test first run with perfect checkpoint (sleepIntervalMillis back) + long checkPointTime = roundedOffAggregatorTime - sleepIntervalMillis; + long expectedCheckPoint = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis); + checkPoint.set(checkPointTime); + agg.setLastAggregatedEndTime(-1l); + agg.runOnce(sleepIntervalMillis); + assertEquals("startTime should the lower rounded time of the checkpoint time", + expectedCheckPoint, startTimeInDoWork.get()); + assertEquals("endTime should the lower rounded time of the checkpoint time + sleepIntervalMillis", + expectedCheckPoint + sleepIntervalMillis, endTimeInDoWork.get()); + assertEquals(expectedCheckPoint + sleepIntervalMillis, checkPoint.get()); - assertEquals(sleep, sleepIntervalMillis); - - } - - private static class TestClock implements Clock { - - private long time; - - public void setTime(long time) { - this.time = time; - } + assertEquals("Aggregate on first run", 1, actualRuns); + + //Test edge case for checkpoint (2 x sleepIntervalMillis) + checkPointTime = roundedOffAggregatorTime - 2*sleepIntervalMillis; + expectedCheckPoint = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis); + checkPoint.set(checkPointTime); + agg.runOnce(sleepIntervalMillis); + assertEquals("startTime should the lower rounded time of the checkpoint time", + expectedCheckPoint, startTimeInDoWork.get()); + assertEquals("startTime should the lower rounded time of the checkpoint time + sleepIntervalMillis", + expectedCheckPoint + sleepIntervalMillis, endTimeInDoWork.get()); + assertEquals(expectedCheckPoint + sleepIntervalMillis, + checkPoint.get()); + assertEquals("Do not aggregate on first run", 2, actualRuns); - @Override - public long getTime() { - return time; - } - } + } } \ No newline at end of file