hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtcarre...@apache.org
Subject [41/50] [abbrv] hadoop git commit: YARN-3863. Support complex filters in TimelineReader (Varun Saxena via sjlee)
Date Wed, 04 May 2016 23:45:54 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b2df86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index b5fc214..2d85bab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -17,21 +17,26 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.storage.common;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
-import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -39,6 +44,15 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter.TimelineFilterType;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
@@ -53,6 +67,8 @@ public final class TimelineStorageUtils {
   private TimelineStorageUtils() {
   }
 
+  private static final Log LOG = LogFactory.getLog(TimelineStorageUtils.class);
+
   /** empty bytes. */
   public static final byte[] EMPTY_BYTES = new byte[0];
 
@@ -312,6 +328,21 @@ public final class TimelineStorageUtils {
   }
 
   /**
+   * Check if we have a certain field amongst fields to retrieve. This method
+   * checks against {@link Field#ALL} as well because that would mean every
+   * field, including the one passed, is to be retrieved.
+   *
+   * @param fieldsToRetrieve fields to be retrieved.
+   * @param requiredField field to be checked in fieldsToRetrieve.
+   * @return true if has the required field, false otherwise.
+   */
+  public static boolean hasField(EnumSet<Field> fieldsToRetrieve,
+      Field requiredField) {
+    return fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(requiredField);
+  }
+
+  /**
    * Checks if the input TimelineEntity object is an ApplicationEntity.
    *
    * @param te TimelineEntity object.
@@ -385,87 +416,317 @@ public final class TimelineStorageUtils {
   }
 
   /**
+   * Matches key-values filter. Used for relatesTo/isRelatedTo filters.
    *
-   * @param entityRelations the relations of an entity
-   * @param relationFilters the relations for filtering
-   * @return a boolean flag to indicate if both match
+   * @param entity entity which holds relatesTo/isRelatedTo relations which we
+   *     will match against.
+   * @param keyValuesFilter key-values filter.
+   * @param entityFiltersType type of filters we are trying to match.
+   * @return true, if filter matches, false otherwise.
    */
-  public static boolean matchRelations(
-      Map<String, Set<String>> entityRelations,
-      Map<String, Set<String>> relationFilters) {
-    for (Map.Entry<String, Set<String>> relation : relationFilters.entrySet()) {
-      Set<String> ids = entityRelations.get(relation.getKey());
-      if (ids == null) {
+  private static boolean matchKeyValuesFilter(TimelineEntity entity,
+      TimelineKeyValuesFilter keyValuesFilter,
+      TimelineEntityFiltersType entityFiltersType) {
+    Map<String, Set<String>> relations = null;
+    if (entityFiltersType == TimelineEntityFiltersType.IS_RELATED_TO) {
+      relations = entity.getIsRelatedToEntities();
+    } else if (entityFiltersType == TimelineEntityFiltersType.RELATES_TO) {
+      relations = entity.getRelatesToEntities();
+    }
+    if (relations == null) {
+      return false;
+    }
+    Set<String> ids = relations.get(keyValuesFilter.getKey());
+    if (ids == null) {
+      return false;
+    }
+    boolean matched = false;
+    for (Object id : keyValuesFilter.getValues()) {
+      // Matches if id is found amongst the relationships for an entity and
+      // filter's compare op is EQUAL.
+      // If compare op is NOT_EQUAL, for a match to occur, id should not be
+      // found amongst relationships for an entity.
+      matched = !(ids.contains(id) ^
+          keyValuesFilter.getCompareOp() == TimelineCompareOp.EQUAL);
+      if (!matched) {
         return false;
       }
-      for (String id : relation.getValue()) {
-        if (!ids.contains(id)) {
-          return false;
-        }
-      }
     }
     return true;
   }
 
   /**
+   * Matches relatesto.
    *
-   * @param map the map of key/value pairs in an entity
-   * @param filters the map of key/value pairs for filtering
-   * @return a boolean flag to indicate if both match
+   * @param entity entity which holds relatesto relations.
+   * @param relatesTo the relations for filtering.
+   * @return true, if filter matches, false otherwise.
+   * @throws IOException if an unsupported filter for matching relations is
+   *     being matched.
    */
-  public static boolean matchFilters(Map<String, ? extends Object> map,
-      Map<String, ? extends Object> filters) {
-    for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
-      Object value = map.get(filter.getKey());
-      if (value == null) {
-        return false;
-      }
-      if (!value.equals(filter.getValue())) {
-        return false;
-      }
+  public static boolean matchRelatesTo(TimelineEntity entity,
+      TimelineFilterList relatesTo) throws IOException {
+    return matchFilters(
+        entity, relatesTo, TimelineEntityFiltersType.RELATES_TO);
+  }
+
+  /**
+   * Matches isrelatedto.
+   *
+   * @param entity entity which holds isRelatedTo relations.
+   * @param isRelatedTo the relations for filtering.
+   * @return true, if filter matches, false otherwise.
+   * @throws IOException if an unsupported filter for matching relations is
+   *     being matched.
+   */
+  public static boolean matchIsRelatedTo(TimelineEntity entity,
+      TimelineFilterList isRelatedTo) throws IOException {
+    return matchFilters(
+        entity, isRelatedTo, TimelineEntityFiltersType.IS_RELATED_TO);
+  }
+
+  /**
+   * Matches key-value filter. Used for config and info filters.
+   *
+   * @param entity entity which holds the config/info which we will match
+   *     against.
+   * @param kvFilter a key-value filter.
+   * @param entityFiltersType type of filters we are trying to match.
+   * @return true, if filter matches, false otherwise.
+   */
+  private static boolean matchKeyValueFilter(TimelineEntity entity,
+      TimelineKeyValueFilter kvFilter,
+      TimelineEntityFiltersType entityFiltersType) {
+    Map<String, ? extends Object> map = null;
+    // Supported only for config and info filters.
+    if (entityFiltersType == TimelineEntityFiltersType.CONFIG) {
+      map = entity.getConfigs();
+    } else if (entityFiltersType == TimelineEntityFiltersType.INFO) {
+      map = entity.getInfo();
     }
-    return true;
+    if (map == null) {
+      return false;
+    }
+    Object value = map.get(kvFilter.getKey());
+    if (value == null) {
+      return false;
+    }
+    // Matches if filter's value is equal to the value of the key and filter's
+    // compare op is EQUAL.
+    // If compare op is NOT_EQUAL, for a match to occur, value should not be
+    // equal to the value of the key.
+    return !(value.equals(kvFilter.getValue()) ^
+        kvFilter.getCompareOp() == TimelineCompareOp.EQUAL);
+  }
+
+  /**
+   * Matches config filters.
+   *
+   * @param entity entity which holds a map of config key-value pairs.
+   * @param configFilters list of config filters.
+   * @return a boolean flag to indicate if both match.
+   * @throws IOException if an unsupported filter for matching config filters is
+   *     being matched.
+   */
+  public static boolean matchConfigFilters(TimelineEntity entity,
+      TimelineFilterList configFilters) throws IOException {
+    return
+        matchFilters(entity, configFilters, TimelineEntityFiltersType.CONFIG);
+  }
+
+  /**
+   * Matches info filters.
+   *
+   * @param entity entity which holds a map of info key-value pairs.
+   * @param infoFilters list of info filters.
+   * @return a boolean flag to indicate if both match.
+   * @throws IOException if an unsupported filter for matching info filters is
+   *     being matched.
+   */
+  public static boolean matchInfoFilters(TimelineEntity entity,
+      TimelineFilterList infoFilters) throws IOException {
+    return matchFilters(entity, infoFilters, TimelineEntityFiltersType.INFO);
   }
 
   /**
+   * Matches exists filter. Used for event filters.
    *
-   * @param entityEvents the set of event objects in an entity
-   * @param eventFilters the set of event Ids for filtering
-   * @return a boolean flag to indicate if both match
+   * @param entity entity which holds the events which we will match against.
+   * @param existsFilter exists filter.
+   * @param entityFiltersType type of filters we are trying to match.
+   * @return true, if filter matches, false otherwise.
    */
-  public static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
-      Set<String> eventFilters) {
+  private static boolean matchExistsFilter(TimelineEntity entity,
+      TimelineExistsFilter existsFilter,
+      TimelineEntityFiltersType entityFiltersType) {
+    // Currently exists filter is only supported for event filters.
+    if (entityFiltersType != TimelineEntityFiltersType.EVENT) {
+      return false;
+    }
     Set<String> eventIds = new HashSet<String>();
-    for (TimelineEvent event : entityEvents) {
+    for (TimelineEvent event : entity.getEvents()) {
       eventIds.add(event.getId());
     }
-    for (String eventFilter : eventFilters) {
-      if (!eventIds.contains(eventFilter)) {
-        return false;
-      }
+    // Matches if filter's value is contained in the list of events and
+    // filter's compare op is EQUAL.
+    // If compare op is NOT_EQUAL, for a match to occur, value should not be
+    // contained in the list of events.
+    return !(eventIds.contains(existsFilter.getValue()) ^
+        existsFilter.getCompareOp() == TimelineCompareOp.EQUAL);
+  }
+
+  /**
+   * Matches event filters.
+   *
+   * @param entity entity which holds a set of event objects.
+   * @param eventFilters list of event filters.
+   * @return a boolean flag to indicate if both match.
+   * @throws IOException if an unsupported filter for matching event filters is
+   *     being matched.
+   */
+  public static boolean matchEventFilters(TimelineEntity entity,
+      TimelineFilterList eventFilters) throws IOException {
+    return matchFilters(entity, eventFilters, TimelineEntityFiltersType.EVENT);
+  }
+
+  /**
+   * Compare two values based on comparison operator.
+   *
+   * @param compareOp comparison operator.
+   * @param val1 value 1.
+   * @param val2 value 2.
+   * @return true, if relation matches, false otherwise
+   */
+  private static boolean compareValues(TimelineCompareOp compareOp,
+      long val1, long val2) {
+    switch (compareOp) {
+    case LESS_THAN:
+      return val1 < val2;
+    case LESS_OR_EQUAL:
+      return val1 <= val2;
+    case EQUAL:
+      return val1 == val2;
+    case NOT_EQUAL:
+      return val1 != val2;
+    case GREATER_OR_EQUAL:
+      return val1 >= val2;
+    case GREATER_THAN:
+      return val1 > val2;
+    default:
+      throw new RuntimeException("Unknown TimelineCompareOp " +
+          compareOp.name());
     }
-    return true;
   }
 
   /**
+   * Matches compare filter. Used for metric filters.
    *
-   * @param metrics the set of metric objects in an entity
-   * @param metricFilters the set of metric Ids for filtering
-   * @return a boolean flag to indicate if both match
+   * @param entity entity which holds the metrics which we will match against.
+   * @param compareFilter compare filter.
+   * @param entityFiltersType type of filters we are trying to match.
+   * @return true, if filter matches, false otherwise.
+   * @throws IOException if the metric filter holds a non-integral value.
    */
-  public static boolean matchMetricFilters(Set<TimelineMetric> metrics,
-      Set<String> metricFilters) {
-    Set<String> metricIds = new HashSet<String>();
-    for (TimelineMetric metric : metrics) {
-      metricIds.add(metric.getId());
+  private static boolean matchCompareFilter(TimelineEntity entity,
+      TimelineCompareFilter compareFilter,
+      TimelineEntityFiltersType entityFiltersType) throws IOException {
+    // Currently compare filter is only supported for metric filters.
+    if (entityFiltersType != TimelineEntityFiltersType.METRIC) {
+      return false;
+    }
+    // We expect only integral values(short/int/long) for metric filters.
+    if (!isIntegralValue(compareFilter.getValue())) {
+      throw new IOException("Metric filters has non integral values");
+    }
+    Map<String, TimelineMetric> metricMap =
+        new HashMap<String, TimelineMetric>();
+    for (TimelineMetric metric : entity.getMetrics()) {
+      metricMap.put(metric.getId(), metric);
     }
+    TimelineMetric metric = metricMap.get(compareFilter.getKey());
+    if (metric == null) {
+      return false;
+    }
+    // We will be using the latest value of metric to compare.
+    return compareValues(compareFilter.getCompareOp(),
+        metric.getValuesJAXB().firstEntry().getValue().longValue(),
+        ((Number)compareFilter.getValue()).longValue());
+  }
 
-    for (String metricFilter : metricFilters) {
-      if (!metricIds.contains(metricFilter)) {
-        return false;
+  /**
+   * Matches metric filters.
+   *
+   * @param entity entity which holds a set of metric objects.
+   * @param metricFilters list of metric filters.
+   * @return a boolean flag to indicate if both match.
+   * @throws IOException if an unsupported filter for matching metric filters is
+   *     being matched.
+   */
+  public static boolean matchMetricFilters(TimelineEntity entity,
+      TimelineFilterList metricFilters) throws IOException {
+    return matchFilters(
+        entity, metricFilters, TimelineEntityFiltersType.METRIC);
+  }
+
+  /**
+   * Common routine to match different filters. Iterates over a filter list and
+   * calls routines based on filter type.
+   *
+   * @param entity Timeline entity.
+   * @param filters filter list.
+   * @param entityFiltersType type of filters which are being matched.
+   * @return a boolean flag to indicate if filter matches.
+   * @throws IOException if an unsupported filter for matching this specific
+   *     filter is being matched.
+   */
+  private static boolean matchFilters(TimelineEntity entity,
+      TimelineFilterList filters, TimelineEntityFiltersType entityFiltersType)
+      throws IOException {
+    if (filters == null || filters.getFilterList().isEmpty()) {
+      return false;
+    }
+    TimelineFilterList.Operator operator = filters.getOperator();
+    for (TimelineFilter filter : filters.getFilterList()) {
+      TimelineFilterType filterType = filter.getFilterType();
+      if (!entityFiltersType.isValidFilter(filterType)) {
+        throw new IOException("Unsupported filter " + filterType);
+      }
+      boolean matched = false;
+      switch (filterType) {
+      case LIST:
+        matched = matchFilters(entity, (TimelineFilterList)filter,
+            entityFiltersType);
+        break;
+      case COMPARE:
+        matched = matchCompareFilter(entity, (TimelineCompareFilter)filter,
+            entityFiltersType);
+        break;
+      case EXISTS:
+        matched = matchExistsFilter(entity, (TimelineExistsFilter)filter,
+            entityFiltersType);
+        break;
+      case KEY_VALUE:
+        matched = matchKeyValueFilter(entity, (TimelineKeyValueFilter)filter,
+            entityFiltersType);
+        break;
+      case KEY_VALUES:
+        matched = matchKeyValuesFilter(entity, (TimelineKeyValuesFilter)filter,
+            entityFiltersType);
+        break;
+      default:
+        throw new IOException("Unsupported filter " + filterType);
+      }
+      if (!matched) {
+        if(operator == TimelineFilterList.Operator.AND) {
+          return false;
+        }
+      } else {
+        if(operator == TimelineFilterList.Operator.OR) {
+          return true;
+        }
       }
     }
-    return true;
+    return operator == TimelineFilterList.Operator.AND;
   }
 
   /**
@@ -530,4 +791,100 @@ public final class TimelineStorageUtils {
     }
     return appId;
   }
+
+  /**
+   * Helper method for reading relationship.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param entity entity to fill.
+   * @param result result from HBase.
+   * @param prefix column prefix.
+   * @param isRelatedTo if true, means relationship is to be added to
+   *     isRelatedTo, otherwise it is added to relatesTo.
+   * @throws IOException if any problem is encountered while reading result.
+   */
+  public static <T> void readRelationship(
+      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
+      boolean isRelatedTo) throws IOException {
+    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
+    Map<String, Object> columns = prefix.readResults(result);
+    for (Map.Entry<String, Object> column : columns.entrySet()) {
+      for (String id : Separator.VALUES.splitEncoded(
+          column.getValue().toString())) {
+        if (isRelatedTo) {
+          entity.addIsRelatedToEntity(column.getKey(), id);
+        } else {
+          entity.addRelatesToEntity(column.getKey(), id);
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper method for reading key-value pairs for either info or config.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param entity entity to fill.
+   * @param result result from HBase.
+   * @param prefix column prefix.
+   * @param isConfig if true, means we are reading configs, otherwise info.
+   * @throws IOException if any problem is encountered while reading result.
+   */
+  public static <T> void readKeyValuePairs(
+      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
+      boolean isConfig) throws IOException {
+    // info and configuration are of type Map<String, Object or String>
+    Map<String, Object> columns = prefix.readResults(result);
+    if (isConfig) {
+      for (Map.Entry<String, Object> column : columns.entrySet()) {
+        entity.addConfig(column.getKey(), column.getValue().toString());
+      }
+    } else {
+      entity.addInfo(columns);
+    }
+  }
+
+  /**
+   * Read events from the entity table or the application table. The column name
+   * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
+   * if there is no info associated with the event.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param entity entity to fill.
+   * @param result HBase Result.
+   * @param prefix column prefix.
+   * @throws IOException if any problem is encountered while reading result.
+   */
+  public static <T> void readEvents(TimelineEntity entity, Result result,
+      ColumnPrefix<T> prefix) throws IOException {
+    Map<String, TimelineEvent> eventsMap = new HashMap<>();
+    Map<?, Object> eventsResult =
+        prefix.readResultsHavingCompoundColumnQualifiers(result);
+    for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
+      byte[][] karr = (byte[][])eventResult.getKey();
+      // the column name is of the form "eventId=timestamp=infoKey"
+      if (karr.length == 3) {
+        String id = Bytes.toString(karr[0]);
+        long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1]));
+        String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
+        TimelineEvent event = eventsMap.get(key);
+        if (event == null) {
+          event = new TimelineEvent();
+          event.setId(id);
+          event.setTimestamp(ts);
+          eventsMap.put(key, event);
+        }
+        // handle empty info
+        String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
+        if (infoKey != null) {
+          event.addInfo(infoKey, eventResult.getValue());
+        }
+      } else {
+        LOG.warn("incorrectly formatted column name: it will be discarded");
+        continue;
+      }
+    }
+    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
+    entity.addEvents(eventsSet);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b2df86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
index f47ba93..775879a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
@@ -24,8 +24,11 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 /**
@@ -46,7 +49,8 @@ public enum EntityColumn implements Column<EntityTable> {
   /**
    * When the entity was created.
    */
-  CREATED_TIME(EntityColumnFamily.INFO, "created_time"),
+  CREATED_TIME(EntityColumnFamily.INFO, "created_time",
+      LongConverter.getInstance()),
 
   /**
    * The version of the flow that this entity belongs to.
@@ -60,12 +64,17 @@ public enum EntityColumn implements Column<EntityTable> {
 
   EntityColumn(ColumnFamily<EntityTable> columnFamily,
       String columnQualifier) {
+    this(columnFamily, columnQualifier, GenericConverter.getInstance());
+  }
+
+  EntityColumn(ColumnFamily<EntityTable> columnFamily,
+      String columnQualifier, ValueConverter converter) {
     this.columnFamily = columnFamily;
     this.columnQualifier = columnQualifier;
     // Future-proof by ensuring the right column prefix hygiene.
     this.columnQualifierBytes =
         Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
-    this.column = new ColumnHelper<EntityTable>(columnFamily);
+    this.column = new ColumnHelper<EntityTable>(columnFamily, converter);
   }
 
   /**
@@ -108,6 +117,21 @@ public enum EntityColumn implements Column<EntityTable> {
     return null;
   }
 
+  @Override
+  public byte[] getColumnQualifierBytes() {
+    return columnQualifierBytes.clone();
+  }
+
+  @Override
+  public byte[] getColumnFamilyBytes() {
+    return columnFamily.getBytes();
+  }
+
+  @Override
+  public ValueConverter getValueConverter() {
+    return column.getValueConverter();
+  }
+
   /**
    * Retrieve an {@link EntityColumn} given a name, or null if there is no
    * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b2df86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index f3c7e7f..de2b29d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -56,7 +56,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
   /**
    * Lifecycle events for an entity.
    */
-  EVENT(EntityColumnFamily.INFO, "e"),
+  EVENT(EntityColumnFamily.INFO, "e", true),
 
   /**
    * Config column stores configuration with config key as the column name.
@@ -78,6 +78,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
    */
   private final String columnPrefix;
   private final byte[] columnPrefixBytes;
+  private final boolean compoundColQual;
 
   /**
    * Private constructor, meant to be used by the enum definition.
@@ -87,7 +88,18 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
    */
   EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
       String columnPrefix) {
-    this(columnFamily, columnPrefix, GenericConverter.getInstance());
+    this(columnFamily, columnPrefix, false, GenericConverter.getInstance());
+  }
+
+  EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
+      String columnPrefix, boolean compondColQual) {
+    this(columnFamily, columnPrefix, compondColQual,
+        GenericConverter.getInstance());
+  }
+
+  EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
+      String columnPrefix, ValueConverter converter) {
+    this(columnFamily, columnPrefix, false, converter);
   }
 
   /**
@@ -99,7 +111,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
    * this column prefix.
    */
   EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
-      String columnPrefix, ValueConverter converter) {
+      String columnPrefix, boolean compondColQual, ValueConverter converter) {
     column = new ColumnHelper<EntityTable>(columnFamily, converter);
     this.columnFamily = columnFamily;
     this.columnPrefix = columnPrefix;
@@ -110,6 +122,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
       this.columnPrefixBytes =
           Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
     }
+    this.compoundColQual = compondColQual;
   }
 
   /**
@@ -131,6 +144,24 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
         this.columnPrefixBytes, qualifierPrefix);
   }
 
+  @Override
+  public byte[] getColumnFamilyBytes() {
+    return columnFamily.getBytes();
+  }
+
+  @Override
+  public ValueConverter getValueConverter() {
+    return column.getValueConverter();
+  }
+
+  public byte[] getCompoundColQualBytes(String qualifier,
+      byte[]...components) {
+    if (!compoundColQual) {
+      return ColumnHelper.getColumnQualifier(null, qualifier);
+    }
+    return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components);
+  }
+
   /*
    * (non-Javadoc)
    *
@@ -287,5 +318,4 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
     // Default to null
     return null;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b2df86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
index a5933da..188c2fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 
 /**
  * Identifies partially qualified columns for the {@link FlowActivityTable}.
@@ -50,6 +51,7 @@ public enum FlowActivityColumnPrefix
    */
   private final String columnPrefix;
   private final byte[] columnPrefixBytes;
+  private final boolean compoundColQual;
 
   private final AggregationOperation aggOp;
 
@@ -64,6 +66,12 @@ public enum FlowActivityColumnPrefix
   private FlowActivityColumnPrefix(
       ColumnFamily<FlowActivityTable> columnFamily, String columnPrefix,
       AggregationOperation aggOp) {
+    this(columnFamily, columnPrefix, aggOp, false);
+  }
+
+  private FlowActivityColumnPrefix(
+      ColumnFamily<FlowActivityTable> columnFamily, String columnPrefix,
+      AggregationOperation aggOp, boolean compoundColQual) {
     column = new ColumnHelper<FlowActivityTable>(columnFamily);
     this.columnFamily = columnFamily;
     this.columnPrefix = columnPrefix;
@@ -75,6 +83,7 @@ public enum FlowActivityColumnPrefix
           .encode(columnPrefix));
     }
     this.aggOp = aggOp;
+    this.compoundColQual = compoundColQual;
   }
 
   /**
@@ -100,6 +109,16 @@ public enum FlowActivityColumnPrefix
     return columnPrefixBytes.clone();
   }
 
+  @Override
+  public byte[] getColumnFamilyBytes() {
+    return columnFamily.getBytes();
+  }
+
+  @Override
+  public ValueConverter getValueConverter() {
+    return column.getValueConverter();
+  }
+
   public AggregationOperation getAttribute() {
     return aggOp;
   }
@@ -251,4 +270,20 @@ public enum FlowActivityColumnPrefix
     column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
         combinedAttributes);
   }
+
+  @Override
+  public byte[] getCompoundColQualBytes(String qualifier,
+      byte[]...components) {
+    if (!compoundColQual) {
+      return ColumnHelper.getColumnQualifier(null, qualifier);
+    }
+    return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components);
+  }
+
+  @Override
+  public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
+      throws IOException {
+    // There are no compound column qualifiers for flow activity table.
+    return null;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b2df86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
index d50bb16..f1553b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -86,10 +86,12 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
     return columnQualifier;
   }
 
+  @Override
   public byte[] getColumnQualifierBytes() {
     return columnQualifierBytes.clone();
   }
 
+  @Override
   public byte[] getColumnFamilyBytes() {
     return columnFamily.getBytes();
   }
@@ -144,6 +146,7 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
     return null;
   }
 
+  @Override
   public ValueConverter getValueConverter() {
     return column.getValueConverter();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b2df86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index fa94aae..77f2ab2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -52,6 +52,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
    */
   private final String columnPrefix;
   private final byte[] columnPrefixBytes;
+  private final boolean compoundColQual;
 
   private final AggregationOperation aggOp;
 
@@ -65,6 +66,12 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
    */
   private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
       String columnPrefix, AggregationOperation fra, ValueConverter converter) {
+    this(columnFamily, columnPrefix, fra, converter, false);
+  }
+
+  private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
+      String columnPrefix, AggregationOperation fra, ValueConverter converter,
+      boolean compoundColQual) {
     column = new ColumnHelper<FlowRunTable>(columnFamily, converter);
     this.columnFamily = columnFamily;
     this.columnPrefix = columnPrefix;
@@ -76,6 +83,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
           .encode(columnPrefix));
     }
     this.aggOp = fra;
+    this.compoundColQual = compoundColQual;
   }
 
   /**
@@ -101,6 +109,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
         this.columnPrefixBytes, qualifierPrefix);
   }
 
+  @Override
   public byte[] getColumnFamilyBytes() {
     return columnFamily.getBytes();
   }
@@ -222,6 +231,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
     return null;
   }
 
+  @Override
   public ValueConverter getValueConverter() {
     return column.getValueConverter();
   }
@@ -257,4 +267,20 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
     // Default to null
     return null;
   }
+
+  @Override
+  public byte[] getCompoundColQualBytes(String qualifier,
+      byte[]...components) {
+    if (!compoundColQual) {
+      return ColumnHelper.getColumnQualifier(null, qualifier);
+    }
+    return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components);
+  }
+
+  @Override
+  public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
+      throws IOException {
+    // There are no compound column qualifiers for flow run table.
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b2df86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index 6baea37..0ace529 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -200,6 +200,7 @@ class FlowScanner implements RegionScanner, Closeable {
     int addedCnt = 0;
     long currentTimestamp = System.currentTimeMillis();
     ValueConverter converter = null;
+
     while (cellLimit <= 0 || addedCnt < cellLimit) {
       cell = peekAtNextCell(cellLimit);
       if (cell == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b2df86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
index 0de09e0..53210f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
 
 import java.io.IOException;
 import java.util.EnumSet;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
@@ -27,7 +28,6 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.FamilyFilter;
 import org.apache.hadoop.hbase.filter.FilterList;
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
@@ -76,93 +77,231 @@ class ApplicationEntityReader extends GenericEntityReader {
     return APPLICATION_TABLE;
   }
 
+  /**
+   * This method is called only for multiple entity reads.
+   */
   @Override
-  protected FilterList constructFilterListBasedOnFields() {
-    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
-    TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
-    // Fetch all the columns.
-    if (dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
-        (dataToRetrieve.getConfsToRetrieve() == null ||
-        dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) &&
-        (dataToRetrieve.getMetricsToRetrieve() == null ||
-        dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) {
-      return list;
+  protected FilterList constructFilterListBasedOnFilters() throws IOException {
+    // Filters here cannot be null for multiple entity reads as they are set in
+    // augmentParams if null.
+    TimelineEntityFilters filters = getFilters();
+    FilterList listBasedOnFilters = new FilterList();
+    // Create filter list based on created time range and add it to
+    // listBasedOnFilters.
+    long createdTimeBegin = filters.getCreatedTimeBegin();
+    long createdTimeEnd = filters.getCreatedTimeEnd();
+    if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
+      listBasedOnFilters.addFilter(
+          TimelineFilterUtils.createSingleColValueFiltersByRange(
+          ApplicationColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd));
     }
-    FilterList infoColFamilyList = new FilterList();
-    // By default fetch everything in INFO column family.
-    FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL,
-           new BinaryComparator(ApplicationColumnFamily.INFO.getBytes()));
-    infoColFamilyList.addFilter(infoColumnFamily);
+    // Create filter list based on metric filters and add it to
+    // listBasedOnFilters.
+    TimelineFilterList metricFilters = filters.getMetricFilters();
+    if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
+      listBasedOnFilters.addFilter(
+          TimelineFilterUtils.createHBaseFilterList(
+              ApplicationColumnPrefix.METRIC, metricFilters));
+    }
+    // Create filter list based on config filters and add it to
+    // listBasedOnFilters.
+    TimelineFilterList configFilters = filters.getConfigFilters();
+    if (configFilters != null && !configFilters.getFilterList().isEmpty()) {
+      listBasedOnFilters.addFilter(
+          TimelineFilterUtils.createHBaseFilterList(
+              ApplicationColumnPrefix.CONFIG, configFilters));
+    }
+    // Create filter list based on info filters and add it to listBasedOnFilters
+    TimelineFilterList infoFilters = filters.getInfoFilters();
+    if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) {
+      listBasedOnFilters.addFilter(
+          TimelineFilterUtils.createHBaseFilterList(
+              ApplicationColumnPrefix.INFO, infoFilters));
+    }
+    return listBasedOnFilters;
+  }
+
+  /**
+   * Add {@link QualifierFilter} filters to filter list for each column of
+   * application table.
+   *
+   * @param list filter list to which qualifier filters have to be added.
+   */
+  @Override
+  protected void updateFixedColumns(FilterList list) {
+    for (ApplicationColumn column : ApplicationColumn.values()) {
+      list.addFilter(new QualifierFilter(CompareOp.EQUAL,
+          new BinaryComparator(column.getColumnQualifierBytes())));
+    }
+  }
+
+  /**
+   * Creates a filter list which indicates that only some of the column
+   * qualifiers in the info column family will be returned in result.
+   *
+   * @return filter list.
+   * @throws IOException if any problem occurs while creating filter list.
+   */
+  private FilterList createFilterListForColsOfInfoFamily()
+      throws IOException {
+    FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE);
+    // Add filters for each column in application table.
+    updateFixedColumns(infoFamilyColsFilter);
+    EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
+    // If INFO field has to be retrieved, add a filter for fetching columns
+    // with INFO column prefix.
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
+      infoFamilyColsFilter.addFilter(
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.EQUAL, ApplicationColumnPrefix.INFO));
+    }
+    TimelineFilterList relatesTo = getFilters().getRelatesTo();
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+      // If RELATES_TO field has to be retrieved, add a filter for fetching
+      // columns with RELATES_TO column prefix.
+      infoFamilyColsFilter.addFilter(
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.EQUAL, ApplicationColumnPrefix.RELATES_TO));
+    } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) {
+      // Even if fields to retrieve does not contain RELATES_TO, we still
+      // need to have a filter to fetch some of the column qualifiers if
+      // relatesTo filters are specified. relatesTo filters will then be
+      // matched after fetching rows from HBase.
+      Set<String> relatesToCols =
+          TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
+      infoFamilyColsFilter.addFilter(
+          TimelineFilterUtils.createFiltersFromColumnQualifiers(
+              ApplicationColumnPrefix.RELATES_TO, relatesToCols));
+    }
+    TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+      // If IS_RELATED_TO field has to be retrieved, add a filter for fetching
+      // columns with IS_RELATED_TO column prefix.
+      infoFamilyColsFilter.addFilter(
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.EQUAL, ApplicationColumnPrefix.IS_RELATED_TO));
+    } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) {
+      // Even if fields to retrieve does not contain IS_RELATED_TO, we still
+      // need to have a filter to fetch some of the column qualifiers if
+      // isRelatedTo filters are specified. isRelatedTo filters will then be
+      // matched after fetching rows from HBase.
+      Set<String> isRelatedToCols =
+          TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
+      infoFamilyColsFilter.addFilter(
+          TimelineFilterUtils.createFiltersFromColumnQualifiers(
+              ApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols));
+    }
+    TimelineFilterList eventFilters = getFilters().getEventFilters();
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+      // If EVENTS field has to be retrieved, add a filter for fetching columns
+      // with EVENT column prefix.
+      infoFamilyColsFilter.addFilter(
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.EQUAL, ApplicationColumnPrefix.EVENT));
+    } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()){
+      // Even if fields to retrieve does not contain EVENTS, we still need to
+      // have a filter to fetch some of the column qualifiers on the basis of
+      // event filters specified. Event filters will then be matched after
+      // fetching rows from HBase.
+      Set<String> eventCols =
+          TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
+      infoFamilyColsFilter.addFilter(
+          TimelineFilterUtils.createFiltersFromColumnQualifiers(
+              ApplicationColumnPrefix.EVENT, eventCols));
+    }
+    return infoFamilyColsFilter;
+  }
+
+  /**
+   * Exclude column prefixes via filters which are not required (based on
+   * fields to retrieve) from info column family. These filters are added to
+   * filter list which contains a filter for getting info column family.
+   *
+   * @param infoColFamilyList filter list for info column family.
+   */
+  private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
+    EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
     // Events not required.
-    TimelineEntityFilters filters = getFilters();
-    if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.EVENTS) &&
-        !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
-        (isSingleEntityRead() || filters.getEventFilters() == null)) {
+    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
       infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-          ApplicationColumnPrefix.EVENT.getColumnPrefixBytes(""))));
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.NOT_EQUAL, ApplicationColumnPrefix.EVENT));
     }
     // info not required.
-    if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.INFO) &&
-        !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
-        (isSingleEntityRead() || filters.getInfoFilters() == null)) {
+    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
       infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-          ApplicationColumnPrefix.INFO.getColumnPrefixBytes(""))));
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.NOT_EQUAL, ApplicationColumnPrefix.INFO));
     }
-    // is releated to not required.
-    if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.IS_RELATED_TO) &&
-        !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
-        (isSingleEntityRead() || filters.getIsRelatedTo() == null)) {
+    // is related to not required.
+    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
       infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-          ApplicationColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.NOT_EQUAL, ApplicationColumnPrefix.IS_RELATED_TO));
     }
     // relates to not required.
-    if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.RELATES_TO) &&
-        !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
-        (isSingleEntityRead() || filters.getRelatesTo() == null)) {
+    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
       infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-          ApplicationColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.NOT_EQUAL, ApplicationColumnPrefix.RELATES_TO));
     }
-    list.addFilter(infoColFamilyList);
-    if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS) ||
-        (!isSingleEntityRead() && filters.getConfigFilters() != null)) ||
-        (dataToRetrieve.getConfsToRetrieve() != null &&
-        !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty())) {
-      FilterList filterCfg =
-          new FilterList(new FamilyFilter(CompareOp.EQUAL,
-          new BinaryComparator(ApplicationColumnFamily.CONFIGS.getBytes())));
-      if (dataToRetrieve.getConfsToRetrieve() != null &&
-          !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) {
-        filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
-            ApplicationColumnPrefix.CONFIG,
-            dataToRetrieve.getConfsToRetrieve()));
-      }
-      list.addFilter(filterCfg);
+  }
+
+  /**
+   * Updates filter list based on fields for confs and metrics to retrieve.
+   *
+   * @param listBasedOnFields filter list based on fields.
+   * @throws IOException if any problem occurs while updating filter list.
+   */
+  private void updateFilterForConfsAndMetricsToRetrieve(
+      FilterList listBasedOnFields) throws IOException {
+    TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
+    // Please note that if confsToRetrieve is specified, we would have added
+    // CONFIGS to fields to retrieve in augmentParams() even if not specified.
+    if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) {
+      // Create a filter list for configs.
+      listBasedOnFields.addFilter(TimelineFilterUtils.
+          createFilterForConfsOrMetricsToRetrieve(
+              dataToRetrieve.getConfsToRetrieve(),
+              ApplicationColumnFamily.CONFIGS, ApplicationColumnPrefix.CONFIG));
     }
-    if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS) ||
-        (!isSingleEntityRead() && filters.getMetricFilters() != null)) ||
-        (dataToRetrieve.getMetricsToRetrieve() != null &&
-        !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) {
-      FilterList filterMetrics =
-          new FilterList(new FamilyFilter(CompareOp.EQUAL,
-          new BinaryComparator(ApplicationColumnFamily.METRICS.getBytes())));
-      if (dataToRetrieve.getMetricsToRetrieve() != null &&
-          !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()) {
-        filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
-            ApplicationColumnPrefix.METRIC,
-            dataToRetrieve.getMetricsToRetrieve()));
-      }
-      list.addFilter(filterMetrics);
+
+    // Please note that if metricsToRetrieve is specified, we would have added
+    // METRICS to fields to retrieve in augmentParams() even if not specified.
+    if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) {
+      // Create a filter list for metrics.
+      listBasedOnFields.addFilter(TimelineFilterUtils.
+          createFilterForConfsOrMetricsToRetrieve(
+              dataToRetrieve.getMetricsToRetrieve(),
+              ApplicationColumnFamily.METRICS, ApplicationColumnPrefix.METRIC));
+    }
+  }
+
+  @Override
+  protected FilterList constructFilterListBasedOnFields() throws IOException {
+    if (!needCreateFilterListBasedOnFields()) {
+      // Fetch all the columns. No need of a filter.
+      return null;
+    }
+    FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE);
+    FilterList infoColFamilyList = new FilterList();
+    // By default fetch everything in INFO column family.
+    FamilyFilter infoColumnFamily =
+        new FamilyFilter(CompareOp.EQUAL,
+            new BinaryComparator(ApplicationColumnFamily.INFO.getBytes()));
+    infoColFamilyList.addFilter(infoColumnFamily);
+    if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) {
+      // We can fetch only some of the columns from info family.
+      infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily());
+    } else {
+      // Exclude column prefixes in info column family which are not required
+      // based on fields to retrieve.
+      excludeFieldsFromInfoColFamily(infoColFamilyList);
     }
-    return list;
+    listBasedOnFields.addFilter(infoColFamilyList);
+
+    updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields);
+    return listBasedOnFields;
   }
 
   @Override
@@ -182,6 +321,9 @@ class ApplicationEntityReader extends GenericEntityReader {
 
   @Override
   protected void validateParams() {
+    Preconditions.checkNotNull(getContext(), "context shouldn't be null");
+    Preconditions.checkNotNull(
+        getDataToRetrieve(), "data to retrieve shouldn't be null");
     Preconditions.checkNotNull(getContext().getClusterId(),
         "clusterId shouldn't be null");
     Preconditions.checkNotNull(getContext().getEntityType(),
@@ -202,6 +344,7 @@ class ApplicationEntityReader extends GenericEntityReader {
       throws IOException {
     TimelineReaderContext context = getContext();
     if (isSingleEntityRead()) {
+      // Get flow context information from AppToFlow table.
       if (context.getFlowName() == null || context.getFlowRunId() == null ||
           context.getUserId() == null) {
         FlowContext flowContext = lookupFlowContext(
@@ -211,7 +354,12 @@ class ApplicationEntityReader extends GenericEntityReader {
         context.setUserId(flowContext.getUserId());
       }
     }
+    // Add configs/metrics to fields to retrieve if confsToRetrieve and/or
+    // metricsToRetrieve are specified.
     getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
+    if (!isSingleEntityRead()) {
+      createFiltersIfNull();
+    }
   }
 
   @Override
@@ -252,114 +400,84 @@ class ApplicationEntityReader extends GenericEntityReader {
     Number createdTime =
         (Number)ApplicationColumn.CREATED_TIME.readResult(result);
     entity.setCreatedTime(createdTime.longValue());
-    if (!isSingleEntityRead() &&
-        (entity.getCreatedTime() < filters.getCreatedTimeBegin() ||
-        entity.getCreatedTime() > filters.getCreatedTimeEnd())) {
-      return null;
-    }
+
     EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
-    // fetch is related to entities
+    // fetch is related to entities and match isRelatedTo filter. If isRelatedTo
+    // filters do not match, entity would be dropped. We have to match filters
+    // locally as relevant HBase filters to filter out rows on the basis of
+    // isRelatedTo are not set in HBase scan.
     boolean checkIsRelatedTo =
-        filters != null && filters.getIsRelatedTo() != null &&
-        filters.getIsRelatedTo().size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
-      readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
-          true);
-      if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
-          entity.getIsRelatedToEntities(), filters.getIsRelatedTo())) {
+        !isSingleEntityRead() && filters.getIsRelatedTo() != null &&
+        filters.getIsRelatedTo().getFilterList().size() > 0;
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) ||
+        checkIsRelatedTo) {
+      TimelineStorageUtils.readRelationship(
+          entity, result, ApplicationColumnPrefix.IS_RELATED_TO, true);
+      if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity,
+          filters.getIsRelatedTo())) {
         return null;
       }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
+      if (!TimelineStorageUtils.hasField(fieldsToRetrieve,
+          Field.IS_RELATED_TO)) {
         entity.getIsRelatedToEntities().clear();
       }
     }
 
-    // fetch relates to entities
+    // fetch relates to entities and match relatesTo filter. If relatesTo
+    // filters do not match, entity would be dropped. We have to match filters
+    // locally as relevant HBase filters to filter out rows on the basis of
+    // relatesTo are not set in HBase scan.
     boolean checkRelatesTo =
-        filters != null && filters.getRelatesTo() != null &&
-        filters.getRelatesTo().size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
-      readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
-          false);
-      if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
-          entity.getRelatesToEntities(), filters.getRelatesTo())) {
+        !isSingleEntityRead() && filters.getRelatesTo() != null &&
+        filters.getRelatesTo().getFilterList().size() > 0;
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) ||
+        checkRelatesTo) {
+      TimelineStorageUtils.readRelationship(
+          entity, result, ApplicationColumnPrefix.RELATES_TO, false);
+      if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity,
+          filters.getRelatesTo())) {
         return null;
       }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
+      if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
         entity.getRelatesToEntities().clear();
       }
     }
 
-    // fetch info
-    boolean checkInfo = filters != null && filters.getInfoFilters() != null &&
-        filters.getInfoFilters().size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
-      readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
-      if (checkInfo &&
-          !TimelineStorageUtils.matchFilters(
-          entity.getInfo(), filters.getInfoFilters())) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.INFO)) {
-        entity.getInfo().clear();
-      }
+    // fetch info if fieldsToRetrieve contains INFO or ALL.
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
+      TimelineStorageUtils.readKeyValuePairs(
+          entity, result, ApplicationColumnPrefix.INFO, false);
     }
 
-    // fetch configs
-    boolean checkConfigs =
-        filters != null && filters.getConfigFilters() != null &&
-        filters.getConfigFilters().size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
-      readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
-      if (checkConfigs && !TimelineStorageUtils.matchFilters(
-          entity.getConfigs(), filters.getConfigFilters())) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.CONFIGS)) {
-        entity.getConfigs().clear();
-      }
+    // fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.CONFIGS)) {
+      TimelineStorageUtils.readKeyValuePairs(
+          entity, result, ApplicationColumnPrefix.CONFIG, true);
     }
 
-    // fetch events
+    // fetch events and match event filters if they exist. If event filters do
+    // not match, entity would be dropped. We have to match filters locally
+    // as relevant HBase filters to filter out rows on the basis of events
+    // are not set in HBase scan.
     boolean checkEvents =
-        filters != null && filters.getEventFilters() != null &&
-        filters.getEventFilters().size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
-      readEvents(entity, result, true);
-      if (checkEvents && !TimelineStorageUtils.matchEventFilters(
-          entity.getEvents(), filters.getEventFilters())) {
+        !isSingleEntityRead() && filters.getEventFilters() != null &&
+        filters.getEventFilters().getFilterList().size() > 0;
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) ||
+        checkEvents) {
+      TimelineStorageUtils.readEvents(
+          entity, result, ApplicationColumnPrefix.EVENT);
+      if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity,
+          filters.getEventFilters())) {
         return null;
       }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.EVENTS)) {
+      if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
         entity.getEvents().clear();
       }
     }
 
-    // fetch metrics
-    boolean checkMetrics =
-        filters != null && filters.getMetricFilters() != null &&
-        filters.getMetricFilters().size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
+    // fetch metrics if fieldsToRetrieve contains METRICS or ALL.
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.METRICS)) {
       readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
-      if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
-          entity.getMetrics(), filters.getMetricFilters())) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.METRICS)) {
-        entity.getMetrics().clear();
-      }
     }
     return entity;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b2df86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
index 0d2bdd8..d8ca038 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
 
 import java.io.IOException;
@@ -75,6 +76,12 @@ class FlowActivityEntityReader extends TimelineEntityReader {
   @Override
   protected void augmentParams(Configuration hbaseConf, Connection conn)
       throws IOException {
+    createFiltersIfNull();
+  }
+
+  @Override
+  protected FilterList constructFilterListBasedOnFilters() throws IOException {
+    return null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b2df86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
index 743315c..b2de2e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
@@ -38,9 +38,11 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
@@ -76,6 +78,9 @@ class FlowRunEntityReader extends TimelineEntityReader {
 
   @Override
   protected void validateParams() {
+    Preconditions.checkNotNull(getContext(), "context shouldn't be null");
+    Preconditions.checkNotNull(
+        getDataToRetrieve(), "data to retrieve shouldn't be null");
     Preconditions.checkNotNull(getContext().getClusterId(),
         "clusterId shouldn't be null");
     Preconditions.checkNotNull(getContext().getUserId(),
@@ -90,37 +95,87 @@ class FlowRunEntityReader extends TimelineEntityReader {
 
   @Override
   protected void augmentParams(Configuration hbaseConf, Connection conn) {
+    // Add metrics to fields to retrieve if metricsToRetrieve is specified.
     getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
+    if (!isSingleEntityRead()) {
+      createFiltersIfNull();
+    }
+  }
+
+  protected FilterList constructFilterListBasedOnFilters() throws IOException {
+    FilterList listBasedOnFilters = new FilterList();
+    // Filter based on created time range.
+    Long createdTimeBegin = getFilters().getCreatedTimeBegin();
+    Long createdTimeEnd = getFilters().getCreatedTimeEnd();
+    if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
+      listBasedOnFilters.addFilter(
+          TimelineFilterUtils.createSingleColValueFiltersByRange(
+          FlowRunColumn.MIN_START_TIME, createdTimeBegin, createdTimeEnd));
+    }
+    // Filter based on metric filters.
+    TimelineFilterList metricFilters = getFilters().getMetricFilters();
+    if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
+      listBasedOnFilters.addFilter(
+          TimelineFilterUtils.createHBaseFilterList(
+              FlowRunColumnPrefix.METRIC, metricFilters));
+    }
+    return listBasedOnFilters;
+  }
+
+  /**
+   * Add {@link QualifierFilter} filters to filter list for each column of
+   * flow run table.
+   *
+   * @return filter list to which qualifier filters have been added.
+   */
+  private FilterList updateFixedColumns() {
+    FilterList columnsList = new FilterList(Operator.MUST_PASS_ONE);
+    for (FlowRunColumn column : FlowRunColumn.values()) {
+      columnsList.addFilter(new QualifierFilter(CompareOp.EQUAL,
+          new BinaryComparator(column.getColumnQualifierBytes())));
+    }
+    return columnsList;
   }
 
   @Override
-  protected FilterList constructFilterListBasedOnFields() {
+  protected FilterList constructFilterListBasedOnFields() throws IOException {
     FilterList list = new FilterList(Operator.MUST_PASS_ONE);
-
     // By default fetch everything in INFO column family.
     FamilyFilter infoColumnFamily =
         new FamilyFilter(CompareOp.EQUAL,
            new BinaryComparator(FlowRunColumnFamily.INFO.getBytes()));
     TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
-    // Metrics not required.
-    if (!isSingleEntityRead() &&
-        !dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS) &&
-        !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL)) {
+    // If multiple entities have to be retrieved, check if metrics have to be
+    // retrieved and if not, add a filter so that metrics can be excluded.
+    // Metrics are always returned if we are reading a single entity.
+    if (!isSingleEntityRead() && !TimelineStorageUtils.hasField(
+        dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) {
       FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE);
       infoColFamilyList.addFilter(infoColumnFamily);
       infoColFamilyList.addFilter(
           new QualifierFilter(CompareOp.NOT_EQUAL,
           new BinaryPrefixComparator(
-          FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(""))));
-      list.addFilter(infoColFamilyList);
-    }
-    if (dataToRetrieve.getMetricsToRetrieve() != null &&
-        !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()) {
-      FilterList infoColFamilyList = new FilterList();
-      infoColFamilyList.addFilter(infoColumnFamily);
-      infoColFamilyList.addFilter(TimelineFilterUtils.createHBaseFilterList(
-          FlowRunColumnPrefix.METRIC, dataToRetrieve.getMetricsToRetrieve()));
+              FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(""))));
       list.addFilter(infoColFamilyList);
+    } else {
+      // Check if metricsToRetrieve are specified and if they are, create a
+      // filter list for info column family by adding flow run table's columns
+      // and a list for metrics to retrieve. Note that fieldsToRetrieve
+      // will have METRICS added to it if metricsToRetrieve are specified
+      // (in augmentParams()).
+      TimelineFilterList metricsToRetrieve =
+          dataToRetrieve.getMetricsToRetrieve();
+      if (metricsToRetrieve != null &&
+          !metricsToRetrieve.getFilterList().isEmpty()) {
+        FilterList infoColFamilyList = new FilterList();
+        infoColFamilyList.addFilter(infoColumnFamily);
+        FilterList columnsList = updateFixedColumns();
+        columnsList.addFilter(
+            TimelineFilterUtils.createHBaseFilterList(
+                FlowRunColumnPrefix.METRIC, metricsToRetrieve));
+        infoColFamilyList.addFilter(columnsList);
+        list.addFilter(infoColFamilyList);
+      }
     }
     return list;
   }
@@ -175,11 +230,6 @@ class FlowRunEntityReader extends TimelineEntityReader {
     if (startTime != null) {
       flowRun.setStartTime(startTime.longValue());
     }
-    if (!isSingleEntityRead() &&
-        (flowRun.getStartTime() < getFilters().getCreatedTimeBegin() ||
-        flowRun.getStartTime() > getFilters().getCreatedTimeEnd())) {
-      return null;
-    }
 
     // read the end time if available
     Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result);
@@ -193,9 +243,10 @@ class FlowRunEntityReader extends TimelineEntityReader {
       flowRun.setVersion(version);
     }
 
-    // read metrics
-    if (isSingleEntityRead() ||
-        getDataToRetrieve().getFieldsToRetrieve().contains(Field.METRICS)) {
+    // read metrics if it's a single entity query or if METRICS is part of
+    // fieldsToRetrieve.
+    if (isSingleEntityRead() || TimelineStorageUtils.hasField(
+        getDataToRetrieve().getFieldsToRetrieve(), Field.METRICS)) {
       readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message