Return-Path: X-Original-To: apmail-ambari-commits-archive@www.apache.org Delivered-To: apmail-ambari-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B0CE410070 for ; Mon, 1 Dec 2014 20:03:30 +0000 (UTC) Received: (qmail 16177 invoked by uid 500); 1 Dec 2014 20:03:30 -0000 Delivered-To: apmail-ambari-commits-archive@ambari.apache.org Received: (qmail 16079 invoked by uid 500); 1 Dec 2014 20:03:30 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 15910 invoked by uid 99); 1 Dec 2014 20:03:30 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Dec 2014 20:03:30 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 000069B4339; Mon, 1 Dec 2014 20:03:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: swagle@apache.org To: commits@ambari.apache.org Date: Mon, 01 Dec 2014 20:03:34 -0000 Message-Id: <2182039064ae4f0bb8b49a2ea1840d60@git.apache.org> In-Reply-To: <5744415574f64235a4b8a6ec2466d5b4@git.apache.org> References: <5744415574f64235a4b8a6ec2466d5b4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [06/22] ambari git commit: AMBARI-5707. Renaming a module. (swagle) http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java new file mode 100644 index 0000000..0d53f5f --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java @@ -0,0 +1,528 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +/** + * Encapsulate all metrics related SQL queries. + */ +public class PhoenixTransactSQL { + + static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class); + // TODO: Configurable TTL values + /** + * Create table to store individual metric records. + */ + public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " + + "EXISTS METRIC_RECORD (METRIC_NAME VARCHAR, " + + "HOSTNAME VARCHAR, " + + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + + "APP_ID VARCHAR, " + + "INSTANCE_ID VARCHAR, " + + "START_TIME UNSIGNED_LONG, " + + "UNITS CHAR(20), " + + "METRIC_SUM DOUBLE, " + + "METRIC_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE, " + + "METRIC_MIN DOUBLE, " + + "METRICS VARCHAR CONSTRAINT pk " + + "PRIMARY KEY (METRIC_NAME, HOSTNAME, SERVER_TIME, APP_ID, " + + "INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + + "TTL=%s, COMPRESSION='%s'"; + + public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " + + "(METRIC_NAME VARCHAR, " + + "HOSTNAME VARCHAR, " + + "APP_ID VARCHAR, " + + "INSTANCE_ID VARCHAR, " + + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + + "UNITS CHAR(20), " + + "METRIC_SUM DOUBLE," + + "METRIC_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE," + + "METRIC_MIN DOUBLE CONSTRAINT pk " + + "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + + "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + + "TTL=%s, COMPRESSION='%s'"; + + public static final String CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " + + "(METRIC_NAME VARCHAR, " + + "HOSTNAME VARCHAR, " + + "APP_ID VARCHAR, " + + "INSTANCE_ID VARCHAR, " + + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + + "UNITS CHAR(20), " + + "METRIC_SUM DOUBLE," + + "METRIC_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE," + + "METRIC_MIN DOUBLE CONSTRAINT pk " + + "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + + "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," + + " COMPRESSION='%s'"; + + public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " + + "(METRIC_NAME VARCHAR, " + + "APP_ID VARCHAR, " + + "INSTANCE_ID VARCHAR, " + + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + + "UNITS CHAR(20), " + + "METRIC_SUM DOUBLE, " + + "HOSTS_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE, " + + "METRIC_MIN DOUBLE " + + "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " + + "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + + "TTL=%s, COMPRESSION='%s'"; + + public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " + + "(METRIC_NAME VARCHAR, " + + "APP_ID VARCHAR, " + + "INSTANCE_ID VARCHAR, " + + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + + "UNITS CHAR(20), " + + "METRIC_SUM DOUBLE, " + + "METRIC_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE, " + + "METRIC_MIN DOUBLE " + + "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " + + "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + + "TTL=%s, COMPRESSION='%s'"; + + /** + * Insert into metric records table. + */ + public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " + + "(METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "METRIC_MAX, " + + "METRIC_MIN, " + + "METRIC_COUNT, " + + "METRICS) VALUES " + + "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " + + "METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "HOSTS_COUNT, " + + "METRIC_MAX, " + + "METRIC_MIN) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" + + " %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "METRIC_COUNT, " + + "METRIC_MAX, " + + "METRIC_MIN) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + + public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " + + "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + + "SERVER_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "METRIC_MAX, " + + "METRIC_MIN," + + "METRIC_COUNT) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + /** + * Retrieve a set of rows from metrics records table. + */ + public static final String GET_METRIC_SQL = "SELECT %s METRIC_NAME, " + + "HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, UNITS, " + + "METRIC_SUM, " + + "METRIC_MAX, " + + "METRIC_MIN, " + + "METRIC_COUNT, " + + "METRICS " + + "FROM %s"; + + public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT %s " + + "METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "METRIC_MAX, " + + "METRIC_MIN, " + + "METRIC_COUNT " + + "FROM %s"; + + public static final String GET_CLUSTER_AGGREGATE_SQL = "SELECT %s " + + "METRIC_NAME, APP_ID, " + + "INSTANCE_ID, SERVER_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "HOSTS_COUNT, " + + "METRIC_MAX, " + + "METRIC_MIN " + + "FROM METRIC_AGGREGATE"; + + public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD"; + public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME = + "METRIC_RECORD_MINUTE"; + public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME = + "METRIC_RECORD_HOURLY"; + public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME = + "METRIC_AGGREGATE"; + public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME = + "METRIC_AGGREGATE_HOURLY"; + public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY"; + public static final String DEFAULT_ENCODING = "FAST_DIFF"; + public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes + + /** Filter to optimize HBase scan by using file timestamps. This prevents + * a full table scan of metric records. + * @return Phoenix Hint String + */ + public static String getNaiveTimeRangeHint(Long startTime, Long delta) { + return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - delta)); + } + + public static PreparedStatement prepareGetMetricsSqlStmt( + Connection connection, Condition condition) throws SQLException { + + if (condition.isEmpty()) { + throw new IllegalArgumentException("Condition is empty."); + } + String stmtStr; + if (condition.getStatement() != null) { + stmtStr = condition.getStatement(); + } else { + stmtStr = String.format(GET_METRIC_SQL, + getNaiveTimeRangeHint(condition.getStartTime(), NATIVE_TIME_RANGE_DELTA), + METRICS_RECORD_TABLE_NAME); + } + + StringBuilder sb = new StringBuilder(stmtStr); + sb.append(" WHERE "); + sb.append(condition.getConditionClause()); + String orderByClause = condition.getOrderByClause(); + + if (orderByClause != null) { + sb.append(orderByClause); + } else { + sb.append(" ORDER BY METRIC_NAME, SERVER_TIME "); + } + if (condition.getLimit() != null) { + sb.append(" LIMIT ").append(condition.getLimit()); + } + + LOG.debug("SQL: " + sb.toString() + ", condition: " + condition); + PreparedStatement stmt = connection.prepareStatement(sb.toString()); + int pos = 1; + if (condition.getMetricNames() != null) { + for (; pos <= condition.getMetricNames().size(); pos++) { + LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1)); + stmt.setString(pos, condition.getMetricNames().get(pos - 1)); + } + } + if (condition.getHostname() != null) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname()); + stmt.setString(pos++, condition.getHostname()); + } + // TODO: Upper case all strings on POST + if (condition.getAppId() != null) { + // TODO: fix case of appId coming from host metrics + String appId = condition.getAppId(); + if (!condition.getAppId().equals("HOST")) { + appId = appId.toLowerCase(); + } + LOG.debug("Setting pos: " + pos + ", value: " + appId); + stmt.setString(pos++, appId); + } + if (condition.getInstanceId() != null) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId()); + stmt.setString(pos++, condition.getInstanceId()); + } + if (condition.getStartTime() != null) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime()); + stmt.setLong(pos++, condition.getStartTime()); + } + if (condition.getEndTime() != null) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime()); + stmt.setLong(pos, condition.getEndTime()); + } + if (condition.getFetchSize() != null) { + stmt.setFetchSize(condition.getFetchSize()); + } + + return stmt; + } + + + public static PreparedStatement prepareGetAggregateSqlStmt( + Connection connection, Condition condition) throws SQLException { + + if (condition.isEmpty()) { + throw new IllegalArgumentException("Condition is empty."); + } + + StringBuilder sb = new StringBuilder(GET_CLUSTER_AGGREGATE_SQL); + sb.append(" WHERE "); + sb.append(condition.getConditionClause()); + sb.append(" ORDER BY METRIC_NAME, SERVER_TIME"); + if (condition.getLimit() != null) { + sb.append(" LIMIT ").append(condition.getLimit()); + } + + LOG.debug("SQL => " + sb.toString() + ", condition => " + condition); + PreparedStatement stmt = connection.prepareStatement(sb.toString()); + int pos = 1; + if (condition.getMetricNames() != null) { + for (; pos <= condition.getMetricNames().size(); pos++) { + stmt.setString(pos, condition.getMetricNames().get(pos - 1)); + } + } + // TODO: Upper case all strings on POST + if (condition.getAppId() != null) { + stmt.setString(pos++, condition.getAppId().toLowerCase()); + } + if (condition.getInstanceId() != null) { + stmt.setString(pos++, condition.getInstanceId()); + } + if (condition.getStartTime() != null) { + stmt.setLong(pos++, condition.getStartTime()); + } + if (condition.getEndTime() != null) { + stmt.setLong(pos, condition.getEndTime()); + } + + return stmt; + } + + static class Condition { + List metricNames; + String hostname; + String appId; + String instanceId; + Long startTime; + Long endTime; + Integer limit; + boolean grouped; + boolean noLimit = false; + Integer fetchSize; + String statement; + Set orderByColumns = new LinkedHashSet(); + + Condition(List metricNames, String hostname, String appId, + String instanceId, Long startTime, Long endTime, Integer limit, + boolean grouped) { + this.metricNames = metricNames; + this.hostname = hostname; + this.appId = appId; + this.instanceId = instanceId; + this.startTime = startTime; + this.endTime = endTime; + this.limit = limit; + this.grouped = grouped; + } + + String getStatement() { + return statement; + } + + void setStatement(String statement) { + this.statement = statement; + } + + List getMetricNames() { + return metricNames == null || metricNames.isEmpty() ? null : metricNames; + } + + String getMetricsClause() { + StringBuilder sb = new StringBuilder("("); + if (metricNames != null) { + for (String name : metricNames) { + if (sb.length() != 1) { + sb.append(", "); + } + sb.append("?"); + } + sb.append(")"); + return sb.toString(); + } else { + return null; + } + } + + String getConditionClause() { + StringBuilder sb = new StringBuilder(); + boolean appendConjunction = false; + + if (getMetricNames() != null) { + sb.append("METRIC_NAME IN "); + sb.append(getMetricsClause()); + appendConjunction = true; + } + if (appendConjunction) { + sb.append(" AND"); + } + appendConjunction = false; + if (getHostname() != null) { + sb.append(" HOSTNAME = ?"); + appendConjunction = true; + } + if (appendConjunction) { + sb.append(" AND"); + } + appendConjunction = false; + if (getAppId() != null) { + sb.append(" APP_ID = ?"); + appendConjunction = true; + } + if (appendConjunction) { + sb.append(" AND"); + } + appendConjunction = false; + if (getInstanceId() != null) { + sb.append(" INSTANCE_ID = ?"); + appendConjunction = true; + } + if (appendConjunction) { + sb.append(" AND"); + } + appendConjunction = false; + if (getStartTime() != null) { + sb.append(" SERVER_TIME >= ?"); + appendConjunction = true; + } + if (appendConjunction) { + sb.append(" AND"); + } + if (getEndTime() != null) { + sb.append(" SERVER_TIME < ?"); + } + return sb.toString(); + } + + String getHostname() { + return hostname == null || hostname.isEmpty() ? null : hostname; + } + + String getAppId() { + return appId == null || appId.isEmpty() ? null : appId; + } + + String getInstanceId() { + return instanceId == null || instanceId.isEmpty() ? null : instanceId; + } + + /** + * Convert to millis. + */ + Long getStartTime() { + if (startTime < 9999999999l) { + return startTime * 1000; + } else { + return startTime; + } + } + + Long getEndTime() { + if (endTime < 9999999999l) { + return endTime * 1000; + } else { + return endTime; + } + } + + void setNoLimit() { + this.noLimit = true; + } + + Integer getLimit() { + if (noLimit) { + return null; + } + return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit; + } + + boolean isGrouped() { + return grouped; + } + + boolean isEmpty() { + return (metricNames == null || metricNames.isEmpty()) + && (hostname == null || hostname.isEmpty()) + && (appId == null || appId.isEmpty()) + && (instanceId == null || instanceId.isEmpty()) + && startTime == null + && endTime == null; + } + + Integer getFetchSize() { + return fetchSize; + } + + void setFetchSize(Integer fetchSize) { + this.fetchSize = fetchSize; + } + + void addOrderByColumn(String column) { + orderByColumns.add(column); + } + + String getOrderByClause() { + String orderByStr = " ORDER BY "; + if (!orderByColumns.isEmpty()) { + StringBuilder sb = new StringBuilder(orderByStr); + for (String orderByColumn : orderByColumns) { + if (sb.length() != orderByStr.length()) { + sb.append(", "); + } + sb.append(orderByColumn); + } + sb.append(" "); + return sb.toString(); + } + return null; + } + + @Override + public String toString() { + return "Condition{" + + "metricNames=" + metricNames + + ", hostname='" + hostname + '\'' + + ", appId='" + appId + '\'' + + ", instanceId='" + instanceId + '\'' + + ", startTime=" + startTime + + ", endTime=" + endTime + + ", limit=" + limit + + ", grouped=" + grouped + + ", orderBy=" + orderByColumns + + ", noLimit=" + noLimit + + '}'; + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java new file mode 100644 index 0000000..d227993 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +public class TimelineClusterMetric { + private String metricName; + private String appId; + private String instanceId; + private long timestamp; + private String type; + + TimelineClusterMetric(String metricName, String appId, String instanceId, + long timestamp, String type) { + this.metricName = metricName; + this.appId = appId; + this.instanceId = instanceId; + this.timestamp = timestamp; + this.type = type; + } + + String getMetricName() { + return metricName; + } + + String getAppId() { + return appId; + } + + String getInstanceId() { + return instanceId; + } + + long getTimestamp() { + return timestamp; + } + + String getType() { return type; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TimelineClusterMetric that = (TimelineClusterMetric) o; + + if (timestamp != that.timestamp) return false; + if (appId != null ? !appId.equals(that.appId) : that.appId != null) + return false; + if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null) + return false; + if (!metricName.equals(that.metricName)) return false; + + return true; + } + + public boolean equalsExceptTime(TimelineClusterMetric metric) { + if (!metricName.equals(metric.metricName)) return false; + if (!appId.equals(metric.appId)) return false; + if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null) + return false; + + return true; + } + @Override + public int hashCode() { + int result = metricName.hashCode(); + result = 31 * result + (appId != null ? appId.hashCode() : 0); + result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0); + result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); + return result; + } + + @Override + public String toString() { + return "TimelineClusterMetric{" + + "metricName='" + metricName + '\'' + + ", appId='" + appId + '\'' + + ", instanceId='" + instanceId + '\'' + + ", timestamp=" + timestamp + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java new file mode 100644 index 0000000..cab154b --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; + +public class TimelineMetricAggregator extends AbstractTimelineAggregator { + private static final Log LOG = LogFactory.getLog + (TimelineMetricAggregator.class); + + private final String checkpointLocation; + private final Long sleepIntervalMillis; + private final Integer checkpointCutOffMultiplier; + private final String hostAggregatorDisabledParam; + private final String tableName; + private final String outputTableName; + private final Long nativeTimeRangeDelay; + + public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor, + Configuration metricsConf, + String checkpointLocation, + Long sleepIntervalMillis, + Integer checkpointCutOffMultiplier, + String hostAggregatorDisabledParam, + String tableName, + String outputTableName, + Long nativeTimeRangeDelay) { + super(hBaseAccessor, metricsConf); + this.checkpointLocation = checkpointLocation; + this.sleepIntervalMillis = sleepIntervalMillis; + this.checkpointCutOffMultiplier = checkpointCutOffMultiplier; + this.hostAggregatorDisabledParam = hostAggregatorDisabledParam; + this.tableName = tableName; + this.outputTableName = outputTableName; + this.nativeTimeRangeDelay = nativeTimeRangeDelay; + } + + @Override + protected String getCheckpointLocation() { + return checkpointLocation; + } + + @Override + protected void aggregate(ResultSet rs, long startTime, long endTime) + throws IOException, SQLException { + Map hostAggregateMap = + aggregateMetricsFromResultSet(rs); + + LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); + hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, + outputTableName); + } + + @Override + protected Condition prepareMetricQueryCondition(long startTime, long endTime) { + Condition condition = new Condition(null, null, null, null, startTime, + endTime, null, true); + condition.setNoLimit(); + condition.setFetchSize(resultsetFetchSize); + condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay), + tableName)); + condition.addOrderByColumn("METRIC_NAME"); + condition.addOrderByColumn("HOSTNAME"); + condition.addOrderByColumn("APP_ID"); + condition.addOrderByColumn("INSTANCE_ID"); + condition.addOrderByColumn("SERVER_TIME"); + return condition; + } + + private Map aggregateMetricsFromResultSet + (ResultSet rs) throws IOException, SQLException { + TimelineMetric existingMetric = null; + MetricHostAggregate hostAggregate = null; + Map hostAggregateMap = + new HashMap(); + + while (rs.next()) { + TimelineMetric currentMetric = + PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs); + MetricHostAggregate currentHostAggregate = + PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs); + + if (existingMetric == null) { + // First row + existingMetric = currentMetric; + hostAggregate = new MetricHostAggregate(); + hostAggregateMap.put(currentMetric, hostAggregate); + } + + if (existingMetric.equalsExceptTime(currentMetric)) { + // Recalculate totals with current metric + hostAggregate.updateAggregates(currentHostAggregate); + } else { + // Switched over to a new metric - save existing - create new aggregate + hostAggregate = new MetricHostAggregate(); + hostAggregate.updateAggregates(currentHostAggregate); + hostAggregateMap.put(currentMetric, hostAggregate); + existingMetric = currentMetric; + } + } + return hostAggregateMap; + } + + @Override + protected Long getSleepIntervalMillis() { + return sleepIntervalMillis; + } + + @Override + protected Integer getCheckpointCutOffMultiplier() { + return checkpointCutOffMultiplier; + } + + @Override + protected boolean isDisabled() { + return metricsConf.getBoolean(hostAggregatorDisabledParam, false); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java new file mode 100644 index 0000000..8b10079 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +import static java.util.concurrent.TimeUnit.SECONDS; +import org.apache.commons.io.FilenameUtils; +import org.apache.hadoop.conf.Configuration; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; + +/** + * + */ +public class TimelineMetricAggregatorFactory { + private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE = + "timeline-metrics-host-aggregator-checkpoint"; + private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE = + "timeline-metrics-host-aggregator-hourly-checkpoint"; + + public static TimelineMetricAggregator createTimelineMetricAggregatorMinute + (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + String checkpointLocation = FilenameUtils.concat(checkpointDir, + MINUTE_AGGREGATE_CHECKPOINT_FILE); + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); // 5 mins + + int checkpointCutOffMultiplier = metricsConf.getInt + (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3); + String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED; + + String inputTableName = METRICS_RECORD_TABLE_NAME; + String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; + + return new TimelineMetricAggregator(hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + hostAggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l); + } + + public static TimelineMetricAggregator createTimelineMetricAggregatorHourly + (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + String checkpointLocation = FilenameUtils.concat(checkpointDir, + MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE); + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l)); + + int checkpointCutOffMultiplier = metricsConf.getInt + (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2); + String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED; + + String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; + String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME; + + return new TimelineMetricAggregator(hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + hostAggregatorDisabledParam, + inputTableName, + outputTableName, + 3600000l); + } + + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java new file mode 100644 index 0000000..654c188 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + + +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; + +/** + * Aggregates a metric across all hosts in the cluster. Reads metrics from + * the precision table and saves into the aggregate. + */ +public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator { + private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class); + private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE = + "timeline-metrics-cluster-aggregator-checkpoint"; + private final String checkpointLocation; + private final Long sleepIntervalMillis; + public final int timeSliceIntervalMillis; + private final Integer checkpointCutOffMultiplier; + + public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor, + Configuration metricsConf) { + super(hBaseAccessor, metricsConf); + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + + checkpointLocation = FilenameUtils.concat(checkpointDir, + CLUSTER_AGGREGATOR_CHECKPOINT_FILE); + + sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l)); + timeSliceIntervalMillis = (int)SECONDS.toMillis(metricsConf.getInt + (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15)); + checkpointCutOffMultiplier = + metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2); + } + + @Override + protected String getCheckpointLocation() { + return checkpointLocation; + } + + @Override + protected void aggregate(ResultSet rs, long startTime, long endTime) + throws SQLException, IOException { + List timeSlices = getTimeSlices(startTime, endTime); + Map + aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices); + + LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates."); + hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics); + } + + @Override + protected Condition prepareMetricQueryCondition(long startTime, long endTime) { + Condition condition = new Condition(null, null, null, null, startTime, + endTime, null, true); + condition.setNoLimit(); + condition.setFetchSize(resultsetFetchSize); + condition.setStatement(String.format(GET_METRIC_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), + METRICS_RECORD_TABLE_NAME)); + condition.addOrderByColumn("METRIC_NAME"); + condition.addOrderByColumn("APP_ID"); + condition.addOrderByColumn("INSTANCE_ID"); + condition.addOrderByColumn("SERVER_TIME"); + return condition; + } + + private List getTimeSlices(long startTime, long endTime) { + List timeSlices = new ArrayList(); + long sliceStartTime = startTime; + while (sliceStartTime < endTime) { + timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis}); + sliceStartTime += timeSliceIntervalMillis; + } + return timeSlices; + } + + private Map + aggregateMetricsFromResultSet(ResultSet rs, List timeSlices) + throws SQLException, IOException { + Map aggregateClusterMetrics = + new HashMap(); + // Create time slices + + while (rs.next()) { + TimelineMetric metric = + PhoenixHBaseAccessor.getTimelineMetricFromResultSet(rs); + + Map clusterMetrics = + sliceFromTimelineMetric(metric, timeSlices); + + if (clusterMetrics != null && !clusterMetrics.isEmpty()) { + for (Map.Entry clusterMetricEntry : + clusterMetrics.entrySet()) { + TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey(); + MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric); + Double avgValue = clusterMetricEntry.getValue(); + + if (aggregate == null) { + aggregate = new MetricClusterAggregate(avgValue, 1, null, + avgValue, avgValue); + aggregateClusterMetrics.put(clusterMetric, aggregate); + } else { + aggregate.updateSum(avgValue); + aggregate.updateNumberOfHosts(1); + aggregate.updateMax(avgValue); + aggregate.updateMin(avgValue); + } + } + } + } + return aggregateClusterMetrics; + } + + @Override + protected Long getSleepIntervalMillis() { + return sleepIntervalMillis; + } + + @Override + protected Integer getCheckpointCutOffMultiplier() { + return checkpointCutOffMultiplier; + } + + @Override + protected boolean isDisabled() { + return metricsConf.getBoolean(CLUSTER_AGGREGATOR_MINUTE_DISABLED, false); + } + + private Map sliceFromTimelineMetric( + TimelineMetric timelineMetric, List timeSlices) { + + if (timelineMetric.getMetricValues().isEmpty()) { + return null; + } + + Map timelineClusterMetricMap = + new HashMap(); + + for (Map.Entry metric : timelineMetric.getMetricValues().entrySet()) { + // TODO: investigate null values - pre filter + if (metric.getValue() == null) { + continue; + } + Long timestamp = getSliceTimeForMetric(timeSlices, + Long.parseLong(metric.getKey().toString())); + if (timestamp != -1) { + // Metric is within desired time range + TimelineClusterMetric clusterMetric = new TimelineClusterMetric( + timelineMetric.getMetricName(), + timelineMetric.getAppId(), + timelineMetric.getInstanceId(), + timestamp, + timelineMetric.getType()); + if (!timelineClusterMetricMap.containsKey(clusterMetric)) { + timelineClusterMetricMap.put(clusterMetric, metric.getValue()); + } else { + Double oldValue = timelineClusterMetricMap.get(clusterMetric); + Double newValue = (oldValue + metric.getValue()) / 2; + timelineClusterMetricMap.put(clusterMetric, newValue); + } + } + } + + return timelineClusterMetricMap; + } + + /** + * Return beginning of the time slice into which the metric fits. + */ + private Long getSliceTimeForMetric(List timeSlices, Long timestamp) { + for (Long[] timeSlice : timeSlices) { + if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) { + return timeSlice[0]; + } + } + return -1l; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java new file mode 100644 index 0000000..7764ea3 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline; + +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; + +public class TimelineMetricClusterAggregatorHourly extends + AbstractTimelineAggregator { + private static final Log LOG = LogFactory.getLog + (TimelineMetricClusterAggregatorHourly.class); + private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE = + "timeline-metrics-cluster-aggregator-hourly-checkpoint"; + private final String checkpointLocation; + private final long sleepIntervalMillis; + private final Integer checkpointCutOffMultiplier; + private long checkpointCutOffIntervalMillis; + private static final Long NATIVE_TIME_RANGE_DELTA = 3600000l; // 1 hour + + public TimelineMetricClusterAggregatorHourly( + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + super(hBaseAccessor, metricsConf); + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + + checkpointLocation = FilenameUtils.concat(checkpointDir, + CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE); + + sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l)); + checkpointCutOffIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL, 7200l)); + checkpointCutOffMultiplier = metricsConf.getInt + (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2); + } + + @Override + protected String getCheckpointLocation() { + return checkpointLocation; + } + + @Override + protected void aggregate(ResultSet rs, long startTime, long endTime) + throws SQLException, IOException { + Map hostAggregateMap = + aggregateMetricsFromResultSet(rs); + + LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); + hBaseAccessor.saveClusterAggregateHourlyRecords(hostAggregateMap, + METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME); + } + + @Override + protected Condition prepareMetricQueryCondition(long startTime, + long endTime) { + Condition condition = new Condition(null, null, null, null, startTime, + endTime, null, true); + condition.setNoLimit(); + condition.setFetchSize(resultsetFetchSize); + condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA))); + condition.addOrderByColumn("METRIC_NAME"); + condition.addOrderByColumn("APP_ID"); + condition.addOrderByColumn("INSTANCE_ID"); + condition.addOrderByColumn("SERVER_TIME"); + return condition; + } + + private Map + aggregateMetricsFromResultSet(ResultSet rs) throws IOException, SQLException { + + TimelineClusterMetric existingMetric = null; + MetricHostAggregate hostAggregate = null; + Map hostAggregateMap = + new HashMap(); + + while (rs.next()) { + TimelineClusterMetric currentMetric = + getTimelineMetricClusterKeyFromResultSet(rs); + MetricClusterAggregate currentHostAggregate = + getMetricClusterAggregateFromResultSet(rs); + + if (existingMetric == null) { + // First row + existingMetric = currentMetric; + hostAggregate = new MetricHostAggregate(); + hostAggregateMap.put(currentMetric, hostAggregate); + } + + if (existingMetric.equalsExceptTime(currentMetric)) { + // Recalculate totals with current metric + updateAggregatesFromHost(hostAggregate, currentHostAggregate); + + } else { + // Switched over to a new metric - save existing + hostAggregate = new MetricHostAggregate(); + updateAggregatesFromHost(hostAggregate, currentHostAggregate); + hostAggregateMap.put(currentMetric, hostAggregate); + existingMetric = currentMetric; + } + + } + + return hostAggregateMap; + } + + private void updateAggregatesFromHost( + MetricHostAggregate agg, + MetricClusterAggregate currentClusterAggregate) { + agg.updateMax(currentClusterAggregate.getMax()); + agg.updateMin(currentClusterAggregate.getMin()); + agg.updateSum(currentClusterAggregate.getSum()); + agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts()); + } + + @Override + protected Long getSleepIntervalMillis() { + return sleepIntervalMillis; + } + + @Override + protected Integer getCheckpointCutOffMultiplier() { + return checkpointCutOffMultiplier; + } + + @Override + protected Long getCheckpointCutOffIntervalMillis() { + return checkpointCutOffIntervalMillis; + } + + @Override + protected boolean isDisabled() { + return metricsConf.getBoolean(CLUSTER_AGGREGATOR_HOUR_DISABLED, false); + } + + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java new file mode 100644 index 0000000..60833d0 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Configuration class that reads properties from ams-site.xml. All values + * for time or intervals are given in seconds. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface TimelineMetricConfiguration { + public static final String HBASE_SITE_CONFIGURATION_FILE = "hbase-site.xml"; + public static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml"; + + public static final String TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR = + "timeline.metrics.aggregator.checkpoint.dir"; + + public static final String DEFAULT_CHECKPOINT_LOCATION = + System.getProperty("java.io.tmpdir"); + + public static final String HBASE_ENCODING_SCHEME = + "timeline.metrics.hbase.data.block.encoding"; + + public static final String HBASE_COMPRESSION_SCHEME = + "timeline.metrics.hbase.compression.scheme"; + + public static final String PRECISION_TABLE_TTL = + "timeline.metrics.host.aggregator.ttl"; + public static final String HOST_MINUTE_TABLE_TTL = + "timeline.metrics.host.aggregator.minute.ttl"; + public static final String HOST_HOUR_TABLE_TTL = + "timeline.metrics.host.aggregator.hourly.ttl"; + public static final String CLUSTER_MINUTE_TABLE_TTL = + "timeline.metrics.cluster.aggregator.minute.ttl"; + public static final String CLUSTER_HOUR_TABLE_TTL = + "timeline.metrics.cluster.aggregator.hourly.ttl"; + + public static final String CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL = + "timeline.metrics.cluster.aggregator.minute.timeslice.interval"; + + public static final String AGGREGATOR_CHECKPOINT_DELAY = + "timeline.metrics.service.checkpointDelay"; + + public static final String RESULTSET_FETCH_SIZE = + "timeline.metrics.service.resultset.fetchSize"; + + public static final String HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL = + "timeline.metrics.host.aggregator.minute.interval"; + + public static final String HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL = + "timeline.metrics.host.aggregator.hourly.interval"; + + public static final String CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL = + "timeline.metrics.cluster.aggregator.minute.interval"; + + public static final String CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL = + "timeline.metrics.cluster.aggregator.hourly.interval"; + + public static final String HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER = + "timeline.metrics.host.aggregator.minute.checkpointCutOffMultiplier"; + + public static final String HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER = + "timeline.metrics.host.aggregator.hourly.checkpointCutOffMultiplier"; + + public static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER = + "timeline.metrics.cluster.aggregator.minute.checkpointCutOffMultiplier"; + + public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER = + "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffMultiplier"; + + public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL = + "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffInterval"; + + public static final String GLOBAL_RESULT_LIMIT = + "timeline.metrics.service.default.result.limit"; + + public static final String GLOBAL_MAX_RETRIES = + "timeline.metrics.service.default.max_retries"; + + public static final String GLOBAL_RETRY_INTERVAL = + "timeline.metrics.service.default.retryInterval"; + + public static final String HOST_AGGREGATOR_MINUTE_DISABLED = + "timeline.metrics.host.aggregator.minute.disabled"; + + public static final String HOST_AGGREGATOR_HOUR_DISABLED = + "timeline.metrics.host.aggregator.hourly.disabled"; + + public static final String CLUSTER_AGGREGATOR_MINUTE_DISABLED = + "timeline.metrics.cluster.aggregator.minute.disabled"; + + public static final String CLUSTER_AGGREGATOR_HOUR_DISABLED = + "timeline.metrics.cluster.aggregator.hourly.disabled"; + + public static final String DISABLE_APPLICATION_TIMELINE_STORE = + "timeline.service.disable.application.timeline.store"; +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java new file mode 100644 index 0000000..5224450 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; + +public interface TimelineMetricStore { + /** + * This method retrieves metrics stored byu the Timeline store. + * + * @param metricNames Names of the metric, e.g.: cpu_user + * @param hostname Name of the host where the metric originated from + * @param applicationId Id of the application to which this metric belongs + * @param instanceId Application instance id. + * @param startTime Start timestamp + * @param endTime End timestamp + * @param limit Override default result limit + * @param groupedByHosts Group {@link TimelineMetric} by metric name, hostname, + * app id and instance id + * + * @return {@link TimelineMetric} + * @throws java.sql.SQLException + */ + TimelineMetrics getTimelineMetrics(List metricNames, String hostname, + String applicationId, String instanceId, Long startTime, + Long endTime, Integer limit, boolean groupedByHosts) + throws SQLException, IOException; + + + /** + * Return all records for a single metric satisfying the filter criteria. + * @return {@link TimelineMetric} + */ + TimelineMetric getTimelineMetric(String metricName, String hostname, + String applicationId, String instanceId, Long startTime, + Long endTime, Integer limit) + throws SQLException, IOException; + + + /** + * Stores metric information to the timeline store. Any errors occurring for + * individual put request objects will be reported in the response. + * + * @param metrics An {@link TimelineMetrics}. + * @return An {@link org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse}. + * @throws SQLException, IOException + */ + TimelinePutResponse putMetrics(TimelineMetrics metrics) + throws SQLException, IOException; +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java new file mode 100644 index 0000000..7ba51af --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.util.Records; + +/** + * The class contains the fields that can be determined when + * RMAppAttempt finishes, and that need to be stored persistently. + */ +@Public +@Unstable +public abstract class ApplicationAttemptFinishData { + + @Public + @Unstable + public static ApplicationAttemptFinishData newInstance( + ApplicationAttemptId appAttemptId, String diagnosticsInfo, + String trackingURL, FinalApplicationStatus finalApplicationStatus, + YarnApplicationAttemptState yarnApplicationAttemptState) { + ApplicationAttemptFinishData appAttemptFD = + Records.newRecord(ApplicationAttemptFinishData.class); + appAttemptFD.setApplicationAttemptId(appAttemptId); + appAttemptFD.setDiagnosticsInfo(diagnosticsInfo); + appAttemptFD.setTrackingURL(trackingURL); + appAttemptFD.setFinalApplicationStatus(finalApplicationStatus); + appAttemptFD.setYarnApplicationAttemptState(yarnApplicationAttemptState); + return appAttemptFD; + } + + @Public + @Unstable + public abstract ApplicationAttemptId getApplicationAttemptId(); + + @Public + @Unstable + public abstract void setApplicationAttemptId( + ApplicationAttemptId applicationAttemptId); + + @Public + @Unstable + public abstract String getTrackingURL(); + + @Public + @Unstable + public abstract void setTrackingURL(String trackingURL); + + @Public + @Unstable + public abstract String getDiagnosticsInfo(); + + @Public + @Unstable + public abstract void setDiagnosticsInfo(String diagnosticsInfo); + + @Public + @Unstable + public abstract FinalApplicationStatus getFinalApplicationStatus(); + + @Public + @Unstable + public abstract void setFinalApplicationStatus( + FinalApplicationStatus finalApplicationStatus); + + @Public + @Unstable + public abstract YarnApplicationAttemptState getYarnApplicationAttemptState(); + + @Public + @Unstable + public abstract void setYarnApplicationAttemptState( + YarnApplicationAttemptState yarnApplicationAttemptState); + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java new file mode 100644 index 0000000..b759ab1 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; + +/** + * The class contains all the fields that are stored persistently for + * RMAppAttempt. + */ +@Public +@Unstable +public class ApplicationAttemptHistoryData { + + private ApplicationAttemptId applicationAttemptId; + + private String host; + + private int rpcPort; + + private String trackingURL; + + private String diagnosticsInfo; + + private FinalApplicationStatus finalApplicationStatus; + + private ContainerId masterContainerId; + + private YarnApplicationAttemptState yarnApplicationAttemptState; + + @Public + @Unstable + public static ApplicationAttemptHistoryData newInstance( + ApplicationAttemptId appAttemptId, String host, int rpcPort, + ContainerId masterContainerId, String diagnosticsInfo, + String trackingURL, FinalApplicationStatus finalApplicationStatus, + YarnApplicationAttemptState yarnApplicationAttemptState) { + ApplicationAttemptHistoryData appAttemptHD = + new ApplicationAttemptHistoryData(); + appAttemptHD.setApplicationAttemptId(appAttemptId); + appAttemptHD.setHost(host); + appAttemptHD.setRPCPort(rpcPort); + appAttemptHD.setMasterContainerId(masterContainerId); + appAttemptHD.setDiagnosticsInfo(diagnosticsInfo); + appAttemptHD.setTrackingURL(trackingURL); + appAttemptHD.setFinalApplicationStatus(finalApplicationStatus); + appAttemptHD.setYarnApplicationAttemptState(yarnApplicationAttemptState); + return appAttemptHD; + } + + @Public + @Unstable + public ApplicationAttemptId getApplicationAttemptId() { + return applicationAttemptId; + } + + @Public + @Unstable + public void + setApplicationAttemptId(ApplicationAttemptId applicationAttemptId) { + this.applicationAttemptId = applicationAttemptId; + } + + @Public + @Unstable + public String getHost() { + return host; + } + + @Public + @Unstable + public void setHost(String host) { + this.host = host; + } + + @Public + @Unstable + public int getRPCPort() { + return rpcPort; + } + + @Public + @Unstable + public void setRPCPort(int rpcPort) { + this.rpcPort = rpcPort; + } + + @Public + @Unstable + public String getTrackingURL() { + return trackingURL; + } + + @Public + @Unstable + public void setTrackingURL(String trackingURL) { + this.trackingURL = trackingURL; + } + + @Public + @Unstable + public String getDiagnosticsInfo() { + return diagnosticsInfo; + } + + @Public + @Unstable + public void setDiagnosticsInfo(String diagnosticsInfo) { + this.diagnosticsInfo = diagnosticsInfo; + } + + @Public + @Unstable + public FinalApplicationStatus getFinalApplicationStatus() { + return finalApplicationStatus; + } + + @Public + @Unstable + public void setFinalApplicationStatus( + FinalApplicationStatus finalApplicationStatus) { + this.finalApplicationStatus = finalApplicationStatus; + } + + @Public + @Unstable + public ContainerId getMasterContainerId() { + return masterContainerId; + } + + @Public + @Unstable + public void setMasterContainerId(ContainerId masterContainerId) { + this.masterContainerId = masterContainerId; + } + + @Public + @Unstable + public YarnApplicationAttemptState getYarnApplicationAttemptState() { + return yarnApplicationAttemptState; + } + + @Public + @Unstable + public void setYarnApplicationAttemptState( + YarnApplicationAttemptState yarnApplicationAttemptState) { + this.yarnApplicationAttemptState = yarnApplicationAttemptState; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java new file mode 100644 index 0000000..7ca43fa --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.util.Records; + +/** + * The class contains the fields that can be determined when + * RMAppAttempt starts, and that need to be stored persistently. + */ +@Public +@Unstable +public abstract class ApplicationAttemptStartData { + + @Public + @Unstable + public static ApplicationAttemptStartData newInstance( + ApplicationAttemptId appAttemptId, String host, int rpcPort, + ContainerId masterContainerId) { + ApplicationAttemptStartData appAttemptSD = + Records.newRecord(ApplicationAttemptStartData.class); + appAttemptSD.setApplicationAttemptId(appAttemptId); + appAttemptSD.setHost(host); + appAttemptSD.setRPCPort(rpcPort); + appAttemptSD.setMasterContainerId(masterContainerId); + return appAttemptSD; + } + + @Public + @Unstable + public abstract ApplicationAttemptId getApplicationAttemptId(); + + @Public + @Unstable + public abstract void setApplicationAttemptId( + ApplicationAttemptId applicationAttemptId); + + @Public + @Unstable + public abstract String getHost(); + + @Public + @Unstable + public abstract void setHost(String host); + + @Public + @Unstable + public abstract int getRPCPort(); + + @Public + @Unstable + public abstract void setRPCPort(int rpcPort); + + @Public + @Unstable + public abstract ContainerId getMasterContainerId(); + + @Public + @Unstable + public abstract void setMasterContainerId(ContainerId masterContainerId); + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java new file mode 100644 index 0000000..997fa6c --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.util.Records; + +/** + * The class contains the fields that can be determined when RMApp + * finishes, and that need to be stored persistently. + */ +@Public +@Unstable +public abstract class ApplicationFinishData { + + @Public + @Unstable + public static ApplicationFinishData newInstance(ApplicationId applicationId, + long finishTime, String diagnosticsInfo, + FinalApplicationStatus finalApplicationStatus, + YarnApplicationState yarnApplicationState) { + ApplicationFinishData appFD = + Records.newRecord(ApplicationFinishData.class); + appFD.setApplicationId(applicationId); + appFD.setFinishTime(finishTime); + appFD.setDiagnosticsInfo(diagnosticsInfo); + appFD.setFinalApplicationStatus(finalApplicationStatus); + appFD.setYarnApplicationState(yarnApplicationState); + return appFD; + } + + @Public + @Unstable + public abstract ApplicationId getApplicationId(); + + @Public + @Unstable + public abstract void setApplicationId(ApplicationId applicationId); + + @Public + @Unstable + public abstract long getFinishTime(); + + @Public + @Unstable + public abstract void setFinishTime(long finishTime); + + @Public + @Unstable + public abstract String getDiagnosticsInfo(); + + @Public + @Unstable + public abstract void setDiagnosticsInfo(String diagnosticsInfo); + + @Public + @Unstable + public abstract FinalApplicationStatus getFinalApplicationStatus(); + + @Public + @Unstable + public abstract void setFinalApplicationStatus( + FinalApplicationStatus finalApplicationStatus); + + @Public + @Unstable + public abstract YarnApplicationState getYarnApplicationState(); + + @Public + @Unstable + public abstract void setYarnApplicationState( + YarnApplicationState yarnApplicationState); + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationHistoryData.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationHistoryData.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationHistoryData.java new file mode 100644 index 0000000..b7d16f3 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationHistoryData.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; + +/** + * The class contains all the fields that are stored persistently for + * RMApp. + */ +@Public +@Unstable +public class ApplicationHistoryData { + + private ApplicationId applicationId; + + private String applicationName; + + private String applicationType; + + private String user; + + private String queue; + + private long submitTime; + + private long startTime; + + private long finishTime; + + private String diagnosticsInfo; + + private FinalApplicationStatus finalApplicationStatus; + + private YarnApplicationState yarnApplicationState; + + @Public + @Unstable + public static ApplicationHistoryData newInstance(ApplicationId applicationId, + String applicationName, String applicationType, String queue, + String user, long submitTime, long startTime, long finishTime, + String diagnosticsInfo, FinalApplicationStatus finalApplicationStatus, + YarnApplicationState yarnApplicationState) { + ApplicationHistoryData appHD = new ApplicationHistoryData(); + appHD.setApplicationId(applicationId); + appHD.setApplicationName(applicationName); + appHD.setApplicationType(applicationType); + appHD.setQueue(queue); + appHD.setUser(user); + appHD.setSubmitTime(submitTime); + appHD.setStartTime(startTime); + appHD.setFinishTime(finishTime); + appHD.setDiagnosticsInfo(diagnosticsInfo); + appHD.setFinalApplicationStatus(finalApplicationStatus); + appHD.setYarnApplicationState(yarnApplicationState); + return appHD; + } + + @Public + @Unstable + public ApplicationId getApplicationId() { + return applicationId; + } + + @Public + @Unstable + public void setApplicationId(ApplicationId applicationId) { + this.applicationId = applicationId; + } + + @Public + @Unstable + public String getApplicationName() { + return applicationName; + } + + @Public + @Unstable + public void setApplicationName(String applicationName) { + this.applicationName = applicationName; + } + + @Public + @Unstable + public String getApplicationType() { + return applicationType; + } + + @Public + @Unstable + public void setApplicationType(String applicationType) { + this.applicationType = applicationType; + } + + @Public + @Unstable + public String getUser() { + return user; + } + + @Public + @Unstable + public void setUser(String user) { + this.user = user; + } + + @Public + @Unstable + public String getQueue() { + return queue; + } + + @Public + @Unstable + public void setQueue(String queue) { + this.queue = queue; + } + + @Public + @Unstable + public long getSubmitTime() { + return submitTime; + } + + @Public + @Unstable + public void setSubmitTime(long submitTime) { + this.submitTime = submitTime; + } + + @Public + @Unstable + public long getStartTime() { + return startTime; + } + + @Public + @Unstable + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + @Public + @Unstable + public long getFinishTime() { + return finishTime; + } + + @Public + @Unstable + public void setFinishTime(long finishTime) { + this.finishTime = finishTime; + } + + @Public + @Unstable + public String getDiagnosticsInfo() { + return diagnosticsInfo; + } + + @Public + @Unstable + public void setDiagnosticsInfo(String diagnosticsInfo) { + this.diagnosticsInfo = diagnosticsInfo; + } + + @Public + @Unstable + public FinalApplicationStatus getFinalApplicationStatus() { + return finalApplicationStatus; + } + + @Public + @Unstable + public void setFinalApplicationStatus( + FinalApplicationStatus finalApplicationStatus) { + this.finalApplicationStatus = finalApplicationStatus; + } + + @Public + @Unstable + public YarnApplicationState getYarnApplicationState() { + return this.yarnApplicationState; + } + + @Public + @Unstable + public void + setYarnApplicationState(YarnApplicationState yarnApplicationState) { + this.yarnApplicationState = yarnApplicationState; + } + +}