hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sj...@apache.org
Subject [42/50] [abbrv] hadoop git commit: YARN-4074. [timeline reader] implement support for querying for flows and flow runs (sjlee via vrushali)
Date Tue, 13 Oct 2015 17:53:30 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d8d8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
index c514c20..889ae19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -20,13 +20,8 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
 
 import java.io.IOException;
 import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
 import java.util.Set;
-import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,47 +29,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-
-import com.google.common.base.Preconditions;
 
 public class HBaseTimelineReaderImpl
     extends AbstractService implements TimelineReader {
 
   private static final Log LOG = LogFactory
       .getLog(HBaseTimelineReaderImpl.class);
-  private static final long DEFAULT_BEGIN_TIME = 0L;
-  private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
 
   private Configuration hbaseConf = null;
   private Connection conn;
-  private EntityTable entityTable;
-  private AppToFlowTable appToFlowTable;
-  private ApplicationTable applicationTable;
 
   public HBaseTimelineReaderImpl() {
     super(HBaseTimelineReaderImpl.class.getName());
@@ -85,9 +50,6 @@ public class HBaseTimelineReaderImpl
     super.serviceInit(conf);
     hbaseConf = HBaseConfiguration.create(conf);
     conn = ConnectionFactory.createConnection(hbaseConf);
-    entityTable = new EntityTable();
-    appToFlowTable = new AppToFlowTable();
-    applicationTable = new ApplicationTable();
   }
 
   @Override
@@ -104,35 +66,10 @@ public class HBaseTimelineReaderImpl
       String flowId, Long flowRunId, String appId, String entityType,
       String entityId, EnumSet<Field> fieldsToRetrieve)
       throws IOException {
-    validateParams(userId, clusterId, appId, entityType, entityId, true);
-    // In reality both should be null or neither should be null
-    if (flowId == null || flowRunId == null) {
-      FlowContext context = lookupFlowContext(clusterId, appId);
-      flowId = context.flowId;
-      flowRunId = context.flowRunId;
-    }
-    if (fieldsToRetrieve == null) {
-      fieldsToRetrieve = EnumSet.noneOf(Field.class);
-    }
-
-    boolean isApplication = isApplicationEntity(entityType);
-    byte[] rowKey = isApplication ?
-        ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
-            appId) :
-        EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId,
-            entityType, entityId);
-    Get get = new Get(rowKey);
-    get.setMaxVersions(Integer.MAX_VALUE);
-    Result result = isApplication ?
-        applicationTable.getResult(hbaseConf, conn, get) :
-        entityTable.getResult(hbaseConf, conn, get);
-    return parseEntity(result, fieldsToRetrieve,
-        false, DEFAULT_BEGIN_TIME, DEFAULT_END_TIME, false, DEFAULT_BEGIN_TIME,
-        DEFAULT_END_TIME, null, null, null, null, null, null, isApplication);
-  }
-
-  private static boolean isApplicationEntity(String entityType) {
-    return TimelineEntityType.YARN_APPLICATION.toString().equals(entityType);
+    TimelineEntityReader reader =
+        TimelineEntityReaderFactory.createSingleEntityReader(userId, clusterId,
+            flowId, flowRunId, appId, entityType, entityId, fieldsToRetrieve);
+    return reader.readEntity(hbaseConf, conn);
   }
 
   @Override
@@ -144,361 +81,12 @@ public class HBaseTimelineReaderImpl
       Map<String, Object> infoFilters, Map<String, String> configFilters,
       Set<String> metricFilters, Set<String> eventFilters,
       EnumSet<Field> fieldsToRetrieve) throws IOException {
-    validateParams(userId, clusterId, appId, entityType, null, false);
-    // In reality both should be null or neither should be null
-    if (flowId == null || flowRunId == null) {
-      FlowContext context = lookupFlowContext(clusterId, appId);
-      flowId = context.flowId;
-      flowRunId = context.flowRunId;
-    }
-    if (limit == null) {
-      limit = TimelineReader.DEFAULT_LIMIT;
-    }
-    if (createdTimeBegin == null) {
-      createdTimeBegin = DEFAULT_BEGIN_TIME;
-    }
-    if (createdTimeEnd == null) {
-      createdTimeEnd = DEFAULT_END_TIME;
-    }
-    if (modifiedTimeBegin == null) {
-      modifiedTimeBegin = DEFAULT_BEGIN_TIME;
-    }
-    if (modifiedTimeEnd == null) {
-      modifiedTimeEnd = DEFAULT_END_TIME;
-    }
-    if (fieldsToRetrieve == null) {
-      fieldsToRetrieve = EnumSet.noneOf(Field.class);
-    }
-
-    NavigableSet<TimelineEntity> entities = new TreeSet<>();
-    boolean isApplication = isApplicationEntity(entityType);
-    if (isApplication) {
-      // If getEntities() is called for an application, there can be at most
-      // one entity. If the entity passes the filter, it is returned. Otherwise,
-      // an empty set is returned.
-      byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId,
-          flowRunId, appId);
-      Get get = new Get(rowKey);
-      get.setMaxVersions(Integer.MAX_VALUE);
-      Result result = applicationTable.getResult(hbaseConf, conn, get);
-      TimelineEntity entity = parseEntity(result, fieldsToRetrieve,
-          true, createdTimeBegin, createdTimeEnd, true, modifiedTimeBegin,
-          modifiedTimeEnd, isRelatedTo, relatesTo, infoFilters, configFilters,
-          eventFilters, metricFilters, isApplication);
-      if (entity != null) {
-        entities.add(entity);
-      }
-    } else {
-      // Scan through part of the table to find the entities belong to one app
-      // and one type
-      Scan scan = new Scan();
-      scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
-          clusterId, userId, flowId, flowRunId, appId, entityType));
-      scan.setMaxVersions(Integer.MAX_VALUE);
-      ResultScanner scanner =
-          entityTable.getResultScanner(hbaseConf, conn, scan);
-      for (Result result : scanner) {
-        TimelineEntity entity = parseEntity(result, fieldsToRetrieve,
-            true, createdTimeBegin, createdTimeEnd,
-            true, modifiedTimeBegin, modifiedTimeEnd,
-            isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters,
-            metricFilters, isApplication);
-        if (entity == null) {
-          continue;
-        }
-        if (entities.size() > limit) {
-          entities.pollLast();
-        }
-        entities.add(entity);
-      }
-    }
-    return entities;
-  }
-
-  private FlowContext lookupFlowContext(String clusterId, String appId)
-      throws IOException {
-    byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
-    Get get = new Get(rowKey);
-    Result result = appToFlowTable.getResult(hbaseConf, conn, get);
-    if (result != null && !result.isEmpty()) {
-      return new FlowContext(
-          AppToFlowColumn.FLOW_ID.readResult(result).toString(),
-          ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
-    } else {
-       throw new IOException(
-           "Unable to find the context flow ID and flow run ID for clusterId=" +
-           clusterId + ", appId=" + appId);
-    }
-  }
-
-  private static class FlowContext {
-    private String flowId;
-    private Long flowRunId;
-    public FlowContext(String flowId, Long flowRunId) {
-      this.flowId = flowId;
-      this.flowRunId = flowRunId;
-    }
-  }
-
-  private static void validateParams(String userId, String clusterId,
-      String appId, String entityType, String entityId, boolean checkEntityId) {
-    Preconditions.checkNotNull(userId, "userId shouldn't be null");
-    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
-    Preconditions.checkNotNull(appId, "appId shouldn't be null");
-    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
-    if (checkEntityId) {
-      Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
-    }
-  }
-
-  private static TimelineEntity parseEntity(
-      Result result, EnumSet<Field> fieldsToRetrieve,
-      boolean checkCreatedTime, long createdTimeBegin, long createdTimeEnd,
-      boolean checkModifiedTime, long modifiedTimeBegin, long modifiedTimeEnd,
-      Map<String, Set<String>> isRelatedTo, Map<String, Set<String>> relatesTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> eventFilters, Set<String> metricFilters,
-      boolean isApplication)
-          throws IOException {
-    if (result == null || result.isEmpty()) {
-      return null;
-    }
-    TimelineEntity entity = new TimelineEntity();
-    String entityType = isApplication ?
-        TimelineEntityType.YARN_APPLICATION.toString() :
-        EntityColumn.TYPE.readResult(result).toString();
-    entity.setType(entityType);
-    String entityId = isApplication ?
-        ApplicationColumn.ID.readResult(result).toString() :
-        EntityColumn.ID.readResult(result).toString();
-    entity.setId(entityId);
-
-    // fetch created time
-    Number createdTime = isApplication ?
-        (Number)ApplicationColumn.CREATED_TIME.readResult(result) :
-        (Number)EntityColumn.CREATED_TIME.readResult(result);
-    entity.setCreatedTime(createdTime.longValue());
-    if (checkCreatedTime && (entity.getCreatedTime() < createdTimeBegin ||
-        entity.getCreatedTime() > createdTimeEnd)) {
-      return null;
-    }
-
-    // fetch modified time
-    Number modifiedTime = isApplication ?
-        (Number)ApplicationColumn.MODIFIED_TIME.readResult(result) :
-        (Number)EntityColumn.MODIFIED_TIME.readResult(result);
-    entity.setModifiedTime(modifiedTime.longValue());
-    if (checkModifiedTime && (entity.getModifiedTime() < modifiedTimeBegin ||
-        entity.getModifiedTime() > modifiedTimeEnd)) {
-      return null;
-    }
-
-    // fetch is related to entities
-    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
-      if (isApplication) {
-        readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
-            true);
-      } else {
-        readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO,
-            true);
-      }
-      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
-          entity.getIsRelatedToEntities(), isRelatedTo)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
-        entity.getIsRelatedToEntities().clear();
-      }
-    }
-
-    // fetch relates to entities
-    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
-      if (isApplication) {
-        readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
-            false);
-      } else {
-        readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
-      }
-      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
-          entity.getRelatesToEntities(), relatesTo)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
-        entity.getRelatesToEntities().clear();
-      }
-    }
-
-    // fetch info
-    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
-      if (isApplication) {
-        readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
-      } else {
-        readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
-      }
-      if (checkInfo &&
-          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.INFO)) {
-        entity.getInfo().clear();
-      }
-    }
-
-    // fetch configs
-    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
-      if (isApplication) {
-        readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
-      } else {
-        readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
-      }
-      if (checkConfigs && !TimelineReaderUtils.matchFilters(
-          entity.getConfigs(), configFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.CONFIGS)) {
-        entity.getConfigs().clear();
-      }
-    }
-
-    // fetch events
-    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
-      readEvents(entity, result, isApplication);
-      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
-          entity.getEvents(), eventFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.EVENTS)) {
-        entity.getEvents().clear();
-      }
-    }
-
-    // fetch metrics
-    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
-      readMetrics(entity, result, isApplication);
-      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
-          entity.getMetrics(), metricFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.METRICS)) {
-        entity.getMetrics().clear();
-      }
-    }
-    return entity;
-  }
-
-  private static <T> void readRelationship(
-      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
-      boolean isRelatedTo) throws IOException {
-    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
-    Map<String, Object> columns = prefix.readResults(result);
-    for (Map.Entry<String, Object> column : columns.entrySet()) {
-      for (String id : Separator.VALUES.splitEncoded(
-          column.getValue().toString())) {
-        if (isRelatedTo) {
-          entity.addIsRelatedToEntity(column.getKey(), id);
-        } else {
-          entity.addRelatesToEntity(column.getKey(), id);
-        }
-      }
-    }
-  }
-
-  private static <T> void readKeyValuePairs(
-      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
-      boolean isConfig) throws IOException {
-    // info and configuration are of type Map<String, Object or String>
-    Map<String, Object> columns = prefix.readResults(result);
-    if (isConfig) {
-      for (Map.Entry<String, Object> column : columns.entrySet()) {
-        entity.addConfig(column.getKey(), column.getValue().toString());
-      }
-    } else {
-      entity.addInfo(columns);
-    }
-  }
-
-  /**
-   * Read events from the entity table or the application table. The column name
-   * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
-   * if there is no info associated with the event.
-   *
-   * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
-   * schema description.
-   */
-  private static void readEvents(TimelineEntity entity, Result result,
-      boolean isApplication) throws IOException {
-    Map<String, TimelineEvent> eventsMap = new HashMap<>();
-    Map<?, Object> eventsResult = isApplication ?
-        ApplicationColumnPrefix.EVENT.
-            readResultsHavingCompoundColumnQualifiers(result) :
-        EntityColumnPrefix.EVENT.
-            readResultsHavingCompoundColumnQualifiers(result);
-    for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
-      byte[][] karr = (byte[][])eventResult.getKey();
-      // the column name is of the form "eventId=timestamp=infoKey"
-      if (karr.length == 3) {
-        String id = Bytes.toString(karr[0]);
-        long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1]));
-        String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
-        TimelineEvent event = eventsMap.get(key);
-        if (event == null) {
-          event = new TimelineEvent();
-          event.setId(id);
-          event.setTimestamp(ts);
-          eventsMap.put(key, event);
-        }
-        // handle empty info
-        String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
-        if (infoKey != null) {
-          event.addInfo(infoKey, eventResult.getValue());
-        }
-      } else {
-        LOG.warn("incorrectly formatted column name: it will be discarded");
-        continue;
-      }
-    }
-    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
-    entity.addEvents(eventsSet);
-  }
-
-  private static void readMetrics(TimelineEntity entity, Result result,
-      boolean isApplication) throws IOException {
-    NavigableMap<String, NavigableMap<Long, Number>> metricsResult;
-    if (isApplication) {
-      metricsResult =
-          ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
-    } else {
-      metricsResult =
-          EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
-    }
-    for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
-        metricsResult.entrySet()) {
-      TimelineMetric metric = new TimelineMetric();
-      metric.setId(metricResult.getKey());
-      // Simply assume that if the value set contains more than 1 elements, the
-      // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric
-      metric.setType(metricResult.getValue().size() > 1 ?
-          TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
-      metric.addValues(metricResult.getValue());
-      entity.addMetric(metric);
-    }
+    TimelineEntityReader reader =
+        TimelineEntityReaderFactory.createMultipleEntitiesReader(userId,
+            clusterId, flowId, flowRunId, appId, entityType, limit,
+            createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
+            modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
+            metricFilters, eventFilters, fieldsToRetrieve);
+    return reader.readEntities(hbaseConf, conn);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d8d8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
new file mode 100644
index 0000000..0d1134c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+
+/**
+ * The base class for reading and deserializing timeline entities from the
+ * HBase storage. Different types can be defined for different types of the
+ * entities that are being requested.
+ */
+abstract class TimelineEntityReader {
+  protected final boolean singleEntityRead;
+
+  protected String userId;
+  protected String clusterId;
+  protected String flowId;
+  protected Long flowRunId;
+  protected String appId;
+  protected String entityType;
+  protected EnumSet<Field> fieldsToRetrieve;
+  // used only for a single entity read mode
+  protected String entityId;
+  // used only for multiple entity read mode
+  protected Long limit;
+  protected Long createdTimeBegin;
+  protected Long createdTimeEnd;
+  protected Long modifiedTimeBegin;
+  protected Long modifiedTimeEnd;
+  protected Map<String, Set<String>> relatesTo;
+  protected Map<String, Set<String>> isRelatedTo;
+  protected Map<String, Object> infoFilters;
+  protected Map<String, String> configFilters;
+  protected Set<String> metricFilters;
+  protected Set<String> eventFilters;
+
+  /**
+   * Main table the entity reader uses.
+   */
+  protected BaseTable<?> table;
+
+  /**
+   * Instantiates a reader for multiple-entity reads.
+   */
+  protected TimelineEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    this.singleEntityRead = false;
+    this.userId = userId;
+    this.clusterId = clusterId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+    this.entityType = entityType;
+    this.fieldsToRetrieve = fieldsToRetrieve;
+    this.limit = limit;
+    this.createdTimeBegin = createdTimeBegin;
+    this.createdTimeEnd = createdTimeEnd;
+    this.modifiedTimeBegin = modifiedTimeBegin;
+    this.modifiedTimeEnd = modifiedTimeEnd;
+    this.relatesTo = relatesTo;
+    this.isRelatedTo = isRelatedTo;
+    this.infoFilters = infoFilters;
+    this.configFilters = configFilters;
+    this.metricFilters = metricFilters;
+    this.eventFilters = eventFilters;
+
+    this.table = getTable();
+  }
+
+  /**
+   * Instantiates a reader for single-entity reads.
+   */
+  protected TimelineEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve) {
+    this.singleEntityRead = true;
+    this.userId = userId;
+    this.clusterId = clusterId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+    this.entityType = entityType;
+    this.fieldsToRetrieve = fieldsToRetrieve;
+    this.entityId = entityId;
+
+    this.table = getTable();
+  }
+
+  /**
+   * Reads and deserializes a single timeline entity from the HBase storage.
+   */
+  public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    validateParams();
+    augmentParams(hbaseConf, conn);
+
+    Result result = getResult(hbaseConf, conn);
+    return parseEntity(result);
+  }
+
+  /**
+   * Reads and deserializes a set of timeline entities from the HBase storage.
+   * It goes through all the results available, and returns the number of
+   * entries as specified in the limit in the entity's natural sort order.
+   */
+  public Set<TimelineEntity> readEntities(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    validateParams();
+    augmentParams(hbaseConf, conn);
+
+    NavigableSet<TimelineEntity> entities = new TreeSet<>();
+    Iterable<Result> results = getResults(hbaseConf, conn);
+    for (Result result : results) {
+      TimelineEntity entity = parseEntity(result);
+      if (entity == null) {
+        continue;
+      }
+      entities.add(entity);
+      if (entities.size() > limit) {
+        entities.pollLast();
+      }
+    }
+    return entities;
+  }
+
+  /**
+   * Returns the main table to be used by the entity reader.
+   */
+  protected abstract BaseTable<?> getTable();
+
+  /**
+   * Validates the required parameters to read the entities.
+   */
+  protected abstract void validateParams();
+
+  /**
+   * Sets certain parameters to defaults if the values are not provided.
+   */
+  protected abstract void augmentParams(Configuration hbaseConf,
+      Connection conn) throws IOException;
+
+  /**
+   * Fetches a {@link Result} instance for a single-entity read.
+   *
+   * @return the {@link Result} instance or null if no such record is found.
+   */
+  protected abstract Result getResult(Configuration hbaseConf, Connection conn)
+      throws IOException;
+
+  /**
+   * Fetches an iterator for {@link Result} instances for a multi-entity read.
+   */
+  protected abstract Iterable<Result> getResults(Configuration hbaseConf,
+      Connection conn) throws IOException;
+
+  /**
+   * Given a {@link Result} instance, deserializes and creates a
+   * {@link TimelineEntity}.
+   *
+   * @return the {@link TimelineEntity} instance, or null if the {@link Result}
+   * is null or empty.
+   */
+  protected abstract TimelineEntity parseEntity(Result result)
+      throws IOException;
+
+  /**
+   * Helper method for reading and deserializing {@link TimelineMetric} objects
+   * using the specified column prefix. The timeline metrics then are added to
+   * the given timeline entity.
+   */
+  protected void readMetrics(TimelineEntity entity, Result result,
+      ColumnPrefix<?> columnPrefix) throws IOException {
+    NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+        columnPrefix.readResultsWithTimestamps(result);
+    for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
+        metricsResult.entrySet()) {
+      TimelineMetric metric = new TimelineMetric();
+      metric.setId(metricResult.getKey());
+      // Simply assume that if the value set contains more than 1 elements, the
+      // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric
+      metric.setType(metricResult.getValue().size() > 1 ?
+          TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
+      metric.addValues(metricResult.getValue());
+      entity.addMetric(metric);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d8d8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
new file mode 100644
index 0000000..4fdef40
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+
+/**
+ * Factory methods for instantiating a timeline entity reader.
+ */
+class TimelineEntityReaderFactory {
+
+  // Noninstantiable utility class: exposes only static factory methods.
+  private TimelineEntityReaderFactory() {
+  }
+
+  /**
+   * Creates a timeline entity reader instance for reading a single entity with
+   * the specified input.
+   *
+   * @return a reader specific to the entity type, or a generic entity reader
+   *         if the type is not one of the specially-handled ones
+   */
+  public static TimelineEntityReader createSingleEntityReader(String userId,
+      String clusterId, String flowId, Long flowRunId, String appId,
+      String entityType, String entityId, EnumSet<Field> fieldsToRetrieve) {
+    // currently the types that are handled separate from the generic entity
+    // table are application, flow run, and flow activity entities
+    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
+      return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
+      return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
+      return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    } else {
+      // assume we're dealing with a generic entity read
+      return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    }
+  }
+
+  /**
+   * Creates a timeline entity reader instance for reading a set of entities
+   * with the specified input and predicates.
+   *
+   * @return a reader specific to the entity type, or a generic entity reader
+   *         if the type is not one of the specially-handled ones
+   */
+  public static TimelineEntityReader createMultipleEntitiesReader(String userId,
+      String clusterId, String flowId, Long flowRunId, String appId,
+      String entityType, Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    // currently the types that are handled separate from the generic entity
+    // table are application, flow run, and flow activity entities
+    // (checked in the same order as createSingleEntityReader for consistency;
+    // the branches are mutually exclusive so the order does not affect
+    // behavior)
+    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
+      return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
+      return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
+      return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    } else {
+      // assume we're dealing with a generic entity read
+      return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d8d8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
index 5f3868b..e3b5a87 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
@@ -19,14 +19,46 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.application;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 
 /**
  * Represents a rowkey for the application table.
  */
 public class ApplicationRowKey {
-  // TODO: more methods are needed for this class.
+  // Components of the application table row key, serialized in order:
+  // clusterId!userId!flowId!inverted(flowRunId)!appId
+  private final String clusterId;
+  private final String userId;
+  private final String flowId;
+  private final long flowRunId;
+  private final String appId;
 
-  // TODO: API needs to be cleaned up.
+  public ApplicationRowKey(String clusterId, String userId, String flowId,
+      long flowRunId, String appId) {
+    this.clusterId = clusterId;
+    this.userId = userId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getFlowId() {
+    return flowId;
+  }
+
+  public long getFlowRunId() {
+    return flowRunId;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
 
   /**
    * Constructs a row key for the application table as follows:
@@ -46,22 +78,32 @@ public class ApplicationRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(ApplicationRowKey.invert(flowRunId));
+    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
     byte[] third = Bytes.toBytes(appId);
     return Separator.QUALIFIERS.join(first, second, third);
   }
 
   /**
-   * Converts a timestamp into its inverse timestamp to be used in (row) keys
-   * where we want to have the most recent timestamp in the top of the table
-   * (scans start at the most recent timestamp first).
-   *
-   * @param key value to be inverted so that the latest version will be first in
-   *          a scan.
-   * @return inverted long
+   * Given the raw row key as bytes, returns the row key as an object.
    */
-  public static long invert(Long key) {
-    return Long.MAX_VALUE - key;
-  }
+  public static ApplicationRowKey parseRowKey(byte[] rowKey) {
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+    // Expect at least clusterId!userId!flowId!inverted(flowRunId)!appId.
+    if (rowKeyComponents.length < 5) {
+      throw new IllegalArgumentException("the row key is not valid for " +
+          "an application");
+    }
 
+    String clusterId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
+    String userId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
+    String flowId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
+    // flowRunId is stored inverted so the latest run sorts first; invert
+    // again to recover the original value.
+    long flowRunId =
+        TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
+    String appId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4]));
+    return new ApplicationRowKey(clusterId, userId, flowId, flowRunId, appId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d8d8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
index ad4fec6..ca88056 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
@@ -24,6 +24,22 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
  * Represents a rowkey for the app_flow table.
  */
 public class AppToFlowRowKey {
+  // Components of the app_flow table row key: clusterId!appId
+  private final String clusterId;
+  private final String appId;
+
+  public AppToFlowRowKey(String clusterId, String appId) {
+    this.clusterId = clusterId;
+    this.appId = appId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
   /**
    * Constructs a row key prefix for the app_flow table as follows:
    * {@code clusterId!AppId}
@@ -36,4 +52,19 @@ public class AppToFlowRowKey {
     return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, appId));
   }
 
+  /**
+   * Given the raw row key as bytes, returns the row key as an object.
+   */
+  public static AppToFlowRowKey parseRowKey(byte[] rowKey) {
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+    if (rowKeyComponents.length < 2) {
+      throw new IllegalArgumentException("the row key is not valid for " +
+          "the app-to-flow table");
+    }
+
+    // getRowKey() encodes both components via joinEncoded(), so decode them
+    // here (as the other row key classes do); otherwise ids containing
+    // separator escape sequences would not round-trip correctly.
+    String clusterId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
+    String appId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
+    return new AppToFlowRowKey(clusterId, appId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d8d8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
index abba79a..9545438 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
@@ -31,7 +31,8 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 
 /**
- * Implements behavior common to tables used in the timeline service storage.
+ * Implements behavior common to tables used in the timeline service storage. It
+ * is thread-safe, and can be used by multiple threads concurrently.
  *
  * @param <T> reference to the table instance class itself for type safety.
  */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d8d8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 9a72be0..6a534ed73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
 
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 
@@ -26,9 +25,52 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWrit
  * Represents a rowkey for the entity table.
  */
 public class EntityRowKey {
-  // TODO: more methods are needed for this class.
+  // Components of the entity table row key. Note the serialized key order is
+  // userId!clusterId!flowId!inverted(flowRunId)!appId!entityType!entityId,
+  // which differs from the constructor parameter order (cluster first).
+  private final String clusterId;
+  private final String userId;
+  private final String flowId;
+  private final long flowRunId;
+  private final String appId;
+  private final String entityType;
+  private final String entityId;
 
-  // TODO: API needs to be cleaned up.
+  public EntityRowKey(String clusterId, String userId, String flowId,
+      long flowRunId, String appId, String entityType, String entityId) {
+    this.clusterId = clusterId;
+    this.userId = userId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+    this.entityType = entityType;
+    this.entityId = entityId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getFlowId() {
+    return flowId;
+  }
+
+  public long getFlowRunId() {
+    return flowRunId;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public String getEntityType() {
+    return entityType;
+  }
+
+  public String getEntityId() {
+    return entityId;
+  }
 
   /**
    * Constructs a row key prefix for the entity table as follows:
@@ -106,4 +148,32 @@ public class EntityRowKey {
     return Separator.QUALIFIERS.join(first, second, third);
   }
 
+  /**
+   * Given the raw row key as bytes, returns the row key as an object.
+   */
+  public static EntityRowKey parseRowKey(byte[] rowKey) {
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+    // Expect userId!clusterId!flowId!inverted(flowRunId)!appId!type!id.
+    // Note that userId comes before clusterId in this table's key, unlike
+    // the application table.
+    if (rowKeyComponents.length < 7) {
+      throw new IllegalArgumentException("the row key is not valid for " +
+          "an entity");
+    }
+
+    String userId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
+    String clusterId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
+    String flowId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
+    // flowRunId is stored inverted; invert again to recover the original.
+    long flowRunId =
+        TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
+    String appId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4]));
+    String entityType =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[5]));
+    String entityId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[6]));
+    return new EntityRowKey(clusterId, userId, flowId, flowRunId, appId,
+        entityType, entityId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d8d8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index 19e4e83..18ca599 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -55,6 +55,10 @@ public class FlowActivityRowKey {
     return flowId;
   }
 
+  /**
+   * Constructs a row key prefix for the flow activity table as follows:
+   * {@code clusterId!}
+   *
+   * @param clusterId
+   * @return byte array with the row key prefix
+   */
+  public static byte[] getRowKeyPrefix(String clusterId) {
+    return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, ""));
+  }
+
   /**
    * Constructs a row key for the flow activity table as follows:
    * {@code clusterId!dayTimestamp!user!flowId}
@@ -65,7 +69,8 @@ public class FlowActivityRowKey {
    * @param flowId
    * @return byte array with the row key prefix
    */
-  public static byte[] getRowKey(String clusterId, String userId, String flowId) {
+  public static byte[] getRowKey(String clusterId, String userId,
+      String flowId) {
     long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
         .currentTimeMillis());
     return getRowKey(clusterId, dayTs, userId, flowId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d8d8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
index e133241..880d481 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -25,7 +25,34 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWrit
  * Represents a rowkey for the flow run table.
  */
 public class FlowRunRowKey {
-  // TODO: more methods are needed for this class like parse row key
+  // Components of the flow run table row key, serialized in order:
+  // clusterId!userId!flowId!inverted(flowRunId)
+  private final String clusterId;
+  private final String userId;
+  private final String flowId;
+  private final long flowRunId;
+
+  public FlowRunRowKey(String clusterId, String userId, String flowId,
+      long flowRunId) {
+    this.clusterId = clusterId;
+    this.userId = userId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getFlowId() {
+    return flowId;
+  }
+
+  public long getFlowRunId() {
+    return flowRunId;
+  }
 
   /**
    * Constructs a row key for the entity table as follows: {
@@ -47,4 +74,25 @@ public class FlowRunRowKey {
     return Separator.QUALIFIERS.join(first, second);
   }
 
+  /**
+   * Given the raw row key as bytes, returns the row key as an object.
+   */
+  public static FlowRunRowKey parseRowKey(byte[] rowKey) {
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+    // Expect clusterId!userId!flowId!inverted(flowRunId).
+    if (rowKeyComponents.length < 4) {
+      throw new IllegalArgumentException("the row key is not valid for " +
+          "a flow run");
+    }
+
+    String clusterId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
+    String userId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
+    String flowId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
+    // flowRunId is stored inverted; invert again to recover the original.
+    long flowRunId =
+        TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
+    return new FlowRunRowKey(clusterId, userId, flowId, flowRunId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d8d8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index a1948aa..651bb3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -18,6 +18,15 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
@@ -33,15 +42,6 @@ import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
 /**
  * Invoked via the coprocessor when a Get or a Scan is issued for flow run
  * table. Looks through the list of cells per row, checks their tags and does

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d8d8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 3962341..01920b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -508,32 +508,28 @@ public class TestHBaseTimelineStorage {
   private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
       String flow, long runid, String appName, TimelineEntity te) {
 
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
-
-    assertTrue(rowKeyComponents.length == 7);
-    assertEquals(user, Bytes.toString(rowKeyComponents[0]));
-    assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
-    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(TimelineWriterUtils.invert(runid),
-        Bytes.toLong(rowKeyComponents[3]));
-    assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
-    assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
-    assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
+    EntityRowKey key = EntityRowKey.parseRowKey(rowKey);
+
+    assertEquals(user, key.getUserId());
+    assertEquals(cluster, key.getClusterId());
+    assertEquals(flow, key.getFlowId());
+    assertEquals(runid, key.getFlowRunId());
+    assertEquals(appName, key.getAppId());
+    assertEquals(te.getType(), key.getEntityType());
+    assertEquals(te.getId(), key.getEntityId());
     return true;
   }
 
   private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster,
       String user, String flow, long runid, String appName) {
 
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+    ApplicationRowKey key = ApplicationRowKey.parseRowKey(rowKey);
 
-    assertTrue(rowKeyComponents.length == 5);
-    assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
-    assertEquals(user, Bytes.toString(rowKeyComponents[1]));
-    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(TimelineWriterUtils.invert(runid),
-        Bytes.toLong(rowKeyComponents[3]));
-    assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
+    assertEquals(cluster, key.getClusterId());
+    assertEquals(user, key.getUserId());
+    assertEquals(flow, key.getFlowId());
+    assertEquals(runid, key.getFlowRunId());
+    assertEquals(appName, key.getAppId());
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d8d8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
index f8331fa..d18613a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
@@ -45,7 +45,7 @@ class TestFlowDataGenerator {
     String type = TimelineEntityType.YARN_APPLICATION.toString();
     entity.setId(id);
     entity.setType(type);
-    Long cTime = 1425016501000L;
+    long cTime = 1425016501000L;
     entity.setCreatedTime(cTime);
 
     // add metrics
@@ -54,8 +54,8 @@ class TestFlowDataGenerator {
     m1.setId(metric1);
     Map<Long, Number> metricValues = new HashMap<Long, Number>();
     long ts = System.currentTimeMillis();
-    metricValues.put(ts - 100000, 2);
-    metricValues.put(ts - 80000, 40);
+    metricValues.put(ts - 100000, 2L);
+    metricValues.put(ts - 80000, 40L);
     m1.setType(Type.TIME_SERIES);
     m1.setValues(metricValues);
     metrics.add(m1);
@@ -64,8 +64,8 @@ class TestFlowDataGenerator {
     m2.setId(metric2);
     metricValues = new HashMap<Long, Number>();
     ts = System.currentTimeMillis();
-    metricValues.put(ts - 100000, 31);
-    metricValues.put(ts - 80000, 57);
+    metricValues.put(ts - 100000, 31L);
+    metricValues.put(ts - 80000, 57L);
     m2.setType(Type.TIME_SERIES);
     m2.setValues(metricValues);
     metrics.add(m2);
@@ -80,7 +80,7 @@ class TestFlowDataGenerator {
     String type = TimelineEntityType.YARN_APPLICATION.toString();
     entity.setId(id);
     entity.setType(type);
-    Long cTime = 1425016501000L;
+    long cTime = 1425016501000L;
     entity.setCreatedTime(cTime);
     // add metrics
     Set<TimelineMetric> metrics = new HashSet<>();
@@ -103,8 +103,8 @@ class TestFlowDataGenerator {
     String type = TimelineEntityType.YARN_APPLICATION.toString();
     entity.setId(id);
     entity.setType(type);
-    Long cTime = 20000000000000L;
-    Long mTime = 1425026901000L;
+    long cTime = 20000000000000L;
+    long mTime = 1425026901000L;
     entity.setCreatedTime(cTime);
     entity.setModifiedTime(mTime);
     // add metrics
@@ -113,10 +113,10 @@ class TestFlowDataGenerator {
     m1.setId(metric1);
     Map<Long, Number> metricValues = new HashMap<Long, Number>();
     long ts = System.currentTimeMillis();
-    metricValues.put(ts - 120000, 100000000);
-    metricValues.put(ts - 100000, 200000000);
-    metricValues.put(ts - 80000, 300000000);
-    metricValues.put(ts - 60000, 400000000);
+    metricValues.put(ts - 120000, 100000000L);
+    metricValues.put(ts - 100000, 200000000L);
+    metricValues.put(ts - 80000, 300000000L);
+    metricValues.put(ts - 60000, 400000000L);
     metricValues.put(ts - 40000, 50000000000L);
     metricValues.put(ts - 20000, 60000000000L);
     m1.setType(Type.TIME_SERIES);
@@ -126,7 +126,7 @@ class TestFlowDataGenerator {
 
     TimelineEvent event = new TimelineEvent();
     event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    Long expTs = 1436512802000L;
+    long expTs = 1436512802000L;
     event.setTimestamp(expTs);
     String expKey = "foo_event";
     Object expVal = "test";
@@ -142,9 +142,9 @@ class TestFlowDataGenerator {
     return entity;
   }
 
-  static TimelineEntity getEntityGreaterStartTime() {
+  static TimelineEntity getEntityGreaterStartTime(long startTs) {
     TimelineEntity entity = new TimelineEntity();
-    entity.setCreatedTime(30000000000000L);
+    entity.setCreatedTime(startTs);
     entity.setId("flowRunHello with greater start time");
     String type = TimelineEntityType.YARN_APPLICATION.toString();
     entity.setType(type);
@@ -173,14 +173,13 @@ class TestFlowDataGenerator {
     return entity;
   }
 
-  static TimelineEntity getEntityMinStartTime() {
+  static TimelineEntity getEntityMinStartTime(long startTs) {
     TimelineEntity entity = new TimelineEntity();
     String id = "flowRunHelloMInStartTime";
     String type = TimelineEntityType.YARN_APPLICATION.toString();
     entity.setId(id);
     entity.setType(type);
-    Long cTime = 10000000000000L;
-    entity.setCreatedTime(cTime);
+    entity.setCreatedTime(startTs);
     TimelineEvent event = new TimelineEvent();
     event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
     event.setTimestamp(System.currentTimeMillis());
@@ -195,12 +194,12 @@ class TestFlowDataGenerator {
     String type = TimelineEntityType.YARN_APPLICATION.toString();
     entity.setId(id);
     entity.setType(type);
-    Long cTime = 1425016501000L;
+    long cTime = 1425016501000L;
     entity.setCreatedTime(cTime);
 
     TimelineEvent event = new TimelineEvent();
     event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    Long expTs = 1436512802000L;
+    long expTs = 1436512802000L;
     event.setTimestamp(expTs);
     String expKey = "foo_event";
     Object expVal = "test";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d8d8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
index b4a0c74..6bdec6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -21,19 +21,16 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
@@ -42,26 +39,17 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -119,11 +107,13 @@ public class TestHBaseStorageFlowActivity {
     String user = "testWriteFlowRunMinMaxToHBase_user1";
     String flow = "testing_flowRun_flow_name";
     String flowVersion = "CF7022C10F1354";
-    Long runid = 1002345678919L;
+    long runid = 1002345678919L;
     String appName = "application_100000000000_1111";
+    long minStartTs = 10000000000000L;
+    long greaterStartTs = 30000000000000L;
     long endTs = 1439750690000L;
     TimelineEntity entityMinStartTime = TestFlowDataGenerator
-        .getEntityMinStartTime();
+        .getEntityMinStartTime(minStartTs);
 
     try {
       hbi = new HBaseTimelineWriterImpl(c1);
@@ -146,7 +136,7 @@ public class TestHBaseStorageFlowActivity {
 
       // writer another entity with greater start time
       TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
-          .getEntityGreaterStartTime();
+          .getEntityGreaterStartTime(greaterStartTs);
       te = new TimelineEntities();
       te.addEntity(entityGreaterStartTime);
       appName = "application_1000000000000000_2222";
@@ -181,6 +171,31 @@ public class TestHBaseStorageFlowActivity {
     assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
     assertEquals(1, values.size());
     checkFlowActivityRunId(runid, flowVersion, values);
+
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      // get the flow activity entity
+      Set<TimelineEntity> entities =
+          hbr.getEntities(null, cluster, null, null, null,
+              TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
+              null, null, null, null, null, null, null, null, null);
+      assertEquals(1, entities.size());
+      for (TimelineEntity e : entities) {
+        FlowActivityEntity flowActivity = (FlowActivityEntity)e;
+        assertEquals(cluster, flowActivity.getCluster());
+        assertEquals(user, flowActivity.getUser());
+        assertEquals(flow, flowActivity.getFlowName());
+        assertEquals(dayTs, flowActivity.getDate().getTime());
+        Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
+        assertEquals(1, flowRuns.size());
+      }
+    } finally {
+      hbr.close();
+    }
   }
 
   /**
@@ -193,7 +208,7 @@ public class TestHBaseStorageFlowActivity {
     String user = "testWriteFlowActivityOneFlow_user1";
     String flow = "flow_activity_test_flow_name";
     String flowVersion = "A122110F135BC4";
-    Long runid = 1001111178919L;
+    long runid = 1001111178919L;
 
     TimelineEntities te = new TimelineEntities();
     TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
@@ -212,10 +227,35 @@ public class TestHBaseStorageFlowActivity {
     }
     // check flow activity
     checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1);
+
+    // use the reader to verify the data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+
+      Set<TimelineEntity> entities =
+          hbr.getEntities(user, cluster, flow, null, null,
+              TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
+              null, null, null, null, null, null, null, null, null);
+      assertEquals(1, entities.size());
+      for (TimelineEntity e : entities) {
+        FlowActivityEntity entity = (FlowActivityEntity)e;
+        NavigableSet<FlowRunEntity> flowRuns = entity.getFlowRuns();
+        assertEquals(1, flowRuns.size());
+        for (FlowRunEntity flowRun : flowRuns) {
+          assertEquals(runid, flowRun.getRunId());
+          assertEquals(flowVersion, flowRun.getVersion());
+        }
+      }
+    } finally {
+      hbr.close();
+    }
   }
 
   private void checkFlowActivityTable(String cluster, String user, String flow,
-      String flowVersion, Long runid, Configuration c1) throws IOException {
+      String flowVersion, long runid, Configuration c1) throws IOException {
     Scan s = new Scan();
     s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
     byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
@@ -263,7 +303,7 @@ public class TestHBaseStorageFlowActivity {
     String user = "testManyRunsFlowActivity_c_user1";
     String flow = "flow_activity_test_flow_name";
     String flowVersion1 = "A122110F135BC4";
-    Long runid1 = 11111111111L;
+    long runid1 = 11111111111L;
 
     String flowVersion2 = "A12222222222C4";
     long runid2 = 2222222222222L;
@@ -303,11 +343,50 @@ public class TestHBaseStorageFlowActivity {
     checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
         runid1, flowVersion2, runid2, flowVersion3, runid3);
 
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+
+      Set<TimelineEntity> entities =
+          hbr.getEntities(null, cluster, null, null, null,
+              TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
+              null, null, null, null, null, null, null, null, null);
+      assertEquals(1, entities.size());
+      for (TimelineEntity e : entities) {
+        FlowActivityEntity flowActivity = (FlowActivityEntity)e;
+        assertEquals(cluster, flowActivity.getCluster());
+        assertEquals(user, flowActivity.getUser());
+        assertEquals(flow, flowActivity.getFlowName());
+        long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+            .currentTimeMillis());
+        assertEquals(dayTs, flowActivity.getDate().getTime());
+        Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
+        assertEquals(3, flowRuns.size());
+        for (FlowRunEntity flowRun : flowRuns) {
+          long runId = flowRun.getRunId();
+          String version = flowRun.getVersion();
+          if (runId == runid1) {
+            assertEquals(flowVersion1, version);
+          } else if (runId == runid2) {
+            assertEquals(flowVersion2, version);
+          } else if (runId == runid3) {
+            assertEquals(flowVersion3, version);
+          } else {
+            fail("unknown run id: " + runId);
+          }
+        }
+      }
+    } finally {
+      hbr.close();
+    }
   }
 
   private void checkFlowActivityTableSeveralRuns(String cluster, String user,
-      String flow, Configuration c1, String flowVersion1, Long runid1,
-      String flowVersion2, Long runid2, String flowVersion3, Long runid3)
+      String flow, Configuration c1, String flowVersion1, long runid1,
+      String flowVersion2, long runid2, String flowVersion3, long runid3)
       throws IOException {
     Scan s = new Scan();
     s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
@@ -351,7 +430,7 @@ public class TestHBaseStorageFlowActivity {
     assertEquals(1, rowCount);
   }
 
-  private void checkFlowActivityRunId(Long runid, String flowVersion,
+  private void checkFlowActivityRunId(long runid, String flowVersion,
       Map<byte[], byte[]> values) throws IOException {
     byte[] rq = ColumnHelper.getColumnQualifier(
         FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d8d8fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index bf524ea..b0f83b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -21,20 +21,15 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
@@ -42,32 +37,16 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -125,11 +104,13 @@ public class TestHBaseStorageFlowRun {
     String user = "testWriteFlowRunMinMaxToHBase_user1";
     String flow = "testing_flowRun_flow_name";
     String flowVersion = "CF7022C10F1354";
-    Long runid = 1002345678919L;
+    long runid = 1002345678919L;
     String appName = "application_100000000000_1111";
+    long minStartTs = 10000000000000L;
+    long greaterStartTs = 30000000000000L;
     long endTs = 1439750690000L;
     TimelineEntity entityMinStartTime = TestFlowDataGenerator
-        .getEntityMinStartTime();
+        .getEntityMinStartTime(minStartTs);
 
     try {
       hbi = new HBaseTimelineWriterImpl(c1);
@@ -152,7 +133,7 @@ public class TestHBaseStorageFlowRun {
 
       // writer another entity with greater start time
       TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
-          .getEntityGreaterStartTime();
+          .getEntityGreaterStartTime(greaterStartTs);
       te = new TimelineEntities();
       te.addEntity(entityGreaterStartTime);
       appName = "application_1000000000000000_2222";
@@ -183,24 +164,29 @@ public class TestHBaseStorageFlowRun {
         .getBytes());
 
     assertEquals(2, r1.size());
-    Long starttime = (Long) GenericObjectMapper.read(values
+    long starttime = (Long) GenericObjectMapper.read(values
         .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
-    Long expmin = entityMinStartTime.getCreatedTime();
-    assertEquals(expmin, starttime);
+    assertEquals(minStartTs, starttime);
     assertEquals(endTs, GenericObjectMapper.read(values
         .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
-  }
 
-  boolean isFlowRunRowKeyCorrect(byte[] rowKey, String cluster, String user,
-      String flow, Long runid) {
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
-    assertTrue(rowKeyComponents.length == 4);
-    assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
-    assertEquals(user, Bytes.toString(rowKeyComponents[1]));
-    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(TimelineWriterUtils.invert(runid),
-        Bytes.toLong(rowKeyComponents[3]));
-    return true;
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      // get the flow run entity
+      TimelineEntity entity =
+          hbr.getEntity(user, cluster, flow, runid, null,
+              TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
+      assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
+      FlowRunEntity flowRun = (FlowRunEntity)entity;
+      assertEquals(minStartTs, flowRun.getStartTime());
+      assertEquals(endTs, flowRun.getMaxEndTime());
+    } finally {
+      hbr.close();
+    }
   }
 
   /**
@@ -218,7 +204,7 @@ public class TestHBaseStorageFlowRun {
     String user = "testWriteFlowRunMetricsOneFlow_user1";
     String flow = "testing_flowRun_metrics_flow_name";
     String flowVersion = "CF7022C10F1354";
-    Long runid = 1002345678919L;
+    long runid = 1002345678919L;
 
     TimelineEntities te = new TimelineEntities();
     TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
@@ -244,6 +230,41 @@ public class TestHBaseStorageFlowRun {
 
     // check flow run
     checkFlowRunTable(cluster, user, flow, runid, c1);
+
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      TimelineEntity entity =
+          hbr.getEntity(user, cluster, flow, runid, null,
+            TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
+      assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
+      Set<TimelineMetric> metrics = entity.getMetrics();
+      assertEquals(2, metrics.size());
+      for (TimelineMetric metric : metrics) {
+        String id = metric.getId();
+        Map<Long, Number> values = metric.getValues();
+        assertEquals(1, values.size());
+        Number value = null;
+        for (Number n : values.values()) {
+          value = n;
+        }
+        switch (id) {
+        case metric1:
+          assertEquals(141, value);
+          break;
+        case metric2:
+          assertEquals(57, value);
+          break;
+        default:
+          fail("unrecognized metric: " + id);
+        }
+      }
+    } finally {
+      hbr.close();
+    }
   }
 
   private void checkFlowRunTable(String cluster, String user, String flow,


Mime
View raw message