hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From varunsax...@apache.org
Subject [15/50] [abbrv] hadoop git commit: YARN-5355. Backported YARN-2928 into our branch-2 feature branch.
Date Sun, 06 Nov 2016 16:31:32 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
new file mode 100644
index 0000000..be55db5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+/**
+ * This class is meant to be used only by explicit Columns, and not directly to
+ * write by clients.
+ *
+ * @param <T> refers to the table.
+ */
+public class ColumnHelper<T> {
+  private static final Log LOG = LogFactory.getLog(ColumnHelper.class);
+
+  private final ColumnFamily<T> columnFamily;
+
+  /**
+   * Local copy of bytes representation of columnFamily so that we can avoid
+   * cloning a new copy over and over.
+   */
+  private final byte[] columnFamilyBytes;
+
+  private final ValueConverter converter;
+
+  public ColumnHelper(ColumnFamily<T> columnFamily) {
+    this(columnFamily, GenericConverter.getInstance());
+  }
+
+  public ColumnHelper(ColumnFamily<T> columnFamily, ValueConverter converter) {
+    this.columnFamily = columnFamily;
+    columnFamilyBytes = columnFamily.getBytes();
+    if (converter == null) {
+      this.converter = GenericConverter.getInstance();
+    } else {
+      this.converter = converter;
+    }
+  }
+
+  /**
+   * Sends a Mutation to the table. The mutations will be buffered and sent over
+   * the wire as part of a batch.
+   *
+   * @param rowKey
+   *          identifying the row to write. Nothing gets written when null.
+   * @param tableMutator
+   *          used to modify the underlying HBase table
+   * @param columnQualifier
+   *          column qualifier. Nothing gets written when null.
+   * @param timestamp
+   *          version timestamp. When null the current timestamp multiplied with
+   *          TimestampGenerator.TS_MULTIPLIER and added with last 3 digits of
+   *          app id will be used
+   * @param inputValue
+   *          the value to write to the rowKey and column qualifier. Nothing
+   *          gets written when null.
+   * @param attributes Attributes to be set for HBase Put.
+   * @throws IOException if any problem occurs during store operation(sending
+   *          mutation to table).
+   */
+  public void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
+      byte[] columnQualifier, Long timestamp, Object inputValue,
+      Attribute... attributes) throws IOException {
+    if ((rowKey == null) || (columnQualifier == null) || (inputValue == null)) {
+      return;
+    }
+    Put p = new Put(rowKey);
+    timestamp = getPutTimestamp(timestamp, attributes);
+    p.addColumn(columnFamilyBytes, columnQualifier, timestamp,
+        converter.encodeValue(inputValue));
+    if ((attributes != null) && (attributes.length > 0)) {
+      for (Attribute attribute : attributes) {
+        p.setAttribute(attribute.getName(), attribute.getValue());
+      }
+    }
+    tableMutator.mutate(p);
+  }
+
+  /*
+   * Figures out the cell timestamp used in the Put For storing into flow run
+   * table. We would like to left shift the timestamp and supplement it with the
+   * AppId id so that there are no collisions in the flow run table's cells
+   */
+  private long getPutTimestamp(Long timestamp, Attribute[] attributes) {
+    if (timestamp == null) {
+      timestamp = System.currentTimeMillis();
+    }
+    String appId = getAppIdFromAttributes(attributes);
+    long supplementedTS = TimestampGenerator.getSupplementedTimestamp(
+        timestamp, appId);
+    return supplementedTS;
+  }
+
+  private String getAppIdFromAttributes(Attribute[] attributes) {
+    if (attributes == null) {
+      return null;
+    }
+    String appId = null;
+    for (Attribute attribute : attributes) {
+      if (AggregationCompactionDimension.APPLICATION_ID.toString().equals(
+          attribute.getName())) {
+        appId = Bytes.toString(attribute.getValue());
+      }
+    }
+    return appId;
+  }
+
+  /**
+   * @return the column family for this column implementation.
+   */
+  public ColumnFamily<T> getColumnFamily() {
+    return columnFamily;
+  }
+
+  /**
+   * Get the latest version of this specified column. Note: this call clones the
+   * value content of the hosting {@link org.apache.hadoop.hbase.Cell Cell}.
+   *
+   * @param result from which to read the value. Cannot be null
+   * @param columnQualifierBytes referring to the column to be read.
+   * @return latest version of the specified column of whichever object was
+   *         written.
+   * @throws IOException if any problem occurs while reading result.
+   */
+  public Object readResult(Result result, byte[] columnQualifierBytes)
+      throws IOException {
+    if (result == null || columnQualifierBytes == null) {
+      return null;
+    }
+
+    // Would have preferred to be able to use getValueAsByteBuffer and get a
+    // ByteBuffer to avoid copy, but GenericObjectMapper doesn't seem to like
+    // that.
+    byte[] value = result.getValue(columnFamilyBytes, columnQualifierBytes);
+    return converter.decodeValue(value);
+  }
+
+  /**
+   * @param result from which to reads data with timestamps
+   * @param columnPrefixBytes optional prefix to limit columns. If null all
+   *          columns are returned.
+   * @param <K> identifies the type of column name(indicated by type of key
+   *     converter).
+   * @param <V> the type of the values. The values will be cast into that type.
+   * @param keyConverter used to convert column bytes to the appropriate key
+   *     type.
+   * @return the cell values at each respective time in for form
+   *         {@literal {idA={timestamp1->value1}, idA={timestamp2->value2},
+   *         idB={timestamp3->value3}, idC={timestamp1->value4}}}
+   * @throws IOException if any problem occurs while reading results.
+   */
+  @SuppressWarnings("unchecked")
+  public <K, V> NavigableMap<K, NavigableMap<Long, V>>
+      readResultsWithTimestamps(Result result, byte[] columnPrefixBytes,
+          KeyConverter<K> keyConverter) throws IOException {
+
+    NavigableMap<K, NavigableMap<Long, V>> results = new TreeMap<>();
+
+    if (result != null) {
+      NavigableMap<
+          byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> resultMap =
+              result.getMap();
+
+      NavigableMap<byte[], NavigableMap<Long, byte[]>> columnCellMap =
+          resultMap.get(columnFamilyBytes);
+
+      // could be that there is no such column family.
+      if (columnCellMap != null) {
+        for (Entry<byte[], NavigableMap<Long, byte[]>> entry : columnCellMap
+            .entrySet()) {
+          K converterColumnKey = null;
+          if (columnPrefixBytes == null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("null prefix was specified; returning all columns");
+            }
+            try {
+              converterColumnKey = keyConverter.decode(entry.getKey());
+            } catch (IllegalArgumentException iae) {
+              LOG.error("Illegal column found, skipping this column.", iae);
+              continue;
+            }
+          } else {
+            // A non-null prefix means columns are actually of the form
+            // prefix!columnNameRemainder
+            byte[][] columnNameParts =
+                Separator.QUALIFIERS.split(entry.getKey(), 2);
+            byte[] actualColumnPrefixBytes = columnNameParts[0];
+            if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
+                && columnNameParts.length == 2) {
+              try {
+                // This is the prefix that we want
+                converterColumnKey = keyConverter.decode(columnNameParts[1]);
+              } catch (IllegalArgumentException iae) {
+                LOG.error("Illegal column found, skipping this column.", iae);
+                continue;
+              }
+            }
+          }
+
+          // If this column has the prefix we want
+          if (converterColumnKey != null) {
+            NavigableMap<Long, V> cellResults =
+                new TreeMap<Long, V>();
+            NavigableMap<Long, byte[]> cells = entry.getValue();
+            if (cells != null) {
+              for (Entry<Long, byte[]> cell : cells.entrySet()) {
+                V value =
+                    (V) converter.decodeValue(cell.getValue());
+                cellResults.put(
+                    TimestampGenerator.getTruncatedTimestamp(cell.getKey()),
+                    value);
+              }
+            }
+            results.put(converterColumnKey, cellResults);
+          }
+        } // for entry : columnCellMap
+      } // if columnCellMap != null
+    } // if result != null
+    return results;
+  }
+
+  /**
+   * @param <K> identifies the type of column name(indicated by type of key
+   *     converter).
+   * @param result from which to read columns
+   * @param columnPrefixBytes optional prefix to limit columns. If null all
+   *        columns are returned.
+   * @param keyConverter used to convert column bytes to the appropriate key
+   *          type.
+   * @return the latest values of columns in the column family. If the column
+   *         prefix is null, the column qualifier is returned as Strings. For a
+   *         non-null column prefix bytes, the column qualifier is returned as
+   *         a list of parts, each part a byte[]. This is to facilitate
+   *         returning byte arrays of values that were not Strings.
+   * @throws IOException if any problem occurs while reading results.
+   */
+  public <K> Map<K, Object> readResults(Result result,
+      byte[] columnPrefixBytes, KeyConverter<K> keyConverter)
+      throws IOException {
+    Map<K, Object> results = new HashMap<K, Object>();
+
+    if (result != null) {
+      Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
+      for (Entry<byte[], byte[]> entry : columns.entrySet()) {
+        byte[] columnKey = entry.getKey();
+        if (columnKey != null && columnKey.length > 0) {
+
+          K converterColumnKey = null;
+          if (columnPrefixBytes == null) {
+            try {
+              converterColumnKey = keyConverter.decode(columnKey);
+            } catch (IllegalArgumentException iae) {
+              LOG.error("Illegal column found, skipping this column.", iae);
+              continue;
+            }
+          } else {
+            // A non-null prefix means columns are actually of the form
+            // prefix!columnNameRemainder
+            byte[][] columnNameParts = Separator.QUALIFIERS.split(columnKey, 2);
+            if (columnNameParts.length > 0) {
+              byte[] actualColumnPrefixBytes = columnNameParts[0];
+              // If this is the prefix that we want
+              if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
+                  && columnNameParts.length == 2) {
+                try {
+                  converterColumnKey = keyConverter.decode(columnNameParts[1]);
+                } catch (IllegalArgumentException iae) {
+                  LOG.error("Illegal column found, skipping this column.", iae);
+                  continue;
+                }
+              }
+            }
+          } // if-else
+
+          // If the columnPrefix is null (we want all columns), or the actual
+          // prefix matches the given prefix we want this column
+          if (converterColumnKey != null) {
+            Object value = converter.decodeValue(entry.getValue());
+            // we return the columnQualifier in parts since we don't know
+            // which part is of which data type.
+            results.put(converterColumnKey, value);
+          }
+        }
+      } // for entry
+    }
+    return results;
+  }
+
+  /**
+   * @param columnPrefixBytes The byte representation for the column prefix.
+   *          Should not contain {@link Separator#QUALIFIERS}.
+   * @param qualifier for the remainder of the column. Any
+   *          {@link Separator#QUALIFIERS} will be encoded in the qualifier.
+   * @return fully sanitized column qualifier that is a combination of prefix
+   *         and qualifier. If prefix is null, the result is simply the encoded
+   *         qualifier without any separator.
+   */
+  public static byte[] getColumnQualifier(byte[] columnPrefixBytes,
+      String qualifier) {
+
+    // We don't want column names to have spaces / tabs.
+    byte[] encodedQualifier =
+        Separator.encode(qualifier, Separator.SPACE, Separator.TAB);
+    if (columnPrefixBytes == null) {
+      return encodedQualifier;
+    }
+
+    // Convert qualifier to lower case, strip of separators and tag on column
+    // prefix.
+    byte[] columnQualifier =
+        Separator.QUALIFIERS.join(columnPrefixBytes, encodedQualifier);
+    return columnQualifier;
+  }
+
+  /**
+   * @param columnPrefixBytes The byte representation for the column prefix.
+   *          Should not contain {@link Separator#QUALIFIERS}.
+   * @param qualifier for the remainder of the column.
+   * @return fully sanitized column qualifier that is a combination of prefix
+   *         and qualifier. If prefix is null, the result is simply the encoded
+   *         qualifier without any separator.
+   */
+  public static byte[] getColumnQualifier(byte[] columnPrefixBytes,
+      long qualifier) {
+
+    if (columnPrefixBytes == null) {
+      return Bytes.toBytes(qualifier);
+    }
+
+    // Convert qualifier to lower case, strip of separators and tag on column
+    // prefix.
+    byte[] columnQualifier =
+        Separator.QUALIFIERS.join(columnPrefixBytes, Bytes.toBytes(qualifier));
+    return columnQualifier;
+  }
+
+  public ValueConverter getValueConverter() {
+    return converter;
+  }
+
+  /**
+   * @param columnPrefixBytes The byte representation for the column prefix.
+   *          Should not contain {@link Separator#QUALIFIERS}.
+   * @param qualifier the byte representation for the remainder of the column.
+   * @return fully sanitized column qualifier that is a combination of prefix
+   *         and qualifier. If prefix is null, the result is simply the encoded
+   *         qualifier without any separator.
+   */
+  public static byte[] getColumnQualifier(byte[] columnPrefixBytes,
+      byte[] qualifier) {
+
+    if (columnPrefixBytes == null) {
+      return qualifier;
+    }
+
+    byte[] columnQualifier =
+        Separator.QUALIFIERS.join(columnPrefixBytes, qualifier);
+    return columnQualifier;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
new file mode 100644
index 0000000..89aa013
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+
+/**
+ * Used to represent a partially qualified column, where the actual column name
+ * will be composed of a prefix and the remainder of the column qualifier. The
+ * prefix can be null, in which case the column qualifier will be completely
+ * determined when the values are stored.
+ */
+public interface ColumnPrefix<T> {
+
+  /**
+   * Sends a Mutation to the table. The mutations will be buffered and sent over
+   * the wire as part of a batch.
+   *
+   * @param rowKey identifying the row to write. Nothing gets written when null.
+   * @param tableMutator used to modify the underlying HBase table. Caller is
+   *          responsible to pass a mutator for the table that actually has this
+   *          column.
+   * @param qualifier column qualifier. Nothing gets written when null.
+   * @param timestamp version timestamp. When null the server timestamp will be
+   *          used.
+   * @param attributes attributes for the mutation that are used by the
+   *          coprocessor to set/read the cell tags.
+   * @param inputValue the value to write to the rowKey and column qualifier.
+   *          Nothing gets written when null.
+   * @throws IOException if there is any exception encountered while doing
+   *     store operation(sending mutation to the table).
+   */
+  void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
+      byte[] qualifier, Long timestamp, Object inputValue,
+      Attribute... attributes) throws IOException;
+
+  /**
+   * Sends a Mutation to the table. The mutations will be buffered and sent over
+   * the wire as part of a batch.
+   *
+   * @param rowKey identifying the row to write. Nothing gets written when null.
+   * @param tableMutator used to modify the underlying HBase table. Caller is
+   *          responsible to pass a mutator for the table that actually has this
+   *          column.
+   * @param qualifier column qualifier. Nothing gets written when null.
+   * @param timestamp version timestamp. When null the server timestamp will be
+   *          used.
+   * @param attributes attributes for the mutation that are used by the
+   *          coprocessor to set/read the cell tags.
+   * @param inputValue the value to write to the rowKey and column qualifier.
+   *          Nothing gets written when null.
+   * @throws IOException if there is any exception encountered while doing
+   *     store operation(sending mutation to the table).
+   */
+  void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
+      String qualifier, Long timestamp, Object inputValue,
+      Attribute... attributes) throws IOException;
+
+  /**
+   * Get the latest version of this specified column. Note: this call clones the
+   * value content of the hosting {@link org.apache.hadoop.hbase.Cell Cell}.
+   *
+   * @param result Cannot be null
+   * @param qualifier column qualifier. Nothing gets read when null.
+   * @return result object (can be cast to whatever object was written to) or
+   *         null when specified column qualifier for this prefix doesn't exist
+   *         in the result.
+   * @throws IOException if there is any exception encountered while reading
+   *     result.
+   */
+  Object readResult(Result result, String qualifier) throws IOException;
+
+  /**
+   *
+   * @param <K> identifies the type of key converter.
+   * @param result from which to read columns.
+   * @param keyConverter used to convert column bytes to the appropriate key
+   *          type
+   * @return the latest values of columns in the column family with this prefix
+   *         (or all of them if the prefix value is null).
+   * @throws IOException if there is any exception encountered while reading
+   *           results.
+   */
+  <K> Map<K, Object> readResults(Result result, KeyConverter<K> keyConverter)
+      throws IOException;
+
+  /**
+   * @param result from which to reads data with timestamps.
+   * @param <K> identifies the type of key converter.
+   * @param <V> the type of the values. The values will be cast into that type.
+   * @param keyConverter used to convert column bytes to the appropriate key
+   *     type.
+   * @return the cell values at each respective time in for form
+   *         {@literal {idA={timestamp1->value1}, idA={timestamp2->value2},
+   *         idB={timestamp3->value3}, idC={timestamp1->value4}}}
+   * @throws IOException if there is any exception encountered while reading
+   *     result.
+   */
+  <K, V> NavigableMap<K, NavigableMap<Long, V>> readResultsWithTimestamps(
+      Result result, KeyConverter<K> keyConverter) throws IOException;
+
+  /**
+   * @param qualifierPrefix Column qualifier or prefix of qualifier.
+   * @return a byte array encoding column prefix and qualifier/prefix passed.
+   */
+  byte[] getColumnPrefixBytes(String qualifierPrefix);
+
+  /**
+   * @param qualifierPrefix Column qualifier or prefix of qualifier.
+   * @return a byte array encoding column prefix and qualifier/prefix passed.
+   */
+  byte[] getColumnPrefixBytes(byte[] qualifierPrefix);
+
+  /**
+   * Returns column family name(as bytes) associated with this column prefix.
+   * @return a byte array encoding column family for this prefix.
+   */
+  byte[] getColumnFamilyBytes();
+
+  /**
+   * Returns value converter implementation associated with this column prefix.
+   * @return a {@link ValueConverter} implementation.
+   */
+  ValueConverter getValueConverter();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java
new file mode 100644
index 0000000..8445575
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * Encapsulates information about Event column names for application and entity
+ * tables. Used while encoding/decoding event column names.
+ */
+public class EventColumnName {
+
+  private final String id;
+  private final Long timestamp;
+  private final String infoKey;
+  private final KeyConverter<EventColumnName> eventColumnNameConverter =
+      new EventColumnNameConverter();
+
+  public EventColumnName(String id, Long timestamp, String infoKey) {
+    this.id = id;
+    this.timestamp = timestamp;
+    this.infoKey = infoKey;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  public String getInfoKey() {
+    return infoKey;
+  }
+
+  /**
+   * @return a byte array with each components/fields separated by
+   *         Separator#VALUES. This leads to an event column name of the form
+   *         eventId=timestamp=infokey. If both timestamp and infokey are null,
+   *         then a qualifier of the form eventId=timestamp= is returned. If
+   *         only infokey is null, then a qualifier of the form eventId= is
+   *         returned. These prefix forms are useful for queries that intend to
+   *         retrieve more than one specific column name.
+   */
+  public byte[] getColumnQualifier() {
+    return eventColumnNameConverter.encode(this);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
new file mode 100644
index 0000000..d3ef897
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Encodes and decodes event column names for application and entity tables.
+ * The event column name is of the form : eventId=timestamp=infokey.
+ * If info is not associated with the event, event column name is of the form :
+ * eventId=timestamp=
+ * Event timestamp is long and rest are strings.
+ * Column prefixes are not part of the eventcolumn name passed for encoding. It
+ * is added later, if required in the associated ColumnPrefix implementations.
+ */
+public final class EventColumnNameConverter
+    implements KeyConverter<EventColumnName> {
+
+  public EventColumnNameConverter() {
+  }
+
+  // eventId=timestamp=infokey are of types String, Long String
+  // Strings are variable in size (i.e. end whenever separator is encountered).
+  // This is used while decoding and helps in determining where to split.
+  private static final int[] SEGMENT_SIZES = {
+      Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE };
+
+  /*
+   * (non-Javadoc)
+   *
+   * Encodes EventColumnName into a byte array with each component/field in
+   * EventColumnName separated by Separator#VALUES. This leads to an event
+   * column name of the form eventId=timestamp=infokey.
+   * If timestamp in passed EventColumnName object is null (eventId is not null)
+   * this returns a column prefix of the form eventId= and if infokey in
+   * EventColumnName is null (other 2 components are not null), this returns a
+   * column name of the form eventId=timestamp=
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #encode(java.lang.Object)
+   */
+  @Override
+  public byte[] encode(EventColumnName key) {
+    byte[] first = Separator.encode(key.getId(), Separator.SPACE, Separator.TAB,
+        Separator.VALUES);
+    if (key.getTimestamp() == null) {
+      return Separator.VALUES.join(first, Separator.EMPTY_BYTES);
+    }
+    byte[] second = Bytes.toBytes(
+        LongConverter.invertLong(key.getTimestamp()));
+    if (key.getInfoKey() == null) {
+      return Separator.VALUES.join(first, second, Separator.EMPTY_BYTES);
+    }
+    return Separator.VALUES.join(first, second, Separator.encode(
+        key.getInfoKey(), Separator.SPACE, Separator.TAB, Separator.VALUES));
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Decodes an event column name of the form eventId=timestamp= or
+   * eventId=timestamp=infoKey represented in byte format and converts it into
+   * an EventColumnName object.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #decode(byte[])
+   */
+  @Override
+  public EventColumnName decode(byte[] bytes) {
+    byte[][] components = Separator.VALUES.split(bytes, SEGMENT_SIZES);
+    if (components.length != 3) {
+      throw new IllegalArgumentException("the column name is not valid");
+    }
+    String id = Separator.decode(Bytes.toString(components[0]),
+        Separator.VALUES, Separator.TAB, Separator.SPACE);
+    Long ts = LongConverter.invertLong(Bytes.toLong(components[1]));
+    String infoKey = components[2].length == 0 ? null :
+        Separator.decode(Bytes.toString(components[2]),
+            Separator.VALUES, Separator.TAB, Separator.SPACE);
+    return new EventColumnName(id, ts, infoKey);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/GenericConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/GenericConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/GenericConverter.java
new file mode 100644
index 0000000..c34bfcb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/GenericConverter.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+
+/**
+ * Uses GenericObjectMapper to encode objects as bytes and decode bytes as
+ * objects.
+ */
+public final class GenericConverter implements ValueConverter {
+  private static final GenericConverter INSTANCE = new GenericConverter();
+
+  private GenericConverter() {
+  }
+
+  public static GenericConverter getInstance() {
+    return INSTANCE;
+  }
+
+  @Override
+  public byte[] encodeValue(Object value) throws IOException {
+    return GenericObjectMapper.write(value);
+  }
+
+  @Override
+  public Object decodeValue(byte[] bytes) throws IOException {
+    return GenericObjectMapper.read(bytes);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java
new file mode 100644
index 0000000..4229e81
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * Interface which has to be implemented for encoding and decoding row keys and
+ * columns.
+ */
+public interface KeyConverter<T> {
+  /**
+   * Encodes a key as a byte array.
+   *
+   * @param key key to be encoded.
+   * @return a byte array.
+   */
+  byte[] encode(T key);
+
+  /**
+   * Decodes a byte array and returns a key of type T.
+   *
+   * @param bytes byte representation
+   * @return an object(key) of type T which has been constructed after decoding
+   * the bytes.
+   */
+  T decode(byte[] bytes);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
new file mode 100644
index 0000000..600601a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Encodes a value by interpreting it as a Long and converting it to bytes and
+ * decodes a set of bytes as a Long.
+ */
+public final class LongConverter implements NumericValueConverter,
+    Serializable {
+
+  /**
+   * Added because we implement Comparator<Number>.
+   */
+  private static final long serialVersionUID = 1L;
+
+  public LongConverter() {
+  }
+
+  @Override
+  public byte[] encodeValue(Object value) throws IOException {
+    if (!TimelineStorageUtils.isIntegralValue(value)) {
+      throw new IOException("Expected integral value");
+    }
+    return Bytes.toBytes(((Number)value).longValue());
+  }
+
+  @Override
+  public Object decodeValue(byte[] bytes) throws IOException {
+    if (bytes == null) {
+      return null;
+    }
+    return Bytes.toLong(bytes);
+  }
+
+  /**
+   * Compares two numbers as longs. If either number is null, it will be taken
+   * as 0.
+   *
+   * @param num1 the first {@code Long} to compare.
+   * @param num2 the second {@code Long} to compare.
+   * @return -1 if num1 is less than num2, 0 if num1 is equal to num2 and 1 if
+   * num1 is greater than num2.
+   */
+  @Override
+  public int compare(Number num1, Number num2) {
+    return Long.compare((num1 == null) ? 0L : num1.longValue(),
+        (num2 == null) ? 0L : num2.longValue());
+  }
+
+  @Override
+  public Number add(Number num1, Number num2, Number...numbers) {
+    long sum = ((num1 == null) ? 0L : num1.longValue()) +
+        ((num2 == null) ? 0L : num2.longValue());
+    for (Number num : numbers) {
+      sum = sum + ((num == null) ? 0L : num.longValue());
+    }
+    return sum;
+  }
+
+  /**
+   * Converts a timestamp into it's inverse timestamp to be used in (row) keys
+   * where we want to have the most recent timestamp in the top of the table
+   * (scans start at the most recent timestamp first).
+   *
+   * @param key value to be inverted so that the latest version will be first in
+   *          a scan.
+   * @return inverted long
+   */
+  public static long invertLong(long key) {
+    return Long.MAX_VALUE - key;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
new file mode 100644
index 0000000..4a724d6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+/**
+ * Encodes and decodes column names / row keys which are long.
+ */
+public final class LongKeyConverter implements KeyConverter<Long> {
+
+  /**
+   * To delegate the actual work to.
+   */
+  private final LongConverter longConverter = new LongConverter();
+
+  public LongKeyConverter() {
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #encode(java.lang.Object)
+   */
+  @Override
+  public byte[] encode(Long key) {
+    try {
+      // IOException will not be thrown here as we are explicitly passing
+      // Long.
+      return longConverter.encodeValue(key);
+    } catch (IOException e) {
+      return null;
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #decode(byte[])
+   */
+  @Override
+  public Long decode(byte[] bytes) {
+    try {
+      return (Long) longConverter.decodeValue(bytes);
+    } catch (IOException e) {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/NumericValueConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/NumericValueConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/NumericValueConverter.java
new file mode 100644
index 0000000..8fb6536
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/NumericValueConverter.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.util.Comparator;
+
+/**
+ * Extends ValueConverter interface for numeric converters to support numerical
+ * operations such as comparison, addition, etc.
+ */
+public interface NumericValueConverter extends ValueConverter,
+    Comparator<Number> {
+  /**
+   * Adds two or more numbers. If either of the numbers are null, it is taken as
+   * 0.
+   *
+   * @param num1 the first number to add.
+   * @param num2 the second number to add.
+   * @param numbers Rest of the numbers to be added.
+   * @return result after adding up the numbers.
+   */
+  Number add(Number num1, Number num2, Number...numbers);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/OfflineAggregationInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/OfflineAggregationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/OfflineAggregationInfo.java
new file mode 100644
index 0000000..3dc5f51
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/OfflineAggregationInfo.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Class to carry the offline aggregation information for storage level
+ * implementations. There are currently two predefined aggregation info
+ * instances that represent flow and user level offline aggregations. Depend on
+ * its implementation, a storage class may use an OfflineAggregationInfo object
+ * to decide behaviors dynamically.
+ */
+public final class OfflineAggregationInfo {
+  /**
+   * Default flow level aggregation table name.
+   */
+  @VisibleForTesting
+  public static final String FLOW_AGGREGATION_TABLE_NAME
+      = "yarn_timeline_flow_aggregation";
+  /**
+   * Default user level aggregation table name.
+   */
+  public static final String USER_AGGREGATION_TABLE_NAME
+      = "yarn_timeline_user_aggregation";
+
+  // These lists are not taking effects in table creations.
+  private static final String[] FLOW_AGGREGATION_PK_LIST = {
+      "user", "cluster", "flow_name"
+  };
+  private static final String[] USER_AGGREGATION_PK_LIST = {
+      "user", "cluster"
+  };
+
+  private final String tableName;
+  private final String[] primaryKeyList;
+  private final PrimaryKeyStringSetter primaryKeyStringSetter;
+
+  private OfflineAggregationInfo(String table, String[] pkList,
+      PrimaryKeyStringSetter formatter) {
+    tableName = table;
+    primaryKeyList = pkList;
+    primaryKeyStringSetter = formatter;
+  }
+
+  private interface PrimaryKeyStringSetter {
+    int setValues(PreparedStatement ps, TimelineCollectorContext context,
+        String[] extraInfo, int startPos) throws SQLException;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public String[] getPrimaryKeyList() {
+    return primaryKeyList.clone();
+  }
+
+  public int setStringsForPrimaryKey(PreparedStatement ps,
+      TimelineCollectorContext context, String[] extraInfo, int startPos)
+      throws SQLException {
+    return primaryKeyStringSetter.setValues(ps, context, extraInfo, startPos);
+  }
+
+  public static final OfflineAggregationInfo FLOW_AGGREGATION =
+      new OfflineAggregationInfo(FLOW_AGGREGATION_TABLE_NAME,
+          FLOW_AGGREGATION_PK_LIST,
+          new PrimaryKeyStringSetter() {
+          @Override
+          public int setValues(PreparedStatement ps,
+              TimelineCollectorContext context, String[] extraInfo,
+              int startPos) throws SQLException {
+            int idx = startPos;
+            ps.setString(idx++, context.getUserId());
+            ps.setString(idx++, context.getClusterId());
+            ps.setString(idx++, context.getFlowName());
+            return idx;
+          }
+        });
+
+  public static final OfflineAggregationInfo USER_AGGREGATION =
+      new OfflineAggregationInfo(USER_AGGREGATION_TABLE_NAME,
+          USER_AGGREGATION_PK_LIST,
+          new PrimaryKeyStringSetter() {
+          @Override
+          public int setValues(PreparedStatement ps,
+              TimelineCollectorContext context, String[] extraInfo,
+              int startPos) throws SQLException {
+            int idx = startPos;
+            ps.setString(idx++, context.getUserId());
+            ps.setString(idx++, context.getClusterId());
+            return idx;
+          }
+        });
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Range.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Range.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Range.java
new file mode 100644
index 0000000..8a2e01a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Range.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Encapsulates a range with start and end indices.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class Range {
+  private final int startIdx;
+  private final int endIdx;
+
+  /**
+   * Defines a range from start index (inclusive) to end index (exclusive).
+   *
+   * @param start
+   *          Starting index position
+   * @param end
+   *          Ending index position (exclusive)
+   */
+  public Range(int start, int end) {
+    if (start < 0 || end < start) {
+      throw new IllegalArgumentException(
+          "Invalid range, required that: 0 <= start <= end; start=" + start
+              + ", end=" + end);
+    }
+
+    this.startIdx = start;
+    this.endIdx = end;
+  }
+
+  public int start() {
+    return startIdx;
+  }
+
+  public int end() {
+    return endIdx;
+  }
+
+  public int length() {
+    return endIdx - startIdx;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/RowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/RowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/RowKeyPrefix.java
new file mode 100644
index 0000000..6159dc7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/RowKeyPrefix.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * In queries where a single result is needed, an exact rowkey can be used
+ * through the corresponding rowkey#getRowKey() method. For queries that need to
+ * scan over a range of rowkeys, a partial (the initial part) of rowkeys are
+ * used. Classes implementing RowKeyPrefix indicate that they are the initial
+ * part of rowkeys, with different constructors with fewer number of argument to
+ * form a partial rowkey, a prefix.
+ *
+ * @param <R> indicating the type of rowkey that a particular implementation is
+ *          a prefix for.
+ */
+public interface RowKeyPrefix<R> {
+
+  /**
+   * Create a row key prefix, meaning a partial rowkey that can be used in range
+   * scans. Which fields are included in the prefix will depend on the
+   * constructor of the specific instance that was used. Output depends on which
+   * constructor was used.
+   * @return a prefix of the following form {@code fist!second!...!last!}
+   */
+  byte[] getRowKeyPrefix();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
new file mode 100644
index 0000000..5090b4d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Used to separate row qualifiers, column qualifiers and compound fields.
+ */
+public enum Separator {
+
+  /**
+   * separator in key or column qualifier fields.
+   */
+  QUALIFIERS("!", "%0$"),
+
+  /**
+   * separator in values, and/or compound key/column qualifier fields.
+   */
+  VALUES("=", "%1$"),
+
+  /**
+   * separator in values, often used to avoid having these in qualifiers and
+   * names. Note that if we use HTML form encoding through URLEncoder, we end up
+   * getting a + for a space, which may already occur in strings, so we don't
+   * want that.
+   */
+  SPACE(" ", "%2$"),
+
+  /**
+   * separator in values, often used to avoid having these in qualifiers and
+   * names.
+   */
+  TAB("\t", "%3$");
+
+  // a reserved character that starts each of the encoded values and is encoded
+  // first in order to escape naturally occurring instances of encoded values
+  // although it can be expressed as an enum instance, we define them as private
+  // variables to hide it from callers
+  private static final String PERCENT = "%";
+  private static final String PERCENT_ENCODED = "%9$";
+
+  private static final Pattern PERCENT_PATTERN =
+      Pattern.compile(PERCENT, Pattern.LITERAL);
+  private static final String PERCENT_REPLACEMENT =
+      Matcher.quoteReplacement(PERCENT);
+
+  private static final Pattern PERCENT_ENCODED_PATTERN =
+      Pattern.compile(PERCENT_ENCODED, Pattern.LITERAL);
+  private static final String PERCENT_ENCODED_REPLACEMENT =
+      Matcher.quoteReplacement(PERCENT_ENCODED);
+
+  /**
+   * The string value of this separator.
+   */
+  private final String value;
+
+  /**
+   * The bye representation of value.
+   */
+  private final byte[] bytes;
+
+  // pre-compiled patterns and quoted replacements for optimization
+  private final Pattern valuePattern;
+  private final String valueReplacement;
+
+  private final Pattern encodedValuePattern;
+  private final String encodedValueReplacement;
+
+  /**
+   * Indicator for variable size of an individual segment in a split. The
+   * segment ends wherever separator is encountered.
+   * Typically used for string.
+   * Also used to indicate that there is no fixed number of splits which need to
+   * be returned. If split limit is specified as this, all possible splits are
+   * returned.
+   */
+  public static final int VARIABLE_SIZE = 0;
+
+
+  /** empty string. */
+  public static final String EMPTY_STRING = "";
+
+  /** empty bytes. */
+  public static final byte[] EMPTY_BYTES = new byte[0];
+
+  /**
+   * @param value of the separator to use. Cannot be null or empty string.
+   * @param encodedValue choose something that isn't likely to occur in the data
+   *          itself. Cannot be null or empty string.
+   */
+  private Separator(String value, String encodedValue) {
+    this.value = value;
+
+    // validation
+    if (value == null || value.length() == 0 || encodedValue == null
+        || encodedValue.length() == 0) {
+      throw new IllegalArgumentException(
+          "Cannot create separator from null or empty string.");
+    }
+
+    this.bytes = Bytes.toBytes(value);
+    this.valuePattern = Pattern.compile(value, Pattern.LITERAL);
+    this.valueReplacement = Matcher.quoteReplacement(value);
+
+    this.encodedValuePattern = Pattern.compile(encodedValue, Pattern.LITERAL);
+    this.encodedValueReplacement = Matcher.quoteReplacement(encodedValue);
+  }
+
+  /**
+   * @return the original value of the separator
+   */
+  public String getValue() {
+    return value;
+  }
+
+  /**
+   * Used to make token safe to be used with this separator without collisions.
+   * It <em>must</em> be paired with {@link #decode(String)} for it to be
+   * decoded correctly.
+   * <p>
+   * If you need to encode a given string for multiple separators,
+   * {@link #encode(String, Separator...)} should be used over successive
+   * invocations of this method. It will result in a more compact version of the
+   * encoded value.
+   *
+   * @param token Token to be encoded.
+   * @return the token with any occurrences of this separator URLEncoded.
+   */
+  public String encode(String token) {
+    if (token == null || token.length() == 0) {
+      // Nothing to replace
+      return token;
+    }
+    // first encode the percent to escape naturally occurring encoded values
+    String escaped = encodePercent(token);
+    return encodeSingle(escaped, this);
+  }
+
+  private static String replace(String token, Pattern pattern,
+      String replacement) {
+    return pattern.matcher(token).replaceAll(replacement);
+  }
+
+  private static String encodeSingle(String token, Separator separator) {
+    return replace(token, separator.valuePattern,
+        separator.encodedValueReplacement);
+  }
+
+  private static String encodePercent(String token) {
+    return replace(token, PERCENT_PATTERN, PERCENT_ENCODED_REPLACEMENT);
+  }
+
+  /**
+   * Decode the token encoded using {@link #encode(String)}. It <em>must</em> be
+   * used for the result encoded with {@link #encode(String)} to be able to
+   * recover the original.
+   *
+   * @param token Token to be decoded.
+   * @return the token with any occurrences of the encoded separator replaced by
+   *         the separator itself.
+   */
+  public String decode(String token) {
+    if (token == null || token.length() == 0) {
+      // Nothing to replace
+      return token;
+    }
+    String escaped = decodeSingle(token, this);
+    // decode percent to de-escape
+    return decodePercent(escaped);
+  }
+
+  private static String decodeSingle(String token, Separator separator) {
+    return replace(token, separator.encodedValuePattern,
+        separator.valueReplacement);
+  }
+
+  private static String decodePercent(String token) {
+    return replace(token, PERCENT_ENCODED_PATTERN, PERCENT_REPLACEMENT);
+  }
+
+  /**
+   * Encode the given separators in the token with their encoding equivalents.
+   * It <em>must</em> be paired with {@link #decode(byte[], Separator...)} or
+   * {@link #decode(String, Separator...)} with the same separators for it to be
+   * decoded correctly.
+   * <p>
+   * If you need to encode a given string for multiple separators, this form of
+   * encoding should be used over successive invocations of
+   * {@link #encode(String)}. It will result in a more compact version of the
+   * encoded value.
+   *
+   * @param token containing possible separators that need to be encoded.
+   * @param separators to be encoded in the token with their URLEncoding
+   *          equivalent.
+   * @return non-null byte representation of the token with occurrences of the
+   *         separators encoded.
+   */
+  public static byte[] encode(String token, Separator... separators) {
+    if (token == null || token.length() == 0) {
+      return EMPTY_BYTES;
+    }
+    String result = token;
+    // first encode the percent to escape naturally occurring encoded values
+    result = encodePercent(token);
+    for (Separator separator : separators) {
+      if (separator != null) {
+        result = encodeSingle(result, separator);
+      }
+    }
+    return Bytes.toBytes(result);
+  }
+
+  /**
+   * Decode the given separators in the token with their decoding equivalents.
+   * It <em>must</em> be used for the result encoded with
+   * {@link #encode(String, Separator...)} with the same separators to be able
+   * to recover the original.
+   *
+   * @param token containing possible separators that need to be encoded.
+   * @param separators to be encoded in the token with their URLEncoding
+   *          equivalent.
+   * @return String representation of the token with occurrences of the URL
+   *         encoded separators decoded.
+   */
+  public static String decode(byte[] token, Separator... separators) {
+    if (token == null) {
+      return null;
+    }
+    return decode(Bytes.toString(token), separators);
+  }
+
+  /**
+   * Decode the given separators in the token with their decoding equivalents.
+   * It <em>must</em> be used for the result encoded with
+   * {@link #encode(String, Separator...)} with the same separators to be able
+   * to recover the original.
+   *
+   * @param token containing possible separators that need to be encoded.
+   * @param separators to be encoded in the token with their URLEncoding
+   *          equivalent.
+   * @return String representation of the token with occurrences of the URL
+   *         encoded separators decoded.
+   */
+  public static String decode(String token, Separator... separators) {
+    if (token == null) {
+      return null;
+    }
+    String result = token;
+    for (Separator separator : separators) {
+      if (separator != null) {
+        result = decodeSingle(result, separator);
+      }
+    }
+    // decode percent to de-escape
+    return decodePercent(result);
+  }
+
+  /**
+   * Returns a single byte array containing all of the individual arrays
+   * components separated by this separator.
+   *
+   * @param components Byte array components to be joined together.
+   * @return byte array after joining the components
+   */
+  public byte[] join(byte[]... components) {
+    if (components == null || components.length == 0) {
+      return EMPTY_BYTES;
+    }
+
+    int finalSize = 0;
+    finalSize = this.value.length() * (components.length - 1);
+    for (byte[] comp : components) {
+      if (comp != null) {
+        finalSize += comp.length;
+      }
+    }
+
+    byte[] buf = new byte[finalSize];
+    int offset = 0;
+    for (int i = 0; i < components.length; i++) {
+      if (components[i] != null) {
+        System.arraycopy(components[i], 0, buf, offset, components[i].length);
+        offset += components[i].length;
+      }
+      if (i < (components.length - 1)) {
+        System.arraycopy(this.bytes, 0, buf, offset, this.value.length());
+        offset += this.value.length();
+      }
+    }
+    return buf;
+  }
+
+  /**
+   * Concatenates items (as String), using this separator.
+   *
+   * @param items Items join, {@code toString()} will be called in each item.
+   *          Any occurrence of the separator in the individual strings will be
+   *          first encoded. Cannot be null.
+   * @return non-null joined result. Note that when separator is {@literal null}
+   *         the result is simply all items concatenated and the process is not
+   *         reversible through {@link #splitEncoded(String)}
+   */
+  public String joinEncoded(String... items) {
+    if (items == null || items.length == 0) {
+      return "";
+    }
+
+    StringBuilder sb = new StringBuilder(encode(items[0].toString()));
+    // Start at 1, we've already grabbed the first value at index 0
+    for (int i = 1; i < items.length; i++) {
+      sb.append(this.value);
+      sb.append(encode(items[i].toString()));
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Concatenates items (as String), using this separator.
+   *
+   * @param items Items join, {@code toString()} will be called in each item.
+   *          Any occurrence of the separator in the individual strings will be
+   *          first encoded. Cannot be null.
+   * @return non-null joined result. Note that when separator is {@literal null}
+   *         the result is simply all items concatenated and the process is not
+   *         reversible through {@link #splitEncoded(String)}
+   */
+  public String joinEncoded(Iterable<?> items) {
+    if (items == null) {
+      return "";
+    }
+    Iterator<?> i = items.iterator();
+    if (!i.hasNext()) {
+      return "";
+    }
+
+    StringBuilder sb = new StringBuilder(encode(i.next().toString()));
+    while (i.hasNext()) {
+      sb.append(this.value);
+      sb.append(encode(i.next().toString()));
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * @param compoundValue containing individual values separated by this
+   *          separator, which have that separator encoded.
+   * @return non-null set of values from the compoundValue with the separator
+   *         decoded.
+   */
+  public Collection<String> splitEncoded(String compoundValue) {
+    List<String> result = new ArrayList<String>();
+    if (compoundValue != null) {
+      for (String val : valuePattern.split(compoundValue)) {
+        result.add(decode(val));
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Splits the source array into multiple array segments using this separator,
+   * up to a maximum of count items. This will naturally produce copied byte
+   * arrays for each of the split segments.
+   *
+   * @param source to be split
+   * @param limit on how many segments are supposed to be returned. A
+   *          non-positive value indicates no limit on number of segments.
+   * @return source split by this separator.
+   */
+  public byte[][] split(byte[] source, int limit) {
+    return split(source, this.bytes, limit);
+  }
+
+  /**
+   * Splits the source array into multiple array segments using this separator.
+   * The sizes indicate the sizes of the relative components/segments.
+   * In case one of the segments contains this separator before the specified
+   * size is reached, the separator will be considered part of that segment and
+   * we will continue till size is reached.
+   * Variable length strings cannot contain this separator and are indiced with
+   * a size of {@value #VARIABLE_SIZE}. Such strings are encoded for this
+   * separator and decoded after the results from split is returned.
+   *
+   * @param source byte array to be split.
+   * @param sizes sizes of relative components/segments.
+   * @return source split by this separator as per the sizes specified..
+   */
+  public byte[][] split(byte[] source, int[] sizes) {
+    return split(source, this.bytes, sizes);
+  }
+
+  /**
+   * Splits the source array into multiple array segments using this separator,
+   * as many times as splits are found. This will naturally produce copied byte
+   * arrays for each of the split segments.
+   *
+   * @param source byte array to be split
+   * @return source split by this separator.
+   */
+  public byte[][] split(byte[] source) {
+    return split(source, this.bytes);
+  }
+
+  /**
+   * Returns a list of ranges identifying [start, end) -- closed, open --
+   * positions within the source byte array that would be split using the
+   * separator byte array.
+   * The sizes indicate the sizes of the relative components/segments.
+   * In case one of the segments contains this separator before the specified
+   * size is reached, the separator will be considered part of that segment and
+   * we will continue till size is reached.
+   * Variable length strings cannot contain this separator and are indiced with
+   * a size of {@value #VARIABLE_SIZE}. Such strings are encoded for this
+   * separator and decoded after the results from split is returned.
+   *
+   * @param source the source data
+   * @param separator the separator pattern to look for
+   * @param sizes indicate the sizes of the relative components/segments.
+   * @return a list of ranges.
+   */
+  private static List<Range> splitRanges(byte[] source, byte[] separator,
+      int[] sizes) {
+    List<Range> segments = new ArrayList<Range>();
+    if (source == null || separator == null) {
+      return segments;
+    }
+    // VARIABLE_SIZE here indicates that there is no limit to number of segments
+    // to return.
+    int limit = VARIABLE_SIZE;
+    if (sizes != null && sizes.length > 0) {
+      limit = sizes.length;
+    }
+    int start = 0;
+    int currentSegment = 0;
+    itersource: for (int i = 0; i < source.length; i++) {
+      for (int j = 0; j < separator.length; j++) {
+        if (source[i + j] != separator[j]) {
+          continue itersource;
+        }
+      }
+      // all separator elements matched
+      if (limit > VARIABLE_SIZE) {
+        if (segments.size() >= (limit - 1)) {
+          // everything else goes in one final segment
+          break;
+        }
+        if (sizes != null) {
+          int currentSegExpectedSize = sizes[currentSegment];
+          if (currentSegExpectedSize > VARIABLE_SIZE) {
+            int currentSegSize = i - start;
+            if (currentSegSize < currentSegExpectedSize) {
+              // Segment not yet complete. More bytes to parse.
+              continue itersource;
+            } else if (currentSegSize > currentSegExpectedSize) {
+              // Segment is not as per size.
+              throw new IllegalArgumentException(
+                  "Segments not separated as per expected sizes");
+            }
+          }
+        }
+      }
+      segments.add(new Range(start, i));
+      start = i + separator.length;
+      // i will be incremented again in outer for loop
+      i += separator.length - 1;
+      currentSegment++;
+    }
+    // add in remaining to a final range
+    if (start <= source.length) {
+      if (sizes != null) {
+        // Check if final segment is as per size specified.
+        if (sizes[currentSegment] > VARIABLE_SIZE &&
+            source.length - start > sizes[currentSegment]) {
+          // Segment is not as per size.
+          throw new IllegalArgumentException(
+              "Segments not separated as per expected sizes");
+        }
+      }
+      segments.add(new Range(start, source.length));
+    }
+    return segments;
+  }
+
+  /**
+   * Splits based on segments calculated based on limit/sizes specified for the
+   * separator.
+   *
+   * @param source byte array to be split.
+   * @param segments specifies the range for each segment.
+   * @return a byte[][] split as per the segment ranges.
+   */
+  private static byte[][] split(byte[] source, List<Range> segments) {
+    byte[][] splits = new byte[segments.size()][];
+    for (int i = 0; i < segments.size(); i++) {
+      Range r = segments.get(i);
+      byte[] tmp = new byte[r.length()];
+      if (tmp.length > 0) {
+        System.arraycopy(source, r.start(), tmp, 0, r.length());
+      }
+      splits[i] = tmp;
+    }
+    return splits;
+  }
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator based on the sizes. This will naturally produce copied byte
+   * arrays for each of the split segments.
+   *
+   * @param source source array.
+   * @param separator separator represented as a byte array.
+   * @param sizes sizes of relative components/segments.
+   * @return byte[][] after splitting the source.
+   */
+  private static byte[][] split(byte[] source, byte[] separator, int[] sizes) {
+    List<Range> segments = splitRanges(source, separator, sizes);
+    return split(source, segments);
+  }
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator. This will naturally produce copied byte arrays for each of the
+   * split segments.
+   *
+   * @param source Source array.
+   * @param separator Separator represented as a byte array.
+   * @return byte[][] after splitting the source.
+   */
+  private static byte[][] split(byte[] source, byte[] separator) {
+    return split(source, separator, (int[]) null);
+  }
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator, up to a maximum of count items. This will naturally produce
+   * copied byte arrays for each of the split segments.
+   *
+   * @param source Source array.
+   * @param separator Separator represented as a byte array.
+   * @param limit a non-positive value indicates no limit on number of segments.
+   * @return byte[][] after splitting the input source.
+   */
+  private static byte[][] split(byte[] source, byte[] separator, int limit) {
+    int[] sizes = null;
+    if (limit > VARIABLE_SIZE) {
+      sizes = new int[limit];
+    }
+    List<Range> segments = splitRanges(source, separator, sizes);
+    return split(source, segments);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
new file mode 100644
index 0000000..282848e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * Encodes and decodes column names / row keys which are merely strings.
+ * Column prefixes are not part of the column name passed for encoding. It is
+ * added later, if required in the associated ColumnPrefix implementations.
+ */
+public final class StringKeyConverter implements KeyConverter<String> {
+
+  public StringKeyConverter() {
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #encode(java.lang.Object)
+   */
+  @Override
+  public byte[] encode(String key) {
+    return Separator.encode(key, Separator.SPACE, Separator.TAB);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #decode(byte[])
+   */
+  @Override
+  public String decode(byte[] bytes) {
+    return Separator.decode(bytes, Separator.TAB, Separator.SPACE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntityFiltersType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntityFiltersType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntityFiltersType.java
new file mode 100644
index 0000000..4099e92
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntityFiltersType.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter.TimelineFilterType;
+
+/**
+ * Used to define which filter to match.
+ */
+enum TimelineEntityFiltersType {
+  CONFIG {
+    boolean isValidFilter(TimelineFilterType filterType) {
+      return filterType == TimelineFilterType.LIST ||
+          filterType == TimelineFilterType.KEY_VALUE;
+    }
+  },
+  INFO {
+    boolean isValidFilter(TimelineFilterType filterType) {
+      return filterType == TimelineFilterType.LIST ||
+          filterType == TimelineFilterType.KEY_VALUE;
+    }
+  },
+  METRIC {
+    boolean isValidFilter(TimelineFilterType filterType) {
+      return filterType == TimelineFilterType.LIST ||
+          filterType == TimelineFilterType.COMPARE;
+    }
+  },
+  EVENT {
+    boolean isValidFilter(TimelineFilterType filterType) {
+      return filterType == TimelineFilterType.LIST ||
+          filterType == TimelineFilterType.EXISTS;
+    }
+  },
+  IS_RELATED_TO {
+    boolean isValidFilter(TimelineFilterType filterType) {
+      return filterType == TimelineFilterType.LIST ||
+          filterType == TimelineFilterType.KEY_VALUES;
+    }
+  },
+  RELATES_TO {
+    boolean isValidFilter(TimelineFilterType filterType) {
+      return filterType == TimelineFilterType.LIST ||
+          filterType == TimelineFilterType.KEY_VALUES;
+    }
+  };
+
+  /**
+   * Checks whether filter type is valid for the filter being matched.
+   *
+   * @param filterType filter type.
+   * @return true, if its a valid filter, false otherwise.
+   */
+  abstract boolean isValidFilter(TimelineFilterType filterType);
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message