From: varunsaxena@apache.org
To: common-commits@hadoop.apache.org
Date: Fri, 10 Jun 2016 15:37:37 -0000
Subject: [3/4] hadoop git commit: YARN-5170. Eliminate singleton converters and static method access

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
index 32ef1c3..d3ef897 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
@@ -30,14 +30,8 @@ import org.apache.hadoop.hbase.util.Bytes;
  */
 public final class EventColumnNameConverter
     implements KeyConverter<EventColumnName> {
-  private static final EventColumnNameConverter INSTANCE =
-      new EventColumnNameConverter();
-
-  public static EventColumnNameConverter getInstance() {
-    return INSTANCE;
-  }
-
-  private EventColumnNameConverter() {
+  public EventColumnNameConverter() {
   }
 
   // eventId=timestamp=infokey are of types String, Long String
@@ -69,7 +63,7 @@ public final class EventColumnNameConverter
       return Separator.VALUES.join(first, Separator.EMPTY_BYTES);
     }
     byte[] second = Bytes.toBytes(
-        TimelineStorageUtils.invertLong(key.getTimestamp()));
+        LongConverter.invertLong(key.getTimestamp()));
     if (key.getInfoKey() == null) {
       return Separator.VALUES.join(first, second, Separator.EMPTY_BYTES);
     }
@@ -96,7 +90,7 @@ public final class EventColumnNameConverter
     }
     String id = Separator.decode(Bytes.toString(components[0]),
         Separator.VALUES, Separator.TAB, Separator.SPACE);
-    Long ts = TimelineStorageUtils.invertLong(Bytes.toLong(components[1]));
+    Long ts = LongConverter.invertLong(Bytes.toLong(components[1]));
     String infoKey = components[2].length == 0 ? null :
         Separator.decode(Bytes.toString(components[2]),
             Separator.VALUES, Separator.TAB, Separator.SPACE);
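The change above is the template for the rest of this patch: the private
singleton INSTANCE and its static getInstance() accessor are dropped in
favor of a plain public constructor, so each call site owns its converter.
A minimal before/after sketch of a call site (the variable names are
illustrative, not from the patch):

    // before: shared singleton, reached through static access
    byte[] qualifier = EventColumnNameConverter.getInstance().encode(name);

    // after: the caller constructs and holds its own (stateless) instance,
    // which can also be injected or replaced in tests
    KeyConverter<EventColumnName> converter = new EventColumnNameConverter();
    byte[] qualifier = converter.encode(name);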
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
index 48c56f9..600601a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.common;
 
 import java.io.IOException;
+import java.io.Serializable;
 
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -26,14 +27,15 @@ import org.apache.hadoop.hbase.util.Bytes;
  * Encodes a value by interpreting it as a Long and converting it to bytes and
  * decodes a set of bytes as a Long.
  */
-public final class LongConverter implements NumericValueConverter {
-  private static final LongConverter INSTANCE = new LongConverter();
+public final class LongConverter implements NumericValueConverter,
+    Serializable {
 
-  private LongConverter() {
-  }
+  /**
+   * Added because we implement Comparator<Number>.
+   */
+  private static final long serialVersionUID = 1L;
 
-  public static LongConverter getInstance() {
-    return INSTANCE;
+  public LongConverter() {
   }
 
   @Override
@@ -76,4 +78,17 @@ public final class LongConverter implements NumericValueConverter {
     }
     return sum;
   }
+
+  /**
+   * Converts a timestamp into its inverse timestamp to be used in (row) keys
+   * where we want to have the most recent timestamp in the top of the table
+   * (scans start at the most recent timestamp first).
+   *
+   * @param key value to be inverted so that the latest version will be first
+   *          in a scan.
+   * @return inverted long
+   */
+  public static long invertLong(long key) {
+    return Long.MAX_VALUE - key;
+  }
 }
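invertLong() moves here from TimelineStorageUtils. HBase scans return rows
in ascending byte order, so storing Long.MAX_VALUE - timestamp makes the
newest timestamp sort first. A self-contained sketch of that property:

    import java.util.Arrays;

    public class InvertLongDemo {
      public static void main(String[] args) {
        long older = 1465500000000L;
        long newer = 1465586400000L;
        long[] inverted = {Long.MAX_VALUE - older, Long.MAX_VALUE - newer};
        Arrays.sort(inverted); // ascending, like a forward HBase scan
        // the first (smallest) inverted value maps back to the newer event
        System.out.println(Long.MAX_VALUE - inverted[0] == newer); // true
      }
    }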
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
index 3954145..4a724d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
@@ -23,13 +23,13 @@ import java.io.IOException;
  * Encodes and decodes column names / row keys which are long.
  */
 public final class LongKeyConverter implements KeyConverter<Long> {
-  private static final LongKeyConverter INSTANCE = new LongKeyConverter();
 
-  public static LongKeyConverter getInstance() {
-    return INSTANCE;
-  }
+  /**
+   * To delegate the actual work to.
+   */
+  private final LongConverter longConverter = new LongConverter();
 
-  private LongKeyConverter() {
+  public LongKeyConverter() {
   }
 
   /*
@@ -44,7 +44,7 @@ public final class LongKeyConverter implements KeyConverter<Long> {
     try {
       // IOException will not be thrown here as we are explicitly passing
       // Long.
-      return LongConverter.getInstance().encodeValue(key);
+      return longConverter.encodeValue(key);
     } catch (IOException e) {
       return null;
     }
@@ -60,7 +60,7 @@ public final class LongKeyConverter implements KeyConverter<Long> {
   @Override
   public Long decode(byte[] bytes) {
     try {
-      return (Long) LongConverter.getInstance().decodeValue(bytes);
+      return (Long) longConverter.decodeValue(bytes);
     } catch (IOException e) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/RowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/RowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/RowKeyPrefix.java
new file mode 100644
index 0000000..6159dc7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/RowKeyPrefix.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * In queries where a single result is needed, an exact rowkey can be used
+ * through the corresponding rowkey#getRowKey() method. For queries that need
+ * to scan over a range of rowkeys, a partial rowkey (the initial part) is
+ * used. Classes implementing RowKeyPrefix indicate that they are the initial
+ * part of rowkeys, with different constructors taking fewer arguments to
+ * form a partial rowkey, a prefix.
+ *
+ * @param <R> indicating the type of rowkey that a particular implementation
+ *          is a prefix for.
+ */
+public interface RowKeyPrefix<R> {
+
+  /**
+   * Create a row key prefix, meaning a partial rowkey that can be used in
+   * range scans. Which fields are included in the prefix will depend on the
+   * constructor of the specific instance that was used.
+   * @return a prefix of the following form {@code first!second!...!last!}
+   */
+  byte[] getRowKeyPrefix();
+
+}
\ No newline at end of file
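A RowKeyPrefix implementation only produces bytes; the consumer is an HBase
range scan. A hedged sketch of how such a prefix might be used (the table
name is illustrative; Scan#setRowPrefixFilter is the stock HBase 1.x API
that derives the start and stop rows from a prefix):

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;

    public final class PrefixScanSketch {
      static void scanByPrefix(Connection conn, byte[] rowKeyPrefix)
          throws Exception {
        Scan scan = new Scan();
        scan.setRowPrefixFilter(rowKeyPrefix);
        try (Table table =
                 conn.getTable(TableName.valueOf("timelineservice.entity"));
             ResultScanner scanner = table.getScanner(scan)) {
          for (Result result : scanner) {
            // every returned row key starts with rowKeyPrefix
          }
        }
      }
    }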
 */
public final class StringKeyConverter implements KeyConverter<String> {
-  private static final StringKeyConverter INSTANCE = new StringKeyConverter();
 
-  public static StringKeyConverter getInstance() {
-    return INSTANCE;
-  }
-
-  private StringKeyConverter() {
+  public StringKeyConverter() {
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index d52a5d7..aa9a793 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -18,14 +18,12 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common;
 
 import java.io.IOException;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.SortedSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,13 +35,10 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
@@ -52,7 +47,6 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
@@ -73,19 +67,6 @@ public final class TimelineStorageUtils {
   public static final long MILLIS_ONE_DAY = 86400000L;
 
   /**
-   * Converts a timestamp into it's inverse timestamp to be used in (row) keys
-   * where we want to have the most recent timestamp in the top of the table
-   * (scans start at the most recent timestamp first).
-   *
-   * @param key value to be inverted so that the latest version will be first
-   *          in a scan.
-   * @return inverted long
-   */
-  public static long invertLong(long key) {
-    return Long.MAX_VALUE - key;
-  }
-
-  /**
    * Converts an int into it's inverse int to be used in (row) keys
    * where we want to have the largest int value in the top of the table
    * (scans start at the largest int first).
@@ -164,66 +145,6 @@ public final class TimelineStorageUtils {
   }
 
   /**
-   * checks if an application has finished.
-   *
-   * @param te TimlineEntity object.
-   * @return true if application has finished else false
-   */
-  public static boolean isApplicationFinished(TimelineEntity te) {
-    SortedSet<TimelineEvent> allEvents = te.getEvents();
-    if ((allEvents != null) && (allEvents.size() > 0)) {
-      TimelineEvent event = allEvents.last();
-      if (event.getId().equals(
-          ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Check if we have a certain field amongst fields to retrieve. This method
-   * checks against {@link Field#ALL} as well because that would mean field
-   * passed needs to be matched.
-   *
-   * @param fieldsToRetrieve fields to be retrieved.
-   * @param requiredField fields to be checked in fieldsToRetrieve.
-   * @return true if has the required field, false otherwise.
-   */
-  public static boolean hasField(EnumSet<Field> fieldsToRetrieve,
-      Field requiredField) {
-    return fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(requiredField);
-  }
-
-  /**
-   * Checks if the input TimelineEntity object is an ApplicationEntity.
-   *
-   * @param te TimelineEntity object.
-   * @return true if input is an ApplicationEntity, false otherwise
-   */
-  public static boolean isApplicationEntity(TimelineEntity te) {
-    return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
-  }
-
-  /**
-   * @param te TimelineEntity object.
-   * @param eventId event with this id needs to be fetched
-   * @return TimelineEvent if TimelineEntity contains the desired event.
-   */
-  public static TimelineEvent getApplicationEvent(TimelineEntity te,
-      String eventId) {
-    if (isApplicationEntity(te)) {
-      for (TimelineEvent event : te.getEvents()) {
-        if (event.getId().equals(eventId)) {
-          return event;
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
    * Returns the first seen aggregation operation as seen in the list of input
    * tags or null otherwise.
    *
@@ -646,98 +567,6 @@ public final class TimelineStorageUtils {
     return appId;
   }
 
-  /**
-   * Helper method for reading relationship.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param entity entity to fill.
-   * @param result result from HBase.
-   * @param prefix column prefix.
-   * @param isRelatedTo if true, means relationship is to be added to
-   *          isRelatedTo, otherwise its added to relatesTo.
-   * @throws IOException if any problem is encountered while reading result.
-   */
-  public static <T> void readRelationship(
-      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
-      boolean isRelatedTo) throws IOException {
-    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
-    Map<String, Object> columns =
-        prefix.readResults(result, StringKeyConverter.getInstance());
-    for (Map.Entry<String, Object> column : columns.entrySet()) {
-      for (String id : Separator.VALUES.splitEncoded(
-          column.getValue().toString())) {
-        if (isRelatedTo) {
-          entity.addIsRelatedToEntity(column.getKey(), id);
-        } else {
-          entity.addRelatesToEntity(column.getKey(), id);
-        }
-      }
-    }
-  }
-
-  /**
-   * Helper method for reading key-value pairs for either info or config.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param entity entity to fill.
-   * @param result result from HBase.
-   * @param prefix column prefix.
-   * @param isConfig if true, means we are reading configs, otherwise info.
-   * @throws IOException if any problem is encountered while reading result.
-   */
-  public static <T> void readKeyValuePairs(
-      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
-      boolean isConfig) throws IOException {
-    // info and configuration are of type Map<String, Object>
-    Map<String, Object> columns =
-        prefix.readResults(result, StringKeyConverter.getInstance());
-    if (isConfig) {
-      for (Map.Entry<String, Object> column : columns.entrySet()) {
-        entity.addConfig(column.getKey(), column.getValue().toString());
-      }
-    } else {
-      entity.addInfo(columns);
-    }
-  }
-
-  /**
-   * Read events from the entity table or the application table. The column
-   * name is of the form "eventId=timestamp=infoKey" where "infoKey" may be
-   * omitted if there is no info associated with the event.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param entity entity to fill.
-   * @param result HBase Result.
-   * @param prefix column prefix.
-   * @throws IOException if any problem is encountered while reading result.
-   */
-  public static <T> void readEvents(TimelineEntity entity, Result result,
-      ColumnPrefix<T> prefix) throws IOException {
-    Map<String, TimelineEvent> eventsMap = new HashMap<>();
-    Map<EventColumnName, Object> eventsResult =
-        prefix.readResults(result, EventColumnNameConverter.getInstance());
-    for (Map.Entry<EventColumnName, Object>
-             eventResult : eventsResult.entrySet()) {
-      EventColumnName eventColumnName = eventResult.getKey();
-      String key = eventColumnName.getId() +
-          Long.toString(eventColumnName.getTimestamp());
-      // Retrieve previously seen event to add to it
-      TimelineEvent event = eventsMap.get(key);
-      if (event == null) {
-        // First time we're seeing this event, add it to the eventsMap
-        event = new TimelineEvent();
-        event.setId(eventColumnName.getId());
-        event.setTimestamp(eventColumnName.getTimestamp());
-        eventsMap.put(key, event);
-      }
-      if (eventColumnName.getInfoKey() != null) {
-        event.addInfo(eventColumnName.getInfoKey(), eventResult.getValue());
-      }
-    }
-    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
-    entity.addEvents(eventsSet);
-  }
-
   public static boolean isFlowRunTable(HRegionInfo hRegionInfo,
       Configuration conf) {
     String regionTableName = hRegionInfo.getTable().getNameAsString();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
index 775879a..93b4b36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
@@ -49,8 +49,7 @@ public enum EntityColumn implements Column<EntityTable> {
   /**
    * When the entity was created.
    */
-  CREATED_TIME(EntityColumnFamily.INFO, "created_time",
-      LongConverter.getInstance()),
+  CREATED_TIME(EntityColumnFamily.INFO, "created_time", new LongConverter()),
 
   /**
    * The version of the flow that this entity belongs to.
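CREATED_TIME now constructs its own LongConverter instead of borrowing the
singleton, so each column constant carries the converter it needs. A
stripped-down sketch of the pattern (DemoColumn and its Converter interface
are invented for illustration, not part of the patch):

    import java.nio.ByteBuffer;

    enum DemoColumn {
      // each constant owns its converter instance
      CREATED_TIME(v -> ByteBuffer.allocate(Long.BYTES)
          .putLong((Long) v).array());

      interface Converter {
        byte[] encode(Object value);
      }

      private final Converter converter;

      DemoColumn(Converter converter) {
        this.converter = converter;
      }

      byte[] encode(Object value) {
        return converter.encode(value);
      }
    }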
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index 02a4bb3..e410549 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -67,8 +67,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
   /**
    * Metrics are stored with the metric name as the column name.
    */
-  METRIC(EntityColumnFamily.METRICS, null,
-      LongConverter.getInstance());
+  METRIC(EntityColumnFamily.METRICS, null, new LongConverter());
 
   private final ColumnHelper<EntityTable> column;
   private final ColumnFamily<EntityTable> columnFamily;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 6d08390..ff22178 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -17,6 +17,12 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
 
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
 /**
  * Represents a rowkey for the entity table.
  */
@@ -28,6 +34,8 @@ public class EntityRowKey {
   private final String appId;
   private final String entityType;
   private final String entityId;
+  private final KeyConverter<EntityRowKey> entityRowKeyConverter =
+      new EntityRowKeyConverter();
 
   public EntityRowKey(String clusterId, String userId, String flowName,
       Long flowRunId, String appId, String entityType, String entityId) {
@@ -69,61 +77,14 @@ public class EntityRowKey {
   }
 
   /**
-   * Constructs a row key prefix for the entity table as follows:
-   * {@code userName!clusterId!flowName!flowRunId!AppId}.
-   *
-   * @param clusterId Context cluster id.
-   * @param userId User name.
-   * @param flowName Flow name.
-   * @param flowRunId Run Id for the flow.
-   * @param appId Application Id.
-   * @return byte array with the row key prefix.
-   */
-  public static byte[] getRowKeyPrefix(String clusterId, String userId,
-      String flowName, Long flowRunId, String appId) {
-    return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
-        clusterId, userId, flowName, flowRunId, appId, null, null));
-  }
-
-  /**
-   * Constructs a row key prefix for the entity table as follows:
-   * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!}.
-   * Typically used while querying multiple entities of a particular entity
-   * type.
-   *
-   * @param clusterId Context cluster id.
-   * @param userId User name.
-   * @param flowName Flow name.
-   * @param flowRunId Run Id for the flow.
-   * @param appId Application Id.
-   * @param entityType Entity type.
-   * @return byte array with the row key prefix.
-   */
-  public static byte[] getRowKeyPrefix(String clusterId, String userId,
-      String flowName, Long flowRunId, String appId, String entityType) {
-    return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
-        clusterId, userId, flowName, flowRunId, appId, entityType, null));
-  }
-
-  /**
    * Constructs a row key for the entity table as follows:
    * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!entityId}.
    * Typically used while querying a specific entity.
    *
-   * @param clusterId Context cluster id.
-   * @param userId User name.
-   * @param flowName Flow name.
-   * @param flowRunId Run Id for the flow.
-   * @param appId Application Id.
-   * @param entityType Entity type.
-   * @param entityId Entity Id.
    * @return byte array with the row key.
    */
-  public static byte[] getRowKey(String clusterId, String userId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      String entityId) {
-    return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
-        clusterId, userId, flowName, flowRunId, appId, entityType, entityId));
+  public byte[] getRowKey() {
+    return entityRowKeyConverter.encode(this);
   }
 
   /**
@@ -133,6 +94,132 @@ public class EntityRowKey {
    * @return An EntityRowKey object.
    */
   public static EntityRowKey parseRowKey(byte[] rowKey) {
-    return EntityRowKeyConverter.getInstance().decode(rowKey);
+    return new EntityRowKeyConverter().decode(rowKey);
+  }
+
+  /**
+   * Encodes and decodes row key for entity table. The row key is of the form:
+   * userName!clusterId!flowName!flowRunId!appId!entityType!entityId.
+   * flowRunId is a long, appId is encoded/decoded using
+   * {@link AppIdKeyConverter} and rest are strings.
+   */
+  final private static class EntityRowKeyConverter implements
+      KeyConverter<EntityRowKey> {
+
+    private final AppIdKeyConverter appIDKeyConverter = new AppIdKeyConverter();
+
+    private EntityRowKeyConverter() {
+    }
+
+    /**
+     * Entity row key is of the form
+     * userName!clusterId!flowName!flowRunId!appId!entityType!entityId with
+     * each segment separated by !. The sizes below indicate sizes of each one
+     * of these segments in sequence. clusterId, userName, flowName,
+     * entityType and entityId are strings. flowrunId is a long hence 8 bytes
+     * in size. app id is represented as 12 bytes with cluster timestamp part
+     * of appid being 8 bytes (long) and seq id being 4 bytes (int). Strings
+     * are variable in size (i.e. end whenever separator is encountered). This
+     * is used while decoding and helps in determining where to split.
+     */
+    private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
+        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+        AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
+        Separator.VARIABLE_SIZE };
+
+    /*
+     * (non-Javadoc)
+     *
+     * Encodes EntityRowKey object into a byte array with each component/field
+     * in EntityRowKey separated by Separator#QUALIFIERS. This leads to an
+     * entity table row key of the form
+     * userName!clusterId!flowName!flowRunId!appId!entityType!entityId.
+     * If entityType in the passed EntityRowKey object is null (and the fields
+     * preceding it i.e. clusterId, userId, flowName, flowRunId and appId
+     * are not null), this returns a row key prefix of the form
+     * userName!clusterId!flowName!flowRunId!appId! and if entityId in
+     * EntityRowKey is null (other 6 components are not null), this returns a
+     * row key prefix of the form
+     * userName!clusterId!flowName!flowRunId!appId!entityType!. flowRunId is
+     * inverted while encoding as it helps maintain a descending order for row
+     * keys in the entity table.
+     *
+     * @see org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#encode(java.lang.Object)
+     */
+    @Override
+    public byte[] encode(EntityRowKey rowKey) {
+      byte[] user =
+          Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
+              Separator.QUALIFIERS);
+      byte[] cluster =
+          Separator.encode(rowKey.getClusterId(), Separator.SPACE,
+              Separator.TAB, Separator.QUALIFIERS);
+      byte[] flow =
+          Separator.encode(rowKey.getFlowName(), Separator.SPACE,
+              Separator.TAB, Separator.QUALIFIERS);
+      byte[] first = Separator.QUALIFIERS.join(user, cluster, flow);
+      // Note that flowRunId is a long, so we can't encode them all at the
+      // same time.
+      byte[] second =
+          Bytes.toBytes(LongConverter.invertLong(rowKey.getFlowRunId()));
+      byte[] third = appIDKeyConverter.encode(rowKey.getAppId());
+      if (rowKey.getEntityType() == null) {
+        return Separator.QUALIFIERS.join(first, second, third,
+            Separator.EMPTY_BYTES);
+      }
+      byte[] entityType =
+          Separator.encode(rowKey.getEntityType(), Separator.SPACE,
+              Separator.TAB, Separator.QUALIFIERS);
+      byte[] entityId =
+          rowKey.getEntityId() == null ? Separator.EMPTY_BYTES : Separator
+              .encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB,
+                  Separator.QUALIFIERS);
+      byte[] fourth = Separator.QUALIFIERS.join(entityType, entityId);
+      return Separator.QUALIFIERS.join(first, second, third, fourth);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Decodes an application row key of the form
+     * userName!clusterId!flowName!flowRunId!appId!entityType!entityId
+     * represented in byte format and converts it into an EntityRowKey object.
+     * flowRunId is inverted while decoding as it was inverted while encoding.
+     *
+     * @see
+     * org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#decode(byte[])
+     */
+    @Override
+    public EntityRowKey decode(byte[] rowKey) {
+      byte[][] rowKeyComponents =
+          Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+      if (rowKeyComponents.length != 7) {
+        throw new IllegalArgumentException("the row key is not valid for " +
+            "an entity");
+      }
+      String userId =
+          Separator.decode(Bytes.toString(rowKeyComponents[0]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String clusterId =
+          Separator.decode(Bytes.toString(rowKeyComponents[1]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String flowName =
+          Separator.decode(Bytes.toString(rowKeyComponents[2]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      Long flowRunId =
+          LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3]));
+      String appId = appIDKeyConverter.decode(rowKeyComponents[4]);
+      String entityType =
+          Separator.decode(Bytes.toString(rowKeyComponents[5]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String entityId =
+          Separator.decode(Bytes.toString(rowKeyComponents[6]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
+          entityType, entityId);
+    }
   }
 }
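With the converter folded in as an inner class, an exact key and its
round-trip now read as follows (all identifier values are made up):

    EntityRowKey rowKey = new EntityRowKey("cluster1", "user1", "some_flow",
        1002345678919L, "application_1465246237936_0001", "YARN_CONTAINER",
        "container_1465246237936_0001_01_000001");
    byte[] key = rowKey.getRowKey();    // replaces the static getRowKey(...)
    EntityRowKey parsed = EntityRowKey.parseRowKey(key);  // decodes it back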
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
deleted file mode 100644
index 43c0569..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
-/**
- * Encodes and decodes row key for entity table.
- * The row key is of the form :
- * userName!clusterId!flowName!flowRunId!appId!entityType!entityId.
- * flowRunId is a long, appId is encoded/decoded using
- * {@link AppIdKeyConverter} and rest are strings.
- */
-public final class EntityRowKeyConverter implements KeyConverter<EntityRowKey> {
-  private static final EntityRowKeyConverter INSTANCE =
-      new EntityRowKeyConverter();
-
-  public static EntityRowKeyConverter getInstance() {
-    return INSTANCE;
-  }
-
-  private EntityRowKeyConverter() {
-  }
-
-  // Entity row key is of the form
-  // userName!clusterId!flowName!flowRunId!appId!entityType!entityId with each
-  // segment separated by !. The sizes below indicate sizes of each one of these
-  // segements in sequence. clusterId, userName, flowName, entityType and
-  // entityId are strings. flowrunId is a long hence 8 bytes in size. app id is
-  // represented as 12 bytes with cluster timestamp part of appid being 8 bytes
-  // (long) and seq id being 4 bytes(int).
-  // Strings are variable in size (i.e. end whenever separator is encountered).
-  // This is used while decoding and helps in determining where to split.
-  private static final int[] SEGMENT_SIZES = {
-      Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-      Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize(),
-      Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE };
-
-  /*
-   * (non-Javadoc)
-   *
-   * Encodes EntityRowKey object into a byte array with each component/field in
-   * EntityRowKey separated by Separator#QUALIFIERS. This leads to an entity
-   * table row key of the form
-   * userName!clusterId!flowName!flowRunId!appId!entityType!entityId
-   * If entityType in passed EntityRowKey object is null (and the fields
-   * preceding it i.e. clusterId, userId and flowName, flowRunId and appId are
-   * not null), this returns a row key prefix of the form
-   * userName!clusterId!flowName!flowRunId!appId! and if entityId in
-   * EntityRowKey is null (other 6 components are not null), this returns a row
-   * key prefix of the form
-   * userName!clusterId!flowName!flowRunId!appId!entityType!
-   * flowRunId is inverted while encoding as it helps maintain a descending
-   * order for row keys in entity table.
-   *
-   * @see
-   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
-   * #encode(java.lang.Object)
-   */
-  @Override
-  public byte[] encode(EntityRowKey rowKey) {
-    byte[] user = Separator.encode(rowKey.getUserId(),
-        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
-    byte[] cluster = Separator.encode(rowKey.getClusterId(),
-        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
-    byte[] flow = Separator.encode(rowKey.getFlowName(),
-        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
-    byte[] first = Separator.QUALIFIERS.join(user, cluster, flow);
-    // Note that flowRunId is a long, so we can't encode them all at the same
-    // time.
-    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(
-        rowKey.getFlowRunId()));
-    byte[] third = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
-    if (rowKey.getEntityType() == null) {
-      return Separator.QUALIFIERS.join(
-          first, second, third, Separator.EMPTY_BYTES);
-    }
-    byte[] entityType = Separator.encode(rowKey.getEntityType(),
-        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
-    byte[] entityId = rowKey.getEntityId() == null ? Separator.EMPTY_BYTES :
-        Separator.encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB,
-            Separator.QUALIFIERS);
-    byte[] fourth = Separator.QUALIFIERS.join(entityType, entityId);
-    return Separator.QUALIFIERS.join(first, second, third, fourth);
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * Decodes an application row key of the form
-   * userName!clusterId!flowName!flowRunId!appId!entityType!entityId represented
-   * in byte format and converts it into an EntityRowKey object. flowRunId is
-   * inverted while decoding as it was inverted while encoding.
-   *
-   * @see
-   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
-   * #decode(byte[])
-   */
-  @Override
-  public EntityRowKey decode(byte[] rowKey) {
-    byte[][] rowKeyComponents =
-        Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
-    if (rowKeyComponents.length != 7) {
-      throw new IllegalArgumentException("the row key is not valid for " +
-          "an entity");
-    }
-    String userId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[1]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    Long flowRunId =
-        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
-    String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[4]);
-    String entityType = Separator.decode(Bytes.toString(rowKeyComponents[5]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    String entityId =Separator.decode(Bytes.toString(rowKeyComponents[6]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
-        entityType, entityId);
-  }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java
new file mode 100644
index 0000000..9146180
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+
+/**
+ * Represents a partial rowkey without the entityId or without entityType and
+ * entityId for the entity table.
+ *
+ */
+public class EntityRowKeyPrefix extends EntityRowKey implements
+    RowKeyPrefix<EntityRowKey> {
+
+  /**
+   * Creates a prefix which generates the following rowKeyPrefixes for the
+   * entity table:
+   * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!}.
+   *
+   * @param clusterId identifying the cluster
+   * @param userId identifying the user
+   * @param flowName identifying the flow
+   * @param flowRunId identifying the individual run of this flow
+   * @param appId identifying the application
+   * @param entityType which entity type
+   */
+  public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
+      Long flowRunId, String appId, String entityType) {
+    super(clusterId, userId, flowName, flowRunId, appId, entityType, null);
+  }
+
+  /**
+   * Creates a prefix which generates the following rowKeyPrefixes for the
+   * entity table:
+   * {@code userName!clusterId!flowName!flowRunId!AppId!}.
+   *
+   * @param clusterId identifying the cluster
+   * @param userId identifying the user
+   * @param flowName identifying the flow
+   * @param flowRunId identifying the individual run of this flow
+   * @param appId identifying the application
+   */
+  public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
+      Long flowRunId, String appId) {
+    super(clusterId, userId, flowName, flowRunId, appId, null, null);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.application.
+   * RowKeyPrefix#getRowKeyPrefix()
+   */
+  public byte[] getRowKeyPrefix() {
+    return super.getRowKey();
+  }
+
+}
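Usage-wise the prefix classes replace the deleted static getRowKeyPrefix()
overloads. A hedged sketch (values made up), producing bytes for the kind of
range scan shown earlier:

    RowKeyPrefix<EntityRowKey> prefix = new EntityRowKeyPrefix("cluster1",
        "user1", "some_flow", 1002345678919L,
        "application_1465246237936_0001", "YARN_CONTAINER");
    byte[] rowKeyPrefix = prefix.getRowKeyPrefix();
    // ends with the entityType separator, so it matches all entity ids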
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index eea38a5..d10608a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
@@ -28,11 +32,37 @@ public class FlowActivityRowKey {
   private final Long dayTs;
   private final String userId;
   private final String flowName;
+  private final KeyConverter<FlowActivityRowKey> flowActivityRowKeyConverter =
+      new FlowActivityRowKeyConverter();
 
+  /**
+   * @param clusterId identifying the cluster
+   * @param dayTs to be converted to the top of the day timestamp
+   * @param userId identifying user
+   * @param flowName identifying the flow
+   */
   public FlowActivityRowKey(String clusterId, Long dayTs, String userId,
       String flowName) {
+    this(clusterId, dayTs, userId, flowName, true);
+  }
+
+  /**
+   * @param clusterId identifying the cluster
+   * @param timestamp when the flow activity happened. May be converted to the
+   *          top of the day depending on the convertDayTsToTopOfDay argument.
+   * @param userId identifying user
+   * @param flowName identifying the flow
+   * @param convertDayTsToTopOfDay if true and timestamp isn't null, then
+   *          timestamp will be converted to the top-of-the-day timestamp
+   */
+  protected FlowActivityRowKey(String clusterId, Long timestamp, String userId,
+      String flowName, boolean convertDayTsToTopOfDay) {
     this.clusterId = clusterId;
-    this.dayTs = dayTs;
+    if (convertDayTsToTopOfDay && (timestamp != null)) {
+      this.dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(timestamp);
+    } else {
+      this.dayTs = timestamp;
+    }
     this.userId = userId;
     this.flowName = flowName;
   }
@@ -54,46 +84,13 @@ public class FlowActivityRowKey {
   }
 
   /**
-   * Constructs a row key prefix for the flow activity table as follows:
-   * {@code clusterId!}.
-   *
-   * @param clusterId Cluster Id.
-   * @return byte array with the row key prefix
-   */
-  public static byte[] getRowKeyPrefix(String clusterId) {
-    return FlowActivityRowKeyConverter.getInstance().encode(
-        new FlowActivityRowKey(clusterId, null, null, null));
-  }
-
-  /**
-   * Constructs a row key prefix for the flow activity table as follows:
-   * {@code clusterId!dayTimestamp!}.
-   *
-   * @param clusterId Cluster Id.
-   * @param dayTs Start of the day timestamp.
-   * @return byte array with the row key prefix
-   */
-  public static byte[] getRowKeyPrefix(String clusterId, long dayTs) {
-    return FlowActivityRowKeyConverter.getInstance().encode(
-        new FlowActivityRowKey(clusterId, dayTs, null, null));
-  }
-
-  /**
    * Constructs a row key for the flow activity table as follows:
    * {@code clusterId!dayTimestamp!user!flowName}.
    *
-   * @param clusterId Cluster Id.
-   * @param eventTs event's TimeStamp.
-   * @param userId User Id.
-   * @param flowName Flow Name.
    * @return byte array for the row key
    */
-  public static byte[] getRowKey(String clusterId, long eventTs, String userId,
-      String flowName) {
-    // convert it to Day's time stamp
-    eventTs = TimelineStorageUtils.getTopOfTheDayTimestamp(eventTs);
-    return FlowActivityRowKeyConverter.getInstance().encode(
-        new FlowActivityRowKey(clusterId, eventTs, userId, flowName));
+  public byte[] getRowKey() {
+    return flowActivityRowKeyConverter.encode(this);
   }
 
   /**
@@ -103,6 +100,97 @@ public class FlowActivityRowKey {
    * @return A FlowActivityRowKey object.
    */
   public static FlowActivityRowKey parseRowKey(byte[] rowKey) {
-    return FlowActivityRowKeyConverter.getInstance().decode(rowKey);
+    return new FlowActivityRowKeyConverter().decode(rowKey);
+  }
+
+  /**
+   * Encodes and decodes row key for flow activity table. The row key is of
+   * the form: clusterId!dayTimestamp!user!flowName. dayTimestamp (top of the
+   * day timestamp) is a long and rest are strings.
+   */
+  final private static class FlowActivityRowKeyConverter implements
+      KeyConverter<FlowActivityRowKey> {
+
+    private FlowActivityRowKeyConverter() {
+    }
+
+    /**
+     * The flow activity row key is of the form
+     * clusterId!dayTimestamp!user!flowName with each segment separated by !.
+     * The sizes below indicate sizes of each one of these segments in
+     * sequence. clusterId, user and flowName are strings. Top of the day
+     * timestamp is a long hence 8 bytes in size. Strings are variable in size
+     * (i.e. they end whenever separator is encountered). This is used while
+     * decoding and helps in determining where to split.
+     */
+    private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
+        Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE };
+
+    /*
+     * (non-Javadoc)
+     *
+     * Encodes FlowActivityRowKey object into a byte array with each
+     * component/field in FlowActivityRowKey separated by Separator#QUALIFIERS.
+     * This leads to a flow activity table row key of the form
+     * clusterId!dayTimestamp!user!flowName. If dayTimestamp in the passed
+     * FlowActivityRowKey object is null and clusterId is not null, then this
+     * returns a row key prefix as clusterId! and if userId in
+     * FlowActivityRowKey is null (and the fields preceding it i.e. clusterId
+     * and dayTimestamp are not null), this returns a row key prefix as
+     * clusterId!dayTimeStamp!. dayTimestamp is inverted while encoding as it
+     * helps maintain a descending order for row keys in the flow activity
+     * table.
+     *
+     * @see org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#encode(java.lang.Object)
+     */
+    @Override
+    public byte[] encode(FlowActivityRowKey rowKey) {
+      if (rowKey.getDayTimestamp() == null) {
+        return Separator.QUALIFIERS.join(Separator.encode(
+            rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
+            Separator.QUALIFIERS), Separator.EMPTY_BYTES);
+      }
+      if (rowKey.getUserId() == null) {
+        return Separator.QUALIFIERS.join(Separator.encode(
+            rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
+            Separator.QUALIFIERS), Bytes.toBytes(LongConverter
+            .invertLong(rowKey.getDayTimestamp())), Separator.EMPTY_BYTES);
+      }
+      return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
+          Separator.SPACE, Separator.TAB, Separator.QUALIFIERS), Bytes
+          .toBytes(LongConverter.invertLong(rowKey.getDayTimestamp())),
+          Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
+              Separator.QUALIFIERS), Separator.encode(rowKey.getFlowName(),
+              Separator.SPACE, Separator.TAB, Separator.QUALIFIERS));
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see
+     * org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#decode(byte[])
+     */
+    @Override
+    public FlowActivityRowKey decode(byte[] rowKey) {
+      byte[][] rowKeyComponents =
+          Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+      if (rowKeyComponents.length != 4) {
+        throw new IllegalArgumentException("the row key is not valid for " +
+            "a flow activity");
+      }
+      String clusterId =
+          Separator.decode(Bytes.toString(rowKeyComponents[0]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      Long dayTs = LongConverter.invertLong(Bytes.toLong(rowKeyComponents[1]));
+      String userId =
+          Separator.decode(Bytes.toString(rowKeyComponents[2]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String flowName =
+          Separator.decode(Bytes.toString(rowKeyComponents[3]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
+    }
+  }
 }
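Note that the public constructor now rounds the timestamp down to the top
of the day, which the deleted static getRowKey() used to do at the call
site. The body of getTopOfTheDayTimestamp() is not part of this mail; with
MILLIS_ONE_DAY = 86400000L the rounding amounts to:

    long eventTs = 1465572123456L;             // some event timestamp
    long dayTs = eventTs - (eventTs % TimelineStorageUtils.MILLIS_ONE_DAY);
    // dayTs is midnight UTC of the same day; combined with
    // LongConverter.invertLong(dayTs), the most recent day sorts first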
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java deleted file mode 100644 index 9dc4c98..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage.flow; - -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; - -/** - * Encodes and decodes row key for flow activity table. - * The row key is of the form : clusterId!dayTimestamp!user!flowName. - * dayTimestamp(top of the day timestamp) is a long and rest are strings. - */ -public final class FlowActivityRowKeyConverter implements - KeyConverter { - private static final FlowActivityRowKeyConverter INSTANCE = - new FlowActivityRowKeyConverter(); - - public static FlowActivityRowKeyConverter getInstance() { - return INSTANCE; - } - - private FlowActivityRowKeyConverter() { - } - - // Flow activity row key is of the form clusterId!dayTimestamp!user!flowName - // with each segment separated by !. The sizes below indicate sizes of each - // one of these segements in sequence. clusterId, user and flowName are - // strings. Top of the day timestamp is a long hence 8 bytes in size. - // Strings are variable in size (i.e. end whenever separator is encountered). - // This is used while decoding and helps in determining where to split. - private static final int[] SEGMENT_SIZES = { - Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE }; - - /* - * (non-Javadoc) - * - * Encodes FlowActivityRowKey object into a byte array with each - * component/field in FlowActivityRowKey separated by Separator#QUALIFIERS. 
- * This leads to an flow activity table row key of the form - * clusterId!dayTimestamp!user!flowName - * If dayTimestamp in passed FlowActivityRowKey object is null and clusterId - * is not null, this returns a row key prefix as clusterId! and if userId in - * FlowActivityRowKey is null (and the fields preceding it i.e. clusterId and - * dayTimestamp are not null), this returns a row key prefix as - * clusterId!dayTimeStamp! - * dayTimestamp is inverted while encoding as it helps maintain a descending - * order for row keys in flow activity table. - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter - * #encode(java.lang.Object) - */ - - @Override - public byte[] encode(FlowActivityRowKey rowKey) { - if (rowKey.getDayTimestamp() == null) { - return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(), - Separator.SPACE, Separator.TAB, Separator.QUALIFIERS), - Separator.EMPTY_BYTES); - } - if (rowKey.getUserId() == null) { - return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(), - Separator.SPACE, Separator.TAB, Separator.QUALIFIERS), - Bytes.toBytes(TimelineStorageUtils.invertLong( - rowKey.getDayTimestamp())), Separator.EMPTY_BYTES); - } - return Separator.QUALIFIERS.join( - Separator.encode(rowKey.getClusterId(), Separator.SPACE, Separator.TAB, - Separator.QUALIFIERS), - Bytes.toBytes( - TimelineStorageUtils.invertLong(rowKey.getDayTimestamp())), - Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB, - Separator.QUALIFIERS), - Separator.encode(rowKey.getFlowName(), Separator.SPACE, Separator.TAB, - Separator.QUALIFIERS)); - } - - @Override - public FlowActivityRowKey decode(byte[] rowKey) { - byte[][] rowKeyComponents = - Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES); - if (rowKeyComponents.length != 4) { - throw new IllegalArgumentException("the row key is not valid for " - + "a flow activity"); - } - String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]), - Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); - Long dayTs = - TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[1])); - String userId = Separator.decode(Bytes.toString(rowKeyComponents[2]), - Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); - String flowName = Separator.decode(Bytes.toString(rowKeyComponents[3]), - Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); - return new FlowActivityRowKey(clusterId, dayTs, userId, flowName); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyPrefix.java new file mode 100644 index 0000000..eb88e54 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyPrefix.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+
+/**
+ * Represents a partial rowkey for the flow activity table.
+ */
+public class FlowActivityRowKeyPrefix extends FlowActivityRowKey implements
+    RowKeyPrefix<FlowActivityRowKey> {
+
+  /**
+   * Constructs a row key prefix for the flow activity table as follows:
+   * {@code clusterId!dayTimestamp!}.
+   *
+   * @param clusterId Cluster Id.
+   * @param dayTs Start of the day timestamp.
+   */
+  public FlowActivityRowKeyPrefix(String clusterId, Long dayTs) {
+    super(clusterId, dayTs, null, null, false);
+  }
+
+  /**
+   * Constructs a row key prefix for the flow activity table as follows:
+   * {@code clusterId!}.
+   *
+   * @param clusterId identifying the cluster
+   */
+  public FlowActivityRowKeyPrefix(String clusterId) {
+    super(clusterId, null, null, null, false);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * RowKeyPrefix#getRowKeyPrefix()
+   */
+  public byte[] getRowKeyPrefix() {
+    return super.getRowKey();
+  }
+
+}
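A note on how these prefix classes get used: the encoded prefix bounds an HBase scan so that only one cluster's (or one cluster-and-day's) rows are visited. The sketch below shows the pattern with the plain HBase client API; the table name "timelineservice.flowactivity", the cluster id, and the demo class are illustrative assumptions rather than the timeline service's actual wiring, and it assumes FlowActivityRowKey exposes a static parseRowKey analogous to FlowRunRowKey's:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
    import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;

    public final class FlowActivityPrefixScanDemo {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // Encode the prefix clusterId! once; no singleton lookup is needed
        // now that the converters are plain objects.
        byte[] prefix =
            new FlowActivityRowKeyPrefix("yarn-cluster").getRowKeyPrefix();
        try (Connection conn = ConnectionFactory.createConnection(conf);
            Table table = conn.getTable(
                TableName.valueOf("timelineservice.flowactivity"))) {
          Scan scan = new Scan();
          scan.setRowPrefixFilter(prefix);
          try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
              // Day timestamps were inverted on write, so rows arrive newest
              // day first; decoding re-inverts them.
              FlowActivityRowKey rowKey =
                  FlowActivityRowKey.parseRowKey(result.getRow());
              System.out.println(
                  rowKey.getDayTimestamp() + " " + rowKey.getFlowName());
            }
          }
        }
      }
    }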
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
index f1553b8..2e7a9d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -25,10 +25,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 
 /**
@@ -41,14 +41,14 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
    * application start times.
    */
   MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
-      AggregationOperation.GLOBAL_MIN, LongConverter.getInstance()),
+      AggregationOperation.GLOBAL_MIN, new LongConverter()),
 
   /**
    * When the flow ended. This is the maximum of currently known application
    * end times.
    */
   MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
-      AggregationOperation.GLOBAL_MAX, LongConverter.getInstance()),
+      AggregationOperation.GLOBAL_MAX, new LongConverter()),
 
   /**
    * The version of the flow that this flow run belongs to.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index 0f14c89..e74282a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -41,7 +41,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 
   /**
    * To store flow run info values.
   */
-  METRIC(FlowRunColumnFamily.INFO, "m", null, LongConverter.getInstance());
+  METRIC(FlowRunColumnFamily.INFO, "m", null, new LongConverter());
 
   private final ColumnHelper<FlowRunTable> column;
   private final ColumnFamily<FlowRunTable> columnFamily;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
index 925242b..8fda9a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
 /**
  * Represents a rowkey for the flow run table.
  */
@@ -25,6 +30,8 @@ public class FlowRunRowKey {
   private final String userId;
   private final String flowName;
   private final Long flowRunId;
+  private final FlowRunRowKeyConverter flowRunRowKeyConverter =
+      new FlowRunRowKeyConverter();
 
   public FlowRunRowKey(String clusterId, String userId, String flowName,
       Long flowRunId) {
@@ -51,36 +58,16 @@ public class FlowRunRowKey {
   }
 
   /**
-   * Constructs a row key prefix for the flow run table as follows: {
-   * clusterId!userI!flowName!}.
-   *
-   * @param clusterId Cluster Id.
-   * @param userId User Id.
-   * @param flowName Flow Name.
-   * @return byte array with the row key prefix
-   */
-  public static byte[] getRowKeyPrefix(String clusterId, String userId,
-      String flowName) {
-    return FlowRunRowKeyConverter.getInstance().encode(new FlowRunRowKey(
-        clusterId, userId, flowName, null));
-  }
-
-  /**
    * Constructs a row key for the flow run table as follows: {
    * clusterId!userId!flowName!Inverted Flow Run Id}.
    *
-   * @param clusterId Cluster Id.
-   * @param userId User Id.
-   * @param flowName Flow Name.
-   * @param flowRunId Run Id for the flow name.
    * @return byte array with the row key
    */
-  public static byte[] getRowKey(String clusterId, String userId,
-      String flowName, Long flowRunId) {
-    return FlowRunRowKeyConverter.getInstance().encode(new FlowRunRowKey(
-        clusterId, userId, flowName, flowRunId));
+  public byte[] getRowKey() {
+    return flowRunRowKeyConverter.encode(this);
   }
+
   /**
    * Given the raw row key as bytes, returns the row key as an object.
    *
   * @param rowKey Byte representation of row key.
   * @return A FlowRunRowKey object.
*/ public static FlowRunRowKey parseRowKey(byte[] rowKey) { - return FlowRunRowKeyConverter.getInstance().decode(rowKey); + return new FlowRunRowKeyConverter().decode(rowKey); } /** @@ -106,4 +93,98 @@ public class FlowRunRowKey { flowKeyStr.append("}"); return flowKeyStr.toString(); } + + /** + * Encodes and decodes row key for flow run table. + * The row key is of the form : clusterId!userId!flowName!flowrunId. + * flowrunId is a long and rest are strings. + *

+   */
+  final private static class FlowRunRowKeyConverter implements
+      KeyConverter<FlowRunRowKey> {
+
+    private FlowRunRowKeyConverter() {
+    }
+
+    /**
+     * The flow run row key is of the form clusterId!userId!flowName!flowrunId
+     * with each segment separated by !. The sizes below indicate the size of
+     * each of these segments in sequence. clusterId, userId and flowName are
+     * strings. flowrunId is a long, hence 8 bytes in size. Strings are
+     * variable in size (i.e. they end whenever the separator is encountered).
+     * This is used while decoding and helps in determining where to split.
+     */
+    private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
+        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG };
+
+    /*
+     * (non-Javadoc)
+     *
+     * Encodes FlowRunRowKey object into a byte array with each component/field
+     * in FlowRunRowKey separated by Separator#QUALIFIERS. This leads to a flow
+     * run row key of the form clusterId!userId!flowName!flowrunId. If
+     * flowRunId in the passed FlowRunRowKey object is null (and the fields
+     * preceding it, i.e. clusterId, userId and flowName, are not null), this
+     * returns a row key prefix of the form clusterId!userId!flowName!.
+     * flowRunId is inverted while encoding as it helps maintain a descending
+     * order for row keys in the flow run table.
+     *
+     * @see
+     * org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#encode(java.lang.Object)
+     */
+    @Override
+    public byte[] encode(FlowRunRowKey rowKey) {
+      byte[] first =
+          Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
+              Separator.SPACE, Separator.TAB, Separator.QUALIFIERS), Separator
+              .encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
+                  Separator.QUALIFIERS), Separator.encode(rowKey.getFlowName(),
+                  Separator.SPACE, Separator.TAB, Separator.QUALIFIERS));
+      if (rowKey.getFlowRunId() == null) {
+        return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
+      } else {
+        // Note that flowRunId is a long, so we can't encode it together with
+        // the string fields in the same call.
+        byte[] second =
+            Bytes.toBytes(LongConverter.invertLong(rowKey.getFlowRunId()));
+        return Separator.QUALIFIERS.join(first, second);
+      }
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Decodes a flow run row key of the form
+     * clusterId!userId!flowName!flowrunId represented in byte format and
+     * converts it into a FlowRunRowKey object. flowRunId is inverted while
+     * decoding as it was inverted while encoding.
+     *
+     * @see
+     * org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#decode(byte[])
+     */
+    @Override
+    public FlowRunRowKey decode(byte[] rowKey) {
+      byte[][] rowKeyComponents =
+          Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+      if (rowKeyComponents.length != 4) {
+        throw new IllegalArgumentException("the row key is not valid for "
+            + "a flow run");
+      }
+      String clusterId =
+          Separator.decode(Bytes.toString(rowKeyComponents[0]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String userId =
+          Separator.decode(Bytes.toString(rowKeyComponents[1]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String flowName =
+          Separator.decode(Bytes.toString(rowKeyComponents[2]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      Long flowRunId =
+          LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3]));
+      return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
+    }
+  }
 }
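The call-site effect of moving the converter inside FlowRunRowKey: instead of static helpers backed by a converter singleton, callers now build a key object and ask it to encode itself. A short sketch of the new usage and the encode/decode round trip follows; the demo class name and the sample values are illustrative only:

    import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;

    public final class FlowRunRowKeyDemo {
      public static void main(String[] args) {
        // Before this patch (removed):
        //   FlowRunRowKey.getRowKey(cluster, user, flow, runId);
        // After: construct the key, then encode it.
        FlowRunRowKey rowKey =
            new FlowRunRowKey("yarn-cluster", "alice", "wordcount",
                1465570800000L);
        byte[] encoded = rowKey.getRowKey();

        // parseRowKey returns an equivalent key; flowRunId is inverted on
        // encode and re-inverted on decode, so it round-trips unchanged.
        FlowRunRowKey decoded = FlowRunRowKey.parseRowKey(encoded);
        System.out.println(decoded.getFlowRunId()); // 1465570800000
      }
    }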
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java
deleted file mode 100644
index 642f065..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
-/**
- * Encodes and decodes row key for flow run table.
- * The row key is of the form : clusterId!userId!flowName!flowrunId.
- * flowrunId is a long and rest are strings.
- */
-public final class FlowRunRowKeyConverter implements
-    KeyConverter<FlowRunRowKey> {
-  private static final FlowRunRowKeyConverter INSTANCE =
-      new FlowRunRowKeyConverter();
-
-  public static FlowRunRowKeyConverter getInstance() {
-    return INSTANCE;
-  }
-
-  private FlowRunRowKeyConverter() {
-  }
-
-  // Flow run row key is of the form
-  // clusterId!userId!flowName!flowrunId with each segment separated by !.
-  // The sizes below indicate sizes of each one of these segments in sequence.
-  // clusterId, userId and flowName are strings. flowrunId is a long hence 8
-  // bytes in size. Strings are variable in size (i.e. end whenever separator
-  // is encountered). This is used while decoding and helps in determining
-  // where to split.
-  private static final int[] SEGMENT_SIZES = {
-      Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-      Bytes.SIZEOF_LONG };
-
-  /*
-   * (non-Javadoc)
-   *
-   * Encodes FlowRunRowKey object into a byte array with each component/field
-   * in FlowRunRowKey separated by Separator#QUALIFIERS. This leads to an
-   * flow run row key of the form clusterId!userId!flowName!flowrunId
-   * If flowRunId in passed FlowRunRowKey object is null (and the fields
-   * preceding it i.e. clusterId, userId and flowName are not null), this
-   * returns a row key prefix of the form clusterId!userName!flowName!
-   * flowRunId is inverted while encoding as it helps maintain a descending
-   * order for flow keys in flow run table.
-   *
-   * @see
-   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
-   * #encode(java.lang.Object)
-   */
-  @Override
-  public byte[] encode(FlowRunRowKey rowKey) {
-    byte[] first = Separator.QUALIFIERS.join(
-        Separator.encode(rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
-            Separator.QUALIFIERS),
-        Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
-            Separator.QUALIFIERS),
-        Separator.encode(rowKey.getFlowName(), Separator.SPACE, Separator.TAB,
-            Separator.QUALIFIERS));
-    if (rowKey.getFlowRunId() == null) {
-      return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
-    } else {
-      // Note that flowRunId is a long, so we can't encode them all at the same
-      // time.
-      byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(
-          rowKey.getFlowRunId()));
-      return Separator.QUALIFIERS.join(first, second);
-    }
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * Decodes an flow run row key of the form
-   * clusterId!userId!flowName!flowrunId represented in byte format and
-   * converts it into an FlowRunRowKey object. flowRunId is inverted while
-   * decoding as it was inverted while encoding.
-   *
-   * @see
-   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
-   * #decode(byte[])
-   */
-  @Override
-  public FlowRunRowKey decode(byte[] rowKey) {
-    byte[][] rowKeyComponents =
-        Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
-    if (rowKeyComponents.length != 4) {
-      throw new IllegalArgumentException("the row key is not valid for " +
-          "a flow run");
-    }
-    String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    String userId = Separator.decode(Bytes.toString(rowKeyComponents[1]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    Long flowRunId =
-        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
-    return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
new file mode 100644
index 0000000..23ebc66
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+
+/**
+ * Represents a partial rowkey (without the flowRunId) for the flow run table.
+ */
+public class FlowRunRowKeyPrefix extends FlowRunRowKey implements
+    RowKeyPrefix<FlowRunRowKey> {
+
+  /**
+   * Constructs a row key prefix for the flow run table as follows:
+   * {@code clusterId!userId!flowName!}.
+   *
+   * @param clusterId identifying the cluster
+   * @param userId identifying the user
+   * @param flowName identifying the flow
+   */
+  public FlowRunRowKeyPrefix(String clusterId, String userId,
+      String flowName) {
+    super(clusterId, userId, flowName, null);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * RowKeyPrefix#getRowKeyPrefix()
+   */
+  public byte[] getRowKeyPrefix() {
+    // We know we're a FlowRunRowKey with a null flowRunId, so we can simply
+    // delegate.
+    return super.getRowKey();
+  }
+
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org