hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sj...@apache.org
Subject [22/28] hadoop git commit: YARN-5928. Move ATSv2 HBase backend code into a new module that is only dependent at runtime by yarn servers. Contributed by Haibo Chen.
Date Fri, 20 Jan 2017 05:06:57 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
new file mode 100644
index 0000000..4e1ab8a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
@@ -0,0 +1,648 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Timeline entity reader for generic entities that are stored in the entity
+ * table.
+ */
+class GenericEntityReader extends TimelineEntityReader {
+  private static final EntityTable ENTITY_TABLE = new EntityTable();
+
+  /**
+   * Used to look up the flow context.
+   */
+  private final AppToFlowTable appToFlowTable = new AppToFlowTable();
+
+  /**
+   * Used to convert string key components to and from storage format.
+   */
+  private final KeyConverter<String> stringKeyConverter =
+      new StringKeyConverter();
+
+  /**
+   * Creates a reader that is configured with entity filters. All arguments
+   * are passed unchanged to the {@link TimelineEntityReader} constructor.
+   *
+   * @param ctxt reader context (cluster, app, entity type, ...) to read for.
+   * @param entityFilters filters to apply to the entities that are read.
+   * @param toRetrieve describes which fields, configs and metrics to fetch.
+   * @param sortedKeys forwarded to the superclass, which defines its effect
+   *          (NOTE(review): presumably controls ordering of the returned
+   *          entities - confirm against TimelineEntityReader).
+   */
+  public GenericEntityReader(TimelineReaderContext ctxt,
+      TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve,
+      boolean sortedKeys) {
+    super(ctxt, entityFilters, toRetrieve, sortedKeys);
+  }
+
+  /**
+   * Creates a reader without entity filters. Both arguments are passed
+   * unchanged to the {@link TimelineEntityReader} constructor.
+   * NOTE(review): presumably this is the constructor used for single entity
+   * reads - confirm against TimelineEntityReader.
+   *
+   * @param ctxt reader context (cluster, app, entity type, ...) to read for.
+   * @param toRetrieve describes which fields, configs and metrics to fetch.
+   */
+  public GenericEntityReader(TimelineReaderContext ctxt,
+      TimelineDataToRetrieve toRetrieve) {
+    super(ctxt, toRetrieve);
+  }
+
+  /**
+   * Uses the {@link EntityTable}.
+   *
+   * @return the {@link EntityTable} instance shared by all readers of this
+   *         class; it is the table that gets and scans are issued against.
+   */
+  protected BaseTable<?> getTable() {
+    return ENTITY_TABLE;
+  }
+
+  /**
+   * Builds the HBase filter list derived from the entity filters of the query:
+   * created time range, metric filters, config filters and info filters.
+   *
+   * @return filter list holding one entry per filter category that applies;
+   *         it is empty if none applies.
+   * @throws IOException if any problem occurs while creating the filters.
+   */
+  @Override
+  protected FilterList constructFilterListBasedOnFilters() throws IOException {
+    // For multiple entity reads the filters are never null at this point, as
+    // augmentParams creates them when they were not supplied.
+    FilterList hbaseFilters = new FilterList();
+    TimelineEntityFilters entityFilters = getFilters();
+
+    // Created time: a range filter is only needed when the requested range
+    // differs from [0, Long.MAX_VALUE].
+    long rangeStart = entityFilters.getCreatedTimeBegin();
+    long rangeEnd = entityFilters.getCreatedTimeEnd();
+    boolean fullRange = rangeStart == 0 && rangeEnd == Long.MAX_VALUE;
+    if (!fullRange) {
+      hbaseFilters.addFilter(
+          TimelineFilterUtils.createSingleColValueFiltersByRange(
+              EntityColumn.CREATED_TIME, rangeStart, rangeEnd));
+    }
+
+    // Metric filters.
+    TimelineFilterList onMetrics = entityFilters.getMetricFilters();
+    boolean hasMetricFilters =
+        onMetrics != null && !onMetrics.getFilterList().isEmpty();
+    if (hasMetricFilters) {
+      hbaseFilters.addFilter(
+          TimelineFilterUtils.createHBaseFilterList(
+              EntityColumnPrefix.METRIC, onMetrics));
+    }
+
+    // Config filters.
+    TimelineFilterList onConfigs = entityFilters.getConfigFilters();
+    boolean hasConfigFilters =
+        onConfigs != null && !onConfigs.getFilterList().isEmpty();
+    if (hasConfigFilters) {
+      hbaseFilters.addFilter(
+          TimelineFilterUtils.createHBaseFilterList(
+              EntityColumnPrefix.CONFIG, onConfigs));
+    }
+
+    // Info filters.
+    TimelineFilterList onInfo = entityFilters.getInfoFilters();
+    boolean hasInfoFilters =
+        onInfo != null && !onInfo.getFilterList().isEmpty();
+    if (hasInfoFilters) {
+      hbaseFilters.addFilter(
+          TimelineFilterUtils.createHBaseFilterList(
+              EntityColumnPrefix.INFO, onInfo));
+    }
+    return hbaseFilters;
+  }
+
+  /**
+   * Decides whether only a subset of the event columns has to be fetched.
+   * That is the case when event filters are present but the EVENTS field
+   * itself has not been requested.
+   *
+   * @param eventFilters event filters of the query, may be null.
+   * @param fieldsToRetrieve fields requested by the query.
+   * @return true if we need to fetch some of the columns, false otherwise.
+   */
+  private boolean fetchPartialEventCols(TimelineFilterList eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    if (eventFilters == null || eventFilters.getFilterList().isEmpty()) {
+      // Without event filters nothing calls for a partial fetch.
+      return false;
+    }
+    return !hasField(fieldsToRetrieve, Field.EVENTS);
+  }
+
+  /**
+   * Decides whether only a subset of the relates_to columns has to be
+   * fetched. That is the case when relatesTo filters are present but the
+   * RELATES_TO field itself has not been requested.
+   *
+   * @param relatesTo relatesTo filters of the query, may be null.
+   * @param fieldsToRetrieve fields requested by the query.
+   * @return true if we need to fetch some of the columns, false otherwise.
+   */
+  private boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo,
+      EnumSet<Field> fieldsToRetrieve) {
+    if (relatesTo == null || relatesTo.getFilterList().isEmpty()) {
+      // Without relatesTo filters nothing calls for a partial fetch.
+      return false;
+    }
+    return !hasField(fieldsToRetrieve, Field.RELATES_TO);
+  }
+
+  /**
+   * Decides whether only a subset of the is_related_to columns has to be
+   * fetched. That is the case when isRelatedTo filters are present but the
+   * IS_RELATED_TO field itself has not been requested.
+   *
+   * @param isRelatedTo isRelatedTo filters of the query, may be null.
+   * @param fieldsToRetrieve fields requested by the query.
+   * @return true if we need to fetch some of the columns, false otherwise.
+   */
+  private boolean fetchPartialIsRelatedToCols(TimelineFilterList isRelatedTo,
+      EnumSet<Field> fieldsToRetrieve) {
+    if (isRelatedTo == null || isRelatedTo.getFilterList().isEmpty()) {
+      // Without isRelatedTo filters nothing calls for a partial fetch.
+      return false;
+    }
+    return !hasField(fieldsToRetrieve, Field.IS_RELATED_TO);
+  }
+
+  /**
+   * Decides, based on the event, relatesTo and isRelatedTo filters of the
+   * query, whether only some columns of the info column family are needed.
+   * The three categories are checked in that order and the first one that
+   * asks for a partial fetch settles the answer.
+   *
+   * @return true, if we need to fetch only some of the columns, false if we
+   *         need to fetch all the columns under info column family.
+   */
+  protected boolean fetchPartialColsFromInfoFamily() {
+    EnumSet<Field> requested = getDataToRetrieve().getFieldsToRetrieve();
+    TimelineEntityFilters entityFilters = getFilters();
+    if (fetchPartialEventCols(entityFilters.getEventFilters(), requested)) {
+      return true;
+    }
+    if (fetchPartialRelatesToCols(entityFilters.getRelatesTo(), requested)) {
+      return true;
+    }
+    return fetchPartialIsRelatedToCols(entityFilters.getIsRelatedTo(),
+        requested);
+  }
+
+  /**
+   * Decides whether a filter list based on fields has to be created. One is
+   * required when not all fields are requested, or when specific configs or
+   * metrics to retrieve are given. For multiple entity reads it is also
+   * required when event filters or relationship filters
+   * (relatesTo/isRelatedTo) are specified for the query.
+   *
+   * @return true if we need to create the filter list, false otherwise.
+   */
+  protected boolean needCreateFilterListBasedOnFields() {
+    TimelineDataToRetrieve toRetrieve = getDataToRetrieve();
+    // Anything short of Field.ALL means columns have to be selected.
+    if (!toRetrieve.getFieldsToRetrieve().contains(Field.ALL)) {
+      return true;
+    }
+    // Even when all fields are requested, specific configs or metrics to
+    // retrieve still require a filter list.
+    if (toRetrieve.getConfsToRetrieve() != null
+        && !toRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) {
+      return true;
+    }
+    if (toRetrieve.getMetricsToRetrieve() != null
+        && !toRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()) {
+      return true;
+    }
+    // The query filters are only looked at when reading multiple entities.
+    if (isSingleEntityRead()) {
+      return false;
+    }
+    TimelineEntityFilters entityFilters = getFilters();
+    if (entityFilters.getEventFilters() != null
+        && !entityFilters.getEventFilters().getFilterList().isEmpty()) {
+      return true;
+    }
+    if (entityFilters.getIsRelatedTo() != null
+        && !entityFilters.getIsRelatedTo().getFilterList().isEmpty()) {
+      return true;
+    }
+    return entityFilters.getRelatesTo() != null
+        && !entityFilters.getRelatesTo().getFilterList().isEmpty();
+  }
+
+  /**
+   * Appends one {@link QualifierFilter} per {@link EntityColumn} value to the
+   * given list. Each filter matches the column qualifier of its column for
+   * equality.
+   *
+   * @param list filter list to which qualifier filters have to be added.
+   */
+  protected void updateFixedColumns(FilterList list) {
+    for (EntityColumn fixedColumn : EntityColumn.values()) {
+      BinaryComparator qualifierComparator =
+          new BinaryComparator(fixedColumn.getColumnQualifierBytes());
+      QualifierFilter exactQualifier =
+          new QualifierFilter(CompareOp.EQUAL, qualifierComparator);
+      list.addFilter(exactQualifier);
+    }
+  }
+
+  /**
+   * Creates a filter list which indicates that only some of the column
+   * qualifiers in the info column family will be returned in result.
+   * <p>
+   * The returned list uses {@link Operator#MUST_PASS_ONE}. It always contains
+   * the filters for the fixed entity columns. For INFO, a prefix filter is
+   * added only if the field is requested. For RELATES_TO, IS_RELATED_TO and
+   * EVENTS, a prefix filter is added if the field is requested; otherwise, if
+   * matching query filters exist, filters for just the column qualifiers
+   * named by those query filters are added.
+   *
+   * @return filter list.
+   * @throws IOException if any problem occurs while creating filter list.
+   */
+  private FilterList createFilterListForColsOfInfoFamily() throws IOException {
+    FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE);
+    // Add filters for each column in entity table.
+    updateFixedColumns(infoFamilyColsFilter);
+    EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
+    // If INFO field has to be retrieved, add a filter for fetching columns
+    // with INFO column prefix.
+    if (hasField(fieldsToRetrieve, Field.INFO)) {
+      infoFamilyColsFilter
+          .addFilter(TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.EQUAL, EntityColumnPrefix.INFO));
+    }
+    TimelineFilterList relatesTo = getFilters().getRelatesTo();
+    if (hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+      // If RELATES_TO field has to be retrieved, add a filter for fetching
+      // columns with RELATES_TO column prefix.
+      infoFamilyColsFilter.addFilter(TimelineFilterUtils
+          .createHBaseQualifierFilter(CompareOp.EQUAL,
+              EntityColumnPrefix.RELATES_TO));
+    } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) {
+      // Even if fields to retrieve does not contain RELATES_TO, we still
+      // need to have a filter to fetch some of the column qualifiers if
+      // relatesTo filters are specified. relatesTo filters will then be
+      // matched after fetching rows from HBase.
+      Set<String> relatesToCols =
+          TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
+      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+          EntityColumnPrefix.RELATES_TO, relatesToCols));
+    }
+    TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
+    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+      // If IS_RELATED_TO field has to be retrieved, add a filter for fetching
+      // columns with IS_RELATED_TO column prefix.
+      infoFamilyColsFilter.addFilter(TimelineFilterUtils
+          .createHBaseQualifierFilter(CompareOp.EQUAL,
+              EntityColumnPrefix.IS_RELATED_TO));
+    } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) {
+      // Even if fields to retrieve does not contain IS_RELATED_TO, we still
+      // need to have a filter to fetch some of the column qualifiers if
+      // isRelatedTo filters are specified. isRelatedTo filters will then be
+      // matched after fetching rows from HBase.
+      Set<String> isRelatedToCols =
+          TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
+      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+          EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols));
+    }
+    TimelineFilterList eventFilters = getFilters().getEventFilters();
+    if (hasField(fieldsToRetrieve, Field.EVENTS)) {
+      // If EVENTS field has to be retrieved, add a filter for fetching columns
+      // with EVENT column prefix.
+      infoFamilyColsFilter
+          .addFilter(TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.EQUAL, EntityColumnPrefix.EVENT));
+    } else if (eventFilters != null &&
+        !eventFilters.getFilterList().isEmpty()) {
+      // Even if fields to retrieve does not contain EVENTS, we still need to
+      // have a filter to fetch some of the column qualifiers on the basis of
+      // event filters specified. Event filters will then be matched after
+      // fetching rows from HBase.
+      Set<String> eventCols =
+          TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
+      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+          EntityColumnPrefix.EVENT, eventCols));
+    }
+    return infoFamilyColsFilter;
+  }
+
+  /**
+   * Appends exclusion filters to the filter list of the info column family.
+   * For each of EVENTS, INFO, IS_RELATED_TO and RELATES_TO that is not among
+   * the fields to retrieve, a NOT_EQUAL qualifier filter for the matching
+   * column prefix is added.
+   *
+   * @param infoColFamilyList filter list for info column family.
+   */
+  private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
+    EnumSet<Field> requested = getDataToRetrieve().getFieldsToRetrieve();
+    // Parallel arrays: optionalFields[i] is stored under prefixes[i]. The
+    // order is the order in which the exclusion filters are appended.
+    Field[] optionalFields = {
+        Field.EVENTS, Field.INFO, Field.IS_RELATED_TO, Field.RELATES_TO};
+    EntityColumnPrefix[] prefixes = {
+        EntityColumnPrefix.EVENT, EntityColumnPrefix.INFO,
+        EntityColumnPrefix.IS_RELATED_TO, EntityColumnPrefix.RELATES_TO};
+    for (int i = 0; i < optionalFields.length; i++) {
+      if (hasField(requested, optionalFields[i])) {
+        // Field is wanted, so its columns must not be excluded.
+        continue;
+      }
+      infoColFamilyList.addFilter(
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.NOT_EQUAL, prefixes[i]));
+    }
+  }
+
+  /**
+   * Extends the given filter list with a filter for configs and a filter for
+   * metrics, each only if the corresponding field is among the fields to
+   * retrieve.
+   *
+   * @param listBasedOnFields filter list based on fields.
+   * @throws IOException if any problem occurs while updating filter list.
+   */
+  private void updateFilterForConfsAndMetricsToRetrieve(
+      FilterList listBasedOnFields) throws IOException {
+    TimelineDataToRetrieve toRetrieve = getDataToRetrieve();
+
+    // When confsToRetrieve is specified, augmentParams() has already added
+    // CONFIGS to the fields to retrieve, even if it was not asked for.
+    boolean wantConfigs =
+        toRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS);
+    if (wantConfigs) {
+      listBasedOnFields.addFilter(
+          TimelineFilterUtils.createFilterForConfsOrMetricsToRetrieve(
+              toRetrieve.getConfsToRetrieve(), EntityColumnFamily.CONFIGS,
+              EntityColumnPrefix.CONFIG));
+    }
+
+    // Likewise METRICS has already been added by augmentParams() when
+    // metricsToRetrieve is specified.
+    boolean wantMetrics =
+        toRetrieve.getFieldsToRetrieve().contains(Field.METRICS);
+    if (wantMetrics) {
+      listBasedOnFields.addFilter(
+          TimelineFilterUtils.createFilterForConfsOrMetricsToRetrieve(
+              toRetrieve.getMetricsToRetrieve(), EntityColumnFamily.METRICS,
+              EntityColumnPrefix.METRIC));
+    }
+  }
+
+  /**
+   * Builds the filter list that restricts which columns are fetched, based
+   * on the fields, configs and metrics to retrieve.
+   *
+   * @return the filter list, or null if all columns are to be fetched and no
+   *         filter is needed.
+   * @throws IOException if any problem occurs while creating the filters.
+   */
+  @Override
+  protected FilterList constructFilterListBasedOnFields() throws IOException {
+    if (!needCreateFilterListBasedOnFields()) {
+      // Everything is fetched, hence no filter.
+      return null;
+    }
+    FilterList fieldFilters = new FilterList(Operator.MUST_PASS_ONE);
+    // Start from "everything in the INFO column family" and narrow it down.
+    FilterList infoFamilyFilters = new FilterList();
+    infoFamilyFilters.addFilter(new FamilyFilter(CompareOp.EQUAL,
+        new BinaryComparator(EntityColumnFamily.INFO.getBytes())));
+    boolean partialFetch =
+        !isSingleEntityRead() && fetchPartialColsFromInfoFamily();
+    if (!partialFetch) {
+      // Drop the column prefixes of the info family that the fields to
+      // retrieve do not ask for.
+      excludeFieldsFromInfoColFamily(infoFamilyFilters);
+    } else {
+      // Only selected columns of the info family are needed.
+      infoFamilyFilters.addFilter(createFilterListForColsOfInfoFamily());
+    }
+    fieldFilters.addFilter(infoFamilyFilters);
+    updateFilterForConfsAndMetricsToRetrieve(fieldFilters);
+    return fieldFilters;
+  }
+
+  /**
+   * Looks up flow context from AppToFlow table. The row is addressed by the
+   * row key of the given {@link AppToFlowRowKey}; user id, flow id and flow
+   * run id are read from it. A {@link NotFoundException} is thrown when no
+   * non-empty row is found.
+   *
+   * @param appToFlowRowKey to identify Cluster and App Ids.
+   * @param hbaseConf HBase configuration.
+   * @param conn HBase Connection.
+   * @return flow context information.
+   * @throws IOException if any problem occurs while fetching flow information.
+   */
+  protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
+      Configuration hbaseConf, Connection conn) throws IOException {
+    Get flowLookup = new Get(appToFlowRowKey.getRowKey());
+    Result row = appToFlowTable.getResult(hbaseConf, conn, flowLookup);
+    if (row == null || row.isEmpty()) {
+      throw new NotFoundException(
+          "Unable to find the context flow ID and flow run ID for clusterId="
+              + appToFlowRowKey.getClusterId() + ", appId="
+              + appToFlowRowKey.getAppId());
+    }
+    // Read in the same order in which the values go into the FlowContext.
+    String user = AppToFlowColumn.USER_ID.readResult(row).toString();
+    String flow = AppToFlowColumn.FLOW_ID.readResult(row).toString();
+    long runId =
+        ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(row)).longValue();
+    return new FlowContext(user, flow, runId);
+  }
+
+  /**
+   * Encapsulates flow context information: user id, flow name and flow run
+   * id. All fields are final and there are no setters, so an instance does
+   * not change after construction.
+   */
+  protected static class FlowContext {
+    // User id of the flow context.
+    private final String userId;
+    // Name of the flow.
+    private final String flowName;
+    // Id of the flow run.
+    private final Long flowRunId;
+
+    /**
+     * Creates a flow context holding the given values as they are.
+     *
+     * @param user user id.
+     * @param flowName name of the flow.
+     * @param flowRunId id of the flow run.
+     */
+    public FlowContext(String user, String flowName, Long flowRunId) {
+      this.userId = user;
+      this.flowName = flowName;
+      this.flowRunId = flowRunId;
+    }
+
+    /**
+     * @return the user id passed at construction.
+     */
+    protected String getUserId() {
+      return userId;
+    }
+
+    /**
+     * @return the flow name passed at construction.
+     */
+    protected String getFlowName() {
+      return flowName;
+    }
+
+    /**
+     * @return the flow run id passed at construction.
+     */
+    protected Long getFlowRunId() {
+      return flowRunId;
+    }
+  }
+
+  /**
+   * Checks that everything required for a read is present: the context, the
+   * data to retrieve, and the cluster id, app id and entity type of the
+   * context. For a single entity read the entity id is required as well.
+   * A missing value makes the corresponding precondition check fail.
+   */
+  @Override
+  protected void validateParams() {
+    TimelineReaderContext readerContext =
+        Preconditions.checkNotNull(getContext(), "context shouldn't be null");
+    Preconditions.checkNotNull(getDataToRetrieve(),
+        "data to retrieve shouldn't be null");
+    Preconditions.checkNotNull(readerContext.getClusterId(),
+        "clusterId shouldn't be null");
+    Preconditions.checkNotNull(readerContext.getAppId(),
+        "appId shouldn't be null");
+    Preconditions.checkNotNull(readerContext.getEntityType(),
+        "entityType shouldn't be null");
+    if (!isSingleEntityRead()) {
+      // Entity id is only mandatory when a single entity is read.
+      return;
+    }
+    Preconditions.checkNotNull(readerContext.getEntityId(),
+        "entityId shouldn't be null");
+  }
+
+  /**
+   * Completes the query parameters before the read. If flow name, flow run
+   * id or user id is missing from the context, all three are filled in from
+   * the AppToFlow table. Fields to retrieve are extended based on the
+   * configs/metrics to retrieve, and for multiple entity reads the filters
+   * are created if they are null.
+   *
+   * @param hbaseConf HBase configuration.
+   * @param conn HBase Connection.
+   * @throws IOException if the flow context lookup fails.
+   */
+  @Override
+  protected void augmentParams(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    TimelineReaderContext readerContext = getContext();
+    // In practice flow name, flow run id and user id are expected to be
+    // either all set or all missing; one missing value triggers the lookup.
+    boolean flowContextKnown = readerContext.getFlowName() != null
+        && readerContext.getFlowRunId() != null
+        && readerContext.getUserId() != null;
+    if (!flowContextKnown) {
+      // Resolve the flow context from the AppToFlow table.
+      AppToFlowRowKey lookupKey = new AppToFlowRowKey(
+          readerContext.getClusterId(), readerContext.getAppId());
+      FlowContext resolved = lookupFlowContext(lookupKey, hbaseConf, conn);
+      readerContext.setFlowName(resolved.flowName);
+      readerContext.setFlowRunId(resolved.flowRunId);
+      readerContext.setUserId(resolved.userId);
+    }
+    // Specifying confsToRetrieve and/or metricsToRetrieve implies that
+    // configs/metrics are among the fields to retrieve.
+    getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
+    if (isSingleEntityRead()) {
+      return;
+    }
+    createFiltersIfNull();
+  }
+
+  /**
+   * Fetches the single row of the entity identified by the context. The
+   * number of versions fetched is the metrics limit of the data to retrieve,
+   * and the filter list is applied only if it is non-null and non-empty.
+   *
+   * @param hbaseConf HBase configuration.
+   * @param conn HBase Connection.
+   * @param filterList filters to set on the get, may be null.
+   * @return result of the get against the table of this reader.
+   * @throws IOException if any problem occurs while reading from HBase.
+   */
+  @Override
+  protected Result getResult(Configuration hbaseConf, Connection conn,
+      FilterList filterList) throws IOException {
+    TimelineReaderContext readerContext = getContext();
+    EntityRowKey entityRowKey = new EntityRowKey(
+        readerContext.getClusterId(), readerContext.getUserId(),
+        readerContext.getFlowName(), readerContext.getFlowRunId(),
+        readerContext.getAppId(), readerContext.getEntityType(),
+        readerContext.getEntityId());
+    Get entityGet = new Get(entityRowKey.getRowKey());
+    entityGet.setMaxVersions(getDataToRetrieve().getMetricsLimit());
+    boolean hasFilters =
+        filterList != null && !filterList.getFilters().isEmpty();
+    if (hasFilters) {
+      entityGet.setFilter(filterList);
+    }
+    return getTable().getResult(hbaseConf, conn, entityGet);
+  }
+
+  /**
+   * Scans the rows whose key starts with the prefix built from the context
+   * (cluster, user, flow name, flow run id, app and entity type), i.e. the
+   * entities of one type belonging to one app. The number of versions
+   * fetched is the metrics limit of the data to retrieve, and the filter
+   * list is applied only if it is non-null and non-empty.
+   *
+   * @param hbaseConf HBase configuration.
+   * @param conn HBase Connection.
+   * @param filterList filters to set on the scan, may be null.
+   * @return scanner over the matching rows of the table of this reader.
+   * @throws IOException if any problem occurs while reading from HBase.
+   */
+  @Override
+  protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
+      FilterList filterList) throws IOException {
+    Scan prefixScan = new Scan();
+    TimelineReaderContext readerContext = getContext();
+    RowKeyPrefix<EntityRowKey> keyPrefix = new EntityRowKeyPrefix(
+        readerContext.getClusterId(), readerContext.getUserId(),
+        readerContext.getFlowName(), readerContext.getFlowRunId(),
+        readerContext.getAppId(), readerContext.getEntityType());
+    prefixScan.setRowPrefixFilter(keyPrefix.getRowKeyPrefix());
+    prefixScan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
+    boolean hasFilters =
+        filterList != null && !filterList.getFilters().isEmpty();
+    if (hasFilters) {
+      prefixScan.setFilter(filterList);
+    }
+    return getTable().getResultScanner(hbaseConf, conn, prefixScan);
+  }
+
+  /**
+   * Converts one HBase result row into a {@link TimelineEntity}.
+   * <p>
+   * Type, id and created time are always read. Relationships, info, configs,
+   * events and metrics are read only if requested through the fields to
+   * retrieve. For multiple entity reads, relationships and events are also
+   * read when matching filters exist; they are then matched locally and
+   * removed from the entity again if the field itself was not requested.
+   *
+   * @param result row read from HBase, may be null.
+   * @return the parsed entity, or null if the result is null or empty or if
+   *         the entity does not match the isRelatedTo, relatesTo or event
+   *         filters.
+   * @throws IOException if any problem is encountered while reading result.
+   */
+  @Override
+  protected TimelineEntity parseEntity(Result result) throws IOException {
+    if (result == null || result.isEmpty()) {
+      return null;
+    }
+    TimelineEntity entity = new TimelineEntity();
+    String entityType = EntityColumn.TYPE.readResult(result).toString();
+    entity.setType(entityType);
+    String entityId = EntityColumn.ID.readResult(result).toString();
+    entity.setId(entityId);
+
+    // filters is only dereferenced below when this is not a single entity
+    // read (see the check* flags).
+    TimelineEntityFilters filters = getFilters();
+    // fetch created time
+    Long createdTime = (Long) EntityColumn.CREATED_TIME.readResult(result);
+    entity.setCreatedTime(createdTime);
+
+    EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
+    // fetch is related to entities and match isRelatedTo filter. If isRelatedTo
+    // filters do not match, entity would be dropped. We have to match filters
+    // locally as relevant HBase filters to filter out rows on the basis of
+    // isRelatedTo are not set in HBase scan.
+    boolean checkIsRelatedTo =
+        !isSingleEntityRead() && filters.getIsRelatedTo() != null
+            && filters.getIsRelatedTo().getFilterList().size() > 0;
+    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) {
+      readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
+      if (checkIsRelatedTo
+          && !TimelineStorageUtils.matchIsRelatedTo(entity,
+              filters.getIsRelatedTo())) {
+        return null;
+      }
+      // The relationships were read only for filter matching; do not return
+      // them if the field itself was not requested.
+      if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+        entity.getIsRelatedToEntities().clear();
+      }
+    }
+
+    // fetch relates to entities and match relatesTo filter. If relatesTo
+    // filters do not match, entity would be dropped. We have to match filters
+    // locally as relevant HBase filters to filter out rows on the basis of
+    // relatesTo are not set in HBase scan.
+    boolean checkRelatesTo =
+        !isSingleEntityRead() && filters.getRelatesTo() != null
+            && filters.getRelatesTo().getFilterList().size() > 0;
+    if (hasField(fieldsToRetrieve, Field.RELATES_TO)
+        || checkRelatesTo) {
+      readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
+      if (checkRelatesTo
+          && !TimelineStorageUtils.matchRelatesTo(entity,
+              filters.getRelatesTo())) {
+        return null;
+      }
+      // The relationships were read only for filter matching; do not return
+      // them if the field itself was not requested.
+      if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+        entity.getRelatesToEntities().clear();
+      }
+    }
+
+    // fetch info if fieldsToRetrieve contains INFO or ALL.
+    if (hasField(fieldsToRetrieve, Field.INFO)) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
+    }
+
+    // fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
+    if (hasField(fieldsToRetrieve, Field.CONFIGS)) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
+    }
+
+    // fetch events and match event filters if they exist. If event filters do
+    // not match, entity would be dropped. We have to match filters locally
+    // as relevant HBase filters to filter out rows on the basis of events
+    // are not set in HBase scan.
+    boolean checkEvents =
+        !isSingleEntityRead() && filters.getEventFilters() != null
+            && filters.getEventFilters().getFilterList().size() > 0;
+    if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) {
+      readEvents(entity, result, EntityColumnPrefix.EVENT);
+      if (checkEvents
+          && !TimelineStorageUtils.matchEventFilters(entity,
+              filters.getEventFilters())) {
+        return null;
+      }
+      // The events were read only for filter matching; do not return them if
+      // the field itself was not requested.
+      if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
+        entity.getEvents().clear();
+      }
+    }
+
+    // fetch metrics if fieldsToRetrieve contains METRICS or ALL.
+    if (hasField(fieldsToRetrieve, Field.METRICS)) {
+      readMetrics(entity, result, EntityColumnPrefix.METRIC);
+    }
+    return entity;
+  }
+
+  /**
+   * Reads all columns with the given prefix from the result and stores them
+   * on the entity, either as configs or as info. Configs are added one by
+   * one with the value converted via {@code toString()}; info is added as
+   * the whole map.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param entity entity to fill.
+   * @param result result from HBase.
+   * @param prefix column prefix.
+   * @param isConfig if true, means we are reading configs, otherwise info.
+   * @throws IOException if any problem is encountered while reading result.
+   */
+  protected <T> void readKeyValuePairs(TimelineEntity entity, Result result,
+      ColumnPrefix<T> prefix, boolean isConfig) throws IOException {
+    // Column name (decoded to a string) mapped to the stored value.
+    Map<String, Object> keyValues =
+        prefix.readResults(result, stringKeyConverter);
+    if (!isConfig) {
+      entity.addInfo(keyValues);
+      return;
+    }
+    for (Map.Entry<String, Object> keyValue : keyValues.entrySet()) {
+      String configName = keyValue.getKey();
+      String configValue = keyValue.getValue().toString();
+      entity.addConfig(configName, configValue);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
new file mode 100644
index 0000000..7b294a8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+
+/**
+ * The base class for reading and deserializing timeline entities from the
+ * HBase storage. Different types can be defined for different types of the
+ * entities that are being requested.
+ */
+public abstract class TimelineEntityReader {
+  private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class);
+
+  private final boolean singleEntityRead;
+  private TimelineReaderContext context;
+  private TimelineDataToRetrieve dataToRetrieve;
+  // used only for multiple entity read mode
+  private TimelineEntityFilters filters;
+
+  /**
+   * Main table the entity reader uses.
+   */
+  private BaseTable<?> table;
+
+  /**
+   * Specifies whether keys for this table are sorted in a manner where entities
+   * can be retrieved by created time. If true, it will be sufficient to collect
+   * the first results as specified by the limit. Otherwise all matched entities
+   * will be fetched and then limit applied.
+   */
+  private boolean sortedKeys = false;
+
+  /**
+   * Used to convert strings key components to and from storage format.
+   */
+  private final KeyConverter<String> stringKeyConverter =
+      new StringKeyConverter();
+
+  /**
+   * Instantiates a reader for multiple-entity reads.
+   *
+   * @param ctxt Reader context which defines the scope in which query has to be
+   *     made.
+   * @param entityFilters Filters which limit the entities returned.
+   * @param toRetrieve Data to retrieve for each entity.
+   * @param sortedKeys Specifies whether keys for this table are sorted or not.
+   *     If sorted, entities can be retrieved by created time.
+   */
+  protected TimelineEntityReader(TimelineReaderContext ctxt,
+      TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve,
+      boolean sortedKeys) {
+    this.singleEntityRead = false;
+    this.sortedKeys = sortedKeys;
+    this.context = ctxt;
+    this.dataToRetrieve = toRetrieve;
+    this.filters = entityFilters;
+    // NOTE(review): presumably subclasses override getTable(); confirm.
+    this.setTable(getTable());
+  }
+
+  /**
+   * Instantiates a reader for single-entity reads; no filters are set.
+   *
+   * @param ctxt Reader context which defines the scope in which query has to be
+   *     made.
+   * @param toRetrieve Data to retrieve for each entity.
+   */
+  protected TimelineEntityReader(TimelineReaderContext ctxt,
+      TimelineDataToRetrieve toRetrieve) {
+    this.singleEntityRead = true;
+    this.context = ctxt;
+    this.dataToRetrieve = toRetrieve;
+    // NOTE(review): presumably subclasses override getTable(); confirm.
+    this.setTable(getTable());
+  }
+
+  /**
+   * Creates a {@link FilterList} based on fields, confs and metrics to
+   * retrieve. This filter list will be set in Scan/Get objects to trim down
+   * results fetched from HBase back-end storage. This is called only for
+   * multiple entity reads.
+   *
+   * @return a {@link FilterList} object.
+   * @throws IOException if any problem occurs while creating filter list.
+   */
+  protected abstract FilterList constructFilterListBasedOnFields()
+      throws IOException;
+
+  /**
+   * Creates a {@link FilterList} based on info, config and metric filters. This
+   * filter list will be set in HBase Get to trim down results fetched from
+   * HBase back-end storage.
+   *
+   * @return a {@link FilterList} object.
+   * @throws IOException if any problem occurs while creating filter list.
+   */
+  protected abstract FilterList constructFilterListBasedOnFilters()
+      throws IOException;
+
+  /**
+   * Builds the filter list for a scan by merging the list derived from
+   * filters with the list derived from fields.
+   *
+   * @return a {@link FilterList} holding both lists if both are non-empty, the
+   * one non-empty list if only one is, or null if neither is.
+   * @throws IOException if any problem occurs while creating filter list.
+   */
+  private FilterList createFilterList() throws IOException {
+    // List derived from info, config and metric filters.
+    FilterList fromFilters = constructFilterListBasedOnFilters();
+    boolean filtersUsable =
+        fromFilters != null && !fromFilters.getFilters().isEmpty();
+    // List derived from fields, confs and metrics to retrieve.
+    FilterList fromFields = constructFilterListBasedOnFields();
+    boolean fieldsUsable =
+        fromFields != null && !fromFields.getFilters().isEmpty();
+    // A null return signals that no filter list has to be added to the HBase
+    // Scan, as no filters are specified for the query or only the default
+    // view of the entity has to be returned.
+    if (!filtersUsable) {
+      // Nothing usable from filters: fall back to the field-based list.
+      return fieldsUsable ? fromFields : null;
+    }
+    if (!fieldsUsable) {
+      // Only the filter-based list has content.
+      return fromFilters;
+    }
+    // Both lists have content, so wrap them in a new enclosing list.
+    FilterList combined = new FilterList();
+    combined.addFilter(fromFilters);
+    combined.addFilter(fromFields);
+    return combined;
+  }
+
+  protected TimelineReaderContext getContext() {
+    return context;
+  }
+
+  protected TimelineDataToRetrieve getDataToRetrieve() {
+    return dataToRetrieve;
+  }
+
+  protected TimelineEntityFilters getFilters() {
+    return filters;
+  }
+
+  /**
+   * Create a {@link TimelineEntityFilters} object with default values for
+   * filters.
+   */
+  protected void createFiltersIfNull() {
+    if (filters == null) {
+      filters = new TimelineEntityFilters();
+    }
+  }
+
+  /**
+   * Reads and deserializes a single timeline entity from the HBase storage.
+   *
+   * @param hbaseConf HBase Configuration.
+   * @param conn HBase Connection.
+   * @return A <cite>TimelineEntity</cite> object.
+   * @throws IOException if there is any exception encountered while reading
+   *     entity.
+   */
+  public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    validateParams();
+    augmentParams(hbaseConf, conn);
+
+    FilterList filterList = constructFilterListBasedOnFields();
+    if (LOG.isDebugEnabled() && filterList != null) {
+      LOG.debug("FilterList created for get is - " + filterList);
+    }
+    Result result = getResult(hbaseConf, conn, filterList);
+    if (result == null || result.isEmpty()) {
+      // Could not find a matching row.
+      LOG.info("Cannot find matching entity of type " +
+          context.getEntityType());
+      return null;
+    }
+    return parseEntity(result);
+  }
+
+  /**
+   * Reads and deserializes a set of timeline entities from the HBase storage.
+   * With sorted keys the scan stops once the limit is reached; otherwise all
+   * results are examined and only the first entities, as specified by the
+   * limit in the entity's natural sort order, are kept.
+   *
+   * @param hbaseConf HBase Configuration.
+   * @param conn HBase Connection.
+   * @return a set of <cite>TimelineEntity</cite> objects.
+   * @throws IOException if any exception is encountered while reading entities.
+   */
+  public Set<TimelineEntity> readEntities(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    validateParams();
+    augmentParams(hbaseConf, conn);
+
+    NavigableSet<TimelineEntity> entities = new TreeSet<>();
+    FilterList filterList = createFilterList();
+    if (LOG.isDebugEnabled() && filterList != null) {
+      LOG.debug("FilterList created for scan is - " + filterList);
+    }
+    // try-with-resources closes the scanner on every exit path.
+    try (ResultScanner results = getResults(hbaseConf, conn, filterList)) {
+      for (Result result : results) {
+        TimelineEntity entity = parseEntity(result);
+        if (entity == null) {
+          // parseEntity returns null for an entity that is filtered.
+          continue;
+        }
+        entities.add(entity);
+        if (!sortedKeys) {
+          // Unsorted keys: trim the set back whenever it grows past the limit.
+          if (entities.size() > filters.getLimit()) {
+            entities.pollLast();
+          }
+        } else if (entities.size() == filters.getLimit()) {
+          // Sorted keys: the first results up to the limit are sufficient.
+          break;
+        }
+      }
+      return entities;
+    }
+  }
+
+  /**
+   * Returns the main table to be used by the entity reader.
+   *
+   * @return A reference to the table.
+   */
+  protected BaseTable<?> getTable() {
+    return table;
+  }
+
+  /**
+   * Validates the required parameters to read the entities.
+   */
+  protected abstract void validateParams();
+
+  /**
+   * Sets certain parameters to defaults if the values are not provided.
+   *
+   * @param hbaseConf HBase Configuration.
+   * @param conn HBase Connection.
+   * @throws IOException if any exception is encountered while setting params.
+   */
+  protected abstract void augmentParams(Configuration hbaseConf,
+      Connection conn) throws IOException;
+
+  /**
+   * Fetches a {@link Result} instance for a single-entity read.
+   *
+   * @param hbaseConf HBase Configuration.
+   * @param conn HBase Connection.
+   * @param filterList filter list which will be applied to HBase Get.
+   * @return the {@link Result} instance or null if no such record is found.
+   * @throws IOException if any exception is encountered while getting result.
+   */
+  protected abstract Result getResult(Configuration hbaseConf, Connection conn,
+      FilterList filterList) throws IOException;
+
+  /**
+   * Fetches a {@link ResultScanner} for a multi-entity read.
+   *
+   * @param hbaseConf HBase Configuration.
+   * @param conn HBase Connection.
+   * @param filterList filter list which will be applied to HBase Scan.
+   * @return the {@link ResultScanner} instance.
+   * @throws IOException if any exception is encountered while getting results.
+   */
+  protected abstract ResultScanner getResults(Configuration hbaseConf,
+      Connection conn, FilterList filterList) throws IOException;
+
+  /**
+   * Parses the result retrieved from HBase backend and convert it into a
+   * {@link TimelineEntity} object.
+   *
+   * @param result Single row result of a Get/Scan.
+   * @return the <cite>TimelineEntity</cite> instance or null if the entity is
+   *     filtered.
+   * @throws IOException if any exception is encountered while parsing entity.
+   */
+  protected abstract TimelineEntity parseEntity(Result result)
+      throws IOException;
+
+  /**
+   * Reads the metrics stored under the given column prefix and adds them, as
+   * {@link TimelineMetric} objects, to the given timeline entity.
+   *
+   * @param entity {@link TimelineEntity} object to add the metrics to.
+   * @param result {@link Result} object retrieved from backend.
+   * @param columnPrefix Metric column prefix
+   * @throws IOException if any exception is encountered while reading metrics.
+   */
+  protected void readMetrics(TimelineEntity entity, Result result,
+      ColumnPrefix<?> columnPrefix) throws IOException {
+    NavigableMap<String, NavigableMap<Long, Number>> valuesByMetricId =
+        columnPrefix.readResultsWithTimestamps(result, stringKeyConverter);
+    for (Map.Entry<String, NavigableMap<Long, Number>> entry
+        : valuesByMetricId.entrySet()) {
+      NavigableMap<Long, Number> timestampedValues = entry.getValue();
+      TimelineMetric metric = new TimelineMetric();
+      metric.setId(entry.getKey());
+      // More than one value is simply assumed to mean a TIME_SERIES metric.
+      if (timestampedValues.size() > 1) {
+        metric.setType(TimelineMetric.Type.TIME_SERIES);
+      } else {
+        metric.setType(TimelineMetric.Type.SINGLE_VALUE);
+      }
+      metric.addValues(timestampedValues);
+      entity.addMetric(metric);
+    }
+  }
+
+  /**
+   * Checks whether the reader has been created to fetch single entity or
+   * multiple entities.
+   *
+   * @return true, if query is for single entity, false otherwise.
+   */
+  public boolean isSingleEntityRead() {
+    return singleEntityRead;
+  }
+
+  protected void setTable(BaseTable<?> baseTable) {
+    this.table = baseTable;
+  }
+
+  /**
+   * Tells whether a field is amongst the fields to retrieve. A field counts as
+   * requested when it is listed itself or when {@link Field#ALL} is listed,
+   * as the latter covers every field.
+   *
+   * @param fieldsToRetrieve fields to be retrieved.
+   * @param requiredField field to be looked up in fieldsToRetrieve.
+   * @return true if has the required field, false otherwise.
+   */
+  protected boolean hasField(EnumSet<Field> fieldsToRetrieve,
+      Field requiredField) {
+    boolean wantsEverything = fieldsToRetrieve.contains(Field.ALL);
+    return wantsEverything || fieldsToRetrieve.contains(requiredField);
+  }
+
+  /**
+   * Builds a filter list with one qualifier prefix filter per passed column.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param colPrefix Column Prefix.
+   * @param columns set of column qualifiers.
+   * @return filter list which passes if any one of its filters passes.
+   */
+  protected <T> FilterList createFiltersFromColumnQualifiers(
+      ColumnPrefix<T> colPrefix, Set<String> columns) {
+    FilterList filterList = new FilterList(Operator.MUST_PASS_ONE);
+    for (String column : columns) {
+      // For columns which have compound column qualifiers (eg. events), the
+      // required separator has to be part of the prefix to match.
+      byte[] encodedColumn = createColQualifierPrefix(colPrefix, column);
+      BinaryPrefixComparator comparator = new BinaryPrefixComparator(
+          colPrefix.getColumnPrefixBytes(encodedColumn));
+      filterList.addFilter(new QualifierFilter(CompareOp.EQUAL, comparator));
+    }
+    return filterList;
+  }
+
+  /** Encodes a column name into qualifier bytes usable as a filter prefix. */
+  protected <T> byte[] createColQualifierPrefix(ColumnPrefix<T> colPrefix,
+      String column) {
+    boolean isEventPrefix = colPrefix == ApplicationColumnPrefix.EVENT
+        || colPrefix == EntityColumnPrefix.EVENT;
+    return isEventPrefix
+        ? new EventColumnName(column, null, null).getColumnQualifier()
+        : stringKeyConverter.encode(column);
+  }
+
+  /**
+   * Reads relationship columns and adds every id found to the entity.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param entity entity to fill.
+   * @param result result from HBase.
+   * @param prefix column prefix.
+   * @param isRelatedTo if true, ids are added to isRelatedTo, otherwise they
+   *          are added to relatesTo.
+   * @throws IOException if any problem is encountered while reading result.
+   */
+  protected <T> void readRelationship(TimelineEntity entity, Result result,
+      ColumnPrefix<T> prefix, boolean isRelatedTo) throws IOException {
+    Map<String, Object> relations =
+        prefix.readResults(result, stringKeyConverter);
+    for (Map.Entry<String, Object> relation : relations.entrySet()) {
+      // The stored value is split into ids using the VALUES separator.
+      String joinedIds = relation.getValue().toString();
+      for (String id : Separator.VALUES.splitEncoded(joinedIds)) {
+        if (isRelatedTo) {
+          entity.addIsRelatedToEntity(relation.getKey(), id);
+        } else {
+          entity.addRelatesToEntity(relation.getKey(), id);
+        }
+      }
+    }
+  }
+
+  /**
+   * Read events from the entity table or the application table. The column name
+   * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted.
+   * Columns with the same event id and timestamp form a single event.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param entity entity to fill.
+   * @param result HBase Result.
+   * @param prefix column prefix.
+   * @throws IOException if any problem is encountered while reading result.
+   */
+  protected static <T> void readEvents(TimelineEntity entity, Result result,
+      ColumnPrefix<T> prefix) throws IOException {
+    Map<String, TimelineEvent> eventsMap = new HashMap<>();
+    Map<EventColumnName, Object> eventsResult =
+        prefix.readResults(result, new EventColumnNameConverter());
+    for (Map.Entry<EventColumnName, Object>
+             eventResult : eventsResult.entrySet()) {
+      EventColumnName eventColumnName = eventResult.getKey();
+      // '=' keeps ("a1", 2) and ("a", 12) from sharing the key "a12".
+      String key = eventColumnName.getId() + '=' +
+          Long.toString(eventColumnName.getTimestamp());
+      TimelineEvent event = eventsMap.get(key);
+      if (event == null) {
+        // First time we're seeing this event, add it to the eventsMap
+        event = new TimelineEvent();
+        event.setId(eventColumnName.getId());
+        event.setTimestamp(eventColumnName.getTimestamp());
+        eventsMap.put(key, event);
+      }
+      if (eventColumnName.getInfoKey() != null) {
+        event.addInfo(eventColumnName.getInfoKey(), eventResult.getValue());
+      }
+    }
+    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
+    entity.addEvents(eventsSet);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java
new file mode 100644
index 0000000..b2a9476
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+
+/**
+ * Factory methods for instantiating a timeline entity reader.
+ */
+public final class TimelineEntityReaderFactory {
+  private TimelineEntityReaderFactory() {
+  }
+
+  /**
+   * Creates a timeline entity reader instance for reading a single entity with
+   * the specified input.
+   *
+   * @param context Reader context which defines the scope in which query has to
+   *     be made.
+   * @param dataToRetrieve Data to retrieve for each entity.
+   * @return An implementation of <cite>TimelineEntityReader</cite> object
+   *     depending on entity type.
+   */
+  public static TimelineEntityReader createSingleEntityReader(
+      TimelineReaderContext context, TimelineDataToRetrieve dataToRetrieve) {
+    // currently the types that are handled separate from the generic entity
+    // table are application, flow run, and flow activity entities
+    if (TimelineEntityType.YARN_APPLICATION.matches(context.getEntityType())) {
+      return new ApplicationEntityReader(context, dataToRetrieve);
+    } else if (TimelineEntityType.
+        YARN_FLOW_RUN.matches(context.getEntityType())) {
+      return new FlowRunEntityReader(context, dataToRetrieve);
+    } else if (TimelineEntityType.
+        YARN_FLOW_ACTIVITY.matches(context.getEntityType())) {
+      return new FlowActivityEntityReader(context, dataToRetrieve);
+    } else {
+      // assume we're dealing with a generic entity read
+      return new GenericEntityReader(context, dataToRetrieve);
+    }
+  }
+
+  /**
+   * Creates a timeline entity reader instance for reading set of entities with
+   * the specified input and predicates.
+   *
+   * @param context Reader context which defines the scope in which query has to
+   *     be made.
+   * @param filters Filters which limit the entities returned.
+   * @param dataToRetrieve Data to retrieve for each entity.
+   * @return An implementation of <cite>TimelineEntityReader</cite> object
+   *     depending on entity type.
+   */
+  public static TimelineEntityReader createMultipleEntitiesReader(
+      TimelineReaderContext context, TimelineEntityFilters filters,
+      TimelineDataToRetrieve dataToRetrieve) {
+    // currently the types that are handled separate from the generic entity
+    // table are application, flow run, and flow activity entities
+    if (TimelineEntityType.YARN_APPLICATION.matches(context.getEntityType())) {
+      return new ApplicationEntityReader(context, filters, dataToRetrieve);
+    } else if (TimelineEntityType.
+        YARN_FLOW_ACTIVITY.matches(context.getEntityType())) {
+      return new FlowActivityEntityReader(context, filters, dataToRetrieve);
+    } else if (TimelineEntityType.
+        YARN_FLOW_RUN.matches(context.getEntityType())) {
+      return new FlowRunEntityReader(context, filters, dataToRetrieve);
+    } else {
+      // assume we're dealing with a generic entity read
+      return new GenericEntityReader(context, filters, dataToRetrieve, false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java
new file mode 100644
index 0000000..9814d6d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage.reader
+ * contains classes used to read entities from backend based on query type.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
new file mode 100644
index 0000000..58df970
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.Test;
+
+public class TestKeyConverters {
+
+  /** App ids must encode in descending order and decode to the original. */
+  @Test
+  public void testAppIdKeyConverter() {
+    AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter();
+    long currentTs = System.currentTimeMillis();
+    ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1);
+    ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2);
+    ApplicationId appId3 = ApplicationId.newInstance(currentTs + 300, 1);
+    String appIdStr1 = appId1.toString();
+    String appIdStr2 = appId2.toString();
+    String appIdStr3 = appId3.toString();
+    byte[] appIdBytes1 = appIdKeyConverter.encode(appIdStr1);
+    byte[] appIdBytes2 = appIdKeyConverter.encode(appIdStr2);
+    byte[] appIdBytes3 = appIdKeyConverter.encode(appIdStr3);
+    // App ids' should be encoded in a manner wherein descending order
+    // is maintained. Each pair is asserted on its own to pinpoint a failure.
+    assertTrue("Ordering of app ids' is incorrect",
+        Bytes.compareTo(appIdBytes1, appIdBytes2) > 0);
+    assertTrue("Ordering of app ids' is incorrect",
+        Bytes.compareTo(appIdBytes1, appIdBytes3) > 0);
+    assertTrue("Ordering of app ids' is incorrect",
+        Bytes.compareTo(appIdBytes2, appIdBytes3) > 0);
+    // assertEquals reports both expected and decoded app id on a mismatch.
+    assertEquals("Decoded app id is not same as the app id encoded",
+        appIdStr1, appIdKeyConverter.decode(appIdBytes1));
+    assertEquals("Decoded app id is not same as the app id encoded",
+        appIdStr2, appIdKeyConverter.decode(appIdBytes2));
+    assertEquals("Decoded app id is not same as the app id encoded",
+        appIdStr3, appIdKeyConverter.decode(appIdBytes3));
+  }
+
+  @Test
+  public void testEventColumnNameConverter() {
+    String eventId = "=foo_=eve=nt=";
+    byte[] valSepBytes = Bytes.toBytes(Separator.VALUES.getValue());
+    byte[] maxByteArr =
+        Bytes.createMaxByteArray(Bytes.SIZEOF_LONG - valSepBytes.length);
+    byte[] ts = Bytes.add(valSepBytes, maxByteArr);
+    Long eventTs = Bytes.toLong(ts);
+    byte[] byteEventColName =
+        new EventColumnName(eventId, eventTs, null).getColumnQualifier();
+    KeyConverter<EventColumnName> eventColumnNameConverter =
+        new EventColumnNameConverter();
+    EventColumnName eventColName =
+        eventColumnNameConverter.decode(byteEventColName);
+    assertEquals(eventId, eventColName.getId());
+    assertEquals(eventTs, eventColName.getTimestamp());
+    assertNull(eventColName.getInfoKey());
+
+    String infoKey = "f=oo_event_in=fo=_key";
+    byteEventColName =
+        new EventColumnName(eventId, eventTs, infoKey).getColumnQualifier();
+    eventColName = eventColumnNameConverter.decode(byteEventColName);
+    assertEquals(eventId, eventColName.getId());
+    assertEquals(eventTs, eventColName.getTimestamp());
+    assertEquals(infoKey, eventColName.getInfoKey());
+  }
+
+  @Test
+  public void testLongKeyConverter() {
+    LongKeyConverter longKeyConverter = new LongKeyConverter();
+    confirmLongKeyConverter(longKeyConverter, Long.MIN_VALUE);
+    confirmLongKeyConverter(longKeyConverter, -1234567890L);
+    confirmLongKeyConverter(longKeyConverter, -128L);
+    confirmLongKeyConverter(longKeyConverter, -127L);
+    confirmLongKeyConverter(longKeyConverter, -1L);
+    confirmLongKeyConverter(longKeyConverter, 0L);
+    confirmLongKeyConverter(longKeyConverter, 1L);
+    confirmLongKeyConverter(longKeyConverter, 127L);
+    confirmLongKeyConverter(longKeyConverter, 128L);
+    confirmLongKeyConverter(longKeyConverter, 1234567890L);
+    confirmLongKeyConverter(longKeyConverter, Long.MAX_VALUE);
+  }
+
+  private void confirmLongKeyConverter(LongKeyConverter longKeyConverter,
+      Long testValue) {
+    Long decoded = longKeyConverter.decode(longKeyConverter.encode(testValue));
+    assertEquals(testValue, decoded);
+  }
+
+  @Test
+  public void testStringKeyConverter() {
+    StringKeyConverter stringKeyConverter = new StringKeyConverter();
+    String phrase = "QuackAttack now!";
+    // Round trip every suffix of the phrase, both on its own and doubled.
+    for (int i = 0; i < phrase.length(); i++) {
+      String sub = phrase.substring(i);
+      confirmStringKeyConverter(stringKeyConverter, sub);
+      confirmStringKeyConverter(stringKeyConverter, sub + sub);
+    }
+  }
+
+  /** Asserts that decoding the encoded testValue gives testValue back. */
+  private void confirmStringKeyConverter(StringKeyConverter stringKeyConverter,
+      String testValue) {
+    assertEquals(testValue,
+        stringKeyConverter.decode(stringKeyConverter.encode(testValue)));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
new file mode 100644
index 0000000..5beb189
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.junit.Test;
+
+
+public class TestRowKeys {
+
+  private final static String QUALIFIER_SEP = Separator.QUALIFIERS.getValue();
+  private final static byte[] QUALIFIER_SEP_BYTES = Bytes
+      .toBytes(QUALIFIER_SEP);
+  private final static String CLUSTER = "cl" + QUALIFIER_SEP + "uster";
+  private final static String USER = QUALIFIER_SEP + "user";
+  private final static String FLOW_NAME = "dummy_" + QUALIFIER_SEP + "flow"
+      + QUALIFIER_SEP;
+  private final static Long FLOW_RUN_ID;
+  private final static String APPLICATION_ID;
+  static {
+    long runid = Long.MAX_VALUE - 900L;
+    byte[] longMaxByteArr = Bytes.toBytes(Long.MAX_VALUE);
+    byte[] byteArr = Bytes.toBytes(runid);
+    int sepByteLen = QUALIFIER_SEP_BYTES.length;
+    if (sepByteLen <= byteArr.length) {
+      for (int i = 0; i < sepByteLen; i++) {
+        byteArr[i] = (byte) (longMaxByteArr[i] - QUALIFIER_SEP_BYTES[i]);
+      }
+    }
+    FLOW_RUN_ID = Bytes.toLong(byteArr);
+    long clusterTs = System.currentTimeMillis();
+    byteArr = Bytes.toBytes(clusterTs);
+    if (sepByteLen <= byteArr.length) {
+      for (int i = 0; i < sepByteLen; i++) {
+        byteArr[byteArr.length - sepByteLen + i] =
+            (byte) (longMaxByteArr[byteArr.length - sepByteLen + i] -
+                QUALIFIER_SEP_BYTES[i]);
+      }
+    }
+    clusterTs = Bytes.toLong(byteArr);
+    int seqId = 222;
+    APPLICATION_ID = ApplicationId.newInstance(clusterTs, seqId).toString();
+  }
+
+  private static void verifyRowPrefixBytes(byte[] byteRowKeyPrefix) {
+    // The prefix must end with the bytes of the qualifier separator.
+    int sepLen = QUALIFIER_SEP_BYTES.length;
+    int tailStart = byteRowKeyPrefix.length - sepLen;
+    for (int idx = 0; idx < sepLen; idx++) {
+      assertTrue("Row key prefix not encoded properly.",
+          byteRowKeyPrefix[tailStart + idx] == QUALIFIER_SEP_BYTES[idx]);
+    }
+  }
+
+  @Test
+  public void testApplicationRowKey() {
+    byte[] byteRowKey =
+        new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
+            APPLICATION_ID).getRowKey();
+    ApplicationRowKey rowKey = ApplicationRowKey.parseRowKey(byteRowKey);
+    assertEquals(CLUSTER, rowKey.getClusterId());
+    assertEquals(USER, rowKey.getUserId());
+    assertEquals(FLOW_NAME, rowKey.getFlowName());
+    assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
+    assertEquals(APPLICATION_ID, rowKey.getAppId());
+
+    byte[] byteRowKeyPrefix =
+        new ApplicationRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID)
+            .getRowKeyPrefix();
+    byte[][] splits =
+        Separator.QUALIFIERS.split(byteRowKeyPrefix,
+            new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+                Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+                Separator.VARIABLE_SIZE});
+    assertEquals(5, splits.length);
+    assertEquals(0, splits[4].length);
+    assertEquals(FLOW_NAME,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
+    assertEquals(FLOW_RUN_ID,
+        (Long) LongConverter.invertLong(Bytes.toLong(splits[3])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+
+    byteRowKeyPrefix =
+        new ApplicationRowKeyPrefix(CLUSTER, USER, FLOW_NAME).getRowKeyPrefix();
+    splits =
+        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
+            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
+    assertEquals(4, splits.length);
+    assertEquals(0, splits[3].length);
+    assertEquals(FLOW_NAME,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+  }
+
+  /**
+   * Round-trips an app-to-flow row key through its byte form and checks
+   * that the cluster and application ids survive unchanged.
+   */
+  @Test
+  public void testAppToFlowRowKey() {
+    AppToFlowRowKey original = new AppToFlowRowKey(CLUSTER, APPLICATION_ID);
+    AppToFlowRowKey parsed =
+        AppToFlowRowKey.parseRowKey(original.getRowKey());
+    assertEquals(CLUSTER, parsed.getClusterId());
+    assertEquals(APPLICATION_ID, parsed.getAppId());
+  }
+
+  @Test
+  public void testEntityRowKey() {
+    String entityId = "!ent!ity!!id!";
+    String entityType = "entity!Type";
+    byte[] byteRowKey =
+        new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
+            entityType, entityId).getRowKey();
+    EntityRowKey rowKey = EntityRowKey.parseRowKey(byteRowKey);
+    assertEquals(CLUSTER, rowKey.getClusterId());
+    assertEquals(USER, rowKey.getUserId());
+    assertEquals(FLOW_NAME, rowKey.getFlowName());
+    assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
+    assertEquals(APPLICATION_ID, rowKey.getAppId());
+    assertEquals(entityType, rowKey.getEntityType());
+    assertEquals(entityId, rowKey.getEntityId());
+
+    byte[] byteRowKeyPrefix =
+        new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
+            APPLICATION_ID, entityType).getRowKeyPrefix();
+    byte[][] splits =
+        Separator.QUALIFIERS.split(
+            byteRowKeyPrefix,
+            new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+                Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+                AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
+                Separator.VARIABLE_SIZE});
+    assertEquals(7, splits.length);
+    assertEquals(0, splits[6].length);
+    assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4]));
+    assertEquals(entityType,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[5])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+
+    byteRowKeyPrefix =
+        new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
+            APPLICATION_ID).getRowKeyPrefix();
+    splits =
+        Separator.QUALIFIERS.split(
+            byteRowKeyPrefix,
+            new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+                Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+                AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE});
+    assertEquals(6, splits.length);
+    assertEquals(0, splits[5].length);
+    AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter();
+    assertEquals(APPLICATION_ID, appIdKeyConverter.decode(splits[4]));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+  }
+
+  @Test
+  public void testFlowActivityRowKey() {
+    Long ts = 1459900830000L;
+    Long dayTimestamp = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts);
+    byte[] byteRowKey =
+        new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME).getRowKey();
+    FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(byteRowKey);
+    assertEquals(CLUSTER, rowKey.getClusterId());
+    assertEquals(dayTimestamp, rowKey.getDayTimestamp());
+    assertEquals(USER, rowKey.getUserId());
+    assertEquals(FLOW_NAME, rowKey.getFlowName());
+
+    byte[] byteRowKeyPrefix =
+        new FlowActivityRowKeyPrefix(CLUSTER).getRowKeyPrefix();
+    byte[][] splits =
+        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
+            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
+    assertEquals(2, splits.length);
+    assertEquals(0, splits[1].length);
+    assertEquals(CLUSTER,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+
+    byteRowKeyPrefix =
+        new FlowActivityRowKeyPrefix(CLUSTER, ts).getRowKeyPrefix();
+    splits =
+        Separator.QUALIFIERS.split(byteRowKeyPrefix,
+            new int[] {Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+                Separator.VARIABLE_SIZE});
+    assertEquals(3, splits.length);
+    assertEquals(0, splits[2].length);
+    assertEquals(CLUSTER,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
+    assertEquals(ts,
+        (Long) LongConverter.invertLong(Bytes.toLong(splits[1])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+  }
+
+  @Test
+  public void testFlowRunRowKey() {
+    byte[] byteRowKey =
+        new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID).getRowKey();
+    FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(byteRowKey);
+    assertEquals(CLUSTER, rowKey.getClusterId());
+    assertEquals(USER, rowKey.getUserId());
+    assertEquals(FLOW_NAME, rowKey.getFlowName());
+    assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
+
+    byte[] byteRowKeyPrefix =
+        new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, null).getRowKey();
+    byte[][] splits =
+        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
+            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
+    assertEquals(4, splits.length);
+    assertEquals(0, splits[3].length);
+    assertEquals(FLOW_NAME,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java
new file mode 100644
index 0000000..7d37206
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+public class TestSeparator {
+
+  private static String villain = "Dr. Heinz Doofenshmirtz";
+  private static String special =
+      ".   *   |   ?   +   \t   (   )   [   ]   {   }   ^   $  \\ \"  %";
+
+  /**
+   * Checks encode/decode round trips and null encoding for each Separator.
+   */
+  @Test
+  public void testEncodeDecodeString() {
+
+    for (Separator separator : Separator.values()) {
+      testEncodeDecode(separator, "");
+      testEncodeDecode(separator, " ");
+      testEncodeDecode(separator, "!");
+      testEncodeDecode(separator, "?");
+      testEncodeDecode(separator, "&");
+      testEncodeDecode(separator, "+");
+      testEncodeDecode(separator, "\t");
+      testEncodeDecode(separator, "Dr.");
+      testEncodeDecode(separator, "Heinz");
+      testEncodeDecode(separator, "Doofenshmirtz");
+      testEncodeDecode(separator, villain);
+      testEncodeDecode(separator, special);
+
+      assertNull(separator.encode(null));
+
+    }
+  }
+
+  private void testEncodeDecode(Separator separator, String token) {
+    // Encoding followed by decoding must reproduce the original token.
+    String roundTripped = separator.decode(separator.encode(token));
+    assertEquals("token:" + token + " separator:" + separator + ".",
+        token, roundTripped);
+  }
+
+  @Test
+  public void testEncodeDecode() {
+    testEncodeDecode("Dr.", Separator.QUALIFIERS);
+    testEncodeDecode("Heinz", Separator.QUALIFIERS, Separator.QUALIFIERS);
+    testEncodeDecode("Doofenshmirtz", Separator.QUALIFIERS, null,
+        Separator.QUALIFIERS);
+    testEncodeDecode("&Perry", Separator.QUALIFIERS, Separator.VALUES, null);
+    testEncodeDecode("the ", Separator.QUALIFIERS, Separator.SPACE);
+    testEncodeDecode("Platypus...", (Separator) null);
+    testEncodeDecode("The what now ?!?", Separator.QUALIFIERS,
+        Separator.VALUES, Separator.SPACE);
+
+  }
+  @Test
+  public void testEncodedValues() {
+    testEncodeDecode("Double-escape %2$ and %9$ or %%2$ or %%3$, nor  %%%2$" +
+        "= no problem!",
+        Separator.QUALIFIERS, Separator.VALUES, Separator.SPACE, Separator.TAB);
+  }
+
+  @Test
+  public void testSplits() {
+    byte[] maxLongBytes = Bytes.toBytes(Long.MAX_VALUE);
+    byte[] maxIntBytes = Bytes.toBytes(Integer.MAX_VALUE);
+    for (Separator separator : Separator.values()) {
+      String str1 = "cl" + separator.getValue() + "us";
+      String str2 = separator.getValue() + "rst";
+      byte[] sepByteArr = Bytes.toBytes(separator.getValue());
+      byte[] longVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxLongBytes,
+          sepByteArr.length, Bytes.SIZEOF_LONG - sepByteArr.length));
+      byte[] intVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxIntBytes,
+          sepByteArr.length, Bytes.SIZEOF_INT - sepByteArr.length));
+      byte[] arr = separator.join(
+          Bytes.toBytes(separator.encode(str1)), longVal1Arr,
+          Bytes.toBytes(separator.encode(str2)), intVal1Arr);
+      int[] sizes = {Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+          Separator.VARIABLE_SIZE, Bytes.SIZEOF_INT};
+      byte[][] splits = separator.split(arr, sizes);
+      assertEquals(4, splits.length);
+      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
+      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1]));
+      assertEquals(str2, separator.decode(Bytes.toString(splits[2])));
+      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3]));
+
+      longVal1Arr = Bytes.add(Bytes.copy(maxLongBytes, 0, Bytes.SIZEOF_LONG -
+          sepByteArr.length), sepByteArr);
+      intVal1Arr = Bytes.add(Bytes.copy(maxIntBytes, 0, Bytes.SIZEOF_INT -
+          sepByteArr.length), sepByteArr);
+      arr = separator.join(Bytes.toBytes(separator.encode(str1)), longVal1Arr,
+          Bytes.toBytes(separator.encode(str2)), intVal1Arr);
+      splits = separator.split(arr, sizes);
+      assertEquals(4, splits.length);
+      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
+      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1]));
+      assertEquals(str2, separator.decode(Bytes.toString(splits[2])));
+      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3]));
+
+      longVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxLongBytes,
+          sepByteArr.length, 4 - sepByteArr.length), sepByteArr);
+      longVal1Arr = Bytes.add(longVal1Arr, Bytes.copy(maxLongBytes, 4, 3 -
+              sepByteArr.length), sepByteArr);
+      arr = separator.join(Bytes.toBytes(separator.encode(str1)), longVal1Arr,
+          Bytes.toBytes(separator.encode(str2)), intVal1Arr);
+      splits = separator.split(arr, sizes);
+      assertEquals(4, splits.length);
+      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
+      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1]));
+      assertEquals(str2, separator.decode(Bytes.toString(splits[2])));
+      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3]));
+
+      arr = separator.join(Bytes.toBytes(separator.encode(str1)),
+          Bytes.toBytes(separator.encode(str2)), intVal1Arr, longVal1Arr);
+      int[] sizes1 = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+          Bytes.SIZEOF_INT, Bytes.SIZEOF_LONG};
+      splits = separator.split(arr, sizes1);
+      assertEquals(4, splits.length);
+      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
+      assertEquals(str2, separator.decode(Bytes.toString(splits[1])));
+      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[2]));
+      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[3]));
+
+      try {
+        int[] sizes2 = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+            Bytes.SIZEOF_INT, 7};
+        splits = separator.split(arr, sizes2);
+        fail("Exception should have been thrown.");
+      } catch (IllegalArgumentException e) {} // expected: sizes rejected
+
+      try {
+        int[] sizes2 = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, 2,
+            Bytes.SIZEOF_LONG};
+        splits = separator.split(arr, sizes2);
+        fail("Exception should have been thrown.");
+      } catch (IllegalArgumentException e) {} // expected: sizes rejected
+    }
+  }
+
+  /**
+   * Simple test to encode and decode using the same separators and confirm that
+   * we end up with the same as what we started with.
+   *
+   * @param token the string to encode and then decode.
+   * @param separators the separators passed to both encode and decode.
+   */
+  private static void testEncodeDecode(String token, Separator... separators) {
+    byte[] encoded = Separator.encode(token, separators);
+    String decoded = Separator.decode(encoded, separators);
+    assertEquals(token, decoded);
+  }
+
+  @Test
+  public void testJoinStripped() {
+    List<String> stringList = new ArrayList<String>(0);
+    stringList.add("nothing");
+
+    String joined = Separator.VALUES.joinEncoded(stringList);
+    Iterable<String> split = Separator.VALUES.splitEncoded(joined);
+    assertTrue(Iterables.elementsEqual(stringList, split));
+
+    stringList = new ArrayList<String>(3);
+    stringList.add("a");
+    stringList.add("b?");
+    stringList.add("c");
+
+    joined = Separator.VALUES.joinEncoded(stringList);
+    split = Separator.VALUES.splitEncoded(joined);
+    assertTrue(Iterables.elementsEqual(stringList, split));
+
+    String[] stringArray1 = {"else"};
+    joined = Separator.VALUES.joinEncoded(stringArray1);
+    split = Separator.VALUES.splitEncoded(joined);
+    assertTrue(Iterables.elementsEqual(Arrays.asList(stringArray1), split));
+
+    String[] stringArray2 = {"d", "e?", "f"};
+    joined = Separator.VALUES.joinEncoded(stringArray2);
+    split = Separator.VALUES.splitEncoded(joined);
+    assertTrue(Iterables.elementsEqual(Arrays.asList(stringArray2), split));
+
+    List<String> empty = new ArrayList<String>(0);
+    split = Separator.VALUES.splitEncoded(null);
+    assertTrue(Iterables.elementsEqual(empty, split));
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index e954047..e0af66a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -106,11 +106,6 @@
     </dependency>
 
     <dependency>
-      <groupId>commons-cli</groupId>
-      <artifactId>commons-cli</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>commons-lang</groupId>
       <artifactId>commons-lang</artifactId>
     </dependency>
@@ -131,67 +126,6 @@
       <version>1.1.1</version>
     </dependency>
 
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-common</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-mapreduce-client-core</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty-util</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-client</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-mapreduce-client-core</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-server</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-hdfs</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-hdfs-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-mapreduce-client-core</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty-util</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty-sslengine</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message