From: vinodkv@apache.org
To: common-commits@hadoop.apache.org
Date: Fri, 14 Aug 2015 18:42:50 -0000
Subject: [39/50] [abbrv] hadoop git commit: YARN-3706. Generalize native HBase writer for additional tables (Joep Rottinghuis via sjlee)

YARN-3706.
Generalize native HBase writer for additional tables (Joep Rottinghuis via sjlee)

(cherry picked from commit 9137aeae0dec83f9eff40d12cae712dfd508c0c5)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b0369827
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b0369827
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b0369827

Branch: refs/heads/YARN-2928
Commit: b0369827cae18c49af83d1a87e0ce3b4576ccf37
Parents: 566824e
Author: Sangjin Lee
Authored: Thu Jun 18 10:49:20 2015 -0700
Committer: Vinod Kumar Vavilapalli
Committed: Fri Aug 14 11:23:26 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../storage/EntityColumnDetails.java            | 110 ------
 .../storage/EntityColumnFamily.java             |  95 -----
 .../storage/HBaseTimelineWriterImpl.java        | 114 +++---
 .../server/timelineservice/storage/Range.java   |  59 ----
 .../storage/TimelineEntitySchemaConstants.java  |  71 ----
 .../storage/TimelineSchemaCreator.java          | 134 +-------
 .../timelineservice/storage/TimelineWriter.java |   3 +-
 .../storage/TimelineWriterUtils.java            | 344 -------------------
 .../storage/common/BaseTable.java               | 118 +++++++
 .../common/BufferedMutatorDelegator.java        |  73 ++++
 .../timelineservice/storage/common/Column.java  |  59 ++++
 .../storage/common/ColumnFamily.java            |  34 ++
 .../storage/common/ColumnHelper.java            | 247 +++++++++++++
 .../storage/common/ColumnPrefix.java            |  83 +++++
 .../timelineservice/storage/common/Range.java   |  59 ++++
 .../storage/common/Separator.java               | 303 ++++++++++++++++
 .../common/TimelineEntitySchemaConstants.java   |  68 ++++
 .../storage/common/TimelineWriterUtils.java     | 127 +++++++
 .../storage/common/TypedBufferedMutator.java    |  28 ++
 .../storage/common/package-info.java            |  24 ++
 .../storage/entity/EntityColumn.java            | 141 ++++++++
 .../storage/entity/EntityColumnFamily.java      |  65 ++++
 .../storage/entity/EntityColumnPrefix.java      | 212 ++++++++++++
 .../storage/entity/EntityRowKey.java            |  93 +++++
 .../storage/entity/EntityTable.java             | 161 +++++++++
 .../storage/entity/package-info.java            |  25 ++
 .../storage/TestHBaseTimelineWriterImpl.java    | 252 ++++++++------
 .../storage/common/TestSeparator.java           | 129 +++++++
 .../storage/common/TestTimelineWriterUtils.java |  29 ++
 30 files changed, 2301 insertions(+), 962 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0369827/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 17d8b7b..a7bae37 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -93,6 +93,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via
     zjshen)
 
+    YARN-3706. Generalize native HBase writer for additional tables (Joep
+    Rottinghuis via sjlee)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0369827/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java
deleted file mode 100644
index 2894c41..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Contains the Info Column Family details like Column names, types and byte
- * representations for
- * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * object that is stored in hbase Also has utility functions for storing each of
- * these to the backend
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-enum EntityColumnDetails {
-  ID(EntityColumnFamily.INFO, "id"),
-  TYPE(EntityColumnFamily.INFO, "type"),
-  CREATED_TIME(EntityColumnFamily.INFO, "created_time"),
-  MODIFIED_TIME(EntityColumnFamily.INFO, "modified_time"),
-  FLOW_VERSION(EntityColumnFamily.INFO, "flow_version"),
-  PREFIX_IS_RELATED_TO(EntityColumnFamily.INFO, "r"),
-  PREFIX_RELATES_TO(EntityColumnFamily.INFO, "s"),
-  PREFIX_EVENTS(EntityColumnFamily.INFO, "e");
-
-  private final EntityColumnFamily columnFamily;
-  private final String value;
-  private final byte[] inBytes;
-
-  private EntityColumnDetails(EntityColumnFamily columnFamily,
-      String value) {
-    this.columnFamily = columnFamily;
-    this.value = value;
-    this.inBytes = Bytes.toBytes(this.value.toLowerCase());
-  }
-
-  public String getValue() {
-    return value;
-  }
-
-  byte[] getInBytes() {
-    return inBytes;
-  }
-
-  void store(byte[] rowKey, BufferedMutator entityTable, Object inputValue)
-      throws IOException {
-    TimelineWriterUtils.store(rowKey, entityTable,
-        this.columnFamily.getInBytes(), null, this.getInBytes(), inputValue,
-        null);
-  }
-
-  /**
-   * stores events data with column prefix
-   */
-  void store(byte[] rowKey, BufferedMutator entityTable, byte[] idBytes,
-      String key, Object inputValue) throws IOException {
-    TimelineWriterUtils.store(rowKey, entityTable,
-        this.columnFamily.getInBytes(),
-        // column prefix
-        TimelineWriterUtils.join(
-            TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES,
-            this.getInBytes(), idBytes),
-        // column qualifier
-        Bytes.toBytes(key),
-        inputValue, null);
-  }
-
-  /**
-   * stores relation entities with a column prefix
-   */
-  void store(byte[] rowKey, BufferedMutator entityTable, String key,
-      Set<String> inputValue) throws IOException {
-    TimelineWriterUtils.store(rowKey, entityTable,
-        this.columnFamily.getInBytes(),
-        // column prefix
-        this.getInBytes(),
-        // column qualifier
-        Bytes.toBytes(key),
-        // value
-        TimelineWriterUtils.getValueAsString(
-            TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, inputValue),
-        // cell timestamp
-        null);
-  }
-
-  // TODO add a method that accepts a byte array,
-  // iterates over the enum and returns an enum from those bytes
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0369827/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java
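The deleted enum above packs multiple values into one cell by encoding each value and joining them with a separator; the replacement code's comments (`// id3?id4?id5`, `// eventId?infoKey`) hint that `Separator.VALUES` uses `?`. A minimal sketch of that encode-then-join idea — the method names `encode`/`joinEncoded` mirror the new `Separator` API, but the escape scheme here (`%3F`) is invented for illustration, not the real one:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.StringJoiner;

public class SeparatorSketch {
  // Hypothetical stand-in for Separator.VALUES: values are joined with "?"
  // after escaping any literal "?" inside a value.
  static final String SEP = "?";
  static final String ESCAPED = "%3F"; // made-up escape for a literal "?"

  // Escape the separator so it cannot occur inside an encoded value.
  static String encode(String value) {
    return value.replace(SEP, ESCAPED);
  }

  // Encode each value, then join with the bare separator.
  static String joinEncoded(Collection<String> values) {
    StringJoiner joiner = new StringJoiner(SEP);
    for (String v : values) {
      joiner.add(encode(v));
    }
    return joiner.toString();
  }

  public static void main(String[] args) {
    List<String> ids = Arrays.asList("id3", "id4", "id5?odd");
    String compound = joinEncoded(ids);
    System.out.println(compound); // id3?id4?id5%3Fodd
    // Splitting on the bare separator recovers exactly three tokens,
    // because the escaped "?" inside "id5?odd" no longer matches:
    System.out.println(compound.split("\\" + SEP).length); // 3
  }
}
```

Because encoded values never contain a bare separator, a reader can split the compound cell value unambiguously — which is the property the generalized writer relies on for relation and event columns.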
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java
deleted file mode 100644
index e556351..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Contains the Column family names and byte representations for
- * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * object that is stored in hbase
- * Also has utility functions for storing each of these to the backend
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-enum EntityColumnFamily {
-  INFO("i"),
-  CONFIG("c"),
-  METRICS("m");
-
-  private final String value;
-  private final byte[] inBytes;
-
-  private EntityColumnFamily(String value) {
-    this.value = value;
-    this.inBytes = Bytes.toBytes(this.value.toLowerCase());
-  }
-
-  byte[] getInBytes() {
-    return inBytes;
-  }
-
-  public String getValue() {
-    return value;
-  }
-
-  /**
-   * stores the key as column and value as hbase column value in the given
-   * column family in the entity table
-   *
-   * @param rowKey
-   * @param entityTable
-   * @param inputValue
-   * @throws IOException
-   */
-  public void store(byte[] rowKey, BufferedMutator entityTable, String key,
-      String inputValue) throws IOException {
-    if (key == null) {
-      return;
-    }
-    TimelineWriterUtils.store(rowKey, entityTable, this.getInBytes(), null,
-        Bytes.toBytes(key), inputValue, null);
-  }
-
-  /**
-   * stores the values along with cell timestamp
-   *
-   * @param rowKey
-   * @param entityTable
-   * @param key
-   * @param timestamp
-   * @param inputValue
-   * @throws IOException
-   */
-  public void store(byte[] rowKey, BufferedMutator entityTable, String key,
-      Long timestamp, Number inputValue) throws IOException {
-    if (key == null) {
-      return;
-    }
-    TimelineWriterUtils.store(rowKey, entityTable, this.getInBytes(), null,
-        Bytes.toBytes(key), inputValue, timestamp);
-  }
-
-  // TODO add a method that accepts a byte array,
-  // iterates over the enum and returns an enum from those bytes
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0369827/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index aa71c6c..e48ca60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -26,19 +26,22 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineEntitySchemaConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 
 /**
  * This implements a hbase based backend for storing application timeline entity
@@ -50,7 +53,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     TimelineWriter {
 
   private Connection conn;
-  private BufferedMutator entityTable;
+  private TypedBufferedMutator<EntityTable> entityTable;
 
   private static final Log LOG = LogFactory
       .getLog(HBaseTimelineWriterImpl.class);
@@ -72,10 +75,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     super.serviceInit(conf);
     Configuration hbaseConf = HBaseConfiguration.create(conf);
     conn = ConnectionFactory.createConnection(hbaseConf);
-    TableName entityTableName = TableName.valueOf(hbaseConf.get(
-        TimelineEntitySchemaConstants.ENTITY_TABLE_NAME,
-        TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME));
-    entityTable = conn.getBufferedMutator(entityTableName);
+    entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
   }
 
   /**
@@ -86,9 +86,6 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       String flowName, String flowVersion, long flowRunId, String appId,
       TimelineEntities data) throws IOException {
 
-    byte[] rowKeyPrefix = TimelineWriterUtils.getRowKeyPrefix(clusterId,
-        userId, flowName, flowRunId, appId);
-
     TimelineWriteResponse putStatus = new TimelineWriteResponse();
     for (TimelineEntity te : data.getEntities()) {
@@ -96,19 +93,19 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       if (te == null) {
         continue;
       }
-      // get row key
-      byte[] row = TimelineWriterUtils.join(
-          TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, rowKeyPrefix,
-          Bytes.toBytes(te.getType()), Bytes.toBytes(te.getId()));
-
-      storeInfo(row, te, flowVersion);
-      storeEvents(row, te.getEvents());
-      storeConfig(row, te.getConfigs());
-      storeMetrics(row, te.getMetrics());
-      storeRelations(row, te.getIsRelatedToEntities(),
-          EntityColumnDetails.PREFIX_IS_RELATED_TO);
-      storeRelations(row, te.getRelatesToEntities(),
-          EntityColumnDetails.PREFIX_RELATES_TO);
+
+      byte[] rowKey =
+          EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
+              te);
+
+      storeInfo(rowKey, te, flowVersion);
+      storeEvents(rowKey, te.getEvents());
+      storeConfig(rowKey, te.getConfigs());
+      storeMetrics(rowKey, te.getMetrics());
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          EntityColumnPrefix.IS_RELATED_TO);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          EntityColumnPrefix.RELATES_TO);
     }
 
     return putStatus;
@@ -119,10 +116,15 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
    */
   private void storeRelations(byte[] rowKey,
       Map<String, Set<String>> connectedEntities,
-      EntityColumnDetails columnNamePrefix) throws IOException {
-    for (Map.Entry<String, Set<String>> entry : connectedEntities.entrySet()) {
-      columnNamePrefix.store(rowKey, entityTable, entry.getKey(),
-          entry.getValue());
+      EntityColumnPrefix entityColumnPrefix) throws IOException {
+    for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
+        .entrySet()) {
+      // id3?id4?id5
+      String compoundValue =
+          Separator.VALUES.joinEncoded(connectedEntity.getValue());
+
+      entityColumnPrefix.store(rowKey, entityTable, connectedEntity.getKey(),
+          null, compoundValue);
     }
   }
 
@@ -132,13 +134,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion)
       throws IOException {
 
-    EntityColumnDetails.ID.store(rowKey, entityTable, te.getId());
-    EntityColumnDetails.TYPE.store(rowKey, entityTable, te.getType());
-    EntityColumnDetails.CREATED_TIME.store(rowKey, entityTable,
+    EntityColumn.ID.store(rowKey, entityTable, null, te.getId());
+    EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType());
+    EntityColumn.CREATED_TIME.store(rowKey, entityTable, null,
        te.getCreatedTime());
-    EntityColumnDetails.MODIFIED_TIME.store(rowKey, entityTable,
+    EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null,
        te.getModifiedTime());
-    EntityColumnDetails.FLOW_VERSION.store(rowKey, entityTable, flowVersion);
+    EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
   }
 
   /**
@@ -150,8 +152,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       return;
     }
     for (Map.Entry<String, String> entry : config.entrySet()) {
-      EntityColumnFamily.CONFIG.store(rowKey, entityTable,
-          entry.getKey(), entry.getValue());
+      EntityColumnPrefix.CONFIG.store(rowKey, entityTable, entry.getKey(),
+          null, entry.getValue());
     }
   }
 
@@ -163,11 +165,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       throws IOException {
     if (metrics != null) {
       for (TimelineMetric metric : metrics) {
-        String key = metric.getId();
+        String metricColumnQualifier = metric.getId();
         Map<Long, Number> timeseries = metric.getValues();
-        for (Map.Entry<Long, Number> entry : timeseries.entrySet()) {
-          EntityColumnFamily.METRICS.store(rowKey, entityTable, key,
-              entry.getKey(), entry.getValue());
+        for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
+          Long timestamp = timeseriesEntry.getKey();
+          EntityColumnPrefix.METRIC.store(rowKey, entityTable,
+              metricColumnQualifier, timestamp, timeseriesEntry.getValue());
         }
       }
     }
@@ -181,19 +184,27 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     if (events != null) {
       for (TimelineEvent event : events) {
         if (event != null) {
-          String id = event.getId();
-          if (id != null) {
-            byte[] idBytes = Bytes.toBytes(id);
+          String eventId = event.getId();
+          if (eventId != null) {
             Map<String, Object> eventInfo = event.getInfo();
             if (eventInfo != null) {
               for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
-                EntityColumnDetails.PREFIX_EVENTS.store(rowKey,
-                    entityTable, idBytes, info.getKey(), info.getValue());
-              }
+                // eventId?infoKey
+                byte[] columnQualifierFirst =
+                    Bytes.toBytes(Separator.VALUES.encode(eventId));
+                byte[] compoundColumnQualifierBytes =
+                    Separator.VALUES.join(columnQualifierFirst,
+                        Bytes.toBytes(info.getKey()));
+                // convert back to string to avoid additional API on store.
+                String compoundColumnQualifier =
+                    Bytes.toString(compoundColumnQualifierBytes);
+                EntityColumnPrefix.METRIC.store(rowKey, entityTable,
+                    compoundColumnQualifier, null, info.getValue());
+              } // for info: eventInfo
             }
           }
         }
-      }
+      } // event : events
     }
   }
 
@@ -204,8 +215,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   }
 
   /**
-   * close the hbase connections
-   * The close APIs perform flushing and release any
+   * close the hbase connections The close APIs perform flushing and release any
    * resources held
    */
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0369827/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java
deleted file mode 100644
index 2a2db81..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class Range {
-  private final int startIdx;
-  private final int endIdx;
-
-  /**
-   * Defines a range from start index (inclusive) to end index (exclusive).
-   *
-   * @param start
-   *          Starting index position
-   * @param end
-   *          Ending index position (exclusive)
-   */
-  public Range(int start, int end) {
-    if (start < 0 || end < start) {
-      throw new IllegalArgumentException(
-          "Invalid range, required that: 0 <= start <= end; start=" + start
-              + ", end=" + end);
-    }
-
-    this.startIdx = start;
-    this.endIdx = end;
-  }
-
-  public int start() {
-    return startIdx;
-  }
-
-  public int end() {
-    return endIdx;
-  }
-
-  public int length() {
-    return endIdx - startIdx;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0369827/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java
deleted file mode 100644
index d95cbb2..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-/**
- * contains the constants used in the context of schema accesses for
- * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * information
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TimelineEntitySchemaConstants {
-
-  /** entity prefix */
-  public static final String ENTITY_PREFIX =
-      YarnConfiguration.TIMELINE_SERVICE_PREFIX
-      + ".entity";
-
-  /** config param name that specifies the entity table name */
-  public static final String ENTITY_TABLE_NAME = ENTITY_PREFIX
-      + ".table.name";
-
-  /**
-   * config param name that specifies the TTL for metrics column family in
-   * entity table
-   */
-  public static final String ENTITY_TABLE_METRICS_TTL = ENTITY_PREFIX
-      + ".table.metrics.ttl";
-
-  /** default value for entity table name */
-  public static final String DEFAULT_ENTITY_TABLE_NAME = "timelineservice.entity";
-
-  /** in bytes default value for entity table name */
-  static final byte[] DEFAULT_ENTITY_TABLE_NAME_BYTES = Bytes
-      .toBytes(DEFAULT_ENTITY_TABLE_NAME);
-
-  /** separator in row key */
-  public static final String ROW_KEY_SEPARATOR = "!";
-
-  /** byte representation of the separator in row key */
-  static final byte[] ROW_KEY_SEPARATOR_BYTES = Bytes
-      .toBytes(ROW_KEY_SEPARATOR);
-
-  public static final byte ZERO_BYTES = 0;
-
-  /** default TTL is 30 days for metrics timeseries */
-  public static final int ENTITY_TABLE_METRICS_TTL_DEFAULT = 2592000;
-
-  /** default max number of versions */
-  public static final int ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT = 1000;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0369827/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
index 820a6d1..a5cc2ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -19,21 +19,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
 
 import java.io.IOException;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -41,7 +26,18 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 
 /**
  * This creates the schema for a hbase based backend for storing application
@@ -53,18 +49,6 @@ public class TimelineSchemaCreator {
 
   final static String NAME = TimelineSchemaCreator.class.getSimpleName();
   private static final Log LOG = LogFactory.getLog(TimelineSchemaCreator.class);
-  final static byte[][] splits = { Bytes.toBytes("a"), Bytes.toBytes("ad"),
-      Bytes.toBytes("an"), Bytes.toBytes("b"), Bytes.toBytes("ca"),
-      Bytes.toBytes("cl"), Bytes.toBytes("d"), Bytes.toBytes("e"),
-      Bytes.toBytes("f"), Bytes.toBytes("g"), Bytes.toBytes("h"),
-      Bytes.toBytes("i"), Bytes.toBytes("j"), Bytes.toBytes("k"),
-      Bytes.toBytes("l"), Bytes.toBytes("m"), Bytes.toBytes("n"),
-      Bytes.toBytes("o"), Bytes.toBytes("q"), Bytes.toBytes("r"),
-      Bytes.toBytes("s"), Bytes.toBytes("se"), Bytes.toBytes("t"),
-      Bytes.toBytes("u"), Bytes.toBytes("v"), Bytes.toBytes("w"),
-      Bytes.toBytes("x"), Bytes.toBytes("y"), Bytes.toBytes("z") };
-
-  public static final String SPLIT_KEY_PREFIX_LENGTH = "4";
 
   public static void main(String[] args) throws Exception {
 
@@ -79,13 +63,12 @@ public class TimelineSchemaCreator {
     // Grab the entityTableName argument
     String entityTableName = commandLine.getOptionValue("e");
     if (StringUtils.isNotBlank(entityTableName)) {
-      hbaseConf.set(TimelineEntitySchemaConstants.ENTITY_TABLE_NAME,
-          entityTableName);
+      hbaseConf.set(EntityTable.TABLE_NAME_CONF_NAME, entityTableName);
     }
-    String entityTable_TTL_Metrics = commandLine.getOptionValue("m");
-    if (StringUtils.isNotBlank(entityTable_TTL_Metrics)) {
-      hbaseConf.set(TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL,
-          entityTable_TTL_Metrics);
+    String entityTableTTLMetrics = commandLine.getOptionValue("m");
+    if (StringUtils.isNotBlank(entityTableTTLMetrics)) {
+      int metricsTTL = Integer.parseInt(entityTableTTLMetrics);
+      new EntityTable().setMetricsTTL(metricsTTL, hbaseConf);
     }
     createAllTables(hbaseConf);
   }
@@ -136,7 +119,7 @@ public class TimelineSchemaCreator {
       if (admin == null) {
         throw new IOException("Cannot create table since admin is null");
       }
-      createTimelineEntityTable(admin, hbaseConf);
+      new EntityTable().createTable(admin, hbaseConf);
     } finally {
       if (conn != null) {
         conn.close();
@@ -144,88 +127,5 @@ public class TimelineSchemaCreator {
     }
   }
 
-  /**
-   * Creates a table with column families info, config and metrics
-   * info stores information about a timeline entity object
-   * config stores configuration data of a timeline entity object
-   * metrics stores the metrics of a timeline entity object
-   *
-   * Example entity table record:
-   *
-   *|------------------------------------------------------------|
-   *|  Row       | Column Family  | Column Family | Column Family|
-   *|  key       | info           | metrics       | config       |
-   *|------------------------------------------------------------|
-   *| userName!  | id:entityId    | metricName1:  | configKey1:  |
-   *| clusterId! |                | metricValue1  | configValue1 |
-   *| flowId!    | type:entityType| @timestamp1   |              |
-   *| flowRunId! |                |               | configKey2:  |
-   *| AppId!     | created_time:  | metricName1:  | configValue2 |
-   *| entityType!| 1392993084018  | metricValue2  |              |
-   *| entityId   |                | @timestamp2   |              |
-   *|            | modified_time: |               |              |
-   *|            | 1392995081012  | metricName2:  |              |
-   *|            |                | metricValue1  |              |
-   *|            | r!relatesToKey:| @timestamp2   |              |
-   *|            | id3!id4!id5    |               |              |
-   *|            |                |               |              |
-   *|            | s!isRelatedToKey|              |              |
-   *|            | id7!id9!id5    |               |              |
-   *|            |                |               |              |
-   *|            | e!eventKey:    |               |              |
-   *|            | eventValue     |               |              |
-   *|            |                |               |              |
-   *|            | flowVersion:   |               |              |
-   *|            | versionValue   |               |              |
-   *|------------------------------------------------------------|
-   *
- * @param admin - * @param hbaseConf - * @throws IOException - */ - public static void createTimelineEntityTable(Admin admin, - Configuration hbaseConf) throws IOException { - - TableName table = TableName.valueOf(hbaseConf.get( - TimelineEntitySchemaConstants.ENTITY_TABLE_NAME, - TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME)); - if (admin.tableExists(table)) { - // do not disable / delete existing table - // similar to the approach taken by map-reduce jobs when - // output directory exists - throw new IOException("Table " + table.getNameAsString() - + " already exists."); - } - - HTableDescriptor entityTableDescp = new HTableDescriptor(table); - HColumnDescriptor cf1 = new HColumnDescriptor( - EntityColumnFamily.INFO.getInBytes()); - cf1.setBloomFilterType(BloomType.ROWCOL); - entityTableDescp.addFamily(cf1); - - HColumnDescriptor cf2 = new HColumnDescriptor( - EntityColumnFamily.CONFIG.getInBytes()); - cf2.setBloomFilterType(BloomType.ROWCOL); - cf2.setBlockCacheEnabled(true); - entityTableDescp.addFamily(cf2); - - HColumnDescriptor cf3 = new HColumnDescriptor( - EntityColumnFamily.METRICS.getInBytes()); - entityTableDescp.addFamily(cf3); - cf3.setBlockCacheEnabled(true); - // always keep 1 version (the latest) - cf3.setMinVersions(1); - cf3.setMaxVersions(TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT); - cf3.setTimeToLive(hbaseConf.getInt( - TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL, - TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL_DEFAULT)); - entityTableDescp - .setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy"); - entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length", - SPLIT_KEY_PREFIX_LENGTH); - admin.createTable(entityTableDescp, splits); - LOG.info("Status of table creation for " + table.getNameAsString() + "=" - + admin.tableExists(table)); - } } 
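The javadoc table above spells out the entity table layout: the row key concatenates userName, clusterId, flowId, an inverted flowRunId, appId, entityType and entityId with a separator, with tokens cleansed of spaces and separator characters first. A rough, self-contained sketch of that key layout follows — string-based stand-ins only, not the actual byte[]-joining Hadoop code, and the class and method names here are hypothetical:

```java
// Sketch of the entity-table row key layout described in the table above.
// Strings stand in for the real byte[] handling; "!" is the separator.
public class RowKeySketch {

    // The flow run id is stored inverted so that newer runs sort first
    // in HBase's ascending byte order (mirrors encodeRunId in the patch).
    static long encodeRunId(long flowRunId) {
        return Long.MAX_VALUE - flowRunId;
    }

    // Replace spaces and the separator char with underscores, as the
    // cleanse() helper in this patch does. Not symmetrical: it cannot
    // be undone.
    static String cleanse(String token) {
        return token.replace(' ', '_').replace('!', '_');
    }

    // Assemble the row key in the column order shown in the table.
    static String rowKey(String user, String cluster, String flow,
            long flowRunId, String appId, String entityType, String entityId) {
        return String.join("!",
            cleanse(user), cleanse(cluster), cleanse(flow),
            Long.toString(encodeRunId(flowRunId)),
            cleanse(appId), cleanse(entityType), entityId);
    }

    public static void main(String[] args) {
        // prints the_user!cluster1!flow1!9223370643861691789!app1!YARN_CONTAINER!container_1
        System.out.println(rowKey("the user", "cluster1", "flow1",
            1392993084018L, "app1", "YARN_CONTAINER", "container_1"));
    }
}
```

Because the run id is subtracted from Long.MAX_VALUE, a scan over a user/cluster/flow prefix returns the most recent flow runs first without a reverse scan.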
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0369827/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
index 467bcec..494e8ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -15,17 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.yarn.server.timelineservice.storage;

 import java.io.IOException;

 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.service.Service;

 /**
  * This interface is for storing application timeline information.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0369827/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java deleted file mode 100644 index 113935e..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java +++ /dev/null @@ -1,344 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.client.BufferedMutator; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; -import org.apache.hadoop.yarn.server.timelineservice.storage.Range; - -/** - * bunch of utility functions used across TimelineWriter classes - */ -@InterfaceAudience.Public -@InterfaceStability.Unstable -public class TimelineWriterUtils { - - /** empty bytes */ - public static final byte[] EMPTY_BYTES = new byte[0]; - private static final String SPACE = " "; - private static final String UNDERSCORE = "_"; - private static final String EMPTY_STRING = ""; - - /** - * Returns a single byte array containing all of the individual component - * arrays separated by the separator array. - * - * @param separator - * @param components - * @return byte array after joining the components - */ - public static byte[] join(byte[] separator, byte[]... 
components) { - if (components == null || components.length == 0) { - return EMPTY_BYTES; - } - - int finalSize = 0; - if (separator != null) { - finalSize = separator.length * (components.length - 1); - } - for (byte[] comp : components) { - if (comp != null) { - finalSize += comp.length; - } - } - - byte[] buf = new byte[finalSize]; - int offset = 0; - for (int i = 0; i < components.length; i++) { - if (components[i] != null) { - System.arraycopy(components[i], 0, buf, offset, components[i].length); - offset += components[i].length; - if (i < (components.length - 1) && separator != null - && separator.length > 0) { - System.arraycopy(separator, 0, buf, offset, separator.length); - offset += separator.length; - } - } - } - return buf; - } - - /** - * Splits the source array into multiple array segments using the given - * separator, up to a maximum of count items. This will naturally produce - * copied byte arrays for each of the split segments. To identify the split - * ranges without the array copies, see - * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}. - * - * @param source - * @param separator - * @return byte[] array after splitting the source - */ - public static byte[][] split(byte[] source, byte[] separator) { - return split(source, separator, -1); - } - - /** - * Splits the source array into multiple array segments using the given - * separator, up to a maximum of count items. This will naturally produce - * copied byte arrays for each of the split segments. To identify the split - * ranges without the array copies, see - * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}. 
- * - * @param source - * @param separator - * @param limit - * @return byte[][] after splitting the input source - */ - public static byte[][] split(byte[] source, byte[] separator, int limit) { - List segments = splitRanges(source, separator, limit); - - byte[][] splits = new byte[segments.size()][]; - for (int i = 0; i < segments.size(); i++) { - Range r = segments.get(i); - byte[] tmp = new byte[r.length()]; - if (tmp.length > 0) { - System.arraycopy(source, r.start(), tmp, 0, r.length()); - } - splits[i] = tmp; - } - return splits; - } - - /** - * Returns a list of ranges identifying [start, end) -- closed, open -- - * positions within the source byte array that would be split using the - * separator byte array. - */ - public static List splitRanges(byte[] source, byte[] separator) { - return splitRanges(source, separator, -1); - } - - /** - * Returns a list of ranges identifying [start, end) -- closed, open -- - * positions within the source byte array that would be split using the - * separator byte array. 
- * @param source the source data - * @param separator the separator pattern to look for - * @param limit the maximum number of splits to identify in the source - */ - public static List splitRanges(byte[] source, byte[] separator, int limit) { - List segments = new ArrayList(); - if ((source == null) || (separator == null)) { - return segments; - } - int start = 0; - itersource: for (int i = 0; i < source.length; i++) { - for (int j = 0; j < separator.length; j++) { - if (source[i + j] != separator[j]) { - continue itersource; - } - } - // all separator elements matched - if (limit > 0 && segments.size() >= (limit-1)) { - // everything else goes in one final segment - break; - } - - segments.add(new Range(start, i)); - start = i + separator.length; - // i will be incremented again in outer for loop - i += separator.length-1; - } - // add in remaining to a final range - if (start <= source.length) { - segments.add(new Range(start, source.length)); - } - return segments; - } - - /** - * converts run id into it's inverse timestamp - * @param flowRunId - * @return inverted long - */ - public static long encodeRunId(Long flowRunId) { - return Long.MAX_VALUE - flowRunId; - } - - /** - * return a value from the Map as a String - * @param key - * @param values - * @return value as a String or "" - * @throws IOException - */ - public static String getValueAsString(final byte[] key, - final Map values) throws IOException { - if( values == null ) { - return EMPTY_STRING; - } - byte[] value = values.get(key); - if (value != null) { - return GenericObjectMapper.read(value).toString(); - } else { - return EMPTY_STRING; - } - } - - /** - * return a value from the Map as a long - * @param key - * @param values - * @return value as Long or 0L - * @throws IOException - */ - public static long getValueAsLong(final byte[] key, - final Map values) throws IOException { - if (values == null) { - return 0; - } - byte[] value = values.get(key); - if (value != null) { - Number val = 
(Number) GenericObjectMapper.read(value); - return val.longValue(); - } else { - return 0L; - } - } - - /** - * concates the values from a Set to return a single delimited string value - * @param rowKeySeparator - * @param values - * @return Value from the set of strings as a string - */ - public static String getValueAsString(String rowKeySeparator, - Set values) { - - if (values == null) { - return EMPTY_STRING; - } - StringBuilder concatStrings = new StringBuilder(); - for (String value : values) { - concatStrings.append(value); - concatStrings.append(rowKeySeparator); - } - // remove the last separator - if(concatStrings.length() > 1) { - concatStrings.deleteCharAt(concatStrings.lastIndexOf(rowKeySeparator)); - } - return concatStrings.toString(); - } - /** - * Constructs a row key prefix for the entity table - * @param clusterId - * @param userId - * @param flowId - * @param flowRunId - * @param appId - * @return byte array with the row key prefix - */ - static byte[] getRowKeyPrefix(String clusterId, String userId, String flowId, - Long flowRunId, String appId) { - return TimelineWriterUtils.join( - TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, - Bytes.toBytes(cleanse(userId)), Bytes.toBytes(cleanse(clusterId)), - Bytes.toBytes(cleanse(flowId)), - Bytes.toBytes(TimelineWriterUtils.encodeRunId(flowRunId)), - Bytes.toBytes(cleanse(appId))); - } - - /** - * Takes a string token to be used as a key or qualifier and - * cleanses out reserved tokens. - * This operation is not symmetrical. - * Logic is to replace all spaces and separator chars in input with - * underscores. - * - * @param token token to cleanse. 
- * @return String with no spaces and no separator chars - */ - public static String cleanse(String token) { - if (token == null || token.length() == 0) { - return token; - } - - String cleansed = token.replaceAll(SPACE, UNDERSCORE); - cleansed = cleansed.replaceAll( - TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, UNDERSCORE); - - return cleansed; - } - - /** - * stores the info to the table in hbase - * - * @param rowKey - * @param table - * @param columnFamily - * @param columnPrefix - * @param columnQualifier - * @param inputValue - * @param cellTimeStamp - * @throws IOException - */ - public static void store(byte[] rowKey, BufferedMutator table, byte[] columnFamily, - byte[] columnPrefix, byte[] columnQualifier, Object inputValue, - Long cellTimeStamp) throws IOException { - if ((rowKey == null) || (table == null) || (columnFamily == null) - || (columnQualifier == null) || (inputValue == null)) { - return; - } - - Put p = null; - if (cellTimeStamp == null) { - if (columnPrefix != null) { - // store with prefix - p = new Put(rowKey); - p.addColumn( - columnFamily, - join(TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, - columnPrefix, columnQualifier), GenericObjectMapper - .write(inputValue)); - } else { - // store without prefix - p = new Put(rowKey); - p.addColumn(columnFamily, columnQualifier, - GenericObjectMapper.write(inputValue)); - } - } else { - // store with cell timestamp - Cell cell = CellUtil.createCell(rowKey, columnFamily, columnQualifier, - // set the cell timestamp - cellTimeStamp, - // KeyValue Type minimum - TimelineEntitySchemaConstants.ZERO_BYTES, - GenericObjectMapper.write(inputValue)); - p = new Put(rowKey); - p.add(cell); - } - if (p != null) { - table.mutate(p); - } - - } - -} \ No newline at end of file 
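The deleted TimelineWriterUtils above is built around two byte[] primitives: join, which concatenates component arrays with a separator array, and split/splitRanges, which walk the source array and recover the segments. A condensed, self-contained sketch of the same idea — simplified relative to the Hadoop code (no null-component handling, no limit parameter, no copy-free Range variant):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ByteJoinSplit {

    static final byte[] EMPTY = new byte[0];

    // Concatenate components with the separator between them,
    // as TimelineWriterUtils.join does. Components must be non-null here.
    static byte[] join(byte[] sep, byte[]... parts) {
        if (parts == null || parts.length == 0) {
            return EMPTY;
        }
        int size = sep.length * (parts.length - 1);
        for (byte[] p : parts) {
            size += p.length;
        }
        byte[] buf = new byte[size];
        int off = 0;
        for (int i = 0; i < parts.length; i++) {
            System.arraycopy(parts[i], 0, buf, off, parts[i].length);
            off += parts[i].length;
            if (i < parts.length - 1) {
                System.arraycopy(sep, 0, buf, off, sep.length);
                off += sep.length;
            }
        }
        return buf;
    }

    // Split source on the separator; each segment is a copied array,
    // like TimelineWriterUtils.split built on top of splitRanges.
    static List<byte[]> split(byte[] source, byte[] sep) {
        List<byte[]> out = new ArrayList<>();
        int start = 0;
        for (int i = 0; i + sep.length <= source.length; i++) {
            boolean match = true;
            for (int j = 0; j < sep.length; j++) {
                if (source[i + j] != sep[j]) {
                    match = false;
                    break;
                }
            }
            if (match) {
                out.add(Arrays.copyOfRange(source, start, i));
                start = i + sep.length;
                i += sep.length - 1; // skip over the separator
            }
        }
        // remaining bytes form the final segment
        out.add(Arrays.copyOfRange(source, start, source.length));
        return out;
    }
}
```

Round-tripping holds as long as the components themselves never contain the separator, which is exactly why the row-key code cleanses tokens before joining them.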
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0369827/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java new file mode 100644 index 0000000..e8d8b5c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; + +/** + * Implements behavior common to tables used in the timeline service storage. + * + * @param reference to the table instance class itself for type safety. + */ +public abstract class BaseTable { + + /** + * Name of config variable that is used to point to this table + */ + private final String tableNameConfName; + + /** + * Unless the configuration overrides, this will be the default name for the + * table when it is created. + */ + private final String defaultTableName; + + /** + * @param tableNameConfName name of config variable that is used to point to + * this table. + */ + protected BaseTable(String tableNameConfName, String defaultTableName) { + this.tableNameConfName = tableNameConfName; + this.defaultTableName = defaultTableName; + } + + /** + * Used to create a type-safe mutator for this table. + * + * @param hbaseConf used to read table name + * @param conn used to create a table from. + * @return a type safe {@link BufferedMutator} for the entity table. + * @throws IOException + */ + public TypedBufferedMutator getTableMutator(Configuration hbaseConf, + Connection conn) throws IOException { + + TableName tableName = this.getTableName(hbaseConf); + + // Plain buffered mutator + BufferedMutator bufferedMutator = conn.getBufferedMutator(tableName); + + // Now make this thing type safe. 
+ // This is how service initialization should hang on to this variable, with + // the proper type + TypedBufferedMutator table = + new BufferedMutatorDelegator(bufferedMutator); + + return table; + } + + /** + * @param hbaseConf used to read settings that override defaults + * @param conn used to create table from + * @param scan that specifies what you want to read from this table. + * @return scanner for the table. + * @throws IOException + */ + public ResultScanner getResultScanner(Configuration hbaseConf, + Connection conn, Scan scan) throws IOException { + Table table = conn.getTable(getTableName(hbaseConf)); + return table.getScanner(scan); + } + + /** + * Get the table name for this table. + * + * @param hbaseConf + */ + public TableName getTableName(Configuration hbaseConf) { + TableName table = + TableName.valueOf(hbaseConf.get(tableNameConfName, defaultTableName)); + return table; + + } + + /** + * Used to create the table in HBase. Should be called only once (per HBase + * instance). 
+ * + * @param admin + * @param hbaseConf + */ + public abstract void createTable(Admin admin, Configuration hbaseConf) + throws IOException; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0369827/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java new file mode 100644 index 0000000..fe8f9c6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Mutation; + +/** + * To be used to wrap an actual {@link BufferedMutator} in a type safe manner + * + * @param The class referring to the table to be written to. + */ +class BufferedMutatorDelegator implements TypedBufferedMutator { + + private final BufferedMutator bufferedMutator; + + /** + * @param bufferedMutator the mutator to be wrapped for delegation. Shall not + * be null. + */ + public BufferedMutatorDelegator(BufferedMutator bufferedMutator) { + this.bufferedMutator = bufferedMutator; + } + + public TableName getName() { + return bufferedMutator.getName(); + } + + public Configuration getConfiguration() { + return bufferedMutator.getConfiguration(); + } + + public void mutate(Mutation mutation) throws IOException { + bufferedMutator.mutate(mutation); + } + + public void mutate(List mutations) throws IOException { + bufferedMutator.mutate(mutations); + } + + public void close() throws IOException { + bufferedMutator.close(); + } + + public void flush() throws IOException { + bufferedMutator.flush(); + } + + public long getWriteBufferSize() { + return bufferedMutator.getWriteBufferSize(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0369827/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java new file mode 100644 index 0000000..3397d62 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Result; + +/** + * A Column represents the way to store a fully qualified column in a specific + * table. + */ +public interface Column { + + /** + * Sends a Mutation to the table. The mutations will be buffered and sent over + * the wire as part of a batch. + * + * @param rowKey identifying the row to write. Nothing gets written when null. + * @param tableMutator used to modify the underlying HBase table. Caller is + * responsible to pass a mutator for the table that actually has this + * column. + * @param timestamp version timestamp. 
When null the server timestamp will be + * used. + * @param inputValue the value to write to the rowKey and column qualifier. + * Nothing gets written when null. + * @throws IOException + */ + public void store(byte[] rowKey, TypedBufferedMutator tableMutator, + Long timestamp, Object inputValue) throws IOException; + + /** + * Get the latest version of this specified column. Note: this call clones the + * value content of the hosting {@link Cell}. + * + * @param result Cannot be null + * @return result object (can be cast to whatever object was written to), or + * null when result doesn't contain this column. + * @throws IOException + */ + public Object readResult(Result result) throws IOException; + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0369827/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnFamily.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnFamily.java new file mode 100644 index 0000000..c84c016 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnFamily.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +/** + * Type safe column family. + * + * @param refers to the table for which this column family is used for. + */ +public interface ColumnFamily { + + /** + * Keep a local copy if you need to avoid overhead of repeated cloning. + * + * @return a clone of the byte representation of the column family. + */ + public byte[] getBytes(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0369827/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java new file mode 100644 index 0000000..6a204dc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java @@ -0,0 +1,247 @@ +/** + * Licensed to 
the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+
+/**
+ * This class is meant to be used only by explicit Columns, and not directly by
+ * clients to write data.
+ *
+ * @param <T> refers to the table.
+ */
+public class ColumnHelper<T> {
+
+  private final ColumnFamily<T> columnFamily;
+
+  /**
+   * Local copy of the bytes representation of columnFamily so that we can
+   * avoid cloning a new copy over and over.
+   */
+  private final byte[] columnFamilyBytes;
+
+  public ColumnHelper(ColumnFamily<T> columnFamily) {
+    this.columnFamily = columnFamily;
+    columnFamilyBytes = columnFamily.getBytes();
+  }
+
+  /**
+   * Sends a Mutation to the table. The mutations will be buffered and sent
+   * over the wire as part of a batch.
+   *
+   * @param rowKey identifying the row to write.
Nothing gets written when null.
+   * @param tableMutator used to modify the underlying HBase table.
+   * @param columnQualifier column qualifier. Nothing gets written when null.
+   * @param timestamp version timestamp. When null the server timestamp will be
+   *          used.
+   * @param inputValue the value to write to the rowKey and column qualifier.
+   *          Nothing gets written when null.
+   * @throws IOException
+   */
+  public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
+      byte[] columnQualifier, Long timestamp, Object inputValue)
+      throws IOException {
+    if ((rowKey == null) || (columnQualifier == null) || (inputValue == null)) {
+      return;
+    }
+    Put p = new Put(rowKey);
+
+    if (timestamp == null) {
+      p.addColumn(columnFamilyBytes, columnQualifier,
+          GenericObjectMapper.write(inputValue));
+    } else {
+      p.addColumn(columnFamilyBytes, columnQualifier, timestamp,
+          GenericObjectMapper.write(inputValue));
+    }
+    tableMutator.mutate(p);
+  }
+
+  /**
+   * @return the column family for this column implementation.
+   */
+  public ColumnFamily<T> getColumnFamily() {
+    return columnFamily;
+  }
+
+  /**
+   * Get the latest version of this specified column. Note: this call clones
+   * the value content of the hosting {@link Cell}.
+   *
+   * @param result from which to read the value. Cannot be null.
+   * @param columnQualifierBytes referring to the column to be read.
+   * @return latest version of the specified column of whichever object was
+   *         written.
+   * @throws IOException
+   */
+  public Object readResult(Result result, byte[] columnQualifierBytes)
+      throws IOException {
+    if (result == null || columnQualifierBytes == null) {
+      return null;
+    }
+
+    // Would have preferred to be able to use getValueAsByteBuffer and get a
+    // ByteBuffer to avoid copy, but GenericObjectMapper doesn't seem to like
+    // that.
+    byte[] value = result.getValue(columnFamilyBytes, columnQualifierBytes);
+    return GenericObjectMapper.read(value);
+  }
+
+  /**
+   * @param result from which to read timeseries data.
+   * @param columnPrefixBytes optional prefix to limit columns. If null all
+   *          columns are returned.
+   * @return the cell values at each respective time in the form
+   *         {idA={timestamp1->value1}, idA={timestamp2->value2},
+   *         idB={timestamp3->value3}, idC={timestamp1->value4}}
+   * @throws IOException
+   */
+  public NavigableMap<String, NavigableMap<Long, Number>>
+      readTimeseriesResults(Result result, byte[] columnPrefixBytes)
+          throws IOException {
+
+    NavigableMap<String, NavigableMap<Long, Number>> results =
+        new TreeMap<String, NavigableMap<Long, Number>>();
+
+    if (result != null) {
+      NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+          resultMap = result.getMap();
+
+      NavigableMap<byte[], NavigableMap<Long, byte[]>> columnCellMap =
+          resultMap.get(columnFamilyBytes);
+
+      // could be that there is no such column family.
+      if (columnCellMap != null) {
+        for (Entry<byte[], NavigableMap<Long, byte[]>> entry : columnCellMap
+            .entrySet()) {
+          String columnName = null;
+          if (columnPrefixBytes == null) {
+            // Decode the spaces we encoded in the column name.
+            columnName = Separator.decode(entry.getKey(), Separator.SPACE);
+          } else {
+            // A non-null prefix means columns are actually of the form
+            // prefix!columnNameRemainder
+            byte[][] columnNameParts =
+                Separator.QUALIFIERS.split(entry.getKey(), 2);
+            byte[] actualColumnPrefixBytes = columnNameParts[0];
+            if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
+                && columnNameParts.length == 2) {
+              // This is the prefix that we want
+              columnName = Separator.decode(columnNameParts[1]);
+            }
+          }
+
+          // If this column has the prefix we want
+          if (columnName != null) {
+            NavigableMap<Long, Number> cellResults =
+                new TreeMap<Long, Number>();
+            NavigableMap<Long, byte[]> cells = entry.getValue();
+            if (cells != null) {
+              for (Entry<Long, byte[]> cell : cells.entrySet()) {
+                Number value =
+                    (Number) GenericObjectMapper.read(cell.getValue());
+                cellResults.put(cell.getKey(), value);
+              }
+            }
+            results.put(columnName, cellResults);
+          }
+        } // for entry : columnCellMap
+      } // if columnCellMap != null
+    } // if result != null
+    return results;
+  }
+
+  /**
+   * @param result from which to read columns.
+   * @param columnPrefixBytes optional prefix to limit columns. If null all
+   *          columns are returned.
+   * @return the latest values of columns in the column family.
+   * @throws IOException
+   */
+  public Map<String, Object> readResults(Result result,
+      byte[] columnPrefixBytes) throws IOException {
+    Map<String, Object> results = new HashMap<String, Object>();
+
+    if (result != null) {
+      Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
+      for (Entry<byte[], byte[]> entry : columns.entrySet()) {
+        if (entry.getKey() != null && entry.getKey().length > 0) {
+
+          String columnName = null;
+          if (columnPrefixBytes == null) {
+            // Decode the spaces we encoded in the column name.
+            columnName = Separator.decode(entry.getKey(), Separator.SPACE);
+          } else {
+            // A non-null prefix means columns are actually of the form
+            // prefix!columnNameRemainder
+            byte[][] columnNameParts =
+                Separator.QUALIFIERS.split(entry.getKey(), 2);
+            byte[] actualColumnPrefixBytes = columnNameParts[0];
+            if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
+                && columnNameParts.length == 2) {
+              // This is the prefix that we want
+              columnName = Separator.decode(columnNameParts[1]);
+            }
+          }
+
+          // If this column has the prefix we want
+          if (columnName != null) {
+            Object value = GenericObjectMapper.read(entry.getValue());
+            results.put(columnName, value);
+          }
+        }
+      } // for entry
+    }
+    return results;
+  }
+
+  /**
+   * @param columnPrefixBytes The byte representation for the column prefix.
+   *          Should not contain {@link Separator#QUALIFIERS}.
+   * @param qualifier for the remainder of the column. Any
+   *          {@link Separator#QUALIFIERS} will be encoded in the qualifier.
+   * @return fully sanitized column qualifier that is a combination of prefix
+   *         and qualifier. If prefix is null, the result is simply the encoded
+   *         qualifier without any separator.
+   */
+  public static byte[] getColumnQualifier(byte[] columnPrefixBytes,
+      String qualifier) {
+
+    // We don't want column names to have spaces
+    byte[] encodedQualifier = Bytes.toBytes(Separator.SPACE.encode(qualifier));
+    if (columnPrefixBytes == null) {
+      return encodedQualifier;
+    }
+
+    // Join the encoded qualifier onto the column prefix with the qualifier
+    // separator.
+    byte[] columnQualifier =
+        Separator.QUALIFIERS.join(columnPrefixBytes, encodedQualifier);
+    return columnQualifier;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0369827/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
new file mode 100644
index 0000000..2eedea0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * Used to represent a partially qualified column, where the actual column name
+ * will be composed of a prefix and the remainder of the column qualifier. The
+ * prefix can be null, in which case the column qualifier will be completely
+ * determined when the values are stored.
+ */
+public interface ColumnPrefix<T> {
+
+  /**
+   * Sends a Mutation to the table. The mutations will be buffered and sent
+   * over the wire as part of a batch.
+   *
+   * @param rowKey identifying the row to write. Nothing gets written when
+   *          null.
+   * @param tableMutator used to modify the underlying HBase table. Caller is
+   *          responsible to pass a mutator for the table that actually has
+   *          this column.
+   * @param qualifier column qualifier. Nothing gets written when null.
+   * @param timestamp version timestamp. When null the server timestamp will be
+   *          used.
+   * @param inputValue the value to write to the rowKey and column qualifier.
+   *          Nothing gets written when null.
+   * @throws IOException
+   */
+  public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
+      String qualifier, Long timestamp, Object inputValue) throws IOException;
+
+  /**
+   * Get the latest version of this specified column. Note: this call clones
+   * the value content of the hosting {@link Cell}.
+   *
+   * @param result Cannot be null.
+   * @param qualifier column qualifier. Nothing gets read when null.
+   * @return result object (can be cast to whatever object was written to) or
+   *         null when the specified column qualifier for this prefix doesn't
+   *         exist in the result.
+   * @throws IOException
+   */
+  public Object readResult(Result result, String qualifier) throws IOException;
+
+  /**
+   * @param result from which to read columns.
+   * @return the latest values of columns in the column family with this prefix
+   *         (or all of them if the prefix value is null).
+   * @throws IOException
+   */
+  public Map<String, Object> readResults(Result result) throws IOException;
+
+  /**
+   * @param result from which to read timeseries data.
+   * @return the cell values at each respective time in the form
+   *         {idA={timestamp1->value1}, idA={timestamp2->value2},
+   *         idB={timestamp3->value3}, idC={timestamp1->value4}}
+   * @throws IOException
+   */
+  public NavigableMap<String, NavigableMap<Long, Number>>
+      readTimeseriesResults(Result result) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0369827/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Range.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Range.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Range.java
new file mode 100644
index 0000000..2cb6c08
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Range.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class Range {
+  private final int startIdx;
+  private final int endIdx;
+
+  /**
+   * Defines a range from start index (inclusive) to end index (exclusive).
+   *
+   * @param start Starting index position
+   * @param end Ending index position (exclusive)
+   */
+  public Range(int start, int end) {
+    if (start < 0 || end < start) {
+      throw new IllegalArgumentException(
+          "Invalid range, required that: 0 <= start <= end; start=" + start
+              + ", end=" + end);
+    }
+
+    this.startIdx = start;
+    this.endIdx = end;
+  }
+
+  public int start() {
+    return startIdx;
+  }
+
+  public int end() {
+    return endIdx;
+  }
+
+  public int length() {
+    return endIdx - startIdx;
+  }
+}
\ No newline at end of file
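The half-open [start, end) semantics of the Range class in this patch can be sketched with a small standalone demo. The nested Range copy below mirrors the class above only so the snippet compiles without the YARN source tree; RangeDemo itself is purely illustrative and not part of the patch:

```java
// Standalone sketch of the Range semantics from the patch: a half-open
// [start, end) slice, e.g. over components of an HBase row key.
// RangeDemo and the nested Range copy are illustrative, not part of YARN.
public class RangeDemo {

  static final class Range {
    private final int startIdx;
    private final int endIdx;

    // start inclusive, end exclusive; invalid bounds are rejected eagerly
    Range(int start, int end) {
      if (start < 0 || end < start) {
        throw new IllegalArgumentException(
            "Invalid range, required that: 0 <= start <= end; start=" + start
                + ", end=" + end);
      }
      this.startIdx = start;
      this.endIdx = end;
    }

    int start() { return startIdx; }
    int end() { return endIdx; }
    int length() { return endIdx - startIdx; }
  }

  public static void main(String[] args) {
    // [2, 5) covers indices 2, 3 and 4, so length() is 3
    Range r = new Range(2, 5);
    String slice = "rowkey!".substring(r.start(), r.end());
    System.out.println(slice + " length=" + r.length()); // prints "wke length=3"

    // end < start fails fast instead of producing a negative length
    try {
      new Range(5, 2);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Validating the bounds in the constructor means downstream row-key splitting code can trust every Range it receives, rather than re-checking indices at each use.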