hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hanishakon...@apache.org
Subject [17/50] [abbrv] hadoop git commit: YARN-7919. Refactor timelineservice-hbase module into submodules. Contributed by Haibo Chen.
Date Fri, 23 Feb 2018 19:44:58 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnRWHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnRWHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnRWHelper.java
new file mode 100644
index 0000000..a8e5149
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnRWHelper.java
@@ -0,0 +1,487 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * A set of utility functions that read from or write to a column.
+ * This class is meant to be used only by explicit Columns,
+ * and not directly by clients.
+ */
+public final class ColumnRWHelper {
+  // Logger is keyed on this class so that log output is attributed to
+  // ColumnRWHelper rather than to the sibling ColumnHelper class.
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ColumnRWHelper.class);
+
+  // Private constructor: this final class only exposes static helpers and is
+  // not meant to be instantiated.
+  private ColumnRWHelper() {
+  }
+
+  /**
+   * Determines the cell timestamp to use in a Put.
+   * <p>
+   * A null timestamp is replaced with the current system time. When
+   * {@code supplementTs} is set (typically done for the flow run table), the
+   * timestamp is additionally combined with the application id found in the
+   * attributes via {@link TimestampGenerator#getSupplementedTimestamp}, so
+   * that there are no collisions in the flow run table's cells.
+   *
+   * @param timestamp the requested cell timestamp, or null to use the current
+   *          system time.
+   * @param supplementTs whether the timestamp should be supplemented with the
+   *          application id.
+   * @param attributes attributes from which the application id is looked up.
+   * @return the timestamp to store the cell with.
+   */
+  private static long getPutTimestamp(
+      Long timestamp, boolean supplementTs, Attribute[] attributes) {
+    long baseTs =
+        (timestamp != null) ? timestamp : System.currentTimeMillis();
+    if (supplementTs) {
+      return TimestampGenerator.getSupplementedTimestamp(
+          baseTs, getAppIdFromAttributes(attributes));
+    }
+    return baseTs;
+  }
+
+  private static String getAppIdFromAttributes(Attribute[] attributes) {
+    if (attributes == null) {
+      return null;
+    }
+    String appId = null;
+    for (Attribute attribute : attributes) {
+      if (AggregationCompactionDimension.APPLICATION_ID.toString().equals(
+          attribute.getName())) {
+        appId = Bytes.toString(attribute.getValue());
+      }
+    }
+    return appId;
+  }
+
+  /**
+   * Writes a single value to the given column by sending a Mutation to the
+   * table. The column's family, qualifier, value converter and
+   * timestamp-supplement setting are read from {@code column}. The mutations
+   * will be buffered and sent over the wire as part of a batch.
+   *
+   * @param rowKey
+   *          identifying the row to write. Nothing gets written when null.
+   * @param tableMutator
+   *          used to modify the underlying HBase table
+   * @param column the column that is to be modified
+   * @param timestamp
+   *          version timestamp. When null the current system time is used.
+   *          If the column asks for a supplemented cell timestamp, the
+   *          timestamp is further combined with the app id found in the
+   *          attributes.
+   * @param inputValue
+   *          the value to write to the rowKey and column qualifier. Nothing
+   *          gets written when null.
+   * @param attributes Attributes to be set for HBase Put. They are combined
+   *          with the column's own attributes before being applied.
+   * @throws IOException if any problem occurs during store operation(sending
+   *          mutation to table).
+   */
+  public static void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
+                           Column<?> column, Long timestamp,
+                           Object inputValue, Attribute... attributes)
+      throws IOException {
+    final byte[] family = column.getColumnFamilyBytes();
+    final byte[] qualifierBytes = column.getColumnQualifierBytes();
+    final boolean supplementTs = column.supplementCellTimestamp();
+    final ValueConverter valueConverter = column.getValueConverter();
+    final Attribute[] combinedAttributes =
+        column.getCombinedAttrsWithAggr(attributes);
+
+    store(rowKey, tableMutator, family, qualifierBytes, timestamp,
+        supplementTs, inputValue, valueConverter, combinedAttributes);
+  }
+
+  /**
+   * Builds a Put for one cell and sends it to the table. The mutations will
+   * be buffered and sent over the wire as part of a batch.
+   *
+   * @param rowKey
+   *          identifying the row to write. Nothing gets written when null.
+   * @param tableMutator
+   *          used to modify the underlying HBase table
+   * @param columnFamilyBytes
+   *          the column family to write to.
+   * @param columnQualifier
+   *          column qualifier. Nothing gets written when null.
+   * @param timestamp
+   *          version timestamp. When null the current system time is used.
+   * @param supplementTs
+   *          whether the timestamp is to be supplemented with the app id
+   *          found in the attributes before it is used as cell timestamp.
+   * @param inputValue
+   *          the value to write to the rowKey and column qualifier. Nothing
+   *          gets written when null.
+   * @param converter
+   *          used to encode the input value into the bytes that are stored.
+   * @param attributes Attributes to be set for HBase Put.
+   * @throws IOException if any problem occurs during store operation(sending
+   *          mutation to table).
+   */
+  public static void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
+      byte[] columnFamilyBytes, byte[] columnQualifier, Long timestamp,
+      boolean supplementTs, Object inputValue, ValueConverter converter,
+      Attribute... attributes) throws IOException {
+    final boolean nothingToWrite =
+        rowKey == null || columnQualifier == null || inputValue == null;
+    if (nothingToWrite) {
+      return;
+    }
+
+    Put put = new Put(rowKey);
+    long cellTimestamp = getPutTimestamp(timestamp, supplementTs, attributes);
+    byte[] encodedValue = converter.encodeValue(inputValue);
+    put.addColumn(columnFamilyBytes, columnQualifier, cellTimestamp,
+        encodedValue);
+
+    // Iterating an empty array is a no-op, so only null needs guarding.
+    if (attributes != null) {
+      for (Attribute attr : attributes) {
+        put.setAttribute(attr.getName(), attr.getValue());
+      }
+    }
+    tableMutator.mutate(put);
+  }
+
+  /**
+   * Reads and decodes the latest version of the specified column. Note: this
+   * call clones the value content of the hosting
+   * {@link org.apache.hadoop.hbase.Cell Cell}.
+   *
+   * @param result from which to read the value. When null, null is returned.
+   * @param columnFamilyBytes the column family to read from.
+   * @param columnQualifierBytes referring to the column to be read. When
+   *          null, null is returned.
+   * @param converter used to decode the stored bytes.
+   * @return latest version of the specified column of whichever object was
+   *         written, as produced by the converter.
+   * @throws IOException if any problem occurs while reading result.
+   */
+  public static Object readResult(Result result, byte[] columnFamilyBytes,
+      byte[] columnQualifierBytes, ValueConverter converter)
+      throws IOException {
+    if (result != null && columnQualifierBytes != null) {
+      // Would have preferred to be able to use getValueAsByteBuffer and get
+      // a ByteBuffer to avoid copy, but GenericObjectMapper doesn't seem to
+      // like that.
+      byte[] rawValue =
+          result.getValue(columnFamilyBytes, columnQualifierBytes);
+      return converter.decodeValue(rawValue);
+    }
+    return null;
+  }
+
+  /**
+   * Reads and decodes the latest version of the given column, using the
+   * column's own family, qualifier and value converter. Note: this call
+   * clones the value content of the hosting
+   * {@link org.apache.hadoop.hbase.Cell Cell}.
+   *
+   * @param result from which to read the value.
+   * @param column the column that the result can be parsed to
+   * @return latest version of the specified column of whichever object was
+   *         written.
+   * @throws IOException if any problem occurs while reading result.
+   */
+  public static Object readResult(Result result, Column<?> column)
+      throws IOException {
+    final byte[] family = column.getColumnFamilyBytes();
+    final byte[] qualifier = column.getColumnQualifierBytes();
+    return readResult(result, family, qualifier, column.getValueConverter());
+  }
+
+  /**
+   * Reads and decodes the latest version of the column identified by the
+   * given prefix and qualifier. Note: this call clones the value content of
+   * the hosting {@link org.apache.hadoop.hbase.Cell Cell}.
+   *
+   * @param result from which to read the value.
+   * @param columnPrefix column prefix to read from
+   * @param qualifier column qualifier, combined with the prefix through
+   *          {@code ColumnHelper.getColumnQualifier} to form the full column
+   *          qualifier.
+   * @return result object (can be cast to whatever object was written to) or
+   *         null when specified column qualifier for this prefix doesn't exist
+   *         in the result.
+   * @throws IOException if there is any exception encountered while reading
+   *     result.
+   */
+  public static Object readResult(Result result, ColumnPrefix<?> columnPrefix,
+                                  String qualifier) throws IOException {
+    final byte[] prefixBytes = columnPrefix.getColumnPrefixInBytes();
+    final byte[] columnQualifier =
+        ColumnHelper.getColumnQualifier(prefixBytes, qualifier);
+    final byte[] family = columnPrefix.getColumnFamilyBytes();
+    return readResult(result, family, columnQualifier,
+        columnPrefix.getValueConverter());
+  }
+
+  /**
+   * Reads the latest values of all columns that carry the given column
+   * prefix, using the prefix's own column family and value converter.
+   *
+   * @param <K> identifies the type of key converter.
+   * @param result from which to read columns.
+   * @param columnPrefix the column prefix whose columns are to be read.
+   * @param keyConverter used to convert column bytes to the appropriate key
+   *          type
+   * @return the latest values of columns in the column family with this prefix
+   *         (or all of them if the prefix value is null).
+   * @throws IOException if there is any exception encountered while reading
+   *           results.
+   */
+  public static <K> Map<K, Object> readResults(Result result,
+      ColumnPrefix<?> columnPrefix, KeyConverter<K> keyConverter)
+      throws IOException {
+    final byte[] family = columnPrefix.getColumnFamilyBytes();
+    final byte[] prefixBytes = columnPrefix.getColumnPrefixInBytes();
+    final ValueConverter valueConverter = columnPrefix.getValueConverter();
+    return readResults(
+        result, family, prefixBytes, keyConverter, valueConverter);
+  }
+
+  /**
+   * @param result from which to reads data with timestamps.
+   * @param <K> identifies the type of key converter.
+   * @param <V> the type of the values. The values will be cast into that type.
+   * @param keyConverter used to convert column bytes to the appropriate key
+   *     type.
+   * @return the cell values at each respective time in for form
+   *         {@literal {idA={timestamp1->value1}, idA={timestamp2->value2},
+   *         idB={timestamp3->value3}, idC={timestamp1->value4}}}
+   * @throws IOException if there is any exception encountered while reading
+   *     result.
+   */
+  public static <K, V> NavigableMap<K, NavigableMap<Long, V>>
+      readResultsWithTimestamps(Result result, ColumnPrefix<?> columnPrefix,
+      KeyConverter<K> keyConverter) throws IOException {
+    return readResultsWithTimestamps(result,
+        columnPrefix.getColumnFamilyBytes(),
+        columnPrefix.getColumnPrefixInBytes(),
+        keyConverter, columnPrefix.getValueConverter(),
+        columnPrefix.supplementCellTimeStamp());
+  }
+
+  /**
+   * Reads all versions of the columns in the given column family, optionally
+   * limited to columns with the given prefix, and decodes their keys and
+   * values.
+   *
+   * @param result from which to read data with timestamps. When null or
+   *          empty, an empty map is returned.
+   * @param columnFamilyBytes the column family to read from.
+   * @param columnPrefixBytes optional prefix to limit columns. If null all
+   *          columns are returned.
+   * @param <K> identifies the type of column name(indicated by type of key
+   *     converter).
+   * @param <V> the type of the values. The values will be cast into that type.
+   * @param keyConverter used to convert column bytes to the appropriate key
+   *     type. Columns for which it throws IllegalArgumentException are logged
+   *     and skipped.
+   * @param valueConverter used to decode the cell values.
+   * @param supplementTs when true, each cell timestamp is passed through
+   *          {@link TimestampGenerator#getTruncatedTimestamp} before it is
+   *          used as key of the per-column map.
+   * @return the cell values at each respective time in the form
+   *         {@literal {idA={timestamp1->value1}, idA={timestamp2->value2},
+   *         idB={timestamp3->value3}, idC={timestamp1->value4}}}
+   * @throws IOException if any problem occurs while reading results.
+   */
+  @SuppressWarnings("unchecked")
+  public static <K, V> NavigableMap<K, NavigableMap<Long, V>>
+      readResultsWithTimestamps(Result result, byte[] columnFamilyBytes,
+          byte[] columnPrefixBytes, KeyConverter<K> keyConverter,
+          ValueConverter valueConverter, boolean supplementTs)
+      throws IOException {
+
+    NavigableMap<K, NavigableMap<Long, V>> results = new TreeMap<>();
+    if (result == null) {
+      return results;
+    }
+
+    NavigableMap<
+        byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> resultMap =
+        result.getMap();
+    // Result.getMap() returns null for an empty Result; treat that the same
+    // as "no data" instead of failing with a NullPointerException.
+    if (resultMap == null) {
+      return results;
+    }
+
+    NavigableMap<byte[], NavigableMap<Long, byte[]>> columnCellMap =
+        resultMap.get(columnFamilyBytes);
+    // could be that there is no such column family.
+    if (columnCellMap == null) {
+      return results;
+    }
+
+    // Logged once per call rather than once per column.
+    if (columnPrefixBytes == null) {
+      LOG.debug("null prefix was specified; returning all columns");
+    }
+
+    for (Map.Entry<byte[], NavigableMap<Long, byte[]>> entry
+        : columnCellMap.entrySet()) {
+      K converterColumnKey = null;
+      if (columnPrefixBytes == null) {
+        try {
+          converterColumnKey = keyConverter.decode(entry.getKey());
+        } catch (IllegalArgumentException iae) {
+          LOG.error("Illegal column found, skipping this column.", iae);
+          continue;
+        }
+      } else {
+        // A non-null prefix means columns are actually of the form
+        // prefix!columnNameRemainder
+        byte[][] columnNameParts =
+            Separator.QUALIFIERS.split(entry.getKey(), 2);
+        // Check the length first so that the prefix part is only accessed
+        // when the qualifier really split into two parts.
+        if (columnNameParts.length == 2
+            && Bytes.equals(columnPrefixBytes, columnNameParts[0])) {
+          try {
+            // This is the prefix that we want
+            converterColumnKey = keyConverter.decode(columnNameParts[1]);
+          } catch (IllegalArgumentException iae) {
+            LOG.error("Illegal column found, skipping this column.", iae);
+            continue;
+          }
+        }
+      }
+
+      // If this column has the prefix we want
+      if (converterColumnKey != null) {
+        NavigableMap<Long, V> cellResults = new TreeMap<>();
+        NavigableMap<Long, byte[]> cells = entry.getValue();
+        if (cells != null) {
+          for (Map.Entry<Long, byte[]> cell : cells.entrySet()) {
+            V value = (V) valueConverter.decodeValue(cell.getValue());
+            Long ts = supplementTs
+                ? TimestampGenerator.getTruncatedTimestamp(cell.getKey())
+                : cell.getKey();
+            cellResults.put(ts, value);
+          }
+        }
+        results.put(converterColumnKey, cellResults);
+      }
+    } // for entry : columnCellMap
+    return results;
+  }
+
+  /**
+   * Reads the latest values of the columns in the given column family,
+   * optionally limited to columns with the given prefix, and decodes their
+   * keys and values.
+   *
+   * @param <K> identifies the type of column name(indicated by type of key
+   *     converter).
+   * @param result from which to read columns. When null or empty, an empty
+   *        map is returned.
+   * @param columnFamilyBytes the column family to read from.
+   * @param columnPrefixBytes optional prefix to limit columns. If null all
+   *        columns are returned.
+   * @param keyConverter used to convert column bytes to the appropriate key
+   *          type. Columns for which it throws IllegalArgumentException are
+   *          logged and skipped.
+   * @param valueConverter used to decode the column values.
+   * @return the latest values of columns in the column family, keyed by the
+   *         decoded column name. If the column prefix is null, the whole
+   *         column qualifier is handed to the key converter. For a non-null
+   *         column prefix, only the part of the qualifier following the
+   *         prefix is handed to the key converter.
+   * @throws IOException if any problem occurs while reading results.
+   */
+  public static <K> Map<K, Object> readResults(Result result,
+      byte[] columnFamilyBytes, byte[] columnPrefixBytes,
+      KeyConverter<K> keyConverter, ValueConverter valueConverter)
+      throws IOException {
+    Map<K, Object> results = new HashMap<K, Object>();
+    if (result == null) {
+      return results;
+    }
+
+    Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
+    // Result.getFamilyMap() returns null for an empty Result; treat that the
+    // same as "no columns" instead of failing with a NullPointerException.
+    if (columns == null) {
+      return results;
+    }
+
+    for (Map.Entry<byte[], byte[]> entry : columns.entrySet()) {
+      byte[] columnKey = entry.getKey();
+      if (columnKey == null || columnKey.length == 0) {
+        continue;
+      }
+
+      K converterColumnKey = null;
+      if (columnPrefixBytes == null) {
+        try {
+          converterColumnKey = keyConverter.decode(columnKey);
+        } catch (IllegalArgumentException iae) {
+          LOG.error("Illegal column found, skipping this column.", iae);
+          continue;
+        }
+      } else {
+        // A non-null prefix means columns are actually of the form
+        // prefix!columnNameRemainder
+        byte[][] columnNameParts = Separator.QUALIFIERS.split(columnKey, 2);
+        // If this is the prefix that we want
+        if (columnNameParts.length == 2
+            && Bytes.equals(columnPrefixBytes, columnNameParts[0])) {
+          try {
+            converterColumnKey = keyConverter.decode(columnNameParts[1]);
+          } catch (IllegalArgumentException iae) {
+            LOG.error("Illegal column found, skipping this column.", iae);
+            continue;
+          }
+        }
+      }
+
+      // If the columnPrefix is null (we want all columns), or the actual
+      // prefix matches the given prefix we want this column
+      if (converterColumnKey != null) {
+        Object value = valueConverter.decodeValue(entry.getValue());
+        results.put(converterColumnKey, value);
+      }
+    } // for entry
+    return results;
+  }
+
+  /**
+   * Writes a single value to the column formed by the given column prefix and
+   * byte[] qualifier. The mutations will be buffered and sent over the wire
+   * as part of a batch.
+   *
+   * @param rowKey identifying the row to write. Nothing gets written when null.
+   * @param tableMutator used to modify the underlying HBase table. Caller is
+   *          responsible to pass a mutator for the table that actually has this
+   *          column.
+   * @param columnPrefix the column prefix providing the column family, the
+   *          value converter and the timestamp-supplement setting.
+   * @param qualifier column qualifier. Must not be null.
+   * @param timestamp version timestamp. When null the current system time is
+   *          used.
+   * @param inputValue the value to write to the rowKey and column qualifier.
+   *          Nothing gets written when null.
+   * @param attributes attributes for the mutation that are used by the
+   *          coprocessor to set/read the cell tags.
+   * @throws IOException if the qualifier is null, or if there is any
+   *     exception encountered while doing store operation(sending mutation to
+   *     the table).
+   */
+  public static void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
+             ColumnPrefix<?> columnPrefix, byte[] qualifier, Long timestamp,
+             Object inputValue, Attribute... attributes) throws IOException {
+    if (qualifier == null) {
+      String tableName = tableMutator.getName().getNameAsString();
+      throw new IOException(
+          "Cannot store column with null qualifier in " + tableName);
+    }
+
+    byte[] fullQualifier = columnPrefix.getColumnPrefixBytes(qualifier);
+    Attribute[] attrsWithAggregation =
+        columnPrefix.getCombinedAttrsWithAggr(attributes);
+    byte[] family = columnPrefix.getColumnFamilyBytes();
+    boolean supplementTs = columnPrefix.supplementCellTimeStamp();
+    ValueConverter valueConverter = columnPrefix.getValueConverter();
+
+    store(rowKey, tableMutator, family, fullQualifier, timestamp,
+        supplementTs, inputValue, valueConverter, attrsWithAggregation);
+  }
+
+  /**
+   * Writes a single value to the column formed by the given column prefix and
+   * String qualifier. The mutations will be buffered and sent over the wire
+   * as part of a batch.
+   *
+   * @param rowKey identifying the row to write. Nothing gets written when null.
+   * @param tableMutator used to modify the underlying HBase table. Caller is
+   *          responsible to pass a mutator for the table that actually has this
+   *          column.
+   * @param columnPrefix the column prefix providing the column family, the
+   *          value converter and the timestamp-supplement setting.
+   * @param qualifier column qualifier. Must not be null.
+   * @param timestamp version timestamp. When null the current system time is
+   *          used.
+   * @param inputValue the value to write to the rowKey and column qualifier.
+   *          Nothing gets written when null.
+   * @param attributes attributes for the mutation that are used by the
+   *          coprocessor to set/read the cell tags.
+   * @throws IOException if the qualifier is null, or if there is any
+   *     exception encountered while doing store operation(sending mutation to
+   *     the table).
+   */
+  public static void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
+             ColumnPrefix<?> columnPrefix, String qualifier, Long timestamp,
+             Object inputValue, Attribute... attributes) throws IOException {
+    if (qualifier == null) {
+      String tableName = tableMutator.getName().getNameAsString();
+      throw new IOException(
+          "Cannot store column with null qualifier in " + tableName);
+    }
+
+    byte[] fullQualifier = columnPrefix.getColumnPrefixBytes(qualifier);
+    Attribute[] attrsWithAggregation =
+        columnPrefix.getCombinedAttrsWithAggr(attributes);
+    byte[] family = columnPrefix.getColumnFamilyBytes();
+    boolean supplementTs = columnPrefix.supplementCellTimeStamp();
+    ValueConverter valueConverter = columnPrefix.getValueConverter();
+
+    store(rowKey, tableMutator, family, fullQualifier, timestamp,
+        supplementTs, inputValue, valueConverter, attrsWithAggregation);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
new file mode 100644
index 0000000..f4cd6fb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Query;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A bunch of utility functions used in HBase TimelineService backend.
+ */
+public final class HBaseTimelineStorageUtils {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(HBaseTimelineStorageUtils.class);
+
+  // Utility class with static helpers only; not meant to be instantiated.
+  private HBaseTimelineStorageUtils() {
+  }
+
+
+  /**
+   * Builds the HBase configuration to be used by the timeline service.
+   *
+   * @param conf YARN configuration. Used to see if there is an explicit config
+   *          pointing to the HBase config file to read. It should not be null
+   *          or a NullPointerException will be thrown.
+   * @return a configuration with the HBase configuration from the classpath,
+   *         optionally overwritten by the timeline service configuration URL if
+   *         specified.
+   * @throws MalformedURLException if a timeline service HBase configuration URL
+   *           is specified but is a malformed URL.
+   */
+  public static Configuration getTimelineServiceHBaseConf(Configuration conf)
+      throws MalformedURLException {
+    if (conf == null) {
+      throw new NullPointerException("conf must not be null");
+    }
+
+    String timelineServiceHBaseConfFileURL =
+        conf.get(YarnConfiguration.TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE);
+    if (timelineServiceHBaseConfFileURL == null
+        || timelineServiceHBaseConfFileURL.isEmpty()) {
+      // default to what is on the classpath
+      return HBaseConfiguration.create(conf);
+    }
+
+    LOG.info("Using hbase configuration at {}",
+        timelineServiceHBaseConfFileURL);
+    // create a clone so that we don't mess with our input one
+    Configuration hbaseConf = new Configuration(conf);
+    // loadDefaults=false: only the explicitly added resource is read
+    Configuration plainHBaseConf = new Configuration(false);
+    URL hbaseSiteXML = new URL(timelineServiceHBaseConfFileURL);
+    plainHBaseConf.addResource(hbaseSiteXML);
+    HBaseConfiguration.merge(hbaseConf, plainHBaseConf);
+    return hbaseConf;
+  }
+
+  /**
+   * Given a row key prefix stored in a byte array, return a byte array for its
+   * immediate next row key.
+   *
+   * @param rowKeyPrefix The provided row key prefix, represented in an array.
+   *          Must not be null.
+   * @return the closest next row key of the provided row key, or
+   *         {@link HConstants#EMPTY_END_ROW} when the prefix is empty or
+   *         consists of 0xFF bytes only.
+   */
+  public static byte[] calculateTheClosestNextRowKeyForPrefix(
+      byte[] rowKeyPrefix) {
+    // Essentially we are treating it like an 'unsigned very very long' and
+    // doing +1 manually.
+    // Search for the place where the trailing 0xFFs start
+    int offset = rowKeyPrefix.length;
+    while (offset > 0 && rowKeyPrefix[offset - 1] == (byte) 0xFF) {
+      offset--;
+    }
+
+    if (offset == 0) {
+      // We got an 0xFFFF... (only FFs) stopRow value which is
+      // the last possible prefix before the end of the table.
+      // So set it to stop at the 'end of the table'
+      return HConstants.EMPTY_END_ROW;
+    }
+
+    // Copy the original up to (excluding) the trailing 0xFFs
+    byte[] newStopRow = Arrays.copyOf(rowKeyPrefix, offset);
+    // And increment the last one
+    newStopRow[offset - 1]++;
+    return newStopRow;
+  }
+
+  /**
+   * Restricts the given query to a time range on the metrics column family.
+   * Nothing is set when the range is the full range, i.e. tsBegin is 0 and
+   * tsEnd is {@link Long#MAX_VALUE}.
+   *
+   * @param query the query to restrict.
+   * @param metricsCf the metrics column family the time range applies to.
+   * @param tsBegin start of the time range.
+   * @param tsEnd end of the time range. Unless it is {@link Long#MAX_VALUE},
+   *          tsEnd + 1 is passed on as upper bound, so that tsEnd itself is
+   *          included in a range whose upper bound is exclusive.
+   */
+  public static void setMetricsTimeRange(Query query, byte[] metricsCf,
+      long tsBegin, long tsEnd) {
+    if (tsBegin == 0 && tsEnd == Long.MAX_VALUE) {
+      return;
+    }
+    // Guard against overflow when tsEnd is already the maximum value.
+    long upperBound =
+        (tsEnd == Long.MAX_VALUE) ? Long.MAX_VALUE : (tsEnd + 1);
+    query.setColumnFamilyTimeRange(metricsCf, tsBegin, upperBound);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java
new file mode 100644
index 0000000..8e6c259
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Holds the constants used in the context of schema accesses for
+ * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+ * information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class TimelineHBaseSchemaConstants {
+
+  /**
+   * Used to create a pre-split for tables starting with a username in the
+   * prefix. TODO: this may have to become a config variable (string with
+   * separators) so that different installations can presplit based on their own
+   * commonly occurring names.
+   */
+  private static final byte[][] USERNAME_SPLITS = toSplitKeys(
+      "a", "ad", "an", "b", "ca", "cl", "d", "e", "f", "g", "h", "i", "j",
+      "k", "l", "m", "n", "o", "q", "r", "s", "se", "t", "u", "v", "w", "x",
+      "y", "z");
+
+  /**
+   * The length at which keys auto-split.
+   */
+  public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4";
+
+  // Constants holder; not meant to be instantiated.
+  private TimelineHBaseSchemaConstants() {
+  }
+
+  /**
+   * @return a deep copy of the split keys for tables where a user is the row
+   *         key prefix, so that callers cannot modify the shared constant.
+   */
+  public static byte[][] getUsernameSplits() {
+    byte[][] copy = new byte[USERNAME_SPLITS.length][];
+    for (int i = 0; i < copy.length; i++) {
+      copy[i] = Bytes.copy(USERNAME_SPLITS[i]);
+    }
+    return copy;
+  }
+
+  /**
+   * Converts each given prefix into its byte representation.
+   *
+   * @param prefixes the split key prefixes, in split order.
+   * @return one byte array per prefix, in the same order.
+   */
+  private static byte[][] toSplitKeys(String... prefixes) {
+    byte[][] keys = new byte[prefixes.length][];
+    for (int i = 0; i < prefixes.length; i++) {
+      keys[i] = Bytes.toBytes(prefixes[i]);
+    }
+    return keys;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java
new file mode 100644
index 0000000..29a07e4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Mutation;
+
+/**
+ * Wraps an actual {@link BufferedMutator} in a type-safe manner.
+ *
+ * @param <T> The class referring to the table to be written to.
+ */
+public class TypedBufferedMutator<T extends BaseTable<T>> {
+
+  private final BufferedMutator bufferedMutator;
+
+  /**
+   * @param bufferedMutator the mutator to be wrapped for delegation. Shall not
+   *          be null.
+   */
+  public TypedBufferedMutator(BufferedMutator bufferedMutator) {
+    this.bufferedMutator = bufferedMutator;
+  }
+
+  public TableName getName() {
+    return bufferedMutator.getName();
+  }
+
+  public Configuration getConfiguration() {
+    return bufferedMutator.getConfiguration();
+  }
+
+  public void mutate(Mutation mutation) throws IOException {
+    bufferedMutator.mutate(mutation);
+  }
+
+  public void mutate(List<? extends Mutation> mutations) throws IOException {
+    bufferedMutator.mutate(mutations);
+  }
+
+  public void close() throws IOException {
+    bufferedMutator.close();
+  }
+
+  public void flush() throws IOException {
+    bufferedMutator.flush();
+  }
+
+  public long getWriteBufferSize() {
+    return bufferedMutator.getWriteBufferSize();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
new file mode 100644
index 0000000..0df5b8a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage.common contains
+ * a set of utility classes used across backend storage reader and writer.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTableRW.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTableRW.java
new file mode 100644
index 0000000..111ae71
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTableRW.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create, read and write to the Entity Table.
+ */
+public class EntityTableRW extends BaseTableRW<EntityTable> {
+  /** entity prefix. */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "entity";
+
+  /** config param name that specifies the entity table name. */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /**
+   * config param name that specifies the TTL for metrics column family in
+   * entity table.
+   */
+  private static final String METRICS_TTL_CONF_NAME = PREFIX
+      + ".table.metrics.ttl";
+
+  /**
+   * config param name that specifies max-versions for metrics column family in
+   * entity table.
+   */
+  private static final String METRICS_MAX_VERSIONS =
+      PREFIX + ".table.metrics.max-versions";
+
+  /** default value for entity table name. */
+  public static final String DEFAULT_TABLE_NAME = "timelineservice.entity";
+
+  /** default TTL is 30 days for metrics timeseries. */
+  private static final int DEFAULT_METRICS_TTL = 2592000;
+
+  /** default max number of versions. */
+  private static final int DEFAULT_METRICS_MAX_VERSIONS = 10000;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(EntityTableRW.class);
+
+  public EntityTableRW() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW#
+   * createTable(org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor entityTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor infoCF =
+        new HColumnDescriptor(EntityColumnFamily.INFO.getBytes());
+    infoCF.setBloomFilterType(BloomType.ROWCOL);
+    entityTableDescp.addFamily(infoCF);
+
+    HColumnDescriptor configCF =
+        new HColumnDescriptor(EntityColumnFamily.CONFIGS.getBytes());
+    configCF.setBloomFilterType(BloomType.ROWCOL);
+    configCF.setBlockCacheEnabled(true);
+    entityTableDescp.addFamily(configCF);
+
+    HColumnDescriptor metricsCF =
+        new HColumnDescriptor(EntityColumnFamily.METRICS.getBytes());
+    entityTableDescp.addFamily(metricsCF);
+    metricsCF.setBlockCacheEnabled(true);
+    // always keep 1 version (the latest)
+    metricsCF.setMinVersions(1);
+    metricsCF.setMaxVersions(
+        hbaseConf.getInt(METRICS_MAX_VERSIONS, DEFAULT_METRICS_MAX_VERSIONS));
+    metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME,
+        DEFAULT_METRICS_TTL));
+    entityTableDescp.setRegionSplitPolicyClassName(
+        "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
+    entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
+        TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+    admin.createTable(entityTableDescp,
+        TimelineHBaseSchemaConstants.getUsernameSplits());
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+
+  /**
+   * @param metricsTTL time to live parameter for the metrics in this table.
+   * @param hbaseConf configuration in which to set the metrics TTL config
+   *          variable.
+   */
+  public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) {
+    hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java
new file mode 100644
index 0000000..bb0e331
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage.entity
+ * contains classes related to implementation for entity table.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTableRW.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTableRW.java
new file mode 100644
index 0000000..5b9fe13
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTableRW.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create, read and write to the FlowActivity Table.
+ */
+public class FlowActivityTableRW extends BaseTableRW<FlowActivityTable> {
+  /** flow activity table prefix. */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowactivity";
+
+  /** config param name that specifies the flowactivity table name. */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /** default value for flowactivity table name. */
+  public static final String DEFAULT_TABLE_NAME =
+      "timelineservice.flowactivity";
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FlowActivityTableRW.class);
+
+  /** default max number of versions. */
+  public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
+
+  public FlowActivityTableRW() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW#
+   * createTable(org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor flowActivityTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor infoCF =
+        new HColumnDescriptor(FlowActivityColumnFamily.INFO.getBytes());
+    infoCF.setBloomFilterType(BloomType.ROWCOL);
+    flowActivityTableDescp.addFamily(infoCF);
+    infoCF.setMinVersions(1);
+    infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+
+    // TODO: figure out the split policy before running in production
+    admin.createTable(flowActivityTableDescp);
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTableRW.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTableRW.java
new file mode 100644
index 0000000..61c0734
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTableRW.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Coprocessor;
+
+/**
+ * Create, read and write to the FlowRun table.
+ */
+public class FlowRunTableRW extends BaseTableRW<FlowRunTable> {
+  /** flow run table prefix. */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowrun";
+
+  /** config param name that specifies the flowrun table name. */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /** default value for flowrun table name. */
+  public static final String DEFAULT_TABLE_NAME = "timelineservice.flowrun";
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FlowRunTableRW.class);
+
+  /** default max number of versions. */
+  public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
+
+  public FlowRunTableRW() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW#
+   * createTable(org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor flowRunTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor infoCF =
+        new HColumnDescriptor(FlowRunColumnFamily.INFO.getBytes());
+    infoCF.setBloomFilterType(BloomType.ROWCOL);
+    flowRunTableDescp.addFamily(infoCF);
+    infoCF.setMinVersions(1);
+    infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+
+    // TODO: figure out the split policy
+    String coprocessorJarPathStr = hbaseConf.get(
+        YarnConfiguration.FLOW_RUN_COPROCESSOR_JAR_HDFS_LOCATION,
+        YarnConfiguration.DEFAULT_HDFS_LOCATION_FLOW_RUN_COPROCESSOR_JAR);
+
+    Path coprocessorJarPath = new Path(coprocessorJarPathStr);
+    LOG.info("CoprocessorJarPath=" + coprocessorJarPath.toString());
+    flowRunTableDescp.addCoprocessor(
+        "org.apache.hadoop.yarn.server.timelineservice.storage." +
+            "flow.FlowRunCoprocessor", coprocessorJarPath,
+        Coprocessor.PRIORITY_USER, null);
+    admin.createTable(flowRunTableDescp);
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
new file mode 100644
index 0000000..04963f3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage.flow
+ * contains classes related to implementation for flow related tables, viz. flow
+ * run table and flow activity table.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
new file mode 100644
index 0000000..e78db2a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage contains
+ * classes which define and implement reading and writing to backend storage.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
new file mode 100644
index 0000000..0956f1e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+
+/**
+ * Base class for readers that fetch timeline data from the HBase storage.
+ * It keeps the reader context and, when flow fields are missing from it,
+ * fills them in from the AppToFlow table.
+ */
+public abstract class AbstractTimelineStorageReader {
+
+  private final TimelineReaderContext context;
+  /** Used to look up the flow context. */
+  private final AppToFlowTableRW appToFlowTable = new AppToFlowTableRW();
+
+  public AbstractTimelineStorageReader(TimelineReaderContext ctxt) {
+    context = ctxt;
+  }
+
+  protected TimelineReaderContext getContext() {
+    return context;
+  }
+
+  /**
+   * Looks up flow context from AppToFlow table.
+   *
+   * @param appToFlowRowKey to identify Cluster and App Ids.
+   * @param clusterId the cluster id.
+   * @param hbaseConf HBase configuration.
+   * @param conn HBase Connection.
+   * @return flow context information.
+   * @throws IOException if any problem occurs while fetching flow information.
+   * @throws NotFoundException if the row or any of its flow fields is missing.
+   */
+  protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
+      String clusterId, Configuration hbaseConf, Connection conn)
+      throws IOException {
+    Get get = new Get(appToFlowRowKey.getRowKey());
+    Result result = appToFlowTable.getResult(hbaseConf, conn, get);
+    if (result == null || result.isEmpty()) {
+      throw flowContextNotFound(appToFlowRowKey, clusterId);
+    }
+    Object flowName = ColumnRWHelper.readResult(
+        result, AppToFlowColumnPrefix.FLOW_NAME, clusterId);
+    Object flowRunId = ColumnRWHelper.readResult(
+        result, AppToFlowColumnPrefix.FLOW_RUN_ID, clusterId);
+    Object userId = ColumnRWHelper.readResult(
+        result, AppToFlowColumnPrefix.USER_ID, clusterId);
+    if (flowName == null || userId == null || flowRunId == null) {
+      throw flowContextNotFound(appToFlowRowKey, clusterId);
+    }
+    return new FlowContext((String) userId, (String) flowName,
+        ((Number) flowRunId).longValue());
+  }
+
+  /** Builds the exception thrown when no complete flow context is found. */
+  private static NotFoundException flowContextNotFound(
+      AppToFlowRowKey appToFlowRowKey, String clusterId) {
+    return new NotFoundException(
+        "Unable to find the context flow name, and flow run id, "
+        + "and user id for clusterId=" + clusterId
+        + ", appId=" + appToFlowRowKey.getAppId());
+  }
+
+  /**
+   * Sets certain parameters to defaults if the values are not provided.
+   *
+   * @param hbaseConf HBase Configuration.
+   * @param conn HBase Connection.
+   * @throws IOException if any exception is encountered while setting params.
+   */
+  protected void augmentParams(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    defaultAugmentParams(hbaseConf, conn);
+  }
+
+  /**
+   * Default behavior for all timeline readers to augment parameters.
+   *
+   * @param hbaseConf HBase Configuration.
+   * @param conn HBase Connection.
+   * @throws IOException if any exception is encountered while setting params.
+   */
+  protected final void defaultAugmentParams(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    // In reality either all three are null or none of them is.
+    if (context.getFlowName() == null || context.getFlowRunId() == null
+        || context.getUserId() == null) {
+      // Get flow context information from AppToFlow table.
+      AppToFlowRowKey appToFlowRowKey =
+          new AppToFlowRowKey(context.getAppId());
+      FlowContext flowContext = lookupFlowContext(
+          appToFlowRowKey, context.getClusterId(), hbaseConf, conn);
+      context.setFlowName(flowContext.flowName);
+      context.setFlowRunId(flowContext.flowRunId);
+      context.setUserId(flowContext.userId);
+    }
+  }
+
+  /**
+   * Validates the required parameters to read the entities.
+   */
+  protected abstract void validateParams();
+
+  /**
+   * Encapsulates flow context information.
+   */
+  protected static class FlowContext {
+    private final String userId;
+    private final String flowName;
+    private final Long flowRunId;
+
+    public FlowContext(String user, String flowName, Long flowRunId) {
+      this.userId = user;
+      this.flowName = flowName;
+      this.flowRunId = flowRunId;
+    }
+
+    protected String getUserId() {
+      return userId;
+    }
+
+    protected String getFlowName() {
+      return flowName;
+    }
+
+    protected Long getFlowRunId() {
+      return flowRunId;
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message