hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sj...@apache.org
Subject [28/50] [abbrv] hadoop git commit: YARN-5109. timestamps are stored unencoded causing parse errors (Varun Saxena via sjlee)
Date Sun, 10 Jul 2016 15:51:19 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b8cfa5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
new file mode 100644
index 0000000..32ef1c3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Encodes and decodes event column names for application and entity tables.
+ * The event column name is of the form : eventId=timestamp=infokey.
+ * If info is not associated with the event, event column name is of the form :
+ * eventId=timestamp=
+ * Event timestamp is long and rest are strings.
+ * Column prefixes are not part of the eventcolumn name passed for encoding. It
+ * is added later, if required in the associated ColumnPrefix implementations.
+ */
+public final class EventColumnNameConverter
+    implements KeyConverter<EventColumnName> {
+  private static final EventColumnNameConverter INSTANCE =
+      new EventColumnNameConverter();
+
+  public static EventColumnNameConverter getInstance() {
+    return INSTANCE;
+  }
+
+  private EventColumnNameConverter() {
+  }
+
+  // eventId=timestamp=infokey are of types String, Long, String.
+  // Strings are variable in size (i.e. end whenever separator is encountered).
+  // This is used while decoding and helps in determining where to split.
+  private static final int[] SEGMENT_SIZES = {
+      Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE };
+
+  /*
+   * (non-Javadoc)
+   *
+   * Encodes EventColumnName into a byte array with each component/field in
+   * EventColumnName separated by Separator#VALUES. This leads to an event
+   * column name of the form eventId=timestamp=infokey.
+   * If timestamp in passed EventColumnName object is null (eventId is not null)
+   * this returns a column prefix of the form eventId= and if infokey in
+   * EventColumnName is null (other 2 components are not null), this returns a
+   * column name of the form eventId=timestamp=
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #encode(java.lang.Object)
+   */
+  @Override
+  public byte[] encode(EventColumnName key) {
+    byte[] first = Separator.encode(key.getId(), Separator.SPACE, Separator.TAB,
+        Separator.VALUES);
+    if (key.getTimestamp() == null) {
+      return Separator.VALUES.join(first, Separator.EMPTY_BYTES);
+    }
+    byte[] second = Bytes.toBytes(
+        TimelineStorageUtils.invertLong(key.getTimestamp()));
+    if (key.getInfoKey() == null) {
+      return Separator.VALUES.join(first, second, Separator.EMPTY_BYTES);
+    }
+    return Separator.VALUES.join(first, second, Separator.encode(
+        key.getInfoKey(), Separator.SPACE, Separator.TAB, Separator.VALUES));
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Decodes an event column name of the form eventId=timestamp= or
+   * eventId=timestamp=infoKey represented in byte format and converts it into
+   * an EventColumnName object.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #decode(byte[])
+   */
+  @Override
+  public EventColumnName decode(byte[] bytes) {
+    byte[][] components = Separator.VALUES.split(bytes, SEGMENT_SIZES);
+    if (components.length != 3) {
+      throw new IllegalArgumentException("the column name is not valid");
+    }
+    String id = Separator.decode(Bytes.toString(components[0]),
+        Separator.VALUES, Separator.TAB, Separator.SPACE);
+    Long ts = TimelineStorageUtils.invertLong(Bytes.toLong(components[1]));
+    String infoKey = components[2].length == 0 ? null :
+        Separator.decode(Bytes.toString(components[2]),
+            Separator.VALUES, Separator.TAB, Separator.SPACE);
+    return new EventColumnName(id, ts, infoKey);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b8cfa5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java
new file mode 100644
index 0000000..4229e81
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * Interface which has to be implemented for encoding and decoding row keys and
+ * columns.
+ */
+public interface KeyConverter<T> {
+  /**
+   * Encodes a key as a byte array.
+   *
+   * @param key key to be encoded.
+   * @return a byte array.
+   */
+  byte[] encode(T key);
+
+  /**
+   * Decodes a byte array and returns a key of type T.
+   *
+   * @param bytes byte representation
+   * @return an object(key) of type T which has been constructed after decoding
+   * the bytes.
+   */
+  T decode(byte[] bytes);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b8cfa5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
new file mode 100644
index 0000000..3954145
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+/**
+ * Encodes and decodes column names / row keys which are long.
+ */
+public final class LongKeyConverter implements KeyConverter<Long> {
+  private static final LongKeyConverter INSTANCE = new LongKeyConverter();
+
+  public static LongKeyConverter getInstance() {
+    return INSTANCE;
+  }
+
+  private LongKeyConverter() {
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #encode(java.lang.Object)
+   */
+  @Override
+  public byte[] encode(Long key) {
+    try {
+      // IOException will not be thrown here as we are explicitly passing
+      // Long.
+      return LongConverter.getInstance().encodeValue(key);
+    } catch (IOException e) {
+      return null;
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #decode(byte[])
+   */
+  @Override
+  public Long decode(byte[] bytes) {
+    try {
+      return (Long) LongConverter.getInstance().decodeValue(bytes);
+    } catch (IOException e) {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b8cfa5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
index a81c717..8a178db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
@@ -45,7 +45,13 @@ public enum Separator {
    * getting a + for a space, which may already occur in strings, so we don't
    * want that.
    */
-  SPACE(" ", "%2$");
+  SPACE(" ", "%2$"),
+
+  /**
+   * separator in values, often used to avoid having these in qualifiers and
+   * names.
+   */
+  TAB("\t", "%3$");
 
   /**
    * The string value of this separator.
@@ -67,7 +73,22 @@ public enum Separator {
    */
   private final String quotedValue;
 
-  private static final byte[] EMPTY_BYTES = new byte[0];
+  /**
+   * Indicator for variable size of an individual segment in a split. The
+   * segment ends wherever separator is encountered.
+   * Typically used for string.
+   * Also used to indicate that there is no fixed number of splits which need to
+   * be returned. If split limit is specified as this, all possible splits are
+   * returned.
+   */
+  public static final int VARIABLE_SIZE = 0;
+
+
+  /** empty string. */
+  public static final String EMPTY_STRING = "";
+
+  /** empty bytes. */
+  public static final byte[] EMPTY_BYTES = new byte[0];
 
   /**
    * @param value of the separator to use. Cannot be null or empty string.
@@ -222,7 +243,6 @@ public enum Separator {
         System.arraycopy(this.bytes, 0, buf, offset, this.value.length());
         offset += this.value.length();
       }
-
     }
     return buf;
   }
@@ -307,7 +327,25 @@ public enum Separator {
    * @return source split by this separator.
    */
   public byte[][] split(byte[] source, int limit) {
-    return TimelineStorageUtils.split(source, this.bytes, limit);
+    return split(source, this.bytes, limit);
+  }
+
+  /**
+   * Splits the source array into multiple array segments using this separator.
+   * The sizes indicate the sizes of the relative components/segments.
+   * In case one of the segments contains this separator before the specified
+   * size is reached, the separator will be considered part of that segment and
+   * we will continue till size is reached.
+   * Variable length strings cannot contain this separator and are indicated
+   * with a size of {@value #VARIABLE_SIZE}. Such strings are encoded for this
+   * separator and decoded after the results from split are returned.
+   *
+   * @param source byte array to be split.
+   * @param sizes sizes of relative components/segments.
+   * @return source split by this separator as per the sizes specified.
+   */
+  public byte[][] split(byte[] source, int[] sizes) {
+    return split(source, this.bytes, sizes);
   }
 
   /**
@@ -315,10 +353,158 @@ public enum Separator {
    * as many times as splits are found. This will naturally produce copied byte
    * arrays for each of the split segments.
    *
-   * @param source to be split
+   * @param source byte array to be split
    * @return source split by this separator.
    */
   public byte[][] split(byte[] source) {
-    return TimelineStorageUtils.split(source, this.bytes);
+    return split(source, this.bytes);
+  }
+
+  /**
+   * Returns a list of ranges identifying [start, end) -- closed, open --
+   * positions within the source byte array that would be split using the
+   * separator byte array.
+   * The sizes indicate the sizes of the relative components/segments.
+   * In case one of the segments contains this separator before the specified
+   * size is reached, the separator will be considered part of that segment and
+   * we will continue till size is reached.
+   * Variable length strings cannot contain this separator and are indicated
+   * with a size of {@value #VARIABLE_SIZE}. Such strings are encoded for this
+   * separator and decoded after the results from split are returned.
+   *
+   * @param source the source data
+   * @param separator the separator pattern to look for
+   * @param sizes indicate the sizes of the relative components/segments.
+   * @return a list of ranges.
+   */
+  private static List<Range> splitRanges(byte[] source, byte[] separator,
+      int[] sizes) {
+    List<Range> segments = new ArrayList<Range>();
+    if (source == null || separator == null) {
+      return segments;
+    }
+    // VARIABLE_SIZE here indicates that there is no limit to number of segments
+    // to return.
+    int limit = VARIABLE_SIZE;
+    if (sizes != null && sizes.length > 0) {
+      limit = sizes.length;
+    }
+    int start = 0;
+    int currentSegment = 0;
+    itersource: for (int i = 0; i < source.length; i++) {
+      for (int j = 0; j < separator.length; j++) {
+        if (source[i + j] != separator[j]) {
+          continue itersource;
+        }
+      }
+      // all separator elements matched
+      if (limit > VARIABLE_SIZE) {
+        if (segments.size() >= (limit - 1)) {
+          // everything else goes in one final segment
+          break;
+        }
+        if (sizes != null) {
+          int currentSegExpectedSize = sizes[currentSegment];
+          if (currentSegExpectedSize > VARIABLE_SIZE) {
+            int currentSegSize = i - start;
+            if (currentSegSize < currentSegExpectedSize) {
+              // Segment not yet complete. More bytes to parse.
+              continue itersource;
+            } else if (currentSegSize > currentSegExpectedSize) {
+              // Segment is not as per size.
+              throw new IllegalArgumentException(
+                  "Segments not separated as per expected sizes");
+            }
+          }
+        }
+      }
+      segments.add(new Range(start, i));
+      start = i + separator.length;
+      // i will be incremented again in outer for loop
+      i += separator.length - 1;
+      currentSegment++;
+    }
+    // add in remaining to a final range
+    if (start <= source.length) {
+      if (sizes != null) {
+        // Check if final segment is as per size specified.
+        if (sizes[currentSegment] > VARIABLE_SIZE &&
+            source.length - start > sizes[currentSegment]) {
+          // Segment is not as per size.
+          throw new IllegalArgumentException(
+              "Segments not separated as per expected sizes");
+        }
+      }
+      segments.add(new Range(start, source.length));
+    }
+    return segments;
+  }
+
+  /**
+   * Splits based on segments calculated based on limit/sizes specified for the
+   * separator.
+   *
+   * @param source byte array to be split.
+   * @param segments specifies the range for each segment.
+   * @return a byte[][] split as per the segment ranges.
+   */
+  private static byte[][] split(byte[] source, List<Range> segments) {
+    byte[][] splits = new byte[segments.size()][];
+    for (int i = 0; i < segments.size(); i++) {
+      Range r = segments.get(i);
+      byte[] tmp = new byte[r.length()];
+      if (tmp.length > 0) {
+        System.arraycopy(source, r.start(), tmp, 0, r.length());
+      }
+      splits[i] = tmp;
+    }
+    return splits;
+  }
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator based on the sizes. This will naturally produce copied byte
+   * arrays for each of the split segments.
+   *
+   * @param source source array.
+   * @param separator separator represented as a byte array.
+   * @param sizes sizes of relative components/segments.
+   * @return byte[][] after splitting the source.
+   */
+  private static byte[][] split(byte[] source, byte[] separator, int[] sizes) {
+    List<Range> segments = splitRanges(source, separator, sizes);
+    return split(source, segments);
+  }
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator. This will naturally produce copied byte arrays for each of the
+   * split segments.
+   *
+   * @param source Source array.
+   * @param separator Separator represented as a byte array.
+   * @return byte[][] after splitting the source.
+   */
+  private static byte[][] split(byte[] source, byte[] separator) {
+    return split(source, separator, (int[]) null);
+  }
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator, up to a maximum of limit items. This will naturally produce
+   * copied byte arrays for each of the split segments.
+   *
+   * @param source Source array.
+   * @param separator Separator represented as a byte array.
+   * @param limit a non-positive value indicates no limit on number of segments.
+   * @return byte[][] after splitting the input source.
+   */
+  private static byte[][] split(byte[] source, byte[] separator, int limit) {
+    int[] sizes = null;
+    if (limit > VARIABLE_SIZE) {
+      sizes = new int[limit];
+    }
+    List<Range> segments = splitRanges(source, separator, sizes);
+    return split(source, segments);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b8cfa5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
new file mode 100644
index 0000000..b0f6d55
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * Encodes and decodes column names / row keys which are merely strings.
+ * Column prefixes are not part of the column name passed for encoding. It is
+ * added later, if required in the associated ColumnPrefix implementations.
+ */
+public final class StringKeyConverter implements KeyConverter<String> {
+  private static final StringKeyConverter INSTANCE = new StringKeyConverter();
+
+  public static StringKeyConverter getInstance() {
+    return INSTANCE;
+  }
+
+  private StringKeyConverter() {
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #encode(java.lang.Object)
+   */
+  @Override
+  public byte[] encode(String key) {
+    return Separator.encode(key, Separator.SPACE, Separator.TAB);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #decode(byte[])
+   */
+  @Override
+  public String decode(byte[] bytes) {
+    return Separator.decode(bytes, Separator.TAB, Separator.SPACE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b8cfa5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index 18f975a..d52a5d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.common;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -40,7 +39,6 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
@@ -48,18 +46,17 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter.TimelineFilterType;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 
 /**
  * A bunch of utility functions used across TimelineReader and TimelineWriter.
@@ -72,109 +69,10 @@ public final class TimelineStorageUtils {
 
   private static final Log LOG = LogFactory.getLog(TimelineStorageUtils.class);
 
-  /** empty bytes. */
-  public static final byte[] EMPTY_BYTES = new byte[0];
-
-  /** indicator for no limits for splitting. */
-  public static final int NO_LIMIT_SPLIT = -1;
-
   /** milliseconds in one day. */
   public static final long MILLIS_ONE_DAY = 86400000L;
 
   /**
-   * Splits the source array into multiple array segments using the given
-   * separator, up to a maximum of count items. This will naturally produce
-   * copied byte arrays for each of the split segments. To identify the split
-   * ranges without the array copies, see {@link #splitRanges(byte[], byte[])}.
-   *
-   * @param source Source array.
-   * @param separator Separator represented as a byte array.
-   * @return byte[][] after splitting the source
-   */
-  public static byte[][] split(byte[] source, byte[] separator) {
-    return split(source, separator, NO_LIMIT_SPLIT);
-  }
-
-  /**
-   * Splits the source array into multiple array segments using the given
-   * separator, up to a maximum of count items. This will naturally produce
-   * copied byte arrays for each of the split segments. To identify the split
-   * ranges without the array copies, see {@link #splitRanges(byte[], byte[])}.
-   *
-   * @param source Source array.
-   * @param separator Separator represented as a byte array.
-   * @param limit a non-positive value indicates no limit on number of segments.
-   * @return byte[][] after splitting the input source.
-   */
-  public static byte[][] split(byte[] source, byte[] separator, int limit) {
-    List<Range> segments = splitRanges(source, separator, limit);
-
-    byte[][] splits = new byte[segments.size()][];
-    for (int i = 0; i < segments.size(); i++) {
-      Range r = segments.get(i);
-      byte[] tmp = new byte[r.length()];
-      if (tmp.length > 0) {
-        System.arraycopy(source, r.start(), tmp, 0, r.length());
-      }
-      splits[i] = tmp;
-    }
-    return splits;
-  }
-
-  /**
-   * Returns a list of ranges identifying [start, end) -- closed, open --
-   * positions within the source byte array that would be split using the
-   * separator byte array.
-   *
-   * @param source Source array.
-   * @param separator Separator represented as a byte array.
-   * @return a list of ranges.
-   */
-  public static List<Range> splitRanges(byte[] source, byte[] separator) {
-    return splitRanges(source, separator, NO_LIMIT_SPLIT);
-  }
-
-  /**
-   * Returns a list of ranges identifying [start, end) -- closed, open --
-   * positions within the source byte array that would be split using the
-   * separator byte array.
-   *
-   * @param source the source data
-   * @param separator the separator pattern to look for
-   * @param limit the maximum number of splits to identify in the source
-   * @return a list of ranges.
-   */
-  public static List<Range> splitRanges(byte[] source, byte[] separator,
-      int limit) {
-    List<Range> segments = new ArrayList<Range>();
-    if ((source == null) || (separator == null)) {
-      return segments;
-    }
-    int start = 0;
-    itersource: for (int i = 0; i < source.length; i++) {
-      for (int j = 0; j < separator.length; j++) {
-        if (source[i + j] != separator[j]) {
-          continue itersource;
-        }
-      }
-      // all separator elements matched
-      if (limit > 0 && segments.size() >= (limit - 1)) {
-        // everything else goes in one final segment
-        break;
-      }
-      segments.add(new Range(start, i));
-      start = i + separator.length;
-      // i will be incremented again in outer for loop
-      i += separator.length - 1;
-    }
-    // add in remaining to a final range
-    if (start <= source.length) {
-      segments.add(new Range(start, source.length));
-    }
-    return segments;
-  }
-
-  /**
    * Converts a timestamp into it's inverse timestamp to be used in (row) keys
    * where we want to have the most recent timestamp in the top of the table
    * (scans start at the most recent timestamp first).
@@ -200,53 +98,6 @@ public final class TimelineStorageUtils {
     return Integer.MAX_VALUE - key;
   }
 
-
-  /**
-   * Converts/encodes a string app Id into a byte representation for (row) keys.
-   * For conversion, we extract cluster timestamp and sequence id from the
-   * string app id (calls {@link ConverterUtils#toApplicationId(String)} for
-   * conversion) and then store it in a byte array of length 12 (8 bytes (long)
-   * for cluster timestamp followed 4 bytes(int) for sequence id). Both cluster
-   * timestamp and sequence id are inverted so that the most recent cluster
-   * timestamp and highest sequence id appears first in the table (i.e.
-   * application id appears in a descending order).
-   *
-   * @param appIdStr application id in string format i.e.
-   * application_{cluster timestamp}_{sequence id with min 4 digits}
-   *
-   * @return encoded byte representation of app id.
-   */
-  public static byte[] encodeAppId(String appIdStr) {
-    ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
-    byte[] appIdBytes = new byte[Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT];
-    byte[] clusterTs = Bytes.toBytes(invertLong(appId.getClusterTimestamp()));
-    System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG);
-    byte[] seqId = Bytes.toBytes(invertInt(appId.getId()));
-    System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT);
-    return appIdBytes;
-  }
-
-  /**
-   * Converts/decodes a 12 byte representation of app id for (row) keys to an
-   * app id in string format which can be returned back to client.
-   * For decoding, 12 bytes are interpreted as 8 bytes of inverted cluster
-   * timestamp(long) followed by 4 bytes of inverted sequence id(int). Calls
-   * {@link ApplicationId#toString} to generate string representation of app id.
-   *
-   * @param appIdBytes application id in byte representation.
-   *
-   * @return decoded app id in string format.
-   */
-  public static String decodeAppId(byte[] appIdBytes) {
-    if (appIdBytes.length != (Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT)) {
-      throw new IllegalArgumentException("Invalid app id in byte format");
-    }
-    long clusterTs = invertLong(Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG));
-    int seqId =
-        invertInt(Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT));
-    return ApplicationId.newInstance(clusterTs, seqId).toString();
-  }
-
   /**
    * returns the timestamp of that day's start (which is midnight 00:00:00 AM)
    * for a given input timestamp.
@@ -810,7 +661,8 @@ public final class TimelineStorageUtils {
       TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
       boolean isRelatedTo) throws IOException {
     // isRelatedTo and relatesTo are of type Map<String, Set<String>>
-    Map<String, Object> columns = prefix.readResults(result);
+    Map<String, Object> columns =
+        prefix.readResults(result, StringKeyConverter.getInstance());
     for (Map.Entry<String, Object> column : columns.entrySet()) {
       for (String id : Separator.VALUES.splitEncoded(
           column.getValue().toString())) {
@@ -837,7 +689,8 @@ public final class TimelineStorageUtils {
       TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
       boolean isConfig) throws IOException {
     // info and configuration are of type Map<String, Object or String>
-    Map<String, Object> columns = prefix.readResults(result);
+    Map<String, Object> columns =
+        prefix.readResults(result, StringKeyConverter.getInstance());
     if (isConfig) {
       for (Map.Entry<String, Object> column : columns.entrySet()) {
         entity.addConfig(column.getKey(), column.getValue().toString());
@@ -861,30 +714,24 @@ public final class TimelineStorageUtils {
   public static <T> void readEvents(TimelineEntity entity, Result result,
       ColumnPrefix<T> prefix) throws IOException {
     Map<String, TimelineEvent> eventsMap = new HashMap<>();
-    Map<?, Object> eventsResult =
-        prefix.readResultsHavingCompoundColumnQualifiers(result);
-    for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
-      byte[][] karr = (byte[][])eventResult.getKey();
-      // the column name is of the form "eventId=timestamp=infoKey"
-      if (karr.length == 3) {
-        String id = Bytes.toString(karr[0]);
-        long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1]));
-        String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
-        TimelineEvent event = eventsMap.get(key);
-        if (event == null) {
-          event = new TimelineEvent();
-          event.setId(id);
-          event.setTimestamp(ts);
-          eventsMap.put(key, event);
-        }
-        // handle empty info
-        String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
-        if (infoKey != null) {
-          event.addInfo(infoKey, eventResult.getValue());
-        }
-      } else {
-        LOG.warn("incorrectly formatted column name: it will be discarded");
-        continue;
+    Map<EventColumnName, Object> eventsResult =
+        prefix.readResults(result, EventColumnNameConverter.getInstance());
+    for (Map.Entry<EventColumnName, Object>
+             eventResult : eventsResult.entrySet()) {
+      EventColumnName eventColumnName = eventResult.getKey();
+      String key = eventColumnName.getId() +
+          Long.toString(eventColumnName.getTimestamp());
+      // Retrieve previously seen event to add to it
+      TimelineEvent event = eventsMap.get(key);
+      if (event == null) {
+        // First time we're seeing this event, add it to the eventsMap
+        event = new TimelineEvent();
+        event.setId(eventColumnName.getId());
+        event.setTimestamp(eventColumnName.getTimestamp());
+        eventsMap.put(key, event);
+      }
+      if (eventColumnName.getInfoKey() != null) {
+        event.addInfo(eventColumnName.getInfoKey(), eventResult.getValue());
       }
     }
     Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b8cfa5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index de2b29d..02a4bb3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -27,9 +27,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
@@ -78,7 +79,6 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
    */
   private final String columnPrefix;
   private final byte[] columnPrefixBytes;
-  private final boolean compoundColQual;
 
   /**
    * Private constructor, meant to be used by the enum definition.
@@ -122,7 +122,6 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
       this.columnPrefixBytes =
           Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
     }
-    this.compoundColQual = compondColQual;
   }
 
   /**
@@ -154,14 +153,6 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
     return column.getValueConverter();
   }
 
-  public byte[] getCompoundColQualBytes(String qualifier,
-      byte[]...components) {
-    if (!compoundColQual) {
-      return ColumnHelper.getColumnQualifier(null, qualifier);
-    }
-    return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components);
-  }
-
   /*
    * (non-Javadoc)
    *
@@ -233,26 +224,12 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
    *
    * @see
    * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
-   * #readResults(org.apache.hadoop.hbase.client.Result)
+   * #readResults(org.apache.hadoop.hbase.client.Result,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
    */
-  public Map<String, Object> readResults(Result result) throws IOException {
-    return column.readResults(result, columnPrefixBytes);
-  }
-
-  /**
-   * @param result from which to read columns
-   * @return the latest values of columns in the column family. The column
-   *         qualifier is returned as a list of parts, each part a byte[]. This
-   *         is to facilitate returning byte arrays of values that were not
-   *         Strings. If they can be treated as Strings, you should use
-   *         {@link #readResults(Result)} instead.
-   * @throws IOException if there is any exception encountered while reading
-   *     result.
-   */
-  public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
-          throws IOException {
-    return column.readResultsHavingCompoundColumnQualifiers(result,
-        columnPrefixBytes);
+  public <K> Map<K, Object> readResults(Result result,
+      KeyConverter<K> keyConverter) throws IOException {
+    return column.readResults(result, columnPrefixBytes, keyConverter);
   }
 
   /*
@@ -260,11 +237,14 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
    *
    * @see
    * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
-   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
+   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
    */
-  public <V> NavigableMap<String, NavigableMap<Long, V>>
-      readResultsWithTimestamps(Result result) throws IOException {
-    return column.readResultsWithTimestamps(result, columnPrefixBytes);
+  public <K, V> NavigableMap<K, NavigableMap<Long, V>>
+      readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
+      throws IOException {
+    return column.readResultsWithTimestamps(result, columnPrefixBytes,
+        keyConverter);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b8cfa5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 04c633c..6d08390 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -17,10 +17,6 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
 
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
 /**
  * Represents a rowkey for the entity table.
  */
@@ -28,13 +24,13 @@ public class EntityRowKey {
   private final String clusterId;
   private final String userId;
   private final String flowName;
-  private final long flowRunId;
+  private final Long flowRunId;
   private final String appId;
   private final String entityType;
   private final String entityId;
 
   public EntityRowKey(String clusterId, String userId, String flowName,
-      long flowRunId, String appId, String entityType, String entityId) {
+      Long flowRunId, String appId, String entityType, String entityId) {
     this.clusterId = clusterId;
     this.userId = userId;
     this.flowName = flowName;
@@ -56,7 +52,7 @@ public class EntityRowKey {
     return flowName;
   }
 
-  public long getFlowRunId() {
+  public Long getFlowRunId() {
     return flowRunId;
   }
 
@@ -85,14 +81,8 @@ public class EntityRowKey {
    */
   public static byte[] getRowKeyPrefix(String clusterId, String userId,
       String flowName, Long flowRunId, String appId) {
-    byte[] first =
-        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
-            flowName));
-    // Note that flowRunId is a long, so we can't encode them all at the same
-    // time.
-    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
-    byte[] third = TimelineStorageUtils.encodeAppId(appId);
-    return Separator.QUALIFIERS.join(first, second, third, new byte[0]);
+    return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
+        clusterId, userId, flowName, flowRunId, appId, null, null));
   }
 
   /**
@@ -111,16 +101,8 @@ public class EntityRowKey {
    */
   public static byte[] getRowKeyPrefix(String clusterId, String userId,
       String flowName, Long flowRunId, String appId, String entityType) {
-    byte[] first =
-        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
-            flowName));
-    // Note that flowRunId is a long, so we can't encode them all at the same
-    // time.
-    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
-    byte[] third = TimelineStorageUtils.encodeAppId(appId);
-    byte[] fourth =
-        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(entityType, ""));
-    return Separator.QUALIFIERS.join(first, second, third, fourth);
+    return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
+        clusterId, userId, flowName, flowRunId, appId, entityType, null));
   }
 
   /**
@@ -140,16 +122,8 @@ public class EntityRowKey {
   public static byte[] getRowKey(String clusterId, String userId,
       String flowName, Long flowRunId, String appId, String entityType,
       String entityId) {
-    byte[] first =
-        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
-            flowName));
-    // Note that flowRunId is a long, so we can't encode them all at the same
-    // time.
-    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
-    byte[] third = TimelineStorageUtils.encodeAppId(appId);
-    byte[] fourth =
-        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(entityType, entityId));
-    return Separator.QUALIFIERS.join(first, second, third, fourth);
+    return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
+        clusterId, userId, flowName, flowRunId, appId, entityType, entityId));
   }
 
   /**
@@ -159,27 +133,6 @@ public class EntityRowKey {
    * @return An <cite>EntityRowKey</cite> object.
    */
   public static EntityRowKey parseRowKey(byte[] rowKey) {
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
-
-    if (rowKeyComponents.length < 7) {
-      throw new IllegalArgumentException("the row key is not valid for " +
-          "an entity");
-    }
-
-    String userId =
-        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
-    String clusterId =
-        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
-    String flowName =
-        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
-    long flowRunId =
-        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
-    String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[4]);
-    String entityType =
-        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[5]));
-    String entityId =
-        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[6]));
-    return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
-        entityType, entityId);
+    return EntityRowKeyConverter.getInstance().decode(rowKey);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b8cfa5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
new file mode 100644
index 0000000..43c0569
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+
+/**
+ * Encodes and decodes row key for entity table.
+ * The row key is of the form :
+ * userName!clusterId!flowName!flowRunId!appId!entityType!entityId.
+ * flowRunId is a long, appId is encoded/decoded using
+ * {@link AppIdKeyConverter} and rest are strings.
+ */
+public final class EntityRowKeyConverter implements KeyConverter<EntityRowKey> {
+  private static final EntityRowKeyConverter INSTANCE =
+      new EntityRowKeyConverter();
+
+  public static EntityRowKeyConverter getInstance() {
+    return INSTANCE;
+  }
+
+  private EntityRowKeyConverter() {
+  }
+
+  // Entity row key is of the form
+  // userName!clusterId!flowName!flowRunId!appId!entityType!entityId with each
+  // segment separated by !. The sizes below indicate sizes of each one of these
+  // segments in sequence. clusterId, userName, flowName, entityType and
+  // entityId are strings. flowRunId is a long hence 8 bytes in size. app id is
+  // represented as 12 bytes with cluster timestamp part of appid being 8 bytes
+  // (long) and seq id being 4 bytes(int).
+  // Strings are variable in size (i.e. end whenever separator is encountered).
+  // This is used while decoding and helps in determining where to split.
+  private static final int[] SEGMENT_SIZES = {
+      Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+      Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize(),
+      Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE };
+
+  /*
+   * (non-Javadoc)
+   *
+   * Encodes EntityRowKey object into a byte array with each component/field in
+   * EntityRowKey separated by Separator#QUALIFIERS. This leads to an entity
+   * table row key of the form
+   * userName!clusterId!flowName!flowRunId!appId!entityType!entityId
+   * If entityType in passed EntityRowKey object is null (and the fields
+   * preceding it i.e. clusterId, userId and flowName, flowRunId and appId are
+   * not null), this returns a row key prefix of the form
+   * userName!clusterId!flowName!flowRunId!appId! and if entityId in
+   * EntityRowKey is null (other 6 components are not null), this returns a row
+   * key prefix of the form
+   * userName!clusterId!flowName!flowRunId!appId!entityType!
+   * flowRunId is inverted while encoding as it helps maintain a descending
+   * order for row keys in entity table.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #encode(java.lang.Object)
+   */
+  @Override
+  public byte[] encode(EntityRowKey rowKey) {
+    byte[] user = Separator.encode(rowKey.getUserId(),
+        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
+    byte[] cluster = Separator.encode(rowKey.getClusterId(),
+        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
+    byte[] flow = Separator.encode(rowKey.getFlowName(),
+        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
+    byte[] first = Separator.QUALIFIERS.join(user, cluster, flow);
+    // Note that flowRunId is a long, so we can't encode them all at the same
+    // time.
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(
+        rowKey.getFlowRunId()));
+    byte[] third = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
+    if (rowKey.getEntityType() == null) {
+      return Separator.QUALIFIERS.join(
+          first, second, third, Separator.EMPTY_BYTES);
+    }
+    byte[] entityType = Separator.encode(rowKey.getEntityType(),
+        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
+    byte[] entityId = rowKey.getEntityId() == null ? Separator.EMPTY_BYTES :
+        Separator.encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB,
+        Separator.QUALIFIERS);
+    byte[] fourth = Separator.QUALIFIERS.join(entityType, entityId);
+    return Separator.QUALIFIERS.join(first, second, third, fourth);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Decodes an entity row key of the form
+   * userName!clusterId!flowName!flowRunId!appId!entityType!entityId represented
+   * in byte format and converts it into an EntityRowKey object. flowRunId is
+   * inverted while decoding as it was inverted while encoding.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #decode(byte[])
+   */
+  @Override
+  public EntityRowKey decode(byte[] rowKey) {
+    byte[][] rowKeyComponents =
+        Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+    if (rowKeyComponents.length != 7) {
+      throw new IllegalArgumentException("the row key is not valid for " +
+          "an entity");
+    }
+    String userId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[1]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    Long flowRunId =
+        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
+    String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[4]);
+    String entityType = Separator.decode(Bytes.toString(rowKeyComponents[5]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    String entityId =Separator.decode(Bytes.toString(rowKeyComponents[6]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
+        entityType, entityId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b8cfa5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
index 188c2fe..71c3d90 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
@@ -51,7 +52,6 @@ public enum FlowActivityColumnPrefix
    */
   private final String columnPrefix;
   private final byte[] columnPrefixBytes;
-  private final boolean compoundColQual;
 
   private final AggregationOperation aggOp;
 
@@ -83,7 +83,6 @@ public enum FlowActivityColumnPrefix
           .encode(columnPrefix));
     }
     this.aggOp = aggOp;
-    this.compoundColQual = compoundColQual;
   }
 
   /**
@@ -169,10 +168,12 @@ public enum FlowActivityColumnPrefix
    *
    * @see
    * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
-   * #readResults(org.apache.hadoop.hbase.client.Result)
+   * #readResults(org.apache.hadoop.hbase.client.Result,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
    */
-  public Map<String, Object> readResults(Result result) throws IOException {
-    return column.readResults(result, columnPrefixBytes);
+  public <K> Map<K, Object> readResults(Result result,
+      KeyConverter<K> keyConverter) throws IOException {
+    return column.readResults(result, columnPrefixBytes, keyConverter);
   }
 
   /*
@@ -180,11 +181,14 @@ public enum FlowActivityColumnPrefix
    *
    * @see
    * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
-   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
+   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
    */
-  public <T> NavigableMap<String, NavigableMap<Long, T>>
-      readResultsWithTimestamps(Result result) throws IOException {
-    return column.readResultsWithTimestamps(result, columnPrefixBytes);
+  public <K, V> NavigableMap<K, NavigableMap<Long, V>>
+      readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
+      throws IOException {
+    return column.readResultsWithTimestamps(result, columnPrefixBytes,
+        keyConverter);
   }
 
   /**
@@ -270,20 +274,4 @@ public enum FlowActivityColumnPrefix
     column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
         combinedAttributes);
   }
-
-  @Override
-  public byte[] getCompoundColQualBytes(String qualifier,
-      byte[]...components) {
-    if (!compoundColQual) {
-      return ColumnHelper.getColumnQualifier(null, qualifier);
-    }
-    return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components);
-  }
-
-  @Override
-  public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
-      throws IOException {
-    // There are no compound column qualifiers for flow activity table.
-    return null;
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b8cfa5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index 2726ae2..eea38a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
@@ -27,11 +25,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStor
 public class FlowActivityRowKey {
 
   private final String clusterId;
-  private final long dayTs;
+  private final Long dayTs;
   private final String userId;
   private final String flowName;
 
-  public FlowActivityRowKey(String clusterId, long dayTs, String userId,
+  public FlowActivityRowKey(String clusterId, Long dayTs, String userId,
       String flowName) {
     this.clusterId = clusterId;
     this.dayTs = dayTs;
@@ -43,7 +41,7 @@ public class FlowActivityRowKey {
     return clusterId;
   }
 
-  public long getDayTimestamp() {
+  public Long getDayTimestamp() {
     return dayTs;
   }
 
@@ -63,7 +61,8 @@ public class FlowActivityRowKey {
    * @return byte array with the row key prefix
    */
   public static byte[] getRowKeyPrefix(String clusterId) {
-    return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, ""));
+    return FlowActivityRowKeyConverter.getInstance().encode(
+        new FlowActivityRowKey(clusterId, null, null, null));
   }
 
   /**
@@ -75,9 +74,8 @@ public class FlowActivityRowKey {
    * @return byte array with the row key prefix
    */
   public static byte[] getRowKeyPrefix(String clusterId, long dayTs) {
-    return Separator.QUALIFIERS.join(
-        Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
-        Bytes.toBytes(TimelineStorageUtils.invertLong(dayTs)), new byte[0]);
+    return FlowActivityRowKeyConverter.getInstance().encode(
+        new FlowActivityRowKey(clusterId, dayTs, null, null));
   }
 
   /**
@@ -94,12 +92,8 @@ public class FlowActivityRowKey {
       String flowName) {
     // convert it to Day's time stamp
     eventTs = TimelineStorageUtils.getTopOfTheDayTimestamp(eventTs);
-
-    return Separator.QUALIFIERS.join(
-        Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
-        Bytes.toBytes(TimelineStorageUtils.invertLong(eventTs)),
-        Bytes.toBytes(Separator.QUALIFIERS.encode(userId)),
-        Bytes.toBytes(Separator.QUALIFIERS.encode(flowName)));
+    return FlowActivityRowKeyConverter.getInstance().encode(
+        new FlowActivityRowKey(clusterId, eventTs, userId, flowName));
   }
 
   /**
@@ -109,21 +103,6 @@ public class FlowActivityRowKey {
    * @return A <cite>FlowActivityRowKey</cite> object.
    */
   public static FlowActivityRowKey parseRowKey(byte[] rowKey) {
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
-
-    if (rowKeyComponents.length < 4) {
-      throw new IllegalArgumentException("the row key is not valid for "
-          + "a flow activity");
-    }
-
-    String clusterId = Separator.QUALIFIERS.decode(Bytes
-        .toString(rowKeyComponents[0]));
-    long dayTs =
-        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[1]));
-    String userId = Separator.QUALIFIERS.decode(Bytes
-        .toString(rowKeyComponents[2]));
-    String flowName = Separator.QUALIFIERS.decode(Bytes
-        .toString(rowKeyComponents[3]));
-    return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
+    return FlowActivityRowKeyConverter.getInstance().decode(rowKey);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b8cfa5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java
new file mode 100644
index 0000000..9dc4c98
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+
+/**
+ * Encodes and decodes row key for flow activity table.
+ * The row key is of the form : clusterId!dayTimestamp!user!flowName.
+ * dayTimestamp(top of the day timestamp) is a long and rest are strings.
+ */
+public final class FlowActivityRowKeyConverter implements
+    KeyConverter<FlowActivityRowKey> {
+  private static final FlowActivityRowKeyConverter INSTANCE =
+      new FlowActivityRowKeyConverter();
+
+  public static FlowActivityRowKeyConverter getInstance() {
+    return INSTANCE;
+  }
+
+  private FlowActivityRowKeyConverter() {
+  }
+
+  // Flow activity row key is of the form clusterId!dayTimestamp!user!flowName
+  // with each segment separated by !. The sizes below indicate sizes of each
+  // one of these segments in sequence. clusterId, user and flowName are
+  // strings. Top of the day timestamp is a long hence 8 bytes in size.
+  // Strings are variable in size (i.e. end whenever separator is encountered).
+  // This is used while decoding and helps in determining where to split.
+  private static final int[] SEGMENT_SIZES = {
+      Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE,
+      Separator.VARIABLE_SIZE };
+
+  /*
+   * (non-Javadoc)
+   *
+   * Encodes FlowActivityRowKey object into a byte array with each
+   * component/field in FlowActivityRowKey separated by Separator#QUALIFIERS.
+   * This leads to a flow activity table row key of the form
+   * clusterId!dayTimestamp!user!flowName
+   * If dayTimestamp in passed FlowActivityRowKey object is null and clusterId
+   * is not null, this returns a row key prefix as clusterId! and if userId in
+   * FlowActivityRowKey is null (and the fields preceding it i.e. clusterId and
+   * dayTimestamp are not null), this returns a row key prefix as
+   * clusterId!dayTimestamp!
+   * dayTimestamp is inverted while encoding as it helps maintain a descending
+   * order for row keys in flow activity table.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #encode(java.lang.Object)
+   */
+
+  @Override
+  public byte[] encode(FlowActivityRowKey rowKey) {
+    if (rowKey.getDayTimestamp() == null) {
+      return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
+          Separator.SPACE, Separator.TAB, Separator.QUALIFIERS),
+              Separator.EMPTY_BYTES);
+    }
+    if (rowKey.getUserId() == null) {
+      return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
+          Separator.SPACE, Separator.TAB, Separator.QUALIFIERS),
+          Bytes.toBytes(TimelineStorageUtils.invertLong(
+              rowKey.getDayTimestamp())), Separator.EMPTY_BYTES);
+    }
+    return Separator.QUALIFIERS.join(
+        Separator.encode(rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
+            Separator.QUALIFIERS),
+        Bytes.toBytes(
+            TimelineStorageUtils.invertLong(rowKey.getDayTimestamp())),
+        Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
+            Separator.QUALIFIERS),
+        Separator.encode(rowKey.getFlowName(), Separator.SPACE, Separator.TAB,
+            Separator.QUALIFIERS));
+  }
+
+  @Override
+  public FlowActivityRowKey decode(byte[] rowKey) {
+    byte[][] rowKeyComponents =
+        Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+    if (rowKeyComponents.length != 4) {
+      throw new IllegalArgumentException("the row key is not valid for "
+          + "a flow activity");
+    }
+    String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    Long dayTs =
+        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[1]));
+    String userId = Separator.decode(Bytes.toString(rowKeyComponents[2]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    String flowName = Separator.decode(Bytes.toString(rowKeyComponents[3]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b8cfa5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index 77f2ab2..0f14c89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -26,10 +26,11 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 
 /**
@@ -40,8 +41,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
   /**
    * To store flow run info values.
    */
-  METRIC(FlowRunColumnFamily.INFO, "m", null,
-      LongConverter.getInstance());
+  METRIC(FlowRunColumnFamily.INFO, "m", null, LongConverter.getInstance());
 
   private final ColumnHelper<FlowRunTable> column;
   private final ColumnFamily<FlowRunTable> columnFamily;
@@ -52,17 +52,14 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
    */
   private final String columnPrefix;
   private final byte[] columnPrefixBytes;
-  private final boolean compoundColQual;
 
   private final AggregationOperation aggOp;
 
   /**
    * Private constructor, meant to be used by the enum definition.
    *
-   * @param columnFamily
-   *          that this column is stored in.
-   * @param columnPrefix
-   *          for this column.
+   * @param columnFamily that this column is stored in.
+   * @param columnPrefix for this column.
    */
   private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
       String columnPrefix, AggregationOperation fra, ValueConverter converter) {
@@ -79,11 +76,10 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
       this.columnPrefixBytes = null;
     } else {
       // Future-proof by ensuring the right column prefix hygiene.
-      this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE
-          .encode(columnPrefix));
+      this.columnPrefixBytes =
+          Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
     }
     this.aggOp = fra;
-    this.compoundColQual = compoundColQual;
   }
 
   /**
@@ -99,14 +95,14 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 
   @Override
   public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
-    return ColumnHelper.getColumnQualifier(
-        this.columnPrefixBytes, qualifierPrefix);
+    return ColumnHelper.getColumnQualifier(this.columnPrefixBytes,
+        qualifierPrefix);
   }
 
   @Override
   public byte[] getColumnPrefixBytes(String qualifierPrefix) {
-    return ColumnHelper.getColumnQualifier(
-        this.columnPrefixBytes, qualifierPrefix);
+    return ColumnHelper.getColumnQualifier(this.columnPrefixBytes,
+        qualifierPrefix);
   }
 
   @Override
@@ -139,8 +135,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
     }
 
     byte[] columnQualifier = getColumnPrefixBytes(qualifier);
-    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
-        attributes, this.aggOp);
+    Attribute[] combinedAttributes =
+        TimelineStorageUtils.combineAttributes(attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
         combinedAttributes);
   }
@@ -166,8 +162,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
     }
 
     byte[] columnQualifier = getColumnPrefixBytes(qualifier);
-    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
-        attributes, this.aggOp);
+    Attribute[] combinedAttributes =
+        TimelineStorageUtils.combineAttributes(attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
         combinedAttributes);
   }
@@ -180,8 +176,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
    * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
    */
   public Object readResult(Result result, String qualifier) throws IOException {
-    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
-        this.columnPrefixBytes, qualifier);
+    byte[] columnQualifier =
+        ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
     return column.readResult(result, columnQualifier);
   }
 
@@ -190,10 +186,12 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
    *
    * @see
    * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
-   * #readResults(org.apache.hadoop.hbase.client.Result)
+   * #readResults(org.apache.hadoop.hbase.client.Result,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
    */
-  public Map<String, Object> readResults(Result result) throws IOException {
-    return column.readResults(result, columnPrefixBytes);
+  public <K> Map<K, Object> readResults(Result result,
+      KeyConverter<K> keyConverter) throws IOException {
+    return column.readResults(result, columnPrefixBytes, keyConverter);
   }
 
   /*
@@ -201,11 +199,14 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
    *
    * @see
    * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
-   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
+   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
    */
-  public <T> NavigableMap<String, NavigableMap<Long, T>>
-      readResultsWithTimestamps(Result result) throws IOException {
-    return column.readResultsWithTimestamps(result, columnPrefixBytes);
+  public <K, V> NavigableMap<K, NavigableMap<Long, V>>
+      readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
+      throws IOException {
+    return column.readResultsWithTimestamps(result, columnPrefixBytes,
+        keyConverter);
   }
 
   /**
@@ -213,8 +214,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
    * no match. The following holds true: {@code columnFor(x) == columnFor(y)} if
    * and only if {@code x.equals(y)} or {@code (x == y == null)}
    *
-   * @param columnPrefix
-   *          Name of the column to retrieve
+   * @param columnPrefix Name of the column to retrieve
    * @return the corresponding {@link FlowRunColumnPrefix} or null
    */
   public static final FlowRunColumnPrefix columnFor(String columnPrefix) {
@@ -242,10 +242,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
    * {@code columnFor(a,x) == columnFor(b,y)} if and only if
    * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}
    *
-   * @param columnFamily
-   *          The columnFamily for which to retrieve the column.
-   * @param columnPrefix
-   *          Name of the column to retrieve
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param columnPrefix Name of the column to retrieve
    * @return the corresponding {@link FlowRunColumnPrefix} or null if both
    *         arguments don't match.
    */
@@ -267,20 +265,4 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
     // Default to null
     return null;
   }
-
-  @Override
-  public byte[] getCompoundColQualBytes(String qualifier,
-      byte[]...components) {
-    if (!compoundColQual) {
-      return ColumnHelper.getColumnQualifier(null, qualifier);
-    }
-    return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components);
-  }
-
-  @Override
-  public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
-      throws IOException {
-    // There are no compound column qualifiers for flow run table.
-    return null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b8cfa5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
index eac8f05..925242b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -17,10 +17,6 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
 /**
  * Represents a rowkey for the flow run table.
  */
@@ -28,10 +24,10 @@ public class FlowRunRowKey {
   private final String clusterId;
   private final String userId;
   private final String flowName;
-  private final long flowRunId;
+  private final Long flowRunId;
 
   public FlowRunRowKey(String clusterId, String userId, String flowName,
-      long flowRunId) {
+      Long flowRunId) {
     this.clusterId = clusterId;
     this.userId = userId;
     this.flowName = flowName;
@@ -50,7 +46,7 @@ public class FlowRunRowKey {
     return flowName;
   }
 
-  public long getFlowRunId() {
+  public Long getFlowRunId() {
     return flowRunId;
   }
 
@@ -65,13 +61,13 @@ public class FlowRunRowKey {
    */
   public static byte[] getRowKeyPrefix(String clusterId, String userId,
       String flowName) {
-    return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, userId,
-        flowName, ""));
+    return FlowRunRowKeyConverter.getInstance().encode(new FlowRunRowKey(
+        clusterId, userId, flowName, null));
   }
 
   /**
    * Constructs a row key for the entity table as follows: {
-   * clusterId!userI!flowName!Inverted Flow Run Id}.
+   * clusterId!userId!flowName!Inverted Flow Run Id}.
    *
    * @param clusterId Cluster Id.
    * @param userId User Id.
@@ -81,12 +77,8 @@ public class FlowRunRowKey {
    */
   public static byte[] getRowKey(String clusterId, String userId,
       String flowName, Long flowRunId) {
-    byte[] first = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId,
-        userId, flowName));
-    // Note that flowRunId is a long, so we can't encode them all at the same
-    // time.
-    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
-    return Separator.QUALIFIERS.join(first, second);
+    return FlowRunRowKeyConverter.getInstance().encode(new FlowRunRowKey(
+        clusterId, userId, flowName, flowRunId));
   }
 
   /**
@@ -96,22 +88,7 @@ public class FlowRunRowKey {
    * @return A <cite>FlowRunRowKey</cite> object.
    */
   public static FlowRunRowKey parseRowKey(byte[] rowKey) {
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
-
-    if (rowKeyComponents.length < 4) {
-      throw new IllegalArgumentException("the row key is not valid for " +
-          "a flow run");
-    }
-
-    String clusterId =
-        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
-    String userId =
-        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
-    String flowName =
-        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
-    long flowRunId =
-        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
-    return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
+    return FlowRunRowKeyConverter.getInstance().decode(rowKey);
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message