Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 87840187BD for ; Wed, 20 Jan 2016 09:13:39 +0000 (UTC) Received: (qmail 30829 invoked by uid 500); 20 Jan 2016 09:13:23 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 28641 invoked by uid 500); 20 Jan 2016 09:13:21 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 24838 invoked by uid 99); 20 Jan 2016 09:13:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Jan 2016 09:13:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4BB37E38CD; Wed, 20 Jan 2016 09:13:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gtcarrera9@apache.org To: common-commits@hadoop.apache.org Date: Wed, 20 Jan 2016 09:13:52 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [35/50] [abbrv] hadoop git commit: YARN-3862. Support for fetching specific configs and metrics based on prefixes (Varun Saxena via sjlee) YARN-3862. Support for fetching specific configs and metrics based on prefixes (Varun Saxena via sjlee) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/76bc71cc Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/76bc71cc Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/76bc71cc Branch: refs/heads/feature-YARN-2928 Commit: 76bc71cc7abe776420223a78f2d885800410ddc6 Parents: 54a529d Author: Sangjin Lee Authored: Tue Dec 1 21:47:43 2015 -0800 Committer: Li Lu Committed: Tue Jan 19 17:59:39 2016 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../reader/TimelineReaderManager.java | 4 +- .../reader/filter/TimelineCompareFilter.java | 61 ++ .../reader/filter/TimelineCompareOp.java | 36 + .../reader/filter/TimelineFilter.java | 56 ++ .../reader/filter/TimelineFilterList.java | 91 +++ .../reader/filter/TimelineFilterUtils.java | 120 ++++ .../reader/filter/TimelinePrefixFilter.java | 56 ++ .../reader/filter/package-info.java | 28 + .../storage/ApplicationEntityReader.java | 123 +++- .../storage/FileSystemTimelineReaderImpl.java | 9 +- .../storage/FlowActivityEntityReader.java | 16 +- .../storage/FlowRunEntityReader.java | 69 +- .../storage/GenericEntityReader.java | 119 +++- .../storage/HBaseTimelineReaderImpl.java | 11 +- .../storage/TimelineEntityReader.java | 32 +- .../storage/TimelineEntityReaderFactory.java | 23 +- .../timelineservice/storage/TimelineReader.java | 32 + .../application/ApplicationColumnPrefix.java | 18 +- .../storage/common/ColumnPrefix.java | 29 +- .../storage/entity/EntityColumnPrefix.java | 18 +- .../storage/flow/FlowActivityColumnPrefix.java | 18 +- .../storage/flow/FlowRunColumnPrefix.java | 18 +- .../TestFileSystemTimelineReaderImpl.java | 42 +- .../storage/TestHBaseTimelineStorage.java | 682 +++++++++++++++++-- .../flow/TestHBaseStorageFlowActivity.java | 6 +- .../storage/flow/TestHBaseStorageFlowRun.java | 190 +++++- 27 files changed, 1761 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 7636317..78705e1 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -143,6 +143,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-4053. Change the way metric values are stored in HBase Storage (Varun Saxena via sjlee) + YARN-3862. Support for fetching specific configs and metrics based on + prefixes (Varun Saxena via sjlee) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java index 27a50d5..294b05b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java @@ -77,7 +77,7 @@ public class TimelineReaderManager extends AbstractService { return reader.getEntities(userId, cluster, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, - metricFilters, eventFilters, fieldsToRetrieve); + metricFilters, eventFilters, null, null, fieldsToRetrieve); } /** @@ -91,6 +91,6 @@ public class TimelineReaderManager extends AbstractService { String entityId, EnumSet fields) throws IOException { String cluster = getClusterID(clusterId, getConfig()); return reader.getEntity(userId, cluster, flowId, flowRunId, appId, - entityType, entityId, fields); + entityType, entityId, null, null, fields); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java new file mode 100644 index 0000000..14e7124 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Filter class which represents filter to be applied based on key-value pair + * and the relation between them represented by different relational operators. + */ +@Private +@Unstable +public class TimelineCompareFilter extends TimelineFilter { + + private TimelineCompareOp compareOp; + private String key; + private Object value; + + public TimelineCompareFilter() { + } + + public TimelineCompareFilter(TimelineCompareOp op, String key, Object val) { + this.compareOp = op; + this.key = key; + this.value = val; + } + + @Override + public TimelineFilterType getFilterType() { + return TimelineFilterType.COMPARE; + } + + public TimelineCompareOp getCompareOp() { + return compareOp; + } + + public String getKey() { + return key; + } + + public Object getValue() { + return value; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java new file mode 100644 index 0000000..461a7d8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Comparison Operators. + */ +@Private +@Unstable +public enum TimelineCompareOp { + LESS_THAN, + LESS_OR_EQUAL, + EQUAL, + NOT_EQUAL, + GREATER_OR_EQUAL, + GREATER_THAN +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java new file mode 100644 index 0000000..d4b4045 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Abstract base class extended to implement timeline filters. + */ +@Private +@Unstable +public abstract class TimelineFilter { + + /** + * Lists the different filter types. + */ + @Private + @Unstable + public enum TimelineFilterType { + /** + * Combines multiple filters. + */ + LIST, + /** + * Filter which is used for comparison. + */ + COMPARE, + /** + * Filter which matches prefix for a config or a metric. + */ + PREFIX + } + + public abstract TimelineFilterType getFilterType(); + + public String toString() { + return this.getClass().getSimpleName(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java new file mode 100644 index 0000000..8727bd7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Implementation of {@link TimelineFilter} that represents an ordered list of + * timeline filters which will then be evaluated with a specified boolean + * operator {@link Operator#AND} or {@link Operator#OR}. Since you can use + * timeline filter lists as children of timeline filter lists, you can create a + * hierarchy of filters to be evaluated. + */ +@Private +@Unstable +public class TimelineFilterList extends TimelineFilter { + /** + * Specifies how filters in the filter list will be evaluated. AND means all + * the filters should match and OR means atleast one should match. + */ + @Private + @Unstable + public static enum Operator { + AND, + OR + } + + private Operator operator; + private List filterList = new ArrayList(); + + public TimelineFilterList(TimelineFilter...filters) { + this(Operator.AND, filters); + } + + public TimelineFilterList(Operator op, TimelineFilter...filters) { + this.operator = op; + this.filterList = new ArrayList(Arrays.asList(filters)); + } + + @Override + public TimelineFilterType getFilterType() { + return TimelineFilterType.LIST; + } + + /** + * Get the filter list. + * + * @return filterList + */ + public List getFilterList() { + return filterList; + } + + /** + * Get the operator. + * + * @return operator + */ + public Operator getOperator() { + return operator; + } + + public void setOperator(Operator op) { + operator = op; + } + + public void addFilter(TimelineFilter filter) { + filterList.add(filter); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java new file mode 100644 index 0000000..da3c383 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.hbase.filter.QualifierFilter; + +/** + * Set of utility methods used by timeline filter classes. + */ +public final class TimelineFilterUtils { + + private TimelineFilterUtils() { + } + + /** + * Returns the equivalent HBase filter list's {@link Operator}. + * @param op + * @return HBase filter list's Operator. + */ + private static Operator getHBaseOperator(TimelineFilterList.Operator op) { + switch (op) { + case AND: + return Operator.MUST_PASS_ALL; + case OR: + return Operator.MUST_PASS_ONE; + default: + throw new IllegalArgumentException("Invalid operator"); + } + } + + /** + * Returns the equivalent HBase compare filter's {@link CompareOp}. + * @param op + * @return HBase compare filter's CompareOp. + */ + private static CompareOp getHBaseCompareOp( + TimelineCompareOp op) { + switch (op) { + case LESS_THAN: + return CompareOp.LESS; + case LESS_OR_EQUAL: + return CompareOp.LESS_OR_EQUAL; + case EQUAL: + return CompareOp.EQUAL; + case NOT_EQUAL: + return CompareOp.NOT_EQUAL; + case GREATER_OR_EQUAL: + return CompareOp.GREATER_OR_EQUAL; + case GREATER_THAN: + return CompareOp.GREATER; + default: + throw new IllegalArgumentException("Invalid compare operator"); + } + } + + /** + * Converts a {@link TimelinePrefixFilter} to an equivalent HBase + * {@link QualifierFilter}. + * @param colPrefix + * @param filter + * @return a {@link QualifierFilter} object + */ + private static Filter createHBaseColQualPrefixFilter( + ColumnPrefix colPrefix, TimelinePrefixFilter filter) { + return new QualifierFilter(getHBaseCompareOp(filter.getCompareOp()), + new BinaryPrefixComparator( + colPrefix.getColumnPrefixBytes(filter.getPrefix()))); + } + + /** + * Creates equivalent HBase {@link FilterList} from {@link TimelineFilterList} + * while converting different timeline filters(of type {@link TimelineFilter}) + * into their equivalent HBase filters. + * @param colPrefix + * @param filterList + * @return a {@link FilterList} object + */ + public static FilterList createHBaseFilterList(ColumnPrefix colPrefix, + TimelineFilterList filterList) { + FilterList list = + new FilterList(getHBaseOperator(filterList.getOperator())); + for (TimelineFilter filter : filterList.getFilterList()) { + switch(filter.getFilterType()) { + case LIST: + list.addFilter( + createHBaseFilterList(colPrefix, (TimelineFilterList)filter)); + break; + case PREFIX: + list.addFilter(createHBaseColQualPrefixFilter( + colPrefix, (TimelinePrefixFilter)filter)); + break; + default: + break; + } + } + return list; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java new file mode 100644 index 0000000..6233f26 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Filter class which represents filter to be applied based on prefixes. + * Prefixes can either match or not match. + */ +@Private +@Unstable +public class TimelinePrefixFilter extends TimelineFilter { + + private TimelineCompareOp compareOp; + private String prefix; + + public TimelinePrefixFilter(TimelineCompareOp op, String prefix) { + this.prefix = prefix; + if (op != TimelineCompareOp.EQUAL && op != TimelineCompareOp.NOT_EQUAL) { + throw new IllegalArgumentException("CompareOp for prefix filter should " + + "be EQUAL or NOT_EQUAL"); + } + this.compareOp = op; + } + + @Override + public TimelineFilterType getFilterType() { + return TimelineFilterType.PREFIX; + } + + public String getPrefix() { + return prefix; + } + + public TimelineCompareOp getCompareOp() { + return compareOp; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java new file mode 100644 index 0000000..f7c0705 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.server.timelineservice.reader.filter stores + * timeline filter implementations. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java index 8324afd..7082a5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java @@ -28,11 +28,21 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; @@ -56,18 +66,21 @@ class ApplicationEntityReader extends GenericEntityReader { Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) { super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve, true); + eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve, + true); } public ApplicationEntityReader(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) { + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) { super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, - fieldsToRetrieve); + confsToRetrieve, metricsToRetrieve, fieldsToRetrieve); } /** @@ -78,13 +91,95 @@ class ApplicationEntityReader extends GenericEntityReader { } @Override - protected Result getResult(Configuration hbaseConf, Connection conn) - throws IOException { + protected FilterList constructFilterListBasedOnFields() { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + // Fetch all the columns. + if (fieldsToRetrieve.contains(Field.ALL) && + (confsToRetrieve == null || + confsToRetrieve.getFilterList().isEmpty()) && + (metricsToRetrieve == null || + metricsToRetrieve.getFilterList().isEmpty())) { + return list; + } + FilterList infoColFamilyList = new FilterList(); + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(ApplicationColumnFamily.INFO.getBytes())); + infoColFamilyList.addFilter(infoColumnFamily); + // Events not required. + if (!fieldsToRetrieve.contains(Field.EVENTS) && + !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.EVENT.getColumnPrefixBytes("")))); + } + // info not required. + if (!fieldsToRetrieve.contains(Field.INFO) && + !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.INFO.getColumnPrefixBytes("")))); + } + // is releated to not required. + if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) && + !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes("")))); + } + // relates to not required. + if (!fieldsToRetrieve.contains(Field.RELATES_TO) && + !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.RELATES_TO.getColumnPrefixBytes("")))); + } + list.addFilter(infoColFamilyList); + if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) || + (confsToRetrieve != null && + !confsToRetrieve.getFilterList().isEmpty())) { + FilterList filterCfg = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(ApplicationColumnFamily.CONFIGS.getBytes()))); + if (confsToRetrieve != null && + !confsToRetrieve.getFilterList().isEmpty()) { + filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList( + ApplicationColumnPrefix.CONFIG, confsToRetrieve)); + } + list.addFilter(filterCfg); + } + if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) || + (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty())) { + FilterList filterMetrics = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(ApplicationColumnFamily.METRICS.getBytes()))); + if (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList( + ApplicationColumnPrefix.METRIC, metricsToRetrieve)); + } + list.addFilter(filterMetrics); + } + return list; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } return table.getResult(hbaseConf, conn, get); } @@ -115,6 +210,15 @@ class ApplicationEntityReader extends GenericEntityReader { if (fieldsToRetrieve == null) { fieldsToRetrieve = EnumSet.noneOf(Field.class); } + if (!fieldsToRetrieve.contains(Field.CONFIGS) && + confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.CONFIGS); + } + if (!fieldsToRetrieve.contains(Field.METRICS) && + metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.METRICS); + } if (!singleEntityRead) { if (limit == null || limit < 0) { limit = TimelineReader.DEFAULT_LIMIT; @@ -136,7 +240,7 @@ class ApplicationEntityReader extends GenericEntityReader { @Override protected ResultScanner getResults(Configuration hbaseConf, - Connection conn) throws IOException { + Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); if (flowRunId != null) { scan.setRowPrefixFilter(ApplicationRowKey. @@ -145,7 +249,12 @@ class ApplicationEntityReader extends GenericEntityReader { scan.setRowPrefixFilter(ApplicationRowKey. getRowKeyPrefix(clusterId, userId, flowId)); } - scan.setFilter(new PageFilter(limit)); + FilterList newList = new FilterList(); + newList.addFilter(new PageFilter(limit)); + if (filterList != null && !filterList.getFilters().isEmpty()) { + newList.addFilter(filterList); + } + scan.setFilter(newList); return table.getResultScanner(hbaseConf, conn, scan); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java index 30d1d00..48bf844 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.codehaus.jackson.JsonGenerationException; @@ -272,6 +273,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) throws IOException { if (limit == null || limit <= 0) { limit = DEFAULT_LIMIT; @@ -386,7 +388,9 @@ public class FileSystemTimelineReaderImpl extends AbstractService @Override public TimelineEntity getEntity(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) throws IOException { + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) + throws IOException { String flowRunPath = getFlowRunPath(userId, clusterId, flowId, flowRunId, appId); File dir = new File(new File(rootPath, ENTITIES_DIR), @@ -413,6 +417,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) throws IOException { String flowRunPath = getFlowRunPath(userId, clusterId, flowId, flowRunId, appId); @@ -422,6 +427,6 @@ public class FileSystemTimelineReaderImpl extends AbstractService return getEntities(dir, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve); + eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java index 3e32128..71dd0a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; @@ -58,14 +59,14 @@ class FlowActivityEntityReader extends TimelineEntityReader { super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve, true); + eventFilters, null, null, fieldsToRetrieve, true); } public FlowActivityEntityReader(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, String entityId, EnumSet fieldsToRetrieve) { super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, - fieldsToRetrieve); + null, null, fieldsToRetrieve); } /** @@ -96,15 +97,20 @@ class FlowActivityEntityReader extends TimelineEntityReader { } @Override - protected Result getResult(Configuration hbaseConf, Connection conn) - throws IOException { + protected FilterList constructFilterListBasedOnFields() { + return null; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { throw new UnsupportedOperationException( "we don't support a single entity query"); } @Override protected ResultScanner getResults(Configuration hbaseConf, - Connection conn) throws IOException { + Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); if (createdTimeBegin == DEFAULT_BEGIN_TIME && createdTimeEnd == DEFAULT_END_TIME) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java index ebf2d27..1895fa6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java @@ -28,12 +28,22 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; @@ -54,18 +64,20 @@ class FlowRunEntityReader extends TimelineEntityReader { Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) { super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve, true); + eventFilters, null, metricsToRetrieve, fieldsToRetrieve, true); } public FlowRunEntityReader(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) { + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) { super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, - fieldsToRetrieve); + null, metricsToRetrieve, fieldsToRetrieve); } /** @@ -101,26 +113,69 @@ class FlowRunEntityReader extends TimelineEntityReader { if (createdTimeEnd == null) { createdTimeEnd = DEFAULT_END_TIME; } + if (!fieldsToRetrieve.contains(Field.METRICS) && + metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.METRICS); + } + } + } + + @Override + protected FilterList constructFilterListBasedOnFields() { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(FlowRunColumnFamily.INFO.getBytes())); + // Metrics not required. + if (!singleEntityRead && !fieldsToRetrieve.contains(Field.METRICS) && + !fieldsToRetrieve.contains(Field.ALL)) { + FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE); + infoColFamilyList.addFilter(infoColumnFamily); + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes("")))); + list.addFilter(infoColFamilyList); + } + if (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + FilterList infoColFamilyList = new FilterList(); + infoColFamilyList.addFilter(infoColumnFamily); + infoColFamilyList.addFilter(TimelineFilterUtils.createHBaseFilterList( + FlowRunColumnPrefix.METRIC, metricsToRetrieve)); + list.addFilter(infoColFamilyList); } + return list; } @Override - protected Result getResult(Configuration hbaseConf, Connection conn) - throws IOException { + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowId, flowRunId); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } return table.getResult(hbaseConf, conn, get); } @Override protected ResultScanner getResults(Configuration hbaseConf, - Connection conn) throws IOException { + Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); scan.setRowPrefixFilter( FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowId)); - scan.setFilter(new PageFilter(limit)); + FilterList newList = new FilterList(); + newList.addFilter(new PageFilter(limit)); + if (filterList != null && !filterList.getFilters().isEmpty()) { + newList.addFilter(filterList); + } + scan.setFilter(newList); return table.getResultScanner(hbaseConf, conn, scan); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java index 04fc8ee..dcb8b89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java @@ -32,9 +32,18 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; @@ -46,6 +55,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; @@ -72,18 +82,21 @@ class GenericEntityReader extends TimelineEntityReader { Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve, boolean sortedKeys) { super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve, sortedKeys); + eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve, + sortedKeys); } public GenericEntityReader(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) { + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) { super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, - fieldsToRetrieve); + confsToRetrieve, metricsToRetrieve, fieldsToRetrieve); } /** @@ -93,6 +106,85 @@ class GenericEntityReader extends TimelineEntityReader { return ENTITY_TABLE; } + @Override + protected FilterList constructFilterListBasedOnFields() { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + // Fetch all the columns. + if (fieldsToRetrieve.contains(Field.ALL) && + (confsToRetrieve == null || + confsToRetrieve.getFilterList().isEmpty()) && + (metricsToRetrieve == null || + metricsToRetrieve.getFilterList().isEmpty())) { + return list; + } + FilterList infoColFamilyList = new FilterList(); + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(EntityColumnFamily.INFO.getBytes())); + infoColFamilyList.addFilter(infoColumnFamily); + // Events not required. + if (!fieldsToRetrieve.contains(Field.EVENTS) && + !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.EVENT.getColumnPrefixBytes("")))); + } + // info not required. + if (!fieldsToRetrieve.contains(Field.INFO) && + !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.INFO.getColumnPrefixBytes("")))); + } + // is related to not required. + if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) && + !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes("")))); + } + // relates to not required. + if (!fieldsToRetrieve.contains(Field.RELATES_TO) && + !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes("")))); + } + list.addFilter(infoColFamilyList); + if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) || + (confsToRetrieve != null && + !confsToRetrieve.getFilterList().isEmpty())) { + FilterList filterCfg = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes()))); + if (confsToRetrieve != null && + !confsToRetrieve.getFilterList().isEmpty()) { + filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.CONFIG, confsToRetrieve)); + } + list.addFilter(filterCfg); + } + if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) || + (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty())) { + FilterList filterMetrics = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(EntityColumnFamily.METRICS.getBytes()))); + if (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.METRIC, metricsToRetrieve)); + } + list.addFilter(filterMetrics); + } + return list; + } + protected FlowContext lookupFlowContext(String clusterId, String appId, Configuration hbaseConf, Connection conn) throws IOException { byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); @@ -145,6 +237,15 @@ class GenericEntityReader extends TimelineEntityReader { if (fieldsToRetrieve == null) { fieldsToRetrieve = EnumSet.noneOf(Field.class); } + if (!fieldsToRetrieve.contains(Field.CONFIGS) && + confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.CONFIGS); + } + if (!fieldsToRetrieve.contains(Field.METRICS) && + metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.METRICS); + } if (!singleEntityRead) { if (limit == null || limit < 0) { limit = TimelineReader.DEFAULT_LIMIT; @@ -165,25 +266,31 @@ class GenericEntityReader extends TimelineEntityReader { } @Override - protected Result getResult(Configuration hbaseConf, Connection conn) - throws IOException { + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { byte[] rowKey = EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId, entityType, entityId); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } return table.getResult(hbaseConf, conn, get); } @Override protected ResultScanner getResults(Configuration hbaseConf, - Connection conn) throws IOException { + Connection conn, FilterList filterList) throws IOException { // Scan through part of the table to find the entities belong to one app // and one type Scan scan = new Scan(); scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix( clusterId, userId, flowId, flowRunId, appId, entityType)); scan.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + scan.setFilter(filterList); + } return table.getResultScanner(hbaseConf, conn, scan); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java index 889ae19..9e4b26a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; public class HBaseTimelineReaderImpl extends AbstractService implements TimelineReader { @@ -64,11 +65,13 @@ public class HBaseTimelineReaderImpl @Override public TimelineEntity getEntity(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) throws IOException { TimelineEntityReader reader = TimelineEntityReaderFactory.createSingleEntityReader(userId, clusterId, - flowId, flowRunId, appId, entityType, entityId, fieldsToRetrieve); + flowId, flowRunId, appId, entityType, entityId, confsToRetrieve, + metricsToRetrieve, fieldsToRetrieve); return reader.readEntity(hbaseConf, conn); } @@ -80,13 +83,15 @@ public class HBaseTimelineReaderImpl Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) throws IOException { TimelineEntityReader reader = TimelineEntityReaderFactory.createMultipleEntitiesReader(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, - metricFilters, eventFilters, fieldsToRetrieve); + metricFilters, eventFilters, confsToRetrieve, metricsToRetrieve, + fieldsToRetrieve); return reader.readEntities(hbaseConf, conn); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java index adaf42e..7178aab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java @@ -31,8 +31,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; @@ -70,6 +72,8 @@ abstract class TimelineEntityReader { protected Map configFilters; protected Set metricFilters; protected Set eventFilters; + protected TimelineFilterList confsToRetrieve; + protected TimelineFilterList metricsToRetrieve; /** * Main table the entity reader uses. @@ -94,6 +98,7 @@ abstract class TimelineEntityReader { Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve, boolean sortedKeys) { this.singleEntityRead = false; this.sortedKeys = sortedKeys; @@ -115,6 +120,8 @@ abstract class TimelineEntityReader { this.configFilters = configFilters; this.metricFilters = metricFilters; this.eventFilters = eventFilters; + this.confsToRetrieve = confsToRetrieve; + this.metricsToRetrieve = metricsToRetrieve; this.table = getTable(); } @@ -124,7 +131,8 @@ abstract class TimelineEntityReader { */ protected TimelineEntityReader(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) { + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) { this.singleEntityRead = true; this.userId = userId; this.clusterId = clusterId; @@ -134,11 +142,21 @@ abstract class TimelineEntityReader { this.entityType = entityType; this.fieldsToRetrieve = fieldsToRetrieve; this.entityId = entityId; + this.confsToRetrieve = confsToRetrieve; + this.metricsToRetrieve = metricsToRetrieve; this.table = getTable(); } /** + * Creates a {@link FilterList} based on fields, confs and metrics to + * retrieve. This filter list will be set in Scan/Get objects to trim down + * results fetched from HBase back-end storage. + * @return a {@link FilterList} object. + */ + protected abstract FilterList constructFilterListBasedOnFields(); + + /** * Reads and deserializes a single timeline entity from the HBase storage. */ public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) @@ -146,7 +164,8 @@ abstract class TimelineEntityReader { validateParams(); augmentParams(hbaseConf, conn); - Result result = getResult(hbaseConf, conn); + FilterList filterList = constructFilterListBasedOnFields(); + Result result = getResult(hbaseConf, conn, filterList); if (result == null || result.isEmpty()) { // Could not find a matching row. LOG.info("Cannot find matching entity of type " + entityType); @@ -166,7 +185,8 @@ abstract class TimelineEntityReader { augmentParams(hbaseConf, conn); NavigableSet entities = new TreeSet<>(); - ResultScanner results = getResults(hbaseConf, conn); + FilterList filterList = constructFilterListBasedOnFields(); + ResultScanner results = getResults(hbaseConf, conn, filterList); try { for (Result result : results) { TimelineEntity entity = parseEntity(result); @@ -211,14 +231,14 @@ abstract class TimelineEntityReader { * * @return the {@link Result} instance or null if no such record is found. */ - protected abstract Result getResult(Configuration hbaseConf, Connection conn) - throws IOException; + protected abstract Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException; /** * Fetches a {@link ResultScanner} for a multi-entity read. */ protected abstract ResultScanner getResults(Configuration hbaseConf, - Connection conn) throws IOException; + Connection conn, FilterList filterList) throws IOException; /** * Given a {@link Result} instance, deserializes and creates a http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java index f5341c2..16204c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; /** @@ -34,22 +35,23 @@ class TimelineEntityReaderFactory { */ public static TimelineEntityReader createSingleEntityReader(String userId, String clusterId, String flowId, Long flowRunId, String appId, - String entityType, String entityId, EnumSet fieldsToRetrieve) { + String entityType, String entityId, TimelineFilterList confs, + TimelineFilterList metrics, EnumSet fieldsToRetrieve) { // currently the types that are handled separate from the generic entity // table are application, flow run, and flow activity entities if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) { return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId, - appId, entityType, entityId, fieldsToRetrieve); + appId, entityType, entityId, confs, metrics, fieldsToRetrieve); } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) { return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId, - appId, entityType, entityId, fieldsToRetrieve); + appId, entityType, entityId, confs, metrics, fieldsToRetrieve); } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) { return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, fieldsToRetrieve); } else { // assume we're dealing with a generic entity read return new GenericEntityReader(userId, clusterId, flowId, flowRunId, - appId, entityType, entityId, fieldsToRetrieve); + appId, entityType, entityId, confs, metrics, fieldsToRetrieve); } } @@ -64,6 +66,7 @@ class TimelineEntityReaderFactory { Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confs, TimelineFilterList metrics, EnumSet fieldsToRetrieve) { // currently the types that are handled separate from the generic entity // table are application, flow run, and flow activity entities @@ -71,8 +74,8 @@ class TimelineEntityReaderFactory { return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, - infoFilters, configFilters, metricFilters, eventFilters, - fieldsToRetrieve); + infoFilters, configFilters, metricFilters, eventFilters, confs, + metrics, fieldsToRetrieve); } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) { return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, @@ -83,15 +86,15 @@ class TimelineEntityReaderFactory { return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, - infoFilters, configFilters, metricFilters, eventFilters, - fieldsToRetrieve); + infoFilters, configFilters, metricFilters, eventFilters, confs, + metrics, fieldsToRetrieve); } else { // assume we're dealing with a generic entity read return new GenericEntityReader(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, - infoFilters, configFilters, metricFilters, eventFilters, - fieldsToRetrieve, false); + infoFilters, configFilters, metricFilters, eventFilters, confs, + metrics, fieldsToRetrieve, false); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java index e4e305e..0ed17da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java @@ -27,6 +27,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; /** ATSv2 reader interface. */ @Private @@ -70,6 +72,18 @@ public interface TimelineReader extends Service { * Entity type (mandatory) * @param entityId * Entity Id (mandatory) + * @param confsToRetrieve + * Used for deciding which configs to return in response. This is + * represented as a {@link TimelineFilterList} object containing + * {@link TimelinePrefixFilter} objects. These can either be exact config + * keys' or prefixes which are then compared against config keys' to decide + * configs to return in response. + * @param metricsToRetrieve + * Used for deciding which metrics to return in response. This is + * represented as a {@link TimelineFilterList} object containing + * {@link TimelinePrefixFilter} objects. These can either be exact metric + * ids' or prefixes which are then compared against metric ids' to decide + * metrics to return in response. * @param fieldsToRetrieve * Specifies which fields of the entity object to retrieve(optional), see * {@link Field}. If null, retrieves 4 fields namely entity id, @@ -81,6 +95,7 @@ public interface TimelineReader extends Service { */ TimelineEntity getEntity(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, String entityId, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) throws IOException; /** @@ -139,6 +154,22 @@ public interface TimelineReader extends Service { * @param eventFilters * Matched entities should contain the given events (optional). If null * or empty, the filter is not applied. + * @param confsToRetrieve + * Used for deciding which configs to return in response. This is + * represented as a {@link TimelineFilterList} object containing + * {@link TimelinePrefixFilter} objects. These can either be exact config + * keys' or prefixes which are then compared against config keys' to decide + * configs(inside entities) to return in response. This should not be + * confused with configFilters which is used to decide which entities to + * return instead. + * @param metricsToRetrieve + * Used for deciding which metrics to return in response. This is + * represented as a {@link TimelineFilterList} object containing + * {@link TimelinePrefixFilter} objects. These can either be exact metric + * ids' or prefixes which are then compared against metric ids' to decide + * metrics(inside entities) to return in response. This should not be + * confused with metricFilters which is used to decide which entities to + * return instead. * @param fieldsToRetrieve * Specifies which fields of the entity object to retrieve(optional), see * {@link Field}. If null, retrieves 4 fields namely entity id, @@ -158,5 +189,6 @@ public interface TimelineReader extends Service { Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) throws IOException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java index b06f5c1..056e51f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java @@ -119,6 +119,18 @@ public enum ApplicationColumnPrefix implements ColumnPrefix { return columnPrefix; } + @Override + public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + + @Override + public byte[] getColumnPrefixBytes(String qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + /* * (non-Javadoc) * @@ -139,8 +151,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix { + tableMutator.getName().getNameAsString()); } - byte[] columnQualifier = - ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + byte[] columnQualifier = getColumnPrefixBytes(qualifier); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, attributes); @@ -166,8 +177,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix { + tableMutator.getName().getNameAsString()); } - byte[] columnQualifier = - ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + byte[] columnQualifier = getColumnPrefixBytes(qualifier); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, attributes); http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java index db49098..0f3ac4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java @@ -44,13 +44,13 @@ public interface ColumnPrefix { * @param qualifier column qualifier. Nothing gets written when null. * @param timestamp version timestamp. When null the server timestamp will be * used. - *@param attributes attributes for the mutation that are used by the coprocessor - * to set/read the cell tags + * @param attributes attributes for the mutation that are used by the + * coprocessor to set/read the cell tags. * @param inputValue the value to write to the rowKey and column qualifier. * Nothing gets written when null. * @throws IOException */ - public void store(byte[] rowKey, TypedBufferedMutator tableMutator, + void store(byte[] rowKey, TypedBufferedMutator tableMutator, byte[] qualifier, Long timestamp, Object inputValue, Attribute... attributes) throws IOException; @@ -65,13 +65,13 @@ public interface ColumnPrefix { * @param qualifier column qualifier. Nothing gets written when null. * @param timestamp version timestamp. When null the server timestamp will be * used. - *@param attributes attributes for the mutation that are used by the coprocessor - * to set/read the cell tags + * @param attributes attributes for the mutation that are used by the + * coprocessor to set/read the cell tags. * @param inputValue the value to write to the rowKey and column qualifier. * Nothing gets written when null. * @throws IOException */ - public void store(byte[] rowKey, TypedBufferedMutator tableMutator, + void store(byte[] rowKey, TypedBufferedMutator tableMutator, String qualifier, Long timestamp, Object inputValue, Attribute... attributes) throws IOException; @@ -86,7 +86,7 @@ public interface ColumnPrefix { * in the result. * @throws IOException */ - public Object readResult(Result result, String qualifier) throws IOException; + Object readResult(Result result, String qualifier) throws IOException; /** * @param result from which to read columns @@ -94,7 +94,7 @@ public interface ColumnPrefix { * (or all of them if the prefix value is null). * @throws IOException */ - public Map readResults(Result result) throws IOException; + Map readResults(Result result) throws IOException; /** * @param result from which to reads data with timestamps @@ -104,7 +104,18 @@ public interface ColumnPrefix { * idB={timestamp3->value3}, idC={timestamp1->value4}} * @throws IOException */ - public NavigableMap> + NavigableMap> readResultsWithTimestamps(Result result) throws IOException; + /** + * @param qualifierPrefix Column qualifier or prefix of qualifier. + * @return a byte array encoding column prefix and qualifier/prefix passed. + */ + byte[] getColumnPrefixBytes(String qualifierPrefix); + + /** + * @param qualifierPrefix Column qualifier or prefix of qualifier. + * @return a byte array encoding column prefix and qualifier/prefix passed. + */ + byte[] getColumnPrefixBytes(byte[] qualifierPrefix); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java index abede9c..5b71228 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -119,6 +119,18 @@ public enum EntityColumnPrefix implements ColumnPrefix { return columnPrefix; } + @Override + public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + + @Override + public byte[] getColumnPrefixBytes(String qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + /* * (non-Javadoc) * @@ -140,8 +152,7 @@ public enum EntityColumnPrefix implements ColumnPrefix { + tableMutator.getName().getNameAsString()); } - byte[] columnQualifier = - ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + byte[] columnQualifier = getColumnPrefixBytes(qualifier); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, attributes); @@ -167,8 +178,7 @@ public enum EntityColumnPrefix implements ColumnPrefix { + tableMutator.getName().getNameAsString()); } - byte[] columnQualifier = - ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + byte[] columnQualifier = getColumnPrefixBytes(qualifier); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, attributes);