hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From varunsax...@apache.org
Subject [3/4] hadoop git commit: YARN-5170. Eliminate singleton converters and static method access
Date Fri, 10 Jun 2016 15:37:37 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
index 32ef1c3..d3ef897 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
@@ -30,14 +30,8 @@ import org.apache.hadoop.hbase.util.Bytes;
  */
 public final class EventColumnNameConverter
     implements KeyConverter<EventColumnName> {
-  private static final EventColumnNameConverter INSTANCE =
-      new EventColumnNameConverter();
 
-  public static EventColumnNameConverter getInstance() {
-    return INSTANCE;
-  }
-
-  private EventColumnNameConverter() {
+  public EventColumnNameConverter() {
   }
 
   // eventId=timestamp=infokey are of types String, Long String
@@ -69,7 +63,7 @@ public final class EventColumnNameConverter
       return Separator.VALUES.join(first, Separator.EMPTY_BYTES);
     }
     byte[] second = Bytes.toBytes(
-        TimelineStorageUtils.invertLong(key.getTimestamp()));
+        LongConverter.invertLong(key.getTimestamp()));
     if (key.getInfoKey() == null) {
       return Separator.VALUES.join(first, second, Separator.EMPTY_BYTES);
     }
@@ -96,7 +90,7 @@ public final class EventColumnNameConverter
     }
     String id = Separator.decode(Bytes.toString(components[0]),
         Separator.VALUES, Separator.TAB, Separator.SPACE);
-    Long ts = TimelineStorageUtils.invertLong(Bytes.toLong(components[1]));
+    Long ts = LongConverter.invertLong(Bytes.toLong(components[1]));
     String infoKey = components[2].length == 0 ? null :
         Separator.decode(Bytes.toString(components[2]),
             Separator.VALUES, Separator.TAB, Separator.SPACE);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
index 48c56f9..600601a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.common;
 
 import java.io.IOException;
+import java.io.Serializable;
 
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -26,14 +27,15 @@ import org.apache.hadoop.hbase.util.Bytes;
  * Encodes a value by interpreting it as a Long and converting it to bytes and
  * decodes a set of bytes as a Long.
  */
-public final class LongConverter implements NumericValueConverter {
-  private static final LongConverter INSTANCE = new LongConverter();
+public final class LongConverter implements NumericValueConverter,
+    Serializable {
 
-  private LongConverter() {
-  }
+  /**
+   * Added because we implement Comparator<Number>.
+   */
+  private static final long serialVersionUID = 1L;
 
-  public static LongConverter getInstance() {
-    return INSTANCE;
+  public LongConverter() {
   }
 
   @Override
@@ -76,4 +78,17 @@ public final class LongConverter implements NumericValueConverter {
     }
     return sum;
   }
+
+  /**
+   * Converts a timestamp into its inverse timestamp to be used in (row) keys
+   * where we want to have the most recent timestamp in the top of the table
+   * (scans start at the most recent timestamp first).
+   *
+   * @param key value to be inverted so that the latest version will be first in
+   *          a scan.
+   * @return inverted long
+   */
+  public static long invertLong(long key) {
+    return Long.MAX_VALUE - key;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
index 3954145..4a724d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
@@ -23,13 +23,13 @@ import java.io.IOException;
  * Encodes and decodes column names / row keys which are long.
  */
 public final class LongKeyConverter implements KeyConverter<Long> {
-  private static final LongKeyConverter INSTANCE = new LongKeyConverter();
 
-  public static LongKeyConverter getInstance() {
-    return INSTANCE;
-  }
+  /**
+   * To delegate the actual work to.
+   */
+  private final LongConverter longConverter = new LongConverter();
 
-  private LongKeyConverter() {
+  public LongKeyConverter() {
   }
 
   /*
@@ -44,7 +44,7 @@ public final class LongKeyConverter implements KeyConverter<Long> {
     try {
       // IOException will not be thrown here as we are explicitly passing
       // Long.
-      return LongConverter.getInstance().encodeValue(key);
+      return longConverter.encodeValue(key);
     } catch (IOException e) {
       return null;
     }
@@ -60,7 +60,7 @@ public final class LongKeyConverter implements KeyConverter<Long> {
   @Override
   public Long decode(byte[] bytes) {
     try {
-      return (Long) LongConverter.getInstance().decodeValue(bytes);
+      return (Long) longConverter.decodeValue(bytes);
     } catch (IOException e) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/RowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/RowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/RowKeyPrefix.java
new file mode 100644
index 0000000..6159dc7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/RowKeyPrefix.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * In queries where a single result is needed, an exact rowkey can be used
+ * through the corresponding rowkey#getRowKey() method. For queries that need to
+ * scan over a range of rowkeys, a partial rowkey (its initial part) is used.
+ * Classes implementing RowKeyPrefix indicate that they are the initial part
+ * of rowkeys, with different constructors taking fewer arguments to form a
+ * partial rowkey, a prefix.
+ *
+ * @param <R> indicating the type of rowkey that a particular implementation is
+ *          a prefix for.
+ */
+public interface RowKeyPrefix<R> {
+
+  /**
+   * Create a row key prefix, meaning a partial rowkey that can be used in range
+   * scans. Which fields are included in the prefix will depend on the
+   * constructor of the specific instance that was used. Output depends on which
+   * constructor was used.
+   * @return a prefix of the following form {@code first!second!...!last!}
+   */
+  byte[] getRowKeyPrefix();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
index b0f6d55..282848e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
@@ -24,13 +24,8 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common;
  * added later, if required in the associated ColumnPrefix implementations.
  */
 public final class StringKeyConverter implements KeyConverter<String> {
-  private static final StringKeyConverter INSTANCE = new StringKeyConverter();
 
-  public static StringKeyConverter getInstance() {
-    return INSTANCE;
-  }
-
-  private StringKeyConverter() {
+  public StringKeyConverter() {
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index d52a5d7..aa9a793 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -18,14 +18,12 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.common;
 
 import java.io.IOException;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.SortedSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,13 +35,10 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
@@ -52,7 +47,6 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
@@ -73,19 +67,6 @@ public final class TimelineStorageUtils {
   public static final long MILLIS_ONE_DAY = 86400000L;
 
   /**
-   * Converts a timestamp into it's inverse timestamp to be used in (row) keys
-   * where we want to have the most recent timestamp in the top of the table
-   * (scans start at the most recent timestamp first).
-   *
-   * @param key value to be inverted so that the latest version will be first in
-   *          a scan.
-   * @return inverted long
-   */
-  public static long invertLong(long key) {
-    return Long.MAX_VALUE - key;
-  }
-
-  /**
    * Converts an int into it's inverse int to be used in (row) keys
    * where we want to have the largest int value in the top of the table
    * (scans start at the largest int first).
@@ -164,66 +145,6 @@ public final class TimelineStorageUtils {
   }
 
   /**
-   * checks if an application has finished.
-   *
-   * @param te TimlineEntity object.
-   * @return true if application has finished else false
-   */
-  public static boolean isApplicationFinished(TimelineEntity te) {
-    SortedSet<TimelineEvent> allEvents = te.getEvents();
-    if ((allEvents != null) && (allEvents.size() > 0)) {
-      TimelineEvent event = allEvents.last();
-      if (event.getId().equals(
-          ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Check if we have a certain field amongst fields to retrieve. This method
-   * checks against {@link Field#ALL} as well because that would mean field
-   * passed needs to be matched.
-   *
-   * @param fieldsToRetrieve fields to be retrieved.
-   * @param requiredField fields to be checked in fieldsToRetrieve.
-   * @return true if has the required field, false otherwise.
-   */
-  public static boolean hasField(EnumSet<Field> fieldsToRetrieve,
-      Field requiredField) {
-    return fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(requiredField);
-  }
-
-  /**
-   * Checks if the input TimelineEntity object is an ApplicationEntity.
-   *
-   * @param te TimelineEntity object.
-   * @return true if input is an ApplicationEntity, false otherwise
-   */
-  public static boolean isApplicationEntity(TimelineEntity te) {
-    return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
-  }
-
-  /**
-   * @param te TimelineEntity object.
-   * @param eventId event with this id needs to be fetched
-   * @return TimelineEvent if TimelineEntity contains the desired event.
-   */
-  public static TimelineEvent getApplicationEvent(TimelineEntity te,
-      String eventId) {
-    if (isApplicationEntity(te)) {
-      for (TimelineEvent event : te.getEvents()) {
-        if (event.getId().equals(eventId)) {
-          return event;
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
    * Returns the first seen aggregation operation as seen in the list of input
    * tags or null otherwise.
    *
@@ -646,98 +567,6 @@ public final class TimelineStorageUtils {
     return appId;
   }
 
-  /**
-   * Helper method for reading relationship.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param entity entity to fill.
-   * @param result result from HBase.
-   * @param prefix column prefix.
-   * @param isRelatedTo if true, means relationship is to be added to
-   *     isRelatedTo, otherwise its added to relatesTo.
-   * @throws IOException if any problem is encountered while reading result.
-   */
-  public static <T> void readRelationship(
-      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
-      boolean isRelatedTo) throws IOException {
-    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
-    Map<String, Object> columns =
-        prefix.readResults(result, StringKeyConverter.getInstance());
-    for (Map.Entry<String, Object> column : columns.entrySet()) {
-      for (String id : Separator.VALUES.splitEncoded(
-          column.getValue().toString())) {
-        if (isRelatedTo) {
-          entity.addIsRelatedToEntity(column.getKey(), id);
-        } else {
-          entity.addRelatesToEntity(column.getKey(), id);
-        }
-      }
-    }
-  }
-
-  /**
-   * Helper method for reading key-value pairs for either info or config.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param entity entity to fill.
-   * @param result result from HBase.
-   * @param prefix column prefix.
-   * @param isConfig if true, means we are reading configs, otherwise info.
-   * @throws IOException if any problem is encountered while reading result.
-   */
-  public static <T> void readKeyValuePairs(
-      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
-      boolean isConfig) throws IOException {
-    // info and configuration are of type Map<String, Object or String>
-    Map<String, Object> columns =
-        prefix.readResults(result, StringKeyConverter.getInstance());
-    if (isConfig) {
-      for (Map.Entry<String, Object> column : columns.entrySet()) {
-        entity.addConfig(column.getKey(), column.getValue().toString());
-      }
-    } else {
-      entity.addInfo(columns);
-    }
-  }
-
-  /**
-   * Read events from the entity table or the application table. The column name
-   * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
-   * if there is no info associated with the event.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param entity entity to fill.
-   * @param result HBase Result.
-   * @param prefix column prefix.
-   * @throws IOException if any problem is encountered while reading result.
-   */
-  public static <T> void readEvents(TimelineEntity entity, Result result,
-      ColumnPrefix<T> prefix) throws IOException {
-    Map<String, TimelineEvent> eventsMap = new HashMap<>();
-    Map<EventColumnName, Object> eventsResult =
-        prefix.readResults(result, EventColumnNameConverter.getInstance());
-    for (Map.Entry<EventColumnName, Object>
-             eventResult : eventsResult.entrySet()) {
-      EventColumnName eventColumnName = eventResult.getKey();
-      String key = eventColumnName.getId() +
-          Long.toString(eventColumnName.getTimestamp());
-      // Retrieve previously seen event to add to it
-      TimelineEvent event = eventsMap.get(key);
-      if (event == null) {
-        // First time we're seeing this event, add it to the eventsMap
-        event = new TimelineEvent();
-        event.setId(eventColumnName.getId());
-        event.setTimestamp(eventColumnName.getTimestamp());
-        eventsMap.put(key, event);
-      }
-      if (eventColumnName.getInfoKey() != null) {
-        event.addInfo(eventColumnName.getInfoKey(), eventResult.getValue());
-      }
-    }
-    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
-    entity.addEvents(eventsSet);
-  }
-
   public static boolean isFlowRunTable(HRegionInfo hRegionInfo,
       Configuration conf) {
     String regionTableName = hRegionInfo.getTable().getNameAsString();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
index 775879a..93b4b36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
@@ -49,8 +49,7 @@ public enum EntityColumn implements Column<EntityTable> {
   /**
    * When the entity was created.
    */
-  CREATED_TIME(EntityColumnFamily.INFO, "created_time",
-      LongConverter.getInstance()),
+  CREATED_TIME(EntityColumnFamily.INFO, "created_time", new LongConverter()),
 
   /**
    * The version of the flow that this entity belongs to.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index 02a4bb3..e410549 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -67,8 +67,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
   /**
    * Metrics are stored with the metric name as the column name.
    */
-  METRIC(EntityColumnFamily.METRICS, null,
-      LongConverter.getInstance());
+  METRIC(EntityColumnFamily.METRICS, null, new LongConverter());
 
   private final ColumnHelper<EntityTable> column;
   private final ColumnFamily<EntityTable> columnFamily;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 6d08390..ff22178 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -17,6 +17,12 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
 
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
 /**
  * Represents a rowkey for the entity table.
  */
@@ -28,6 +34,8 @@ public class EntityRowKey {
   private final String appId;
   private final String entityType;
   private final String entityId;
+  private final KeyConverter<EntityRowKey> entityRowKeyConverter =
+      new EntityRowKeyConverter();
 
   public EntityRowKey(String clusterId, String userId, String flowName,
       Long flowRunId, String appId, String entityType, String entityId) {
@@ -69,61 +77,14 @@ public class EntityRowKey {
   }
 
   /**
-   * Constructs a row key prefix for the entity table as follows:
-   * {@code userName!clusterId!flowName!flowRunId!AppId}.
-   *
-   * @param clusterId Context cluster id.
-   * @param userId User name.
-   * @param flowName Flow name.
-   * @param flowRunId Run Id for the flow.
-   * @param appId Application Id.
-   * @return byte array with the row key prefix.
-   */
-  public static byte[] getRowKeyPrefix(String clusterId, String userId,
-      String flowName, Long flowRunId, String appId) {
-    return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
-        clusterId, userId, flowName, flowRunId, appId, null, null));
-  }
-
-  /**
-   * Constructs a row key prefix for the entity table as follows:
-   * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!}.
-   * Typically used while querying multiple entities of a particular entity
-   * type.
-   *
-   * @param clusterId Context cluster id.
-   * @param userId User name.
-   * @param flowName Flow name.
-   * @param flowRunId Run Id for the flow.
-   * @param appId Application Id.
-   * @param entityType Entity type.
-   * @return byte array with the row key prefix.
-   */
-  public static byte[] getRowKeyPrefix(String clusterId, String userId,
-      String flowName, Long flowRunId, String appId, String entityType) {
-    return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
-        clusterId, userId, flowName, flowRunId, appId, entityType, null));
-  }
-
-  /**
    * Constructs a row key for the entity table as follows:
    * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!entityId}.
    * Typically used while querying a specific entity.
    *
-   * @param clusterId Context cluster id.
-   * @param userId User name.
-   * @param flowName Flow name.
-   * @param flowRunId Run Id for the flow.
-   * @param appId Application Id.
-   * @param entityType Entity type.
-   * @param entityId Entity Id.
    * @return byte array with the row key.
    */
-  public static byte[] getRowKey(String clusterId, String userId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      String entityId) {
-    return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
-        clusterId, userId, flowName, flowRunId, appId, entityType, entityId));
+  public byte[] getRowKey() {
+    return entityRowKeyConverter.encode(this);
   }
 
   /**
@@ -133,6 +94,132 @@ public class EntityRowKey {
    * @return An <cite>EntityRowKey</cite> object.
    */
   public static EntityRowKey parseRowKey(byte[] rowKey) {
-    return EntityRowKeyConverter.getInstance().decode(rowKey);
+    return new EntityRowKeyConverter().decode(rowKey);
+  }
+
+  /**
+   * Encodes and decodes row key for entity table. The row key is of the form :
+   * userName!clusterId!flowName!flowRunId!appId!entityType!entityId. flowRunId
+   * is a long, appId is encoded/decoded using {@link AppIdKeyConverter} and
+   * rest are strings.
+   * <p>
+   */
+  final private static class EntityRowKeyConverter implements
+      KeyConverter<EntityRowKey> {
+
+    private final AppIdKeyConverter appIDKeyConverter = new AppIdKeyConverter();
+
+    private EntityRowKeyConverter() {
+    }
+
+    /**
+     * Entity row key is of the form
+     * userName!clusterId!flowName!flowRunId!appId!entityType!entityId w. each
+     * segment separated by !. The sizes below indicate sizes of each one of
+     * these segments in sequence. clusterId, userName, flowName, entityType and
+     * entityId are strings. flowrunId is a long hence 8 bytes in size. app id
+     * is represented as 12 bytes with cluster timestamp part of appid being 8
+     * bytes (long) and seq id being 4 bytes(int). Strings are variable in size
+     * (i.e. end whenever separator is encountered). This is used while decoding
+     * and helps in determining where to split.
+     */
+    private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
+        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+        AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
+        Separator.VARIABLE_SIZE };
+
+    /*
+     * (non-Javadoc)
+     *
+     * Encodes EntityRowKey object into a byte array with each component/field
+     * in EntityRowKey separated by Separator#QUALIFIERS. This leads to an
+     * entity table row key of the form
+     * userName!clusterId!flowName!flowRunId!appId!entityType!entityId. If
+     * entityType in passed EntityRowKey object is null (and the fields
+     * preceding it i.e. clusterId, userId and flowName, flowRunId and appId
+     * are not null), this returns a row key prefix of the form
+     * userName!clusterId!flowName!flowRunId!appId! and if entityId in
+     * EntityRowKey is null (other 6 components are not null), this returns a
+     * row key prefix of the form
+     * userName!clusterId!flowName!flowRunId!appId!entityType! flowRunId is
+     * inverted while encoding as it helps maintain a descending order for row
+     * keys in entity table.
+     *
+     * @see org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#encode(java.lang.Object)
+     */
+    @Override
+    public byte[] encode(EntityRowKey rowKey) {
+      byte[] user =
+          Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
+              Separator.QUALIFIERS);
+      byte[] cluster =
+          Separator.encode(rowKey.getClusterId(), Separator.SPACE,
+              Separator.TAB, Separator.QUALIFIERS);
+      byte[] flow =
+          Separator.encode(rowKey.getFlowName(), Separator.SPACE,
+              Separator.TAB, Separator.QUALIFIERS);
+      byte[] first = Separator.QUALIFIERS.join(user, cluster, flow);
+      // Note that flowRunId is a long, so we can't encode them all at the same
+      // time.
+      byte[] second =
+          Bytes.toBytes(LongConverter.invertLong(rowKey.getFlowRunId()));
+      byte[] third = appIDKeyConverter.encode(rowKey.getAppId());
+      if (rowKey.getEntityType() == null) {
+        return Separator.QUALIFIERS.join(first, second, third,
+            Separator.EMPTY_BYTES);
+      }
+      byte[] entityType =
+          Separator.encode(rowKey.getEntityType(), Separator.SPACE,
+              Separator.TAB, Separator.QUALIFIERS);
+      byte[] entityId =
+          rowKey.getEntityId() == null ? Separator.EMPTY_BYTES : Separator
+              .encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB,
+                  Separator.QUALIFIERS);
+      byte[] fourth = Separator.QUALIFIERS.join(entityType, entityId);
+      return Separator.QUALIFIERS.join(first, second, third, fourth);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Decodes an entity row key of the form
+     * userName!clusterId!flowName!flowRunId!appId!entityType!entityId
+     * represented in byte format and converts it into an EntityRowKey object.
+     * flowRunId is inverted while decoding as it was inverted while encoding.
+     *
+     * @see
+     * org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#decode(byte[])
+     */
+    @Override
+    public EntityRowKey decode(byte[] rowKey) {
+      byte[][] rowKeyComponents =
+          Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+      if (rowKeyComponents.length != 7) {
+        throw new IllegalArgumentException("the row key is not valid for "
+            + "an entity");
+      }
+      String userId =
+          Separator.decode(Bytes.toString(rowKeyComponents[0]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String clusterId =
+          Separator.decode(Bytes.toString(rowKeyComponents[1]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String flowName =
+          Separator.decode(Bytes.toString(rowKeyComponents[2]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      Long flowRunId =
+          LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3]));
+      String appId = appIDKeyConverter.decode(rowKeyComponents[4]);
+      String entityType =
+          Separator.decode(Bytes.toString(rowKeyComponents[5]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String entityId =
+          Separator.decode(Bytes.toString(rowKeyComponents[6]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
+          entityType, entityId);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
deleted file mode 100644
index 43c0569..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
-/**
- * Encodes and decodes row key for entity table.
- * The row key is of the form :
- * userName!clusterId!flowName!flowRunId!appId!entityType!entityId.
- * flowRunId is a long, appId is encoded/decoded using
- * {@link AppIdKeyConverter} and rest are strings.
- */
-public final class EntityRowKeyConverter implements KeyConverter<EntityRowKey> {
-  private static final EntityRowKeyConverter INSTANCE =
-      new EntityRowKeyConverter();
-
-  public static EntityRowKeyConverter getInstance() {
-    return INSTANCE;
-  }
-
-  private EntityRowKeyConverter() {
-  }
-
-  // Entity row key is of the form
-  // userName!clusterId!flowName!flowRunId!appId!entityType!entityId with each
-  // segment separated by !. The sizes below indicate sizes of each one of these
-  // segements in sequence. clusterId, userName, flowName, entityType and
-  // entityId are strings. flowrunId is a long hence 8 bytes in size. app id is
-  // represented as 12 bytes with cluster timestamp part of appid being 8 bytes
-  // (long) and seq id being 4 bytes(int).
-  // Strings are variable in size (i.e. end whenever separator is encountered).
-  // This is used while decoding and helps in determining where to split.
-  private static final int[] SEGMENT_SIZES = {
-      Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-      Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize(),
-      Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE };
-
-  /*
-   * (non-Javadoc)
-   *
-   * Encodes EntityRowKey object into a byte array with each component/field in
-   * EntityRowKey separated by Separator#QUALIFIERS. This leads to an entity
-   * table row key of the form
-   * userName!clusterId!flowName!flowRunId!appId!entityType!entityId
-   * If entityType in passed EntityRowKey object is null (and the fields
-   * preceding it i.e. clusterId, userId and flowName, flowRunId and appId are
-   * not null), this returns a row key prefix of the form
-   * userName!clusterId!flowName!flowRunId!appId! and if entityId in
-   * EntityRowKey is null (other 6 components are not null), this returns a row
-   * key prefix of the form
-   * userName!clusterId!flowName!flowRunId!appId!entityType!
-   * flowRunId is inverted while encoding as it helps maintain a descending
-   * order for row keys in entity table.
-   *
-   * @see
-   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
-   * #encode(java.lang.Object)
-   */
-  @Override
-  public byte[] encode(EntityRowKey rowKey) {
-    byte[] user = Separator.encode(rowKey.getUserId(),
-        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
-    byte[] cluster = Separator.encode(rowKey.getClusterId(),
-        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
-    byte[] flow = Separator.encode(rowKey.getFlowName(),
-        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
-    byte[] first = Separator.QUALIFIERS.join(user, cluster, flow);
-    // Note that flowRunId is a long, so we can't encode them all at the same
-    // time.
-    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(
-        rowKey.getFlowRunId()));
-    byte[] third = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
-    if (rowKey.getEntityType() == null) {
-      return Separator.QUALIFIERS.join(
-          first, second, third, Separator.EMPTY_BYTES);
-    }
-    byte[] entityType = Separator.encode(rowKey.getEntityType(),
-        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
-    byte[] entityId = rowKey.getEntityId() == null ? Separator.EMPTY_BYTES :
-        Separator.encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB,
-        Separator.QUALIFIERS);
-    byte[] fourth = Separator.QUALIFIERS.join(entityType, entityId);
-    return Separator.QUALIFIERS.join(first, second, third, fourth);
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * Decodes an application row key of the form
-   * userName!clusterId!flowName!flowRunId!appId!entityType!entityId represented
-   * in byte format and converts it into an EntityRowKey object. flowRunId is
-   * inverted while decoding as it was inverted while encoding.
-   *
-   * @see
-   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
-   * #decode(byte[])
-   */
-  @Override
-  public EntityRowKey decode(byte[] rowKey) {
-    byte[][] rowKeyComponents =
-        Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
-    if (rowKeyComponents.length != 7) {
-      throw new IllegalArgumentException("the row key is not valid for " +
-          "an entity");
-    }
-    String userId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[1]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    Long flowRunId =
-        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
-    String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[4]);
-    String entityType = Separator.decode(Bytes.toString(rowKeyComponents[5]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    String entityId =Separator.decode(Bytes.toString(rowKeyComponents[6]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
-        entityType, entityId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java
new file mode 100644
index 0000000..9146180
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+
+/**
+ * Represents a partial rowkey without the entityId or without entityType and
+ * entityId for the entity table.
+ *
+ */
+public class EntityRowKeyPrefix extends EntityRowKey implements
+    RowKeyPrefix<EntityRowKey> {
+
+  /**
+   * Creates a prefix which generates the following rowKeyPrefixes for the
+   * entity table:
+   * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!}.
+   *
+   * @param clusterId identifying the cluster
+   * @param userId identifying the user
+   * @param flowName identifying the flow
+   * @param flowRunId identifying the individual run of this flow
+   * @param appId identifying the application
+   * @param entityType which entity type
+   */
+  public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
+      Long flowRunId, String appId, String entityType) {
+    super(clusterId, userId, flowName, flowRunId, appId, entityType, null);
+  }
+
+  /**
+   * Creates a prefix which generates the following rowKeyPrefixes for the
+   * entity table:
+   * {@code userName!clusterId!flowName!flowRunId!AppId!}.
+   *
+   * @param clusterId identifying the cluster
+   * @param userId identifying the user
+   * @param flowName identifying the flow
+   * @param flowRunId identifying the individual run of this flow
+   * @param appId identifying the application
+   */
+  public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
+      Long flowRunId, String appId) {
+    super(clusterId, userId, flowName, flowRunId, appId, null, null);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * RowKeyPrefix#getRowKeyPrefix()
+   */
+  public byte[] getRowKeyPrefix() {
+    return super.getRowKey();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index eea38a5..d10608a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
@@ -28,11 +32,37 @@ public class FlowActivityRowKey {
   private final Long dayTs;
   private final String userId;
   private final String flowName;
+  private final KeyConverter<FlowActivityRowKey> flowActivityRowKeyConverter =
+      new FlowActivityRowKeyConverter();
 
+  /**
+   * @param clusterId identifying the cluster
+   * @param dayTs to be converted to the top of the day timestamp
+   * @param userId identifying user
+   * @param flowName identifying the flow
+   */
   public FlowActivityRowKey(String clusterId, Long dayTs, String userId,
       String flowName) {
+    this(clusterId, dayTs, userId, flowName, true);
+  }
+
+  /**
+   * @param clusterId identifying the cluster
+   * @param timestamp when the flow activity happened. May be converted to the
+   *          top of the day depending on the convertDayTsToTopOfDay argument.
+   * @param userId identifying user
+   * @param flowName identifying the flow
+   * @param convertDayTsToTopOfDay if true and timestamp isn't null, then
+   *          timestamp will be converted to the top-of-the day timestamp
+   */
+  protected FlowActivityRowKey(String clusterId, Long timestamp, String userId,
+      String flowName, boolean convertDayTsToTopOfDay) {
     this.clusterId = clusterId;
-    this.dayTs = dayTs;
+    if (convertDayTsToTopOfDay && (timestamp != null)) {
+      this.dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(timestamp);
+    } else {
+      this.dayTs = timestamp;
+    }
     this.userId = userId;
     this.flowName = flowName;
   }
@@ -54,46 +84,13 @@ public class FlowActivityRowKey {
   }
 
   /**
-   * Constructs a row key prefix for the flow activity table as follows:
-   * {@code clusterId!}.
-   *
-   * @param clusterId Cluster Id.
-   * @return byte array with the row key prefix
-   */
-  public static byte[] getRowKeyPrefix(String clusterId) {
-    return FlowActivityRowKeyConverter.getInstance().encode(
-        new FlowActivityRowKey(clusterId, null, null, null));
-  }
-
-  /**
-   * Constructs a row key prefix for the flow activity table as follows:
-   * {@code clusterId!dayTimestamp!}.
-   *
-   * @param clusterId Cluster Id.
-   * @param dayTs Start of the day timestamp.
-   * @return byte array with the row key prefix
-   */
-  public static byte[] getRowKeyPrefix(String clusterId, long dayTs) {
-    return FlowActivityRowKeyConverter.getInstance().encode(
-        new FlowActivityRowKey(clusterId, dayTs, null, null));
-  }
-
-  /**
    * Constructs a row key for the flow activity table as follows:
    * {@code clusterId!dayTimestamp!user!flowName}.
    *
-   * @param clusterId Cluster Id.
-   * @param eventTs event's TimeStamp.
-   * @param userId User Id.
-   * @param flowName Flow Name.
    * @return byte array for the row key
    */
-  public static byte[] getRowKey(String clusterId, long eventTs, String userId,
-      String flowName) {
-    // convert it to Day's time stamp
-    eventTs = TimelineStorageUtils.getTopOfTheDayTimestamp(eventTs);
-    return FlowActivityRowKeyConverter.getInstance().encode(
-        new FlowActivityRowKey(clusterId, eventTs, userId, flowName));
+  public byte[] getRowKey() {
+    return flowActivityRowKeyConverter.encode(this);
   }
 
   /**
@@ -103,6 +100,97 @@ public class FlowActivityRowKey {
    * @return A <cite>FlowActivityRowKey</cite> object.
    */
   public static FlowActivityRowKey parseRowKey(byte[] rowKey) {
-    return FlowActivityRowKeyConverter.getInstance().decode(rowKey);
+    return new FlowActivityRowKeyConverter().decode(rowKey);
+  }
+
+  /**
+   * Encodes and decodes row key for flow activity table. The row key is of the
+   * form : clusterId!dayTimestamp!user!flowName. dayTimestamp(top of the day
+   * timestamp) is a long and rest are strings.
+   * <p>
+   */
+  final private static class FlowActivityRowKeyConverter implements
+      KeyConverter<FlowActivityRowKey> {
+
+    private FlowActivityRowKeyConverter() {
+    }
+
+    /**
+     * The flow activity row key is of the form
+     * clusterId!dayTimestamp!user!flowName with each segment separated by !.
+     * The sizes below indicate sizes of each one of these segments in
+     * sequence. clusterId, user and flowName are strings. Top of the day
+     * timestamp is a long hence 8 bytes in size. Strings are variable in size
+     * (i.e. they end whenever separator is encountered). This is used while
+     * decoding and helps in determining where to split.
+     */
+    private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
+        Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE };
+
+    /*
+     * (non-Javadoc)
+     *
+     * Encodes FlowActivityRowKey object into a byte array with each
+     * component/field in FlowActivityRowKey separated by Separator#QUALIFIERS.
+     * This leads to a flow activity table row key of the form
+     * clusterId!dayTimestamp!user!flowName. If dayTimestamp in passed
+     * FlowActivityRowKey object is null and clusterId is not null, then this
+     * returns a row key prefix as clusterId! and if userId in
+     * FlowActivityRowKey is null (and the fields preceding it i.e. clusterId
+     * and dayTimestamp are not null), this returns a row key prefix as
+     * clusterId!dayTimeStamp! dayTimestamp is inverted while encoding as it
+     * helps maintain a descending order for row keys in flow activity table.
+     *
+     * @see org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#encode(java.lang.Object)
+     */
+    @Override
+    public byte[] encode(FlowActivityRowKey rowKey) {
+      if (rowKey.getDayTimestamp() == null) {
+        return Separator.QUALIFIERS.join(Separator.encode(
+            rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
+            Separator.QUALIFIERS), Separator.EMPTY_BYTES);
+      }
+      if (rowKey.getUserId() == null) {
+        return Separator.QUALIFIERS.join(Separator.encode(
+            rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
+            Separator.QUALIFIERS), Bytes.toBytes(LongConverter
+            .invertLong(rowKey.getDayTimestamp())), Separator.EMPTY_BYTES);
+      }
+      return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
+          Separator.SPACE, Separator.TAB, Separator.QUALIFIERS), Bytes
+          .toBytes(LongConverter.invertLong(rowKey.getDayTimestamp())),
+          Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
+              Separator.QUALIFIERS), Separator.encode(rowKey.getFlowName(),
+              Separator.SPACE, Separator.TAB, Separator.QUALIFIERS));
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see
+     * org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#decode(byte[])
+     */
+    @Override
+    public FlowActivityRowKey decode(byte[] rowKey) {
+      byte[][] rowKeyComponents =
+          Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+      if (rowKeyComponents.length != 4) {
+        throw new IllegalArgumentException("the row key is not valid for "
+            + "a flow activity");
+      }
+      String clusterId =
+          Separator.decode(Bytes.toString(rowKeyComponents[0]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      Long dayTs = LongConverter.invertLong(Bytes.toLong(rowKeyComponents[1]));
+      String userId =
+          Separator.decode(Bytes.toString(rowKeyComponents[2]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String flowName =
+          Separator.decode(Bytes.toString(rowKeyComponents[3]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java
deleted file mode 100644
index 9dc4c98..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
-/**
- * Encodes and decodes row key for flow activity table.
- * The row key is of the form : clusterId!dayTimestamp!user!flowName.
- * dayTimestamp(top of the day timestamp) is a long and rest are strings.
- */
-public final class FlowActivityRowKeyConverter implements
-    KeyConverter<FlowActivityRowKey> {
-  private static final FlowActivityRowKeyConverter INSTANCE =
-      new FlowActivityRowKeyConverter();
-
-  public static FlowActivityRowKeyConverter getInstance() {
-    return INSTANCE;
-  }
-
-  private FlowActivityRowKeyConverter() {
-  }
-
-  // Flow activity row key is of the form clusterId!dayTimestamp!user!flowName
-  // with each segment separated by !. The sizes below indicate sizes of each
-  // one of these segements in sequence. clusterId, user and flowName are
-  // strings. Top of the day timestamp is a long hence 8 bytes in size.
-  // Strings are variable in size (i.e. end whenever separator is encountered).
-  // This is used while decoding and helps in determining where to split.
-  private static final int[] SEGMENT_SIZES = {
-      Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE,
-      Separator.VARIABLE_SIZE };
-
-  /*
-   * (non-Javadoc)
-   *
-   * Encodes FlowActivityRowKey object into a byte array with each
-   * component/field in FlowActivityRowKey separated by Separator#QUALIFIERS.
-   * This leads to an flow activity table row key of the form
-   * clusterId!dayTimestamp!user!flowName
-   * If dayTimestamp in passed FlowActivityRowKey object is null and clusterId
-   * is not null, this returns a row key prefix as clusterId! and if userId in
-   * FlowActivityRowKey is null (and the fields preceding it i.e. clusterId and
-   * dayTimestamp are not null), this returns a row key prefix as
-   * clusterId!dayTimeStamp!
-   * dayTimestamp is inverted while encoding as it helps maintain a descending
-   * order for row keys in flow activity table.
-   *
-   * @see
-   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
-   * #encode(java.lang.Object)
-   */
-
-  @Override
-  public byte[] encode(FlowActivityRowKey rowKey) {
-    if (rowKey.getDayTimestamp() == null) {
-      return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
-          Separator.SPACE, Separator.TAB, Separator.QUALIFIERS),
-              Separator.EMPTY_BYTES);
-    }
-    if (rowKey.getUserId() == null) {
-      return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
-          Separator.SPACE, Separator.TAB, Separator.QUALIFIERS),
-          Bytes.toBytes(TimelineStorageUtils.invertLong(
-              rowKey.getDayTimestamp())), Separator.EMPTY_BYTES);
-    }
-    return Separator.QUALIFIERS.join(
-        Separator.encode(rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
-            Separator.QUALIFIERS),
-        Bytes.toBytes(
-            TimelineStorageUtils.invertLong(rowKey.getDayTimestamp())),
-        Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
-            Separator.QUALIFIERS),
-        Separator.encode(rowKey.getFlowName(), Separator.SPACE, Separator.TAB,
-            Separator.QUALIFIERS));
-  }
-
-  @Override
-  public FlowActivityRowKey decode(byte[] rowKey) {
-    byte[][] rowKeyComponents =
-        Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
-    if (rowKeyComponents.length != 4) {
-      throw new IllegalArgumentException("the row key is not valid for "
-          + "a flow activity");
-    }
-    String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    Long dayTs =
-        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[1]));
-    String userId = Separator.decode(Bytes.toString(rowKeyComponents[2]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    String flowName = Separator.decode(Bytes.toString(rowKeyComponents[3]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyPrefix.java
new file mode 100644
index 0000000..eb88e54
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyPrefix.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+
+/**
+ * A prefix partial rowkey for flow activities.
+ */
+public class FlowActivityRowKeyPrefix extends FlowActivityRowKey implements
+    RowKeyPrefix<FlowActivityRowKey> {
+
+  /**
+   * Constructs a row key prefix for the flow activity table as follows:
+   * {@code clusterId!dayTimestamp!}.
+   *
+   * @param clusterId Cluster Id.
+   * @param dayTs Start of the day timestamp.
+   */
+  public FlowActivityRowKeyPrefix(String clusterId, Long dayTs) {
+    super(clusterId, dayTs, null, null, false);
+  }
+
+  /**
+   * Constructs a row key prefix for the flow activity table as follows:
+   * {@code clusterId!}.
+   *
+   * @param clusterId identifying the cluster
+   */
+  public FlowActivityRowKeyPrefix(String clusterId) {
+    super(clusterId, null, null, null, false);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * RowKeyPrefix#getRowKeyPrefix()
+   */
+  public byte[] getRowKeyPrefix() {
+    return super.getRowKey();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
index f1553b8..2e7a9d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -25,10 +25,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 
 /**
@@ -41,14 +41,14 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
    * application start times.
    */
   MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
-      AggregationOperation.GLOBAL_MIN, LongConverter.getInstance()),
+      AggregationOperation.GLOBAL_MIN, new LongConverter()),
 
   /**
    * When the flow ended. This is the maximum of currently known application end
    * times.
    */
   MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
-      AggregationOperation.GLOBAL_MAX, LongConverter.getInstance()),
+      AggregationOperation.GLOBAL_MAX, new LongConverter()),
 
   /**
    * The version of the flow that this flow belongs to.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index 0f14c89..e74282a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -41,7 +41,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
   /**
    * To store flow run info values.
    */
-  METRIC(FlowRunColumnFamily.INFO, "m", null, LongConverter.getInstance());
+  METRIC(FlowRunColumnFamily.INFO, "m", null, new LongConverter());
 
   private final ColumnHelper<FlowRunTable> column;
   private final ColumnFamily<FlowRunTable> columnFamily;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
index 925242b..8fda9a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
 /**
  * Represents a rowkey for the flow run table.
  */
@@ -25,6 +30,8 @@ public class FlowRunRowKey {
   private final String userId;
   private final String flowName;
   private final Long flowRunId;
+  private final FlowRunRowKeyConverter flowRunRowKeyConverter =
+      new FlowRunRowKeyConverter();
 
   public FlowRunRowKey(String clusterId, String userId, String flowName,
       Long flowRunId) {
@@ -51,36 +58,16 @@ public class FlowRunRowKey {
   }
 
   /**
-   * Constructs a row key prefix for the flow run table as follows: {
-   * clusterId!userI!flowName!}.
-   *
-   * @param clusterId Cluster Id.
-   * @param userId User Id.
-   * @param flowName Flow Name.
-   * @return byte array with the row key prefix
-   */
-  public static byte[] getRowKeyPrefix(String clusterId, String userId,
-      String flowName) {
-    return FlowRunRowKeyConverter.getInstance().encode(new FlowRunRowKey(
-        clusterId, userId, flowName, null));
-  }
-
-  /**
    * Constructs a row key for the entity table as follows: {
    * clusterId!userId!flowName!Inverted Flow Run Id}.
    *
-   * @param clusterId Cluster Id.
-   * @param userId User Id.
-   * @param flowName Flow Name.
-   * @param flowRunId Run Id for the flow name.
    * @return byte array with the row key
    */
-  public static byte[] getRowKey(String clusterId, String userId,
-      String flowName, Long flowRunId) {
-    return FlowRunRowKeyConverter.getInstance().encode(new FlowRunRowKey(
-        clusterId, userId, flowName, flowRunId));
+  public byte[] getRowKey() {
+    return flowRunRowKeyConverter.encode(this);
   }
 
+
   /**
    * Given the raw row key as bytes, returns the row key as an object.
    *
@@ -88,7 +75,7 @@ public class FlowRunRowKey {
    * @return A <cite>FlowRunRowKey</cite> object.
    */
   public static FlowRunRowKey parseRowKey(byte[] rowKey) {
-    return FlowRunRowKeyConverter.getInstance().decode(rowKey);
+    return new FlowRunRowKeyConverter().decode(rowKey);
   }
 
   /**
@@ -106,4 +93,98 @@ public class FlowRunRowKey {
     flowKeyStr.append("}");
     return flowKeyStr.toString();
   }
+
+  /**
+   * Encodes and decodes row key for flow run table.
+   * The row key is of the form : clusterId!userId!flowName!flowrunId.
+   * flowrunId is a long and the rest are strings. The flowrunId is inverted
+   * when encoded so that flow run keys sort in descending order.
+   */
+  final private static class FlowRunRowKeyConverter implements
+      KeyConverter<FlowRunRowKey> {
+
+    private FlowRunRowKeyConverter() {
+    }
+
+    /**
+     * The flow run row key is of the form clusterId!userId!flowName!flowrunId
+     * with each segment separated by !. The sizes below indicate sizes of each
+     * one of these segments in sequence. clusterId, userId and flowName are
+     * strings. flowrunId is a long hence 8 bytes in size. Strings are variable
+     * in size (i.e. end whenever separator is encountered). This is used while
+     * decoding and helps in determining where to split.
+     */
+    private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
+        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG };
+
+    /*
+     * (non-Javadoc)
+     *
+     * Encodes FlowRunRowKey object into a byte array with each component/field
+     * in FlowRunRowKey separated by Separator#QUALIFIERS. This leads to a flow
+     * run row key of the form clusterId!userId!flowName!flowrunId. If flowRunId
+     * in passed FlowRunRowKey object is null (and the fields preceding it i.e.
+     * clusterId, userId and flowName are not null), this returns a row key
+     * prefix of the form clusterId!userName!flowName! flowRunId is inverted
+     * while encoding as it helps maintain a descending order for flow keys in
+     * flow run table.
+     *
+     * @see
+     * org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#encode(java.lang.Object)
+     */
+    @Override
+    public byte[] encode(FlowRunRowKey rowKey) {
+      byte[] first =
+          Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
+              Separator.SPACE, Separator.TAB, Separator.QUALIFIERS), Separator
+              .encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
+                  Separator.QUALIFIERS), Separator.encode(rowKey.getFlowName(),
+              Separator.SPACE, Separator.TAB, Separator.QUALIFIERS));
+      if (rowKey.getFlowRunId() == null) {
+        return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
+      } else {
+        // Note that flowRunId is a long, so we can't encode them all at the
+        // same
+        // time.
+        byte[] second =
+            Bytes.toBytes(LongConverter.invertLong(rowKey.getFlowRunId()));
+        return Separator.QUALIFIERS.join(first, second);
+      }
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Decodes a flow run row key of the form
+     * clusterId!userId!flowName!flowrunId represented in byte format and
+     * converts it into a FlowRunRowKey object. flowRunId is inverted while
+     * decoding as it was inverted while encoding.
+     *
+     * @see
+     * org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#decode(byte[])
+     */
+    @Override
+    public FlowRunRowKey decode(byte[] rowKey) {
+      byte[][] rowKeyComponents =
+          Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+      if (rowKeyComponents.length != 4) {
+        throw new IllegalArgumentException("the row key is not valid for "
+            + "a flow run");
+      }
+      String clusterId =
+          Separator.decode(Bytes.toString(rowKeyComponents[0]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String userId =
+          Separator.decode(Bytes.toString(rowKeyComponents[1]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String flowName =
+          Separator.decode(Bytes.toString(rowKeyComponents[2]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      Long flowRunId =
+          LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3]));
+      return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java
deleted file mode 100644
index 642f065..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
-/**
- * Encodes and decodes row key for flow run table.
- * The row key is of the form : clusterId!userId!flowName!flowrunId.
- * flowrunId is a long and rest are strings.
- */
-public final class FlowRunRowKeyConverter implements
-    KeyConverter<FlowRunRowKey> {
-  private static final FlowRunRowKeyConverter INSTANCE =
-      new FlowRunRowKeyConverter();
-
-  public static FlowRunRowKeyConverter getInstance() {
-    return INSTANCE;
-  }
-
-  private FlowRunRowKeyConverter() {
-  }
-
-  // Flow run row key is of the form
-  // clusterId!userId!flowName!flowrunId with each segment separated by !.
-  // The sizes below indicate sizes of each one of these segments in sequence.
-  // clusterId, userId and flowName are strings. flowrunId is a long hence 8
-  // bytes in size. Strings are variable in size (i.e. end whenever separator is
-  // encountered). This is used while decoding and helps in determining where to
-  // split.
-  private static final int[] SEGMENT_SIZES = {
-      Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-      Bytes.SIZEOF_LONG };
-
-  /*
-   * (non-Javadoc)
-   *
-   * Encodes FlowRunRowKey object into a byte array with each component/field in
-   * FlowRunRowKey separated by Separator#QUALIFIERS. This leads to an
-   * flow run row key of the form clusterId!userId!flowName!flowrunId
-   * If flowRunId in passed FlowRunRowKey object is null (and the fields
-   * preceding it i.e. clusterId, userId and flowName are not null), this
-   * returns a row key prefix of the form clusterId!userName!flowName!
-   * flowRunId is inverted while encoding as it helps maintain a descending
-   * order for flow keys in flow run table.
-   *
-   * @see
-   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
-   * #encode(java.lang.Object)
-   */
-  @Override
-  public byte[] encode(FlowRunRowKey rowKey) {
-    byte[] first = Separator.QUALIFIERS.join(
-        Separator.encode(rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
-            Separator.QUALIFIERS),
-        Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
-            Separator.QUALIFIERS),
-        Separator.encode(rowKey.getFlowName(), Separator.SPACE, Separator.TAB,
-            Separator.QUALIFIERS));
-    if (rowKey.getFlowRunId() == null) {
-      return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
-    } else {
-      // Note that flowRunId is a long, so we can't encode them all at the same
-      // time.
-      byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(
-          rowKey.getFlowRunId()));
-      return Separator.QUALIFIERS.join(first, second);
-    }
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * Decodes an flow run row key of the form
-   * clusterId!userId!flowName!flowrunId represented in byte format and converts
-   * it into an FlowRunRowKey object. flowRunId is inverted while decoding as
-   * it was inverted while encoding.
-   *
-   * @see
-   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
-   * #decode(byte[])
-   */
-  @Override
-  public FlowRunRowKey decode(byte[] rowKey) {
-    byte[][] rowKeyComponents =
-        Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
-    if (rowKeyComponents.length != 4) {
-      throw new IllegalArgumentException("the row key is not valid for " +
-          "a flow run");
-    }
-    String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    String userId = Separator.decode(Bytes.toString(rowKeyComponents[1]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    Long flowRunId =
-        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
-    return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
new file mode 100644
index 0000000..23ebc66
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+
+/**
+ * Represents a partial rowkey (without the flowRunId) for the flow run table.
+ */
+public class FlowRunRowKeyPrefix extends FlowRunRowKey implements
+    RowKeyPrefix<FlowRunRowKey> {
+
+  /**
+   * Constructs a row key prefix for the flow run table as follows:
+   * {@code clusterId!userId!flowName!}.
+   *
+   * @param clusterId identifying the cluster
+   * @param userId identifying the user
+   * @param flowName identifying the flow
+   */
+  public FlowRunRowKeyPrefix(String clusterId, String userId,
+      String flowName) {
+    super(clusterId, userId, flowName, null);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * RowKeyPrefix#getRowKeyPrefix()
+   */
+  public byte[] getRowKeyPrefix() {
+    // We know we're a FlowRunRowKey with null flowRunId, so we can simply
+    // delegate
+    return super.getRowKey();
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message