From: varunsaxena@apache.org
To: common-commits@hadoop.apache.org
Date: Wed, 26 Apr 2017 13:32:03 -0000
Subject: [08/50] [abbrv] hadoop git commit: YARN-5928. Move ATSv2 HBase backend code into a new module that is only dependent at runtime by yarn servers. Contributed by Haibo Chen.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java deleted file mode 100644 index 90dd345..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.flow; - -import java.io.IOException; - -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; - -/** - * Identifies fully qualified columns for the {@link FlowRunTable}. - */ -public enum FlowRunColumn implements Column { - - /** - * When the flow was started. This is the minimum of currently known - * application start times. - */ - MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time", - AggregationOperation.GLOBAL_MIN, new LongConverter()), - - /** - * When the flow ended. This is the maximum of currently known application end - * times. - */ - MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time", - AggregationOperation.GLOBAL_MAX, new LongConverter()), - - /** - * The version of the flow that this flow belongs to. - */ - FLOW_VERSION(FlowRunColumnFamily.INFO, "flow_version", null); - - private final ColumnHelper column; - private final ColumnFamily columnFamily; - private final String columnQualifier; - private final byte[] columnQualifierBytes; - private final AggregationOperation aggOp; - - private FlowRunColumn(ColumnFamily columnFamily, - String columnQualifier, AggregationOperation aggOp) { - this(columnFamily, columnQualifier, aggOp, - GenericConverter.getInstance()); - } - - private FlowRunColumn(ColumnFamily columnFamily, - String columnQualifier, AggregationOperation aggOp, - ValueConverter converter) { - this.columnFamily = columnFamily; - this.columnQualifier = columnQualifier; - this.aggOp = aggOp; - // Future-proof by ensuring the right column prefix hygiene. - this.columnQualifierBytes = Bytes.toBytes(Separator.SPACE - .encode(columnQualifier)); - this.column = new ColumnHelper(columnFamily, converter); - } - - /** - * @return the column name value - */ - private String getColumnQualifier() { - return columnQualifier; - } - - @Override - public byte[] getColumnQualifierBytes() { - return columnQualifierBytes.clone(); - } - - @Override - public byte[] getColumnFamilyBytes() { - return columnFamily.getBytes(); - } - - public AggregationOperation getAggregationOperation() { - return aggOp; - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common.Column#store - * (byte[], org.apache.hadoop.yarn.server.timelineservice.storage.common. - * TypedBufferedMutator, java.lang.Long, java.lang.Object, - * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[]) - */ - public void store(byte[] rowKey, - TypedBufferedMutator tableMutator, Long timestamp, - Object inputValue, Attribute... 
attributes) throws IOException { - - Attribute[] combinedAttributes = - HBaseTimelineStorageUtils.combineAttributes(attributes, aggOp); - column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, - inputValue, combinedAttributes); - } - - public Object readResult(Result result) throws IOException { - return column.readResult(result, columnQualifierBytes); - } - - /** - * Retrieve an {@link FlowRunColumn} given a name, or null if there is no - * match. The following holds true: {@code columnFor(x) == columnFor(y)} if - * and only if {@code x.equals(y)} or {@code (x == y == null)} - * - * @param columnQualifier - * Name of the column to retrieve - * @return the corresponding {@link FlowRunColumn} or null - */ - public static final FlowRunColumn columnFor(String columnQualifier) { - - // Match column based on value, assume column family matches. - for (FlowRunColumn ec : FlowRunColumn.values()) { - // Find a match based only on name. - if (ec.getColumnQualifier().equals(columnQualifier)) { - return ec; - } - } - - // Default to null - return null; - } - - @Override - public ValueConverter getValueConverter() { - return column.getValueConverter(); - } - - /** - * Retrieve an {@link FlowRunColumn} given a name, or null if there is no - * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)} - * if and only if {@code a.equals(b) & x.equals(y)} or - * {@code (x == y == null)} - * - * @param columnFamily - * The columnFamily for which to retrieve the column. - * @param name - * Name of the column to retrieve - * @return the corresponding {@link FlowRunColumn} or null if both arguments - * don't match. - */ - public static final FlowRunColumn columnFor(FlowRunColumnFamily columnFamily, - String name) { - - for (FlowRunColumn ec : FlowRunColumn.values()) { - // Find a match based column family and on name. - if (ec.columnFamily.equals(columnFamily) - && ec.getColumnQualifier().equals(name)) { - return ec; - } - } - - // Default to null - return null; - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java deleted file mode 100644 index 8faf5f8..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.flow; - -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; - -/** - * Represents the flow run table column families. - */ -public enum FlowRunColumnFamily implements ColumnFamily { - - /** - * Info column family houses known columns, specifically ones included in - * columnfamily filters. - */ - INFO("i"); - - /** - * Byte representation of this column family. - */ - private final byte[] bytes; - - /** - * @param value - * create a column family with this name. Must be lower case and - * without spaces. - */ - private FlowRunColumnFamily(String value) { - // column families should be lower case and not contain any spaces. - this.bytes = Bytes.toBytes(Separator.SPACE.encode(value)); - } - - public byte[] getBytes() { - return Bytes.copy(bytes); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java deleted file mode 100644 index 278d18e..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java +++ /dev/null @@ -1,268 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.timelineservice.storage.flow; - -import java.io.IOException; -import java.util.Map; -import java.util.NavigableMap; - -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; - -/** - * Identifies partially qualified columns for the {@link FlowRunTable}. - */ -public enum FlowRunColumnPrefix implements ColumnPrefix { - - /** - * To store flow run info values. - */ - METRIC(FlowRunColumnFamily.INFO, "m", null, new LongConverter()); - - private final ColumnHelper column; - private final ColumnFamily columnFamily; - - /** - * Can be null for those cases where the provided column qualifier is the - * entire column name. - */ - private final String columnPrefix; - private final byte[] columnPrefixBytes; - - private final AggregationOperation aggOp; - - /** - * Private constructor, meant to be used by the enum definition. - * - * @param columnFamily that this column is stored in. - * @param columnPrefix for this column. - */ - private FlowRunColumnPrefix(ColumnFamily columnFamily, - String columnPrefix, AggregationOperation fra, ValueConverter converter) { - this(columnFamily, columnPrefix, fra, converter, false); - } - - private FlowRunColumnPrefix(ColumnFamily columnFamily, - String columnPrefix, AggregationOperation fra, ValueConverter converter, - boolean compoundColQual) { - column = new ColumnHelper(columnFamily, converter); - this.columnFamily = columnFamily; - this.columnPrefix = columnPrefix; - if (columnPrefix == null) { - this.columnPrefixBytes = null; - } else { - // Future-proof by ensuring the right column prefix hygiene. - this.columnPrefixBytes = - Bytes.toBytes(Separator.SPACE.encode(columnPrefix)); - } - this.aggOp = fra; - } - - /** - * @return the column name value - */ - public String getColumnPrefix() { - return columnPrefix; - } - - public byte[] getColumnPrefixBytes() { - return columnPrefixBytes.clone(); - } - - @Override - public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) { - return ColumnHelper.getColumnQualifier(this.columnPrefixBytes, - qualifierPrefix); - } - - @Override - public byte[] getColumnPrefixBytes(String qualifierPrefix) { - return ColumnHelper.getColumnQualifier(this.columnPrefixBytes, - qualifierPrefix); - } - - @Override - public byte[] getColumnFamilyBytes() { - return columnFamily.getBytes(); - } - - public AggregationOperation getAttribute() { - return aggOp; - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #store(byte[], - * org.apache.hadoop.yarn.server.timelineservice.storage.common. 
- * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object) - */ - public void store(byte[] rowKey, - TypedBufferedMutator tableMutator, String qualifier, - Long timestamp, Object inputValue, Attribute... attributes) - throws IOException { - - // Null check - if (qualifier == null) { - throw new IOException("Cannot store column with null qualifier in " - + tableMutator.getName().getNameAsString()); - } - - byte[] columnQualifier = getColumnPrefixBytes(qualifier); - Attribute[] combinedAttributes = - HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp); - column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, - combinedAttributes); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #store(byte[], - * org.apache.hadoop.yarn.server.timelineservice.storage.common. - * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object) - */ - public void store(byte[] rowKey, - TypedBufferedMutator tableMutator, byte[] qualifier, - Long timestamp, Object inputValue, Attribute... attributes) - throws IOException { - - // Null check - if (qualifier == null) { - throw new IOException("Cannot store column with null qualifier in " - + tableMutator.getName().getNameAsString()); - } - - byte[] columnQualifier = getColumnPrefixBytes(qualifier); - Attribute[] combinedAttributes = - HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp); - column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, - combinedAttributes); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String) - */ - public Object readResult(Result result, String qualifier) throws IOException { - byte[] columnQualifier = - ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); - return column.readResult(result, columnQualifier); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #readResults(org.apache.hadoop.hbase.client.Result, - * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter) - */ - public Map readResults(Result result, - KeyConverter keyConverter) throws IOException { - return column.readResults(result, columnPrefixBytes, keyConverter); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result, - * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter) - */ - public NavigableMap> - readResultsWithTimestamps(Result result, KeyConverter keyConverter) - throws IOException { - return column.readResultsWithTimestamps(result, columnPrefixBytes, - keyConverter); - } - - /** - * Retrieve an {@link FlowRunColumnPrefix} given a name, or null if there is - * no match. The following holds true: {@code columnFor(x) == columnFor(y)} if - * and only if {@code x.equals(y)} or {@code (x == y == null)} - * - * @param columnPrefix Name of the column to retrieve - * @return the corresponding {@link FlowRunColumnPrefix} or null - */ - public static final FlowRunColumnPrefix columnFor(String columnPrefix) { - - // Match column based on value, assume column family matches. - for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) { - // Find a match based only on name. 
- if (frcp.getColumnPrefix().equals(columnPrefix)) { - return frcp; - } - } - - // Default to null - return null; - } - - @Override - public ValueConverter getValueConverter() { - return column.getValueConverter(); - } - - /** - * Retrieve an {@link FlowRunColumnPrefix} given a name, or null if there is - * no match. The following holds true: - * {@code columnFor(a,x) == columnFor(b,y)} if and only if - * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)} - * - * @param columnFamily The columnFamily for which to retrieve the column. - * @param columnPrefix Name of the column to retrieve - * @return the corresponding {@link FlowRunColumnPrefix} or null if both - * arguments don't match. - */ - public static final FlowRunColumnPrefix columnFor( - FlowRunColumnFamily columnFamily, String columnPrefix) { - - // TODO: needs unit test to confirm and need to update javadoc to explain - // null prefix case. - - for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) { - // Find a match based column family and on name. - if (frcp.columnFamily.equals(columnFamily) - && (((columnPrefix == null) && (frcp.getColumnPrefix() == null)) || - (frcp.getColumnPrefix().equals(columnPrefix)))) { - return frcp; - } - } - - // Default to null - return null; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java deleted file mode 100644 index 24101c6..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java +++ /dev/null @@ -1,274 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.timelineservice.storage.flow; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.regionserver.ScanType; -import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; - -/** - * Coprocessor for flow run table. - */ -public class FlowRunCoprocessor extends BaseRegionObserver { - - private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class); - - private Region region; - /** - * generate a timestamp that is unique per row in a region this is per region. - */ - private final TimestampGenerator timestampGenerator = - new TimestampGenerator(); - - @Override - public void start(CoprocessorEnvironment e) throws IOException { - if (e instanceof RegionCoprocessorEnvironment) { - RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; - this.region = env.getRegion(); - } - } - - /* - * (non-Javadoc) - * - * This method adds the tags onto the cells in the Put. It is presumed that - * all the cells in one Put have the same set of Tags. The existing cell - * timestamp is overwritten for non-metric cells and each such cell gets a new - * unique timestamp generated by {@link TimestampGenerator} - * - * @see - * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache - * .hadoop.hbase.coprocessor.ObserverContext, - * org.apache.hadoop.hbase.client.Put, - * org.apache.hadoop.hbase.regionserver.wal.WALEdit, - * org.apache.hadoop.hbase.client.Durability) - */ - @Override - public void prePut(ObserverContext e, Put put, - WALEdit edit, Durability durability) throws IOException { - Map attributes = put.getAttributesMap(); - // Assumption is that all the cells in a put are the same operation. 
List tags = new ArrayList<>(); - if ((attributes != null) && (attributes.size() > 0)) { - for (Map.Entry attribute : attributes.entrySet()) { - Tag t = HBaseTimelineStorageUtils.getTagFromAttribute(attribute); - tags.add(t); - } - byte[] tagByteArray = Tag.fromList(tags); - NavigableMap> newFamilyMap = new TreeMap<>( - Bytes.BYTES_COMPARATOR); - for (Map.Entry> entry : put.getFamilyCellMap() - .entrySet()) { - List newCells = new ArrayList<>(entry.getValue().size()); - for (Cell cell : entry.getValue()) { - // for each cell in the put add the tags - // Assumption is that all the cells in - // one put are the same operation - // also, get a unique cell timestamp for non-metric cells - // this way we don't inadvertently overwrite cell versions - long cellTimestamp = getCellTimestamp(cell.getTimestamp(), tags); - newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell), - CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), - cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell), - tagByteArray)); - } - newFamilyMap.put(entry.getKey(), newCells); - } // for each entry - // Update the family map for the Put - put.setFamilyCellMap(newFamilyMap); - } - } - - /** - * Determines if the current cell's timestamp is to be used or a new unique - * cell timestamp is to be used. The reason this is done is to avoid - * inadvertently overwriting cells when writes come in very fast. But for - * metric cells, the cell timestamp signifies the metric timestamp. Hence we - * don't want to overwrite it. - * - * @param timestamp - * @param tags - * @return cell timestamp - */ - private long getCellTimestamp(long timestamp, List tags) { - // if ts not set (hbase sets to HConstants.LATEST_TIMESTAMP by default) - // then use the generator - if (timestamp == HConstants.LATEST_TIMESTAMP) { - return timestampGenerator.getUniqueTimestamp(); - } else { - return timestamp; - } - } - - /* - * (non-Javadoc) - * - * Creates a {@link FlowScanner} Scan so that it can correctly process the - * contents of {@link FlowRunTable}. - * - * @see - * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache - * .hadoop.hbase.coprocessor.ObserverContext, - * org.apache.hadoop.hbase.client.Get, java.util.List) - */ - @Override - public void preGetOp(ObserverContext e, - Get get, List results) throws IOException { - Scan scan = new Scan(get); - scan.setMaxVersions(); - RegionScanner scanner = null; - try { - scanner = new FlowScanner(e.getEnvironment(), scan, - region.getScanner(scan), FlowScannerOperation.READ); - scanner.next(results); - e.bypass(); - } finally { - if (scanner != null) { - scanner.close(); - } - } - } - - /* - * (non-Javadoc) - * - * Ensures that max versions are set for the Scan so that metrics can be - * correctly aggregated and min/max can be correctly determined. - * - * @see - * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org - * .apache.hadoop.hbase.coprocessor.ObserverContext, - * org.apache.hadoop.hbase.client.Scan, - * org.apache.hadoop.hbase.regionserver.RegionScanner) - */ - @Override - public RegionScanner preScannerOpen( - ObserverContext e, Scan scan, - RegionScanner scanner) throws IOException { - // set max versions for scan to see all - // versions to aggregate for metrics - scan.setMaxVersions(); - return scanner; - } - - /* - * (non-Javadoc) - * - * Creates a {@link FlowScanner} Scan so that it can correctly process the - * contents of {@link FlowRunTable}. 
- * - * @see - * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen( - * org.apache.hadoop.hbase.coprocessor.ObserverContext, - * org.apache.hadoop.hbase.client.Scan, - * org.apache.hadoop.hbase.regionserver.RegionScanner) - */ - @Override - public RegionScanner postScannerOpen( - ObserverContext e, Scan scan, - RegionScanner scanner) throws IOException { - return new FlowScanner(e.getEnvironment(), scan, - scanner, FlowScannerOperation.READ); - } - - @Override - public InternalScanner preFlush( - ObserverContext c, Store store, - InternalScanner scanner) throws IOException { - if (LOG.isDebugEnabled()) { - if (store != null) { - LOG.debug("preFlush store = " + store.getColumnFamilyName() - + " flushableSize=" + store.getFlushableSize() - + " flushedCellsCount=" + store.getFlushedCellsCount() - + " compactedCellsCount=" + store.getCompactedCellsCount() - + " majorCompactedCellsCount=" - + store.getMajorCompactedCellsCount() + " memstoreFlushSize=" - + store.getMemstoreFlushSize() + " memstoreSize=" - + store.getMemStoreSize() + " size=" + store.getSize() - + " storeFilesCount=" + store.getStorefilesCount()); - } - } - return new FlowScanner(c.getEnvironment(), scanner, - FlowScannerOperation.FLUSH); - } - - @Override - public void postFlush(ObserverContext c, - Store store, StoreFile resultFile) { - if (LOG.isDebugEnabled()) { - if (store != null) { - LOG.debug("postFlush store = " + store.getColumnFamilyName() - + " flushableSize=" + store.getFlushableSize() - + " flushedCellsCount=" + store.getFlushedCellsCount() - + " compactedCellsCount=" + store.getCompactedCellsCount() - + " majorCompactedCellsCount=" - + store.getMajorCompactedCellsCount() + " memstoreFlushSize=" - + store.getMemstoreFlushSize() + " memstoreSize=" - + store.getMemStoreSize() + " size=" + store.getSize() - + " storeFilesCount=" + store.getStorefilesCount()); - } - } - } - - @Override - public InternalScanner preCompact( - ObserverContext e, Store store, - InternalScanner scanner, ScanType scanType, CompactionRequest request) - throws IOException { - - FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION; - if (request != null) { - requestOp = (request.isMajor() ? 
FlowScannerOperation.MAJOR_COMPACTION - : FlowScannerOperation.MINOR_COMPACTION); - LOG.info("Compactionrequest= " + request.toString() + " " - + requestOp.toString() + " RegionName=" + e.getEnvironment() - .getRegion().getRegionInfo().getRegionNameAsString()); - } - return new FlowScanner(e.getEnvironment(), scanner, requestOp); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java deleted file mode 100644 index 8fda9a8..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.flow; - -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; - -/** - * Represents a rowkey for the flow run table. - */ -public class FlowRunRowKey { - private final String clusterId; - private final String userId; - private final String flowName; - private final Long flowRunId; - private final FlowRunRowKeyConverter flowRunRowKeyConverter = - new FlowRunRowKeyConverter(); - - public FlowRunRowKey(String clusterId, String userId, String flowName, - Long flowRunId) { - this.clusterId = clusterId; - this.userId = userId; - this.flowName = flowName; - this.flowRunId = flowRunId; - } - - public String getClusterId() { - return clusterId; - } - - public String getUserId() { - return userId; - } - - public String getFlowName() { - return flowName; - } - - public Long getFlowRunId() { - return flowRunId; - } - - /** - * Constructs a row key for the entity table as follows: { - * clusterId!userId!flowName!Inverted Flow Run Id}. - * - * @return byte array with the row key - */ - public byte[] getRowKey() { - return flowRunRowKeyConverter.encode(this); - } - - - /** - * Given the raw row key as bytes, returns the row key as an object. 
- * - * @param rowKey Byte representation of row key. - * @return A FlowRunRowKey object. - */ - public static FlowRunRowKey parseRowKey(byte[] rowKey) { - return new FlowRunRowKeyConverter().decode(rowKey); - } - - /** - * returns the Flow Key as a verbose String output. - * @return String - */ - @Override - public String toString() { - StringBuilder flowKeyStr = new StringBuilder(); - flowKeyStr.append("{clusterId=" + clusterId); - flowKeyStr.append(" userId=" + userId); - flowKeyStr.append(" flowName=" + flowName); - flowKeyStr.append(" flowRunId="); - flowKeyStr.append(flowRunId); - flowKeyStr.append("}"); - return flowKeyStr.toString(); - } - - /** - * Encodes and decodes row key for flow run table. - * The row key is of the form : clusterId!userId!flowName!flowrunId. - * flowrunId is a long and rest are strings. - *

- */ - final private static class FlowRunRowKeyConverter implements - KeyConverter { - - private FlowRunRowKeyConverter() { - } - - /** - * The flow run row key is of the form clusterId!userId!flowName!flowrunId - * with each segment separated by !. The sizes below indicate sizes of each - * one of these segments in sequence. clusterId, userId and flowName are - * strings. flowrunId is a long hence 8 bytes in size. Strings are variable - * in size (i.e. end whenever separator is encountered). This is used while - * decoding and helps in determining where to split. - */ - private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG }; - - /* - * (non-Javadoc) - * - * Encodes FlowRunRowKey object into a byte array with each component/field - * in FlowRunRowKey separated by Separator#QUALIFIERS. This leads to a flow - * run row key of the form clusterId!userId!flowName!flowrunId. If flowRunId - * in passed FlowRunRowKey object is null (and the fields preceding it i.e. - * clusterId, userId and flowName are not null), this returns a row key - * prefix of the form clusterId!userId!flowName! flowRunId is inverted - * while encoding as it helps maintain a descending order for flow keys in - * flow run table. - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common - * .KeyConverter#encode(java.lang.Object) - */ - @Override - public byte[] encode(FlowRunRowKey rowKey) { - byte[] first = - Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(), - Separator.SPACE, Separator.TAB, Separator.QUALIFIERS), Separator - .encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB, - Separator.QUALIFIERS), Separator.encode(rowKey.getFlowName(), - Separator.SPACE, Separator.TAB, Separator.QUALIFIERS)); - if (rowKey.getFlowRunId() == null) { - return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES); - } else { - // Note that flowRunId is a long, so we can't encode them all at the - // same - // time. - byte[] second = - Bytes.toBytes(LongConverter.invertLong(rowKey.getFlowRunId())); - return Separator.QUALIFIERS.join(first, second); - } - } - - /* - * (non-Javadoc) - * - * Decodes a flow run row key of the form - * clusterId!userId!flowName!flowrunId represented in byte format and - * converts it into an FlowRunRowKey object. flowRunId is inverted while - * decoding as it was inverted while encoding. 
- * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common - * .KeyConverter#decode(byte[]) - */ - @Override - public FlowRunRowKey decode(byte[] rowKey) { - byte[][] rowKeyComponents = - Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES); - if (rowKeyComponents.length != 4) { - throw new IllegalArgumentException("the row key is not valid for " - + "a flow run"); - } - String clusterId = - Separator.decode(Bytes.toString(rowKeyComponents[0]), - Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); - String userId = - Separator.decode(Bytes.toString(rowKeyComponents[1]), - Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); - String flowName = - Separator.decode(Bytes.toString(rowKeyComponents[2]), - Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); - Long flowRunId = - LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3])); - return new FlowRunRowKey(clusterId, userId, flowName, flowRunId); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java deleted file mode 100644 index 23ebc66..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.flow; - -import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; - -/** - * Represents a partial rowkey (without the flowRunId) for the flow run table. - */ -public class FlowRunRowKeyPrefix extends FlowRunRowKey implements - RowKeyPrefix { - - /** - * Constructs a row key prefix for the flow run table as follows: - * {@code clusterId!userId!flowName!}. - * - * @param clusterId identifying the cluster - * @param userId identifying the user - * @param flowName identifying the flow - */ - public FlowRunRowKeyPrefix(String clusterId, String userId, - String flowName) { - super(clusterId, userId, flowName, null); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.application. 
RowKeyPrefix#getRowKeyPrefix() - */ - public byte[] getRowKeyPrefix() { - // We know we're a FlowRunRowKey with null flowRunId, so we can simply - // delegate - return super.getRowKey(); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java deleted file mode 100644 index ec973cb..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.flow; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.regionserver.BloomType; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Coprocessor; - -/** - * The flow run table has column family info. - * Stores per flow run information - * aggregated across applications. - * - * Metrics are also stored in the info column family. - * - * Example flow run table record: - *

- * flow_run table
- * |-------------------------------------------|
- * |  Row key   | Column Family                |
- * |            | info                         |
- * |-------------------------------------------|
- * | clusterId! | flow_version:version7        |
- * | userName!  |                              |
- * | flowName!  | running_apps:1               |
- * | flowRunId  |                              |
- * |            | min_start_time:1392995080000 |
- * |            | #0:""                        |
- * |            |                              |
- * |            | min_start_time:1392995081012 |
- * |            | #0:appId2                    |
- * |            |                              |
- * |            | min_start_time:1392993083210 |
- * |            | #0:appId3                    |
- * |            |                              |
- * |            |                              |
- * |            | max_end_time:1392993084018   |
- * |            | #0:""                        |
- * |            |                              |
- * |            |                              |
- * |            | m!mapInputRecords:127        |
- * |            | #0:""                        |
- * |            |                              |
- * |            | m!mapInputRecords:31         |
- * |            | #2:appId2                    |
- * |            |                              |
- * |            | m!mapInputRecords:37         |
- * |            | #1:appId3                    |
- * |            |                              |
- * |            |                              |
- * |            | m!mapOutputRecords:181       |
- * |            | #0:""                        |
- * |            |                              |
- * |            | m!mapOutputRecords:37        |
- * |            | #1:appId3                    |
- * |            |                              |
- * |            |                              |
- * |-------------------------------------------|
- * 
- */ -public class FlowRunTable extends BaseTable { - /** entity prefix. */ - private static final String PREFIX = - YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowrun"; - - /** config param name that specifies the flowrun table name. */ - public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name"; - - /** default value for flowrun table name. */ - public static final String DEFAULT_TABLE_NAME = "timelineservice.flowrun"; - - private static final Log LOG = LogFactory.getLog(FlowRunTable.class); - - /** default max number of versions. */ - public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE; - - public FlowRunTable() { - super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable - * (org.apache.hadoop.hbase.client.Admin, - * org.apache.hadoop.conf.Configuration) - */ - public void createTable(Admin admin, Configuration hbaseConf) - throws IOException { - - TableName table = getTableName(hbaseConf); - if (admin.tableExists(table)) { - // do not disable / delete existing table - // similar to the approach taken by map-reduce jobs when - // output directory exists - throw new IOException("Table " + table.getNameAsString() - + " already exists."); - } - - HTableDescriptor flowRunTableDescp = new HTableDescriptor(table); - HColumnDescriptor infoCF = - new HColumnDescriptor(FlowRunColumnFamily.INFO.getBytes()); - infoCF.setBloomFilterType(BloomType.ROWCOL); - flowRunTableDescp.addFamily(infoCF); - infoCF.setMinVersions(1); - infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS); - - // TODO: figure the split policy - String coprocessorJarPathStr = hbaseConf.get( - YarnConfiguration.FLOW_RUN_COPROCESSOR_JAR_HDFS_LOCATION, - YarnConfiguration.DEFAULT_HDFS_LOCATION_FLOW_RUN_COPROCESSOR_JAR); - - Path coprocessorJarPath = new Path(coprocessorJarPathStr); - LOG.info("CoprocessorJarPath=" + coprocessorJarPath.toString()); - flowRunTableDescp.addCoprocessor( - FlowRunCoprocessor.class.getCanonicalName(), coprocessorJarPath, - Coprocessor.PRIORITY_USER, null); - admin.createTable(flowRunTableDescp); - LOG.info("Status of table creation for " + table.getNameAsString() + "=" - + admin.tableExists(table)); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java deleted file mode 100644 index 0e3c8ee..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java +++ /dev/null @@ -1,728 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage.flow; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.regionserver.ScannerContext; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; - -import com.google.common.annotations.VisibleForTesting; - -/** - * Invoked via the coprocessor when a Get or a Scan is issued for flow run - * table. Looks through the list of cells per row, checks their tags and - * operates on those cells as per the cell tags. Transforms reads of the stored - * metrics into calculated sums for each column. Also, finds the min and max for - * start and end times in a flow run. - */ -class FlowScanner implements RegionScanner, Closeable { - - private static final Log LOG = LogFactory.getLog(FlowScanner.class); - - /** - * Use a special application id to represent the flow id; this is needed - * since TimestampGenerator parses the app id to generate a cell timestamp. 
- */ - private static final String FLOW_APP_ID = "application_00000000000_0000"; - - private final Region region; - private final InternalScanner flowRunScanner; - private final int batchSize; - private final long appFinalValueRetentionThreshold; - private RegionScanner regionScanner; - private boolean hasMore; - private byte[] currentRow; - private List availableCells = new ArrayList<>(); - private int currentIndex; - private FlowScannerOperation action = FlowScannerOperation.READ; - - FlowScanner(RegionCoprocessorEnvironment env, InternalScanner internalScanner, - FlowScannerOperation action) { - this(env, null, internalScanner, action); - } - - FlowScanner(RegionCoprocessorEnvironment env, Scan incomingScan, - InternalScanner internalScanner, FlowScannerOperation action) { - this.batchSize = incomingScan == null ? -1 : incomingScan.getBatch(); - // TODO initialize other scan attributes like Scan#maxResultSize - this.flowRunScanner = internalScanner; - if (internalScanner instanceof RegionScanner) { - this.regionScanner = (RegionScanner) internalScanner; - } - this.action = action; - if (env == null) { - this.appFinalValueRetentionThreshold = - YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD; - this.region = null; - } else { - this.region = env.getRegion(); - Configuration hbaseConf = env.getConfiguration(); - this.appFinalValueRetentionThreshold = hbaseConf.getLong( - YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD, - YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD); - } - if (LOG.isDebugEnabled()) { - LOG.debug(" batch size=" + batchSize); - } - } - - - /* - * (non-Javadoc) - * - * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo() - */ - @Override - public HRegionInfo getRegionInfo() { - return region.getRegionInfo(); - } - - @Override - public boolean nextRaw(List cells) throws IOException { - return nextRaw(cells, ScannerContext.newBuilder().build()); - } - - @Override - public boolean nextRaw(List cells, ScannerContext scannerContext) - throws IOException { - return nextInternal(cells, scannerContext); - } - - @Override - public boolean next(List cells) throws IOException { - return next(cells, ScannerContext.newBuilder().build()); - } - - @Override - public boolean next(List cells, ScannerContext scannerContext) - throws IOException { - return nextInternal(cells, scannerContext); - } - - /** - * Get value converter associated with a column or a column prefix. If nothing - * matches, generic converter is returned. - * @param colQualifierBytes - * @return value converter implementation. - */ - private static ValueConverter getValueConverter(byte[] colQualifierBytes) { - // Iterate over all the column prefixes for flow run table and get the - // appropriate converter for the column qualifier passed if prefix matches. - for (FlowRunColumnPrefix colPrefix : FlowRunColumnPrefix.values()) { - byte[] colPrefixBytes = colPrefix.getColumnPrefixBytes(""); - if (Bytes.compareTo(colPrefixBytes, 0, colPrefixBytes.length, - colQualifierBytes, 0, colPrefixBytes.length) == 0) { - return colPrefix.getValueConverter(); - } - } - // Iterate over all the columns for flow run table and get the - // appropriate converter for the column qualifier passed if match occurs. - for (FlowRunColumn column : FlowRunColumn.values()) { - if (Bytes.compareTo( - column.getColumnQualifierBytes(), colQualifierBytes) == 0) { - return column.getValueConverter(); - } - } - // Return generic converter if nothing matches. 
-
-  /**
-   * This method loops through the cells in a given row of the
-   * {@link FlowRunTable}. It looks at the tags of each cell to figure out how
-   * to process the contents. It then calculates the sum or min or max for each
-   * column, or returns the cell as is.
-   *
-   * @param cells list to populate with the cells to return.
-   * @param scannerContext context information for the batch of cells.
-   * @return true if the next row is available for the scanner, false
-   *         otherwise.
-   * @throws IOException if the underlying scanner fails.
-   */
-  private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext)
-      throws IOException {
-    Cell cell = null;
-    startNext();
-    // Loop through all the cells in this row. For min/max/metrics we do need
-    // to scan the entire set of cells to get the right one, but with
-    // flush/compaction the number of cells being scanned goes down.
-    // Cells are grouped per column qualifier, then sorted by cell timestamp
-    // (latest to oldest) per column qualifier. So all cells in one qualifier
-    // come one after the other before we see the next column qualifier.
-    ByteArrayComparator comp = new ByteArrayComparator();
-    byte[] previousColumnQualifier = Separator.EMPTY_BYTES;
-    AggregationOperation currentAggOp = null;
-    SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
-    Set<String> alreadySeenAggDim = new HashSet<>();
-    int addedCnt = 0;
-    long currentTimestamp = System.currentTimeMillis();
-    ValueConverter converter = null;
-    int limit = batchSize;
-
-    while (limit <= 0 || addedCnt < limit) {
-      cell = peekAtNextCell(scannerContext);
-      if (cell == null) {
-        break;
-      }
-      byte[] currentColumnQualifier = CellUtil.cloneQualifier(cell);
-      if (previousColumnQualifier == null) {
-        // first time in loop
-        previousColumnQualifier = currentColumnQualifier;
-      }
-
-      converter = getValueConverter(currentColumnQualifier);
-      if (comp.compare(previousColumnQualifier, currentColumnQualifier) != 0) {
-        addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
-            converter, currentTimestamp);
-        resetState(currentColumnCells, alreadySeenAggDim);
-        previousColumnQualifier = currentColumnQualifier;
-        currentAggOp = getCurrentAggOp(cell);
-        converter = getValueConverter(currentColumnQualifier);
-      }
-      collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim,
-          converter, scannerContext);
-      nextCell(scannerContext);
-    }
-    if (!currentColumnCells.isEmpty() && (limit <= 0 || addedCnt < limit)) {
-      addedCnt += emitCells(cells, currentColumnCells, currentAggOp, converter,
-          currentTimestamp);
-      if (LOG.isDebugEnabled()) {
-        if (addedCnt > 0) {
-          LOG.debug("emitted cells. " + addedCnt + " for " + this.action
-              + " rowKey="
-              + FlowRunRowKey.parseRowKey(CellUtil.cloneRow(cells.get(0))));
-        } else {
-          LOG.debug("emitted no cells for " + this.action);
-        }
-      }
-    }
-    return hasMore();
-  }
-
-  private AggregationOperation getCurrentAggOp(Cell cell) {
-    List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
-        cell.getTagsLength());
-    // We assume that all the operations for a particular column are the same.
-    return HBaseTimelineStorageUtils.getAggregationOperationFromTagsList(tags);
-  }
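On the READ path, each qualifier's surviving cells (one per application after de-duplication) are folded into a single sum using the column's converter, exactly the loop in processSummation further down. A self-contained sketch of that fold with made-up values, using the LongConverter that flow run metrics are stored with:

    import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
    import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;

    public class SumFoldSketch {
      public static void main(String[] args) throws Exception {
        NumericValueConverter converter = new LongConverter();
        // Hypothetical decoded cell values for one qualifier, one per app.
        Number[] decoded = {40L, 25L, 35L};
        Number sum = 0;
        for (Number value : decoded) {
          sum = converter.add(sum, value); // same fold as processSummation
        }
        System.out.println(sum); // 100: emitted as a single summed cell
      }
    }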
-
-  /**
-   * Resets the state to an initialized one for the next loop iteration.
-   *
-   * @param currentColumnCells cells collected for the current column.
-   * @param alreadySeenAggDim aggregation dimensions seen for this column.
-   */
-  private void resetState(SortedSet<Cell> currentColumnCells,
-      Set<String> alreadySeenAggDim) {
-    currentColumnCells.clear();
-    alreadySeenAggDim.clear();
-  }
-
-  private void collectCells(SortedSet<Cell> currentColumnCells,
-      AggregationOperation currentAggOp, Cell cell,
-      Set<String> alreadySeenAggDim, ValueConverter converter,
-      ScannerContext scannerContext) throws IOException {
-
-    if (currentAggOp == null) {
-      // not a min/max/metric cell, so just return it as is
-      currentColumnCells.add(cell);
-      return;
-    }
-
-    switch (currentAggOp) {
-    case GLOBAL_MIN:
-      if (currentColumnCells.size() == 0) {
-        currentColumnCells.add(cell);
-      } else {
-        Cell currentMinCell = currentColumnCells.first();
-        Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp,
-            (NumericValueConverter) converter);
-        if (!currentMinCell.equals(newMinCell)) {
-          currentColumnCells.remove(currentMinCell);
-          currentColumnCells.add(newMinCell);
-        }
-      }
-      break;
-    case GLOBAL_MAX:
-      if (currentColumnCells.size() == 0) {
-        currentColumnCells.add(cell);
-      } else {
-        Cell currentMaxCell = currentColumnCells.first();
-        Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp,
-            (NumericValueConverter) converter);
-        if (!currentMaxCell.equals(newMaxCell)) {
-          currentColumnCells.remove(currentMaxCell);
-          currentColumnCells.add(newMaxCell);
-        }
-      }
-      break;
-    case SUM:
-    case SUM_FINAL:
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("In collectCells"
-            + " FlowScannerOperation=" + this.action
-            + " currentAggOp=" + currentAggOp
-            + " cell qualifier="
-            + Bytes.toString(CellUtil.cloneQualifier(cell))
-            + " cell value="
-            + converter.decodeValue(CellUtil.cloneValue(cell))
-            + " timestamp=" + cell.getTimestamp());
-      }
-
-      // Add the cell to the current column's cells only if this app has not
-      // been seen yet.
-      List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
-          cell.getTagsLength());
-      String aggDim = HBaseTimelineStorageUtils
-          .getAggregationCompactionDimension(tags);
-      if (!alreadySeenAggDim.contains(aggDim)) {
-        // Cells show up in sorted order (newest first), so once an agg
-        // dimension has been seen, any later cell for it is older and is
-        // dropped. When the agg dimension has not been seen yet, this cell
-        // is the newest for that dimension and joins the working set.
-        currentColumnCells.add(cell);
-        alreadySeenAggDim.add(aggDim);
-      }
-      break;
-    default:
-      break;
-    } // end of switch case
-  }
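The SUM/SUM_FINAL branch keeps only the newest cell per aggregation dimension (the application id carried in the cell tags). A compressed trace with hypothetical qualifiers, apps and values, in the newest-first order the scanner delivers them:

    m!cpu app_1=40  -> kept    (app_1 not seen yet for this qualifier)
    m!cpu app_1=30  -> dropped (older cell for app_1)
    m!cpu app_2=25  -> kept    (app_2 not seen yet)
    working set handed to emitCells: {app_1=40, app_2=25}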
-
-  /*
-   * Processes the cells in the input parameter currentColumnCells and
-   * populates the output list based on the input AggregationOperation
-   * parameter.
-   */
-  private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
-      AggregationOperation currentAggOp, ValueConverter converter,
-      long currentTimestamp) throws IOException {
-    if (currentColumnCells == null || currentColumnCells.size() == 0) {
-      return 0;
-    }
-    if (currentAggOp == null) {
-      cells.addAll(currentColumnCells);
-      return currentColumnCells.size();
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("In emitCells " + this.action + " currentColumnCells size="
-          + currentColumnCells.size() + " currentAggOp=" + currentAggOp);
-    }
-
-    switch (currentAggOp) {
-    case GLOBAL_MIN:
-    case GLOBAL_MAX:
-      cells.addAll(currentColumnCells);
-      return currentColumnCells.size();
-    case SUM:
-    case SUM_FINAL:
-      switch (action) {
-      case FLUSH:
-      case MINOR_COMPACTION:
-        cells.addAll(currentColumnCells);
-        return currentColumnCells.size();
-      case READ:
-        Cell sumCell = processSummation(currentColumnCells,
-            (NumericValueConverter) converter);
-        cells.add(sumCell);
-        return 1;
-      case MAJOR_COMPACTION:
-        List<Cell> finalCells = processSummationMajorCompaction(
-            currentColumnCells, (NumericValueConverter) converter,
-            currentTimestamp);
-        cells.addAll(finalCells);
-        return finalCells.size();
-      default:
-        cells.addAll(currentColumnCells);
-        return currentColumnCells.size();
-      }
-    default:
-      cells.addAll(currentColumnCells);
-      return currentColumnCells.size();
-    }
-  }
-
-  /*
-   * Returns a cell whose value is the sum of all cell values in the input set.
-   * The new cell created has the timestamp of the most recent metric cell. The
-   * sum of a metric for a flow run is the summation at the point of the last
-   * metric update in that flow till that time.
-   */
-  private Cell processSummation(SortedSet<Cell> currentColumnCells,
-      NumericValueConverter converter) throws IOException {
-    Number sum = 0;
-    Number currentValue = 0;
-    long ts = 0L;
-    long mostCurrentTimestamp = 0L;
-    Cell mostRecentCell = null;
-    for (Cell cell : currentColumnCells) {
-      currentValue = (Number) converter.decodeValue(CellUtil.cloneValue(cell));
-      ts = cell.getTimestamp();
-      if (mostCurrentTimestamp < ts) {
-        mostCurrentTimestamp = ts;
-        mostRecentCell = cell;
-      }
-      sum = converter.add(sum, currentValue);
-    }
-    byte[] sumBytes = converter.encodeValue(sum);
-    Cell sumCell =
-        HBaseTimelineStorageUtils.createNewCell(mostRecentCell, sumBytes);
-    return sumCell;
-  }
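Putting emitCells in one view: what gets emitted depends on both the column's aggregation operation and the operation this scanner was opened for:

    aggOp            READ             FLUSH / MINOR_COMPACTION   MAJOR_COMPACTION
    (no tag)         cells as-is      cells as-is                cells as-is
    GLOBAL_MIN/MAX   cells as-is      cells as-is                cells as-is
    SUM/SUM_FINAL    one summed cell  cells as-is                fold expired finals,
                                                                 keep live cells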
-
-  /**
-   * Returns a list of cells that contains
-   *
-   * A) the latest cells for applications that haven't finished yet, and
-   * B) a summation for the flow, based on applications that have completed
-   * and are older than a certain time.
-   *
-   * The new cell created has the timestamp of the most recent metric cell.
-   * The sum of a metric for a flow run is the summation at the point of the
-   * last metric update in that flow till that time.
-   */
-  @VisibleForTesting
-  List<Cell> processSummationMajorCompaction(
-      SortedSet<Cell> currentColumnCells, NumericValueConverter converter,
-      long currentTimestamp)
-      throws IOException {
-    Number sum = 0;
-    Number currentValue = 0;
-    long ts = 0L;
-    boolean summationDone = false;
-    List<Cell> finalCells = new ArrayList<>();
-    if (currentColumnCells == null) {
-      return finalCells;
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("In processSummationMajorCompaction,"
-          + " will drop cells older than " + currentTimestamp
-          + " currentColumnCells size=" + currentColumnCells.size());
-    }
-
-    for (Cell cell : currentColumnCells) {
-      AggregationOperation cellAggOp = getCurrentAggOp(cell);
-      // Check whether this is the existing flow sum cell.
-      List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
-          cell.getTagsLength());
-      String appId = HBaseTimelineStorageUtils
-          .getAggregationCompactionDimension(tags);
-      if (FLOW_APP_ID.equals(appId)) {
-        // equals(), not ==: the decoded appId is a fresh String instance.
-        // Fold the previously written sum cell's own value into the new sum.
-        currentValue = (Number) converter.decodeValue(CellUtil
-            .cloneValue(cell));
-        sum = converter.add(sum, currentValue);
-        summationDone = true;
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("reading flow app id sum=" + sum);
-        }
-      } else {
-        currentValue = (Number) converter.decodeValue(CellUtil
-            .cloneValue(cell));
-        // Read the timestamp truncated by the generator.
-        ts = TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp());
-        if ((cellAggOp == AggregationOperation.SUM_FINAL)
-            && ((ts + this.appFinalValueRetentionThreshold)
-                < currentTimestamp)) {
-          sum = converter.add(sum, currentValue);
-          summationDone = true;
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("MAJOR COMPACTION loop sum=" + sum
-                + " discarding now: qualifier="
-                + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value="
-                + converter.decodeValue(CellUtil.cloneValue(cell))
-                + " timestamp=" + cell.getTimestamp() + " " + this.action);
-          }
-        } else {
-          // Not a final value, or not old enough, so it is the latest cell
-          // for this app; include it in the list of cells to write back.
-          finalCells.add(cell);
-        }
-      }
-    }
-    if (summationDone) {
-      Cell anyCell = currentColumnCells.first();
-      List<Tag> tags = new ArrayList<>();
-      Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
-          Bytes.toBytes(FLOW_APP_ID));
-      tags.add(t);
-      t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
-          Bytes.toBytes(FLOW_APP_ID));
-      tags.add(t);
-      byte[] tagByteArray = Tag.fromList(tags);
-      Cell sumCell = HBaseTimelineStorageUtils.createNewCell(
-          CellUtil.cloneRow(anyCell),
-          CellUtil.cloneFamily(anyCell),
-          CellUtil.cloneQualifier(anyCell),
-          TimestampGenerator.getSupplementedTimestamp(
-              System.currentTimeMillis(), FLOW_APP_ID),
-          converter.encodeValue(sum), tagByteArray);
-      finalCells.add(sumCell);
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("MAJOR COMPACTION final sum=" + sum + " for "
-            + Bytes.toString(CellUtil.cloneQualifier(sumCell))
-            + " " + this.action);
-      }
-      LOG.info("After major compaction for qualifier="
-          + Bytes.toString(CellUtil.cloneQualifier(sumCell))
-          + " with currentColumnCells.size=" + currentColumnCells.size()
-          + " returning finalCells.size=" + finalCells.size()
-          + " with sum=" + sum.longValue()
-          + " with cell timestamp " + sumCell.getTimestamp());
-    } else {
-      LOG.info("After major compaction with currentColumnCells.size="
-          + currentColumnCells.size()
-          + " returning finalCells.size=" + finalCells.size()
-          + " with zero sum=" + sum.longValue());
-    }
-    return finalCells;
-  }
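A numeric illustration of the retention check above (all values are made up; the threshold is read from YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD, which defaulted to three days at the time of this change):

    currentTimestamp                =  1,000,000,000,000 ms
    appFinalValueRetentionThreshold =        259,200,000 ms (3 days)
    SUM_FINAL cell, truncated ts    =    999,600,000,000 ms

    999,600,000,000 + 259,200,000 = 999,859,200,000 < 1,000,000,000,000

So the application finished more than three days ago: its value is folded into the running sum and the cell is discarded. A younger SUM_FINAL cell, or any SUM cell, is written back unchanged.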
-
-  /**
-   * Determines which cell is to be returned based on the values in each cell
-   * and the comparison operation MIN or MAX.
-   *
-   * @param previouslyChosenCell the cell chosen so far.
-   * @param currentCell the cell under consideration.
-   * @param currentAggOp the aggregation operation (GLOBAL_MIN or GLOBAL_MAX).
-   * @return the cell which is the min (or max) cell.
-   * @throws IOException if the cell values cannot be decoded.
-   */
-  private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
-      AggregationOperation currentAggOp, NumericValueConverter converter)
-      throws IOException {
-    if (previouslyChosenCell == null) {
-      return currentCell;
-    }
-    try {
-      Number previouslyChosenCellValue = (Number) converter.decodeValue(
-          CellUtil.cloneValue(previouslyChosenCell));
-      Number currentCellValue = (Number) converter.decodeValue(CellUtil
-          .cloneValue(currentCell));
-      switch (currentAggOp) {
-      case GLOBAL_MIN:
-        if (converter.compare(
-            currentCellValue, previouslyChosenCellValue) < 0) {
-          // New value is the minimum, hence return this cell.
-          return currentCell;
-        } else {
-          // Previously chosen value is the minimum, hence return previous
-          // min cell.
-          return previouslyChosenCell;
-        }
-      case GLOBAL_MAX:
-        if (converter.compare(
-            currentCellValue, previouslyChosenCellValue) > 0) {
-          // New value is the maximum, hence return this cell.
-          return currentCell;
-        } else {
-          // Previously chosen value is the maximum, hence return previous
-          // max cell.
-          return previouslyChosenCell;
-        }
-      default:
-        return currentCell;
-      }
-    } catch (IllegalArgumentException iae) {
-      LOG.error("caught iae during conversion to long ", iae);
-      return currentCell;
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (flowRunScanner != null) {
-      flowRunScanner.close();
-    } else {
-      LOG.warn("scanner close called but scanner is null");
-    }
-  }
-
-  /**
-   * Called to signal the start of the next() call by the scanner.
-   */
-  public void startNext() {
-    currentRow = null;
-  }
-
-  /**
-   * Returns whether or not the underlying scanner has more rows.
-   */
-  public boolean hasMore() {
-    return currentIndex < availableCells.size() || hasMore;
-  }
-
-  /**
-   * Returns the next available cell for the current row and advances the
-   * pointer to the next cell. This method can be called multiple times in a
-   * row to advance through all the available cells.
-   *
-   * @param scannerContext
-   *          context information for the batch of cells under consideration.
-   * @return the next available cell or null if no more cells are available
-   *         for the current row.
-   * @throws IOException if the underlying scanner fails.
-   */
-  public Cell nextCell(ScannerContext scannerContext) throws IOException {
-    Cell cell = peekAtNextCell(scannerContext);
-    if (cell != null) {
-      currentIndex++;
-    }
-    return cell;
-  }
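The nextCell/peekAtNextCell pair gives nextInternal one-cell lookahead without crossing row boundaries. A hypothetical helper, written as it would sit inside this class, showing the intended consumption pattern (peekAtNextCell itself is defined next):

    // Sketch only: drain the remaining cells of the current row.
    private void drainCurrentRow(ScannerContext scannerContext)
        throws IOException {
      while (peekAtNextCell(scannerContext) != null) {
        // peekAtNextCell() never advances the pointer; nextCell() consumes
        // the cell that peek just returned, so each iteration makes progress.
        nextCell(scannerContext);
      }
    }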
-
-  /**
-   * Returns the next available cell for the current row, without advancing
-   * the pointer. Calling this method multiple times in a row will continue
-   * to return the same cell.
-   *
-   * @param scannerContext
-   *          context information for the batch of cells under consideration.
-   * @return the next available cell or null if no more cells are available
-   *         for the current row.
-   * @throws IOException if any problem is encountered while grabbing the
-   *           next cell.
-   */
-  public Cell peekAtNextCell(ScannerContext scannerContext) throws IOException {
-    if (currentIndex >= availableCells.size()) {
-      // done with the current batch
-      availableCells.clear();
-      currentIndex = 0;
-      hasMore = flowRunScanner.next(availableCells, scannerContext);
-    }
-    Cell cell = null;
-    if (currentIndex < availableCells.size()) {
-      cell = availableCells.get(currentIndex);
-      if (currentRow == null) {
-        currentRow = CellUtil.cloneRow(cell);
-      } else if (!CellUtil.matchingRow(cell, currentRow)) {
-        // Moved on to the next row: don't use the current cell and signal
-        // that there are no more cells for this row.
-        return null;
-      }
-    }
-    return cell;
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize()
-   */
-  @Override
-  public long getMaxResultSize() {
-    if (regionScanner == null) {
-      throw new IllegalStateException(
-          "RegionScanner.getMaxResultSize() called when the flow "
-              + "scanner's internal scanner is not a RegionScanner");
-    }
-    return regionScanner.getMaxResultSize();
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint()
-   */
-  @Override
-  public long getMvccReadPoint() {
-    if (regionScanner == null) {
-      throw new IllegalStateException(
-          "RegionScanner.getMvccReadPoint() called when the flow "
-              + "scanner's internal scanner is not a RegionScanner");
-    }
-    return regionScanner.getMvccReadPoint();
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone()
-   */
-  @Override
-  public boolean isFilterDone() throws IOException {
-    if (regionScanner == null) {
-      throw new IllegalStateException(
-          "RegionScanner.isFilterDone() called when the flow "
-              + "scanner's internal scanner is not a RegionScanner");
-    }
-    return regionScanner.isFilterDone();
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[])
-   */
-  @Override
-  public boolean reseek(byte[] bytes) throws IOException {
-    if (regionScanner == null) {
-      throw new IllegalStateException(
-          "RegionScanner.reseek() called when the flow "
-              + "scanner's internal scanner is not a RegionScanner");
-    }
-    return regionScanner.reseek(bytes);
-  }
-
-  @Override
-  public int getBatch() {
-    return batchSize;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
deleted file mode 100644
index 73c666f..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.
The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-
-/**
- * Identifies the scanner operation on the {@link FlowRunTable}.
- */
-public enum FlowScannerOperation {
-
-  /**
-   * If the scanner is opened for reading
-   * during preGet or preScan.
-   */
-  READ,
-
-  /**
-   * If the scanner is opened during preFlush.
-   */
-  FLUSH,
-
-  /**
-   * If the scanner is opened during minor compaction.
-   */
-  MINOR_COMPACTION,
-
-  /**
-   * If the scanner is opened during major compaction.
-   */
-  MAJOR_COMPACTION
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
deleted file mode 100644
index 04963f3..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Package org.apache.hadoop.yarn.server.timelineservice.storage.flow
- * contains classes related to the implementation of the flow related tables,
- * viz. the flow run table and the flow activity table.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org