hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtcarre...@apache.org
Subject [49/50] [abbrv] hadoop git commit: YARN-4200. Refactor reader classes in storage to nest under hbase specific package name. Contributed by Li Lu.
Date Wed, 20 Jan 2016 09:14:06 GMT
YARN-4200. Refactor reader classes in storage to nest under hbase
specific package name. Contributed by Li Lu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/63d90992
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/63d90992
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/63d90992

Branch: refs/heads/feature-YARN-2928
Commit: 63d909927b4dd123f43523c1449033b419f51834
Parents: 5157c30
Author: Li Lu <gtcarrera9@apache.org>
Authored: Mon Jan 11 18:05:36 2016 -0800
Committer: Li Lu <gtcarrera9@apache.org>
Committed: Tue Jan 19 18:03:31 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../storage/ApplicationEntityReader.java        | 382 --------------
 .../storage/FlowActivityEntityReader.java       | 163 ------
 .../storage/FlowRunEntityReader.java            | 225 ---------
 .../storage/GenericEntityReader.java            | 496 ------------------
 .../storage/HBaseTimelineReaderImpl.java        |   2 +
 .../storage/TimelineEntityReader.java           | 274 ----------
 .../storage/TimelineEntityReaderFactory.java    | 100 ----
 .../storage/reader/ApplicationEntityReader.java | 383 ++++++++++++++
 .../reader/FlowActivityEntityReader.java        | 164 ++++++
 .../storage/reader/FlowRunEntityReader.java     | 226 +++++++++
 .../storage/reader/GenericEntityReader.java     | 497 +++++++++++++++++++
 .../storage/reader/TimelineEntityReader.java    | 274 ++++++++++
 .../reader/TimelineEntityReaderFactory.java     | 100 ++++
 .../storage/reader/package-info.java            |  23 +
 15 files changed, 1672 insertions(+), 1640 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/63d90992/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 7827d77..1a0ed56 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -129,6 +129,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
 
   IMPROVEMENTS
 
+    YARN-4200. Refactor reader classes in storage to nest under hbase specific 
+    package name. Contributed by Li Lu. 
+
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via
     zjshen)
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/63d90992/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
deleted file mode 100644
index d812a6c..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for application entities that are stored in the
- * application table.
- */
-class ApplicationEntityReader extends GenericEntityReader {
-  private static final ApplicationTable APPLICATION_TABLE =
-      new ApplicationTable();
-
-  public ApplicationEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      Long limit, Long createdTimeBegin, Long createdTimeEnd,
-      Long modifiedTimeBegin, Long modifiedTimeEnd,
-      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> metricFilters, Set<String> eventFilters,
-      TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
-      EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
-        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
-        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
-        true);
-  }
-
-  public ApplicationEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      String entityId, TimelineFilterList confsToRetrieve,
-      TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
-        confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
-  }
-
-  /**
-   * Uses the {@link ApplicationTable}.
-   */
-  protected BaseTable<?> getTable() {
-    return APPLICATION_TABLE;
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFields() {
-    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
-    // Fetch all the columns.
-    if (fieldsToRetrieve.contains(Field.ALL) &&
-        (confsToRetrieve == null ||
-        confsToRetrieve.getFilterList().isEmpty()) &&
-        (metricsToRetrieve == null ||
-        metricsToRetrieve.getFilterList().isEmpty())) {
-      return list;
-    }
-    FilterList infoColFamilyList = new FilterList();
-    // By default fetch everything in INFO column family.
-    FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL,
-           new BinaryComparator(ApplicationColumnFamily.INFO.getBytes()));
-    infoColFamilyList.addFilter(infoColumnFamily);
-    // Events not required.
-    if (!fieldsToRetrieve.contains(Field.EVENTS) &&
-        !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-          ApplicationColumnPrefix.EVENT.getColumnPrefixBytes(""))));
-    }
-    // info not required.
-    if (!fieldsToRetrieve.contains(Field.INFO) &&
-        !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-          ApplicationColumnPrefix.INFO.getColumnPrefixBytes(""))));
-    }
-    // is releated to not required.
-    if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) &&
-        !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-          ApplicationColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
-    }
-    // relates to not required.
-    if (!fieldsToRetrieve.contains(Field.RELATES_TO) &&
-        !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-          ApplicationColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
-    }
-    list.addFilter(infoColFamilyList);
-    if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) ||
-        (confsToRetrieve != null &&
-        !confsToRetrieve.getFilterList().isEmpty())) {
-      FilterList filterCfg =
-          new FilterList(new FamilyFilter(CompareOp.EQUAL,
-          new BinaryComparator(ApplicationColumnFamily.CONFIGS.getBytes())));
-      if (confsToRetrieve != null &&
-          !confsToRetrieve.getFilterList().isEmpty()) {
-        filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
-            ApplicationColumnPrefix.CONFIG, confsToRetrieve));
-      }
-      list.addFilter(filterCfg);
-    }
-    if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) ||
-        (metricsToRetrieve != null &&
-        !metricsToRetrieve.getFilterList().isEmpty())) {
-      FilterList filterMetrics =
-          new FilterList(new FamilyFilter(CompareOp.EQUAL,
-          new BinaryComparator(ApplicationColumnFamily.METRICS.getBytes())));
-      if (metricsToRetrieve != null &&
-          !metricsToRetrieve.getFilterList().isEmpty()) {
-        filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
-            ApplicationColumnPrefix.METRIC, metricsToRetrieve));
-      }
-      list.addFilter(filterMetrics);
-    }
-    return list;
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    byte[] rowKey =
-        ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
-            appId);
-    Get get = new Get(rowKey);
-    get.setMaxVersions(Integer.MAX_VALUE);
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      get.setFilter(filterList);
-    }
-    return table.getResult(hbaseConf, conn, get);
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
-    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
-    if (singleEntityRead) {
-      Preconditions.checkNotNull(appId, "appId shouldn't be null");
-    } else {
-      Preconditions.checkNotNull(userId, "userId shouldn't be null");
-      Preconditions.checkNotNull(flowName, "flowName shouldn't be null");
-    }
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    if (singleEntityRead) {
-      if (flowName == null || flowRunId == null || userId == null) {
-        FlowContext context =
-            lookupFlowContext(clusterId, appId, hbaseConf, conn);
-        flowName = context.flowName;
-        flowRunId = context.flowRunId;
-        userId = context.userId;
-      }
-    }
-    if (fieldsToRetrieve == null) {
-      fieldsToRetrieve = EnumSet.noneOf(Field.class);
-    }
-    if (!fieldsToRetrieve.contains(Field.CONFIGS) &&
-        confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) {
-      fieldsToRetrieve.add(Field.CONFIGS);
-    }
-    if (!fieldsToRetrieve.contains(Field.METRICS) &&
-        metricsToRetrieve != null &&
-        !metricsToRetrieve.getFilterList().isEmpty()) {
-      fieldsToRetrieve.add(Field.METRICS);
-    }
-    if (!singleEntityRead) {
-      if (limit == null || limit < 0) {
-        limit = TimelineReader.DEFAULT_LIMIT;
-      }
-      if (createdTimeBegin == null) {
-        createdTimeBegin = DEFAULT_BEGIN_TIME;
-      }
-      if (createdTimeEnd == null) {
-        createdTimeEnd = DEFAULT_END_TIME;
-      }
-      if (modifiedTimeBegin == null) {
-        modifiedTimeBegin = DEFAULT_BEGIN_TIME;
-      }
-      if (modifiedTimeEnd == null) {
-        modifiedTimeEnd = DEFAULT_END_TIME;
-      }
-    }
-  }
-
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
-    Scan scan = new Scan();
-    if (flowRunId != null) {
-      scan.setRowPrefixFilter(ApplicationRowKey.
-          getRowKeyPrefix(clusterId, userId, flowName, flowRunId));
-    } else {
-      scan.setRowPrefixFilter(ApplicationRowKey.
-          getRowKeyPrefix(clusterId, userId, flowName));
-    }
-    FilterList newList = new FilterList();
-    newList.addFilter(new PageFilter(limit));
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      newList.addFilter(filterList);
-    }
-    scan.setFilter(newList);
-    return table.getResultScanner(hbaseConf, conn, scan);
-  }
-
-  @Override
-  protected TimelineEntity parseEntity(Result result) throws IOException {
-    if (result == null || result.isEmpty()) {
-      return null;
-    }
-    TimelineEntity entity = new TimelineEntity();
-    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
-    String entityId = ApplicationColumn.ID.readResult(result).toString();
-    entity.setId(entityId);
-
-    // fetch created time
-    Number createdTime =
-        (Number)ApplicationColumn.CREATED_TIME.readResult(result);
-    entity.setCreatedTime(createdTime.longValue());
-    if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
-        entity.getCreatedTime() > createdTimeEnd)) {
-      return null;
-    }
-
-    // fetch modified time
-    Number modifiedTime =
-        (Number)ApplicationColumn.MODIFIED_TIME.readResult(result);
-    entity.setModifiedTime(modifiedTime.longValue());
-    if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
-        entity.getModifiedTime() > modifiedTimeEnd)) {
-      return null;
-    }
-
-    // fetch is related to entities
-    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
-      readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
-          true);
-      if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
-          entity.getIsRelatedToEntities(), isRelatedTo)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
-        entity.getIsRelatedToEntities().clear();
-      }
-    }
-
-    // fetch relates to entities
-    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
-      readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
-          false);
-      if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
-          entity.getRelatesToEntities(), relatesTo)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
-        entity.getRelatesToEntities().clear();
-      }
-    }
-
-    // fetch info
-    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
-      readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
-      if (checkInfo &&
-          !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.INFO)) {
-        entity.getInfo().clear();
-      }
-    }
-
-    // fetch configs
-    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
-      readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
-      if (checkConfigs && !TimelineStorageUtils.matchFilters(
-          entity.getConfigs(), configFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.CONFIGS)) {
-        entity.getConfigs().clear();
-      }
-    }
-
-    // fetch events
-    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
-      readEvents(entity, result, true);
-      if (checkEvents && !TimelineStorageUtils.matchEventFilters(
-          entity.getEvents(), eventFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.EVENTS)) {
-        entity.getEvents().clear();
-      }
-    }
-
-    // fetch metrics
-    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
-      readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
-      if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
-          entity.getMetrics(), metricFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.METRICS)) {
-        entity.getMetrics().clear();
-      }
-    }
-    return entity;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/63d90992/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
deleted file mode 100644
index 7e8d4ba..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for flow activity entities that are stored in the
- * flow activity table.
- */
-class FlowActivityEntityReader extends TimelineEntityReader {
-  private static final FlowActivityTable FLOW_ACTIVITY_TABLE =
-      new FlowActivityTable();
-
-  public FlowActivityEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      Long limit, Long createdTimeBegin, Long createdTimeEnd,
-      Long modifiedTimeBegin, Long modifiedTimeEnd,
-      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> metricFilters, Set<String> eventFilters,
-      EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
-        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
-        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, null, null, fieldsToRetrieve, true);
-  }
-
-  public FlowActivityEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      String entityId, EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
-        null, null, fieldsToRetrieve);
-  }
-
-  /**
-   * Uses the {@link FlowActivityTable}.
-   */
-  @Override
-  protected BaseTable<?> getTable() {
-    return FLOW_ACTIVITY_TABLE;
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    if (limit == null || limit < 0) {
-      limit = TimelineReader.DEFAULT_LIMIT;
-    }
-    if (createdTimeBegin == null) {
-      createdTimeBegin = DEFAULT_BEGIN_TIME;
-    }
-    if (createdTimeEnd == null) {
-      createdTimeEnd = DEFAULT_END_TIME;
-    }
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFields() {
-    return null;
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    throw new UnsupportedOperationException(
-        "we don't support a single entity query");
-  }
-
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
-    Scan scan = new Scan();
-    if (createdTimeBegin == DEFAULT_BEGIN_TIME &&
-        createdTimeEnd == DEFAULT_END_TIME) {
-      scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId));
-    } else {
-      scan.setStartRow(
-          FlowActivityRowKey.getRowKeyPrefix(clusterId, createdTimeEnd));
-      scan.setStopRow(
-          FlowActivityRowKey.getRowKeyPrefix(clusterId,
-              (createdTimeBegin <= 0 ? 0: (createdTimeBegin - 1))));
-    }
-    // use the page filter to limit the result to the page size
-    // the scanner may still return more than the limit; therefore we need to
-    // read the right number as we iterate
-    scan.setFilter(new PageFilter(limit));
-    return table.getResultScanner(hbaseConf, conn, scan);
-  }
-
-  @Override
-  protected TimelineEntity parseEntity(Result result) throws IOException {
-    FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow());
-
-    long time = rowKey.getDayTimestamp();
-    String user = rowKey.getUserId();
-    String flowName = rowKey.getFlowName();
-
-    FlowActivityEntity flowActivity =
-        new FlowActivityEntity(clusterId, time, user, flowName);
-    // set the id
-    flowActivity.setId(flowActivity.getId());
-    // get the list of run ids along with the version that are associated with
-    // this flow on this day
-    Map<String, Object> runIdsMap =
-        FlowActivityColumnPrefix.RUN_ID.readResults(result);
-    for (Map.Entry<String, Object> e : runIdsMap.entrySet()) {
-      Long runId = Long.valueOf(e.getKey());
-      String version = (String)e.getValue();
-      FlowRunEntity flowRun = new FlowRunEntity();
-      flowRun.setUser(user);
-      flowRun.setName(flowName);
-      flowRun.setRunId(runId);
-      flowRun.setVersion(version);
-      // set the id
-      flowRun.setId(flowRun.getId());
-      flowActivity.addFlowRun(flowRun);
-    }
-
-    return flowActivity;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/63d90992/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
deleted file mode 100644
index c9076ee..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
-import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for flow run entities that are stored in the flow run
- * table.
- */
-class FlowRunEntityReader extends TimelineEntityReader {
-  private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable();
-
-  public FlowRunEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      Long limit, Long createdTimeBegin, Long createdTimeEnd,
-      Long modifiedTimeBegin, Long modifiedTimeEnd,
-      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> metricFilters, Set<String> eventFilters,
-      TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
-      EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
-        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
-        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, null, metricsToRetrieve, fieldsToRetrieve, true);
-  }
-
-  public FlowRunEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      String entityId, TimelineFilterList confsToRetrieve,
-      TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
-        null, metricsToRetrieve, fieldsToRetrieve);
-  }
-
-  /**
-   * Uses the {@link FlowRunTable}.
-   */
-  @Override
-  protected BaseTable<?> getTable() {
-    return FLOW_RUN_TABLE;
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
-    Preconditions.checkNotNull(userId, "userId shouldn't be null");
-    Preconditions.checkNotNull(flowName, "flowName shouldn't be null");
-    if (singleEntityRead) {
-      Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null");
-    }
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn) {
-    if (!singleEntityRead) {
-      if (fieldsToRetrieve == null) {
-        fieldsToRetrieve = EnumSet.noneOf(Field.class);
-      }
-      if (limit == null || limit < 0) {
-        limit = TimelineReader.DEFAULT_LIMIT;
-      }
-      if (createdTimeBegin == null) {
-        createdTimeBegin = DEFAULT_BEGIN_TIME;
-      }
-      if (createdTimeEnd == null) {
-        createdTimeEnd = DEFAULT_END_TIME;
-      }
-      if (!fieldsToRetrieve.contains(Field.METRICS) &&
-          metricsToRetrieve != null &&
-          !metricsToRetrieve.getFilterList().isEmpty()) {
-        fieldsToRetrieve.add(Field.METRICS);
-      }
-    }
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFields() {
-    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
-
-    // By default fetch everything in INFO column family.
-    FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL,
-           new BinaryComparator(FlowRunColumnFamily.INFO.getBytes()));
-    // Metrics not required.
-    if (!singleEntityRead && !fieldsToRetrieve.contains(Field.METRICS) &&
-        !fieldsToRetrieve.contains(Field.ALL)) {
-      FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE);
-      infoColFamilyList.addFilter(infoColumnFamily);
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-          FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(""))));
-      list.addFilter(infoColFamilyList);
-    }
-    if (metricsToRetrieve != null &&
-        !metricsToRetrieve.getFilterList().isEmpty()) {
-      FilterList infoColFamilyList = new FilterList();
-      infoColFamilyList.addFilter(infoColumnFamily);
-      infoColFamilyList.addFilter(TimelineFilterUtils.createHBaseFilterList(
-          FlowRunColumnPrefix.METRIC, metricsToRetrieve));
-      list.addFilter(infoColFamilyList);
-    }
-    return list;
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    byte[] rowKey =
-        FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId);
-    Get get = new Get(rowKey);
-    get.setMaxVersions(Integer.MAX_VALUE);
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      get.setFilter(filterList);
-    }
-    return table.getResult(hbaseConf, conn, get);
-  }
-
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
-    Scan scan = new Scan();
-    scan.setRowPrefixFilter(
-        FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowName));
-    FilterList newList = new FilterList();
-    newList.addFilter(new PageFilter(limit));
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      newList.addFilter(filterList);
-    }
-    scan.setFilter(newList);
-    return table.getResultScanner(hbaseConf, conn, scan);
-  }
-
-  @Override
-  protected TimelineEntity parseEntity(Result result) throws IOException {
-    FlowRunEntity flowRun = new FlowRunEntity();
-    flowRun.setUser(userId);
-    flowRun.setName(flowName);
-    if (singleEntityRead) {
-      flowRun.setRunId(flowRunId);
-    } else {
-      FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow());
-      flowRun.setRunId(rowKey.getFlowRunId());
-    }
-
-    // read the start time
-    Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result);
-    if (startTime != null) {
-      flowRun.setStartTime(startTime.longValue());
-    }
-    if (!singleEntityRead && (flowRun.getStartTime() < createdTimeBegin ||
-        flowRun.getStartTime() > createdTimeEnd)) {
-      return null;
-    }
-
-    // read the end time if available
-    Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result);
-    if (endTime != null) {
-      flowRun.setMaxEndTime(endTime.longValue());
-    }
-
-    // read the flow version
-    String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result);
-    if (version != null) {
-      flowRun.setVersion(version);
-    }
-
-    // read metrics
-    if (singleEntityRead || fieldsToRetrieve.contains(Field.METRICS)) {
-      readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
-    }
-
-    // set the id
-    flowRun.setId(flowRun.getId());
-    return flowRun;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/63d90992/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
deleted file mode 100644
index 784dfd5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
+++ /dev/null
@@ -1,496 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
-import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for generic entities that are stored in the entity
- * table.
- */
-class GenericEntityReader extends TimelineEntityReader {
-  private static final EntityTable ENTITY_TABLE = new EntityTable();
-  private static final Log LOG = LogFactory.getLog(GenericEntityReader.class);
-
-  /**
-   * Used to look up the flow context.
-   */
-  private final AppToFlowTable appToFlowTable = new AppToFlowTable();
-
-  public GenericEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      Long limit, Long createdTimeBegin, Long createdTimeEnd,
-      Long modifiedTimeBegin, Long modifiedTimeEnd,
-      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> metricFilters, Set<String> eventFilters,
-      TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
-      EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
-        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
-        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
-        sortedKeys);
-  }
-
-  public GenericEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      String entityId, TimelineFilterList confsToRetrieve,
-      TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
-        confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
-  }
-
-  /**
-   * Uses the {@link EntityTable}.
-   */
-  protected BaseTable<?> getTable() {
-    return ENTITY_TABLE;
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFields() {
-    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
-    // Fetch all the columns.
-    if (fieldsToRetrieve.contains(Field.ALL) &&
-        (confsToRetrieve == null ||
-        confsToRetrieve.getFilterList().isEmpty()) &&
-        (metricsToRetrieve == null ||
-        metricsToRetrieve.getFilterList().isEmpty())) {
-      return list;
-    }
-    FilterList infoColFamilyList = new FilterList();
-    // By default fetch everything in INFO column family.
-    FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL,
-           new BinaryComparator(EntityColumnFamily.INFO.getBytes()));
-    infoColFamilyList.addFilter(infoColumnFamily);
-    // Events not required.
-    if (!fieldsToRetrieve.contains(Field.EVENTS) &&
-        !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-          EntityColumnPrefix.EVENT.getColumnPrefixBytes(""))));
-    }
-    // info not required.
-    if (!fieldsToRetrieve.contains(Field.INFO) &&
-        !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-              EntityColumnPrefix.INFO.getColumnPrefixBytes(""))));
-    }
-    // is related to not required.
-    if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) &&
-        !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-              EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
-    }
-    // relates to not required.
-    if (!fieldsToRetrieve.contains(Field.RELATES_TO) &&
-        !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-              EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
-    }
-    list.addFilter(infoColFamilyList);
-    if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) ||
-        (confsToRetrieve != null &&
-        !confsToRetrieve.getFilterList().isEmpty())) {
-      FilterList filterCfg =
-          new FilterList(new FamilyFilter(CompareOp.EQUAL,
-              new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes())));
-      if (confsToRetrieve != null &&
-          !confsToRetrieve.getFilterList().isEmpty()) {
-        filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
-            EntityColumnPrefix.CONFIG, confsToRetrieve));
-      }
-      list.addFilter(filterCfg);
-    }
-    if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) ||
-        (metricsToRetrieve != null &&
-        !metricsToRetrieve.getFilterList().isEmpty())) {
-      FilterList filterMetrics =
-          new FilterList(new FamilyFilter(CompareOp.EQUAL,
-              new BinaryComparator(EntityColumnFamily.METRICS.getBytes())));
-      if (metricsToRetrieve != null &&
-          !metricsToRetrieve.getFilterList().isEmpty()) {
-        filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
-            EntityColumnPrefix.METRIC, metricsToRetrieve));
-      }
-      list.addFilter(filterMetrics);
-    }
-    return list;
-  }
-
-  protected FlowContext lookupFlowContext(String clusterId, String appId,
-      Configuration hbaseConf, Connection conn) throws IOException {
-    byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
-    Get get = new Get(rowKey);
-    Result result = appToFlowTable.getResult(hbaseConf, conn, get);
-    if (result != null && !result.isEmpty()) {
-      return new FlowContext(
-          AppToFlowColumn.USER_ID.readResult(result).toString(),
-          AppToFlowColumn.FLOW_ID.readResult(result).toString(),
-          ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
-    } else {
-       throw new IOException(
-           "Unable to find the context flow ID and flow run ID for clusterId=" +
-           clusterId + ", appId=" + appId);
-    }
-  }
-
-  protected static class FlowContext {
-    protected final String userId;
-    protected final String flowName;
-    protected final Long flowRunId;
-    public FlowContext(String user, String flowName, Long flowRunId) {
-      this.userId = user;
-      this.flowName = flowName;
-      this.flowRunId = flowRunId;
-    }
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
-    Preconditions.checkNotNull(appId, "appId shouldn't be null");
-    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
-    if (singleEntityRead) {
-      Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
-    }
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    // In reality all three should be null or neither should be null
-    if (flowName == null || flowRunId == null || userId == null) {
-      FlowContext context =
-          lookupFlowContext(clusterId, appId, hbaseConf, conn);
-      flowName = context.flowName;
-      flowRunId = context.flowRunId;
-      userId = context.userId;
-    }
-    if (fieldsToRetrieve == null) {
-      fieldsToRetrieve = EnumSet.noneOf(Field.class);
-    }
-    if (!fieldsToRetrieve.contains(Field.CONFIGS) &&
-        confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) {
-      fieldsToRetrieve.add(Field.CONFIGS);
-    }
-    if (!fieldsToRetrieve.contains(Field.METRICS) &&
-        metricsToRetrieve != null &&
-        !metricsToRetrieve.getFilterList().isEmpty()) {
-      fieldsToRetrieve.add(Field.METRICS);
-    }
-    if (!singleEntityRead) {
-      if (limit == null || limit < 0) {
-        limit = TimelineReader.DEFAULT_LIMIT;
-      }
-      if (createdTimeBegin == null) {
-        createdTimeBegin = DEFAULT_BEGIN_TIME;
-      }
-      if (createdTimeEnd == null) {
-        createdTimeEnd = DEFAULT_END_TIME;
-      }
-      if (modifiedTimeBegin == null) {
-        modifiedTimeBegin = DEFAULT_BEGIN_TIME;
-      }
-      if (modifiedTimeEnd == null) {
-        modifiedTimeEnd = DEFAULT_END_TIME;
-      }
-    }
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    byte[] rowKey =
-        EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
-            entityType, entityId);
-    Get get = new Get(rowKey);
-    get.setMaxVersions(Integer.MAX_VALUE);
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      get.setFilter(filterList);
-    }
-    return table.getResult(hbaseConf, conn, get);
-  }
-
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
-    // Scan through part of the table to find the entities belong to one app
-    // and one type
-    Scan scan = new Scan();
-    scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
-        clusterId, userId, flowName, flowRunId, appId, entityType));
-    scan.setMaxVersions(Integer.MAX_VALUE);
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      scan.setFilter(filterList);
-    }
-    return table.getResultScanner(hbaseConf, conn, scan);
-  }
-
-  @Override
-  protected TimelineEntity parseEntity(Result result) throws IOException {
-    if (result == null || result.isEmpty()) {
-      return null;
-    }
-    TimelineEntity entity = new TimelineEntity();
-    String entityType = EntityColumn.TYPE.readResult(result).toString();
-    entity.setType(entityType);
-    String entityId = EntityColumn.ID.readResult(result).toString();
-    entity.setId(entityId);
-
-    // fetch created time
-    Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result);
-    entity.setCreatedTime(createdTime.longValue());
-    if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
-        entity.getCreatedTime() > createdTimeEnd)) {
-      return null;
-    }
-
-    // fetch modified time
-    Number modifiedTime = (Number)EntityColumn.MODIFIED_TIME.readResult(result);
-    entity.setModifiedTime(modifiedTime.longValue());
-    if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
-        entity.getModifiedTime() > modifiedTimeEnd)) {
-      return null;
-    }
-
-    // fetch is related to entities
-    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
-      readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
-      if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
-          entity.getIsRelatedToEntities(), isRelatedTo)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
-        entity.getIsRelatedToEntities().clear();
-      }
-    }
-
-    // fetch relates to entities
-    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
-      readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
-      if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
-          entity.getRelatesToEntities(), relatesTo)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
-        entity.getRelatesToEntities().clear();
-      }
-    }
-
-    // fetch info
-    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
-      readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
-      if (checkInfo &&
-          !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.INFO)) {
-        entity.getInfo().clear();
-      }
-    }
-
-    // fetch configs
-    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
-      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
-      if (checkConfigs && !TimelineStorageUtils.matchFilters(
-          entity.getConfigs(), configFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.CONFIGS)) {
-        entity.getConfigs().clear();
-      }
-    }
-
-    // fetch events
-    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
-      readEvents(entity, result, false);
-      if (checkEvents && !TimelineStorageUtils.matchEventFilters(
-          entity.getEvents(), eventFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.EVENTS)) {
-        entity.getEvents().clear();
-      }
-    }
-
-    // fetch metrics
-    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
-      readMetrics(entity, result, EntityColumnPrefix.METRIC);
-      if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
-          entity.getMetrics(), metricFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.METRICS)) {
-        entity.getMetrics().clear();
-      }
-    }
-    return entity;
-  }
-
-  /**
-   * Helper method for reading relationship.
-   */
-  protected <T> void readRelationship(
-      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
-      boolean isRelatedTo) throws IOException {
-    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
-    Map<String, Object> columns = prefix.readResults(result);
-    for (Map.Entry<String, Object> column : columns.entrySet()) {
-      for (String id : Separator.VALUES.splitEncoded(
-          column.getValue().toString())) {
-        if (isRelatedTo) {
-          entity.addIsRelatedToEntity(column.getKey(), id);
-        } else {
-          entity.addRelatesToEntity(column.getKey(), id);
-        }
-      }
-    }
-  }
-
-  /**
-   * Helper method for reading key-value pairs for either info or config.
-   */
-  protected <T> void readKeyValuePairs(
-      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
-      boolean isConfig) throws IOException {
-    // info and configuration are of type Map<String, Object or String>
-    Map<String, Object> columns = prefix.readResults(result);
-    if (isConfig) {
-      for (Map.Entry<String, Object> column : columns.entrySet()) {
-        entity.addConfig(column.getKey(), column.getValue().toString());
-      }
-    } else {
-      entity.addInfo(columns);
-    }
-  }
-
-  /**
-   * Read events from the entity table or the application table. The column name
-   * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
-   * if there is no info associated with the event.
-   *
-   * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
-   * schema description.
-   */
-  protected void readEvents(TimelineEntity entity, Result result,
-      boolean isApplication) throws IOException {
-    Map<String, TimelineEvent> eventsMap = new HashMap<>();
-    Map<?, Object> eventsResult = isApplication ?
-        ApplicationColumnPrefix.EVENT.
-            readResultsHavingCompoundColumnQualifiers(result) :
-        EntityColumnPrefix.EVENT.
-            readResultsHavingCompoundColumnQualifiers(result);
-    for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
-      byte[][] karr = (byte[][])eventResult.getKey();
-      // the column name is of the form "eventId=timestamp=infoKey"
-      if (karr.length == 3) {
-        String id = Bytes.toString(karr[0]);
-        long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1]));
-        String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
-        TimelineEvent event = eventsMap.get(key);
-        if (event == null) {
-          event = new TimelineEvent();
-          event.setId(id);
-          event.setTimestamp(ts);
-          eventsMap.put(key, event);
-        }
-        // handle empty info
-        String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
-        if (infoKey != null) {
-          event.addInfo(infoKey, eventResult.getValue());
-        }
-      } else {
-        LOG.warn("incorrectly formatted column name: it will be discarded");
-        continue;
-      }
-    }
-    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
-    entity.addEvents(eventsSet);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/63d90992/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
index 96c5a19..bc48cbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;
 
 public class HBaseTimelineReaderImpl
     extends AbstractService implements TimelineReader {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/63d90992/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
deleted file mode 100644
index a26c0c2..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-
-/**
- * The base class for reading and deserializing timeline entities from the
- * HBase storage. Different types can be defined for different types of the
- * entities that are being requested.
- */
-abstract class TimelineEntityReader {
-  private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class);
-  protected static final long DEFAULT_BEGIN_TIME = 0L;
-  protected static final long DEFAULT_END_TIME = Long.MAX_VALUE;
-
-  protected final boolean singleEntityRead;
-
-  protected String userId;
-  protected String clusterId;
-  protected String flowName;
-  protected Long flowRunId;
-  protected String appId;
-  protected String entityType;
-  protected EnumSet<Field> fieldsToRetrieve;
-  // used only for a single entity read mode
-  protected String entityId;
-  // used only for multiple entity read mode
-  protected Long limit;
-  protected Long createdTimeBegin;
-  protected Long createdTimeEnd;
-  protected Long modifiedTimeBegin;
-  protected Long modifiedTimeEnd;
-  protected Map<String, Set<String>> relatesTo;
-  protected Map<String, Set<String>> isRelatedTo;
-  protected Map<String, Object> infoFilters;
-  protected Map<String, String> configFilters;
-  protected Set<String> metricFilters;
-  protected Set<String> eventFilters;
-  protected TimelineFilterList confsToRetrieve;
-  protected TimelineFilterList metricsToRetrieve;
-
-  /**
-   * Main table the entity reader uses.
-   */
-  protected BaseTable<?> table;
-
-  /**
-   * Specifies whether keys for this table are sorted in a manner where entities
-   * can be retrieved by created time. If true, it will be sufficient to collect
-   * the first results as specified by the limit. Otherwise all matched entities
-   * will be fetched and then limit applied.
-   */
-  private boolean sortedKeys = false;
-
-  /**
-   * Instantiates a reader for multiple-entity reads.
-   */
-  protected TimelineEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      Long limit, Long createdTimeBegin, Long createdTimeEnd,
-      Long modifiedTimeBegin, Long modifiedTimeEnd,
-      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> metricFilters, Set<String> eventFilters,
-      TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
-      EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
-    this.singleEntityRead = false;
-    this.sortedKeys = sortedKeys;
-    this.userId = userId;
-    this.clusterId = clusterId;
-    this.flowName = flowName;
-    this.flowRunId = flowRunId;
-    this.appId = appId;
-    this.entityType = entityType;
-    this.fieldsToRetrieve = fieldsToRetrieve;
-    this.limit = limit;
-    this.createdTimeBegin = createdTimeBegin;
-    this.createdTimeEnd = createdTimeEnd;
-    this.modifiedTimeBegin = modifiedTimeBegin;
-    this.modifiedTimeEnd = modifiedTimeEnd;
-    this.relatesTo = relatesTo;
-    this.isRelatedTo = isRelatedTo;
-    this.infoFilters = infoFilters;
-    this.configFilters = configFilters;
-    this.metricFilters = metricFilters;
-    this.eventFilters = eventFilters;
-    this.confsToRetrieve = confsToRetrieve;
-    this.metricsToRetrieve = metricsToRetrieve;
-
-    this.table = getTable();
-  }
-
-  /**
-   * Instantiates a reader for single-entity reads.
-   */
-  protected TimelineEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      String entityId, TimelineFilterList confsToRetrieve,
-      TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
-    this.singleEntityRead = true;
-    this.userId = userId;
-    this.clusterId = clusterId;
-    this.flowName = flowName;
-    this.flowRunId = flowRunId;
-    this.appId = appId;
-    this.entityType = entityType;
-    this.fieldsToRetrieve = fieldsToRetrieve;
-    this.entityId = entityId;
-    this.confsToRetrieve = confsToRetrieve;
-    this.metricsToRetrieve = metricsToRetrieve;
-
-    this.table = getTable();
-  }
-
-  /**
-   * Creates a {@link FilterList} based on fields, confs and metrics to
-   * retrieve. This filter list will be set in Scan/Get objects to trim down
-   * results fetched from HBase back-end storage.
-   * @return a {@link FilterList} object.
-   */
-  protected abstract FilterList constructFilterListBasedOnFields();
-
-  /**
-   * Reads and deserializes a single timeline entity from the HBase storage.
-   */
-  public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    validateParams();
-    augmentParams(hbaseConf, conn);
-
-    FilterList filterList = constructFilterListBasedOnFields();
-    Result result = getResult(hbaseConf, conn, filterList);
-    if (result == null || result.isEmpty()) {
-      // Could not find a matching row.
-      LOG.info("Cannot find matching entity of type " + entityType);
-      return null;
-    }
-    return parseEntity(result);
-  }
-
-  /**
-   * Reads and deserializes a set of timeline entities from the HBase storage.
-   * It goes through all the results available, and returns the number of
-   * entries as specified in the limit in the entity's natural sort order.
-   */
-  public Set<TimelineEntity> readEntities(Configuration hbaseConf,
-      Connection conn) throws IOException {
-    validateParams();
-    augmentParams(hbaseConf, conn);
-
-    NavigableSet<TimelineEntity> entities = new TreeSet<>();
-    FilterList filterList = constructFilterListBasedOnFields();
-    ResultScanner results = getResults(hbaseConf, conn, filterList);
-    try {
-      for (Result result : results) {
-        TimelineEntity entity = parseEntity(result);
-        if (entity == null) {
-          continue;
-        }
-        entities.add(entity);
-        if (!sortedKeys) {
-          if (entities.size() > limit) {
-            entities.pollLast();
-          }
-        } else {
-          if (entities.size() == limit) {
-            break;
-          }
-        }
-      }
-      return entities;
-    } finally {
-      results.close();
-    }
-  }
-
-  /**
-   * Returns the main table to be used by the entity reader.
-   */
-  protected abstract BaseTable<?> getTable();
-
-  /**
-   * Validates the required parameters to read the entities.
-   */
-  protected abstract void validateParams();
-
-  /**
-   * Sets certain parameters to defaults if the values are not provided.
-   */
-  protected abstract void augmentParams(Configuration hbaseConf,
-      Connection conn) throws IOException;
-
-  /**
-   * Fetches a {@link Result} instance for a single-entity read.
-   *
-   * @return the {@link Result} instance or null if no such record is found.
-   */
-  protected abstract Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException;
-
-  /**
-   * Fetches a {@link ResultScanner} for a multi-entity read.
-   */
-  protected abstract ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException;
-
-  /**
-   * Given a {@link Result} instance, deserializes and creates a
-   * {@link TimelineEntity}.
-   *
-   * @return the {@link TimelineEntity} instance, or null if the {@link Result}
-   * is null or empty.
-   */
-  protected abstract TimelineEntity parseEntity(Result result)
-      throws IOException;
-
-  /**
-   * Helper method for reading and deserializing {@link TimelineMetric} objects
-   * using the specified column prefix. The timeline metrics then are added to
-   * the given timeline entity.
-   */
-  protected void readMetrics(TimelineEntity entity, Result result,
-      ColumnPrefix<?> columnPrefix) throws IOException {
-    NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
-        columnPrefix.readResultsWithTimestamps(result);
-    for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
-        metricsResult.entrySet()) {
-      TimelineMetric metric = new TimelineMetric();
-      metric.setId(metricResult.getKey());
-      // Simply assume that if the value set contains more than 1 elements, the
-      // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric
-      metric.setType(metricResult.getValue().size() > 1 ?
-          TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
-      metric.addValues(metricResult.getValue());
-      entity.addMetric(metric);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/63d90992/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
deleted file mode 100644
index 36ed4ca..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-
-/**
- * Factory methods for instantiating a timeline entity reader.
- */
-class TimelineEntityReaderFactory {
-  /**
-   * Creates a timeline entity reader instance for reading a single entity with
-   * the specified input.
-   */
-  public static TimelineEntityReader createSingleEntityReader(String userId,
-      String clusterId, String flowName, Long flowRunId, String appId,
-      String entityType, String entityId, TimelineFilterList confs,
-      TimelineFilterList metrics, EnumSet<Field> fieldsToRetrieve) {
-    // currently the types that are handled separate from the generic entity
-    // table are application, flow run, and flow activity entities
-    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
-      return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId,
-          appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
-    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
-      return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId,
-          appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
-    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
-      return new FlowActivityEntityReader(userId, clusterId, flowName, flowRunId,
-          appId, entityType, entityId, fieldsToRetrieve);
-    } else {
-      // assume we're dealing with a generic entity read
-      return new GenericEntityReader(userId, clusterId, flowName, flowRunId,
-        appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
-    }
-  }
-
-  /**
-   * Creates a timeline entity reader instance for reading set of entities with
-   * the specified input and predicates.
-   */
-  public static TimelineEntityReader createMultipleEntitiesReader(String userId,
-      String clusterId, String flowName, Long flowRunId, String appId,
-      String entityType, Long limit, Long createdTimeBegin, Long createdTimeEnd,
-      Long modifiedTimeBegin, Long modifiedTimeEnd,
-      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> metricFilters, Set<String> eventFilters,
-      TimelineFilterList confs, TimelineFilterList metrics,
-      EnumSet<Field> fieldsToRetrieve) {
-    // currently the types that are handled separate from the generic entity
-    // table are application, flow run, and flow activity entities
-    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
-      return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId,
-          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
-          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
-          infoFilters, configFilters, metricFilters, eventFilters, confs,
-          metrics, fieldsToRetrieve);
-    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
-      return new FlowActivityEntityReader(userId, clusterId, flowName, flowRunId,
-          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
-          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
-          infoFilters, configFilters, metricFilters, eventFilters,
-          fieldsToRetrieve);
-    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
-      return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId,
-          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
-          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
-          infoFilters, configFilters, metricFilters, eventFilters, confs,
-          metrics, fieldsToRetrieve);
-    } else {
-      // assume we're dealing with a generic entity read
-      return new GenericEntityReader(userId, clusterId, flowName, flowRunId,
-          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
-          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
-          infoFilters, configFilters, metricFilters, eventFilters, confs,
-          metrics, fieldsToRetrieve, false);
-    }
-  }
-}


Mime
View raw message