Return-Path: X-Original-To: apmail-ambari-commits-archive@www.apache.org Delivered-To: apmail-ambari-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CFFD710C16 for ; Tue, 25 Nov 2014 17:29:36 +0000 (UTC) Received: (qmail 50346 invoked by uid 500); 25 Nov 2014 17:29:36 -0000 Delivered-To: apmail-ambari-commits-archive@ambari.apache.org Received: (qmail 50310 invoked by uid 500); 25 Nov 2014 17:29:36 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 50297 invoked by uid 99); 25 Nov 2014 17:29:36 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Nov 2014 17:29:36 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 68A7AA19D5D; Tue, 25 Nov 2014 17:29:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: swagle@apache.org To: commits@ambari.apache.org Date: Tue, 25 Nov 2014 17:29:37 -0000 Message-Id: In-Reply-To: <99496c7f70234059859811ae2f617c57@git.apache.org> References: <99496c7f70234059859811ae2f617c57@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] ambari git commit: AMBARI-7680. Implement the Metric Collector using ATS. Unit tests. AMBARI-7680. Implement the Metric Collector using ATS. Unit tests. Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1d817954 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1d817954 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1d817954 Branch: refs/heads/branch-metrics-dev Commit: 1d8179543a8b35ce1d27962f44494efa75acf9bc Parents: 3b877ac Author: Siddharth Wagle Authored: Tue Nov 25 09:29:17 2014 -0800 Committer: Siddharth Wagle Committed: Tue Nov 25 09:29:17 2014 -0800 ---------------------------------------------------------------------- .../pom.xml | 26 +- .../timeline/AbstractTimelineAggregator.java | 255 +++---------- .../timeline/DefaultPhoenixDataSource.java | 1 + .../timeline/HBaseTimelineMetricStore.java | 24 +- .../metrics/timeline/MetricAggregate.java | 110 ++++++ .../timeline/MetricClusterAggregate.java | 74 ++++ .../metrics/timeline/MetricHostAggregate.java | 81 ++++ .../metrics/timeline/PhoenixHBaseAccessor.java | 34 +- .../metrics/timeline/TimelineClusterMetric.java | 97 +++++ .../timeline/TimelineMetricAggregator.java | 147 ++++++++ .../TimelineMetricAggregatorFactory.java | 89 +++++ .../TimelineMetricAggregatorHourly.java | 198 ---------- .../TimelineMetricAggregatorMinute.java | 181 --------- .../TimelineMetricClusterAggregator.java | 235 ++++-------- .../TimelineMetricClusterAggregatorHourly.java | 98 ++--- .../timeline/TimelineMetricConfiguration.java | 3 + .../TestApplicationHistoryServer.java | 31 +- .../timeline/AbstractMiniHBaseClusterTest.java | 141 ++++--- .../AbstractPhoenixConnectionlessTest.java | 113 ++++++ .../metrics/timeline/ITClusterAggregator.java | 376 +++++++++++++++++++ .../metrics/timeline/ITMetricAggregator.java | 298 +++++++++++++++ .../metrics/timeline/TestClusterAggregator.java | 275 -------------- .../metrics/timeline/TestClusterSuite.java | 32 ++ .../metrics/timeline/TestHBaseAccessor.java | 332 ---------------- .../timeline/TestMetricHostAggregate.java | 19 +- .../timeline/TestPhoenixTransactSQL.java | 2 +- .../src/test/resources/hbase-default.xml | 36 ++ .../src/test/resources/log4j.properties | 1 + .../src/test/resources/logging.properties | 3 + 29 files changed, 1756 insertions(+), 1556 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml index 7efdb6b..ae2872d 100644 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml @@ -214,7 +214,18 @@ - + + maven-surefire-plugin + + always + + + java.util.logging.config.file + src/test/resources/logging.properties + + + + @@ -470,6 +481,19 @@ test tests + + org.apache.hbase + hbase-testing-util + 0.98.4-hadoop2 + test + true + + + org.jruby + jruby-complete + + + org.powermock powermock-module-junit4 http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java index 43ec648..b3c1af9 100644 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java @@ -21,16 +21,14 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.codehaus.jackson.annotate.JsonCreator; -import org.codehaus.jackson.annotate.JsonProperty; -import org.codehaus.jackson.annotate.JsonSubTypes; -import org.codehaus.jackson.map.ObjectMapper; import java.io.File; import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.Date; import static java.util.concurrent.TimeUnit.SECONDS; @@ -42,15 +40,10 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics public abstract class AbstractTimelineAggregator implements Runnable { protected final PhoenixHBaseAccessor hBaseAccessor; private final Log LOG; - private static final ObjectMapper mapper; protected final long checkpointDelayMillis; protected final Integer resultsetFetchSize; protected Configuration metricsConf; - static { - mapper = new ObjectMapper(); - } - public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { this.hBaseAccessor = hBaseAccessor; @@ -162,204 +155,78 @@ public abstract class AbstractTimelineAggregator implements Runnable { FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime)); } - // TODO: Abstract out doWork implementation for cluster and host levels - protected abstract boolean doWork(long startTime, long endTime); - - protected abstract Long getSleepIntervalMillis(); - - protected abstract Integer getCheckpointCutOffMultiplier(); - - protected Long getCheckpointCutOffIntervalMillis() { - return getCheckpointCutOffMultiplier() * getSleepIntervalMillis(); - } - - protected abstract boolean isDisabled(); - - protected abstract String getCheckpointLocation(); - - @JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class), - @JsonSubTypes.Type(value = MetricHostAggregate.class)}) - @InterfaceAudience.Public - @InterfaceStability.Unstable - public static class MetricAggregate { - protected Double sum = 0.0; - protected Double deviation; - protected Double max = Double.MIN_VALUE; - protected Double min = Double.MAX_VALUE; - - public MetricAggregate() { - } + /** + * Read metrics written during the time interval and save the sum and total + * in the aggregate table. + * + * @param startTime Sample start time + * @param endTime Sample end time + */ + protected boolean doWork(long startTime, long endTime) { + LOG.info("Start aggregation cycle @ " + new Date() + ", " + + "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime)); - protected MetricAggregate(Double sum, Double deviation, Double max, - Double min) { - this.sum = sum; - this.deviation = deviation; - this.max = max; - this.min = min; - } + boolean success = true; + PhoenixTransactSQL.Condition condition = + prepareMetricQueryCondition(startTime, endTime); - void updateSum(Double sum) { - this.sum += sum; - } + Connection conn = null; + PreparedStatement stmt = null; - void updateMax(Double max) { - if (max > this.max) { - this.max = max; + try { + conn = hBaseAccessor.getConnection(); + stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + + LOG.debug("Query issued @: " + new Date()); + ResultSet rs = stmt.executeQuery(); + LOG.debug("Query returned @: " + new Date()); + + aggregate(rs, startTime, endTime); + LOG.info("End aggregation cycle @ " + new Date()); + + } catch (SQLException e) { + LOG.error("Exception during aggregating metrics.", e); + success = false; + } catch (IOException e) { + LOG.error("Exception during aggregating metrics.", e); + success = false; + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } } - } - - void updateMin(Double min) { - if (min < this.min) { - this.min = min; + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } } } - @JsonProperty("sum") - Double getSum() { - return sum; - } - - @JsonProperty("deviation") - Double getDeviation() { - return deviation; - } - - @JsonProperty("max") - Double getMax() { - return max; - } - - @JsonProperty("min") - Double getMin() { - return min; - } - - public void setSum(Double sum) { - this.sum = sum; - } - - public void setDeviation(Double deviation) { - this.deviation = deviation; - } - - public void setMax(Double max) { - this.max = max; - } - - public void setMin(Double min) { - this.min = min; - } - - public String toJSON() throws IOException { - return mapper.writeValueAsString(this); - } + LOG.info("End aggregation cycle @ " + new Date()); + return success; } - public static class MetricClusterAggregate extends MetricAggregate { - private int numberOfHosts; - - @JsonCreator - public MetricClusterAggregate() { - } + protected abstract PhoenixTransactSQL.Condition + prepareMetricQueryCondition(long startTime, long endTime); - MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation, - Double max, Double min) { - super(sum, deviation, max, min); - this.numberOfHosts = numberOfHosts; - } + protected abstract void aggregate(ResultSet rs, long startTime, long endTime) + throws IOException, SQLException; - @JsonProperty("numberOfHosts") - int getNumberOfHosts() { - return numberOfHosts; - } - - void updateNumberOfHosts(int count) { - this.numberOfHosts += count; - } - - public void setNumberOfHosts(int numberOfHosts) { - this.numberOfHosts = numberOfHosts; - } + protected abstract Long getSleepIntervalMillis(); - /** - * Find and update min, max and avg for a minute - */ - void updateAggregates(MetricClusterAggregate hostAggregate) { - updateMax(hostAggregate.getMax()); - updateMin(hostAggregate.getMin()); - updateSum(hostAggregate.getSum()); - updateNumberOfHosts(hostAggregate.getNumberOfHosts()); - } + protected abstract Integer getCheckpointCutOffMultiplier(); - @Override - public String toString() { -// MetricClusterAggregate - return "MetricAggregate{" + - "sum=" + sum + - ", numberOfHosts=" + numberOfHosts + - ", deviation=" + deviation + - ", max=" + max + - ", min=" + min + - '}'; - } + protected Long getCheckpointCutOffIntervalMillis() { + return getCheckpointCutOffMultiplier() * getSleepIntervalMillis(); } - /** - * Represents a collection of minute based aggregation of values for - * resolution greater than a minute. - */ - public static class MetricHostAggregate extends MetricAggregate { - - private long numberOfSamples = 0; - - @JsonCreator - public MetricHostAggregate() { - super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE); - } - - public MetricHostAggregate(Double sum, int numberOfSamples, - Double deviation, - Double max, Double min) { - super(sum, deviation, max, min); - this.numberOfSamples = numberOfSamples; - } - - @JsonProperty("numberOfSamples") - long getNumberOfSamples() { - return numberOfSamples == 0 ? 1 : numberOfSamples; - } - - void updateNumberOfSamples(long count) { - this.numberOfSamples += count; - } - - public void setNumberOfSamples(long numberOfSamples) { - this.numberOfSamples = numberOfSamples; - } - - public double getAvg() { - return sum / numberOfSamples; - } + protected abstract boolean isDisabled(); - /** - * Find and update min, max and avg for a minute - */ - void updateAggregates(MetricHostAggregate hostAggregate) { - updateMax(hostAggregate.getMax()); - updateMin(hostAggregate.getMin()); - updateSum(hostAggregate.getSum()); - updateNumberOfSamples(hostAggregate.getNumberOfSamples()); - } + protected abstract String getCheckpointLocation(); - @Override - public String toString() { - return "MetricHostAggregate{" + - "sum=" + sum + - ", numberOfSamples=" + numberOfSamples + - ", deviation=" + deviation + - ", max=" + max + - ", min=" + min + - '}'; - } - } } http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java index c20dd14..679ee36 100644 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java @@ -69,6 +69,7 @@ public class DefaultPhoenixDataSource implements ConnectionProvider { LOG.debug("Metric store connection url: " + url); try { // TODO: Exception is swallowed, it should be thrown - discuss it + connection = DriverManager.getConnection(url); } catch (SQLException e) { LOG.warn("Unable to connect to HBase store using Phoenix.", e); http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index d2b96ec..a3eb731 100644 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -24,6 +25,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; + import java.io.IOException; import java.net.URL; import java.sql.SQLException; @@ -31,9 +33,13 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_SITE_CONFIGURATION_FILE; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE; + +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline.PhoenixTransactSQL.Condition; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline.TimelineMetricConfiguration.HBASE_SITE_CONFIGURATION_FILE; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE; public class HBaseTimelineMetricStore extends AbstractService implements TimelineMetricStore { @@ -82,7 +88,7 @@ public class HBaseTimelineMetricStore extends AbstractService Configuration metricsConf) { hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf); hBaseAccessor.initMetricSchema(); - +//...BUG... // Start the cluster aggregator TimelineMetricClusterAggregator minuteClusterAggregator = new TimelineMetricClusterAggregator(hBaseAccessor, metricsConf); @@ -100,16 +106,18 @@ public class HBaseTimelineMetricStore extends AbstractService } // Start the 5 minute aggregator - TimelineMetricAggregatorMinute minuteHostAggregator = - new TimelineMetricAggregatorMinute(hBaseAccessor, metricsConf); + TimelineMetricAggregator minuteHostAggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute + (hBaseAccessor, metricsConf); if (!minuteHostAggregator.isDisabled()) { Thread minuteAggregatorThread = new Thread(minuteHostAggregator); minuteAggregatorThread.start(); } // Start hourly host aggregator - TimelineMetricAggregatorHourly hourlyHostAggregator = - new TimelineMetricAggregatorHourly(hBaseAccessor, metricsConf); + TimelineMetricAggregator hourlyHostAggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly + (hBaseAccessor, metricsConf); if (!hourlyHostAggregator.isDisabled()) { Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator); aggregatorHourlyThread.start(); http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java new file mode 100644 index 0000000..61e15d7 --- /dev/null +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.annotate.JsonSubTypes; +import org.codehaus.jackson.map.ObjectMapper; + +import java.io.IOException; + +/** +* +*/ +@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class), + @JsonSubTypes.Type(value = MetricHostAggregate.class)}) +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class MetricAggregate { + private static final ObjectMapper mapper = new ObjectMapper(); + + protected Double sum = 0.0; + protected Double deviation; + protected Double max = Double.MIN_VALUE; + protected Double min = Double.MAX_VALUE; + + public MetricAggregate() { + } + + MetricAggregate(Double sum, Double deviation, Double max, + Double min) { + this.sum = sum; + this.deviation = deviation; + this.max = max; + this.min = min; + } + + void updateSum(Double sum) { + this.sum += sum; + } + + void updateMax(Double max) { + if (max > this.max) { + this.max = max; + } + } + + void updateMin(Double min) { + if (min < this.min) { + this.min = min; + } + } + + @JsonProperty("sum") + Double getSum() { + return sum; + } + + @JsonProperty("deviation") + Double getDeviation() { + return deviation; + } + + @JsonProperty("max") + Double getMax() { + return max; + } + + @JsonProperty("min") + Double getMin() { + return min; + } + + public void setSum(Double sum) { + this.sum = sum; + } + + public void setDeviation(Double deviation) { + this.deviation = deviation; + } + + public void setMax(Double max) { + this.max = max; + } + + public void setMin(Double min) { + this.min = min; + } + + public String toJSON() throws IOException { + return mapper.writeValueAsString(this); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java new file mode 100644 index 0000000..c13c85f --- /dev/null +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** +* +*/ +public class MetricClusterAggregate extends MetricAggregate { + private int numberOfHosts; + + @JsonCreator + public MetricClusterAggregate() { + } + + MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation, + Double max, Double min) { + super(sum, deviation, max, min); + this.numberOfHosts = numberOfHosts; + } + + @JsonProperty("numberOfHosts") + int getNumberOfHosts() { + return numberOfHosts; + } + + void updateNumberOfHosts(int count) { + this.numberOfHosts += count; + } + + public void setNumberOfHosts(int numberOfHosts) { + this.numberOfHosts = numberOfHosts; + } + + /** + * Find and update min, max and avg for a minute + */ + void updateAggregates(MetricClusterAggregate hostAggregate) { + updateMax(hostAggregate.getMax()); + updateMin(hostAggregate.getMin()); + updateSum(hostAggregate.getSum()); + updateNumberOfHosts(hostAggregate.getNumberOfHosts()); + } + + @Override + public String toString() { +// MetricClusterAggregate + return "MetricAggregate{" + + "sum=" + sum + + ", numberOfHosts=" + numberOfHosts + + ", deviation=" + deviation + + ", max=" + max + + ", min=" + min + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java new file mode 100644 index 0000000..02cc207 --- /dev/null +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * Represents a collection of minute based aggregation of values for + * resolution greater than a minute. + */ +public class MetricHostAggregate extends MetricAggregate { + + private long numberOfSamples = 0; + + @JsonCreator + public MetricHostAggregate() { + super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE); + } + + public MetricHostAggregate(Double sum, int numberOfSamples, + Double deviation, + Double max, Double min) { + super(sum, deviation, max, min); + this.numberOfSamples = numberOfSamples; + } + + @JsonProperty("numberOfSamples") + long getNumberOfSamples() { + return numberOfSamples == 0 ? 1 : numberOfSamples; + } + + void updateNumberOfSamples(long count) { + this.numberOfSamples += count; + } + + public void setNumberOfSamples(long numberOfSamples) { + this.numberOfSamples = numberOfSamples; + } + + public double getAvg() { + return sum / numberOfSamples; + } + + /** + * Find and update min, max and avg for a minute + */ + void updateAggregates(MetricHostAggregate hostAggregate) { + updateMax(hostAggregate.getMax()); + updateMin(hostAggregate.getMin()); + updateSum(hostAggregate.getSum()); + updateNumberOfSamples(hostAggregate.getNumberOfSamples()); + } + + @Override + public String toString() { + return "MetricHostAggregate{" + + "sum=" + sum + + ", numberOfSamples=" + numberOfSamples + + ", deviation=" + deviation + + ", max=" + max + + ", min=" + min + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index 0851d07..41eb30e 100644 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -27,41 +28,16 @@ import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; +import java.sql.*; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics - .timeline.AbstractTimelineAggregator.MetricClusterAggregate; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractTimelineAggregator.MetricHostAggregate; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_METRICS_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricClusterAggregator.TimelineClusterMetric; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT; + .timeline.PhoenixTransactSQL.*; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline.TimelineMetricConfiguration.*; /** * Provides a facade over the Phoenix API to access HBase schema http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java new file mode 100644 index 0000000..d227993 --- /dev/null +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +public class TimelineClusterMetric { + private String metricName; + private String appId; + private String instanceId; + private long timestamp; + private String type; + + TimelineClusterMetric(String metricName, String appId, String instanceId, + long timestamp, String type) { + this.metricName = metricName; + this.appId = appId; + this.instanceId = instanceId; + this.timestamp = timestamp; + this.type = type; + } + + String getMetricName() { + return metricName; + } + + String getAppId() { + return appId; + } + + String getInstanceId() { + return instanceId; + } + + long getTimestamp() { + return timestamp; + } + + String getType() { return type; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TimelineClusterMetric that = (TimelineClusterMetric) o; + + if (timestamp != that.timestamp) return false; + if (appId != null ? !appId.equals(that.appId) : that.appId != null) + return false; + if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null) + return false; + if (!metricName.equals(that.metricName)) return false; + + return true; + } + + public boolean equalsExceptTime(TimelineClusterMetric metric) { + if (!metricName.equals(metric.metricName)) return false; + if (!appId.equals(metric.appId)) return false; + if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null) + return false; + + return true; + } + @Override + public int hashCode() { + int result = metricName.hashCode(); + result = 31 * result + (appId != null ? appId.hashCode() : 0); + result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0); + result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); + return result; + } + + @Override + public String toString() { + return "TimelineClusterMetric{" + + "metricName='" + metricName + '\'' + + ", appId='" + appId + '\'' + + ", instanceId='" + instanceId + '\'' + + ", timestamp=" + timestamp + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java new file mode 100644 index 0000000..eaa1ab9 --- /dev/null +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline.PhoenixTransactSQL.Condition; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; + +public class TimelineMetricAggregator extends AbstractTimelineAggregator { + private static final Log LOG = LogFactory.getLog + (TimelineMetricAggregator.class); + + private final String checkpointLocation; + private final Long sleepIntervalMillis; + private final Integer checkpointCutOffMultiplier; + private final String hostAggregatorDisabledParam; + private final String inputTableName; + private final String outputTableName; + + public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor, + Configuration metricsConf, + String checkpointLocation, + Long sleepIntervalMillis, + Integer checkpointCutOffMultiplier, + String hostAggregatorDisabledParam, + String inputTableName, + String outputTableName) { + super(hBaseAccessor, metricsConf); + this.checkpointLocation = checkpointLocation; + this.sleepIntervalMillis = sleepIntervalMillis; + this.checkpointCutOffMultiplier = checkpointCutOffMultiplier; + this.hostAggregatorDisabledParam = hostAggregatorDisabledParam; + this.inputTableName = inputTableName; + this.outputTableName = outputTableName; + } + + @Override + protected String getCheckpointLocation() { + return checkpointLocation; + } + + @Override + protected void aggregate(ResultSet rs, long startTime, long endTime) + throws IOException, SQLException { + Map hostAggregateMap = + aggregateMetricsFromResultSet(rs); + + LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); + hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, + outputTableName); + } + + @Override + protected Condition prepareMetricQueryCondition(long startTime, + long endTime) { + Condition condition = new Condition(null, null, null, null, startTime, + endTime, null, true); + condition.setNoLimit(); + condition.setFetchSize(resultsetFetchSize); + condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, + inputTableName)); + condition.addOrderByColumn("METRIC_NAME"); + condition.addOrderByColumn("HOSTNAME"); + condition.addOrderByColumn("APP_ID"); + condition.addOrderByColumn("INSTANCE_ID"); + condition.addOrderByColumn("SERVER_TIME"); + return condition; + } + + private Map aggregateMetricsFromResultSet + (ResultSet rs) throws IOException, SQLException { + TimelineMetric existingMetric = null; + MetricHostAggregate hostAggregate = null; + Map hostAggregateMap = + new HashMap(); + + while (rs.next()) { + TimelineMetric currentMetric = + PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs); + MetricHostAggregate currentHostAggregate = + PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs); + + if (existingMetric == null) { + // First row + existingMetric = currentMetric; + hostAggregate = new MetricHostAggregate(); + hostAggregateMap.put(currentMetric, hostAggregate); + } + + if (existingMetric.equalsExceptTime(currentMetric)) { + // Recalculate totals with current metric + hostAggregate.updateAggregates(currentHostAggregate); + + } else { + // Switched over to a new metric - save existing - create new aggregate + hostAggregate = new MetricHostAggregate(); + hostAggregate.updateAggregates(currentHostAggregate); + hostAggregateMap.put(currentMetric, hostAggregate); + existingMetric = currentMetric; + } + } + return hostAggregateMap; + } + + @Override + protected Long getSleepIntervalMillis() { + return sleepIntervalMillis; + } + + @Override + protected Integer getCheckpointCutOffMultiplier() { + return checkpointCutOffMultiplier; + } + + @Override + protected boolean isDisabled() { + return metricsConf.getBoolean(hostAggregatorDisabledParam, false); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java new file mode 100644 index 0000000..d0dafeb --- /dev/null +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline; + +import org.apache.commons.io.FilenameUtils; +import org.apache.hadoop.conf.Configuration; + +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline.PhoenixTransactSQL.*; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline.TimelineMetricConfiguration.*; + +/** + * + */ +public class TimelineMetricAggregatorFactory { + private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE = + "timeline-metrics-host-aggregator-checkpoint"; + private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE = + "timeline-metrics-host-aggregator-hourly-checkpoint"; + + public static TimelineMetricAggregator createTimelineMetricAggregatorMinute + (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + String checkpointLocation = FilenameUtils.concat(checkpointDir, + MINUTE_AGGREGATE_CHECKPOINT_FILE); + long sleepInterval = metricsConf.getLong + (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300000l); // 5 mins + int checkpointCutOffMultiplier = metricsConf.getInt + (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3); + String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED; + + String inputTableName = METRICS_RECORD_TABLE_NAME; + String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; + + return new TimelineMetricAggregator(hBaseAccessor, metricsConf, + checkpointLocation, + sleepInterval, + checkpointCutOffMultiplier, + hostAggregatorDisabledParam, + inputTableName, + outputTableName); + } + + public static TimelineMetricAggregator createTimelineMetricAggregatorHourly + (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + String checkpointLocation = FilenameUtils.concat(checkpointDir, + MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE); + long sleepInterval = metricsConf.getLong + (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600000l); + int checkpointCutOffMultiplier = metricsConf.getInt + (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2); + String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED; + + String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; + String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME; + + return new TimelineMetricAggregator(hBaseAccessor, metricsConf, + checkpointLocation, + sleepInterval, + checkpointCutOffMultiplier, + hostAggregatorDisabledParam, + inputTableName, + outputTableName); + } + + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java deleted file mode 100644 index 16f5ab9..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java +++ /dev/null @@ -1,198 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics - .timeline; - -import org.apache.commons.io.FilenameUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; - -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics - .timeline.PhoenixTransactSQL.*; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics - .timeline.TimelineMetricConfiguration.*; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics - .timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics - .timeline.TimelineMetricConfiguration - .HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics - .timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics - .timeline.TimelineMetricConfiguration - .TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; - -public class TimelineMetricAggregatorHourly extends AbstractTimelineAggregator { - private static final Log LOG = LogFactory.getLog - (TimelineMetricAggregatorHourly.class); - private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE = - "timeline-metrics-host-aggregator-hourly-checkpoint"; - private final String checkpointLocation; - private final Long sleepIntervalMillis; - private final Integer checkpointCutOffMultiplier; - - public TimelineMetricAggregatorHourly(PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf) { - - super(hBaseAccessor, metricsConf); - - String checkpointDir = metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - - checkpointLocation = FilenameUtils.concat(checkpointDir, - MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE); - - sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l)); - checkpointCutOffMultiplier = - metricsConf.getInt(HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2); - } - - @Override - protected String getCheckpointLocation() { - return checkpointLocation; - } - - @Override - protected boolean doWork(long startTime, long endTime) { - LOG.info("Start aggregation cycle @ " + new Date()); - - boolean success = true; - Condition condition = prepareMetricQueryCondition(startTime, endTime); - - Connection conn = null; - PreparedStatement stmt = null; - - try { - conn = hBaseAccessor.getConnection(); - stmt = prepareGetMetricsSqlStmt(conn, condition); - - ResultSet rs = stmt.executeQuery(); - Map hostAggregateMap = - aggregateMetricsFromResultSet(rs); - - LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); - - hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, - METRICS_AGGREGATE_HOURLY_TABLE_NAME); - - } catch (SQLException e) { - LOG.error("Exception during aggregating metrics.", e); - success = false; - } catch (IOException e) { - LOG.error("Exception during aggregating metrics.", e); - success = false; - } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - // Ignore - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException sql) { - // Ignore - } - } - } - - LOG.info("End aggregation cycle @ " + new Date()); - return success; - } - - private Condition prepareMetricQueryCondition(long startTime, long endTime) { - Condition condition = new Condition(null, null, null, null, startTime, - endTime, null, true); - condition.setNoLimit(); - condition.setFetchSize(resultsetFetchSize); - condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, - METRICS_AGGREGATE_MINUTE_TABLE_NAME)); - condition.addOrderByColumn("METRIC_NAME"); - condition.addOrderByColumn("HOSTNAME"); - condition.addOrderByColumn("APP_ID"); - condition.addOrderByColumn("INSTANCE_ID"); - condition.addOrderByColumn("SERVER_TIME"); - return condition; - } - - private Map - aggregateMetricsFromResultSet(ResultSet rs) throws SQLException, IOException { - TimelineMetric existingMetric = null; - MetricHostAggregate hostAggregate = null; - Map hostAggregateMap = - new HashMap(); - - while (rs.next()) { - TimelineMetric currentMetric = - PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs); - MetricHostAggregate currentHostAggregate = - PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs); - - if (existingMetric == null) { - // First row - existingMetric = currentMetric; - hostAggregate = new MetricHostAggregate(); - hostAggregateMap.put(currentMetric, hostAggregate); - } - - if (existingMetric.equalsExceptTime(currentMetric)) { - // Recalculate totals with current metric - hostAggregate.updateAggregates(currentHostAggregate); - - } else { - // Switched over to a new metric - save existing - hostAggregate = new MetricHostAggregate(); - hostAggregate.updateAggregates(currentHostAggregate); - hostAggregateMap.put(currentMetric, hostAggregate); - existingMetric = currentMetric; - } - } - return hostAggregateMap; - } - - @Override - protected Long getSleepIntervalMillis() { - return sleepIntervalMillis; - } - - @Override - protected Integer getCheckpointCutOffMultiplier() { - return checkpointCutOffMultiplier; - } - - @Override - protected boolean isDisabled() { - return metricsConf.getBoolean(HOST_AGGREGATOR_HOUR_DISABLED, false); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java deleted file mode 100644 index ac9d12e..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -import org.apache.commons.io.FilenameUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; - -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.prepareGetMetricsSqlStmt; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.*; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; - -public class TimelineMetricAggregatorMinute extends AbstractTimelineAggregator { - private static final Log LOG = LogFactory.getLog(TimelineMetricAggregatorMinute.class); - private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE = - "timeline-metrics-host-aggregator-checkpoint"; - private final String checkpointLocation; - private final Long sleepIntervalMillis; - private final Integer checkpointCutOffMultiplier; - - public TimelineMetricAggregatorMinute(PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf) { - super(hBaseAccessor, metricsConf); - - String checkpointDir = metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - - checkpointLocation = FilenameUtils.concat(checkpointDir, - MINUTE_AGGREGATE_CHECKPOINT_FILE); - - sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); // 5 mins - checkpointCutOffMultiplier = - metricsConf.getInt(HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3); - } - - @Override - protected String getCheckpointLocation() { - return checkpointLocation; - } - - @Override - protected boolean doWork(long startTime, long endTime) { - LOG.info("Start aggregation cycle @ " + new Date() + ", " + - "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime)); - - boolean success = true; - Condition condition = new Condition(null, null, null, null, startTime, - endTime, null, true); - condition.setNoLimit(); - condition.setFetchSize(resultsetFetchSize); - condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, - METRICS_RECORD_TABLE_NAME)); - condition.addOrderByColumn("METRIC_NAME"); - condition.addOrderByColumn("HOSTNAME"); - condition.addOrderByColumn("APP_ID"); - condition.addOrderByColumn("INSTANCE_ID"); - condition.addOrderByColumn("SERVER_TIME"); - - Connection conn = null; - PreparedStatement stmt = null; - - try { - conn = hBaseAccessor.getConnection(); - stmt = prepareGetMetricsSqlStmt(conn, condition); - LOG.debug("Query issued @: " + new Date()); - ResultSet rs = stmt.executeQuery(); - LOG.debug("Query returned @: " + new Date()); - TimelineMetric existingMetric = null; - MetricHostAggregate hostAggregate = null; - - Map hostAggregateMap = - new HashMap(); - - while (rs.next()) { - TimelineMetric currentMetric = - PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs); - MetricHostAggregate currentHostAggregate = - PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs); - - if (existingMetric == null) { - // First row - existingMetric = currentMetric; - hostAggregate = new MetricHostAggregate(); - hostAggregateMap.put(currentMetric, hostAggregate); - } - - if (existingMetric.equalsExceptTime(currentMetric)) { - // Recalculate totals with current metric - hostAggregate.updateAggregates(currentHostAggregate); - - } else { - // Switched over to a new metric - create new aggregate - hostAggregate = new MetricHostAggregate(); - hostAggregate.updateAggregates(currentHostAggregate); - hostAggregateMap.put(currentMetric, hostAggregate); - existingMetric = currentMetric; - } - } - - LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); - - hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, - METRICS_AGGREGATE_MINUTE_TABLE_NAME); - - } catch (SQLException e) { - LOG.error("Exception during aggregating metrics.", e); - success = false; - } catch (IOException e) { - LOG.error("Exception during aggregating metrics.", e); - success = false; - } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - // Ignore - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException sql) { - // Ignore - } - } - } - - LOG.info("End aggregation cycle @ " + new Date()); - return success; - } - - @Override - protected Long getSleepIntervalMillis() { - return sleepIntervalMillis; - } - - @Override - protected Integer getCheckpointCutOffMultiplier() { - return checkpointCutOffMultiplier; - } - - @Override - protected boolean isDisabled() { - return metricsConf.getBoolean(HOST_AGGREGATOR_MINUTE_DISABLED, false); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java index c52451e..96de1a9 100644 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + import org.apache.commons.io.FilenameUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -24,27 +25,18 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.prepareGetMetricsSqlStmt; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.*; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline.PhoenixTransactSQL.*; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline.TimelineMetricConfiguration.*; /** * Aggregates a metric across all hosts in the cluster. Reads metrics from @@ -82,98 +74,79 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator return checkpointLocation; } - /** - * Read metrics written during the time interval and save the sum and total - * in the aggregate table. - * - * @param startTime Sample start time - * @param endTime Sample end time - */ - protected boolean doWork(long startTime, long endTime) { - LOG.info("Start aggregation cycle @ " + new Date() + ", " + - "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime)); - - boolean success = true; - Condition condition = prepareMetricQueryCondition(startTime, endTime); - - Connection conn; - PreparedStatement stmt; - - try { - conn = hBaseAccessor.getConnection(); - stmt = prepareGetMetricsSqlStmt(conn, condition); - LOG.debug("Query issued @: " + new Date()); - ResultSet rs = stmt.executeQuery(); - LOG.debug("Query returned @: " + new Date()); - Map aggregateClusterMetrics = - new HashMap(); - List timeSlices = new ArrayList(); - // Create time slices - long sliceStartTime = startTime; - while (sliceStartTime < endTime) { - timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis}); - sliceStartTime += timeSliceIntervalMillis; - } - - while (rs.next()) { - TimelineMetric metric = - PhoenixHBaseAccessor.getTimelineMetricFromResultSet(rs); - - Map clusterMetrics = - sliceFromTimelineMetric(metric, timeSlices); - - if (clusterMetrics != null && !clusterMetrics.isEmpty()) { - for (Map.Entry clusterMetricEntry : - clusterMetrics.entrySet()) { - TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey(); - MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric); - Double avgValue = clusterMetricEntry.getValue(); - - if (aggregate == null) { - aggregate = new MetricClusterAggregate(avgValue, 1, null, - avgValue, avgValue); - aggregateClusterMetrics.put(clusterMetric, aggregate); - } else { - aggregate.updateSum(avgValue); - aggregate.updateNumberOfHosts(1); - aggregate.updateMax(avgValue); - aggregate.updateMin(avgValue); - } - } - } - } - LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates."); - - hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics); - - LOG.info("End aggregation cycle @ " + new Date()); - - } catch (SQLException e) { - LOG.error("Exception during aggregating metrics.", e); - success = false; - } catch (IOException e) { - LOG.error("Exception during aggregating metrics.", e); - success = false; - } - - return success; + @Override + protected void aggregate(ResultSet rs, long startTime, long endTime) + throws SQLException, IOException { + List timeSlices = getTimeSlices(startTime, endTime); + Map + aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices); + + LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates."); + hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics); } - private Condition prepareMetricQueryCondition(long startTime, long endTime) { + @Override + protected Condition prepareMetricQueryCondition(long startTime, long endTime) { Condition condition = new Condition(null, null, null, null, startTime, - endTime, null, true); - condition.setFetchSize(resultsetFetchSize); + endTime, null, true); condition.setNoLimit(); + condition.setFetchSize(resultsetFetchSize); condition.setStatement(String.format(GET_METRIC_SQL, METRICS_RECORD_TABLE_NAME)); condition.addOrderByColumn("METRIC_NAME"); condition.addOrderByColumn("APP_ID"); condition.addOrderByColumn("INSTANCE_ID"); condition.addOrderByColumn("SERVER_TIME"); - return condition; } + private List getTimeSlices(long startTime, long endTime) { + List timeSlices = new ArrayList(); + long sliceStartTime = startTime; + while (sliceStartTime < endTime) { + timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis}); + sliceStartTime += timeSliceIntervalMillis; + } + return timeSlices; + } + + private Map + aggregateMetricsFromResultSet(ResultSet rs, List timeSlices) + throws SQLException, IOException { + Map aggregateClusterMetrics = + new HashMap(); + // Create time slices + + while (rs.next()) { + TimelineMetric metric = + PhoenixHBaseAccessor.getTimelineMetricFromResultSet(rs); + + Map clusterMetrics = + sliceFromTimelineMetric(metric, timeSlices); + + if (clusterMetrics != null && !clusterMetrics.isEmpty()) { + for (Map.Entry clusterMetricEntry : + clusterMetrics.entrySet()) { + TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey(); + MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric); + Double avgValue = clusterMetricEntry.getValue(); + + if (aggregate == null) { + aggregate = new MetricClusterAggregate(avgValue, 1, null, + avgValue, avgValue); + aggregateClusterMetrics.put(clusterMetric, aggregate); + } else { + aggregate.updateSum(avgValue); + aggregate.updateNumberOfHosts(1); + aggregate.updateMax(avgValue); + aggregate.updateMin(avgValue); + } + } + } + } + return aggregateClusterMetrics; + } + @Override protected Long getSleepIntervalMillis() { return sleepIntervalMillis; @@ -239,82 +212,4 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator return -1l; } - public static class TimelineClusterMetric { - private String metricName; - private String appId; - private String instanceId; - private long timestamp; - private String type; - - TimelineClusterMetric(String metricName, String appId, String instanceId, - long timestamp, String type) { - this.metricName = metricName; - this.appId = appId; - this.instanceId = instanceId; - this.timestamp = timestamp; - this.type = type; - } - - String getMetricName() { - return metricName; - } - - String getAppId() { - return appId; - } - - String getInstanceId() { - return instanceId; - } - - long getTimestamp() { - return timestamp; - } - - String getType() { return type; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TimelineClusterMetric that = (TimelineClusterMetric) o; - - if (timestamp != that.timestamp) return false; - if (appId != null ? !appId.equals(that.appId) : that.appId != null) - return false; - if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null) - return false; - if (!metricName.equals(that.metricName)) return false; - - return true; - } - - public boolean equalsExceptTime(TimelineClusterMetric metric) { - if (!metricName.equals(metric.metricName)) return false; - if (!appId.equals(metric.appId)) return false; - if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null) - return false; - - return true; - } - @Override - public int hashCode() { - int result = metricName.hashCode(); - result = 31 * result + (appId != null ? appId.hashCode() : 0); - result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0); - result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); - return result; - } - - @Override - public String toString() { - return "TimelineClusterMetric{" + - "metricName='" + metricName + '\'' + - ", appId='" + appId + '\'' + - ", instanceId='" + instanceId + '\'' + - ", timestamp=" + timestamp + - '}'; - } - } } http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java index e886b71..54d3fdd 100644 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java @@ -24,11 +24,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -40,19 +37,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics .timeline.PhoenixTransactSQL.*; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics - .timeline.TimelineMetricClusterAggregator.TimelineClusterMetric; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics .timeline.TimelineMetricConfiguration.*; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics - .timeline.TimelineMetricConfiguration - .CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics - .timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics - .timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics - .timeline.TimelineMetricConfiguration - .TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; public class TimelineMetricClusterAggregatorHourly extends AbstractTimelineAggregator { @@ -77,7 +62,8 @@ public class TimelineMetricClusterAggregatorHourly extends sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l)); - checkpointCutOffIntervalMillis = 7200000l; + checkpointCutOffIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL, 7200l)); checkpointCutOffMultiplier = metricsConf.getInt (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2); } @@ -88,60 +74,31 @@ public class TimelineMetricClusterAggregatorHourly extends } @Override - protected boolean doWork(long startTime, long endTime) { - LOG.info("Start aggregation cycle @ " + new Date() + ", " + - "startTime = " + new Date(startTime) + ", endTime = " + new Date - (endTime)); - - boolean success = true; - Condition condition = prepareMetricQueryCondition(startTime, endTime); - - Connection conn = null; - PreparedStatement stmt = null; - - try { - conn = hBaseAccessor.getConnection(); - stmt = prepareGetMetricsSqlStmt(conn, condition); - - ResultSet rs = stmt.executeQuery(); + protected void aggregate(ResultSet rs, long startTime, long endTime) + throws SQLException, IOException { Map hostAggregateMap = aggregateMetricsFromResultSet(rs); - LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); - - hBaseAccessor.saveClusterAggregateHourlyRecords( - hostAggregateMap, - METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME); - - } catch (SQLException e) { - LOG.error("Exception during aggregating metrics.", e); - success = false; - } catch (IOException e) { - LOG.error("Exception during aggregating metrics.", e); - success = false; - } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - // Ignore - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException sql) { - // Ignore - } - } - } + LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); + hBaseAccessor.saveClusterAggregateHourlyRecords(hostAggregateMap, + METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME); + } - LOG.info("End aggregation cycle @ " + new Date()); - return success; + @Override + protected Condition prepareMetricQueryCondition(long startTime, + long endTime) { + Condition condition = new Condition(null, null, null, null, startTime, + endTime, null, true); + condition.setNoLimit(); + condition.setFetchSize(resultsetFetchSize); + condition.setStatement(GET_CLUSTER_AGGREGATE_SQL); + condition.addOrderByColumn("METRIC_NAME"); + condition.addOrderByColumn("APP_ID"); + condition.addOrderByColumn("INSTANCE_ID"); + condition.addOrderByColumn("SERVER_TIME"); + return condition; } - // should rewrite from host agg to cluster agg - // private Map aggregateMetricsFromResultSet(ResultSet rs) throws IOException, SQLException { @@ -189,19 +146,6 @@ public class TimelineMetricClusterAggregatorHourly extends agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts()); } - private Condition prepareMetricQueryCondition(long startTime, long endTime) { - Condition condition = new Condition - (null, null, null, null, startTime, endTime, null, true); - condition.setNoLimit(); - condition.setFetchSize(resultsetFetchSize); - condition.setStatement(GET_CLUSTER_AGGREGATE_SQL); - condition.addOrderByColumn("METRIC_NAME"); - condition.addOrderByColumn("APP_ID"); - condition.addOrderByColumn("INSTANCE_ID"); - condition.addOrderByColumn("SERVER_TIME"); - return condition; - } - @Override protected Long getSleepIntervalMillis() { return sleepIntervalMillis; http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java index 6b19847..e022e5e 100644 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java @@ -86,6 +86,9 @@ public interface TimelineMetricConfiguration { public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER = "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffMultiplier"; + public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL = + "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffInterval"; + public static final String GLOBAL_RESULT_LIMIT = "timeline.metrics.service.default.result.limit";