hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sunc...@apache.org
Subject [2/2] hive git commit: HIVE-15057: Nested column pruning: support all operators (Chao Sun, reviewed by Ferdinand Xu)
Date Sat, 03 Dec 2016 19:43:10 GMT
HIVE-15057: Nested column pruning: support all operators (Chao Sun, reviewed by Ferdinand Xu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a625bb03
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a625bb03
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a625bb03

Branch: refs/heads/master
Commit: a625bb0393b00fb803e11fbc684944e7e39b66d3
Parents: 2feaa5d
Author: Chao Sun <sunchao@apache.org>
Authored: Sat Dec 3 11:42:20 2016 -0800
Committer: Chao Sun <sunchao@apache.org>
Committed: Sat Dec 3 11:42:28 2016 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/MapOperator.java |   72 +-
 .../parquet/read/DataWritableReadSupport.java   |   24 +-
 .../serde/ArrayWritableObjectInspector.java     |   18 +-
 .../ql/io/parquet/serde/ParquetHiveSerDe.java   |   55 +-
 .../hive/ql/optimizer/ColumnPrunerProcCtx.java  |  297 ++---
 .../ql/optimizer/ColumnPrunerProcFactory.java   |  248 ++--
 .../hadoop/hive/ql/optimizer/FieldNode.java     |  134 ++-
 .../RewriteQueryUsingAggregateIndexCtx.java     |    3 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |    4 +-
 .../hadoop/hive/ql/plan/ExprNodeColumnDesc.java |   50 +-
 .../apache/hadoop/hive/ql/plan/TableDesc.java   |    4 +
 .../read/TestDataWritableReadSupport.java       |   16 +-
 .../ql/optimizer/TestColumnPrunerProcCtx.java   |   91 +-
 .../clientpositive/nested_column_pruning.q      |  112 ++
 .../clientpositive/nested_column_pruning.q.out  | 1072 ++++++++++++++++++
 .../hive/serde2/ColumnProjectionUtils.java      |   18 +-
 16 files changed, 1815 insertions(+), 403 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a625bb03/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index 4bdd3c9..2a46b30 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -178,7 +179,6 @@ public class MapOperator extends AbstractMapOperator {
         SerDeUtils.createOverlayedProperties(td.getProperties(), pd.getProperties());
 
     Map<String, String> partSpec = pd.getPartSpec();
-
     opCtx.tableName = String.valueOf(overlayedProps.getProperty("name"));
     opCtx.partName = String.valueOf(partSpec);
     opCtx.deserializer = pd.getDeserializer(hconf);
@@ -279,19 +279,20 @@ public class MapOperator extends AbstractMapOperator {
    * and P1's schema is same as T, whereas P2's scheme is different from T, conversion
    * might be needed for both P1 and P2, since SettableOI might be needed for T
    */
-  private Map<TableDesc, StructObjectInspector> getConvertedOI(Configuration hconf)
+  private Map<TableDesc, StructObjectInspector> getConvertedOI(Map<String, Configuration> tableToConf)
       throws HiveException {
     Map<TableDesc, StructObjectInspector> tableDescOI =
         new HashMap<TableDesc, StructObjectInspector>();
     Set<TableDesc> identityConverterTableDesc = new HashSet<TableDesc>();
+
     try {
       Map<ObjectInspector, Boolean> oiSettableProperties = new HashMap<ObjectInspector, Boolean>();
 
       for (Path onefile : conf.getPathToAliases().keySet()) {
         PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile);
         TableDesc tableDesc = pd.getTableDesc();
+        Configuration hconf = tableToConf.get(tableDesc.getTableName());
         Deserializer partDeserializer = pd.getDeserializer(hconf);
-
         StructObjectInspector partRawRowObjectInspector;
         boolean isAcid = AcidUtils.isTablePropertyTransactional(tableDesc.getProperties());
         if (Utilities.isSchemaEvolutionEnabled(hconf, isAcid) && Utilities.isInputFileFormatSelfDescribing(pd)) {
@@ -329,6 +330,58 @@ public class MapOperator extends AbstractMapOperator {
     return tableDescOI;
   }
 
+  /**
+   * For each source table, combine the nested column pruning information from all its
+   * table scan descriptors and set it in a configuration copy. This is necessary since
+   * the configuration property "READ_NESTED_COLUMN_PATH_CONF_STR" is set on a per-table
+   * basis, so we can't just use a single configuration for all the tables.
+   */
+  private Map<String, Configuration> cloneConfsForNestedColPruning(Configuration hconf) {
+    Map<String, Configuration> tableNameToConf = new HashMap<>();
+
+    for (Map.Entry<Path, ArrayList<String>> e : conf.getPathToAliases().entrySet()) {
+      List<String> aliases = e.getValue();
+      if (aliases == null || aliases.isEmpty()) {
+        continue;
+      }
+
+      String tableName = conf.getPathToPartitionInfo().get(e.getKey()).getTableName();
+      for (String alias: aliases) {
+        Operator<?> rootOp = conf.getAliasToWork().get(alias);
+        if (!(rootOp instanceof TableScanOperator)) {
+          continue;
+        }
+        TableScanDesc tableScanDesc = ((TableScanOperator) rootOp).getConf();
+        List<String> nestedColumnPaths = tableScanDesc.getNeededNestedColumnPaths();
+        if (nestedColumnPaths == null || nestedColumnPaths.isEmpty()) {
+          continue;
+        }
+        if (!tableNameToConf.containsKey(tableName)) {
+          Configuration clonedConf = new Configuration(hconf);
+          clonedConf.unset(ColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR);
+          tableNameToConf.put(tableName, clonedConf);
+        }
+        Configuration newConf = tableNameToConf.get(tableName);
+        ColumnProjectionUtils.appendNestedColumnPaths(newConf, nestedColumnPaths);
+      }
+    }
+
+    // Assign tables without nested column pruning info to the default conf
+    for (PartitionDesc pd : conf.getPathToPartitionInfo().values()) {
+      if (!tableNameToConf.containsKey(pd.getTableName())) {
+        tableNameToConf.put(pd.getTableName(), hconf);
+      }
+    }
+
+    for (PartitionDesc pd: conf.getAliasToPartnInfo().values()) {
+      if (!tableNameToConf.containsKey(pd.getTableName())) {
+        tableNameToConf.put(pd.getTableName(), hconf);
+      }
+    }
+
+    return tableNameToConf;
+  }
+
   /*
    * This is the same as the setChildren method below but for empty tables.
    * It takes care of the following:
@@ -339,15 +392,19 @@ public class MapOperator extends AbstractMapOperator {
   public void initEmptyInputChildren(List<Operator<?>> children, Configuration hconf)
     throws SerDeException, Exception {
     setChildOperators(children);
+
+    Map<String, Configuration> tableNameToConf = cloneConfsForNestedColPruning(hconf);
+
     for (Operator<?> child : children) {
       TableScanOperator tsOp = (TableScanOperator) child;
       StructObjectInspector soi = null;
       PartitionDesc partDesc = conf.getAliasToPartnInfo().get(tsOp.getConf().getAlias());
+      Configuration newConf = tableNameToConf.get(partDesc.getTableDesc().getTableName());
       Deserializer serde = partDesc.getTableDesc().getDeserializer();
       partDesc.setProperties(partDesc.getProperties());
       MapOpCtx opCtx = new MapOpCtx(tsOp.getConf().getAlias(), child, partDesc);
       StructObjectInspector tableRowOI = (StructObjectInspector) serde.getObjectInspector();
-      initObjectInspector(hconf, opCtx, tableRowOI);
+      initObjectInspector(newConf, opCtx, tableRowOI);
       soi = opCtx.rowObjectInspector;
       child.getParentOperators().add(this);
       childrenOpToOI.put(child, soi);
@@ -359,12 +416,15 @@ public class MapOperator extends AbstractMapOperator {
     List<Operator<? extends OperatorDesc>> children =
         new ArrayList<Operator<? extends OperatorDesc>>();
 
-    Map<TableDesc, StructObjectInspector> convertedOI = getConvertedOI(hconf);
+    Map<String, Configuration> tableNameToConf = cloneConfsForNestedColPruning(hconf);
+    Map<TableDesc, StructObjectInspector> convertedOI = getConvertedOI(tableNameToConf);
 
     for (Map.Entry<Path, ArrayList<String>> entry : conf.getPathToAliases().entrySet()) {
       Path onefile = entry.getKey();
       List<String> aliases = entry.getValue();
       PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile);
+      TableDesc tableDesc = partDesc.getTableDesc();
+      Configuration newConf = tableNameToConf.get(tableDesc.getTableName());
 
       for (String alias : aliases) {
         Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(alias);
@@ -381,7 +441,7 @@ public class MapOperator extends AbstractMapOperator {
         }
         MapOpCtx context = new MapOpCtx(alias, op, partDesc);
         StructObjectInspector tableRowOI = convertedOI.get(partDesc.getTableDesc());
-        contexts.put(op, initObjectInspector(hconf, context, tableRowOI));
+        contexts.put(op, initObjectInspector(newConf, context, tableRowOI));
 
         if (children.contains(op) == false) {
           op.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>(1));

http://git-wip-us.apache.org/repos/asf/hive/blob/a625bb03/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
index 16064b2..604cbbc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
@@ -228,7 +228,7 @@ public class DataWritableReadSupport extends ReadSupport<ArrayWritable> {
     MessageType schema,
     List<String> colNames,
     List<Integer> colIndexes,
-    List<String> nestedColumnPaths) {
+    Set<String> nestedColumnPaths) {
     List<Type> schemaTypes = new ArrayList<Type>();
 
     Map<String, FieldNode> prunedCols = getPrunedNestedColumns(nestedColumnPaths);
@@ -236,7 +236,8 @@ public class DataWritableReadSupport extends ReadSupport<ArrayWritable> {
       if (i < colNames.size()) {
         if (i < schema.getFieldCount()) {
           Type t = schema.getType(i);
-          if (!prunedCols.containsKey(t.getName())) {
+          String tn = t.getName().toLowerCase();
+          if (!prunedCols.containsKey(tn)) {
             schemaTypes.add(schema.getType(i));
           } else {
             if (t.isPrimitive()) {
@@ -245,7 +246,7 @@ public class DataWritableReadSupport extends ReadSupport<ArrayWritable> {
             } else {
               // For group type, we need to build the projected group type with required leaves
               List<Type> g =
-                projectLeafTypes(Arrays.asList(t), Arrays.asList(prunedCols.get(t.getName())));
+                projectLeafTypes(Arrays.asList(t), Arrays.asList(prunedCols.get(tn)));
               if (!g.isEmpty()) {
                 schemaTypes.addAll(g);
               }
@@ -264,20 +265,19 @@ public class DataWritableReadSupport extends ReadSupport<ArrayWritable> {
 
   /**
    * Return the columns which contains required nested attribute level
-   * e.g.
-   * Given struct a <x:int, y:int> and a is required while y is not, so the method will return a
-   * who contains the attribute x
+   * E.g., given struct a:<x:int, y:int> while 'x' is required and 'y' is not, the method will return
+   * a pruned struct for 'a' which only contains the attribute 'x'
    *
    * @param nestedColPaths the paths for required nested attribute
-   * @return column list contains required nested attribute
+   * @return a map from the column to its selected nested column paths, of which the keys are all lower-cased.
    */
-  private static Map<String, FieldNode> getPrunedNestedColumns(List<String> nestedColPaths) {
+  private static Map<String, FieldNode> getPrunedNestedColumns(Set<String> nestedColPaths) {
     Map<String, FieldNode> resMap = new HashMap<>();
     if (nestedColPaths.isEmpty()) {
       return resMap;
     }
     for (String s : nestedColPaths) {
-      String c = StringUtils.split(s, '.')[0];
+      String c = StringUtils.split(s, '.')[0].toLowerCase();
       if (!resMap.containsKey(c)) {
         FieldNode f = NestedColumnFieldPruningUtils.addNodeByPath(null, s);
         resMap.put(c, f);
@@ -306,10 +306,10 @@ public class DataWritableReadSupport extends ReadSupport<ArrayWritable> {
     }
     Map<String, FieldNode> fieldMap = new HashMap<>();
     for (FieldNode n : nodes) {
-      fieldMap.put(n.getFieldName(), n);
+      fieldMap.put(n.getFieldName().toLowerCase(), n);
     }
     for (Type type : types) {
-      String tn = type.getName();
+      String tn = type.getName().toLowerCase();
 
       if (fieldMap.containsKey(tn)) {
         FieldNode f = fieldMap.get(tn);
@@ -373,7 +373,7 @@ public class DataWritableReadSupport extends ReadSupport<ArrayWritable> {
       contextMetadata.put(PARQUET_COLUMN_INDEX_ACCESS, String.valueOf(indexAccess));
       this.hiveTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList);
 
-      List<String> groupPaths = ColumnProjectionUtils.getNestedColumnPaths(configuration);
+      Set<String> groupPaths = ColumnProjectionUtils.getNestedColumnPaths(configuration);
       List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
       if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
         MessageType requestedSchemaByUser = getProjectedSchema(tableSchema, columnNamesList,

http://git-wip-us.apache.org/repos/asf/hive/blob/a625bb03/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ArrayWritableObjectInspector.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ArrayWritableObjectInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ArrayWritableObjectInspector.java
index d4f20fc..f350035 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ArrayWritableObjectInspector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ArrayWritableObjectInspector.java
@@ -72,14 +72,20 @@ public class ArrayWritableObjectInspector extends SettableStructObjectInspector
       final String name = fieldNames.get(i);
       final TypeInfo fieldInfo = fieldInfos.get(i);
 
-      StructFieldImpl field;
-      if (prunedTypeInfo != null && prunedTypeInfo.getAllStructFieldNames().indexOf(name) >= 0) {
-        int adjustedIndex = prunedTypeInfo.getAllStructFieldNames().indexOf(name);
-        TypeInfo prunedFieldInfo = prunedTypeInfo.getAllStructFieldTypeInfos().get(adjustedIndex);
-        field = new StructFieldImpl(name, getObjectInspector(fieldInfo, prunedFieldInfo), i, adjustedIndex);
-      } else {
+      StructFieldImpl field = null;
+      if (prunedTypeInfo != null) {
+        for (int idx = 0; idx < prunedTypeInfo.getAllStructFieldNames().size(); ++idx) {
+          if (prunedTypeInfo.getAllStructFieldNames().get(idx).equalsIgnoreCase(name)) {
+            TypeInfo prunedFieldInfo = prunedTypeInfo.getAllStructFieldTypeInfos().get(idx);
+            field = new StructFieldImpl(name, getObjectInspector(fieldInfo, prunedFieldInfo), i, idx);
+            break;
+          }
+        }
+      }
+      if (field == null) {
         field = new StructFieldImpl(name, getObjectInspector(fieldInfo, null), i, i);
       }
+
       fields.add(field);
       fieldsByName.put(name.toLowerCase(), field);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/a625bb03/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
index ef79760..a124938 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
@@ -22,6 +22,7 @@ import java.util.Properties;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.optimizer.FieldNode;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -116,8 +117,9 @@ public class ParquetHiveSerDe extends AbstractSerDe {
         (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
     StructTypeInfo prunedTypeInfo = null;
     if (conf != null) {
-      String prunedColumnPaths = conf.get(ColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR);
-      if (prunedColumnPaths != null) {
+      String rawPrunedColumnPaths = conf.get(ColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR);
+      if (rawPrunedColumnPaths != null) {
+        List<String> prunedColumnPaths = processRawPrunedPaths(rawPrunedColumnPaths);
         prunedTypeInfo = pruneFromPaths(completeTypeInfo, prunedColumnPaths);
       }
     }
@@ -177,30 +179,43 @@ public class ParquetHiveSerDe extends AbstractSerDe {
   }
 
   /**
+   * Given a list of raw pruned paths separated by ',', return a list of merged pruned paths.
+   * For instance, if the 'prunedPaths' is "s.a,s,s", this returns ["s"].
+   */
+  private static List<String> processRawPrunedPaths(String prunedPaths) {
+    List<FieldNode> fieldNodes = new ArrayList<>();
+    for (String p : prunedPaths.split(",")) {
+      fieldNodes = FieldNode.mergeFieldNodes(fieldNodes, FieldNode.fromPath(p));
+    }
+    List<String> prunedPathList = new ArrayList<>();
+    for (FieldNode fn : fieldNodes) {
+      prunedPathList.addAll(fn.toPaths());
+    }
+    return prunedPathList;
+  }
+
+  /**
    * Given a complete struct type info and pruned paths containing selected fields
    * from the type info, return a pruned struct type info only with the selected fields.
    *
    * For instance, if 'originalTypeInfo' is: s:struct<a:struct<b:int, c:boolean>, d:string>
-   *   and 'prunedPaths' is "s.a.b,s.d", then the result will be:
+   *   and 'prunedPaths' is ["s.a.b", "s.d"], then the result will be:
    *   s:struct<a:struct<b:int>, d:string>
    *
    * @param originalTypeInfo the complete struct type info
    * @param prunedPaths a string representing the pruned paths, separated by ','
    * @return the pruned struct type info
    */
-  private StructTypeInfo pruneFromPaths(
-      StructTypeInfo originalTypeInfo, String prunedPaths) {
+  private static StructTypeInfo pruneFromPaths(
+      StructTypeInfo originalTypeInfo, List<String> prunedPaths) {
     PrunedStructTypeInfo prunedTypeInfo = new PrunedStructTypeInfo(originalTypeInfo);
-
-    String[] prunedPathList = prunedPaths.split(",");
-    for (String path : prunedPathList) {
+    for (String path : prunedPaths) {
       pruneFromSinglePath(prunedTypeInfo, path);
     }
-
     return prunedTypeInfo.prune();
   }
 
-  private void pruneFromSinglePath(PrunedStructTypeInfo prunedInfo, String path) {
+  private static void pruneFromSinglePath(PrunedStructTypeInfo prunedInfo, String path) {
     Preconditions.checkArgument(prunedInfo != null,
       "PrunedStructTypeInfo for path " + path + " should not be null");
 
@@ -212,7 +227,7 @@ public class ParquetHiveSerDe extends AbstractSerDe {
     String fieldName = path.substring(0, index);
     prunedInfo.markSelected(fieldName);
     if (index < path.length()) {
-      pruneFromSinglePath(prunedInfo.children.get(fieldName), path.substring(index + 1));
+      pruneFromSinglePath(prunedInfo.getChild(fieldName), path.substring(index + 1));
     }
   }
 
@@ -228,16 +243,22 @@ public class ParquetHiveSerDe extends AbstractSerDe {
       for (int i = 0; i < typeInfo.getAllStructFieldTypeInfos().size(); ++i) {
         TypeInfo ti = typeInfo.getAllStructFieldTypeInfos().get(i);
         if (ti.getCategory() == Category.STRUCT) {
-          this.children.put(typeInfo.getAllStructFieldNames().get(i),
+          this.children.put(typeInfo.getAllStructFieldNames().get(i).toLowerCase(),
               new PrunedStructTypeInfo((StructTypeInfo) ti));
         }
       }
     }
 
+    PrunedStructTypeInfo getChild(String fieldName) {
+      return children.get(fieldName.toLowerCase());
+    }
+
     void markSelected(String fieldName) {
-      int index = typeInfo.getAllStructFieldNames().indexOf(fieldName);
-      if (index >= 0) {
-        selected[index] = true;
+      for (int i = 0; i < typeInfo.getAllStructFieldNames().size(); ++i) {
+        if (typeInfo.getAllStructFieldNames().get(i).equalsIgnoreCase(fieldName)) {
+          selected[i] = true;
+          break;
+        }
       }
     }
 
@@ -250,8 +271,8 @@ public class ParquetHiveSerDe extends AbstractSerDe {
         String fn = oldNames.get(i);
         if (selected[i]) {
           newNames.add(fn);
-          if (children.containsKey(fn)) {
-            newTypes.add(children.get(fn).prune());
+          if (children.containsKey(fn.toLowerCase())) {
+            newTypes.add(children.get(fn.toLowerCase()).prune());
           } else {
             newTypes.add(oldTypes.get(i));
           }

http://git-wip-us.apache.org/repos/asf/hive/blob/a625bb03/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java
index 4364298..e9af7a7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java
@@ -19,11 +19,10 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
@@ -34,111 +33,78 @@ import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnListDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import static org.apache.hadoop.hive.ql.optimizer.FieldNode.mergeFieldNodes;
 
 /**
  * This class implements the processor context for Column Pruner.
  */
 public class ColumnPrunerProcCtx implements NodeProcessorCtx {
-
   private final ParseContext pctx;
 
-  private final Map<Operator<? extends OperatorDesc>, List<String>> prunedColLists;
-
   /**
-   * This map stores the pruned nested column path for each operator
+   * A mapping from operators to nested column paths being used in them.
+   * Note: paths are of format "s.a.b" which represents field "b" of
+   *   struct "a" is being used, while "a" itself is a field of struct "s".
    */
-  private final Map<Operator<? extends OperatorDesc>, List<String>> prunedNestedColLists;
-
-  private final Map<CommonJoinOperator, Map<Byte, List<String>>> joinPrunedColLists;
-
-  private final Map<UnionOperator, List<Integer>> unionPrunedColLists;
+  private final Map<Operator<? extends OperatorDesc>, List<FieldNode>> prunedColLists;
+  private final Map<CommonJoinOperator, Map<Byte, List<FieldNode>>> joinPrunedColLists;
 
   public ColumnPrunerProcCtx(ParseContext pctx) {
     this.pctx = pctx;
-    prunedColLists = new HashMap<Operator<? extends OperatorDesc>, List<String>>();
-    prunedNestedColLists = new HashMap<Operator<? extends OperatorDesc>, List<String>>();
-    joinPrunedColLists = new HashMap<CommonJoinOperator, Map<Byte, List<String>>>();
-    unionPrunedColLists = new HashMap<>();
+    prunedColLists = new HashMap<>();
+    joinPrunedColLists = new HashMap<>();
   }
 
   public ParseContext getParseContext() {
     return pctx;
   }
 
-  public Map<CommonJoinOperator, Map<Byte, List<String>>> getJoinPrunedColLists() {
+  public Map<CommonJoinOperator, Map<Byte, List<FieldNode>>> getJoinPrunedColLists() {
     return joinPrunedColLists;
   }
 
-  public Map<UnionOperator, List<Integer>> getUnionPrunedColLists() {
-    return unionPrunedColLists;
-  }
-
-  /**
-   * @return the prunedColLists
-   */
-  public List<String> getPrunedColList(Operator<? extends OperatorDesc> op) {
+  public List<FieldNode> getPrunedColList(Operator<? extends OperatorDesc> op) {
     return prunedColLists.get(op);
   }
 
-  public Map<Operator<? extends OperatorDesc>, List<String>> getPrunedColLists() {
+  public Map<Operator<? extends OperatorDesc>, List<FieldNode>> getPrunedColLists() {
     return prunedColLists;
   }
 
-  public Map<Operator<? extends OperatorDesc>, List<String>> getPrunedNestedColLists() {
-    return prunedNestedColLists;
-  }
-
   /**
-   * Creates the list of internal column names(these names are used in the
-   * RowResolver and are different from the external column names) that are
-   * needed in the subtree. These columns eventually have to be selected from
-   * the table scan.
+   * Creates the list of internal column names(represented by field nodes,
+   * these names are used in the RowResolver and are different from the
+   * external column names) that are needed in the subtree. These columns
+   * eventually have to be selected from the table scan.
    *
-   * @param curOp
-   *          The root of the operator subtree.
-   * @return List<String> of the internal column names.
-   * @throws SemanticException
+   * @param curOp The root of the operator subtree.
+   * @return a list of field nodes representing the internal column names.
    */
-  public List<String> genColLists(Operator<? extends OperatorDesc> curOp)
+  public List<FieldNode> genColLists(Operator<? extends OperatorDesc> curOp)
       throws SemanticException {
     if (curOp.getChildOperators() == null) {
       return null;
     }
-    List<String> colList = null;
+    List<FieldNode> colList = null;
     for (Operator<? extends OperatorDesc> child : curOp.getChildOperators()) {
-      List<String> prunList = null;
+      List<FieldNode> prunList = null;
       if (child instanceof CommonJoinOperator) {
         int tag = child.getParentOperators().indexOf(curOp);
         prunList = joinPrunedColLists.get(child).get((byte) tag);
-      } else if (child instanceof UnionOperator) {
-        List<Integer> positions = unionPrunedColLists.get(child);
-        if (positions != null) {
-          prunList = new ArrayList<>();
-          RowSchema oldRS = curOp.getSchema();
-          for (Integer pos : positions) {
-            ColumnInfo colInfo = oldRS.getSignature().get(pos);
-            prunList.add(colInfo.getInternalName());
-          }
-        }
       } else if (child instanceof FileSinkOperator) {
         prunList = new ArrayList<>();
         RowSchema oldRS = curOp.getSchema();
         for (ColumnInfo colInfo : oldRS.getSignature()) {
-          prunList.add(colInfo.getInternalName());
+          prunList.add(new FieldNode(colInfo.getInternalName()));
         }
       } else {
         prunList = prunedColLists.get(child);
@@ -147,49 +113,25 @@ public class ColumnPrunerProcCtx implements NodeProcessorCtx {
         continue;
       }
       if (colList == null) {
-        colList = new ArrayList<String>(prunList);
+        colList = new ArrayList<>(prunList);
       } else {
-        colList = Utilities.mergeUniqElems(colList, prunList);
+        colList = mergeFieldNodes(colList, prunList);
       }
     }
     return colList;
   }
 
   /**
-   * Get the path to the root column for the nested column attribute
-   *
-   * @param curOp current operator
-   * @return the nested column paths for current operator and its child operator
-   */
-  public List<String> genNestedColPaths(Operator<? extends OperatorDesc> curOp) {
-    if (curOp.getChildOperators() == null) {
-      return null;
-    }
-    Set<String> groupPathsList = new HashSet<>();
-
-    for (Operator<? extends OperatorDesc> child : curOp.getChildOperators()) {
-      if (prunedNestedColLists.containsKey(child)) {
-        groupPathsList.addAll(prunedNestedColLists.get(child));
-      }
-    }
-
-    return new ArrayList<>(groupPathsList);
-  }
-
-  /**
-   * Creates the list of internal column names(these names are used in the
-   * RowResolver and are different from the external column names) that are
-   * needed in the subtree. These columns eventually have to be selected from
-   * the table scan.
+   * Creates the list of internal column names (represented by field nodes,
+   * these names are used in the RowResolver and are different from the
+   * external column names) that are needed in the subtree. These columns
+   * eventually have to be selected from the table scan.
    *
-   * @param curOp
-   *          The root of the operator subtree.
-   * @param child
-   *          The consumer.
-   * @return List<String> of the internal column names.
-   * @throws SemanticException
+   * @param curOp The root of the operator subtree.
+   * @param child The consumer.
+   * @return a list of field nodes representing the internal column names.
    */
-  public List<String> genColLists(Operator<? extends OperatorDesc> curOp,
+  public List<FieldNode> genColLists(Operator<? extends OperatorDesc> curOp,
           Operator<? extends OperatorDesc> child)
       throws SemanticException {
     if (curOp.getChildOperators() == null) {
@@ -198,43 +140,31 @@ public class ColumnPrunerProcCtx implements NodeProcessorCtx {
     if (child instanceof CommonJoinOperator) {
       int tag = child.getParentOperators().indexOf(curOp);
       return joinPrunedColLists.get(child).get((byte) tag);
-    } else if (child instanceof UnionOperator) {
-      List<Integer> positions = unionPrunedColLists.get(child);
-      List<String> prunList = new ArrayList<>();
-      if (positions != null && positions.size() > 0) {
-        RowSchema oldRS = curOp.getSchema();
-        for (Integer pos : positions) {
-          ColumnInfo colInfo = oldRS.getSignature().get(pos);
-          prunList.add(colInfo.getInternalName());
-        }
-      }
-      return prunList;
     } else {
       return prunedColLists.get(child);
     }
   }
 
   /**
-   * Creates the list of internal column names from select expressions in a
-   * select operator. This function is used for the select operator instead of
-   * the genColLists function (which is used by the rest of the operators).
+   * Creates the list of internal column names (represented by field nodes)
+   * from select expressions in a select operator. This function is used for the
+   * select operator instead of the genColLists function (which is used by
+   * the rest of the operators).
    *
-   * @param op
-   *          The select operator.
-   * @return List<String> of the internal column names.
+   * @param op The select operator.
+   * @return a list of field nodes representing the internal column names.
    */
-  public List<String> getColsFromSelectExpr(SelectOperator op) {
-    List<String> cols = new ArrayList<String>();
+  public List<FieldNode> getColsFromSelectExpr(SelectOperator op) {
+    List<FieldNode> cols = new ArrayList<>();
     SelectDesc conf = op.getConf();
     if(conf.isSelStarNoCompute()) {
       for (ColumnInfo colInfo : op.getSchema().getSignature()) {
-        cols.add(colInfo.getInternalName());
+        cols.add(new FieldNode(colInfo.getInternalName()));
       }
-    }
-    else {
+    } else {
       List<ExprNodeDesc> exprList = conf.getColList();
         for (ExprNodeDesc expr : exprList) {
-          cols = Utilities.mergeUniqElems(cols, expr.getCols());
+          cols = mergeFieldNodesWithDesc(cols, expr);
         }
     }
     return cols;
@@ -243,16 +173,14 @@ public class ColumnPrunerProcCtx implements NodeProcessorCtx {
   /**
    * Creates the list of internal column names for select * expressions.
    *
-   * @param op
-   *          The select operator.
-   * @param colList
-   *          The list of internal column names returned by the children of the
-   *          select operator.
-   * @return List<String> of the internal column names.
+   * @param op The select operator.
+   * @param colList The list of internal column names (represented by field nodes)
+   *                returned by the children of the select operator.
+   * @return a list of field nodes representing the internal column names.
    */
-  public List<String> getSelectColsFromChildren(SelectOperator op,
-      List<String> colList) {
-    List<String> cols = new ArrayList<String>();
+  public List<FieldNode> getSelectColsFromChildren(SelectOperator op,
+      List<FieldNode> colList) {
+    List<FieldNode> cols = new ArrayList<>();
     SelectDesc conf = op.getConf();
 
     if (colList != null  && conf.isSelStarNoCompute()) {
@@ -268,9 +196,24 @@ public class ColumnPrunerProcCtx implements NodeProcessorCtx {
     // input columns are used.
     List<String> outputColumnNames = conf.getOutputColumnNames();
     for (int i = 0; i < outputColumnNames.size(); i++) {
-      if (colList == null || colList.contains(outputColumnNames.get(i))) {
-        ExprNodeDesc expr = selectExprs.get(i);
-        cols = Utilities.mergeUniqElems(cols, expr.getCols());
+      if (colList == null) {
+        cols = mergeFieldNodesWithDesc(cols, selectExprs.get(i));
+      } else {
+        FieldNode childFn = lookupColumn(colList, outputColumnNames.get(i));
+        if (childFn != null) {
+          // SemanticAnalyzer injects a SEL op before aggregation whose columns are
+          // derived from the table schema and do not reflect the columns the query
+          // actually selects. For such generated columns, skip merging the
+          // expression and reuse the nested paths requested by the child op.
+          ExprNodeDesc desc = selectExprs.get(i);
+          if (desc instanceof ExprNodeColumnDesc && ((ExprNodeColumnDesc) desc).getIsGenerated()) {
+            FieldNode fn = new FieldNode(((ExprNodeColumnDesc) desc).getColumn());
+            fn.setNodes(childFn.getNodes());
+            cols = mergeFieldNodes(cols, fn);
+          } else {
+            cols = mergeFieldNodesWithDesc(cols, desc);
+          }
+        }
       }
     }
 
@@ -278,56 +221,30 @@ public class ColumnPrunerProcCtx implements NodeProcessorCtx {
   }
 
   /**
-   * Creates the list of internal group paths for select * expressions.
-   *
-   * @param op        The select operator.
-   * @param paths The list of nested column paths returned by the children of the
-   *                  select operator.
-   * @return List<String> of the nested column path from leaf to the root.
+   * Given the 'desc', construct a list of field nodes representing the
+   * nested column paths referenced by this 'desc'.
+   * @param desc the node descriptor
+   * @return the nested column paths referenced in 'desc', combined via mergeFieldNodes
    */
-  public List<String> getSelectNestedColPathsFromChildren(
-    SelectOperator op,
-    List<String> paths) {
-    List<String> groups = new ArrayList<>();
-    SelectDesc conf = op.getConf();
-
-    if (paths != null && conf.isSelStarNoCompute()) {
-      groups.addAll(paths);
-      return groups;
-    }
-
-    List<ExprNodeDesc> selectDescs = conf.getColList();
-
-    List<String> outputColumnNames = conf.getOutputColumnNames();
-    for (int i = 0; i < outputColumnNames.size(); i++) {
-      if (paths == null || paths.contains(outputColumnNames.get(i))) {
-        ExprNodeDesc desc = selectDescs.get(i);
-        List<String> gp = getNestedColPathByDesc(desc);
-        groups.addAll(gp);
-      }
-    }
-
-    return groups;
-  }
-
-  // Entry method
-  private List<String> getNestedColPathByDesc(ExprNodeDesc desc) {
-    List<String> res = new ArrayList<>();
-    getNestedColsFromExprNodeDesc(desc, "", res);
-    return res;
+  private static List<FieldNode> getNestedColPathByDesc(ExprNodeDesc desc) {
+    List<FieldNode> res = new ArrayList<>();
+    getNestedColsFromExprNodeDesc(desc, null, res);
+    return mergeFieldNodes(new ArrayList<FieldNode>(), res);
   }
 
-  private void getNestedColsFromExprNodeDesc(
+  private static void getNestedColsFromExprNodeDesc(
     ExprNodeDesc desc,
-    String pathToRoot,
-    List<String> paths) {
+    FieldNode pathToRoot,
+    List<FieldNode> paths) {
     if (desc instanceof ExprNodeColumnDesc) {
       String f = ((ExprNodeColumnDesc) desc).getColumn();
-      String p = pathToRoot.isEmpty() ? f : f + "." + pathToRoot;
+      FieldNode p = new FieldNode(f);
+      p.addFieldNodes(pathToRoot);
       paths.add(p);
     } else if (desc instanceof ExprNodeFieldDesc) {
       String f = ((ExprNodeFieldDesc) desc).getFieldName();
-      String p = pathToRoot.isEmpty() ? f : f + "." + pathToRoot;
+      FieldNode p = new FieldNode(f);
+      p.addFieldNodes(pathToRoot);
       getNestedColsFromExprNodeDesc(((ExprNodeFieldDesc) desc).getDesc(), p, paths);
     } else {
       List<ExprNodeDesc> children = desc.getChildren();
@@ -343,11 +260,11 @@ public class ColumnPrunerProcCtx implements NodeProcessorCtx {
   /**
    * Create the list of internal columns for select tag of LV
    */
-  public List<String> getSelectColsFromLVJoin(RowSchema rs,
-      List<String> colList) throws SemanticException {
-    List<String> columns = new ArrayList<String>();
-    for (String col : colList) {
-      if (rs.getColumnInfo(col) != null) {
+  public List<FieldNode> getSelectColsFromLVJoin(RowSchema rs,
+      List<FieldNode> colList) throws SemanticException {
+    List<FieldNode> columns = new ArrayList<>();
+    for (FieldNode col : colList) {
+      if (rs.getColumnInfo(col.getFieldName()) != null) {
         columns.add(col);
       }
     }
@@ -369,13 +286,11 @@ public class ColumnPrunerProcCtx implements NodeProcessorCtx {
     if (curOp.getChildOperators() == null || !(curOp instanceof FilterOperator)) {
       return;
     }
-    List<String> parentPrunList = prunedColLists.get(curOp);
+    List<FieldNode> parentPrunList = prunedColLists.get(curOp);
     if(parentPrunList == null || parentPrunList.size() == 0) {
       return;
     }
-    FilterOperator filOp = (FilterOperator)curOp;
-    List<String> prunList = null;
-    List<Integer>[] childToParentIndex = null;
+    List<FieldNode> prunList = null;
 
     for (Operator<? extends OperatorDesc> child : curOp.getChildOperators()) {
       if (child instanceof UnionOperator) {
@@ -389,7 +304,7 @@ public class ColumnPrunerProcCtx implements NodeProcessorCtx {
         Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
         ArrayList<ColumnInfo> outputRS = new ArrayList<ColumnInfo>();
         for (ColumnInfo colInfo : child.getSchema().getSignature()) {
-          if (!prunList.contains(colInfo.getInternalName())) {
+          if (lookupColumn(prunList, colInfo.getInternalName()) == null) {
             continue;
           }
           ExprNodeDesc colDesc = new ExprNodeColumnDesc(colInfo.getType(),
@@ -408,10 +323,52 @@ public class ColumnPrunerProcCtx implements NodeProcessorCtx {
             select, new RowSchema(outputRS), curOp);
         OperatorFactory.makeChild(sel, child);
         sel.setColumnExprMap(colExprMap);
-
       }
+    }
+  }
 
+  /**
+   * Returns the field name of each node in {@code columns}, in list order.
+   */
+  static ArrayList<String> toColumnNames(List<FieldNode> columns) {
+    ArrayList<String> names = new ArrayList<>(columns.size());
+    for (FieldNode fn : columns) {
+      names.add(fn.getFieldName());
     }
+    return names;
   }
 
+  /**
+   * Creates one {@code FieldNode} per column name, preserving list order.
+   */
+  static List<FieldNode> fromColumnNames(List<String> columnNames) {
+    List<FieldNode> fieldNodes = new ArrayList<>(columnNames.size());
+    for (String cn : columnNames) {
+      fieldNodes.add(new FieldNode(cn));
+    }
+    return fieldNodes;
+  }
+
+  /**
+   * Finds the first field node whose field name equals {@code colName}.
+   * Nodes with a {@code null} field name never match.
+   *
+   * @return the matching node, or {@code null} if there is none
+   */
+  static FieldNode lookupColumn(Collection<FieldNode> columns, String colName) {
+    for (FieldNode fn : columns) {
+      if (fn.getFieldName() != null && fn.getFieldName().equals(colName)) {
+        return fn;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns the result of merging {@code left} with the nested column paths
+   * referenced by {@code desc} (see FieldNode.mergeFieldNodes).
+   */
+  static List<FieldNode> mergeFieldNodesWithDesc(List<FieldNode> left, ExprNodeDesc desc) {
+    return FieldNode.mergeFieldNodes(left, getNestedColPathByDesc(desc));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a625bb03/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
index 6ca4df9..7681a83 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -79,6 +80,12 @@ import org.apache.hadoop.hive.ql.plan.ptf.WindowTableFunctionDef;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
+import static org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcCtx.fromColumnNames;
+import static org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcCtx.lookupColumn;
+import static org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcCtx.mergeFieldNodesWithDesc;
+import static org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcCtx.toColumnNames;
+import static org.apache.hadoop.hive.ql.optimizer.FieldNode.mergeFieldNodes;
+
 /**
  * Factory for generating the different node processors used by ColumnPruner.
  */
@@ -98,11 +105,8 @@ public final class ColumnPrunerProcFactory {
       FilterOperator op = (FilterOperator) nd;
       ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
       ExprNodeDesc condn = op.getConf().getPredicate();
-      // get list of columns used in the filter
-      List<String> cl = condn.getCols();
-      // merge it with the downstream col list
-      List<String> filterOpPrunedColLists = Utilities.mergeUniqElems(cppCtx.genColLists(op), cl);
-      List<String> filterOpPrunedColListsOrderPreserved = preserveColumnOrder(op,
+      List<FieldNode> filterOpPrunedColLists = mergeFieldNodesWithDesc(cppCtx.genColLists(op), condn);
+      List<FieldNode> filterOpPrunedColListsOrderPreserved = preserveColumnOrder(op,
           filterOpPrunedColLists);
       cppCtx.getPrunedColLists().put(op,
           filterOpPrunedColListsOrderPreserved);
@@ -131,25 +135,27 @@ public final class ColumnPrunerProcFactory {
         Object... nodeOutputs) throws SemanticException {
       GroupByOperator gbOp = (GroupByOperator) nd;
       ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
-      List<String> colLists = new ArrayList<String>();
+      List<FieldNode> colLists = new ArrayList<>();
       GroupByDesc conf = gbOp.getConf();
+
       ArrayList<ExprNodeDesc> keys = conf.getKeys();
       for (ExprNodeDesc key : keys) {
-        colLists = Utilities.mergeUniqElems(colLists, key.getCols());
+        colLists = mergeFieldNodesWithDesc(colLists, key);
       }
 
       ArrayList<AggregationDesc> aggrs = conf.getAggregators();
       for (AggregationDesc aggr : aggrs) {
         ArrayList<ExprNodeDesc> params = aggr.getParameters();
         for (ExprNodeDesc param : params) {
-          colLists = Utilities.mergeUniqElems(colLists, param.getCols());
+          colLists = mergeFieldNodesWithDesc(colLists, param);
         }
       }
+
       int groupingSetPosition = conf.getGroupingSetPosition();
       if (groupingSetPosition >= 0) {
-        List<String> neededCols = cppCtx.genColLists(gbOp);
+        List<FieldNode> neededCols = cppCtx.genColLists(gbOp);
         String groupingColumn = conf.getOutputColumnNames().get(groupingSetPosition);
-        if (!neededCols.contains(groupingColumn)) {
+        if (lookupColumn(neededCols, groupingColumn) == null) {
           conf.getOutputColumnNames().remove(groupingSetPosition);
           if (gbOp.getSchema() != null) {
             gbOp.getSchema().getSignature().remove(groupingSetPosition);
@@ -163,8 +169,8 @@ public final class ColumnPrunerProcFactory {
         if (child instanceof SelectOperator || child instanceof ReduceSinkOperator) {
           continue;
         }
-        List<String> colList = cppCtx.genColLists(gbOp, child);
-        Set<String> neededCols = new HashSet<String>();
+        List<FieldNode> colList = cppCtx.genColLists(gbOp, child);
+        Set<FieldNode> neededCols = new HashSet<>();
         if (colList != null) {
           neededCols.addAll(colList);
         } else {
@@ -177,7 +183,7 @@ public final class ColumnPrunerProcFactory {
           Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
           ArrayList<ColumnInfo> outputRS = new ArrayList<ColumnInfo>();
           for (ColumnInfo colInfo : gbOp.getSchema().getSignature()) {
-            if (!neededCols.contains(colInfo.getInternalName())) {
+            if (lookupColumn(neededCols, colInfo.getInternalName()) == null) {
               continue;
             }
             ExprNodeDesc colDesc = new ExprNodeColumnDesc(colInfo.getType(),
@@ -223,14 +229,14 @@ public final class ColumnPrunerProcFactory {
       Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
       RowSchema inputRS = op.getSchema();
 
-      List<String> prunedCols = cppCtx.getPrunedColList(op.getChildOperators()
+      List<FieldNode> prunedCols = cppCtx.getPrunedColList(op.getChildOperators()
           .get(0));
       Operator<? extends OperatorDesc> parent = op.getParentOperators().get(0);
       RowSchema parentRS = parent.getSchema();
       List<ColumnInfo> sig = parentRS.getSignature();
-      List<String> colList = new ArrayList<String>();
+      List<FieldNode> colList = new ArrayList<>();
       for (ColumnInfo cI : sig) {
-        colList.add(cI.getInternalName());
+        colList.add(new FieldNode(cI.getInternalName()));
       }
 
       if (prunedCols.size() != inputRS.getSignature().size()
@@ -239,7 +245,8 @@ public final class ColumnPrunerProcFactory {
         ArrayList<String> outputs = new ArrayList<String>();
         Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
         ArrayList<ColumnInfo> outputRS = new ArrayList<ColumnInfo>();
-        for (String internalName : prunedCols) {
+        for (FieldNode internalCol: prunedCols) {
+          String internalName = internalCol.getFieldName();
           ColumnInfo valueInfo = inputRS.getColumnInfo(internalName);
           ExprNodeDesc colDesc = new ExprNodeColumnDesc(valueInfo.getType(),
               valueInfo.getInternalName(), valueInfo.getTabAlias(), valueInfo.getIsVirtualCol());
@@ -273,7 +280,7 @@ public final class ColumnPrunerProcFactory {
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
         Object... nodeOutputs) throws SemanticException {
       super.process(nd, stack, ctx, nodeOutputs);
-      List<String> cols = ((ColumnPrunerProcCtx)ctx).getPrunedColLists().get(nd);
+      List<FieldNode> cols = ((ColumnPrunerProcCtx) ctx).getPrunedColLists().get(nd);
       if (null != cols) {
         pruneOperator(ctx, (LimitOperator) nd, cols);
       }
@@ -313,14 +320,14 @@ public final class ColumnPrunerProcFactory {
         return super.process(nd, stack, cppCtx, nodeOutputs);
       }
 
-      List<String> prunedCols = cppCtx.getPrunedColList(op.getChildOperators().get(0));
+      List<FieldNode> prunedCols = cppCtx.getPrunedColList(op.getChildOperators().get(0));
       if (conf.forWindowing()) {
         WindowTableFunctionDef def = (WindowTableFunctionDef) funcDef;
-        prunedCols = Utilities.mergeUniqElems(getWindowFunctionColumns(def), prunedCols);
+        prunedCols = mergeFieldNodes(getWindowFunctionColumns(def), prunedCols);
       } else if (conf.forNoop()) {
-        prunedCols = new ArrayList(cppCtx.getPrunedColList(op.getChildOperators().get(0)));
+        prunedCols = new ArrayList<>(prunedCols);
       } else {
-        prunedCols = referencedColumns;
+        prunedCols = fromColumnNames(referencedColumns);
       }
 
       List<ColumnInfo> newRS = prunedColumnsList(prunedCols, op.getSchema(), funcDef);
@@ -328,16 +335,16 @@ public final class ColumnPrunerProcFactory {
       op.getSchema().setSignature(new ArrayList<ColumnInfo>(newRS));
 
       ShapeDetails outputShape = funcDef.getStartOfChain().getInput().getOutputShape();
-      cppCtx.getPrunedColLists().put(op, outputShape.getColumnNames());
+      cppCtx.getPrunedColLists().put(op, fromColumnNames(outputShape.getColumnNames()));
       return null;
     }
 
-    private List<ColumnInfo> buildPrunedRS(List<String> prunedCols, RowSchema oldRS)
+    private List<ColumnInfo> buildPrunedRS(List<FieldNode> prunedCols, RowSchema oldRS)
         throws SemanticException {
       ArrayList<ColumnInfo> sig = new ArrayList<ColumnInfo>();
-      HashSet<String> prunedColsSet = new HashSet<String>(prunedCols);
+      HashSet<String> prunedColsSet = new HashSet<>(toColumnNames(prunedCols));
       for (ColumnInfo cInfo : oldRS.getSignature()) {
         if (prunedColsSet.contains(cInfo.getInternalName())) {
           sig.add(cInfo);
         }
       }
@@ -345,22 +352,21 @@ public final class ColumnPrunerProcFactory {
     }
 
     // always should be in this order (see PTFDeserializer#initializeWindowing)
-    private List<String> getWindowFunctionColumns(WindowTableFunctionDef tDef) {
-      List<String> columns = new ArrayList<String>();
+    private List<FieldNode> getWindowFunctionColumns(WindowTableFunctionDef tDef) {
+      List<FieldNode> columns = new ArrayList<>();
       if (tDef.getWindowFunctions() != null) {
         for (WindowFunctionDef wDef : tDef.getWindowFunctions()) {
-          columns.add(wDef.getAlias());
+          columns.add(new FieldNode(wDef.getAlias()));
         }
       }
       return columns;
     }
 
-    private RowResolver buildPrunedRR(List<String> prunedCols, RowSchema oldRS)
-        throws SemanticException {
+    private RowResolver buildPrunedRR(List<FieldNode> prunedCols, RowSchema oldRS) throws SemanticException {
       RowResolver resolver = new RowResolver();
-      HashSet<String> prunedColsSet = new HashSet<String>(prunedCols);
+      HashSet<String> prunedColsSet = new HashSet<>(toColumnNames(prunedCols));
       for (ColumnInfo cInfo : oldRS.getSignature()) {
         if (prunedColsSet.contains(cInfo.getInternalName())) {
           resolver.put(cInfo.getTabAlias(), cInfo.getAlias(), cInfo);
         }
       }
@@ -370,7 +376,7 @@ public final class ColumnPrunerProcFactory {
     /*
      * add any input columns referenced in WindowFn args or expressions.
      */
-    private List<ColumnInfo> prunedColumnsList(List<String> prunedCols, RowSchema oldRS,
+    private List<ColumnInfo> prunedColumnsList(List<FieldNode> prunedCols, RowSchema oldRS,
         PartitionedTableFunctionDef pDef) throws SemanticException {
       pDef.getOutputShape().setRr(null);
       pDef.getOutputShape().setColumnNames(null);
@@ -383,20 +389,20 @@ public final class ColumnPrunerProcFactory {
             }
             for (PTFExpressionDef arg : wDef.getArgs()) {
               ExprNodeDesc exprNode = arg.getExprNode();
-              Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
+              prunedCols = mergeFieldNodesWithDesc(prunedCols, exprNode);
             }
           }
         }
         if (tDef.getPartition() != null) {
           for (PTFExpressionDef col : tDef.getPartition().getExpressions()) {
             ExprNodeDesc exprNode = col.getExprNode();
-            Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
+            prunedCols = mergeFieldNodesWithDesc(prunedCols, exprNode);
           }
         }
         if (tDef.getOrder() != null) {
           for (PTFExpressionDef col : tDef.getOrder().getExpressions()) {
             ExprNodeDesc exprNode = col.getExprNode();
-            Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
+            prunedCols = mergeFieldNodesWithDesc(prunedCols, exprNode);
           }
         }
       } else {
@@ -408,9 +414,9 @@ public final class ColumnPrunerProcFactory {
         return prunedColumnsList(prunedCols, oldRS, (PartitionedTableFunctionDef)input);
       }
 
-      ArrayList<String> inputColumns = prunedInputList(prunedCols, input);
+      ArrayList<FieldNode> inputColumns = prunedInputList(prunedCols, input);
       input.getOutputShape().setRr(buildPrunedRR(inputColumns, oldRS));
-      input.getOutputShape().setColumnNames(inputColumns);
+      input.getOutputShape().setColumnNames(toColumnNames(inputColumns));
 
       return buildPrunedRS(prunedCols, oldRS);
     }
@@ -419,17 +425,17 @@ public final class ColumnPrunerProcFactory {
      * from the prunedCols list filter out columns that refer to WindowFns or WindowExprs
      * the returned list is set as the prunedList needed by the PTFOp.
      */
-    private ArrayList<String> prunedInputList(List<String> prunedCols, PTFInputDef tDef) {
-      ArrayList<String> prunedInputCols = new ArrayList<String>();
+    private ArrayList<FieldNode> prunedInputList(List<FieldNode> prunedCols, PTFInputDef tDef) {
+      ArrayList<FieldNode> prunedInputCols = new ArrayList<>();
 
       StructObjectInspector OI = tDef.getOutputShape().getOI();
       for(StructField f : OI.getAllStructFieldRefs()) {
         String fName = f.getFieldName();
-        if ( prunedCols.contains(fName)) {
-          prunedInputCols.add(fName);
+        FieldNode fn = lookupColumn(prunedCols, fName);
+        if (fn != null) {
+          prunedInputCols.add(fn);
         }
       }
-
       return prunedInputCols;
     }
   }
@@ -477,23 +483,19 @@ public final class ColumnPrunerProcFactory {
         Object... nodeOutputs) throws SemanticException {
       TableScanOperator scanOp = (TableScanOperator) nd;
       ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
-      List<String> cols = cppCtx
+      List<FieldNode> cols = cppCtx
           .genColLists((Operator<? extends OperatorDesc>) nd);
       if (cols == null && !scanOp.getConf().isGatherStats() ) {
         scanOp.setNeededColumnIDs(null);
         return null;
       }
 
-      cols = cols == null ? new ArrayList<String>() : cols;
-      List nestedCols = cppCtx.genNestedColPaths((Operator<? extends OperatorDesc>) nd);
+      cols = cols == null ? new ArrayList<FieldNode>() : cols;
 
       cppCtx.getPrunedColLists().put((Operator<? extends OperatorDesc>) nd, cols);
-      cppCtx.getPrunedNestedColLists().put((Operator<? extends OperatorDesc>) nd, nestedCols);
       RowSchema inputRS = scanOp.getSchema();
       setupNeededColumns(scanOp, inputRS, cols);
 
-      scanOp.setNeededNestedColumnPaths(nestedCols);
-
       return null;
     }
   }
@@ -502,9 +504,10 @@ public final class ColumnPrunerProcFactory {
    * RowSchema as well as the needed virtual columns, into TableScanDesc.
    */
   public static void setupNeededColumns(TableScanOperator scanOp, RowSchema inputRS,
-      List<String> cols) throws SemanticException {
+      List<FieldNode> cols) throws SemanticException {
     List<Integer> neededColumnIds = new ArrayList<Integer>();
     List<String> neededColumnNames = new ArrayList<String>();
+    List<String> neededNestedColumnPaths = new ArrayList<>();
     List<String> referencedColumnNames = new ArrayList<String>();
     TableScanDesc desc = scanOp.getConf();
     List<VirtualColumn> virtualCols = desc.getVirtualCols();
@@ -512,10 +515,11 @@ public final class ColumnPrunerProcFactory {
 
     // add virtual columns for ANALYZE TABLE
     if(scanOp.getConf().isGatherStats()) {
-      cols.add(VirtualColumn.RAWDATASIZE.getName());
+      cols.add(new FieldNode(VirtualColumn.RAWDATASIZE.getName()));
     }
 
-    for (String column : cols) {
+    for (FieldNode fn : cols) {
+      String column = fn.getFieldName();
       ColumnInfo colInfo = inputRS.getColumnInfo(column);
       if (colInfo == null) {
         continue;
@@ -538,12 +542,14 @@ public final class ColumnPrunerProcFactory {
         // get the needed columns by id and name
         neededColumnIds.add(position);
         neededColumnNames.add(column);
+        neededNestedColumnPaths.addAll(fn.toPaths());
       }
     }
 
     desc.setVirtualCols(newVirtualCols);
     scanOp.setNeededColumnIDs(neededColumnIds);
     scanOp.setNeededColumns(neededColumnNames);
+    scanOp.setNeededNestedColumnPaths(neededNestedColumnPaths);
     scanOp.setReferencedColumns(referencedColumnNames);
   }
 
@@ -567,21 +573,21 @@ public final class ColumnPrunerProcFactory {
       ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
       ReduceSinkDesc conf = op.getConf();
 
-      List<String> colLists = new ArrayList<String>();
+      List<FieldNode> colLists = new ArrayList<>();
       ArrayList<ExprNodeDesc> keys = conf.getKeyCols();
       LOG.debug("Reduce Sink Operator " + op.getIdentifier() + " key:" + keys);
       for (ExprNodeDesc key : keys) {
-        colLists = Utilities.mergeUniqElems(colLists, key.getCols());
+        colLists = mergeFieldNodesWithDesc(colLists, key);
       }
       for (ExprNodeDesc key : conf.getPartitionCols()) {
-        colLists = Utilities.mergeUniqElems(colLists, key.getCols());
+        colLists = mergeFieldNodesWithDesc(colLists, key);
       }
 
       assert op.getNumChild() == 1;
 
       Operator<? extends OperatorDesc> child = op.getChildOperators().get(0);
 
-      List<String> childCols = null;
+      List<FieldNode> childCols = null;
       if (child instanceof CommonJoinOperator) {
         childCols = cppCtx.getJoinPrunedColLists().get(child) == null
                 ? null : cppCtx.getJoinPrunedColLists().get(child)
@@ -596,16 +602,21 @@ public final class ColumnPrunerProcFactory {
       if (childCols != null) {
         boolean[] flags = new boolean[valCols.size()];
 
-        for (String childCol : childCols) {
-          int index = valColNames.indexOf(Utilities.removeValueTag(childCol));
+        for (FieldNode childCol : childCols) {
+          int index = valColNames.indexOf(Utilities.removeValueTag(childCol.getFieldName()));
           if (index < 0) {
             continue;
           }
           flags[index] = true;
-          colLists = Utilities.mergeUniqElems(colLists, valCols.get(index).getCols());
+          colLists = mergeFieldNodesWithDesc(colLists, valCols.get(index));
         }
 
-        Collections.sort(colLists);
+        Collections.sort(colLists, new Comparator<FieldNode>() {
+          @Override
+          public int compare(FieldNode o1, FieldNode o2) {
+            return o1.getFieldName().compareTo(o2.getFieldName());
+          }
+        });
         pruneReduceSinkOperator(flags, op, cppCtx);
         cppCtx.getPrunedColLists().put(op, colLists);
         return null;
@@ -614,7 +625,7 @@ public final class ColumnPrunerProcFactory {
       // Reduce Sink contains the columns needed - no need to aggregate from
       // children
       for (ExprNodeDesc val : valCols) {
-        colLists = Utilities.mergeUniqElems(colLists, val.getCols());
+        colLists = mergeFieldNodesWithDesc(colLists, val);
       }
 
       cppCtx.getPrunedColLists().put(op, colLists);
@@ -640,7 +651,7 @@ public final class ColumnPrunerProcFactory {
         Object... nodeOutputs) throws SemanticException {
       LateralViewJoinOperator op = (LateralViewJoinOperator) nd;
       ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
-      List<String> cols = cppCtx.genColLists(op);
+      List<FieldNode> cols = cppCtx.genColLists(op);
       if (cols == null) {
         return null;
       }
@@ -658,25 +669,25 @@ public final class ColumnPrunerProcFactory {
       // columns from SEL(*) branch only and append all columns from UDTF branch to it
       int numSelColumns = op.getConf().getNumSelColumns();
 
-      List<String> colsAfterReplacement = new ArrayList<String>();
-      ArrayList<String> newColNames = new ArrayList<String>();
-      for (String col : cols) {
-        int index = outputCols.indexOf(col);
+      List<FieldNode> colsAfterReplacement = new ArrayList<>();
+      List<FieldNode> newCols = new ArrayList<>();
+      for (FieldNode col : cols) {
+        int index = outputCols.indexOf(col.getFieldName());
         // colExprMap.size() == size of cols from SEL(*) branch
         if (index >= 0 && index < numSelColumns) {
-          ExprNodeDesc transformed = colExprMap.get(col);
-          Utilities.mergeUniqElems(colsAfterReplacement, transformed.getCols());
-          newColNames.add(col);
+          ExprNodeDesc transformed = colExprMap.get(col.getFieldName());
+          colsAfterReplacement = mergeFieldNodesWithDesc(colsAfterReplacement, transformed);
+          newCols.add(col);
         }
       }
       // update number of columns from sel(*)
-      op.getConf().setNumSelColumns(newColNames.size());
+      op.getConf().setNumSelColumns(newCols.size());
 
       // add all UDTF columns
       // following SEL will do CP for columns from UDTF, not adding SEL in here
-      newColNames.addAll(outputCols.subList(numSelColumns, outputCols.size()));
-      op.getConf().setOutputInternalColNames(newColNames);
-      pruneOperator(ctx, op, newColNames);
+      newCols.addAll(fromColumnNames(outputCols.subList(numSelColumns, outputCols.size())));
+      op.getConf().setOutputInternalColNames(toColumnNames(newCols));
+      pruneOperator(ctx, op, newCols);
       cppCtx.getPrunedColLists().put(op, colsAfterReplacement);
       return null;
     }
@@ -698,26 +709,26 @@ public final class ColumnPrunerProcFactory {
 
       // Update the info of SEL operator based on the pruned reordered columns
       // these are from ColumnPrunerSelectProc
-      List<String> cols = cppCtx.getPrunedColList(select);
+      List<FieldNode> cols = cppCtx.getPrunedColList(select);
       RowSchema rs = op.getSchema();
-      ArrayList<ExprNodeDesc> colList = new ArrayList<ExprNodeDesc>();
-      ArrayList<String> outputColNames = new ArrayList<String>();
-      for (String col : cols) {
+      ArrayList<ExprNodeDesc> colList = new ArrayList<>();
+      List<FieldNode> outputCols = new ArrayList<>();
+      for (FieldNode col : cols) {
         // revert output cols of SEL(*) to ExprNodeColumnDesc
-        ColumnInfo colInfo = rs.getColumnInfo(col);
+        ColumnInfo colInfo = rs.getColumnInfo(col.getFieldName());
         ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(colInfo);
         colList.add(colExpr);
-        outputColNames.add(col);
+        outputCols.add(col);
       }
       // replace SEL(*) to SEL(exprs)
       ((SelectDesc)select.getConf()).setSelStarNoCompute(false);
       ((SelectDesc)select.getConf()).setColList(colList);
-      ((SelectDesc)select.getConf()).setOutputColumnNames(outputColNames);
-      pruneOperator(ctx, select, outputColNames);
+      ((SelectDesc)select.getConf()).setOutputColumnNames(toColumnNames(outputCols));
+      pruneOperator(ctx, select, outputCols);
 
       Operator<?> udtfPath = op.getChildOperators().get(LateralViewJoinOperator.UDTF_TAG);
-      List<String> lvFCols = new ArrayList<String>(cppCtx.getPrunedColLists().get(udtfPath));
-      lvFCols = Utilities.mergeUniqElems(lvFCols, outputColNames);
+      List<FieldNode> lvFCols = new ArrayList<>(cppCtx.getPrunedColLists().get(udtfPath));
+      lvFCols = mergeFieldNodes(lvFCols, outputCols);
       pruneOperator(ctx, op, lvFCols);
 
       return null;
@@ -757,7 +768,7 @@ public final class ColumnPrunerProcFactory {
         }
       }
 
-      List<String> cols = cppCtx.genColLists(op);
+      List<FieldNode> cols = cppCtx.genColLists(op);
 
       SelectDesc conf = op.getConf();
 
@@ -774,7 +785,6 @@ public final class ColumnPrunerProcFactory {
       // and return the ones which have a marked column
       cppCtx.getPrunedColLists().put(op,
           cppCtx.getSelectColsFromChildren(op, cols));
-      cppCtx.getPrunedNestedColLists().put(op, cppCtx.getSelectNestedColPathsFromChildren(op, cols));
       if (cols == null || conf.isSelStarNoCompute()) {
         return null;
       }
@@ -788,8 +798,8 @@ public final class ColumnPrunerProcFactory {
       if (cppCtx.getParseContext().getColumnAccessInfo() != null
           && cppCtx.getParseContext().getViewProjectToTableSchema() != null
           && cppCtx.getParseContext().getViewProjectToTableSchema().containsKey(op)) {
-        for (String col : cols) {
-          int index = originalOutputColumnNames.indexOf(col);
+        for (FieldNode col : cols) {
+          int index = originalOutputColumnNames.indexOf(col.getFieldName());
           Table tab = cppCtx.getParseContext().getViewProjectToTableSchema().get(op);
           cppCtx.getParseContext().getColumnAccessInfo()
               .add(tab.getCompleteName(), tab.getCols().get(index).getName());
@@ -800,16 +810,16 @@ public final class ColumnPrunerProcFactory {
         ArrayList<String> newOutputColumnNames = new ArrayList<String>();
         ArrayList<ColumnInfo> rs_oldsignature = op.getSchema().getSignature();
         ArrayList<ColumnInfo> rs_newsignature = new ArrayList<ColumnInfo>();
-        for (String col : cols) {
-          int index = originalOutputColumnNames.indexOf(col);
-          newOutputColumnNames.add(col);
+        for (FieldNode col : cols) {
+          int index = originalOutputColumnNames.indexOf(col.getFieldName());
+          newOutputColumnNames.add(col.getFieldName());
           newColList.add(originalColList.get(index));
           rs_newsignature.add(rs_oldsignature.get(index));
         }
         op.getSchema().setSignature(rs_newsignature);
         conf.setColList(newColList);
         conf.setOutputColumnNames(newOutputColumnNames);
-        handleChildren(op, cols, cppCtx);
+        handleChildren(op, toColumnNames(cols), cppCtx);
       }
 
       return null;
@@ -999,21 +1009,21 @@ public final class ColumnPrunerProcFactory {
         throws SemanticException {
       ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
       UnionOperator op = (UnionOperator) nd;
-      List<String> childColLists = cppCtx.genColLists(op);
+      List<FieldNode> childColLists = cppCtx.genColLists(op);
       if (childColLists == null) {
         return null;
       }
       RowSchema inputSchema = op.getSchema();
       if (inputSchema != null) {
-        List<Integer> positions = new ArrayList<>();
-        RowSchema oldRS = op.getSchema();
-        for (int index = 0; index < oldRS.getSignature().size(); index++) {
-          ColumnInfo colInfo = oldRS.getSignature().get(index);
-          if (childColLists.contains(colInfo.getInternalName())) {
-            positions.add(index);
+        List<FieldNode> prunedCols = new ArrayList<>();
+        for (int index = 0; index < inputSchema.getSignature().size(); index++) {
+          ColumnInfo colInfo = inputSchema.getSignature().get(index);
+          FieldNode fn = lookupColumn(childColLists, colInfo.getInternalName());
+          if (fn != null) {
+            prunedCols.add(fn);
           }
         }
-        cppCtx.getUnionPrunedColLists().put(op, positions);
+        cppCtx.getPrunedColLists().put(op, prunedCols);
       }
       return null;
     }
@@ -1021,7 +1031,7 @@ public final class ColumnPrunerProcFactory {
 
   private static void pruneOperator(NodeProcessorCtx ctx,
       Operator<? extends OperatorDesc> op,
-      List<String> cols)
+      List<FieldNode> cols)
       throws SemanticException {
     // the pruning needs to preserve the order of columns in the input schema
     RowSchema inputSchema = op.getSchema();
@@ -1029,7 +1039,7 @@ public final class ColumnPrunerProcFactory {
       ArrayList<ColumnInfo> rs = new ArrayList<ColumnInfo>();
       RowSchema oldRS = op.getSchema();
       for(ColumnInfo i : oldRS.getSignature()) {
-        if ( cols.contains(i.getInternalName())) {
+        if (lookupColumn(cols, i.getInternalName()) != null) {
           rs.add(i);
         }
       }
@@ -1044,16 +1054,17 @@ public final class ColumnPrunerProcFactory {
    * @return
    * @throws SemanticException
    */
-  private static List<String> preserveColumnOrder(Operator<? extends OperatorDesc> op,
-      List<String> cols)
+  private static List<FieldNode> preserveColumnOrder(Operator<? extends OperatorDesc> op,
+      List<FieldNode> cols)
       throws SemanticException {
     RowSchema inputSchema = op.getSchema();
     if (inputSchema != null) {
-      ArrayList<String> rs = new ArrayList<String>();
+      ArrayList<FieldNode> rs = new ArrayList<>();
       ArrayList<ColumnInfo> inputCols = inputSchema.getSignature();
       for (ColumnInfo i: inputCols) {
-        if (cols.contains(i.getInternalName())) {
-          rs.add(i.getInternalName());
+        FieldNode fn = lookupColumn(cols, i.getInternalName());
+        if (fn != null) {
+          rs.add(fn);
         }
       }
       return rs;
@@ -1062,7 +1073,6 @@ public final class ColumnPrunerProcFactory {
     }
   }
 
-
   private static void pruneJoinOperator(NodeProcessorCtx ctx,
       CommonJoinOperator op, JoinDesc conf,
       Map<String, ExprNodeDesc> columnExprMap,
@@ -1073,14 +1083,14 @@ public final class ColumnPrunerProcFactory {
 
     LOG.info("JOIN " + op.getIdentifier() + " oldExprs: " + conf.getExprs());
 
-    List<String> childColLists = cppCtx.genColLists(op);
+    List<FieldNode> childColLists = cppCtx.genColLists(op);
     if (childColLists == null) {
       return;
     }
 
-    Map<Byte, List<String>> prunedColLists = new HashMap<Byte, List<String>>();
+    Map<Byte, List<FieldNode>> prunedColLists = new HashMap<>();
     for (byte tag : conf.getTagOrder()) {
-      prunedColLists.put(tag, new ArrayList<String>());
+      prunedColLists.put(tag, new ArrayList<FieldNode>());
     }
 
     //add the columns in join filters
@@ -1091,8 +1101,8 @@ public final class ColumnPrunerProcFactory {
       Map.Entry<Byte, List<ExprNodeDesc>> entry = iter.next();
       Byte tag = entry.getKey();
       for (ExprNodeDesc desc : entry.getValue()) {
-        List<String> cols = prunedColLists.get(tag);
-        cols = Utilities.mergeUniqElems(cols, desc.getCols());
+        List<FieldNode> cols = prunedColLists.get(tag);
+        cols = mergeFieldNodesWithDesc(cols, desc);
         prunedColLists.put(tag, cols);
      }
     }
@@ -1106,7 +1116,7 @@ public final class ColumnPrunerProcFactory {
       String internalName = conf.getOutputColumnNames().get(i);
       ExprNodeDesc desc = columnExprMap.get(internalName);
       Byte tag = conf.getReversedExprs().get(internalName);
-      if (!childColLists.contains(internalName)) {
+      if (lookupColumn(childColLists, internalName) == null) {
         int index = conf.getExprs().get(tag).indexOf(desc);
         if (index < 0) {
           continue;
@@ -1116,12 +1126,12 @@ public final class ColumnPrunerProcFactory {
           retainMap.get(tag).remove(index);
         }
       } else {
-        List<String> prunedRSList = prunedColLists.get(tag);
+        List<FieldNode> prunedRSList = prunedColLists.get(tag);
         if (prunedRSList == null) {
-          prunedRSList = new ArrayList<String>();
+          prunedRSList = new ArrayList<>();
           prunedColLists.put(tag, prunedRSList);
         }
-        prunedRSList = Utilities.mergeUniqElems(prunedRSList, desc.getCols());
+        prunedColLists.put(tag, mergeFieldNodesWithDesc(prunedRSList, desc));
         outputCols.add(internalName);
         newColExprMap.put(internalName, desc);
       }
@@ -1154,8 +1164,8 @@ public final class ColumnPrunerProcFactory {
         for (int j = 0; j < lists.size(); j++) {
           ExprNodeDesc desc = lists.get(j);
           Byte tag = entry.getKey();
-          List<String> cols = prunedColLists.get(tag);
-          cols = Utilities.mergeUniqElems(cols, desc.getCols());
+          List<FieldNode> cols = prunedColLists.get(tag);
+          cols = mergeFieldNodesWithDesc(cols, desc);
           prunedColLists.put(tag, cols);
         }
       }
@@ -1164,7 +1174,7 @@ public final class ColumnPrunerProcFactory {
 
     for (Operator<? extends OperatorDesc> child : childOperators) {
       if (child instanceof ReduceSinkOperator) {
-        boolean[] flags = getPruneReduceSinkOpRetainFlags(childColLists,
+        boolean[] flags = getPruneReduceSinkOpRetainFlags(toColumnNames(childColLists),
             (ReduceSinkOperator) child);
         pruneReduceSinkOperator(flags, (ReduceSinkOperator) child, cppCtx);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/a625bb03/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FieldNode.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FieldNode.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FieldNode.java
index 1579797..c96e1fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FieldNode.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FieldNode.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
+import com.google.common.base.Preconditions;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -35,9 +37,21 @@ public class FieldNode {
     return fieldName;
   }
 
+  public void setFieldName(String fieldName) {
+    this.fieldName = fieldName;
+  }
+
   public void addFieldNodes(FieldNode... nodes) {
-    if (nodes != null || nodes.length > 0) {
-      this.nodes.addAll(Arrays.asList(nodes));
+    if (nodes != null) {
+      addFieldNodes(Arrays.asList(nodes));
+    }
+  }
+
+  public void addFieldNodes(List<FieldNode> nodes) {
+    for (FieldNode fn : nodes) {
+      if (fn != null) {
+        this.nodes.add(fn);
+      }
     }
   }
 
@@ -45,6 +59,95 @@ public class FieldNode {
     return nodes;
   }
 
+  public void setNodes(List<FieldNode> nodes) {
+    this.nodes = nodes;
+  }
+
+  public List<String> toPaths() {
+    List<String> result = new ArrayList<>();
+    if (nodes.isEmpty()) {
+      result.add(fieldName);
+    } else {
+      for (FieldNode child : nodes) {
+        for (String rest : child.toPaths()) {
+          result.add(fieldName + "." + rest);
+        }
+      }
+    }
+    return result;
+  }
+
+  public static FieldNode fromPath(String path) {
+    String[] parts = path.split("\\.");
+    return fromPath(parts, 0);
+  }
+
+  private static FieldNode fromPath(String[] parts, int index) {
+    if (index == parts.length) {
+      return null;
+    }
+    FieldNode fn = new FieldNode(parts[index]);
+    fn.addFieldNodes(fromPath(parts, index + 1));
+    return fn;
+  }
+
+  /**
+   * Merge the field node 'fn' into list 'nodes', and return the result list.
+   */
+  public static List<FieldNode> mergeFieldNodes(List<FieldNode> nodes, FieldNode fn) {
+    List<FieldNode> result = new ArrayList<>(nodes);
+    for (int i = 0; i < nodes.size(); ++i) {
+      FieldNode mfn = mergeFieldNode(nodes.get(i), fn);
+      if (mfn != null) {
+        result.set(i, mfn);
+        return result;
+      }
+    }
+    result.add(fn);
+    return result;
+  }
+
+  public static List<FieldNode> mergeFieldNodes(List<FieldNode> left, List<FieldNode> right) {
+    List<FieldNode> result = new ArrayList<>(left);
+    for (FieldNode fn : right) {
+      result = mergeFieldNodes(result, fn);
+    }
+    return result;
+  }
+
+  /**
+   * Merge the field nodes 'left' and 'right' and return the merged node.
+   * Return null if the two nodes cannot be merged.
+   *
+   * There are basically 3 cases here:
+   * 1. Both 'left' and 'right' have child nodes, e.g., 'left' is s[b[c]] and
+   *   'right' is s[b[d]]. In this case, the merged node is s[b[c,d]].
+   * 2. 'left' has smaller depth than 'right', e.g., 'left' is s[b] while
+   *   'right' is s[b[d]]. In this case, the merged node is s[b].
+   * 3. 'left' has larger depth than 'right', e.g., 'left' is s[b[c]] while
+   *   'right' is s[b]. This is the opposite case of 2), and similarly,
+   *   the merged node is s[b].
+   *
+   * An example where the two inputs cannot be merged is when 'left' is s[b]
+   *   while 'right' is p[c].
+   */
+  public static FieldNode mergeFieldNode(FieldNode left, FieldNode right) {
+    Preconditions.checkArgument(left.getFieldName() != null && right.getFieldName() != null);
+    if (!left.getFieldName().equals(right.getFieldName())) {
+      return null;
+    }
+    if (left.getNodes().isEmpty()) {
+      return left;
+    } else if (right.getNodes().isEmpty()) {
+      return right;
+    } else {
+      // Both are not empty. Merge two lists.
+      FieldNode result = new FieldNode(left.getFieldName());
+      result.setNodes(mergeFieldNodes(left.getNodes(), right.getNodes()));
+      return result;
+    }
+  }
+
   @Override
   public String toString() {
     String res = fieldName;
@@ -63,18 +166,27 @@ public class FieldNode {
   }
 
   @Override
-  public boolean equals(Object object) {
-    FieldNode fieldNode = (FieldNode) object;
-    if (!fieldName.equals(fieldNode.getFieldName()) || fieldNode.getNodes().size() != fieldNode
-      .getNodes().size()) {
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
       return false;
     }
 
-    for (int i = 0; i < fieldNode.getNodes().size(); i++) {
-      if (fieldNode.getNodes().get(i).equals(nodes.get(i))) {
-        return false;
-      }
+    FieldNode fieldNode = (FieldNode) o;
+
+    if (fieldName != null ? !fieldName.equals(fieldNode.fieldName) : fieldNode.fieldName != null) {
+      return false;
     }
-    return true;
+    return nodes != null ? nodes.equals(fieldNode.nodes) : fieldNode.nodes == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = fieldName != null ? fieldName.hashCode() : 0;
+    result = 31 * result + (nodes != null ? nodes.hashCode() : 0);
+    return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a625bb03/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
index 3d11907..dcea0e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.optimizer.FieldNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
@@ -213,7 +214,7 @@ public final class RewriteQueryUsingAggregateIndexCtx  implements NodeProcessorC
     rewriteQueryCtx.getParseContext().setTopOps(topOps);
 
     ColumnPrunerProcFactory.setupNeededColumns(scanOperator, rs,
-        Arrays.asList(rewriteQueryCtx.getIndexKey()));
+        Arrays.asList(new FieldNode(rewriteQueryCtx.getIndexKey())));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/a625bb03/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index d55db0a..42a7ab9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -9135,9 +9135,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         new HashMap<String, ExprNodeDesc>();
     for (int i = 0; i < columns.size(); i++) {
       ColumnInfo col = columns.get(i);
-      colList.add(new ExprNodeColumnDesc(col));
+      colList.add(new ExprNodeColumnDesc(col, true));
       columnNames.add(col.getInternalName());
-      columnExprMap.put(col.getInternalName(), new ExprNodeColumnDesc(col));
+      columnExprMap.put(col.getInternalName(), new ExprNodeColumnDesc(col, true));
     }
     RowResolver outputRR = inputRR.duplicate();
     Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(

http://git-wip-us.apache.org/repos/asf/hive/blob/a625bb03/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeColumnDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeColumnDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeColumnDesc.java
index 9a32054..4cfd0d7 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeColumnDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeColumnDesc.java
@@ -54,36 +54,60 @@ public class ExprNodeColumnDesc extends ExprNodeDesc implements Serializable {
    */
   private boolean isSkewedCol;
 
+  /**
+   * Is this column a generated column, i.e., a column
+   * that is generated from the table schema when Hive inserts a SEL op for column pruning.
+   * This column has no relation with the input query.
+   *
+   * This is used for nested column pruning where we could have the following scenario:
+   *   ...
+   *    |
+   *   SEL (use a)
+   *    |
+   *   OP (use a.f)
+   *    |
+   *   ...
+   * Without this field we do not know whether the column 'a' is actually specified in
+   * the input query or produced by an op that Hive inserted. In the former case, the pruning
+   * needs to produce 'a', while in the latter case, it should produce 'a.f'.
+   */
+  private transient boolean isGenerated;
+
   public ExprNodeColumnDesc() {
   }
 
   public ExprNodeColumnDesc(ColumnInfo ci) {
-    this(ci.getType(), ci.getInternalName(), ci.getTabAlias(), ci.getIsVirtualCol());
+    this(ci, false);
+  }
+
+  public ExprNodeColumnDesc(ColumnInfo ci, boolean isGenerated) {
+    this(ci.getType(), ci.getInternalName(), ci.getTabAlias(), ci.getIsVirtualCol(), false, isGenerated);
   }
 
   public ExprNodeColumnDesc(TypeInfo typeInfo, String column, String tabAlias,
       boolean isPartitionColOrVirtualCol) {
-    super(typeInfo);
-    this.column = column;
-    this.tabAlias = tabAlias;
-    this.isPartitionColOrVirtualCol = isPartitionColOrVirtualCol;
+    this(typeInfo, column, tabAlias, isPartitionColOrVirtualCol, false, false);
   }
 
   public ExprNodeColumnDesc(Class<?> c, String column, String tabAlias,
       boolean isPartitionColOrVirtualCol) {
-    super(TypeInfoFactory.getPrimitiveTypeInfoFromJavaPrimitive(c));
-    this.column = column;
-    this.tabAlias = tabAlias;
-    this.isPartitionColOrVirtualCol = isPartitionColOrVirtualCol;
+    this(TypeInfoFactory.getPrimitiveTypeInfoFromJavaPrimitive(c),
+        column, tabAlias, isPartitionColOrVirtualCol, false, false);
   }
 
   public ExprNodeColumnDesc(TypeInfo typeInfo, String column, String tabAlias,
       boolean isPartitionColOrVirtualCol, boolean isSkewedCol) {
+    this(typeInfo, column, tabAlias, isPartitionColOrVirtualCol, isSkewedCol, false);
+  }
+
+  public ExprNodeColumnDesc(TypeInfo typeInfo, String column, String tabAlias,
+      boolean isPartitionColOrVirtualCol, boolean isSkewedCol, boolean isGenerated) {
     super(typeInfo);
     this.column = column;
     this.tabAlias = tabAlias;
     this.isPartitionColOrVirtualCol = isPartitionColOrVirtualCol;
     this.isSkewedCol = isSkewedCol;
+    this.isGenerated = isGenerated;
   }
 
   public String getColumn() {
@@ -110,6 +134,14 @@ public class ExprNodeColumnDesc extends ExprNodeDesc implements Serializable {
     this.isPartitionColOrVirtualCol = isPartitionCol;
   }
 
+  public boolean getIsGenerated() {
+    return this.isGenerated;
+  }
+
+  public void setIsGenerated(boolean isGenerated) {
+    this.isGenerated = isGenerated;
+  }
+
   @Override
   public String toString() {
     return "Column[" + column + "]";

http://git-wip-us.apache.org/repos/asf/hive/blob/a625bb03/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
index 1da8e91..4f053d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
@@ -199,6 +199,10 @@ public class TableDesc implements Serializable, Cloneable {
 
   @Override
   public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+
     if (!(o instanceof TableDesc)) {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/a625bb03/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestDataWritableReadSupport.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestDataWritableReadSupport.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestDataWritableReadSupport.java
index b3aaca6..fc08ea6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestDataWritableReadSupport.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestDataWritableReadSupport.java
@@ -13,11 +13,13 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.read;
 
+import com.google.common.collect.Sets;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.HashSet;
 
 import static org.apache.hadoop.hive.ql.io.parquet.HiveParquetSchemaTestUtils.testConversion;
 
@@ -36,7 +38,7 @@ public class TestDataWritableReadSupport {
 
     testConversion("structCol", "struct<a:int>", DataWritableReadSupport
       .getProjectedSchema(originalMsg, Arrays.asList("structCol"), Arrays.asList(0),
-        Arrays.asList("structCol.a")).toString());
+        Sets.newHashSet("structCol.a")).toString());
   }
 
   @Test
@@ -51,7 +53,7 @@ public class TestDataWritableReadSupport {
 
     testConversion("structCol", "struct<a:int,b:double>", DataWritableReadSupport
       .getProjectedSchema(originalMsg, Arrays.asList("structCol"), Arrays.asList(0),
-        Arrays.asList("structCol.a", "structCol.b")).toString());
+        Sets.newHashSet("structCol.a", "structCol.b")).toString());
   }
 
   @Test
@@ -67,7 +69,7 @@ public class TestDataWritableReadSupport {
 
     testConversion("structCol,c", "struct<b:double>,boolean", DataWritableReadSupport
       .getProjectedSchema(originalMsg, Arrays.asList("structCol", "c"), Arrays.asList(0, 1),
-        Arrays.asList("structCol.b", "c")).toString());
+        Sets.newHashSet("structCol.b", "c")).toString());
   }
 
   @Test
@@ -86,7 +88,7 @@ public class TestDataWritableReadSupport {
 
     testConversion("structCol", "struct<subStructCol:struct<b:bigint>>", DataWritableReadSupport
       .getProjectedSchema(originalMsg, Arrays.asList("structCol"), Arrays.asList(0),
-        Arrays.asList("structCol.subStructCol.b")).toString());
+        Sets.newHashSet("structCol.subStructCol.b")).toString());
   }
 
   @Test
@@ -105,8 +107,8 @@ public class TestDataWritableReadSupport {
 
     testConversion("structCol", "struct<subStructCol:struct<b:bigint,c:boolean>>",
       DataWritableReadSupport
-        .getProjectedSchema(originalMsg, Arrays.asList("structCol"), Arrays.asList(0), Arrays
-            .asList("structCol.subStructCol", "structCol.subStructCol.b",
-              "structCol.subStructCol.c")).toString());
+        .getProjectedSchema(originalMsg, Arrays.asList("structCol"), Arrays.asList(0),
+          Sets.newHashSet("structCol.subStructCol", "structCol.subStructCol.b",
+            "structCol.subStructCol.c")).toString());
   }
 }


Mime
View raw message