Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EB4FD200C10 for ; Fri, 20 Jan 2017 06:39:26 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id E9D04160B5B; Fri, 20 Jan 2017 05:39:26 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BF294160B57 for ; Fri, 20 Jan 2017 06:39:24 +0100 (CET) Received: (qmail 42007 invoked by uid 500); 20 Jan 2017 05:39:15 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 38696 invoked by uid 99); 20 Jan 2017 05:39:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Jan 2017 05:39:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B647EF4048; Fri, 20 Jan 2017 05:39:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sjlee@apache.org To: common-commits@hadoop.apache.org Date: Fri, 20 Jan 2017 05:39:36 -0000 Message-Id: In-Reply-To: <1efa9515342a4c9898e60a1ae85951f3@git.apache.org> References: <1efa9515342a4c9898e60a1ae85951f3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [25/28] hadoop git commit: YARN-5928. Move ATSv2 HBase backend code into a new module that is only dependent at runtime by yarn servers. Contributed by Haibo Chen. archived-at: Fri, 20 Jan 2017 05:39:27 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java new file mode 100644 index 0000000..8e6c259 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * contains the constants used in the context of schema accesses for + * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * information. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public final class TimelineHBaseSchemaConstants { + private TimelineHBaseSchemaConstants() { + } + + /** + * Used to create a pre-split for tables starting with a username in the + * prefix. TODO: this may have to become a config variable (string with + * separators) so that different installations can presplit based on their own + * commonly occurring names. + */ + private final static byte[][] USERNAME_SPLITS = { + Bytes.toBytes("a"), Bytes.toBytes("ad"), Bytes.toBytes("an"), + Bytes.toBytes("b"), Bytes.toBytes("ca"), Bytes.toBytes("cl"), + Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"), + Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i"), + Bytes.toBytes("j"), Bytes.toBytes("k"), Bytes.toBytes("l"), + Bytes.toBytes("m"), Bytes.toBytes("n"), Bytes.toBytes("o"), + Bytes.toBytes("q"), Bytes.toBytes("r"), Bytes.toBytes("s"), + Bytes.toBytes("se"), Bytes.toBytes("t"), Bytes.toBytes("u"), + Bytes.toBytes("v"), Bytes.toBytes("w"), Bytes.toBytes("x"), + Bytes.toBytes("y"), Bytes.toBytes("z") + }; + + /** + * The length at which keys auto-split. + */ + public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4"; + + /** + * @return splits for splits where a user is a prefix. + */ + public static byte[][] getUsernameSplits() { + byte[][] kloon = USERNAME_SPLITS.clone(); + // Deep copy. + for (int row = 0; row < USERNAME_SPLITS.length; row++) { + kloon[row] = Bytes.copy(USERNAME_SPLITS[row]); + } + return kloon; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java new file mode 100644 index 0000000..d03b37d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + * Utility class that allows HBase coprocessors to interact with unique + * timestamps. + */ +public class TimestampGenerator { + + /* + * if this is changed, then reading cell timestamps written with older + * multiplier value will not work + */ + public static final long TS_MULTIPLIER = 1000000L; + + private final AtomicLong lastTimestamp = new AtomicLong(); + + /** + * Returns the current wall clock time in milliseconds, multiplied by the + * required precision. + * + * @return current timestamp. + */ + public long currentTime() { + // We want to align cell timestamps with current time. + // cell timestamps are not be less than + // System.currentTimeMillis() * TS_MULTIPLIER. + return System.currentTimeMillis() * TS_MULTIPLIER; + } + + /** + * Returns a timestamp value unique within the scope of this + * {@code TimestampGenerator} instance. For usage by HBase + * {@code RegionObserver} coprocessors, this normally means unique within a + * given region. + * + * Unlikely scenario of generating a non-unique timestamp: if there is a + * sustained rate of more than 1M hbase writes per second AND if region fails + * over within that time range of timestamps being generated then there may be + * collisions writing to a cell version of the same column. + * + * @return unique timestamp. + */ + public long getUniqueTimestamp() { + long lastTs; + long nextTs; + do { + lastTs = lastTimestamp.get(); + nextTs = Math.max(lastTs + 1, currentTime()); + } while (!lastTimestamp.compareAndSet(lastTs, nextTs)); + return nextTs; + } + + /** + * Returns a timestamp multiplied with TS_MULTIPLIER and last few digits of + * application id. + * + * Unlikely scenario of generating a timestamp that is a duplicate: If more + * than a 1M concurrent apps are running in one flow run AND write to same + * column at the same time, then say appId of 1M and 1 will overlap + * with appId of 001 and there may be collisions for that flow run's + * specific column. + * + * @param incomingTS Timestamp to be converted. + * @param appId Application Id. + * @return a timestamp multiplied with TS_MULTIPLIER and last few digits of + * application id + */ + public static long getSupplementedTimestamp(long incomingTS, String appId) { + long suffix = getAppIdSuffix(appId); + long outgoingTS = incomingTS * TS_MULTIPLIER + suffix; + return outgoingTS; + + } + + private static long getAppIdSuffix(String appIdStr) { + if (appIdStr == null) { + return 0L; + } + ApplicationId appId = ApplicationId.fromString(appIdStr); + long id = appId.getId() % TS_MULTIPLIER; + return id; + } + + /** + * truncates the last few digits of the timestamp which were supplemented by + * the TimestampGenerator#getSupplementedTimestamp function. + * + * @param incomingTS Timestamp to be truncated. + * @return a truncated timestamp value + */ + public static long getTruncatedTimestamp(long incomingTS) { + return incomingTS / TS_MULTIPLIER; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java new file mode 100644 index 0000000..64a11f8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import org.apache.hadoop.hbase.client.BufferedMutator; + +/** + * Just a typed wrapper around {@link BufferedMutator} used to ensure that + * columns can write only to the table mutator for the right table. + */ +public interface TypedBufferedMutator extends BufferedMutator { + // This class is intentionally left (almost) blank +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ValueConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ValueConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ValueConverter.java new file mode 100644 index 0000000..757a6d3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ValueConverter.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import java.io.IOException; + +/** + * Converter used to encode/decode value associated with a column prefix or a + * column. + */ +public interface ValueConverter { + + /** + * Encode an object as a byte array depending on the converter implementation. + * + * @param value Value to be encoded. + * @return a byte array + * @throws IOException if any problem is encountered while encoding. + */ + byte[] encodeValue(Object value) throws IOException; + + /** + * Decode a byte array and convert it into an object depending on the + * converter implementation. + * + * @param bytes Byte array to be decoded. + * @return an object + * @throws IOException if any problem is encountered while decoding. + */ + Object decodeValue(byte[] bytes) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java new file mode 100644 index 0000000..dcccf74 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage contains + * classes which define and implement reading and writing to backend storage. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java new file mode 100644 index 0000000..93b4b36 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.entity; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; + +/** + * Identifies fully qualified columns for the {@link EntityTable}. + */ +public enum EntityColumn implements Column { + + /** + * Identifier for the entity. + */ + ID(EntityColumnFamily.INFO, "id"), + + /** + * The type of entity. + */ + TYPE(EntityColumnFamily.INFO, "type"), + + /** + * When the entity was created. + */ + CREATED_TIME(EntityColumnFamily.INFO, "created_time", new LongConverter()), + + /** + * The version of the flow that this entity belongs to. + */ + FLOW_VERSION(EntityColumnFamily.INFO, "flow_version"); + + private final ColumnHelper column; + private final ColumnFamily columnFamily; + private final String columnQualifier; + private final byte[] columnQualifierBytes; + + EntityColumn(ColumnFamily columnFamily, + String columnQualifier) { + this(columnFamily, columnQualifier, GenericConverter.getInstance()); + } + + EntityColumn(ColumnFamily columnFamily, + String columnQualifier, ValueConverter converter) { + this.columnFamily = columnFamily; + this.columnQualifier = columnQualifier; + // Future-proof by ensuring the right column prefix hygiene. + this.columnQualifierBytes = + Bytes.toBytes(Separator.SPACE.encode(columnQualifier)); + this.column = new ColumnHelper(columnFamily, converter); + } + + /** + * @return the column name value + */ + private String getColumnQualifier() { + return columnQualifier; + } + + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, Long timestamp, + Object inputValue, Attribute... attributes) throws IOException { + column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, + inputValue, attributes); + } + + public Object readResult(Result result) throws IOException { + return column.readResult(result, columnQualifierBytes); + } + + /** + * Retrieve an {@link EntityColumn} given a name, or null if there is no + * match. The following holds true: {@code columnFor(x) == columnFor(y)} if + * and only if {@code x.equals(y)} or {@code (x == y == null)} + * + * @param columnQualifier Name of the column to retrieve + * @return the corresponding {@link EntityColumn} or null + */ + public static final EntityColumn columnFor(String columnQualifier) { + + // Match column based on value, assume column family matches. + for (EntityColumn ec : EntityColumn.values()) { + // Find a match based only on name. + if (ec.getColumnQualifier().equals(columnQualifier)) { + return ec; + } + } + + // Default to null + return null; + } + + @Override + public byte[] getColumnQualifierBytes() { + return columnQualifierBytes.clone(); + } + + @Override + public byte[] getColumnFamilyBytes() { + return columnFamily.getBytes(); + } + + @Override + public ValueConverter getValueConverter() { + return column.getValueConverter(); + } + + /** + * Retrieve an {@link EntityColumn} given a name, or null if there is no + * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)} + * if and only if {@code a.equals(b) & x.equals(y)} or + * {@code (x == y == null)} + * + * @param columnFamily The columnFamily for which to retrieve the column. + * @param name Name of the column to retrieve + * @return the corresponding {@link EntityColumn} or null if both arguments + * don't match. + */ + public static final EntityColumn columnFor(EntityColumnFamily columnFamily, + String name) { + + for (EntityColumn ec : EntityColumn.values()) { + // Find a match based column family and on name. + if (ec.columnFamily.equals(columnFamily) + && ec.getColumnQualifier().equals(name)) { + return ec; + } + } + + // Default to null + return null; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java new file mode 100644 index 0000000..7c63727 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.entity; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; + +/** + * Represents the entity table column families. + */ +public enum EntityColumnFamily implements ColumnFamily { + + /** + * Info column family houses known columns, specifically ones included in + * columnfamily filters. + */ + INFO("i"), + + /** + * Configurations are in a separate column family for two reasons: a) the size + * of the config values can be very large and b) we expect that config values + * are often separately accessed from other metrics and info columns. + */ + CONFIGS("c"), + + /** + * Metrics have a separate column family, because they have a separate TTL. + */ + METRICS("m"); + + /** + * Byte representation of this column family. + */ + private final byte[] bytes; + + /** + * @param value create a column family with this name. Must be lower case and + * without spaces. + */ + EntityColumnFamily(String value) { + // column families should be lower case and not contain any spaces. + this.bytes = Bytes.toBytes(Separator.SPACE.encode(value)); + } + + public byte[] getBytes() { + return Bytes.copy(bytes); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java new file mode 100644 index 0000000..e410549 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.entity; + +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; + +/** + * Identifies partially qualified columns for the entity table. + */ +public enum EntityColumnPrefix implements ColumnPrefix { + + /** + * To store TimelineEntity getIsRelatedToEntities values. + */ + IS_RELATED_TO(EntityColumnFamily.INFO, "s"), + + /** + * To store TimelineEntity getRelatesToEntities values. + */ + RELATES_TO(EntityColumnFamily.INFO, "r"), + + /** + * To store TimelineEntity info values. + */ + INFO(EntityColumnFamily.INFO, "i"), + + /** + * Lifecycle events for an entity. + */ + EVENT(EntityColumnFamily.INFO, "e", true), + + /** + * Config column stores configuration with config key as the column name. + */ + CONFIG(EntityColumnFamily.CONFIGS, null), + + /** + * Metrics are stored with the metric name as the column name. + */ + METRIC(EntityColumnFamily.METRICS, null, new LongConverter()); + + private final ColumnHelper column; + private final ColumnFamily columnFamily; + + /** + * Can be null for those cases where the provided column qualifier is the + * entire column name. + */ + private final String columnPrefix; + private final byte[] columnPrefixBytes; + + /** + * Private constructor, meant to be used by the enum definition. + * + * @param columnFamily that this column is stored in. + * @param columnPrefix for this column. + */ + EntityColumnPrefix(ColumnFamily columnFamily, + String columnPrefix) { + this(columnFamily, columnPrefix, false, GenericConverter.getInstance()); + } + + EntityColumnPrefix(ColumnFamily columnFamily, + String columnPrefix, boolean compondColQual) { + this(columnFamily, columnPrefix, compondColQual, + GenericConverter.getInstance()); + } + + EntityColumnPrefix(ColumnFamily columnFamily, + String columnPrefix, ValueConverter converter) { + this(columnFamily, columnPrefix, false, converter); + } + + /** + * Private constructor, meant to be used by the enum definition. + * + * @param columnFamily that this column is stored in. + * @param columnPrefix for this column. + * @param converter used to encode/decode values to be stored in HBase for + * this column prefix. + */ + EntityColumnPrefix(ColumnFamily columnFamily, + String columnPrefix, boolean compondColQual, ValueConverter converter) { + column = new ColumnHelper(columnFamily, converter); + this.columnFamily = columnFamily; + this.columnPrefix = columnPrefix; + if (columnPrefix == null) { + this.columnPrefixBytes = null; + } else { + // Future-proof by ensuring the right column prefix hygiene. + this.columnPrefixBytes = + Bytes.toBytes(Separator.SPACE.encode(columnPrefix)); + } + } + + /** + * @return the column name value + */ + public String getColumnPrefix() { + return columnPrefix; + } + + @Override + public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + + @Override + public byte[] getColumnPrefixBytes(String qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + + @Override + public byte[] getColumnFamilyBytes() { + return columnFamily.getBytes(); + } + + @Override + public ValueConverter getValueConverter() { + return column.getValueConverter(); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #store(byte[], + * org.apache.hadoop.yarn.server.timelineservice.storage.common. + * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object, + * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[]) + */ + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, String qualifier, + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException { + + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = getColumnPrefixBytes(qualifier); + + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + attributes); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #store(byte[], + * org.apache.hadoop.yarn.server.timelineservice.storage.common. + * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object) + */ + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, byte[] qualifier, + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException { + + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = getColumnPrefixBytes(qualifier); + + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + attributes); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String) + */ + public Object readResult(Result result, String qualifier) throws IOException { + byte[] columnQualifier = + ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + return column.readResult(result, columnQualifier); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResults(org.apache.hadoop.hbase.client.Result, + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter) + */ + public Map readResults(Result result, + KeyConverter keyConverter) throws IOException { + return column.readResults(result, columnPrefixBytes, keyConverter); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result, + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter) + */ + public NavigableMap> + readResultsWithTimestamps(Result result, KeyConverter keyConverter) + throws IOException { + return column.readResultsWithTimestamps(result, columnPrefixBytes, + keyConverter); + } + + /** + * Retrieve an {@link EntityColumnPrefix} given a name, or null if there is no + * match. The following holds true: {@code columnFor(x) == columnFor(y)} if + * and only if {@code x.equals(y)} or {@code (x == y == null)} + * + * @param columnPrefix Name of the column to retrieve + * @return the corresponding {@link EntityColumnPrefix} or null + */ + public static final EntityColumnPrefix columnFor(String columnPrefix) { + + // Match column based on value, assume column family matches. + for (EntityColumnPrefix ecp : EntityColumnPrefix.values()) { + // Find a match based only on name. + if (ecp.getColumnPrefix().equals(columnPrefix)) { + return ecp; + } + } + + // Default to null + return null; + } + + /** + * Retrieve an {@link EntityColumnPrefix} given a name, or null if there is no + * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)} + * if and only if {@code (x == y == null)} or + * {@code a.equals(b) & x.equals(y)} + * + * @param columnFamily The columnFamily for which to retrieve the column. + * @param columnPrefix Name of the column to retrieve + * @return the corresponding {@link EntityColumnPrefix} or null if both + * arguments don't match. + */ + public static final EntityColumnPrefix columnFor( + EntityColumnFamily columnFamily, String columnPrefix) { + + // TODO: needs unit test to confirm and need to update javadoc to explain + // null prefix case. + + for (EntityColumnPrefix ecp : EntityColumnPrefix.values()) { + // Find a match based column family and on name. + if (ecp.columnFamily.equals(columnFamily) + && (((columnPrefix == null) && (ecp.getColumnPrefix() == null)) || + (ecp.getColumnPrefix().equals(columnPrefix)))) { + return ecp; + } + } + + // Default to null + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java new file mode 100644 index 0000000..a8f1d0c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.entity; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; + +/** + * Represents a rowkey for the entity table. + */ +public class EntityRowKey { + private final String clusterId; + private final String userId; + private final String flowName; + private final Long flowRunId; + private final String appId; + private final String entityType; + private final Long entityIdPrefix; + private final String entityId; + private final KeyConverter entityRowKeyConverter = + new EntityRowKeyConverter(); + + public EntityRowKey(String clusterId, String userId, String flowName, + Long flowRunId, String appId, String entityType, Long entityIdPrefix, + String entityId) { + this.clusterId = clusterId; + this.userId = userId; + this.flowName = flowName; + this.flowRunId = flowRunId; + this.appId = appId; + this.entityType = entityType; + this.entityIdPrefix = entityIdPrefix; + this.entityId = entityId; + } + + public String getClusterId() { + return clusterId; + } + + public String getUserId() { + return userId; + } + + public String getFlowName() { + return flowName; + } + + public Long getFlowRunId() { + return flowRunId; + } + + public String getAppId() { + return appId; + } + + public String getEntityType() { + return entityType; + } + + public String getEntityId() { + return entityId; + } + + public Long getEntityIdPrefix() { + return entityIdPrefix; + } + + /** + * Constructs a row key for the entity table as follows: + * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!entityId}. + * Typically used while querying a specific entity. + * + * @return byte array with the row key. + */ + public byte[] getRowKey() { + return entityRowKeyConverter.encode(this); + } + + /** + * Given the raw row key as bytes, returns the row key as an object. + * + * @param rowKey byte representation of row key. + * @return An EntityRowKey object. + */ + public static EntityRowKey parseRowKey(byte[] rowKey) { + return new EntityRowKeyConverter().decode(rowKey); + } + + /** + * Encodes and decodes row key for entity table. The row key is of the form : + * userName!clusterId!flowName!flowRunId!appId!entityType!entityId. flowRunId + * is a long, appId is encoded/decoded using {@link AppIdKeyConverter} and + * rest are strings. + *

+ */ + final private static class EntityRowKeyConverter implements + KeyConverter { + + private final AppIdKeyConverter appIDKeyConverter = new AppIdKeyConverter(); + + private EntityRowKeyConverter() { + } + + /** + * Entity row key is of the form + * userName!clusterId!flowName!flowRunId!appId!entityType!entityId w. each + * segment separated by !. The sizes below indicate sizes of each one of + * these segments in sequence. clusterId, userName, flowName, entityType and + * entityId are strings. flowrunId is a long hence 8 bytes in size. app id + * is represented as 12 bytes with cluster timestamp part of appid being 8 + * bytes (long) and seq id being 4 bytes(int). Strings are variable in size + * (i.e. end whenever separator is encountered). This is used while decoding + * and helps in determining where to split. + */ + private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE, + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, + AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE, + Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE }; + + /* + * (non-Javadoc) + * + * Encodes EntityRowKey object into a byte array with each component/field + * in EntityRowKey separated by Separator#QUALIFIERS. This leads to an + * entity table row key of the form + * userName!clusterId!flowName!flowRunId!appId!entityType!entityId If + * entityType in passed EntityRowKey object is null (and the fields + * preceding it i.e. clusterId, userId and flowName, flowRunId and appId + * are not null), this returns a row key prefix of the form + * userName!clusterId!flowName!flowRunId!appId! and if entityId in + * EntityRowKey is null (other 6 components are not null), this returns a + * row key prefix of the form + * userName!clusterId!flowName!flowRunId!appId!entityType! flowRunId is + * inverted while encoding as it helps maintain a descending order for row + * keys in entity table. + * + * @see org.apache.hadoop.yarn.server.timelineservice.storage.common + * .KeyConverter#encode(java.lang.Object) + */ + @Override + public byte[] encode(EntityRowKey rowKey) { + byte[] user = + Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB, + Separator.QUALIFIERS); + byte[] cluster = + Separator.encode(rowKey.getClusterId(), Separator.SPACE, + Separator.TAB, Separator.QUALIFIERS); + byte[] flow = + Separator.encode(rowKey.getFlowName(), Separator.SPACE, + Separator.TAB, Separator.QUALIFIERS); + byte[] first = Separator.QUALIFIERS.join(user, cluster, flow); + // Note that flowRunId is a long, so we can't encode them all at the same + // time. + byte[] second = + Bytes.toBytes(LongConverter.invertLong(rowKey.getFlowRunId())); + byte[] third = appIDKeyConverter.encode(rowKey.getAppId()); + if (rowKey.getEntityType() == null) { + return Separator.QUALIFIERS.join(first, second, third, + Separator.EMPTY_BYTES); + } + byte[] entityType = + Separator.encode(rowKey.getEntityType(), Separator.SPACE, + Separator.TAB, Separator.QUALIFIERS); + + if (rowKey.getEntityIdPrefix() == null) { + return Separator.QUALIFIERS.join(first, second, third, entityType, + Separator.EMPTY_BYTES); + } + + byte[] enitityIdPrefix = Bytes.toBytes(rowKey.getEntityIdPrefix()); + + if (rowKey.getEntityId() == null) { + return Separator.QUALIFIERS.join(first, second, third, entityType, + enitityIdPrefix, Separator.EMPTY_BYTES); + } + + byte[] entityId = Separator.encode(rowKey.getEntityId(), Separator.SPACE, + Separator.TAB, Separator.QUALIFIERS); + + byte[] fourth = + Separator.QUALIFIERS.join(entityType, enitityIdPrefix, entityId); + + return Separator.QUALIFIERS.join(first, second, third, fourth); + } + + /* + * (non-Javadoc) + * + * Decodes an application row key of the form + * userName!clusterId!flowName!flowRunId!appId!entityType!entityId + * represented in byte format and converts it into an EntityRowKey object. + * flowRunId is inverted while decoding as it was inverted while encoding. + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common + * .KeyConverter#decode(byte[]) + */ + @Override + public EntityRowKey decode(byte[] rowKey) { + byte[][] rowKeyComponents = + Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES); + if (rowKeyComponents.length != 8) { + throw new IllegalArgumentException("the row key is not valid for " + + "an entity"); + } + String userId = + Separator.decode(Bytes.toString(rowKeyComponents[0]), + Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + String clusterId = + Separator.decode(Bytes.toString(rowKeyComponents[1]), + Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + String flowName = + Separator.decode(Bytes.toString(rowKeyComponents[2]), + Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + Long flowRunId = + LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3])); + String appId = appIDKeyConverter.decode(rowKeyComponents[4]); + String entityType = + Separator.decode(Bytes.toString(rowKeyComponents[5]), + Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + + Long entityPrefixId = Bytes.toLong(rowKeyComponents[6]); + + String entityId = + Separator.decode(Bytes.toString(rowKeyComponents[7]), + Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId, + entityType, entityPrefixId, entityId); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java new file mode 100644 index 0000000..47a1789 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.entity; + +import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; + +/** + * Represents a partial rowkey without the entityId or without entityType and + * entityId for the entity table. + * + */ +public class EntityRowKeyPrefix extends EntityRowKey implements + RowKeyPrefix { + + /** + * Creates a prefix which generates the following rowKeyPrefixes for the + * entity table: + * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!}. + * @param clusterId identifying the cluster + * @param userId identifying the user + * @param flowName identifying the flow + * @param flowRunId identifying the individual run of this flow + * @param appId identifying the application + * @param entityType which entity type + * @param entityIdPrefix for entityId + * @param entityId for an entity + */ + public EntityRowKeyPrefix(String clusterId, String userId, String flowName, + Long flowRunId, String appId, String entityType, Long entityIdPrefix, + String entityId) { + super(clusterId, userId, flowName, flowRunId, appId, entityType, + entityIdPrefix, entityId); + } + + /** + * Creates a prefix which generates the following rowKeyPrefixes for the + * entity table: + * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!entityId}. + * + * @param clusterId identifying the cluster + * @param userId identifying the user + * @param flowName identifying the flow + * @param flowRunId identifying the individual run of this flow + * @param appId identifying the application + */ + public EntityRowKeyPrefix(String clusterId, String userId, String flowName, + Long flowRunId, String appId) { + this(clusterId, userId, flowName, flowRunId, appId, null, null, null); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.application. + * RowKeyPrefix#getRowKeyPrefix() + */ + public byte[] getRowKeyPrefix() { + return super.getRowKey(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java new file mode 100644 index 0000000..027c8d5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.entity; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants; + +/** + * The entity table as column families info, config and metrics. Info stores + * information about a timeline entity object config stores configuration data + * of a timeline entity object metrics stores the metrics of a timeline entity + * object + * + * Example entity table record: + * + *

+ * |-------------------------------------------------------------------------|
+ * |  Row       | Column Family                | Column Family| Column Family|
+ * |  key       | info                         | metrics      | config       |
+ * |-------------------------------------------------------------------------|
+ * | userName!  | id:entityId                  | metricId1:   | configKey1:  |
+ * | clusterId! |                              | metricValue1 | configValue1 |
+ * | flowName!  | type:entityType              | @timestamp1  |              |
+ * | flowRunId! |                              |              | configKey2:  |
+ * | AppId!     | created_time:                | metricId1:   | configValue2 |
+ * | entityType!| 1392993084018                | metricValue2 |              |
+ * | idPrefix!  |                              | @timestamp2  |              |
+ * | entityId   | i!infoKey:                   |              |              |
+ * |            | infoValue                    | metricId1:   |              |
+ * |            |                              | metricValue1 |              |
+ * |            | r!relatesToKey:              | @timestamp2  |              |
+ * |            | id3=id4=id5                  |              |              |
+ * |            |                              |              |              |
+ * |            | s!isRelatedToKey             |              |              |
+ * |            | id7=id9=id6                  |              |              |
+ * |            |                              |              |              |
+ * |            | e!eventId=timestamp=infoKey: |              |              |
+ * |            | eventInfoValue               |              |              |
+ * |            |                              |              |              |
+ * |            | flowVersion:                 |              |              |
+ * |            | versionValue                 |              |              |
+ * |-------------------------------------------------------------------------|
+ * 
+ */ +public class EntityTable extends BaseTable { + /** entity prefix. */ + private static final String PREFIX = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + "entity"; + + /** config param name that specifies the entity table name. */ + public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name"; + + /** + * config param name that specifies the TTL for metrics column family in + * entity table. + */ + private static final String METRICS_TTL_CONF_NAME = PREFIX + + ".table.metrics.ttl"; + + /** default value for entity table name. */ + public static final String DEFAULT_TABLE_NAME = "timelineservice.entity"; + + /** default TTL is 30 days for metrics timeseries. */ + private static final int DEFAULT_METRICS_TTL = 2592000; + + /** default max number of versions. */ + private static final int DEFAULT_METRICS_MAX_VERSIONS = 1000; + + private static final Log LOG = LogFactory.getLog(EntityTable.class); + + public EntityTable() { + super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable + * (org.apache.hadoop.hbase.client.Admin, + * org.apache.hadoop.conf.Configuration) + */ + public void createTable(Admin admin, Configuration hbaseConf) + throws IOException { + + TableName table = getTableName(hbaseConf); + if (admin.tableExists(table)) { + // do not disable / delete existing table + // similar to the approach taken by map-reduce jobs when + // output directory exists + throw new IOException("Table " + table.getNameAsString() + + " already exists."); + } + + HTableDescriptor entityTableDescp = new HTableDescriptor(table); + HColumnDescriptor infoCF = + new HColumnDescriptor(EntityColumnFamily.INFO.getBytes()); + infoCF.setBloomFilterType(BloomType.ROWCOL); + entityTableDescp.addFamily(infoCF); + + HColumnDescriptor configCF = + new HColumnDescriptor(EntityColumnFamily.CONFIGS.getBytes()); + configCF.setBloomFilterType(BloomType.ROWCOL); + configCF.setBlockCacheEnabled(true); + entityTableDescp.addFamily(configCF); + + HColumnDescriptor metricsCF = + new HColumnDescriptor(EntityColumnFamily.METRICS.getBytes()); + entityTableDescp.addFamily(metricsCF); + metricsCF.setBlockCacheEnabled(true); + // always keep 1 version (the latest) + metricsCF.setMinVersions(1); + metricsCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS); + metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME, + DEFAULT_METRICS_TTL)); + entityTableDescp.setRegionSplitPolicyClassName( + "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy"); + entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length", + TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH); + admin.createTable(entityTableDescp, + TimelineHBaseSchemaConstants.getUsernameSplits()); + LOG.info("Status of table creation for " + table.getNameAsString() + "=" + + admin.tableExists(table)); + } + + /** + * @param metricsTTL time to live parameter for the metricss in this table. + * @param hbaseConf configururation in which to set the metrics TTL config + * variable. + */ + public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) { + hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java new file mode 100644 index 0000000..bb0e331 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage.entity + * contains classes related to implementation for entity table. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.entity; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java new file mode 100644 index 0000000..4e2cf2d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Identifies the compaction dimensions for the data in the {@link FlowRunTable} + * . + */ +public enum AggregationCompactionDimension { + + /** + * the application id. + */ + APPLICATION_ID((byte) 101); + + private byte tagType; + private byte[] inBytes; + + private AggregationCompactionDimension(byte tagType) { + this.tagType = tagType; + this.inBytes = Bytes.toBytes(this.name()); + } + + public Attribute getAttribute(String attributeValue) { + return new Attribute(this.name(), Bytes.toBytes(attributeValue)); + } + + public byte getTagType() { + return tagType; + } + + public byte[] getInBytes() { + return this.inBytes.clone(); + } + + public static AggregationCompactionDimension + getAggregationCompactionDimension(String aggCompactDimStr) { + for (AggregationCompactionDimension aggDim : AggregationCompactionDimension + .values()) { + if (aggDim.name().equals(aggCompactDimStr)) { + return aggDim; + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java new file mode 100644 index 0000000..40cdd2c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Identifies the attributes to be set for puts into the {@link FlowRunTable}. + * The numbers used for tagType are prime numbers. + */ +public enum AggregationOperation { + + /** + * When the flow was started. + */ + GLOBAL_MIN((byte) 71), + + /** + * When it ended. + */ + GLOBAL_MAX((byte) 73), + + /** + * The metrics of the flow. + */ + SUM((byte) 79), + + /** + * application running. + */ + SUM_FINAL((byte) 83), + + /** + * Min value as per the latest timestamp + * seen for a given app. + */ + LATEST_MIN((byte) 89), + + /** + * Max value as per the latest timestamp + * seen for a given app. + */ + LATEST_MAX((byte) 97); + + private byte tagType; + private byte[] inBytes; + + private AggregationOperation(byte tagType) { + this.tagType = tagType; + this.inBytes = Bytes.toBytes(this.name()); + } + + public Attribute getAttribute() { + return new Attribute(this.name(), this.inBytes); + } + + public byte getTagType() { + return tagType; + } + + public byte[] getInBytes() { + return this.inBytes.clone(); + } + + /** + * returns the AggregationOperation enum that represents that string. + * @param aggOpStr Aggregation operation. + * @return the AggregationOperation enum that represents that string + */ + public static AggregationOperation getAggregationOperation(String aggOpStr) { + for (AggregationOperation aggOp : AggregationOperation.values()) { + if (aggOp.name().equals(aggOpStr)) { + return aggOp; + } + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java new file mode 100644 index 0000000..d3de518 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +/** + * Defines the attribute tuple to be set for puts into the {@link FlowRunTable}. + */ +public class Attribute { + private final String name; + private final byte[] value; + + public Attribute(String name, byte[] value) { + this.name = name; + this.value = value.clone(); + } + + public String getName() { + return name; + } + + public byte[] getValue() { + return value.clone(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java new file mode 100644 index 0000000..f9eb5b4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; + +/** + * Represents the flow run table column families. + */ +public enum FlowActivityColumnFamily + implements ColumnFamily { + + /** + * Info column family houses known columns, specifically ones included in + * columnfamily filters. + */ + INFO("i"); + + /** + * Byte representation of this column family. + */ + private final byte[] bytes; + + /** + * @param value + * create a column family with this name. Must be lower case and + * without spaces. + */ + private FlowActivityColumnFamily(String value) { + // column families should be lower case and not contain any spaces. + this.bytes = Bytes.toBytes(Separator.SPACE.encode(value)); + } + + public byte[] getBytes() { + return Bytes.copy(bytes); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java new file mode 100644 index 0000000..439e0c8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; + +/** + * Identifies partially qualified columns for the {@link FlowActivityTable}. + */ +public enum FlowActivityColumnPrefix + implements ColumnPrefix { + + /** + * To store run ids of the flows. + */ + RUN_ID(FlowActivityColumnFamily.INFO, "r", null); + + private final ColumnHelper column; + private final ColumnFamily columnFamily; + + /** + * Can be null for those cases where the provided column qualifier is the + * entire column name. + */ + private final String columnPrefix; + private final byte[] columnPrefixBytes; + + private final AggregationOperation aggOp; + + /** + * Private constructor, meant to be used by the enum definition. + * + * @param columnFamily + * that this column is stored in. + * @param columnPrefix + * for this column. + */ + private FlowActivityColumnPrefix( + ColumnFamily columnFamily, String columnPrefix, + AggregationOperation aggOp) { + this(columnFamily, columnPrefix, aggOp, false); + } + + private FlowActivityColumnPrefix( + ColumnFamily columnFamily, String columnPrefix, + AggregationOperation aggOp, boolean compoundColQual) { + column = new ColumnHelper(columnFamily); + this.columnFamily = columnFamily; + this.columnPrefix = columnPrefix; + if (columnPrefix == null) { + this.columnPrefixBytes = null; + } else { + // Future-proof by ensuring the right column prefix hygiene. + this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE + .encode(columnPrefix)); + } + this.aggOp = aggOp; + } + + /** + * @return the column name value + */ + public String getColumnPrefix() { + return columnPrefix; + } + + @Override + public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + + @Override + public byte[] getColumnPrefixBytes(String qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + + public byte[] getColumnPrefixBytes() { + return columnPrefixBytes.clone(); + } + + @Override + public byte[] getColumnFamilyBytes() { + return columnFamily.getBytes(); + } + + @Override + public ValueConverter getValueConverter() { + return column.getValueConverter(); + } + + public AggregationOperation getAttribute() { + return aggOp; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #store(byte[], + * org.apache.hadoop.yarn.server.timelineservice.storage.common. + * TypedBufferedMutator, byte[], java.lang.Long, java.lang.Object, + * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[]) + */ + @Override + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, byte[] qualifier, + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException { + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = getColumnPrefixBytes(qualifier); + Attribute[] combinedAttributes = + HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp); + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + combinedAttributes); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String) + */ + public Object readResult(Result result, String qualifier) throws IOException { + byte[] columnQualifier = ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifier); + return column.readResult(result, columnQualifier); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResults(org.apache.hadoop.hbase.client.Result, + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter) + */ + public Map readResults(Result result, + KeyConverter keyConverter) throws IOException { + return column.readResults(result, columnPrefixBytes, keyConverter); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result, + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter) + */ + public NavigableMap> + readResultsWithTimestamps(Result result, KeyConverter keyConverter) + throws IOException { + return column.readResultsWithTimestamps(result, columnPrefixBytes, + keyConverter); + } + + /** + * Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there + * is no match. The following holds true: {@code columnFor(x) == columnFor(y)} + * if and only if {@code x.equals(y)} or {@code (x == y == null)} + * + * @param columnPrefix + * Name of the column to retrieve + * @return the corresponding {@link FlowActivityColumnPrefix} or null + */ + public static final FlowActivityColumnPrefix columnFor(String columnPrefix) { + + // Match column based on value, assume column family matches. + for (FlowActivityColumnPrefix flowActivityColPrefix : + FlowActivityColumnPrefix.values()) { + // Find a match based only on name. + if (flowActivityColPrefix.getColumnPrefix().equals(columnPrefix)) { + return flowActivityColPrefix; + } + } + // Default to null + return null; + } + + /** + * Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there + * is no match. The following holds true: + * {@code columnFor(a,x) == columnFor(b,y)} if and only if + * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)} + * + * @param columnFamily + * The columnFamily for which to retrieve the column. + * @param columnPrefix + * Name of the column to retrieve + * @return the corresponding {@link FlowActivityColumnPrefix} or null if both + * arguments don't match. + */ + public static final FlowActivityColumnPrefix columnFor( + FlowActivityColumnFamily columnFamily, String columnPrefix) { + + // TODO: needs unit test to confirm and need to update javadoc to explain + // null prefix case. + + for (FlowActivityColumnPrefix flowActivityColumnPrefix : + FlowActivityColumnPrefix.values()) { + // Find a match based column family and on name. + if (flowActivityColumnPrefix.columnFamily.equals(columnFamily) + && (((columnPrefix == null) && (flowActivityColumnPrefix + .getColumnPrefix() == null)) || (flowActivityColumnPrefix + .getColumnPrefix().equals(columnPrefix)))) { + return flowActivityColumnPrefix; + } + } + // Default to null + return null; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #store(byte[], + * org.apache.hadoop.yarn.server.timelineservice.storage.common. + * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object, + * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[]) + */ + @Override + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, String qualifier, + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException { + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = getColumnPrefixBytes(qualifier); + Attribute[] combinedAttributes = + HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp); + column.store(rowKey, tableMutator, columnQualifier, null, inputValue, + combinedAttributes); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org