Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 67604200C10 for ; Fri, 20 Jan 2017 06:39:18 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 65FFD160B57; Fri, 20 Jan 2017 05:39:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3017B160B54 for ; Fri, 20 Jan 2017 06:39:16 +0100 (CET) Received: (qmail 39793 invoked by uid 500); 20 Jan 2017 05:39:13 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 38290 invoked by uid 99); 20 Jan 2017 05:39:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Jan 2017 05:39:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6960DF4040; Fri, 20 Jan 2017 05:39:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sjlee@apache.org To: common-commits@hadoop.apache.org Date: Fri, 20 Jan 2017 05:39:19 -0000 Message-Id: <8c5509c7e1884a60aaf394de06ca3c8c@git.apache.org> In-Reply-To: <1efa9515342a4c9898e60a1ae85951f3@git.apache.org> References: <1efa9515342a4c9898e60a1ae85951f3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/28] hadoop git commit: YARN-5928. Move ATSv2 HBase backend code into a new module that is only dependent at runtime by yarn servers. Contributed by Haibo Chen. archived-at: Fri, 20 Jan 2017 05:39:18 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java new file mode 100644 index 0000000..cedf96a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; +import org.apache.hadoop.yarn.webapp.BadRequestException; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for flow run entities that are stored in the flow run + * table. + */ +class FlowRunEntityReader extends TimelineEntityReader { + private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable(); + + public FlowRunEntityReader(TimelineReaderContext ctxt, + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { + super(ctxt, entityFilters, toRetrieve); + } + + public FlowRunEntityReader(TimelineReaderContext ctxt, + TimelineDataToRetrieve toRetrieve) { + super(ctxt, toRetrieve); + } + + /** + * Uses the {@link FlowRunTable}. + */ + @Override + protected BaseTable getTable() { + return FLOW_RUN_TABLE; + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(getContext(), "context shouldn't be null"); + Preconditions.checkNotNull(getDataToRetrieve(), + "data to retrieve shouldn't be null"); + Preconditions.checkNotNull(getContext().getClusterId(), + "clusterId shouldn't be null"); + Preconditions.checkNotNull(getContext().getUserId(), + "userId shouldn't be null"); + Preconditions.checkNotNull(getContext().getFlowName(), + "flowName shouldn't be null"); + if (isSingleEntityRead()) { + Preconditions.checkNotNull(getContext().getFlowRunId(), + "flowRunId shouldn't be null"); + } + EnumSet fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); + if (!isSingleEntityRead() && fieldsToRetrieve != null) { + for (Field field : fieldsToRetrieve) { + if (field != Field.ALL && field != Field.METRICS) { + throw new BadRequestException("Invalid field " + field + + " specified while querying flow runs."); + } + } + } + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) { + // Add metrics to fields to retrieve if metricsToRetrieve is specified. + getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve(); + if (!isSingleEntityRead()) { + createFiltersIfNull(); + } + } + + protected FilterList constructFilterListBasedOnFilters() throws IOException { + FilterList listBasedOnFilters = new FilterList(); + // Filter based on created time range. + Long createdTimeBegin = getFilters().getCreatedTimeBegin(); + Long createdTimeEnd = getFilters().getCreatedTimeEnd(); + if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) { + listBasedOnFilters.addFilter(TimelineFilterUtils + .createSingleColValueFiltersByRange(FlowRunColumn.MIN_START_TIME, + createdTimeBegin, createdTimeEnd)); + } + // Filter based on metric filters. + TimelineFilterList metricFilters = getFilters().getMetricFilters(); + if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) { + listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( + FlowRunColumnPrefix.METRIC, metricFilters)); + } + return listBasedOnFilters; + } + + /** + * Add {@link QualifierFilter} filters to filter list for each column of flow + * run table. + * + * @return filter list to which qualifier filters have been added. + */ + private FilterList updateFixedColumns() { + FilterList columnsList = new FilterList(Operator.MUST_PASS_ONE); + for (FlowRunColumn column : FlowRunColumn.values()) { + columnsList.addFilter(new QualifierFilter(CompareOp.EQUAL, + new BinaryComparator(column.getColumnQualifierBytes()))); + } + return columnsList; + } + + @Override + protected FilterList constructFilterListBasedOnFields() throws IOException { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, new BinaryComparator( + FlowRunColumnFamily.INFO.getBytes())); + TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); + // If multiple entities have to be retrieved, check if metrics have to be + // retrieved and if not, add a filter so that metrics can be excluded. + // Metrics are always returned if we are reading a single entity. + if (!isSingleEntityRead() + && !hasField(dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) { + FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE); + infoColFamilyList.addFilter(infoColumnFamily); + infoColFamilyList.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator(FlowRunColumnPrefix.METRIC + .getColumnPrefixBytes("")))); + list.addFilter(infoColFamilyList); + } else { + // Check if metricsToRetrieve are specified and if they are, create a + // filter list for info column family by adding flow run tables columns + // and a list for metrics to retrieve. Pls note that fieldsToRetrieve + // will have METRICS added to it if metricsToRetrieve are specified + // (in augmentParams()). + TimelineFilterList metricsToRetrieve = + dataToRetrieve.getMetricsToRetrieve(); + if (metricsToRetrieve != null + && !metricsToRetrieve.getFilterList().isEmpty()) { + FilterList infoColFamilyList = new FilterList(); + infoColFamilyList.addFilter(infoColumnFamily); + FilterList columnsList = updateFixedColumns(); + columnsList.addFilter(TimelineFilterUtils.createHBaseFilterList( + FlowRunColumnPrefix.METRIC, metricsToRetrieve)); + infoColFamilyList.addFilter(columnsList); + list.addFilter(infoColFamilyList); + } + } + return list; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { + TimelineReaderContext context = getContext(); + FlowRunRowKey flowRunRowKey = + new FlowRunRowKey(context.getClusterId(), context.getUserId(), + context.getFlowName(), context.getFlowRunId()); + byte[] rowKey = flowRunRowKey.getRowKey(); + Get get = new Get(rowKey); + get.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } + return getTable().getResult(hbaseConf, conn, get); + } + + @Override + protected ResultScanner getResults(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { + Scan scan = new Scan(); + TimelineReaderContext context = getContext(); + RowKeyPrefix flowRunRowKeyPrefix = null; + if (getFilters().getFromId() == null) { + flowRunRowKeyPrefix = new FlowRunRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName()); + scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix()); + } else { + + FlowRunRowKey flowRunRowKey = + new FlowRunRowKey(context.getClusterId(), context.getUserId(), + context.getFlowName(), Long.parseLong(getFilters().getFromId())); + + // set start row + scan.setStartRow(flowRunRowKey.getRowKey()); + + // get the bytes for stop row + flowRunRowKeyPrefix = new FlowRunRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName()); + + // set stop row + scan.setStopRow( + HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix( + flowRunRowKeyPrefix.getRowKeyPrefix())); + } + + FilterList newList = new FilterList(); + newList.addFilter(new PageFilter(getFilters().getLimit())); + if (filterList != null && !filterList.getFilters().isEmpty()) { + newList.addFilter(filterList); + } + scan.setFilter(newList); + scan.setMaxVersions(Integer.MAX_VALUE); + return getTable().getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + TimelineReaderContext context = getContext(); + FlowRunEntity flowRun = new FlowRunEntity(); + flowRun.setUser(context.getUserId()); + flowRun.setName(context.getFlowName()); + if (isSingleEntityRead()) { + flowRun.setRunId(context.getFlowRunId()); + } else { + FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow()); + flowRun.setRunId(rowKey.getFlowRunId()); + } + + // read the start time + Long startTime = (Long) FlowRunColumn.MIN_START_TIME.readResult(result); + if (startTime != null) { + flowRun.setStartTime(startTime.longValue()); + } + + // read the end time if available + Long endTime = (Long) FlowRunColumn.MAX_END_TIME.readResult(result); + if (endTime != null) { + flowRun.setMaxEndTime(endTime.longValue()); + } + + // read the flow version + String version = (String) FlowRunColumn.FLOW_VERSION.readResult(result); + if (version != null) { + flowRun.setVersion(version); + } + + // read metrics if its a single entity query or if METRICS are part of + // fieldsToRetrieve. + if (isSingleEntityRead() + || hasField(getDataToRetrieve().getFieldsToRetrieve(), Field.METRICS)) { + readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC); + } + + // set the id + flowRun.setId(flowRun.getId()); + return flowRun; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java new file mode 100644 index 0000000..f6904c5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -0,0 +1,628 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for generic entities that are stored in the entity + * table. + */ +class GenericEntityReader extends TimelineEntityReader { + private static final EntityTable ENTITY_TABLE = new EntityTable(); + + /** + * Used to convert strings key components to and from storage format. + */ + private final KeyConverter stringKeyConverter = + new StringKeyConverter(); + + public GenericEntityReader(TimelineReaderContext ctxt, + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { + super(ctxt, entityFilters, toRetrieve); + } + + public GenericEntityReader(TimelineReaderContext ctxt, + TimelineDataToRetrieve toRetrieve) { + super(ctxt, toRetrieve); + } + + /** + * Uses the {@link EntityTable}. + */ + protected BaseTable getTable() { + return ENTITY_TABLE; + } + + @Override + protected FilterList constructFilterListBasedOnFilters() throws IOException { + // Filters here cannot be null for multiple entity reads as they are set in + // augmentParams if null. + FilterList listBasedOnFilters = new FilterList(); + TimelineEntityFilters filters = getFilters(); + // Create filter list based on created time range and add it to + // listBasedOnFilters. + long createdTimeBegin = filters.getCreatedTimeBegin(); + long createdTimeEnd = filters.getCreatedTimeEnd(); + if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) { + listBasedOnFilters.addFilter(TimelineFilterUtils + .createSingleColValueFiltersByRange(EntityColumn.CREATED_TIME, + createdTimeBegin, createdTimeEnd)); + } + // Create filter list based on metric filters and add it to + // listBasedOnFilters. + TimelineFilterList metricFilters = filters.getMetricFilters(); + if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) { + listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.METRIC, metricFilters)); + } + // Create filter list based on config filters and add it to + // listBasedOnFilters. + TimelineFilterList configFilters = filters.getConfigFilters(); + if (configFilters != null && !configFilters.getFilterList().isEmpty()) { + listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.CONFIG, configFilters)); + } + // Create filter list based on info filters and add it to listBasedOnFilters + TimelineFilterList infoFilters = filters.getInfoFilters(); + if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) { + listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.INFO, infoFilters)); + } + return listBasedOnFilters; + } + + /** + * Check if we need to fetch only some of the event columns. + * + * @return true if we need to fetch some of the columns, false otherwise. + */ + private boolean fetchPartialEventCols(TimelineFilterList eventFilters, + EnumSet fieldsToRetrieve) { + return (eventFilters != null && !eventFilters.getFilterList().isEmpty() && + !hasField(fieldsToRetrieve, Field.EVENTS)); + } + + /** + * Check if we need to fetch only some of the relates_to columns. + * + * @return true if we need to fetch some of the columns, false otherwise. + */ + private boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo, + EnumSet fieldsToRetrieve) { + return (relatesTo != null && !relatesTo.getFilterList().isEmpty() && + !hasField(fieldsToRetrieve, Field.RELATES_TO)); + } + + /** + * Check if we need to fetch only some of the is_related_to columns. + * + * @return true if we need to fetch some of the columns, false otherwise. + */ + private boolean fetchPartialIsRelatedToCols(TimelineFilterList isRelatedTo, + EnumSet fieldsToRetrieve) { + return (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty() && + !hasField(fieldsToRetrieve, Field.IS_RELATED_TO)); + } + + /** + * Check if we need to fetch only some of the columns based on event filters, + * relatesto and isrelatedto from info family. + * + * @return true, if we need to fetch only some of the columns, false if we + * need to fetch all the columns under info column family. + */ + protected boolean fetchPartialColsFromInfoFamily() { + EnumSet fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); + TimelineEntityFilters filters = getFilters(); + return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve) + || fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve) + || fetchPartialIsRelatedToCols(filters.getIsRelatedTo(), + fieldsToRetrieve); + } + + /** + * Check if we need to create filter list based on fields. We need to create a + * filter list iff all fields need not be retrieved or we have some specific + * fields or metrics to retrieve. We also need to create a filter list if we + * have relationships(relatesTo/isRelatedTo) and event filters specified for + * the query. + * + * @return true if we need to create the filter list, false otherwise. + */ + protected boolean needCreateFilterListBasedOnFields() { + TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); + // Check if all fields are to be retrieved or not. If all fields have to + // be retrieved, also check if we have some metrics or configs to + // retrieve specified for the query because then a filter list will have + // to be created. + boolean flag = + !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) + || (dataToRetrieve.getConfsToRetrieve() != null && !dataToRetrieve + .getConfsToRetrieve().getFilterList().isEmpty()) + || (dataToRetrieve.getMetricsToRetrieve() != null && !dataToRetrieve + .getMetricsToRetrieve().getFilterList().isEmpty()); + // Filters need to be checked only if we are reading multiple entities. If + // condition above is false, we check if there are relationships(relatesTo/ + // isRelatedTo) and event filters specified for the query. + if (!flag && !isSingleEntityRead()) { + TimelineEntityFilters filters = getFilters(); + flag = + (filters.getEventFilters() != null && !filters.getEventFilters() + .getFilterList().isEmpty()) + || (filters.getIsRelatedTo() != null && !filters.getIsRelatedTo() + .getFilterList().isEmpty()) + || (filters.getRelatesTo() != null && !filters.getRelatesTo() + .getFilterList().isEmpty()); + } + return flag; + } + + /** + * Add {@link QualifierFilter} filters to filter list for each column of + * entity table. + * + * @param list filter list to which qualifier filters have to be added. + */ + protected void updateFixedColumns(FilterList list) { + for (EntityColumn column : EntityColumn.values()) { + list.addFilter(new QualifierFilter(CompareOp.EQUAL, new BinaryComparator( + column.getColumnQualifierBytes()))); + } + } + + /** + * Creates a filter list which indicates that only some of the column + * qualifiers in the info column family will be returned in result. + * + * @param isApplication If true, it means operations are to be performed for + * application table, otherwise for entity table. + * @return filter list. + * @throws IOException if any problem occurs while creating filter list. + */ + private FilterList createFilterListForColsOfInfoFamily() throws IOException { + FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE); + // Add filters for each column in entity table. + updateFixedColumns(infoFamilyColsFilter); + EnumSet fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); + // If INFO field has to be retrieved, add a filter for fetching columns + // with INFO column prefix. + if (hasField(fieldsToRetrieve, Field.INFO)) { + infoFamilyColsFilter + .addFilter(TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.EQUAL, EntityColumnPrefix.INFO)); + } + TimelineFilterList relatesTo = getFilters().getRelatesTo(); + if (hasField(fieldsToRetrieve, Field.RELATES_TO)) { + // If RELATES_TO field has to be retrieved, add a filter for fetching + // columns with RELATES_TO column prefix. + infoFamilyColsFilter.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.EQUAL, + EntityColumnPrefix.RELATES_TO)); + } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) { + // Even if fields to retrieve does not contain RELATES_TO, we still + // need to have a filter to fetch some of the column qualifiers if + // relatesTo filters are specified. relatesTo filters will then be + // matched after fetching rows from HBase. + Set relatesToCols = + TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + EntityColumnPrefix.RELATES_TO, relatesToCols)); + } + TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo(); + if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { + // If IS_RELATED_TO field has to be retrieved, add a filter for fetching + // columns with IS_RELATED_TO column prefix. + infoFamilyColsFilter.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.EQUAL, + EntityColumnPrefix.IS_RELATED_TO)); + } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) { + // Even if fields to retrieve does not contain IS_RELATED_TO, we still + // need to have a filter to fetch some of the column qualifiers if + // isRelatedTo filters are specified. isRelatedTo filters will then be + // matched after fetching rows from HBase. + Set isRelatedToCols = + TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols)); + } + TimelineFilterList eventFilters = getFilters().getEventFilters(); + if (hasField(fieldsToRetrieve, Field.EVENTS)) { + // If EVENTS field has to be retrieved, add a filter for fetching columns + // with EVENT column prefix. + infoFamilyColsFilter + .addFilter(TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.EQUAL, EntityColumnPrefix.EVENT)); + } else if (eventFilters != null && + !eventFilters.getFilterList().isEmpty()) { + // Even if fields to retrieve does not contain EVENTS, we still need to + // have a filter to fetch some of the column qualifiers on the basis of + // event filters specified. Event filters will then be matched after + // fetching rows from HBase. + Set eventCols = + TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + EntityColumnPrefix.EVENT, eventCols)); + } + return infoFamilyColsFilter; + } + + /** + * Exclude column prefixes via filters which are not required(based on fields + * to retrieve) from info column family. These filters are added to filter + * list which contains a filter for getting info column family. + * + * @param infoColFamilyList filter list for info column family. + */ + private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) { + EnumSet fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); + // Events not required. + if (!hasField(fieldsToRetrieve, Field.EVENTS)) { + infoColFamilyList.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + EntityColumnPrefix.EVENT)); + } + // info not required. + if (!hasField(fieldsToRetrieve, Field.INFO)) { + infoColFamilyList.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + EntityColumnPrefix.INFO)); + } + // is related to not required. + if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { + infoColFamilyList.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + EntityColumnPrefix.IS_RELATED_TO)); + } + // relates to not required. + if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) { + infoColFamilyList.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + EntityColumnPrefix.RELATES_TO)); + } + } + + /** + * Updates filter list based on fields for confs and metrics to retrieve. + * + * @param listBasedOnFields filter list based on fields. + * @throws IOException if any problem occurs while updating filter list. + */ + private void updateFilterForConfsAndMetricsToRetrieve( + FilterList listBasedOnFields) throws IOException { + TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); + // Please note that if confsToRetrieve is specified, we would have added + // CONFS to fields to retrieve in augmentParams() even if not specified. + if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) { + // Create a filter list for configs. + listBasedOnFields.addFilter(TimelineFilterUtils + .createFilterForConfsOrMetricsToRetrieve( + dataToRetrieve.getConfsToRetrieve(), EntityColumnFamily.CONFIGS, + EntityColumnPrefix.CONFIG)); + } + + // Please note that if metricsToRetrieve is specified, we would have added + // METRICS to fields to retrieve in augmentParams() even if not specified. + if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) { + // Create a filter list for metrics. + listBasedOnFields.addFilter(TimelineFilterUtils + .createFilterForConfsOrMetricsToRetrieve( + dataToRetrieve.getMetricsToRetrieve(), + EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC)); + } + } + + @Override + protected FilterList constructFilterListBasedOnFields() throws IOException { + if (!needCreateFilterListBasedOnFields()) { + // Fetch all the columns. No need of a filter. + return null; + } + FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE); + FilterList infoColFamilyList = new FilterList(); + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, new BinaryComparator( + EntityColumnFamily.INFO.getBytes())); + infoColFamilyList.addFilter(infoColumnFamily); + if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) { + // We can fetch only some of the columns from info family. + infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily()); + } else { + // Exclude column prefixes in info column family which are not required + // based on fields to retrieve. + excludeFieldsFromInfoColFamily(infoColFamilyList); + } + listBasedOnFields.addFilter(infoColFamilyList); + updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields); + return listBasedOnFields; + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(getContext(), "context shouldn't be null"); + Preconditions.checkNotNull(getDataToRetrieve(), + "data to retrieve shouldn't be null"); + Preconditions.checkNotNull(getContext().getClusterId(), + "clusterId shouldn't be null"); + Preconditions.checkNotNull(getContext().getAppId(), + "appId shouldn't be null"); + Preconditions.checkNotNull(getContext().getEntityType(), + "entityType shouldn't be null"); + if (isSingleEntityRead()) { + Preconditions.checkNotNull(getContext().getEntityId(), + "entityId shouldn't be null"); + } + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) + throws IOException { + defaultAugmentParams(hbaseConf, conn); + // Add configs/metrics to fields to retrieve if confsToRetrieve and/or + // metricsToRetrieve are specified. + getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve(); + if (!isSingleEntityRead()) { + createFiltersIfNull(); + } + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { + TimelineReaderContext context = getContext(); + Result result = null; + if (context.getEntityIdPrefix() != null) { + byte[] rowKey = new EntityRowKey(context.getClusterId(), + context.getUserId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId(), context.getEntityType(), + context.getEntityIdPrefix(), context.getEntityId()).getRowKey(); + + Get get = new Get(rowKey); + get.setMaxVersions(getDataToRetrieve().getMetricsLimit()); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } + result = getTable().getResult(hbaseConf, conn, get); + + } else { + // Prepare for range scan + // create single SingleColumnValueFilter and add to existing filters. + FilterList filter = new FilterList(Operator.MUST_PASS_ALL); + if (filterList != null && !filterList.getFilters().isEmpty()) { + filter.addFilter(filterList); + } + FilterList newFilter = new FilterList(); + newFilter.addFilter(TimelineFilterUtils.createHBaseSingleColValueFilter( + EntityColumn.ID, context.getEntityId(), CompareOp.EQUAL)); + newFilter.addFilter(new PageFilter(1)); + filter.addFilter(newFilter); + + ResultScanner results = getResults(hbaseConf, conn, filter); + try { + Iterator iterator = results.iterator(); + if (iterator.hasNext()) { + result = iterator.next(); + } + } finally { + results.close(); + } + } + return result; + } + + @Override + protected ResultScanner getResults(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { + // Scan through part of the table to find the entities belong to one app + // and one type + Scan scan = new Scan(); + TimelineReaderContext context = getContext(); + RowKeyPrefix entityRowKeyPrefix = null; + // default mode, will always scans from beginning of entity type. + if (getFilters() == null || getFilters().getFromIdPrefix() == null) { + entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId(), context.getEntityType(), null, null); + scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix()); + } else { // pagination mode, will scan from given entityIdPrefix!enitityId + entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId(), context.getEntityType(), + getFilters().getFromIdPrefix(), getFilters().getFromId()); + + // set start row + scan.setStartRow(entityRowKeyPrefix.getRowKeyPrefix()); + + // get the bytes for stop row + entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId(), context.getEntityType(), null, null); + + // set stop row + scan.setStopRow( + HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix( + entityRowKeyPrefix.getRowKeyPrefix())); + + // set page filter to limit. This filter has to set only in pagination + // mode. + filterList.addFilter(new PageFilter(getFilters().getLimit())); + } + scan.setMaxVersions(getDataToRetrieve().getMetricsLimit()); + if (filterList != null && !filterList.getFilters().isEmpty()) { + scan.setFilter(filterList); + } + return getTable().getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + if (result == null || result.isEmpty()) { + return null; + } + TimelineEntity entity = new TimelineEntity(); + EntityRowKey parseRowKey = EntityRowKey.parseRowKey(result.getRow()); + entity.setType(parseRowKey.getEntityType()); + entity.setId(parseRowKey.getEntityId()); + entity.setIdPrefix(parseRowKey.getEntityIdPrefix().longValue()); + + TimelineEntityFilters filters = getFilters(); + // fetch created time + Long createdTime = (Long) EntityColumn.CREATED_TIME.readResult(result); + entity.setCreatedTime(createdTime); + + EnumSet fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); + // fetch is related to entities and match isRelatedTo filter. If isRelatedTo + // filters do not match, entity would be dropped. We have to match filters + // locally as relevant HBase filters to filter out rows on the basis of + // isRelatedTo are not set in HBase scan. + boolean checkIsRelatedTo = + !isSingleEntityRead() && filters.getIsRelatedTo() != null + && filters.getIsRelatedTo().getFilterList().size() > 0; + if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) { + readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true); + if (checkIsRelatedTo + && !TimelineStorageUtils.matchIsRelatedTo(entity, + filters.getIsRelatedTo())) { + return null; + } + if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { + entity.getIsRelatedToEntities().clear(); + } + } + + // fetch relates to entities and match relatesTo filter. If relatesTo + // filters do not match, entity would be dropped. We have to match filters + // locally as relevant HBase filters to filter out rows on the basis of + // relatesTo are not set in HBase scan. + boolean checkRelatesTo = + !isSingleEntityRead() && filters.getRelatesTo() != null + && filters.getRelatesTo().getFilterList().size() > 0; + if (hasField(fieldsToRetrieve, Field.RELATES_TO) + || checkRelatesTo) { + readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false); + if (checkRelatesTo + && !TimelineStorageUtils.matchRelatesTo(entity, + filters.getRelatesTo())) { + return null; + } + if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) { + entity.getRelatesToEntities().clear(); + } + } + + // fetch info if fieldsToRetrieve contains INFO or ALL. + if (hasField(fieldsToRetrieve, Field.INFO)) { + readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false); + } + + // fetch configs if fieldsToRetrieve contains CONFIGS or ALL. + if (hasField(fieldsToRetrieve, Field.CONFIGS)) { + readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true); + } + + // fetch events and match event filters if they exist. If event filters do + // not match, entity would be dropped. We have to match filters locally + // as relevant HBase filters to filter out rows on the basis of events + // are not set in HBase scan. + boolean checkEvents = + !isSingleEntityRead() && filters.getEventFilters() != null + && filters.getEventFilters().getFilterList().size() > 0; + if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) { + readEvents(entity, result, EntityColumnPrefix.EVENT); + if (checkEvents + && !TimelineStorageUtils.matchEventFilters(entity, + filters.getEventFilters())) { + return null; + } + if (!hasField(fieldsToRetrieve, Field.EVENTS)) { + entity.getEvents().clear(); + } + } + + // fetch metrics if fieldsToRetrieve contains METRICS or ALL. + if (hasField(fieldsToRetrieve, Field.METRICS)) { + readMetrics(entity, result, EntityColumnPrefix.METRIC); + } + return entity; + } + + /** + * Helper method for reading key-value pairs for either info or config. + * + * @param Describes the type of column prefix. + * @param entity entity to fill. + * @param result result from HBase. + * @param prefix column prefix. + * @param isConfig if true, means we are reading configs, otherwise info. + * @throws IOException if any problem is encountered while reading result. + */ + protected void readKeyValuePairs(TimelineEntity entity, Result result, + ColumnPrefix prefix, boolean isConfig) throws IOException { + // info and configuration are of type Map + Map columns = + prefix.readResults(result, stringKeyConverter); + if (isConfig) { + for (Map.Entry column : columns.entrySet()) { + entity.addConfig(column.getKey(), column.getValue().toString()); + } + } else { + entity.addInfo(columns); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java new file mode 100644 index 0000000..4c88cd3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java @@ -0,0 +1,458 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; + +/** + * The base class for reading and deserializing timeline entities from the + * HBase storage. Different types can be defined for different types of the + * entities that are being requested. + */ +public abstract class TimelineEntityReader extends + AbstractTimelineStorageReader { + private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class); + + private final boolean singleEntityRead; + private TimelineDataToRetrieve dataToRetrieve; + // used only for multiple entity read mode + private TimelineEntityFilters filters; + + /** + * Main table the entity reader uses. + */ + private BaseTable table; + + /** + * Used to convert strings key components to and from storage format. + */ + private final KeyConverter stringKeyConverter = + new StringKeyConverter(); + + /** + * Instantiates a reader for multiple-entity reads. + * + * @param ctxt Reader context which defines the scope in which query has to be + * made. + * @param entityFilters Filters which limit the entities returned. + * @param toRetrieve Data to retrieve for each entity. + */ + protected TimelineEntityReader(TimelineReaderContext ctxt, + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { + super(ctxt); + this.singleEntityRead = false; + this.dataToRetrieve = toRetrieve; + this.filters = entityFilters; + + this.setTable(getTable()); + } + + /** + * Instantiates a reader for single-entity reads. + * + * @param ctxt Reader context which defines the scope in which query has to be + * made. + * @param toRetrieve Data to retrieve for each entity. + */ + protected TimelineEntityReader(TimelineReaderContext ctxt, + TimelineDataToRetrieve toRetrieve) { + super(ctxt); + this.singleEntityRead = true; + this.dataToRetrieve = toRetrieve; + + this.setTable(getTable()); + } + + /** + * Creates a {@link FilterList} based on fields, confs and metrics to + * retrieve. This filter list will be set in Scan/Get objects to trim down + * results fetched from HBase back-end storage. This is called only for + * multiple entity reads. + * + * @return a {@link FilterList} object. + * @throws IOException if any problem occurs while creating filter list. + */ + protected abstract FilterList constructFilterListBasedOnFields() + throws IOException; + + /** + * Creates a {@link FilterList} based on info, config and metric filters. This + * filter list will be set in HBase Get to trim down results fetched from + * HBase back-end storage. + * + * @return a {@link FilterList} object. + * @throws IOException if any problem occurs while creating filter list. + */ + protected abstract FilterList constructFilterListBasedOnFilters() + throws IOException; + + /** + * Combines filter lists created based on fields and based on filters. + * + * @return a {@link FilterList} object if it can be constructed. Returns null, + * if filter list cannot be created either on the basis of filters or on the + * basis of fields. + * @throws IOException if any problem occurs while creating filter list. + */ + private FilterList createFilterList() throws IOException { + FilterList listBasedOnFilters = constructFilterListBasedOnFilters(); + boolean hasListBasedOnFilters = listBasedOnFilters != null && + !listBasedOnFilters.getFilters().isEmpty(); + FilterList listBasedOnFields = constructFilterListBasedOnFields(); + boolean hasListBasedOnFields = listBasedOnFields != null && + !listBasedOnFields.getFilters().isEmpty(); + // If filter lists based on both filters and fields can be created, + // combine them in a new filter list and return it. + // If either one of them has been created, return that filter list. + // Return null, if none of the filter lists can be created. This indicates + // that no filter list needs to be added to HBase Scan as filters are not + // specified for the query or only the default view of entity needs to be + // returned. + if (hasListBasedOnFilters && hasListBasedOnFields) { + FilterList list = new FilterList(); + list.addFilter(listBasedOnFilters); + list.addFilter(listBasedOnFields); + return list; + } else if (hasListBasedOnFilters) { + return listBasedOnFilters; + } else if (hasListBasedOnFields) { + return listBasedOnFields; + } + return null; + } + + protected TimelineDataToRetrieve getDataToRetrieve() { + return dataToRetrieve; + } + + protected TimelineEntityFilters getFilters() { + return filters; + } + + /** + * Create a {@link TimelineEntityFilters} object with default values for + * filters. + */ + protected void createFiltersIfNull() { + if (filters == null) { + filters = new TimelineEntityFilters(); + } + } + + /** + * Reads and deserializes a single timeline entity from the HBase storage. + * + * @param hbaseConf HBase Configuration. + * @param conn HBase Connection. + * @return A TimelineEntity object. + * @throws IOException if there is any exception encountered while reading + * entity. + */ + public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) + throws IOException { + validateParams(); + augmentParams(hbaseConf, conn); + + FilterList filterList = constructFilterListBasedOnFields(); + if (LOG.isDebugEnabled() && filterList != null) { + LOG.debug("FilterList created for get is - " + filterList); + } + Result result = getResult(hbaseConf, conn, filterList); + if (result == null || result.isEmpty()) { + // Could not find a matching row. + LOG.info("Cannot find matching entity of type " + + getContext().getEntityType()); + return null; + } + return parseEntity(result); + } + + /** + * Reads and deserializes a set of timeline entities from the HBase storage. + * It goes through all the results available, and returns the number of + * entries as specified in the limit in the entity's natural sort order. + * + * @param hbaseConf HBase Configuration. + * @param conn HBase Connection. + * @return a set of TimelineEntity objects. + * @throws IOException if any exception is encountered while reading entities. + */ + public Set readEntities(Configuration hbaseConf, + Connection conn) throws IOException { + validateParams(); + augmentParams(hbaseConf, conn); + + Set entities = new LinkedHashSet<>(); + FilterList filterList = createFilterList(); + if (LOG.isDebugEnabled() && filterList != null) { + LOG.debug("FilterList created for scan is - " + filterList); + } + ResultScanner results = getResults(hbaseConf, conn, filterList); + try { + for (Result result : results) { + TimelineEntity entity = parseEntity(result); + if (entity == null) { + continue; + } + entities.add(entity); + if (entities.size() == filters.getLimit()) { + break; + } + } + return entities; + } finally { + results.close(); + } + } + + /** + * Returns the main table to be used by the entity reader. + * + * @return A reference to the table. + */ + protected BaseTable getTable() { + return table; + } + + /** + * Fetches a {@link Result} instance for a single-entity read. + * + * @param hbaseConf HBase Configuration. + * @param conn HBase Connection. + * @param filterList filter list which will be applied to HBase Get. + * @return the {@link Result} instance or null if no such record is found. + * @throws IOException if any exception is encountered while getting result. + */ + protected abstract Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException; + + /** + * Fetches a {@link ResultScanner} for a multi-entity read. + * + * @param hbaseConf HBase Configuration. + * @param conn HBase Connection. + * @param filterList filter list which will be applied to HBase Scan. + * @return the {@link ResultScanner} instance. + * @throws IOException if any exception is encountered while getting results. + */ + protected abstract ResultScanner getResults(Configuration hbaseConf, + Connection conn, FilterList filterList) throws IOException; + + /** + * Parses the result retrieved from HBase backend and convert it into a + * {@link TimelineEntity} object. + * + * @param result Single row result of a Get/Scan. + * @return the TimelineEntity instance or null if the entity is + * filtered. + * @throws IOException if any exception is encountered while parsing entity. + */ + protected abstract TimelineEntity parseEntity(Result result) + throws IOException; + + /** + * Helper method for reading and deserializing {@link TimelineMetric} objects + * using the specified column prefix. The timeline metrics then are added to + * the given timeline entity. + * + * @param entity {@link TimelineEntity} object. + * @param result {@link Result} object retrieved from backend. + * @param columnPrefix Metric column prefix + * @throws IOException if any exception is encountered while reading metrics. + */ + protected void readMetrics(TimelineEntity entity, Result result, + ColumnPrefix columnPrefix) throws IOException { + NavigableMap> metricsResult = + columnPrefix.readResultsWithTimestamps( + result, stringKeyConverter); + for (Map.Entry> metricResult: + metricsResult.entrySet()) { + TimelineMetric metric = new TimelineMetric(); + metric.setId(metricResult.getKey()); + // Simply assume that if the value set contains more than 1 elements, the + // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric + TimelineMetric.Type metricType = metricResult.getValue().size() > 1 ? + TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE; + metric.setType(metricType); + metric.addValues(metricResult.getValue()); + entity.addMetric(metric); + } + } + + /** + * Checks whether the reader has been created to fetch single entity or + * multiple entities. + * + * @return true, if query is for single entity, false otherwise. + */ + public boolean isSingleEntityRead() { + return singleEntityRead; + } + + protected void setTable(BaseTable baseTable) { + this.table = baseTable; + } + + /** + * Check if we have a certain field amongst fields to retrieve. This method + * checks against {@link Field#ALL} as well because that would mean field + * passed needs to be matched. + * + * @param fieldsToRetrieve fields to be retrieved. + * @param requiredField fields to be checked in fieldsToRetrieve. + * @return true if has the required field, false otherwise. + */ + protected boolean hasField(EnumSet fieldsToRetrieve, + Field requiredField) { + return fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(requiredField); + } + + /** + * Create a filter list of qualifier filters based on passed set of columns. + * + * @param Describes the type of column prefix. + * @param colPrefix Column Prefix. + * @param columns set of column qualifiers. + * @return filter list. + */ + protected FilterList createFiltersFromColumnQualifiers( + ColumnPrefix colPrefix, Set columns) { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + for (String column : columns) { + // For columns which have compound column qualifiers (eg. events), we need + // to include the required separator. + byte[] compoundColQual = createColQualifierPrefix(colPrefix, column); + list.addFilter(new QualifierFilter(CompareOp.EQUAL, + new BinaryPrefixComparator(colPrefix + .getColumnPrefixBytes(compoundColQual)))); + } + return list; + } + + protected byte[] createColQualifierPrefix(ColumnPrefix colPrefix, + String column) { + if (colPrefix == ApplicationColumnPrefix.EVENT + || colPrefix == EntityColumnPrefix.EVENT) { + return new EventColumnName(column, null, null).getColumnQualifier(); + } else { + return stringKeyConverter.encode(column); + } + } + + /** + * Helper method for reading relationship. + * + * @param Describes the type of column prefix. + * @param entity entity to fill. + * @param result result from HBase. + * @param prefix column prefix. + * @param isRelatedTo if true, means relationship is to be added to + * isRelatedTo, otherwise its added to relatesTo. + * @throws IOException if any problem is encountered while reading result. + */ + protected void readRelationship(TimelineEntity entity, Result result, + ColumnPrefix prefix, boolean isRelatedTo) throws IOException { + // isRelatedTo and relatesTo are of type Map> + Map columns = + prefix.readResults(result, stringKeyConverter); + for (Map.Entry column : columns.entrySet()) { + for (String id : Separator.VALUES.splitEncoded(column.getValue() + .toString())) { + if (isRelatedTo) { + entity.addIsRelatedToEntity(column.getKey(), id); + } else { + entity.addRelatesToEntity(column.getKey(), id); + } + } + } + } + + /** + * Read events from the entity table or the application table. The column name + * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted + * if there is no info associated with the event. + * + * @param Describes the type of column prefix. + * @param entity entity to fill. + * @param result HBase Result. + * @param prefix column prefix. + * @throws IOException if any problem is encountered while reading result. + */ + protected static void readEvents(TimelineEntity entity, Result result, + ColumnPrefix prefix) throws IOException { + Map eventsMap = new HashMap<>(); + Map eventsResult = + prefix.readResults(result, new EventColumnNameConverter()); + for (Map.Entry + eventResult : eventsResult.entrySet()) { + EventColumnName eventColumnName = eventResult.getKey(); + String key = eventColumnName.getId() + + Long.toString(eventColumnName.getTimestamp()); + // Retrieve previously seen event to add to it + TimelineEvent event = eventsMap.get(key); + if (event == null) { + // First time we're seeing this event, add it to the eventsMap + event = new TimelineEvent(); + event.setId(eventColumnName.getId()); + event.setTimestamp(eventColumnName.getTimestamp()); + eventsMap.put(key, event); + } + if (eventColumnName.getInfoKey() != null) { + event.addInfo(eventColumnName.getInfoKey(), eventResult.getValue()); + } + } + Set eventsSet = new HashSet<>(eventsMap.values()); + entity.addEvents(eventsSet); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java new file mode 100644 index 0000000..16fffa4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; + +/** + * Factory methods for instantiating a timeline entity reader. + */ +public final class TimelineEntityReaderFactory { + private TimelineEntityReaderFactory() { + } + + /** + * Creates a timeline entity reader instance for reading a single entity with + * the specified input. + * + * @param context Reader context which defines the scope in which query has to + * be made. + * @param dataToRetrieve Data to retrieve for each entity. + * @return An implementation of TimelineEntityReader object + * depending on entity type. + */ + public static TimelineEntityReader createSingleEntityReader( + TimelineReaderContext context, TimelineDataToRetrieve dataToRetrieve) { + // currently the types that are handled separate from the generic entity + // table are application, flow run, and flow activity entities + if (TimelineEntityType.YARN_APPLICATION.matches(context.getEntityType())) { + return new ApplicationEntityReader(context, dataToRetrieve); + } else if (TimelineEntityType. + YARN_FLOW_RUN.matches(context.getEntityType())) { + return new FlowRunEntityReader(context, dataToRetrieve); + } else if (TimelineEntityType. + YARN_FLOW_ACTIVITY.matches(context.getEntityType())) { + return new FlowActivityEntityReader(context, dataToRetrieve); + } else { + // assume we're dealing with a generic entity read + return new GenericEntityReader(context, dataToRetrieve); + } + } + + /** + * Creates a timeline entity reader instance for reading set of entities with + * the specified input and predicates. + * + * @param context Reader context which defines the scope in which query has to + * be made. + * @param filters Filters which limit the entities returned. + * @param dataToRetrieve Data to retrieve for each entity. + * @return An implementation of TimelineEntityReader object + * depending on entity type. + */ + public static TimelineEntityReader createMultipleEntitiesReader( + TimelineReaderContext context, TimelineEntityFilters filters, + TimelineDataToRetrieve dataToRetrieve) { + // currently the types that are handled separate from the generic entity + // table are application, flow run, and flow activity entities + if (TimelineEntityType.YARN_APPLICATION.matches(context.getEntityType())) { + return new ApplicationEntityReader(context, filters, dataToRetrieve); + } else if (TimelineEntityType. + YARN_FLOW_ACTIVITY.matches(context.getEntityType())) { + return new FlowActivityEntityReader(context, filters, dataToRetrieve); + } else if (TimelineEntityType. + YARN_FLOW_RUN.matches(context.getEntityType())) { + return new FlowRunEntityReader(context, filters, dataToRetrieve); + } else { + // assume we're dealing with a generic entity read + return new GenericEntityReader(context, filters, dataToRetrieve); + } + } + + /** + * Creates a timeline entity type reader that will read all available entity + * types within the specified context. + * + * @param context Reader context which defines the scope in which query has to + * be made. Limited to application level only. + * @return an EntityTypeReader object + */ + public static EntityTypeReader createEntityTypeReader( + TimelineReaderContext context) { + return new EntityTypeReader(context); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java new file mode 100644 index 0000000..9814d6d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage.reader + * contains classes used to read entities from backend based on query type. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java new file mode 100644 index 0000000..58df970 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.junit.Test; + +public class TestKeyConverters { + + @Test + public void testAppIdKeyConverter() { + AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter(); + long currentTs = System.currentTimeMillis(); + ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1); + ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2); + ApplicationId appId3 = ApplicationId.newInstance(currentTs + 300, 1); + String appIdStr1 = appId1.toString(); + String appIdStr2 = appId2.toString(); + String appIdStr3 = appId3.toString(); + byte[] appIdBytes1 = appIdKeyConverter.encode(appIdStr1); + byte[] appIdBytes2 = appIdKeyConverter.encode(appIdStr2); + byte[] appIdBytes3 = appIdKeyConverter.encode(appIdStr3); + // App ids' should be encoded in a manner wherein descending order + // is maintained. + assertTrue( + "Ordering of app ids' is incorrect", + Bytes.compareTo(appIdBytes1, appIdBytes2) > 0 + && Bytes.compareTo(appIdBytes1, appIdBytes3) > 0 + && Bytes.compareTo(appIdBytes2, appIdBytes3) > 0); + String decodedAppId1 = appIdKeyConverter.decode(appIdBytes1); + String decodedAppId2 = appIdKeyConverter.decode(appIdBytes2); + String decodedAppId3 = appIdKeyConverter.decode(appIdBytes3); + assertTrue("Decoded app id is not same as the app id encoded", + appIdStr1.equals(decodedAppId1)); + assertTrue("Decoded app id is not same as the app id encoded", + appIdStr2.equals(decodedAppId2)); + assertTrue("Decoded app id is not same as the app id encoded", + appIdStr3.equals(decodedAppId3)); + } + + @Test + public void testEventColumnNameConverter() { + String eventId = "=foo_=eve=nt="; + byte[] valSepBytes = Bytes.toBytes(Separator.VALUES.getValue()); + byte[] maxByteArr = + Bytes.createMaxByteArray(Bytes.SIZEOF_LONG - valSepBytes.length); + byte[] ts = Bytes.add(valSepBytes, maxByteArr); + Long eventTs = Bytes.toLong(ts); + byte[] byteEventColName = + new EventColumnName(eventId, eventTs, null).getColumnQualifier(); + KeyConverter eventColumnNameConverter = + new EventColumnNameConverter(); + EventColumnName eventColName = + eventColumnNameConverter.decode(byteEventColName); + assertEquals(eventId, eventColName.getId()); + assertEquals(eventTs, eventColName.getTimestamp()); + assertNull(eventColName.getInfoKey()); + + String infoKey = "f=oo_event_in=fo=_key"; + byteEventColName = + new EventColumnName(eventId, eventTs, infoKey).getColumnQualifier(); + eventColName = eventColumnNameConverter.decode(byteEventColName); + assertEquals(eventId, eventColName.getId()); + assertEquals(eventTs, eventColName.getTimestamp()); + assertEquals(infoKey, eventColName.getInfoKey()); + } + + @Test + public void testLongKeyConverter() { + LongKeyConverter longKeyConverter = new LongKeyConverter(); + confirmLongKeyConverter(longKeyConverter, Long.MIN_VALUE); + confirmLongKeyConverter(longKeyConverter, -1234567890L); + confirmLongKeyConverter(longKeyConverter, -128L); + confirmLongKeyConverter(longKeyConverter, -127L); + confirmLongKeyConverter(longKeyConverter, -1L); + confirmLongKeyConverter(longKeyConverter, 0L); + confirmLongKeyConverter(longKeyConverter, 1L); + confirmLongKeyConverter(longKeyConverter, 127L); + confirmLongKeyConverter(longKeyConverter, 128L); + confirmLongKeyConverter(longKeyConverter, 1234567890L); + confirmLongKeyConverter(longKeyConverter, Long.MAX_VALUE); + } + + private void confirmLongKeyConverter(LongKeyConverter longKeyConverter, + Long testValue) { + Long decoded = longKeyConverter.decode(longKeyConverter.encode(testValue)); + assertEquals(testValue, decoded); + } + + @Test + public void testStringKeyConverter() { + StringKeyConverter stringKeyConverter = new StringKeyConverter(); + String phrase = "QuackAttack now!"; + + for (int i = 0; i < phrase.length(); i++) { + String sub = phrase.substring(i, phrase.length()); + confirmStrignKeyConverter(stringKeyConverter, sub); + confirmStrignKeyConverter(stringKeyConverter, sub + sub); + } + } + + private void confirmStrignKeyConverter(StringKeyConverter stringKeyConverter, + String testValue) { + String decoded = + stringKeyConverter.decode(stringKeyConverter.encode(testValue)); + assertEquals(testValue, decoded); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java new file mode 100644 index 0000000..cbd2273 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; +import org.junit.Test; + + +public class TestRowKeys { + + private final static String QUALIFIER_SEP = Separator.QUALIFIERS.getValue(); + private final static byte[] QUALIFIER_SEP_BYTES = Bytes + .toBytes(QUALIFIER_SEP); + private final static String CLUSTER = "cl" + QUALIFIER_SEP + "uster"; + private final static String USER = QUALIFIER_SEP + "user"; + private final static String FLOW_NAME = "dummy_" + QUALIFIER_SEP + "flow" + + QUALIFIER_SEP; + private final static Long FLOW_RUN_ID; + private final static String APPLICATION_ID; + static { + long runid = Long.MAX_VALUE - 900L; + byte[] longMaxByteArr = Bytes.toBytes(Long.MAX_VALUE); + byte[] byteArr = Bytes.toBytes(runid); + int sepByteLen = QUALIFIER_SEP_BYTES.length; + if (sepByteLen <= byteArr.length) { + for (int i = 0; i < sepByteLen; i++) { + byteArr[i] = (byte) (longMaxByteArr[i] - QUALIFIER_SEP_BYTES[i]); + } + } + FLOW_RUN_ID = Bytes.toLong(byteArr); + long clusterTs = System.currentTimeMillis(); + byteArr = Bytes.toBytes(clusterTs); + if (sepByteLen <= byteArr.length) { + for (int i = 0; i < sepByteLen; i++) { + byteArr[byteArr.length - sepByteLen + i] = + (byte) (longMaxByteArr[byteArr.length - sepByteLen + i] - + QUALIFIER_SEP_BYTES[i]); + } + } + clusterTs = Bytes.toLong(byteArr); + int seqId = 222; + APPLICATION_ID = ApplicationId.newInstance(clusterTs, seqId).toString(); + } + + private static void verifyRowPrefixBytes(byte[] byteRowKeyPrefix) { + int sepLen = QUALIFIER_SEP_BYTES.length; + for (int i = 0; i < sepLen; i++) { + assertTrue( + "Row key prefix not encoded properly.", + byteRowKeyPrefix[byteRowKeyPrefix.length - sepLen + i] == + QUALIFIER_SEP_BYTES[i]); + } + } + + @Test + public void testApplicationRowKey() { + byte[] byteRowKey = + new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, + APPLICATION_ID).getRowKey(); + ApplicationRowKey rowKey = ApplicationRowKey.parseRowKey(byteRowKey); + assertEquals(CLUSTER, rowKey.getClusterId()); + assertEquals(USER, rowKey.getUserId()); + assertEquals(FLOW_NAME, rowKey.getFlowName()); + assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId()); + assertEquals(APPLICATION_ID, rowKey.getAppId()); + + byte[] byteRowKeyPrefix = + new ApplicationRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID) + .getRowKeyPrefix(); + byte[][] splits = + Separator.QUALIFIERS.split(byteRowKeyPrefix, + new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, + Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, + Separator.VARIABLE_SIZE}); + assertEquals(5, splits.length); + assertEquals(0, splits[4].length); + assertEquals(FLOW_NAME, + Separator.QUALIFIERS.decode(Bytes.toString(splits[2]))); + assertEquals(FLOW_RUN_ID, + (Long) LongConverter.invertLong(Bytes.toLong(splits[3]))); + verifyRowPrefixBytes(byteRowKeyPrefix); + + byteRowKeyPrefix = + new ApplicationRowKeyPrefix(CLUSTER, USER, FLOW_NAME).getRowKeyPrefix(); + splits = + Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE }); + assertEquals(4, splits.length); + assertEquals(0, splits[3].length); + assertEquals(FLOW_NAME, + Separator.QUALIFIERS.decode(Bytes.toString(splits[2]))); + verifyRowPrefixBytes(byteRowKeyPrefix); + } + + /** + * Tests the converters indirectly through the public methods of the + * corresponding rowkey. + */ + @Test + public void testAppToFlowRowKey() { + byte[] byteRowKey = new AppToFlowRowKey(APPLICATION_ID).getRowKey(); + AppToFlowRowKey rowKey = AppToFlowRowKey.parseRowKey(byteRowKey); + assertEquals(APPLICATION_ID, rowKey.getAppId()); + } + + @Test + public void testEntityRowKey() { + TimelineEntity entity = new TimelineEntity(); + entity.setId("!ent!ity!!id!"); + entity.setType("entity!Type"); + entity.setIdPrefix(54321); + + byte[] byteRowKey = + new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID, + entity.getType(), entity.getIdPrefix(), + entity.getId()).getRowKey(); + EntityRowKey rowKey = EntityRowKey.parseRowKey(byteRowKey); + assertEquals(CLUSTER, rowKey.getClusterId()); + assertEquals(USER, rowKey.getUserId()); + assertEquals(FLOW_NAME, rowKey.getFlowName()); + assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId()); + assertEquals(APPLICATION_ID, rowKey.getAppId()); + assertEquals(entity.getType(), rowKey.getEntityType()); + assertEquals(entity.getIdPrefix(), rowKey.getEntityIdPrefix().longValue()); + assertEquals(entity.getId(), rowKey.getEntityId()); + + byte[] byteRowKeyPrefix = + new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, + APPLICATION_ID, entity.getType(), null, null) + .getRowKeyPrefix(); + byte[][] splits = + Separator.QUALIFIERS.split( + byteRowKeyPrefix, + new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, + Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, + AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE, + Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE }); + assertEquals(7, splits.length); + assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4])); + assertEquals(entity.getType(), + Separator.QUALIFIERS.decode(Bytes.toString(splits[5]))); + verifyRowPrefixBytes(byteRowKeyPrefix); + + byteRowKeyPrefix = + new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, + APPLICATION_ID).getRowKeyPrefix(); + splits = + Separator.QUALIFIERS.split( + byteRowKeyPrefix, + new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, + Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, + AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE}); + assertEquals(6, splits.length); + assertEquals(0, splits[5].length); + AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter(); + assertEquals(APPLICATION_ID, appIdKeyConverter.decode(splits[4])); + verifyRowPrefixBytes(byteRowKeyPrefix); + } + + @Test + public void testFlowActivityRowKey() { + Long ts = 1459900830000L; + Long dayTimestamp = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts); + byte[] byteRowKey = + new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME).getRowKey(); + FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(byteRowKey); + assertEquals(CLUSTER, rowKey.getClusterId()); + assertEquals(dayTimestamp, rowKey.getDayTimestamp()); + assertEquals(USER, rowKey.getUserId()); + assertEquals(FLOW_NAME, rowKey.getFlowName()); + + byte[] byteRowKeyPrefix = + new FlowActivityRowKeyPrefix(CLUSTER).getRowKeyPrefix(); + byte[][] splits = + Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE }); + assertEquals(2, splits.length); + assertEquals(0, splits[1].length); + assertEquals(CLUSTER, + Separator.QUALIFIERS.decode(Bytes.toString(splits[0]))); + verifyRowPrefixBytes(byteRowKeyPrefix); + + byteRowKeyPrefix = + new FlowActivityRowKeyPrefix(CLUSTER, ts).getRowKeyPrefix(); + splits = + Separator.QUALIFIERS.split(byteRowKeyPrefix, + new int[] {Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, + Separator.VARIABLE_SIZE}); + assertEquals(3, splits.length); + assertEquals(0, splits[2].length); + assertEquals(CLUSTER, + Separator.QUALIFIERS.decode(Bytes.toString(splits[0]))); + assertEquals(ts, + (Long) LongConverter.invertLong(Bytes.toLong(splits[1]))); + verifyRowPrefixBytes(byteRowKeyPrefix); + } + + @Test + public void testFlowRunRowKey() { + byte[] byteRowKey = + new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID).getRowKey(); + FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(byteRowKey); + assertEquals(CLUSTER, rowKey.getClusterId()); + assertEquals(USER, rowKey.getUserId()); + assertEquals(FLOW_NAME, rowKey.getFlowName()); + assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId()); + + byte[] byteRowKeyPrefix = + new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, null).getRowKey(); + byte[][] splits = + Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE }); + assertEquals(4, splits.length); + assertEquals(0, splits[3].length); + assertEquals(FLOW_NAME, + Separator.QUALIFIERS.decode(Bytes.toString(splits[2]))); + verifyRowPrefixBytes(byteRowKeyPrefix); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org