Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 222D918593 for ; Mon, 12 Oct 2015 17:11:03 +0000 (UTC) Received: (qmail 97999 invoked by uid 500); 12 Oct 2015 17:10:43 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 97727 invoked by uid 500); 12 Oct 2015 17:10:43 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 95800 invoked by uid 99); 12 Oct 2015 17:10:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Oct 2015 17:10:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 24311E057A; Mon, 12 Oct 2015 17:10:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sjlee@apache.org To: common-commits@hadoop.apache.org Date: Mon, 12 Oct 2015 17:11:12 -0000 Message-Id: <06bfc5f8767a4c23a614d92045742b0b@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [32/50] [abbrv] hadoop git commit: YARN-3904. Refactor timelineservice.storage to add support to online and offline aggregation writers (Li Lu via sjlee) YARN-3904. Refactor timelineservice.storage to add support to online and offline aggregation writers (Li Lu via sjlee) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0fd93f15 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0fd93f15 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0fd93f15 Branch: refs/heads/YARN-2928-rebase Commit: 0fd93f15781050aa1bad9834e7774d7b3a4c2983 Parents: aa927d8 Author: Sangjin Lee Authored: Mon Aug 17 16:48:58 2015 -0700 Committer: Sangjin Lee Committed: Sat Oct 10 15:45:42 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../dev-support/findbugs-exclude.xml | 7 +- .../hadoop/yarn/conf/YarnConfiguration.java | 10 + .../storage/OfflineAggregationWriter.java | 66 +++ .../PhoenixOfflineAggregationWriterImpl.java | 356 +++++++++++++ .../storage/PhoenixTimelineWriterImpl.java | 530 ------------------- .../storage/TimelineSchemaCreator.java | 45 +- .../storage/common/OfflineAggregationInfo.java | 110 ++++ ...TestPhoenixOfflineAggregationWriterImpl.java | 162 ++++++ .../storage/TestPhoenixTimelineWriterImpl.java | 152 ------ .../storage/TestTimelineWriterImpl.java | 74 --- 11 files changed, 754 insertions(+), 761 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fd93f15/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index d4e2c4f..374b254 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -88,6 +88,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-3906. Split the application table from the entity table. (Sangjin Lee via junping_du) + YARN-3904. Refactor timelineservice.storage to add support to online and + offline aggregation writers (Li Lu via sjlee) + IMPROVEMENTS YARN-3276. Code cleanup for timeline service API records. (Junping Du via http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fd93f15/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 5583cd6..691170e 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -488,13 +488,12 @@ - + - - - + + http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fd93f15/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index e2e0635..073a8b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1783,6 +1783,16 @@ public class YarnConfiguration extends Configuration { public static final long DEFAULT_TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME = 7*24*60*60*1000; // 7 days + // Timeline service v2 offlien aggregation related keys + public static final String TIMELINE_OFFLINE_AGGREGATION_PREFIX = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + "aggregation.offline."; + public static final String PHOENIX_OFFLINE_STORAGE_CONN_STR + = TIMELINE_OFFLINE_AGGREGATION_PREFIX + + "phoenix.connectionString"; + + public static final String PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT + = "jdbc:phoenix:localhost:2181:/hbase"; + // /////////////////////////////// // Shared Cache Configs // /////////////////////////////// http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fd93f15/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java new file mode 100644 index 0000000..e1219e0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo; + +import java.io.IOException; + +/** + * YARN timeline service v2 offline aggregation storage interface + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public abstract class OfflineAggregationWriter extends AbstractService { + + /** + * Construct the offline writer. + * + * @param name service name + */ + public OfflineAggregationWriter(String name) { + super(name); + } + + /** + * Persist aggregated timeline entities to the offline store based on which + * track this entity is to be rolled up to. The tracks along which aggregations + * are to be done are given by {@link OfflineAggregationInfo}. + * + * @param context a {@link TimelineCollectorContext} object that describes the + * context information of the aggregated data. Depends on the + * type of the aggregation, some fields of this context maybe + * empty or null. + * @param entities {@link TimelineEntities} to be persisted. + * @param info an {@link OfflineAggregationInfo} object that describes the + * detail of the aggregation. Current supported option is + * {@link OfflineAggregationInfo#FLOW_AGGREGATION}. + * @return a {@link TimelineWriteResponse} object. + * @throws IOException + */ + abstract TimelineWriteResponse writeAggregatedEntity( + TimelineCollectorContext context, + TimelineEntities entities, OfflineAggregationInfo info) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fd93f15/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java new file mode 100644 index 0000000..4c1352c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.apache.phoenix.util.PropertiesUtil; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + * Offline aggregation Phoenix storage. This storage currently consists of two + * aggregation tables, one for flow level aggregation and one for user level + * aggregation. + * + * Example table record: + * + *
+ * |---------------------------|
+ * |  Primary   | Column Family|
+ * |  key       | metrics      |
+ * |---------------------------|
+ * | row_key    | metricId1:   |
+ * |            | metricValue1 |
+ * |            | @timestamp1  |
+ * |            |              |
+ * |            | metriciD1:   |
+ * |            | metricValue2 |
+ * |            | @timestamp2  |
+ * |            |              |
+ * |            | metricId2:   |
+ * |            | metricValue1 |
+ * |            | @timestamp2  |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |---------------------------|
+ * 
+ * + * For the flow aggregation table, the primary key contains user, cluster, and + * flow id. For user aggregation table,the primary key is user. + * + * Metrics column family stores all aggregated metrics for each record. + */ +@Private +@Unstable +public class PhoenixOfflineAggregationWriterImpl + extends OfflineAggregationWriter { + + private static final Log LOG + = LogFactory.getLog(PhoenixOfflineAggregationWriterImpl.class); + private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER + = "timeline_cf_placeholder"; + + /** Default Phoenix JDBC driver name */ + private static final String DRIVER_CLASS_NAME + = "org.apache.phoenix.jdbc.PhoenixDriver"; + + /** Default Phoenix timeline config column family */ + private static final String METRIC_COLUMN_FAMILY = "m."; + /** Default Phoenix timeline info column family */ + private static final String INFO_COLUMN_FAMILY = "i."; + /** Default separator for Phoenix storage */ + private static final String AGGREGATION_STORAGE_SEPARATOR = ";"; + + /** Connection string to the deployed Phoenix cluster */ + private String connString = null; + private Properties connProperties = new Properties(); + + public PhoenixOfflineAggregationWriterImpl(Properties prop) { + super(PhoenixOfflineAggregationWriterImpl.class.getName()); + connProperties = PropertiesUtil.deepCopy(prop); + } + + public PhoenixOfflineAggregationWriterImpl() { + super(PhoenixOfflineAggregationWriterImpl.class.getName()); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + Class.forName(DRIVER_CLASS_NAME); + // so check it here and only read in the config if it's not overridden. + connString = + conf.get(YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR, + YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT); + super.init(conf); + } + + @Override + public TimelineWriteResponse writeAggregatedEntity( + TimelineCollectorContext context, TimelineEntities entities, + OfflineAggregationInfo info) throws IOException { + TimelineWriteResponse response = new TimelineWriteResponse(); + String sql = "UPSERT INTO " + info.getTableName() + + " (" + StringUtils.join(info.getPrimaryKeyList(), ",") + + ", created_time, modified_time, metric_names) " + + "VALUES (" + + StringUtils.repeat("?,", info.getPrimaryKeyList().length) + + "?, ?, ?)"; + if (LOG.isDebugEnabled()) { + LOG.debug("TimelineEntity write SQL: " + sql); + } + + try (Connection conn = getConnection(); + PreparedStatement ps = conn.prepareStatement(sql)) { + for (TimelineEntity entity : entities.getEntities()) { + HashMap formattedMetrics = new HashMap<>(); + if (entity.getMetrics() != null) { + for (TimelineMetric m : entity.getMetrics()) { + formattedMetrics.put(m.getId(), m); + } + } + int idx = info.setStringsForPrimaryKey(ps, context, null, 1); + ps.setLong(idx++, entity.getCreatedTime()); + ps.setLong(idx++, entity.getModifiedTime()); + ps.setString(idx++, StringUtils.join(formattedMetrics.keySet().toArray(), + AGGREGATION_STORAGE_SEPARATOR)); + ps.execute(); + + storeEntityVariableLengthFields(entity, formattedMetrics, context, conn, + info); + + conn.commit(); + } + } catch (SQLException se) { + LOG.error("Failed to add entity to Phoenix " + se.getMessage()); + throw new IOException(se); + } catch (Exception e) { + LOG.error("Exception on getting connection: " + e.getMessage()); + throw new IOException(e); + } + return response; + } + + /** + * Create Phoenix tables for offline aggregation storage if the tables do not + * exist. + * + * @throws IOException + */ + public void createPhoenixTables() throws IOException { + // Create tables if necessary + try (Connection conn = getConnection(); + Statement stmt = conn.createStatement()) { + // Table schema defined as in YARN-3817. + String sql = "CREATE TABLE IF NOT EXISTS " + + OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME + + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, " + + "flow_name VARCHAR NOT NULL, " + + "created_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, " + + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, " + + "metric_names VARCHAR, info_keys VARCHAR " + + "CONSTRAINT pk PRIMARY KEY(" + + "user, cluster, flow_name))"; + stmt.executeUpdate(sql); + sql = "CREATE TABLE IF NOT EXISTS " + + OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME + + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, " + + "created_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, " + + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, " + + "metric_names VARCHAR, info_keys VARCHAR " + + "CONSTRAINT pk PRIMARY KEY(user, cluster))"; + stmt.executeUpdate(sql); + conn.commit(); + } catch (SQLException se) { + LOG.error("Failed in init data " + se.getLocalizedMessage()); + throw new IOException(se); + } + return; + } + + // Utility functions + @Private + @VisibleForTesting + Connection getConnection() throws IOException { + Connection conn; + try { + conn = DriverManager.getConnection(connString, connProperties); + conn.setAutoCommit(false); + } catch (SQLException se) { + LOG.error("Failed to connect to phoenix server! " + + se.getLocalizedMessage()); + throw new IOException(se); + } + return conn; + } + + // WARNING: This method will permanently drop a table! + @Private + @VisibleForTesting + void dropTable(String tableName) throws Exception { + try (Connection conn = getConnection(); + Statement stmt = conn.createStatement()) { + String sql = "DROP TABLE " + tableName; + stmt.executeUpdate(sql); + } catch (SQLException se) { + LOG.error("Failed in dropping entity table " + se.getLocalizedMessage()); + throw se; + } + } + + private static class DynamicColumns { + static final String COLUMN_FAMILY_TYPE_BYTES = " VARBINARY"; + static final String COLUMN_FAMILY_TYPE_STRING = " VARCHAR"; + String columnFamilyPrefix; + String type; + Set columns; + + public DynamicColumns(String columnFamilyPrefix, String type, + Set keyValues) { + this.columnFamilyPrefix = columnFamilyPrefix; + this.columns = keyValues; + this.type = type; + } + } + + private static StringBuilder appendColumnsSQL( + StringBuilder colNames, DynamicColumns cfInfo) { + // Prepare the sql template by iterating through all keys + for (K key : cfInfo.columns) { + colNames.append(",").append(cfInfo.columnFamilyPrefix) + .append(key.toString()).append(cfInfo.type); + } + return colNames; + } + + private static int setValuesForColumnFamily( + PreparedStatement ps, Map keyValues, int startPos, + boolean converToBytes) throws SQLException { + int idx = startPos; + for (Map.Entry entry : keyValues.entrySet()) { + V value = entry.getValue(); + if (value instanceof Collection) { + ps.setString(idx++, StringUtils.join( + (Collection) value, AGGREGATION_STORAGE_SEPARATOR)); + } else { + if (converToBytes) { + try { + ps.setBytes(idx++, GenericObjectMapper.write(entry.getValue())); + } catch (IOException ie) { + LOG.error("Exception in converting values into bytes " + + ie.getMessage()); + throw new SQLException(ie); + } + } else { + ps.setString(idx++, value.toString()); + } + } + } + return idx; + } + + private static int setBytesForColumnFamily( + PreparedStatement ps, Map keyValues, int startPos) + throws SQLException { + return setValuesForColumnFamily(ps, keyValues, startPos, true); + } + + private static int setStringsForColumnFamily( + PreparedStatement ps, Map keyValues, int startPos) + throws SQLException { + return setValuesForColumnFamily(ps, keyValues, startPos, false); + } + + private static void storeEntityVariableLengthFields(TimelineEntity entity, + Map formattedMetrics, + TimelineCollectorContext context, Connection conn, + OfflineAggregationInfo aggregationInfo) throws SQLException { + int numPlaceholders = 0; + StringBuilder columnDefs = new StringBuilder( + StringUtils.join(aggregationInfo.getPrimaryKeyList(), ",")); + if (formattedMetrics != null && formattedMetrics.size() > 0) { + appendColumnsSQL(columnDefs, new DynamicColumns<>( + METRIC_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES, + formattedMetrics.keySet())); + numPlaceholders += formattedMetrics.keySet().size(); + } + if (numPlaceholders == 0) { + return; + } + StringBuilder placeholders = new StringBuilder(); + placeholders.append( + StringUtils.repeat("?,", aggregationInfo.getPrimaryKeyList().length)); + // numPlaceholders >= 1 now + placeholders.append("?") + .append(StringUtils.repeat(",?", numPlaceholders - 1)); + String sqlVariableLengthFields = new StringBuilder("UPSERT INTO ") + .append(aggregationInfo.getTableName()).append(" (").append(columnDefs) + .append(") VALUES(").append(placeholders).append(")").toString(); + if (LOG.isDebugEnabled()) { + LOG.debug("SQL statement for variable length fields: " + + sqlVariableLengthFields); + } + // Use try with resource statement for the prepared statement + try (PreparedStatement psVariableLengthFields = + conn.prepareStatement(sqlVariableLengthFields)) { + int idx = aggregationInfo.setStringsForPrimaryKey( + psVariableLengthFields, context, null, 1); + if (formattedMetrics != null && formattedMetrics.size() > 0) { + idx = setBytesForColumnFamily( + psVariableLengthFields, formattedMetrics, idx); + } + psVariableLengthFields.execute(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fd93f15/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java deleted file mode 100644 index 381ff17..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java +++ /dev/null @@ -1,530 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -@Private -@Unstable -public class PhoenixTimelineWriterImpl extends AbstractService - implements TimelineWriter { - - public static final String TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR - = YarnConfiguration.TIMELINE_SERVICE_PREFIX - + "writer.phoenix.connectionString"; - - public static final String TIMELINE_SERVICE_PHEONIX_STORAGE_CONN_STR_DEFAULT - = "jdbc:phoenix:localhost:2181:/hbase"; - - private static final Log LOG - = LogFactory.getLog(PhoenixTimelineWriterImpl.class); - private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER - = "timeline_cf_placeholder"; - // These lists are not taking effects in table creations. - private static final String[] PHOENIX_STORAGE_PK_LIST - = {"cluster", "user", "flow_name", "flow_version", "flow_run", "app_id", - "type", "entity_id"}; - private static final String[] TIMELINE_EVENT_EXTRA_PK_LIST = - {"timestamp", "event_id"}; - private static final String[] TIMELINE_METRIC_EXTRA_PK_LIST = - {"metric_id"}; - /** Default Phoenix JDBC driver name */ - private static final String DRIVER_CLASS_NAME - = "org.apache.phoenix.jdbc.PhoenixDriver"; - - /** Default Phoenix timeline entity table name */ - @VisibleForTesting - static final String ENTITY_TABLE_NAME = "timeline_entity"; - /** Default Phoenix event table name */ - @VisibleForTesting - static final String EVENT_TABLE_NAME = "timeline_event"; - /** Default Phoenix metric table name */ - @VisibleForTesting - static final String METRIC_TABLE_NAME = "metric_singledata"; - - /** Default Phoenix timeline config column family */ - private static final String CONFIG_COLUMN_FAMILY = "c."; - /** Default Phoenix timeline info column family */ - private static final String INFO_COLUMN_FAMILY = "i."; - /** Default Phoenix event info column family */ - private static final String EVENT_INFO_COLUMN_FAMILY = "ei."; - /** Default Phoenix isRelatedTo column family */ - private static final String IS_RELATED_TO_FAMILY = "ir."; - /** Default Phoenix relatesTo column family */ - private static final String RELATES_TO_FAMILY = "rt."; - /** Default separator for Phoenix storage */ - private static final String PHOENIX_STORAGE_SEPARATOR = ";"; - - /** Connection string to the deployed Phoenix cluster */ - @VisibleForTesting - String connString = null; - @VisibleForTesting - Properties connProperties = new Properties(); - - PhoenixTimelineWriterImpl() { - super((PhoenixTimelineWriterImpl.class.getName())); - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - // so check it here and only read in the config if it's not overridden. - connString = - conf.get(TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR, - TIMELINE_SERVICE_PHEONIX_STORAGE_CONN_STR_DEFAULT); - createTables(); - super.init(conf); - } - - @Override - protected void serviceStop() throws Exception { - super.serviceStop(); - } - - @Override - public TimelineWriteResponse write(String clusterId, String userId, - String flowName, String flowVersion, long flowRunId, String appId, - TimelineEntities entities) throws IOException { - TimelineWriteResponse response = new TimelineWriteResponse(); - TimelineCollectorContext currContext = new TimelineCollectorContext( - clusterId, userId, flowName, flowVersion, flowRunId, appId); - String sql = "UPSERT INTO " + ENTITY_TABLE_NAME - + " (" + StringUtils.join(PHOENIX_STORAGE_PK_LIST, ",") - + ", creation_time, modified_time, configs) " - + "VALUES (" + StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length) - + "?, ?, ?)"; - if (LOG.isDebugEnabled()) { - LOG.debug("TimelineEntity write SQL: " + sql); - } - - try (Connection conn = getConnection(); - PreparedStatement ps = conn.prepareStatement(sql)) { - for (TimelineEntity entity : entities.getEntities()) { - int idx = setStringsForPrimaryKey(ps, currContext, entity, 1); - ps.setLong(idx++, entity.getCreatedTime()); - ps.setLong(idx++, entity.getModifiedTime()); - String configKeys = StringUtils.join( - entity.getConfigs().keySet(), PHOENIX_STORAGE_SEPARATOR); - ps.setString(idx++, configKeys); - ps.execute(); - - storeEntityVariableLengthFields(entity, currContext, conn); - storeEvents(entity, currContext, conn); - storeMetrics(entity, currContext, conn); - - conn.commit(); - } - } catch (SQLException se) { - LOG.error("Failed to add entity to Phoenix " + se.getMessage()); - throw new IOException(se); - } catch (Exception e) { - LOG.error("Exception on getting connection: " + e.getMessage()); - throw new IOException(e); - } - return response; - } - - /** - * Aggregates the entity information to the timeline store based on which - * track this entity is to be rolled up to The tracks along which aggregations - * are to be done are given by {@link TimelineAggregationTrack} - * - * Any errors occurring for individual write request objects will be reported - * in the response. - * - * @param data - * a {@link TimelineEntity} object - * a {@link TimelineAggregationTrack} enum value - * @return a {@link TimelineWriteResponse} object. - * @throws IOException - */ - @Override - public TimelineWriteResponse aggregate(TimelineEntity data, - TimelineAggregationTrack track) throws IOException { - return null; - - } - - @Override - public void flush() throws IOException { - // currently no-op - } - - // Utility functions - @Private - @VisibleForTesting - Connection getConnection() throws IOException { - Connection conn; - try { - Class.forName(DRIVER_CLASS_NAME); - conn = DriverManager.getConnection(connString, connProperties); - conn.setAutoCommit(false); - } catch (SQLException se) { - LOG.error("Failed to connect to phoenix server! " - + se.getLocalizedMessage()); - throw new IOException(se); - } catch (ClassNotFoundException e) { - LOG.error("Class not found! " + e.getLocalizedMessage()); - throw new IOException(e); - } - return conn; - } - - private void createTables() throws Exception { - // Create tables if necessary - try (Connection conn = getConnection(); - Statement stmt = conn.createStatement()) { - // Table schema defined as in YARN-3134. - String sql = "CREATE TABLE IF NOT EXISTS " + ENTITY_TABLE_NAME - + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, " - + "flow_name VARCHAR NOT NULL, flow_version VARCHAR NOT NULL, " - + "flow_run UNSIGNED_LONG NOT NULL, " - + "app_id VARCHAR NOT NULL, type VARCHAR NOT NULL, " - + "entity_id VARCHAR NOT NULL, " - + "creation_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, " - + "configs VARCHAR, " - + CONFIG_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR, " - + INFO_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, " - + IS_RELATED_TO_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR, " - + RELATES_TO_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR " - + "CONSTRAINT pk PRIMARY KEY(" - + "user, cluster, flow_name, flow_version, flow_run DESC, app_id, " - + "type, entity_id))"; - stmt.executeUpdate(sql); - sql = "CREATE TABLE IF NOT EXISTS " + EVENT_TABLE_NAME - + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, " - + "flow_name VARCHAR NOT NULL, flow_version VARCHAR NOT NULL, " - + "flow_run UNSIGNED_LONG NOT NULL, " - + "app_id VARCHAR NOT NULL, type VARCHAR NOT NULL, " - + "entity_id VARCHAR NOT NULL, " - + "timestamp UNSIGNED_LONG NOT NULL, event_id VARCHAR NOT NULL, " - + EVENT_INFO_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY " - + "CONSTRAINT pk PRIMARY KEY(" - + "user, cluster, flow_name, flow_version, flow_run DESC, app_id, " - + "type, entity_id, timestamp DESC, event_id))"; - stmt.executeUpdate(sql); - sql = "CREATE TABLE IF NOT EXISTS " + METRIC_TABLE_NAME - + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, " - + "flow_name VARCHAR NOT NULL, flow_version VARCHAR NOT NULL, " - + "flow_run UNSIGNED_LONG NOT NULL, " - + "app_id VARCHAR NOT NULL, type VARCHAR NOT NULL, " - + "entity_id VARCHAR NOT NULL, " - + "metric_id VARCHAR NOT NULL, " - + "singledata VARBINARY, " - + "time UNSIGNED_LONG " - + "CONSTRAINT pk PRIMARY KEY(" - + "user, cluster, flow_name, flow_version, flow_run DESC, app_id, " - + "type, entity_id, metric_id))"; - stmt.executeUpdate(sql); - conn.commit(); - } catch (SQLException se) { - LOG.error("Failed in init data " + se.getLocalizedMessage()); - throw se; - } - return; - } - - private static class DynamicColumns { - static final String COLUMN_FAMILY_TYPE_BYTES = " VARBINARY"; - static final String COLUMN_FAMILY_TYPE_STRING = " VARCHAR"; - String columnFamilyPrefix; - String type; - Set columns; - - public DynamicColumns(String columnFamilyPrefix, String type, - Set keyValues) { - this.columnFamilyPrefix = columnFamilyPrefix; - this.columns = keyValues; - this.type = type; - } - } - - private static StringBuilder appendColumnsSQL( - StringBuilder colNames, DynamicColumns cfInfo) { - // Prepare the sql template by iterating through all keys - for (K key : cfInfo.columns) { - colNames.append(",").append(cfInfo.columnFamilyPrefix) - .append(key.toString()).append(cfInfo.type); - } - return colNames; - } - - private static int setValuesForColumnFamily( - PreparedStatement ps, Map keyValues, int startPos, - boolean converToBytes) throws SQLException { - int idx = startPos; - for (Map.Entry entry : keyValues.entrySet()) { - V value = entry.getValue(); - if (value instanceof Collection) { - ps.setString(idx++, StringUtils.join( - (Collection) value, PHOENIX_STORAGE_SEPARATOR)); - } else { - if (converToBytes) { - try { - ps.setBytes(idx++, GenericObjectMapper.write(entry.getValue())); - } catch (IOException ie) { - LOG.error("Exception in converting values into bytes " - + ie.getMessage()); - throw new SQLException(ie); - } - } else { - ps.setString(idx++, value.toString()); - } - } - } - return idx; - } - - private static int setBytesForColumnFamily( - PreparedStatement ps, Map keyValues, int startPos) - throws SQLException { - return setValuesForColumnFamily(ps, keyValues, startPos, true); - } - - private static int setStringsForColumnFamily( - PreparedStatement ps, Map keyValues, int startPos) - throws SQLException { - return setValuesForColumnFamily(ps, keyValues, startPos, false); - } - - private static int setStringsForPrimaryKey(PreparedStatement ps, - TimelineCollectorContext context, TimelineEntity entity, int startPos) - throws SQLException { - int idx = startPos; - ps.setString(idx++, context.getClusterId()); - ps.setString(idx++, context.getUserId()); - ps.setString(idx++, - context.getFlowName()); - ps.setString(idx++, context.getFlowVersion()); - ps.setLong(idx++, context.getFlowRunId()); - ps.setString(idx++, context.getAppId()); - ps.setString(idx++, entity.getType()); - ps.setString(idx++, entity.getId()); - return idx; - } - - private static void storeEntityVariableLengthFields(TimelineEntity entity, - TimelineCollectorContext context, Connection conn) throws SQLException { - int numPlaceholders = 0; - StringBuilder columnDefs = new StringBuilder( - StringUtils.join(PHOENIX_STORAGE_PK_LIST, ",")); - if (entity.getConfigs() != null) { - Set keySet = entity.getConfigs().keySet(); - appendColumnsSQL(columnDefs, new DynamicColumns<>( - CONFIG_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_STRING, - keySet)); - numPlaceholders += keySet.size(); - } - if (entity.getInfo() != null) { - Set keySet = entity.getInfo().keySet(); - appendColumnsSQL(columnDefs, new DynamicColumns<>( - INFO_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES, - keySet)); - numPlaceholders += keySet.size(); - } - if (entity.getIsRelatedToEntities() != null) { - Set keySet = entity.getIsRelatedToEntities().keySet(); - appendColumnsSQL(columnDefs, new DynamicColumns<>( - IS_RELATED_TO_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_STRING, - keySet)); - numPlaceholders += keySet.size(); - } - if (entity.getRelatesToEntities() != null) { - Set keySet = entity.getRelatesToEntities().keySet(); - appendColumnsSQL(columnDefs, new DynamicColumns<>( - RELATES_TO_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_STRING, - keySet)); - numPlaceholders += keySet.size(); - } - if (numPlaceholders == 0) { - return; - } - StringBuilder placeholders = new StringBuilder(); - placeholders.append( - StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length)); - // numPlaceholders >= 1 now - placeholders.append("?") - .append(StringUtils.repeat(",?", numPlaceholders - 1)); - String sqlVariableLengthFields = new StringBuilder("UPSERT INTO ") - .append(ENTITY_TABLE_NAME).append(" (").append(columnDefs) - .append(") VALUES(").append(placeholders).append(")").toString(); - if (LOG.isDebugEnabled()) { - LOG.debug("SQL statement for variable length fields: " - + sqlVariableLengthFields); - } - // Use try with resource statement for the prepared statement - try (PreparedStatement psVariableLengthFields = - conn.prepareStatement(sqlVariableLengthFields)) { - int idx = setStringsForPrimaryKey( - psVariableLengthFields, context, entity, 1); - if (entity.getConfigs() != null) { - idx = setStringsForColumnFamily( - psVariableLengthFields, entity.getConfigs(), idx); - } - if (entity.getInfo() != null) { - idx = setBytesForColumnFamily( - psVariableLengthFields, entity.getInfo(), idx); - } - if (entity.getIsRelatedToEntities() != null) { - idx = setStringsForColumnFamily( - psVariableLengthFields, entity.getIsRelatedToEntities(), idx); - } - if (entity.getRelatesToEntities() != null) { - idx = setStringsForColumnFamily( - psVariableLengthFields, entity.getRelatesToEntities(), idx); - } - psVariableLengthFields.execute(); - } - } - - private static void storeMetrics(TimelineEntity entity, - TimelineCollectorContext context, Connection conn) throws SQLException { - if (entity.getMetrics() == null) { - return; - } - Set metrics = entity.getMetrics(); - for (TimelineMetric metric : metrics) { - StringBuilder sqlColumns = new StringBuilder( - StringUtils.join(PHOENIX_STORAGE_PK_LIST, ",")); - sqlColumns.append(",") - .append(StringUtils.join(TIMELINE_METRIC_EXTRA_PK_LIST, ",")); - sqlColumns.append(",").append("singledata, time"); - StringBuilder placeholders = new StringBuilder(); - placeholders.append( - StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length)) - .append(StringUtils.repeat("?,", TIMELINE_METRIC_EXTRA_PK_LIST.length)); - placeholders.append("?, ?"); - String sqlMetric = new StringBuilder("UPSERT INTO ") - .append(METRIC_TABLE_NAME).append(" (").append(sqlColumns) - .append(") VALUES(").append(placeholders).append(")").toString(); - if (LOG.isDebugEnabled()) { - LOG.debug("SQL statement for metric: " + sqlMetric); - } - try (PreparedStatement psMetrics = conn.prepareStatement(sqlMetric)) { - if (metric.getType().equals(TimelineMetric.Type.TIME_SERIES)) { - LOG.warn("The incoming timeline metric contains time series data, " - + "which is currently not supported by Phoenix storage. " - + "Time series will be truncated. "); - } - int idx = setStringsForPrimaryKey(psMetrics, context, entity, 1); - psMetrics.setString(idx++, metric.getId()); - Iterator> currNumIter = - metric.getValues().entrySet().iterator(); - if (currNumIter.hasNext()) { - // TODO: support time series storage - Map.Entry currEntry = currNumIter.next(); - psMetrics.setBytes(idx++, - GenericObjectMapper.write(currEntry.getValue())); - psMetrics.setLong(idx++, currEntry.getKey()); - } else { - psMetrics.setBytes(idx++, GenericObjectMapper.write(null)); - LOG.warn("The incoming metric contains an empty value set. "); - } - psMetrics.execute(); - } catch (IOException ie) { - LOG.error("Exception on converting single data to bytes: " - + ie.getMessage()); - throw new SQLException(ie); - } - } - } - - private static void storeEvents(TimelineEntity entity, - TimelineCollectorContext context, Connection conn) throws SQLException { - if (entity.getEvents() == null) { - return; - } - Set events = entity.getEvents(); - for (TimelineEvent event : events) { - // We need this number to check if the incoming event's info field is empty - int numPlaceholders = 0; - StringBuilder sqlColumns = new StringBuilder( - StringUtils.join(PHOENIX_STORAGE_PK_LIST, ",")); - sqlColumns.append(",") - .append(StringUtils.join(TIMELINE_EVENT_EXTRA_PK_LIST, ",")); - appendColumnsSQL(sqlColumns, new DynamicColumns<>( - EVENT_INFO_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES, - event.getInfo().keySet())); - numPlaceholders += event.getInfo().keySet().size(); - if (numPlaceholders == 0) { - continue; - } - StringBuilder placeholders = new StringBuilder(); - placeholders.append( - StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length)) - .append(StringUtils.repeat("?,", TIMELINE_EVENT_EXTRA_PK_LIST.length)); - // numPlaceholders >= 1 now - placeholders.append("?") - .append(StringUtils.repeat(",?", numPlaceholders - 1)); - String sqlEvents = new StringBuilder("UPSERT INTO ") - .append(EVENT_TABLE_NAME).append(" (").append(sqlColumns) - .append(") VALUES(").append(placeholders).append(")").toString(); - if (LOG.isDebugEnabled()) { - LOG.debug("SQL statement for events: " + sqlEvents); - } - try (PreparedStatement psEvent = conn.prepareStatement(sqlEvents)) { - int idx = setStringsForPrimaryKey(psEvent, context, entity, 1); - psEvent.setLong(idx++, event.getTimestamp()); - psEvent.setString(idx++, event.getId()); - setBytesForColumnFamily(psEvent, event.getInfo(), idx); - psEvent.execute(); - } - } - } - - // WARNING: This method will permanently drop a table! - @Private - @VisibleForTesting - void dropTable(String tableName) throws Exception { - try (Connection conn = getConnection(); - Statement stmt = conn.createStatement()) { - String sql = "DROP TABLE " + tableName; - stmt.executeUpdate(sql); - } catch (SQLException se) { - LOG.error("Failed in dropping entity table " + se.getLocalizedMessage()); - throw se; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fd93f15/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java index 3a22ed6..5120856 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.timelineservice.storage; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -51,6 +53,7 @@ public class TimelineSchemaCreator { final static String NAME = TimelineSchemaCreator.class.getSimpleName(); private static final Log LOG = LogFactory.getLog(TimelineSchemaCreator.class); + private static final String PHOENIX_OPTION_SHORT = "p"; public static void main(String[] args) throws Exception { @@ -83,7 +86,41 @@ public class TimelineSchemaCreator { hbaseConf.set(ApplicationTable.TABLE_NAME_CONF_NAME, applicationTableName); } - createAllTables(hbaseConf); + + List exceptions = new ArrayList<>(); + try { + createAllTables(hbaseConf); + LOG.info("Successfully created HBase schema. "); + } catch (IOException e) { + LOG.error("Error in creating hbase tables: " + e.getMessage()); + exceptions.add(e); + } + + // Create Phoenix data schema if needed + if (commandLine.hasOption(PHOENIX_OPTION_SHORT)) { + Configuration phoenixConf = new Configuration(); + try { + PhoenixOfflineAggregationWriterImpl phoenixWriter = + new PhoenixOfflineAggregationWriterImpl(); + phoenixWriter.init(phoenixConf); + phoenixWriter.start(); + phoenixWriter.createPhoenixTables(); + phoenixWriter.stop(); + LOG.info("Successfully created Phoenix offline aggregation schema. "); + } catch (IOException e) { + LOG.error("Error in creating phoenix tables: " + e.getMessage()); + exceptions.add(e); + } + } + if (exceptions.size() > 0) { + LOG.warn("Schema creation finished with the following exceptions"); + for (Exception e : exceptions) { + LOG.warn(e.getMessage()); + } + System.exit(-1); + } else { + LOG.info("Schema creation finished successfully"); + } } /** @@ -115,6 +152,12 @@ public class TimelineSchemaCreator { o.setRequired(false); options.addOption(o); + o = new Option(PHOENIX_OPTION_SHORT, "usePhoenix", false, + "create Phoenix offline aggregation tables"); + // No need to set arg name since we do not need an argument here + o.setRequired(false); + options.addOption(o); + CommandLineParser parser = new PosixParser(); CommandLine commandLine = null; try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fd93f15/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/OfflineAggregationInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/OfflineAggregationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/OfflineAggregationInfo.java new file mode 100644 index 0000000..16c03a3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/OfflineAggregationInfo.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +/** + * Class to carry the offline aggregation information for storage level + * implementations. There are currently two predefined aggregation info + * instances that represent flow and user level offline aggregations. Depend on + * its implementation, a storage class may use an OfflineAggregationInfo object + * to decide behaviors dynamically. + */ +public final class OfflineAggregationInfo { + /** + * Default flow level aggregation table name + */ + @VisibleForTesting + public static final String FLOW_AGGREGATION_TABLE_NAME + = "yarn_timeline_flow_aggregation"; + /** + * Default user level aggregation table name + */ + public static final String USER_AGGREGATION_TABLE_NAME + = "yarn_timeline_user_aggregation"; + + // These lists are not taking effects in table creations. + private static final String[] FLOW_AGGREGATION_PK_LIST = + { "user", "cluster", "flow_name" }; + private static final String[] USER_AGGREGATION_PK_LIST = { "user", "cluster"}; + + private final String tableName; + private final String[] primaryKeyList; + private final PrimaryKeyStringSetter primaryKeyStringSetter; + + private OfflineAggregationInfo(String table, String[] pkList, + PrimaryKeyStringSetter formatter) { + tableName = table; + primaryKeyList = pkList; + primaryKeyStringSetter = formatter; + } + + private interface PrimaryKeyStringSetter { + int setValues(PreparedStatement ps, TimelineCollectorContext context, + String[] extraInfo, int startPos) throws SQLException; + } + + public String getTableName() { + return tableName; + } + + public String[] getPrimaryKeyList() { + return primaryKeyList.clone(); + } + + public int setStringsForPrimaryKey(PreparedStatement ps, + TimelineCollectorContext context, String[] extraInfo, int startPos) + throws SQLException { + return primaryKeyStringSetter.setValues(ps, context, extraInfo, startPos); + } + + public static final OfflineAggregationInfo FLOW_AGGREGATION = + new OfflineAggregationInfo(FLOW_AGGREGATION_TABLE_NAME, + FLOW_AGGREGATION_PK_LIST, new PrimaryKeyStringSetter() { + @Override + public int setValues(PreparedStatement ps, + TimelineCollectorContext context, String[] extraInfo, int startPos) + throws SQLException { + int idx = startPos; + ps.setString(idx++, context.getUserId()); + ps.setString(idx++, context.getClusterId()); + ps.setString(idx++, context.getFlowName()); + return idx; + } + }); + + public static final OfflineAggregationInfo USER_AGGREGATION = + new OfflineAggregationInfo(USER_AGGREGATION_TABLE_NAME, + USER_AGGREGATION_PK_LIST, new PrimaryKeyStringSetter() { + @Override + public int setValues(PreparedStatement ps, + TimelineCollectorContext context, String[] extraInfo, int startPos) + throws SQLException { + int idx = startPos; + ps.setString(idx++, context.getUserId()); + ps.setString(idx++, context.getClusterId()); + return idx; + } + }); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fd93f15/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java new file mode 100644 index 0000000..de66a17 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.apache.phoenix.hbase.index.write.IndexWriterUtils; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ReadOnlyProps; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; + +public class TestPhoenixOfflineAggregationWriterImpl extends BaseTest { + private static PhoenixOfflineAggregationWriterImpl storage; + private static final int BATCH_SIZE = 3; + + @BeforeClass + public static void setup() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + storage = setupPhoenixClusterAndWriterForTest(conf); + } + + @Test(timeout = 90000) + public void testFlowLevelAggregationStorage() throws Exception { + testAggregator(OfflineAggregationInfo.FLOW_AGGREGATION); + } + + @Test(timeout = 90000) + public void testUserLevelAggregationStorage() throws Exception { + testAggregator(OfflineAggregationInfo.USER_AGGREGATION); + } + + @AfterClass + public static void cleanup() throws Exception { + storage.dropTable(OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME); + storage.dropTable(OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME); + tearDownMiniCluster(); + } + + private static PhoenixOfflineAggregationWriterImpl + setupPhoenixClusterAndWriterForTest(YarnConfiguration conf) + throws Exception{ + Map props = new HashMap<>(); + // Must update config before starting server + props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, + Boolean.FALSE.toString()); + props.put("java.security.krb5.realm", ""); + props.put("java.security.krb5.kdc", ""); + props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, + Boolean.FALSE.toString()); + props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000)); + props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100)); + // Make a small batch size to test multiple calls to reserve sequences + props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, + Long.toString(BATCH_SIZE)); + // Must update config before starting server + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + + // Change connection settings for test + conf.set( + YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR, + getUrl()); + PhoenixOfflineAggregationWriterImpl + myWriter = new PhoenixOfflineAggregationWriterImpl(TEST_PROPERTIES); + myWriter.init(conf); + myWriter.start(); + myWriter.createPhoenixTables(); + return myWriter; + } + + private static TimelineEntity getTestAggregationTimelineEntity() { + TimelineEntity entity = new TimelineEntity(); + String id = "hello1"; + String type = "testAggregationType"; + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(1425016501000L); + entity.setModifiedTime(1425016502000L); + + TimelineMetric metric = new TimelineMetric(); + metric.setId("HDFS_BYTES_READ"); + metric.addValue(1425016501100L, 8000); + entity.addMetric(metric); + + return entity; + } + + private void testAggregator(OfflineAggregationInfo aggregationInfo) + throws Exception { + // Set up a list of timeline entities and write them back to Phoenix + int numEntity = 1; + TimelineEntities te = new TimelineEntities(); + te.addEntity(getTestAggregationTimelineEntity()); + TimelineCollectorContext context = new TimelineCollectorContext("cluster_1", + "user1", "testFlow", null, 0, null); + storage.writeAggregatedEntity(context, te, + aggregationInfo); + + // Verify if we're storing all entities + String[] primaryKeyList = aggregationInfo.getPrimaryKeyList(); + String sql = "SELECT COUNT(" + primaryKeyList[primaryKeyList.length - 1] + +") FROM " + aggregationInfo.getTableName(); + verifySQLWithCount(sql, numEntity, "Number of entities should be "); + // Check metric + sql = "SELECT COUNT(m.HDFS_BYTES_READ) FROM " + + aggregationInfo.getTableName() + "(m.HDFS_BYTES_READ VARBINARY) "; + verifySQLWithCount(sql, numEntity, + "Number of entities with info should be "); + } + + + private void verifySQLWithCount(String sql, int targetCount, String message) + throws Exception { + try ( + Statement stmt = + storage.getConnection().createStatement(); + ResultSet rs = stmt.executeQuery(sql)) { + assertTrue("Result set empty on statement " + sql, rs.next()); + assertNotNull("Fail to execute query " + sql, rs); + assertEquals(message + " " + targetCount, targetCount, rs.getInt(1)); + } catch (SQLException se) { + fail("SQL exception on query: " + sql + + " With exception message: " + se.getLocalizedMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fd93f15/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java deleted file mode 100644 index dece83d..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java +++ /dev/null @@ -1,152 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import org.apache.hadoop.hbase.IntegrationTestingUtility; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.apache.phoenix.hbase.index.write.IndexWriterUtils; -import org.apache.phoenix.query.BaseTest; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.ReadOnlyProps; - -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; - -public class TestPhoenixTimelineWriterImpl extends BaseTest { - private static PhoenixTimelineWriterImpl writer; - private static final int BATCH_SIZE = 3; - - @BeforeClass - public static void setup() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); - writer = setupPhoenixClusterAndWriterForTest(conf); - } - - @Test(timeout = 90000) - public void testPhoenixWriterBasic() throws Exception { - // Set up a list of timeline entities and write them back to Phoenix - int numEntity = 12; - TimelineEntities te = - TestTimelineWriterImpl.getStandardTestTimelineEntities(numEntity); - writer.write("cluster_1", "user1", "testFlow", "version1", 1l, "app_test_1", te); - // Verify if we're storing all entities - String sql = "SELECT COUNT(entity_id) FROM " - + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME; - verifySQLWithCount(sql, numEntity, "Number of entities should be "); - // Check config (half of all entities) - sql = "SELECT COUNT(c.config) FROM " - + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME + "(c.config VARCHAR) "; - verifySQLWithCount(sql, (numEntity / 2), - "Number of entities with config should be "); - // Check info (half of all entities) - sql = "SELECT COUNT(i.info1) FROM " - + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME + "(i.info1 VARBINARY) "; - verifySQLWithCount(sql, (numEntity / 2), - "Number of entities with info should be "); - // Check config and info (a quarter of all entities) - sql = "SELECT COUNT(entity_id) FROM " - + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME - + "(c.config VARCHAR, i.info1 VARBINARY) " - + "WHERE c.config IS NOT NULL AND i.info1 IS NOT NULL"; - verifySQLWithCount(sql, (numEntity / 4), - "Number of entities with both config and info should be "); - // Check relatesToEntities and isRelatedToEntities - sql = "SELECT COUNT(entity_id) FROM " - + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME - + "(rt.testType VARCHAR, ir.testType VARCHAR) " - + "WHERE rt.testType IS NOT NULL AND ir.testType IS NOT NULL"; - verifySQLWithCount(sql, numEntity - 2, - "Number of entities with both relatesTo and isRelatedTo should be "); - // Check event - sql = "SELECT COUNT(entity_id) FROM " - + PhoenixTimelineWriterImpl.EVENT_TABLE_NAME; - verifySQLWithCount(sql, (numEntity / 4), "Number of events should be "); - // Check metrics - sql = "SELECT COUNT(entity_id) FROM " - + PhoenixTimelineWriterImpl.METRIC_TABLE_NAME; - verifySQLWithCount(sql, (numEntity / 4), "Number of events should be "); - } - - @AfterClass - public static void cleanup() throws Exception { - writer.dropTable(PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME); - writer.dropTable(PhoenixTimelineWriterImpl.EVENT_TABLE_NAME); - writer.dropTable(PhoenixTimelineWriterImpl.METRIC_TABLE_NAME); - writer.serviceStop(); - tearDownMiniCluster(); - } - - private static PhoenixTimelineWriterImpl setupPhoenixClusterAndWriterForTest( - YarnConfiguration conf) throws Exception{ - Map props = new HashMap<>(); - // Must update config before starting server - props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, - Boolean.FALSE.toString()); - props.put("java.security.krb5.realm", ""); - props.put("java.security.krb5.kdc", ""); - props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, - Boolean.FALSE.toString()); - props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000)); - props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100)); - // Make a small batch size to test multiple calls to reserve sequences - props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, - Long.toString(BATCH_SIZE)); - // Must update config before starting server - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); - - PhoenixTimelineWriterImpl myWriter = new PhoenixTimelineWriterImpl(); - // Change connection settings for test - conf.set( - PhoenixTimelineWriterImpl.TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR, - getUrl()); - myWriter.connProperties = PropertiesUtil.deepCopy(TEST_PROPERTIES); - myWriter.serviceInit(conf); - return myWriter; - } - - private void verifySQLWithCount(String sql, int targetCount, String message) - throws Exception { - try ( - Statement stmt = - writer.getConnection().createStatement(); - ResultSet rs = stmt.executeQuery(sql)) { - assertTrue("Result set empty on statement " + sql, rs.next()); - assertNotNull("Fail to execute query " + sql, rs); - assertEquals(message + " " + targetCount, targetCount, rs.getInt(1)); - } catch (SQLException se) { - fail("SQL exception on query: " + sql - + " With exception message: " + se.getLocalizedMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fd93f15/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java deleted file mode 100644 index 7a7afc0..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; - -public class TestTimelineWriterImpl { - static TimelineEntities getStandardTestTimelineEntities(int listSize) { - TimelineEntities te = new TimelineEntities(); - for (int i = 0; i < listSize; i++) { - TimelineEntity entity = new TimelineEntity(); - String id = "hello" + i; - String type = "testType"; - entity.setId(id); - entity.setType(type); - entity.setCreatedTime(1425016501000L + i); - entity.setModifiedTime(1425016502000L + i); - if (i > 0) { - entity.addRelatesToEntity(type, "hello" + i); - entity.addRelatesToEntity(type, "hello" + (i - 1)); - } - if (i < listSize - 1) { - entity.addIsRelatedToEntity(type, "hello" + i); - entity.addIsRelatedToEntity(type, "hello" + (i + 1)); - } - int category = i % 4; - switch (category) { - case 0: - entity.addConfig("config", "config" + i); - // Fall through deliberately - case 1: - entity.addInfo("info1", new Integer(i)); - entity.addInfo("info2", "helloworld"); - // Fall through deliberately - case 2: - break; - case 3: - entity.addConfig("config", "config" + i); - TimelineEvent event = new TimelineEvent(); - event.setId("test event"); - event.setTimestamp(1425016501100L + i); - event.addInfo("test_info", "content for " + entity.getId()); - event.addInfo("test_info1", new Integer(i)); - entity.addEvent(event); - TimelineMetric metric = new TimelineMetric(); - metric.setId("HDFS_BYTES_READ"); - metric.addValue(1425016501100L + i, 8000 + i); - entity.addMetric(metric); - break; - } - te.addEntity(entity); - } - return te; - } -}