hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sj...@apache.org
Subject [25/28] hadoop git commit: YARN-5928. Move ATSv2 HBase backend code into a new module that is only dependent at runtime by yarn servers. Contributed by Haibo Chen.
Date Fri, 20 Jan 2017 05:07:00 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java
new file mode 100644
index 0000000..8e6c259
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * contains the constants used in the context of schema accesses for
+ * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+ * information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class TimelineHBaseSchemaConstants {
+  private TimelineHBaseSchemaConstants() {
+  }
+
+  /**
+   * Used to create a pre-split for tables starting with a username in the
+   * prefix. TODO: this may have to become a config variable (string with
+   * separators) so that different installations can presplit based on their own
+   * commonly occurring names.
+   */
+  private final static byte[][] USERNAME_SPLITS = {
+      Bytes.toBytes("a"), Bytes.toBytes("ad"), Bytes.toBytes("an"),
+      Bytes.toBytes("b"), Bytes.toBytes("ca"), Bytes.toBytes("cl"),
+      Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"),
+      Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i"),
+      Bytes.toBytes("j"), Bytes.toBytes("k"), Bytes.toBytes("l"),
+      Bytes.toBytes("m"), Bytes.toBytes("n"), Bytes.toBytes("o"),
+      Bytes.toBytes("q"), Bytes.toBytes("r"), Bytes.toBytes("s"),
+      Bytes.toBytes("se"), Bytes.toBytes("t"), Bytes.toBytes("u"),
+      Bytes.toBytes("v"), Bytes.toBytes("w"), Bytes.toBytes("x"),
+      Bytes.toBytes("y"), Bytes.toBytes("z")
+  };
+
+  /**
+   * The length at which keys auto-split.
+   */
+  public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4";
+
+  /**
+   * @return splits for splits where a user is a prefix.
+   */
+  public static byte[][] getUsernameSplits() {
+    byte[][] kloon = USERNAME_SPLITS.clone();
+    // Deep copy.
+    for (int row = 0; row < USERNAME_SPLITS.length; row++) {
+      kloon[row] = Bytes.copy(USERNAME_SPLITS[row]);
+    }
+    return kloon;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
new file mode 100644
index 0000000..d03b37d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * Utility class that allows HBase coprocessors to interact with unique
+ * timestamps.
+ */
+public class TimestampGenerator {
+
+  /*
+   * if this is changed, then reading cell timestamps written with older
+   * multiplier value will not work
+   */
+  public static final long TS_MULTIPLIER = 1000000L;
+
+  private final AtomicLong lastTimestamp = new AtomicLong();
+
+  /**
+   * Returns the current wall clock time in milliseconds, multiplied by the
+   * required precision.
+   *
+   * @return current timestamp.
+   */
+  public long currentTime() {
+    // We want to align cell timestamps with current time.
+    // cell timestamps are not be less than
+    // System.currentTimeMillis() * TS_MULTIPLIER.
+    return System.currentTimeMillis() * TS_MULTIPLIER;
+  }
+
+  /**
+   * Returns a timestamp value unique within the scope of this
+   * {@code TimestampGenerator} instance. For usage by HBase
+   * {@code RegionObserver} coprocessors, this normally means unique within a
+   * given region.
+   *
+   * Unlikely scenario of generating a non-unique timestamp: if there is a
+   * sustained rate of more than 1M hbase writes per second AND if region fails
+   * over within that time range of timestamps being generated then there may be
+   * collisions writing to a cell version of the same column.
+   *
+   * @return unique timestamp.
+   */
+  public long getUniqueTimestamp() {
+    long lastTs;
+    long nextTs;
+    do {
+      lastTs = lastTimestamp.get();
+      nextTs = Math.max(lastTs + 1, currentTime());
+    } while (!lastTimestamp.compareAndSet(lastTs, nextTs));
+    return nextTs;
+  }
+
+  /**
+   * Returns a timestamp multiplied with TS_MULTIPLIER and last few digits of
+   * application id.
+   *
+   * Unlikely scenario of generating a timestamp that is a duplicate: If more
+   * than a 1M concurrent apps are running in one flow run AND write to same
+   * column at the same time, then say appId of 1M and 1 will overlap
+   * with appId of 001 and there may be collisions for that flow run's
+   * specific column.
+   *
+   * @param incomingTS Timestamp to be converted.
+   * @param appId Application Id.
+   * @return a timestamp multiplied with TS_MULTIPLIER and last few digits of
+   *         application id
+   */
+  public static long getSupplementedTimestamp(long incomingTS, String appId) {
+    long suffix = getAppIdSuffix(appId);
+    long outgoingTS = incomingTS * TS_MULTIPLIER + suffix;
+    return outgoingTS;
+
+  }
+
+  private static long getAppIdSuffix(String appIdStr) {
+    if (appIdStr == null) {
+      return 0L;
+    }
+    ApplicationId appId = ApplicationId.fromString(appIdStr);
+    long id = appId.getId() % TS_MULTIPLIER;
+    return id;
+  }
+
+  /**
+   * truncates the last few digits of the timestamp which were supplemented by
+   * the TimestampGenerator#getSupplementedTimestamp function.
+   *
+   * @param incomingTS Timestamp to be truncated.
+   * @return a truncated timestamp value
+   */
+  public static long getTruncatedTimestamp(long incomingTS) {
+    return incomingTS / TS_MULTIPLIER;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java
new file mode 100644
index 0000000..64a11f8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.hbase.client.BufferedMutator;
+
+/**
+ * Just a typed wrapper around {@link BufferedMutator} used to ensure that
+ * columns can write only to the table mutator for the right table.
+ */
+public interface TypedBufferedMutator<T> extends BufferedMutator {
+  // This class is intentionally left (almost) blank
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ValueConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ValueConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ValueConverter.java
new file mode 100644
index 0000000..757a6d3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ValueConverter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+
+/**
+ * Converter used to encode/decode value associated with a column prefix or a
+ * column.
+ */
+public interface ValueConverter {
+
+  /**
+   * Encode an object as a byte array depending on the converter implementation.
+   *
+   * @param value Value to be encoded.
+   * @return a byte array
+   * @throws IOException if any problem is encountered while encoding.
+   */
+  byte[] encodeValue(Object value) throws IOException;
+
+  /**
+   * Decode a byte array and convert it into an object depending on the
+   * converter implementation.
+   *
+   * @param bytes Byte array to be decoded.
+   * @return an object
+   * @throws IOException if any problem is encountered while decoding.
+   */
+  Object decodeValue(byte[] bytes) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
new file mode 100644
index 0000000..0df5b8a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage.common contains
+ * a set of utility classes used across backend storage reader and writer.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
new file mode 100644
index 0000000..93b4b36
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+
+/**
+ * Identifies fully qualified columns for the {@link EntityTable}.
+ */
+public enum EntityColumn implements Column<EntityTable> {
+
+  /**
+   * Identifier for the entity.
+   */
+  ID(EntityColumnFamily.INFO, "id"),
+
+  /**
+   * The type of entity.
+   */
+  TYPE(EntityColumnFamily.INFO, "type"),
+
+  /**
+   * When the entity was created.
+   */
+  CREATED_TIME(EntityColumnFamily.INFO, "created_time", new LongConverter()),
+
+  /**
+   * The version of the flow that this entity belongs to.
+   */
+  FLOW_VERSION(EntityColumnFamily.INFO, "flow_version");
+
+  private final ColumnHelper<EntityTable> column;
+  private final ColumnFamily<EntityTable> columnFamily;
+  private final String columnQualifier;
+  private final byte[] columnQualifierBytes;
+
+  EntityColumn(ColumnFamily<EntityTable> columnFamily,
+      String columnQualifier) {
+    this(columnFamily, columnQualifier, GenericConverter.getInstance());
+  }
+
+  EntityColumn(ColumnFamily<EntityTable> columnFamily,
+      String columnQualifier, ValueConverter converter) {
+    this.columnFamily = columnFamily;
+    this.columnQualifier = columnQualifier;
+    // Future-proof by ensuring the right column prefix hygiene.
+    this.columnQualifierBytes =
+        Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
+    this.column = new ColumnHelper<EntityTable>(columnFamily, converter);
+  }
+
+  /**
+   * @return the column name value
+   */
+  private String getColumnQualifier() {
+    return columnQualifier;
+  }
+
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<EntityTable> tableMutator, Long timestamp,
+      Object inputValue, Attribute... attributes) throws IOException {
+    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
+        inputValue, attributes);
+  }
+
+  public Object readResult(Result result) throws IOException {
+    return column.readResult(result, columnQualifierBytes);
+  }
+
+  /**
+   * Retrieve an {@link EntityColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnQualifier Name of the column to retrieve
+   * @return the corresponding {@link EntityColumn} or null
+   */
+  public static final EntityColumn columnFor(String columnQualifier) {
+
+    // Match column based on value, assume column family matches.
+    for (EntityColumn ec : EntityColumn.values()) {
+      // Find a match based only on name.
+      if (ec.getColumnQualifier().equals(columnQualifier)) {
+        return ec;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  @Override
+  public byte[] getColumnQualifierBytes() {
+    return columnQualifierBytes.clone();
+  }
+
+  @Override
+  public byte[] getColumnFamilyBytes() {
+    return columnFamily.getBytes();
+  }
+
+  @Override
+  public ValueConverter getValueConverter() {
+    return column.getValueConverter();
+  }
+
+  /**
+   * Retrieve an {@link EntityColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+   * if and only if {@code a.equals(b) & x.equals(y)} or
+   * {@code (x == y == null)}
+   *
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param name Name of the column to retrieve
+   * @return the corresponding {@link EntityColumn} or null if both arguments
+   *         don't match.
+   */
+  public static final EntityColumn columnFor(EntityColumnFamily columnFamily,
+      String name) {
+
+    for (EntityColumn ec : EntityColumn.values()) {
+      // Find a match based column family and on name.
+      if (ec.columnFamily.equals(columnFamily)
+          && ec.getColumnQualifier().equals(name)) {
+        return ec;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
new file mode 100644
index 0000000..7c63727
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the entity table column families.
+ */
+public enum EntityColumnFamily implements ColumnFamily<EntityTable> {
+
+  /**
+   * Info column family houses known columns, specifically ones included in
+   * columnfamily filters.
+   */
+  INFO("i"),
+
+  /**
+   * Configurations are in a separate column family for two reasons: a) the size
+   * of the config values can be very large and b) we expect that config values
+   * are often separately accessed from other metrics and info columns.
+   */
+  CONFIGS("c"),
+
+  /**
+   * Metrics have a separate column family, because they have a separate TTL.
+   */
+  METRICS("m");
+
+  /**
+   * Byte representation of this column family.
+   */
+  private final byte[] bytes;
+
+  /**
+   * @param value create a column family with this name. Must be lower case and
+   *          without spaces.
+   */
+  EntityColumnFamily(String value) {
+    // column families should be lower case and not contain any spaces.
+    this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+  }
+
+  public byte[] getBytes() {
+    return Bytes.copy(bytes);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
new file mode 100644
index 0000000..e410549
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+
+/**
+ * Identifies partially qualified columns for the entity table.
+ */
+public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
+
+  /**
+   * To store TimelineEntity getIsRelatedToEntities values.
+   */
+  IS_RELATED_TO(EntityColumnFamily.INFO, "s"),
+
+  /**
+   * To store TimelineEntity getRelatesToEntities values.
+   */
+  RELATES_TO(EntityColumnFamily.INFO, "r"),
+
+  /**
+   * To store TimelineEntity info values.
+   */
+  INFO(EntityColumnFamily.INFO, "i"),
+
+  /**
+   * Lifecycle events for an entity.
+   */
+  EVENT(EntityColumnFamily.INFO, "e", true),
+
+  /**
+   * Config column stores configuration with config key as the column name.
+   */
+  CONFIG(EntityColumnFamily.CONFIGS, null),
+
+  /**
+   * Metrics are stored with the metric name as the column name.
+   */
+  METRIC(EntityColumnFamily.METRICS, null, new LongConverter());
+
+  private final ColumnHelper<EntityTable> column;
+  private final ColumnFamily<EntityTable> columnFamily;
+
+  /**
+   * Can be null for those cases where the provided column qualifier is the
+   * entire column name.
+   */
+  private final String columnPrefix;
+  private final byte[] columnPrefixBytes;
+
+  /**
+   * Private constructor, meant to be used by the enum definition.
+   *
+   * @param columnFamily that this column is stored in.
+   * @param columnPrefix for this column.
+   */
+  EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
+      String columnPrefix) {
+    this(columnFamily, columnPrefix, false, GenericConverter.getInstance());
+  }
+
+  EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
+      String columnPrefix, boolean compondColQual) {
+    this(columnFamily, columnPrefix, compondColQual,
+        GenericConverter.getInstance());
+  }
+
+  EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
+      String columnPrefix, ValueConverter converter) {
+    this(columnFamily, columnPrefix, false, converter);
+  }
+
+  /**
+   * Private constructor, meant to be used by the enum definition.
+   *
+   * @param columnFamily that this column is stored in.
+   * @param columnPrefix for this column.
+   * @param converter used to encode/decode values to be stored in HBase for
+   * this column prefix.
+   */
+  EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
+      String columnPrefix, boolean compondColQual, ValueConverter converter) {
+    column = new ColumnHelper<EntityTable>(columnFamily, converter);
+    this.columnFamily = columnFamily;
+    this.columnPrefix = columnPrefix;
+    if (columnPrefix == null) {
+      this.columnPrefixBytes = null;
+    } else {
+      // Future-proof by ensuring the right column prefix hygiene.
+      this.columnPrefixBytes =
+          Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
+    }
+  }
+
+  /**
+   * @return the column name value
+   */
+  public String getColumnPrefix() {
+    return columnPrefix;
+  }
+
+  @Override
+  public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
+    return ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifierPrefix);
+  }
+
+  @Override
+  public byte[] getColumnPrefixBytes(String qualifierPrefix) {
+    return ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifierPrefix);
+  }
+
+  @Override
+  public byte[] getColumnFamilyBytes() {
+    return columnFamily.getBytes();
+  }
+
+  @Override
+  public ValueConverter getValueConverter() {
+    return column.getValueConverter();
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<EntityTable> tableMutator, String qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = getColumnPrefixBytes(qualifier);
+
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        attributes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<EntityTable> tableMutator, byte[] qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = getColumnPrefixBytes(qualifier);
+
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        attributes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
+   */
+  public Object readResult(Result result, String qualifier) throws IOException {
+    byte[] columnQualifier =
+        ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+    return column.readResult(result, columnQualifier);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResults(org.apache.hadoop.hbase.client.Result,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
+   */
+  public <K> Map<K, Object> readResults(Result result,
+      KeyConverter<K> keyConverter) throws IOException {
+    return column.readResults(result, columnPrefixBytes, keyConverter);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
+   */
+  public <K, V> NavigableMap<K, NavigableMap<Long, V>>
+      readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
+      throws IOException {
+    return column.readResultsWithTimestamps(result, columnPrefixBytes,
+        keyConverter);
+  }
+
+  /**
+   * Retrieve an {@link EntityColumnPrefix} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnPrefix Name of the column to retrieve
+   * @return the corresponding {@link EntityColumnPrefix} or null
+   */
+  public static final EntityColumnPrefix columnFor(String columnPrefix) {
+
+    // Match column based on value, assume column family matches.
+    for (EntityColumnPrefix ecp : EntityColumnPrefix.values()) {
+      // Find a match based only on name.
+      if (ecp.getColumnPrefix().equals(columnPrefix)) {
+        return ecp;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve an {@link EntityColumnPrefix} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+   * if and only if {@code (x == y == null)} or
+   * {@code a.equals(b) & x.equals(y)}
+   *
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param columnPrefix Name of the column to retrieve
+   * @return the corresponding {@link EntityColumnPrefix} or null if both
+   *         arguments don't match.
+   */
+  public static final EntityColumnPrefix columnFor(
+      EntityColumnFamily columnFamily, String columnPrefix) {
+
+    // TODO: needs unit test to confirm and need to update javadoc to explain
+    // null prefix case.
+
+    for (EntityColumnPrefix ecp : EntityColumnPrefix.values()) {
+      // Find a match based column family and on name.
+      if (ecp.columnFamily.equals(columnFamily)
+          && (((columnPrefix == null) && (ecp.getColumnPrefix() == null)) ||
+          (ecp.getColumnPrefix().equals(columnPrefix)))) {
+        return ecp;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
new file mode 100644
index 0000000..ff22178
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents a rowkey for the entity table.
+ */
+public class EntityRowKey {
+  private final String clusterId;
+  private final String userId;
+  private final String flowName;
+  private final Long flowRunId;
+  private final String appId;
+  private final String entityType;
+  private final String entityId;
+  private final KeyConverter<EntityRowKey> entityRowKeyConverter =
+      new EntityRowKeyConverter();
+
+  public EntityRowKey(String clusterId, String userId, String flowName,
+      Long flowRunId, String appId, String entityType, String entityId) {
+    this.clusterId = clusterId;
+    this.userId = userId;
+    this.flowName = flowName;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+    this.entityType = entityType;
+    this.entityId = entityId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getFlowName() {
+    return flowName;
+  }
+
+  public Long getFlowRunId() {
+    return flowRunId;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public String getEntityType() {
+    return entityType;
+  }
+
+  public String getEntityId() {
+    return entityId;
+  }
+
+  /**
+   * Constructs a row key for the entity table as follows:
+   * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!entityId}.
+   * Typically used while querying a specific entity.
+   *
+   * @return byte array with the row key.
+   */
+  public byte[] getRowKey() {
+    return entityRowKeyConverter.encode(this);
+  }
+
+  /**
+   * Given the raw row key as bytes, returns the row key as an object.
+   *
+   * @param rowKey byte representation of row key.
+   * @return An <cite>EntityRowKey</cite> object.
+   */
+  public static EntityRowKey parseRowKey(byte[] rowKey) {
+    return new EntityRowKeyConverter().decode(rowKey);
+  }
+
+  /**
+   * Encodes and decodes row key for entity table. The row key is of the form :
+   * userName!clusterId!flowName!flowRunId!appId!entityType!entityId. flowRunId
+   * is a long, appId is encoded/decoded using {@link AppIdKeyConverter} and
+   * rest are strings.
+   * <p>
+   */
+  final private static class EntityRowKeyConverter implements
+      KeyConverter<EntityRowKey> {
+
+    private final AppIdKeyConverter appIDKeyConverter = new AppIdKeyConverter();
+
+    private EntityRowKeyConverter() {
+    }
+
+    /**
+     * Entity row key is of the form
+     * userName!clusterId!flowName!flowRunId!appId!entityType!entityId w. each
+     * segment separated by !. The sizes below indicate sizes of each one of
+     * these segments in sequence. clusterId, userName, flowName, entityType and
+     * entityId are strings. flowrunId is a long hence 8 bytes in size. app id
+     * is represented as 12 bytes with cluster timestamp part of appid being 8
+     * bytes (long) and seq id being 4 bytes(int). Strings are variable in size
+     * (i.e. end whenever separator is encountered). This is used while decoding
+     * and helps in determining where to split.
+     */
+    private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
+        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+        AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
+        Separator.VARIABLE_SIZE };
+
+    /*
+     * (non-Javadoc)
+     *
+     * Encodes EntityRowKey object into a byte array with each component/field
+     * in EntityRowKey separated by Separator#QUALIFIERS. This leads to an
+     * entity table row key of the form
+     * userName!clusterId!flowName!flowRunId!appId!entityType!entityId If
+     * entityType in passed EntityRowKey object is null (and the fields
+     * preceding it i.e. clusterId, userId and flowName, flowRunId and appId
+     * are not null), this returns a row key prefix of the form
+     * userName!clusterId!flowName!flowRunId!appId! and if entityId in
+     * EntityRowKey is null (other 6 components are not null), this returns a
+     * row key prefix of the form
+     * userName!clusterId!flowName!flowRunId!appId!entityType! flowRunId is
+     * inverted while encoding as it helps maintain a descending order for row
+     * keys in entity table.
+     *
+     * @see org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#encode(java.lang.Object)
+     */
+    @Override
+    public byte[] encode(EntityRowKey rowKey) {
+      byte[] user =
+          Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
+              Separator.QUALIFIERS);
+      byte[] cluster =
+          Separator.encode(rowKey.getClusterId(), Separator.SPACE,
+              Separator.TAB, Separator.QUALIFIERS);
+      byte[] flow =
+          Separator.encode(rowKey.getFlowName(), Separator.SPACE,
+              Separator.TAB, Separator.QUALIFIERS);
+      byte[] first = Separator.QUALIFIERS.join(user, cluster, flow);
+      // Note that flowRunId is a long, so we can't encode them all at the same
+      // time.
+      byte[] second =
+          Bytes.toBytes(LongConverter.invertLong(rowKey.getFlowRunId()));
+      byte[] third = appIDKeyConverter.encode(rowKey.getAppId());
+      if (rowKey.getEntityType() == null) {
+        return Separator.QUALIFIERS.join(first, second, third,
+            Separator.EMPTY_BYTES);
+      }
+      byte[] entityType =
+          Separator.encode(rowKey.getEntityType(), Separator.SPACE,
+              Separator.TAB, Separator.QUALIFIERS);
+      byte[] entityId =
+          rowKey.getEntityId() == null ? Separator.EMPTY_BYTES : Separator
+              .encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB,
+                  Separator.QUALIFIERS);
+      byte[] fourth = Separator.QUALIFIERS.join(entityType, entityId);
+      return Separator.QUALIFIERS.join(first, second, third, fourth);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Decodes an application row key of the form
+     * userName!clusterId!flowName!flowRunId!appId!entityType!entityId
+     * represented in byte format and converts it into an EntityRowKey object.
+     * flowRunId is inverted while decoding as it was inverted while encoding.
+     *
+     * @see
+     * org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#decode(byte[])
+     */
+    @Override
+    public EntityRowKey decode(byte[] rowKey) {
+      byte[][] rowKeyComponents =
+          Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+      if (rowKeyComponents.length != 7) {
+        throw new IllegalArgumentException("the row key is not valid for "
+            + "an entity");
+      }
+      String userId =
+          Separator.decode(Bytes.toString(rowKeyComponents[0]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String clusterId =
+          Separator.decode(Bytes.toString(rowKeyComponents[1]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String flowName =
+          Separator.decode(Bytes.toString(rowKeyComponents[2]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      Long flowRunId =
+          LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3]));
+      String appId = appIDKeyConverter.decode(rowKeyComponents[4]);
+      String entityType =
+          Separator.decode(Bytes.toString(rowKeyComponents[5]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String entityId =
+          Separator.decode(Bytes.toString(rowKeyComponents[6]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
+          entityType, entityId);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java
new file mode 100644
index 0000000..9146180
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+
+/**
+ * Represents a partial rowkey without the entityId or without entityType and
+ * entityId for the entity table.
+ *
+ */
+public class EntityRowKeyPrefix extends EntityRowKey implements
+    RowKeyPrefix<EntityRowKey> {
+
+  /**
+   * Creates a prefix which generates the following rowKeyPrefixes for the
+   * entity table:
+   * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!}.
+   *
+   * @param clusterId identifying the cluster
+   * @param userId identifying the user
+   * @param flowName identifying the flow
+   * @param flowRunId identifying the individual run of this flow
+   * @param appId identifying the application
+   * @param entityType which entity type
+   */
+  public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
+      Long flowRunId, String appId, String entityType) {
+    super(clusterId, userId, flowName, flowRunId, appId, entityType, null);
+  }
+
+  /**
+   * Creates a prefix which generates the following rowKeyPrefixes for the
+   * entity table:
+   * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!entityId}.
+   *
+   * @param clusterId identifying the cluster
+   * @param userId identifying the user
+   * @param flowName identifying the flow
+   * @param flowRunId identifying the individual run of this flow
+   * @param appId identifying the application
+   */
+  public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
+      Long flowRunId, String appId) {
+    super(clusterId, userId, flowName, flowRunId, appId, null, null);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.application.
+   * RowKeyPrefix#getRowKeyPrefix()
+   */
+  public byte[] getRowKeyPrefix() {
+    return super.getRowKey();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
new file mode 100644
index 0000000..b194f07
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
+
+/**
+ * The entity table as column families info, config and metrics. Info stores
+ * information about a timeline entity object config stores configuration data
+ * of a timeline entity object metrics stores the metrics of a timeline entity
+ * object
+ *
+ * Example entity table record:
+ *
+ * <pre>
+ * |-------------------------------------------------------------------------|
+ * |  Row       | Column Family                | Column Family| Column Family|
+ * |  key       | info                         | metrics      | config       |
+ * |-------------------------------------------------------------------------|
+ * | userName!  | id:entityId                  | metricId1:   | configKey1:  |
+ * | clusterId! |                              | metricValue1 | configValue1 |
+ * | flowName!  | type:entityType              | @timestamp1  |              |
+ * | flowRunId! |                              |              | configKey2:  |
+ * | AppId!     | created_time:                | metricId1:   | configValue2 |
+ * | entityType!| 1392993084018                | metricValue2 |              |
+ * | entityId   |                              | @timestamp2  |              |
+ * |            | i!infoKey:                   |              |              |
+ * |            | infoValue                    | metricId1:   |              |
+ * |            |                              | metricValue1 |              |
+ * |            | r!relatesToKey:              | @timestamp2  |              |
+ * |            | id3=id4=id5                  |              |              |
+ * |            |                              |              |              |
+ * |            | s!isRelatedToKey             |              |              |
+ * |            | id7=id9=id6                  |              |              |
+ * |            |                              |              |              |
+ * |            | e!eventId=timestamp=infoKey: |              |              |
+ * |            | eventInfoValue               |              |              |
+ * |            |                              |              |              |
+ * |            | flowVersion:                 |              |              |
+ * |            | versionValue                 |              |              |
+ * |-------------------------------------------------------------------------|
+ * </pre>
+ */
+public class EntityTable extends BaseTable<EntityTable> {
+  /** entity prefix. */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "entity";
+
+  /** config param name that specifies the entity table name. */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /**
+   * config param name that specifies the TTL for metrics column family in
+   * entity table.
+   */
+  private static final String METRICS_TTL_CONF_NAME = PREFIX
+      + ".table.metrics.ttl";
+
+  /** default value for entity table name. */
+  public static final String DEFAULT_TABLE_NAME = "timelineservice.entity";
+
+  /** default TTL is 30 days for metrics timeseries. */
+  private static final int DEFAULT_METRICS_TTL = 2592000;
+
+  /** default max number of versions. */
+  private static final int DEFAULT_METRICS_MAX_VERSIONS = 1000;
+
+  private static final Log LOG = LogFactory.getLog(EntityTable.class);
+
+  public EntityTable() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
+   * (org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor entityTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor infoCF =
+        new HColumnDescriptor(EntityColumnFamily.INFO.getBytes());
+    infoCF.setBloomFilterType(BloomType.ROWCOL);
+    entityTableDescp.addFamily(infoCF);
+
+    HColumnDescriptor configCF =
+        new HColumnDescriptor(EntityColumnFamily.CONFIGS.getBytes());
+    configCF.setBloomFilterType(BloomType.ROWCOL);
+    configCF.setBlockCacheEnabled(true);
+    entityTableDescp.addFamily(configCF);
+
+    HColumnDescriptor metricsCF =
+        new HColumnDescriptor(EntityColumnFamily.METRICS.getBytes());
+    entityTableDescp.addFamily(metricsCF);
+    metricsCF.setBlockCacheEnabled(true);
+    // always keep 1 version (the latest)
+    metricsCF.setMinVersions(1);
+    metricsCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+    metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME,
+        DEFAULT_METRICS_TTL));
+    entityTableDescp.setRegionSplitPolicyClassName(
+        "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
+    entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
+        TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+    admin.createTable(entityTableDescp,
+        TimelineHBaseSchemaConstants.getUsernameSplits());
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+
+  /**
+   * @param metricsTTL time to live parameter for the metricss in this table.
+   * @param hbaseConf configururation in which to set the metrics TTL config
+   *          variable.
+   */
+  public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) {
+    hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java
new file mode 100644
index 0000000..bb0e331
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage.entity
+ * contains classes related to implementation for entity table.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java
new file mode 100644
index 0000000..4e2cf2d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Identifies the compaction dimensions for the data in the {@link FlowRunTable}
+ * .
+ */
+public enum AggregationCompactionDimension {
+
+  /**
+   * the application id.
+   */
+  APPLICATION_ID((byte) 101);
+
+  private byte tagType;
+  private byte[] inBytes;
+
+  private AggregationCompactionDimension(byte tagType) {
+    this.tagType = tagType;
+    this.inBytes = Bytes.toBytes(this.name());
+  }
+
+  public Attribute getAttribute(String attributeValue) {
+    return new Attribute(this.name(), Bytes.toBytes(attributeValue));
+  }
+
+  public byte getTagType() {
+    return tagType;
+  }
+
+  public byte[] getInBytes() {
+    return this.inBytes.clone();
+  }
+
+  public static AggregationCompactionDimension
+      getAggregationCompactionDimension(String aggCompactDimStr) {
+    for (AggregationCompactionDimension aggDim : AggregationCompactionDimension
+        .values()) {
+      if (aggDim.name().equals(aggCompactDimStr)) {
+        return aggDim;
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
new file mode 100644
index 0000000..40cdd2c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Identifies the attributes to be set for puts into the {@link FlowRunTable}.
+ * The numbers used for tagType are prime numbers.
+ */
+public enum AggregationOperation {
+
+  /**
+   * When the flow was started.
+   */
+  GLOBAL_MIN((byte) 71),
+
+  /**
+   * When it ended.
+   */
+  GLOBAL_MAX((byte) 73),
+
+  /**
+   * The metrics of the flow.
+   */
+  SUM((byte) 79),
+
+  /**
+   * application running.
+   */
+  SUM_FINAL((byte) 83),
+
+  /**
+   * Min value as per the latest timestamp
+   * seen for a given app.
+   */
+  LATEST_MIN((byte) 89),
+
+  /**
+   * Max value as per the latest timestamp
+   * seen for a given app.
+   */
+  LATEST_MAX((byte) 97);
+
+  private byte tagType;
+  private byte[] inBytes;
+
+  private AggregationOperation(byte tagType) {
+    this.tagType = tagType;
+    this.inBytes = Bytes.toBytes(this.name());
+  }
+
+  public Attribute getAttribute() {
+    return new Attribute(this.name(), this.inBytes);
+  }
+
+  public byte getTagType() {
+    return tagType;
+  }
+
+  public byte[] getInBytes() {
+    return this.inBytes.clone();
+  }
+
+  /**
+   * returns the AggregationOperation enum that represents that string.
+   * @param aggOpStr Aggregation operation.
+   * @return the AggregationOperation enum that represents that string
+   */
+  public static AggregationOperation getAggregationOperation(String aggOpStr) {
+    for (AggregationOperation aggOp : AggregationOperation.values()) {
+      if (aggOp.name().equals(aggOpStr)) {
+        return aggOp;
+      }
+    }
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
new file mode 100644
index 0000000..d3de518
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+/**
+ * Defines the attribute tuple to be set for puts into the {@link FlowRunTable}.
+ */
+public class Attribute {
+  private final String name;
+  private final byte[] value;
+
+  public Attribute(String name, byte[] value) {
+    this.name = name;
+    this.value = value.clone();
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public byte[] getValue() {
+    return value.clone();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
new file mode 100644
index 0000000..f9eb5b4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the flow run table column families.
+ */
+public enum FlowActivityColumnFamily
+    implements ColumnFamily<FlowActivityTable> {
+
+  /**
+   * Info column family houses known columns, specifically ones included in
+   * columnfamily filters.
+   */
+  INFO("i");
+
+  /**
+   * Byte representation of this column family.
+   */
+  private final byte[] bytes;
+
+  /**
+   * @param value
+   *          create a column family with this name. Must be lower case and
+   *          without spaces.
+   */
+  private FlowActivityColumnFamily(String value) {
+    // column families should be lower case and not contain any spaces.
+    this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+  }
+
+  public byte[] getBytes() {
+    return Bytes.copy(bytes);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
new file mode 100644
index 0000000..439e0c8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+
+/**
+ * Identifies partially qualified columns for the {@link FlowActivityTable}.
+ */
+public enum FlowActivityColumnPrefix
+    implements ColumnPrefix<FlowActivityTable> {
+
+  /**
+   * To store run ids of the flows.
+   */
+  RUN_ID(FlowActivityColumnFamily.INFO, "r", null);
+
+  private final ColumnHelper<FlowActivityTable> column;
+  private final ColumnFamily<FlowActivityTable> columnFamily;
+
+  /**
+   * Can be null for those cases where the provided column qualifier is the
+   * entire column name.
+   */
+  private final String columnPrefix;
+  private final byte[] columnPrefixBytes;
+
+  private final AggregationOperation aggOp;
+
+  /**
+   * Private constructor, meant to be used by the enum definition.
+   *
+   * @param columnFamily
+   *          that this column is stored in.
+   * @param columnPrefix
+   *          for this column.
+   */
+  private FlowActivityColumnPrefix(
+      ColumnFamily<FlowActivityTable> columnFamily, String columnPrefix,
+      AggregationOperation aggOp) {
+    this(columnFamily, columnPrefix, aggOp, false);
+  }
+
+  private FlowActivityColumnPrefix(
+      ColumnFamily<FlowActivityTable> columnFamily, String columnPrefix,
+      AggregationOperation aggOp, boolean compoundColQual) {
+    column = new ColumnHelper<FlowActivityTable>(columnFamily);
+    this.columnFamily = columnFamily;
+    this.columnPrefix = columnPrefix;
+    if (columnPrefix == null) {
+      this.columnPrefixBytes = null;
+    } else {
+      // Future-proof by ensuring the right column prefix hygiene.
+      this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE
+          .encode(columnPrefix));
+    }
+    this.aggOp = aggOp;
+  }
+
+  /**
+   * @return the column name value
+   */
+  public String getColumnPrefix() {
+    return columnPrefix;
+  }
+
+  @Override
+  public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
+    return ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifierPrefix);
+  }
+
+  @Override
+  public byte[] getColumnPrefixBytes(String qualifierPrefix) {
+    return ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifierPrefix);
+  }
+
+  public byte[] getColumnPrefixBytes() {
+    return columnPrefixBytes.clone();
+  }
+
+  @Override
+  public byte[] getColumnFamilyBytes() {
+    return columnFamily.getBytes();
+  }
+
+  @Override
+  public ValueConverter getValueConverter() {
+    return column.getValueConverter();
+  }
+
+  public AggregationOperation getAttribute() {
+    return aggOp;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, byte[], java.lang.Long, java.lang.Object,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
+   */
+  @Override
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowActivityTable> tableMutator, byte[] qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = getColumnPrefixBytes(qualifier);
+    Attribute[] combinedAttributes =
+        HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        combinedAttributes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
+   */
+  public Object readResult(Result result, String qualifier) throws IOException {
+    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifier);
+    return column.readResult(result, columnQualifier);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResults(org.apache.hadoop.hbase.client.Result,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
+   */
+  public <K> Map<K, Object> readResults(Result result,
+      KeyConverter<K> keyConverter) throws IOException {
+    return column.readResults(result, columnPrefixBytes, keyConverter);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
+   */
+  public <K, V> NavigableMap<K, NavigableMap<Long, V>>
+      readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
+      throws IOException {
+    return column.readResultsWithTimestamps(result, columnPrefixBytes,
+        keyConverter);
+  }
+
+  /**
+   * Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there
+   * is no match. The following holds true: {@code columnFor(x) == columnFor(y)}
+   * if and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnPrefix
+   *          Name of the column to retrieve
+   * @return the corresponding {@link FlowActivityColumnPrefix} or null
+   */
+  public static final FlowActivityColumnPrefix columnFor(String columnPrefix) {
+
+    // Match column based on value, assume column family matches.
+    for (FlowActivityColumnPrefix flowActivityColPrefix :
+        FlowActivityColumnPrefix.values()) {
+      // Find a match based only on name.
+      if (flowActivityColPrefix.getColumnPrefix().equals(columnPrefix)) {
+        return flowActivityColPrefix;
+      }
+    }
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there
+   * is no match. The following holds true:
+   * {@code columnFor(a,x) == columnFor(b,y)} if and only if
+   * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}
+   *
+   * @param columnFamily
+   *          The columnFamily for which to retrieve the column.
+   * @param columnPrefix
+   *          Name of the column to retrieve
+   * @return the corresponding {@link FlowActivityColumnPrefix} or null if both
+   *         arguments don't match.
+   */
+  public static final FlowActivityColumnPrefix columnFor(
+      FlowActivityColumnFamily columnFamily, String columnPrefix) {
+
+    // TODO: needs unit test to confirm and need to update javadoc to explain
+    // null prefix case.
+
+    for (FlowActivityColumnPrefix flowActivityColumnPrefix :
+        FlowActivityColumnPrefix.values()) {
+      // Find a match based column family and on name.
+      if (flowActivityColumnPrefix.columnFamily.equals(columnFamily)
+          && (((columnPrefix == null) && (flowActivityColumnPrefix
+              .getColumnPrefix() == null)) || (flowActivityColumnPrefix
+              .getColumnPrefix().equals(columnPrefix)))) {
+        return flowActivityColumnPrefix;
+      }
+    }
+    // Default to null
+    return null;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
+   */
+  @Override
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowActivityTable> tableMutator, String qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = getColumnPrefixBytes(qualifier);
+    Attribute[] combinedAttributes =
+        HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
+    column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
+        combinedAttributes);
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message