hive-commits mailing list archives

From na...@apache.org
Subject svn commit: r1374389 [1/2] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ hbase-handler/src/test/results/positive/ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hi...
Date Fri, 17 Aug 2012 18:28:58 GMT
Author: namit
Date: Fri Aug 17 18:28:56 2012
New Revision: 1374389

URL: http://svn.apache.org/viewvc?rev=1374389&view=rev
Log:
HIVE-2925 - Support non-MR fetching for simple queries with select/limit/filter operations only (Navis via namit)


Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ListSinkDesc.java
    hive/trunk/ql/src/test/queries/clientpositive/nonmr_fetch.q
    hive/trunk/ql/src/test/results/clientpositive/nonmr_fetch.q.out
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedListObjectInspector.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedMapObjectInspector.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedObjectInspectorFactory.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedStructObjectInspector.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedUnionObjectInspector.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/hbase-handler/src/test/results/positive/hbase_ppd_key_range.q.out
    hive/trunk/hbase-handler/src/test/results/positive/hbase_pushdown.q.out
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GlobalLimitCtx.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
    hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat2.q
    hive/trunk/ql/src/test/results/clientpositive/input.q.out
    hive/trunk/ql/src/test/results/clientpositive/input0.q.out
    hive/trunk/ql/src/test/results/clientpositive/input_limit.q.out
    hive/trunk/ql/src/test/results/clientpositive/input_part0.q.out
    hive/trunk/ql/src/test/results/clientpositive/input_part3.q.out
    hive/trunk/ql/src/test/results/clientpositive/input_part4.q.out
    hive/trunk/ql/src/test/results/clientpositive/input_part8.q.out
    hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat2.q.out
    hive/trunk/ql/src/test/results/clientpositive/ppr_pushdown3.q.out
    hive/trunk/ql/src/test/results/clientpositive/regex_col.q.out
    hive/trunk/ql/src/test/results/clientpositive/source.q.out

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Fri Aug 17 18:28:56 2012
@@ -549,6 +549,9 @@ public class HiveConf extends Configurat
 
     HIVEOUTERJOINSUPPORTSFILTERS("hive.outerjoin.supports.filters", true),
 
+    // 'minimal', 'more' (and 'all' later)
+    HIVEFETCHTASKCONVERSION("hive.fetch.task.conversion", "minimal"),
+
     // Serde for FetchTask
     HIVEFETCHOUTPUTSERDE("hive.fetch.output.serde", "org.apache.hadoop.hive.serde2.DelimitedJSONSerDe"),
 

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Fri Aug 17 18:28:56 2012
@@ -1412,5 +1412,17 @@
   </description>
 </property>
 
+<property>
+  <name>hive.fetch.task.conversion</name>
+  <value>minimal</value>
+  <description>
+    Some select queries can be converted to a single FETCH task, minimizing latency.
+    Currently the query should be single sourced, with no subquery, and should not have
+    any aggregations or distincts (which incur RS), lateral views or joins.
+    1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
+    2. more    : SELECT, FILTER, LIMIT only (+TABLESAMPLE, virtual columns)
+  </description>
+</property>
+
 </configuration>
 

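For illustration only (not part of this commit): assuming the usual Hive test tables src and a
partitioned srcpart (partition column ds), the two modes described above behave roughly as
sketched below. The first query can be converted even under 'minimal' (SELECT STAR, filter on a
partition column, LIMIT), while the second needs 'more' because it projects columns and filters
on a non-partition column.

  set hive.fetch.task.conversion=minimal;
  SELECT * FROM srcpart WHERE ds = '2008-04-08' LIMIT 10;

  set hive.fetch.task.conversion=more;
  SELECT key, value FROM src WHERE key > 100 LIMIT 10;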
Modified: hive/trunk/hbase-handler/src/test/results/positive/hbase_ppd_key_range.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/positive/hbase_ppd_key_range.q.out?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/results/positive/hbase_ppd_key_range.q.out (original)
+++ hive/trunk/hbase-handler/src/test/results/positive/hbase_ppd_key_range.q.out Fri Aug 17 18:28:56 2012
@@ -441,6 +441,17 @@ STAGE PLANS:
   Stage: Stage-0
     Fetch Operator
       limit: -1
+      Processor Tree:
+        TableScan
+          alias: hbase_pushdown
+          Select Operator
+            expressions:
+                  expr: key
+                  type: string
+                  expr: value
+                  type: string
+            outputColumnNames: _col0, _col1
+            ListSink
 
 
 PREHOOK: query: -- with a predicate which is not actually part of the filter, so

Modified: hive/trunk/hbase-handler/src/test/results/positive/hbase_pushdown.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/positive/hbase_pushdown.q.out?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/results/positive/hbase_pushdown.q.out (original)
+++ hive/trunk/hbase-handler/src/test/results/positive/hbase_pushdown.q.out Fri Aug 17 18:28:56 2012
@@ -256,6 +256,17 @@ STAGE PLANS:
   Stage: Stage-0
     Fetch Operator
       limit: -1
+      Processor Tree:
+        TableScan
+          alias: hbase_pushdown
+          Select Operator
+            expressions:
+                  expr: key
+                  type: int
+                  expr: value
+                  type: string
+            outputColumnNames: _col0, _col1
+            ListSink
 
 
 PREHOOK: query: -- with a predicate which is not actually part of the filter, so

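The Processor Tree added to the two .q.out files above is the visible effect of the conversion:
the plan collapses to a single Stage-0 Fetch Operator whose operator tree (TableScan -> Select
Operator -> ListSink) is printed inline, instead of a separate map-reduce stage. A hedged sketch
of a command that produces this kind of plan (the hbase_pushdown table comes from the HBase
handler tests; the predicate here is only illustrative):

  EXPLAIN SELECT * FROM hbase_pushdown WHERE key = 90;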
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Fri Aug 17 18:28:56 2012
@@ -442,13 +442,6 @@ public class Driver implements CommandPr
       sem.validate();
 
       plan = new QueryPlan(command, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN));
-      // initialize FetchTask right here
-      if (plan.getFetchTask() != null) {
-        plan.getFetchTask().initialize(conf, plan, null);
-      }
-
-      // get the output schema
-      schema = getSchema(sem, conf);
 
       // test Only - serialize the query plan and deserialize it
       if ("true".equalsIgnoreCase(System.getProperty("test.serialize.qplan"))) {
@@ -477,6 +470,9 @@ public class Driver implements CommandPr
         plan.getFetchTask().initialize(conf, plan, null);
       }
 
+      // get the output schema
+      schema = getSchema(sem, conf);
+
       //do the authorization check
       if (HiveConf.getBoolVar(conf,
           HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java Fri Aug 17 18:28:56 2012
@@ -68,7 +68,10 @@ public class ExecMapperContext {
     ioCxt = IOContext.get();
   }
 
-
+  public void clear() {
+    IOContext.clear();
+    ioCxt = null;
+  }
 
   /**
   * For CombineFileInputFormat, the mapper's input file will be changed on the
@@ -174,5 +177,4 @@ public class ExecMapperContext {
   public void setIoCxt(IOContext ioCxt) {
     this.ioCxt = ioCxt;
   }
-
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Fri Aug 17 18:28:56 2012
@@ -35,13 +35,19 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveRecordReader;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.parse.SplitSample;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.DelegatedObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -66,40 +72,83 @@ public class FetchOperator implements Se
 
   private boolean isNativeTable;
   private FetchWork work;
+  private Operator<?> operator;    // operator tree for processing rows further (optional)
   private int splitNum;
   private PartitionDesc currPart;
   private TableDesc currTbl;
   private boolean tblDataDone;
 
+  private boolean hasVC;
+  private boolean isPartitioned;
+  private StructObjectInspector vcsOI;
+  private List<VirtualColumn> vcCols;
+  private ExecMapperContext context;
+
   private transient RecordReader<WritableComparable, Writable> currRecReader;
-  private transient InputSplit[] inputSplits;
+  private transient FetchInputFormatSplit[] inputSplits;
   private transient InputFormat inputFormat;
   private transient JobConf job;
   private transient WritableComparable key;
   private transient Writable value;
+  private transient Writable[] vcValues;
   private transient Deserializer serde;
   private transient Iterator<Path> iterPath;
   private transient Iterator<PartitionDesc> iterPartDesc;
   private transient Path currPath;
+  private transient StructObjectInspector objectInspector;
   private transient StructObjectInspector rowObjectInspector;
-  private transient Object[] rowWithPart;
+  private transient Object[] row;
+
   public FetchOperator() {
   }
 
   public FetchOperator(FetchWork work, JobConf job) {
+    this.job = job;
     this.work = work;
-    initialize(job);
+    initialize();
   }
 
-  public void initialize(JobConf job) {
+  public FetchOperator(FetchWork work, JobConf job, Operator<?> operator,
+      List<VirtualColumn> vcCols) {
     this.job = job;
+    this.work = work;
+    this.operator = operator;
+    this.vcCols = vcCols;
+    initialize();
+  }
+
+  private void initialize() {
+    if (hasVC = vcCols != null && !vcCols.isEmpty()) {
+      List<String> names = new ArrayList<String>(vcCols.size());
+      List<ObjectInspector> inspectors = new ArrayList<ObjectInspector>(vcCols.size());
+      for (VirtualColumn vc : vcCols) {
+        inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+                vc.getTypeInfo().getPrimitiveCategory()));
+        names.add(vc.getName());
+      }
+      vcsOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);
+      vcValues = new Writable[vcCols.size()];
+    }
+    isPartitioned = work.isPartitioned();
     tblDataDone = false;
-    rowWithPart = new Object[2];
-    if (work.getTblDir() != null) {
+    if (hasVC && isPartitioned) {
+      row = new Object[3];
+    } else if (hasVC || isPartitioned) {
+      row = new Object[2];
+    } else {
+      row = new Object[1];
+    }
+    if (work.getTblDesc() != null) {
       isNativeTable = !work.getTblDesc().isNonNative();
     } else {
       isNativeTable = true;
     }
+    if (hasVC || work.getSplitSample() != null) {
+      context = new ExecMapperContext();
+      if (operator != null) {
+        operator.setExecContext(context);
+      }
+    }
   }
 
   public FetchWork getWork() {
@@ -151,6 +200,7 @@ public class FetchOperator implements Se
    */
   private static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats = new HashMap<Class, InputFormat<WritableComparable, Writable>>();
 
+  @SuppressWarnings("unchecked")
   static InputFormat<WritableComparable, Writable> getInputFormatFromCache(Class inputFormatClass,
       Configuration conf) throws IOException {
     if (!inputFormats.containsKey(inputFormatClass)) {
@@ -166,17 +216,50 @@ public class FetchOperator implements Se
     return inputFormats.get(inputFormatClass);
   }
 
-  private void setPrtnDesc(TableDesc table, Map<String, String> partSpec) throws Exception {
+  private StructObjectInspector setTableDesc(TableDesc table) throws Exception {
+    Deserializer serde = table.getDeserializerClass().newInstance();
+    serde.initialize(job, table.getProperties());
+    return createRowInspector(getCurrent(serde));
+  }
+
+  private StructObjectInspector setPrtnDesc(PartitionDesc partition) throws Exception {
+    Deserializer serde = partition.getDeserializerClass().newInstance();
+    serde.initialize(job, partition.getProperties());
+    String pcols = partition.getTableDesc().getProperties().getProperty(
+        org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
+    String[] partKeys = pcols.trim().split("/");
+    row[1] = createPartValue(partKeys, partition.getPartSpec());
+    return createRowInspector(getCurrent(serde), partKeys);
+  }
+
+  private StructObjectInspector setPrtnDesc(TableDesc table) throws Exception {
+    Deserializer serde = table.getDeserializerClass().newInstance();
+    serde.initialize(job, table.getProperties());
     String pcols = table.getProperties().getProperty(
         org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
     String[] partKeys = pcols.trim().split("/");
-    if (partSpec != null) {
-      rowWithPart[1] = createPartValue(partKeys, partSpec);
+    row[1] = null;
+    return createRowInspector(getCurrent(serde), partKeys);
+  }
+
+  private StructObjectInspector getCurrent(Deserializer serde) throws SerDeException {
+    ObjectInspector current = serde.getObjectInspector();
+    if (objectInspector != null) {
+      current = DelegatedObjectInspectorFactory.reset(objectInspector, current);
+    } else {
+      current = DelegatedObjectInspectorFactory.wrap(current);
     }
-    rowObjectInspector = createRowInspector(partKeys);
+    return objectInspector = (StructObjectInspector) current;
+  }
+
+  private StructObjectInspector createRowInspector(StructObjectInspector current)
+      throws SerDeException {
+    return hasVC ? ObjectInspectorFactory.getUnionStructObjectInspector(
+        Arrays.asList(current, vcsOI)) : current;
   }
 
-  private StructObjectInspector createRowInspector(String[] partKeys) throws SerDeException {
+  private StructObjectInspector createRowInspector(StructObjectInspector current, String[] partKeys)
+      throws SerDeException {
     List<String> partNames = new ArrayList<String>();
     List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>();
     for (String key : partKeys) {
@@ -185,10 +268,10 @@ public class FetchOperator implements Se
     }
     StructObjectInspector partObjectInspector = ObjectInspectorFactory
         .getStandardStructObjectInspector(partNames, partObjectInspectors);
-    StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector();
 
     return ObjectInspectorFactory.getUnionStructObjectInspector(
-        Arrays.asList(inspector, partObjectInspector));
+        hasVC ? Arrays.asList(current, partObjectInspector, vcsOI) :
+            Arrays.asList(current, partObjectInspector));
   }
 
   private List<String> createPartValue(String[] partKeys, Map<String, String> partSpec) {
@@ -202,7 +285,7 @@ public class FetchOperator implements Se
   private void getNextPath() throws Exception {
     // first time
     if (iterPath == null) {
-      if (work.getTblDir() != null) {
+      if (work.isNotPartitioned()) {
         if (!tblDataDone) {
           currPath = work.getTblDirPath();
           currTbl = work.getTblDesc();
@@ -280,9 +363,19 @@ public class FetchOperator implements Se
         tmp = new PartitionDesc(currTbl, null);
       }
 
-      inputFormat = getInputFormatFromCache(tmp.getInputFileFormatClass(), job);
+      Class<? extends InputFormat> formatter = tmp.getInputFileFormatClass();
+      inputFormat = getInputFormatFromCache(formatter, job);
       Utilities.copyTableJobPropertiesToConf(tmp.getTableDesc(), job);
-      inputSplits = inputFormat.getSplits(job, 1);
+      InputSplit[] splits = inputFormat.getSplits(job, 1);
+      FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length];
+      for (int i = 0; i < splits.length; i++) {
+        inputSplits[i] = new FetchInputFormatSplit(splits[i], formatter.getName());
+      }
+      if (work.getSplitSample() != null) {
+        inputSplits = splitSampling(work.getSplitSample(), inputSplits);
+      }
+      this.inputSplits = inputSplits;
+
       splitNum = 0;
       serde = tmp.getDeserializerClass().newInstance();
       serde.initialize(job, tmp.getProperties());
@@ -294,7 +387,7 @@ public class FetchOperator implements Se
       }
 
       if (currPart != null) {
-        setPrtnDesc(currPart.getTableDesc(), currPart.getPartSpec());
+        setPrtnDesc(currPart);
       }
     }
 
@@ -307,12 +400,74 @@ public class FetchOperator implements Se
       return getRecordReader();
     }
 
-    currRecReader = inputFormat.getRecordReader(inputSplits[splitNum++], job, Reporter.NULL);
+    final FetchInputFormatSplit target = inputSplits[splitNum];
+
+    @SuppressWarnings("unchecked")
+    final RecordReader<WritableComparable, Writable> reader =
+        inputFormat.getRecordReader(target.getInputSplit(), job, Reporter.NULL);
+    if (hasVC || work.getSplitSample() != null) {
+      currRecReader = new HiveRecordReader<WritableComparable, Writable>(reader, job) {
+        @Override
+        public boolean doNext(WritableComparable key, Writable value) throws IOException {
+          // if current pos is larger than shrinkedLength which is calculated for
+          // each split by table sampling, stop fetching any more (early exit)
+          if (target.shrinkedLength > 0 &&
+              context.getIoCxt().getCurrentBlockStart() > target.shrinkedLength) {
+            return false;
+          }
+          return super.doNext(key, value);
+        }
+      };
+      ((HiveContextAwareRecordReader)currRecReader).
+          initIOContext(target, job, inputFormat.getClass(), reader);
+    } else {
+      currRecReader = reader;
+    }
+    splitNum++;
     key = currRecReader.createKey();
     value = currRecReader.createValue();
     return currRecReader;
   }
 
+  private FetchInputFormatSplit[] splitSampling(SplitSample splitSample,
+      FetchInputFormatSplit[] splits) {
+    long totalSize = 0;
+    for (FetchInputFormatSplit split: splits) {
+        totalSize += split.getLength();
+    }
+    List<FetchInputFormatSplit> result = new ArrayList<FetchInputFormatSplit>();
+    long targetSize = (long) (totalSize * splitSample.getPercent() / 100D);
+    int startIndex = splitSample.getSeedNum() % splits.length;
+    long size = 0;
+    for (int i = 0; i < splits.length; i++) {
+      FetchInputFormatSplit split = splits[(startIndex + i) % splits.length];
+      result.add(split);
+      long splitgLength = split.getLength();
+      if (size + splitgLength >= targetSize) {
+        if (size + splitgLength > targetSize) {
+          split.shrinkedLength = targetSize - size;
+        }
+        break;
+      }
+      size += splitgLength;
+    }
+    return result.toArray(new FetchInputFormatSplit[result.size()]);
+  }
+
+  /**
+   * Get the next row and push it down to the operator tree.
+   * Currently only used by FetchTask.
+   **/
+  public boolean pushRow() throws IOException, HiveException {
+    InspectableObject row = getNextRow();
+    if (row != null) {
+      operator.process(row.o, 0);
+    }
+    return row != null;
+  }
+
+  private transient final InspectableObject inspectable = new InspectableObject();
+
   /**
    * Get the next row. The fetch context is modified appropriately.
    *
@@ -320,6 +475,9 @@ public class FetchOperator implements Se
   public InspectableObject getNextRow() throws IOException {
     try {
       while (true) {
+        if (context != null) {
+          context.resetRow();
+        }
         if (currRecReader == null) {
           currRecReader = getRecordReader();
           if (currRecReader == null) {
@@ -329,13 +487,27 @@ public class FetchOperator implements Se
 
         boolean ret = currRecReader.next(key, value);
         if (ret) {
-          if (this.currPart == null) {
-            Object obj = serde.deserialize(value);
-            return new InspectableObject(obj, serde.getObjectInspector());
-          } else {
-            rowWithPart[0] = serde.deserialize(value);
-            return new InspectableObject(rowWithPart, rowObjectInspector);
+          if (operator != null && context != null && context.inputFileChanged()) {
+            // The child operators clean up if the input file has changed
+            try {
+              operator.cleanUpInputFileChanged();
+            } catch (HiveException e) {
+              throw new IOException(e);
+            }
+          }
+          if (hasVC) {
+            vcValues = MapOperator.populateVirtualColumnValues(context, vcCols, vcValues, serde);
+            row[isPartitioned ? 2 : 1] = vcValues;
+          }
+          row[0] = serde.deserialize(value);
+          if (hasVC || isPartitioned) {
+            inspectable.o = row;
+            inspectable.oi = rowObjectInspector;
+            return inspectable;
           }
+          inspectable.o = row[0];
+          inspectable.oi = serde.getObjectInspector();
+          return inspectable;
         } else {
           currRecReader.close();
           currRecReader = null;
@@ -356,6 +528,14 @@ public class FetchOperator implements Se
         currRecReader.close();
         currRecReader = null;
       }
+      if (operator != null) {
+        operator.close(false);
+        operator = null;
+      }
+      if (context != null) {
+        context.clear();
+        context = null;
+      }
       this.currPath = null;
       this.iterPath = null;
       this.iterPartDesc = null;
@@ -373,7 +553,7 @@ public class FetchOperator implements Se
     this.iterPath = iterPath;
     this.iterPartDesc = iterPartDesc;
     if (iterPartDesc == null) {
-      if (work.getTblDir() != null) {
+      if (work.isNotPartitioned()) {
         this.currTbl = work.getTblDesc();
       } else {
         // hack, get the first.
@@ -388,31 +568,19 @@ public class FetchOperator implements Se
    */
   public ObjectInspector getOutputObjectInspector() throws HiveException {
     try {
-      if (work.getTblDir() != null) {
-        TableDesc tbl = work.getTblDesc();
-        Deserializer serde = tbl.getDeserializerClass().newInstance();
-        serde.initialize(job, tbl.getProperties());
-        return serde.getObjectInspector();
+      if (work.isNotPartitioned()) {
+        return setTableDesc(work.getTblDesc());
       }
-      TableDesc tbl;
-      Map<String, String> partSpec;
       List<PartitionDesc> listParts = work.getPartDesc();
       if (listParts == null || listParts.isEmpty()) {
-        tbl = work.getTblDesc();
-        partSpec = null;
-      } else {
-        currPart = listParts.get(0);
-        tbl = currPart.getTableDesc();
-        partSpec = currPart.getPartSpec();
-      }
-      serde = tbl.getDeserializerClass().newInstance();
-      serde.initialize(job, tbl.getProperties());
-      setPrtnDesc(tbl, partSpec);
-      currPart = null;
-      return rowObjectInspector;
+        return setPrtnDesc(work.getTblDesc());
+      }
+      return setPrtnDesc(listParts.get(0));
     } catch (Exception e) {
       throw new HiveException("Failed with exception " + e.getMessage()
           + org.apache.hadoop.util.StringUtils.stringifyException(e));
+    } finally {
+      currPart = null;
     }
   }
 
@@ -440,4 +608,20 @@ public class FetchOperator implements Se
     }
     return results.toArray(new FileStatus[results.size()]);
   }
+
+  // for split sampling. shrinkedLength is checked against IOContext.getCurrentBlockStart,
+  // which comes from RecordReader.getPos(). So input formats which do not support getPos(),
+  // like HiveHBaseTableInputFormat, cannot be used with this (todo)
+  private static class FetchInputFormatSplit extends HiveInputFormat.HiveInputSplit {
+
+  // shrunken size for this split. The counterpart of this in normal mode is
+  // InputSplitShim.shrinkedLength.
+  // The difference is that this is evaluated per row using RecordReader.getPos(),
+  // while that is evaluated per split using InputSplit.getLength().
+    private long shrinkedLength = -1;
+
+    public FetchInputFormatSplit(InputSplit split, String name) {
+      super(split, name);
+    }
+  }
 }

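A hedged usage sketch for the split-sampling path added above (the src table and the sampling
percentage are assumptions, not part of this commit). With hive.fetch.task.conversion=more, a
block-sampled query can be served by the fetch task; the shrinkedLength check in doNext() makes
the reader stop early on the last sampled split so that only roughly the requested fraction of
the data is read. As the comment on FetchInputFormatSplit notes, input formats without a usable
getPos(), such as HiveHBaseTableInputFormat, cannot take this path.

  set hive.fetch.task.conversion=more;
  SELECT key, value FROM src TABLESAMPLE (1 PERCENT) s LIMIT 10;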
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java Fri Aug 17 18:28:56 2012
@@ -21,27 +21,24 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Properties;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.serde.Constants;
-import org.apache.hadoop.hive.serde2.DelimitedJSONSerDe;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -51,9 +48,10 @@ public class FetchTask extends Task<Fetc
   private static final long serialVersionUID = 1L;
 
   private int maxRows = 100;
-  private FetchOperator ftOp;
-  private SerDe mSerde;
+  private FetchOperator fetch;
+  private ListSinkOperator sink;
   private int totalRows;
+
   private static transient final Log LOG = LogFactory.getLog(FetchTask.class);
 
   public FetchTask() {
@@ -63,28 +61,22 @@ public class FetchTask extends Task<Fetc
   @Override
   public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
     super.initialize(conf, queryPlan, ctx);
+    work.initializeForFetch();
 
     try {
       // Create a file system handle
       JobConf job = new JobConf(conf, ExecDriver.class);
 
-      String serdeName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE);
-      Class<? extends SerDe> serdeClass = Class.forName(serdeName, true,
-          JavaUtils.getClassLoader()).asSubclass(SerDe.class);
-      // cast only needed for Hadoop 0.17 compatibility
-      mSerde = (SerDe) ReflectionUtils.newInstance(serdeClass, null);
-
-      Properties serdeProp = new Properties();
-
-      // this is the default serialization format
-      if (mSerde instanceof DelimitedJSONSerDe) {
-        serdeProp.put(Constants.SERIALIZATION_FORMAT, "" + Utilities.tabCode);
-        serdeProp.put(Constants.SERIALIZATION_NULL_FORMAT, work.getSerializationNullFormat());
+      Operator<?> source = work.getSource();
+      if (source instanceof TableScanOperator) {
+        TableScanOperator ts = (TableScanOperator) source;
+        HiveInputFormat.pushFilters(job, ts);
+        ColumnProjectionUtils.appendReadColumnIDs(job, ts.getNeededColumnIDs());
       }
+      sink = work.getSink();
+      fetch = new FetchOperator(work, job, source, getVirtualColumns(source));
+      source.initialize(conf, new ObjectInspector[]{fetch.getOutputObjectInspector()});
 
-      mSerde.initialize(job, serdeProp);
-
-      ftOp = new FetchOperator(work, job);
     } catch (Exception e) {
       // Bail out ungracefully - we should never hit
       // this here - but would have hit it in SemanticAnalyzer
@@ -93,6 +85,13 @@ public class FetchTask extends Task<Fetc
     }
   }
 
+  private List<VirtualColumn> getVirtualColumns(Operator<?> ts) {
+    if (ts instanceof TableScanOperator && ts.getConf() != null) {
+      return ((TableScanOperator)ts).getConf().getVirtualCols();
+    }
+    return null;
+  }
+
   @Override
   public int execute(DriverContext driverContext) {
     assert false;
@@ -122,48 +121,26 @@ public class FetchTask extends Task<Fetc
 
   @Override
   public boolean fetch(ArrayList<String> res) throws IOException, CommandNeedRetryException {
+    sink.reset(res);
     try {
-      int numRows = 0;
-      int rowsRet = maxRows;
-
-      if (work.getLeastNumRows() > 0) {
-        if (totalRows == work.getLeastNumRows()) {
-          return false;
-        }
-        for (int i = 0; i < work.getLeastNumRows(); i++) {
-          InspectableObject io = ftOp.getNextRow();
-          if (io == null) {
-            throw new CommandNeedRetryException();
-          }
-          res.add(((Text) mSerde.serialize(io.o, io.oi)).toString());
-          numRows++;
-        }
-        totalRows = work.getLeastNumRows();
-        return true;
-      }
-
-      if ((work.getLimit() >= 0) && ((work.getLimit() - totalRows) < rowsRet)) {
-        rowsRet = work.getLimit() - totalRows;
+      int rowsRet = work.getLeastNumRows();
+      if (rowsRet <= 0) {
+        rowsRet = work.getLimit() >= 0 ? Math.min(work.getLimit() - totalRows, maxRows) : maxRows;
       }
       if (rowsRet <= 0) {
-        ftOp.clearFetchContext();
+        fetch.clearFetchContext();
         return false;
       }
-
-      while (numRows < rowsRet) {
-        InspectableObject io = ftOp.getNextRow();
-        if (io == null) {
-          if (numRows == 0) {
-            return false;
+      boolean fetched = false;
+      while (sink.getNumRows() < rowsRet) {
+        if (!fetch.pushRow()) {
+          if (work.getLeastNumRows() > 0) {
+            throw new CommandNeedRetryException();
           }
-          totalRows += numRows;
-          return true;
+          return fetched;
         }
-
-        res.add(((Text) mSerde.serialize(io.o, io.oi)).toString());
-        numRows++;
+        fetched = true;
       }
-      totalRows += numRows;
       return true;
     } catch (CommandNeedRetryException e) {
       throw e;
@@ -171,6 +148,8 @@ public class FetchTask extends Task<Fetc
       throw e;
     } catch (Exception e) {
       throw new IOException(e);
+    } finally {
+      totalRows += sink.getNumRows();
     }
   }
 
@@ -203,8 +182,8 @@ public class FetchTask extends Task<Fetc
    * @throws HiveException
    */
   public void clearFetch() throws HiveException {
-    if (null != ftOp) {
-      ftOp.clearFetchContext();
+    if (fetch != null) {
+      fetch.clearFetchContext();
     }
   }
 }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java?rev=1374389&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java Fri Aug 17 18:28:56 2012
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.ArrayList;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ListSinkDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.DelimitedJSONSerDe;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * For a fetch task with an operator tree, each row read from the FetchOperator is processed via
+ * the operator tree and finally arrives at this operator.
+ */
+public class ListSinkOperator extends Operator<ListSinkDesc> {
+
+  private transient SerDe mSerde;
+
+  private transient ArrayList<String> res;
+  private transient int numRows;
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    try {
+      mSerde = initializeSerde(hconf);
+      initializeChildren(hconf);
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  private SerDe initializeSerde(Configuration conf) throws Exception {
+    String serdeName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE);
+    Class<? extends SerDe> serdeClass = Class.forName(serdeName, true,
+        JavaUtils.getClassLoader()).asSubclass(SerDe.class);
+    // cast only needed for Hadoop 0.17 compatibility
+    SerDe serde = ReflectionUtils.newInstance(serdeClass, null);
+
+    Properties serdeProp = new Properties();
+
+    // this is the default serialization format
+    if (serde instanceof DelimitedJSONSerDe) {
+      serdeProp.put(Constants.SERIALIZATION_FORMAT, "" + Utilities.tabCode);
+      serdeProp.put(Constants.SERIALIZATION_NULL_FORMAT, getConf().getSerializationNullFormat());
+    }
+    serde.initialize(conf, serdeProp);
+    return serde;
+  }
+
+  public ListSinkOperator initialize(SerDe mSerde) {
+    this.mSerde = mSerde;
+    return this;
+  }
+
+  public void reset(ArrayList<String> res) {
+    this.res = res;
+    this.numRows = 0;
+  }
+
+  public int getNumRows() {
+    return numRows;
+  }
+
+  public void processOp(Object row, int tag) throws HiveException {
+    try {
+      res.add(mSerde.serialize(row, outputObjInspector).toString());
+      numRows++;
+    } catch (SerDeException e) {
+      throw new HiveException(e);
+    }
+  }
+
+  public OperatorType getType() {
+    return OperatorType.FORWARD;
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Fri Aug 17 18:28:56 2012
@@ -33,7 +33,6 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
@@ -494,13 +493,16 @@ public class MapOperator extends Operato
       // The child operators cleanup if input file has changed
       cleanUpInputFileChanged();
     }
+    ExecMapperContext context = getExecContext();
 
     Object row = null;
     try {
       if (this.hasVC) {
         this.rowWithPartAndVC[0] = deserializer.deserialize(value);
         int vcPos = isPartitioned ? 2 : 1;
-        populateVirtualColumnValues();
+        if (context != null) {
+          populateVirtualColumnValues(context, vcs, vcValues, deserializer);
+        }
         this.rowWithPartAndVC[vcPos] = this.vcValues;
       } else if (!isPartitioned) {
         row = deserializer.deserialize((Writable) value);
@@ -549,54 +551,60 @@ public class MapOperator extends Operato
     }
   }
 
-  private void populateVirtualColumnValues() {
-    if (this.vcs != null) {
-      ExecMapperContext mapExecCxt = this.getExecContext();
-      IOContext ioCxt = mapExecCxt.getIoCxt();
-      for (int i = 0; i < vcs.size(); i++) {
-        VirtualColumn vc = vcs.get(i);
-        if (vc.equals(VirtualColumn.FILENAME) && mapExecCxt.inputFileChanged()) {
-          this.vcValues[i] = new Text(mapExecCxt.getCurrentInputFile());
-        } else if (vc.equals(VirtualColumn.BLOCKOFFSET)) {
-          long current = ioCxt.getCurrentBlockStart();
-          LongWritable old = (LongWritable) this.vcValues[i];
-          if (old == null) {
-            old = new LongWritable(current);
-            this.vcValues[i] = old;
-            continue;
-          }
-          if (current != old.get()) {
-            old.set(current);
-          }
-        } else if (vc.equals(VirtualColumn.ROWOFFSET)) {
-          long current = ioCxt.getCurrentRow();
-          LongWritable old = (LongWritable) this.vcValues[i];
-          if (old == null) {
-            old = new LongWritable(current);
-            this.vcValues[i] = old;
-            continue;
-          }
-          if (current != old.get()) {
-            old.set(current);
-          }
-        } else if (vc.equals(VirtualColumn.RAWDATASIZE)) {
-          long current = 0L;
-          SerDeStats stats = this.deserializer.getSerDeStats();
-          if(stats != null) {
-            current = stats.getRawDataSize();
-          }
-          LongWritable old = (LongWritable) this.vcValues[i];
-          if (old == null) {
-            old = new LongWritable(current);
-            this.vcValues[i] = old;
-            continue;
-          }
-          if (current != old.get()) {
-            old.set(current);
-          }
+  public static Writable[] populateVirtualColumnValues(ExecMapperContext ctx,
+      List<VirtualColumn> vcs, Writable[] vcValues, Deserializer deserializer) {
+    if (vcs == null) {
+      return vcValues;
+    }
+    if (vcValues == null) {
+      vcValues = new Writable[vcs.size()];
+    }
+    for (int i = 0; i < vcs.size(); i++) {
+      VirtualColumn vc = vcs.get(i);
+      if (vc.equals(VirtualColumn.FILENAME)) {
+        if (ctx.inputFileChanged()) {
+          vcValues[i] = new Text(ctx.getCurrentInputFile());
+        }
+      } else if (vc.equals(VirtualColumn.BLOCKOFFSET)) {
+        long current = ctx.getIoCxt().getCurrentBlockStart();
+        LongWritable old = (LongWritable) vcValues[i];
+        if (old == null) {
+          old = new LongWritable(current);
+          vcValues[i] = old;
+          continue;
+        }
+        if (current != old.get()) {
+          old.set(current);
+        }
+      } else if (vc.equals(VirtualColumn.ROWOFFSET)) {
+        long current = ctx.getIoCxt().getCurrentRow();
+        LongWritable old = (LongWritable) vcValues[i];
+        if (old == null) {
+          old = new LongWritable(current);
+          vcValues[i] = old;
+          continue;
+        }
+        if (current != old.get()) {
+          old.set(current);
+        }
+      } else if (vc.equals(VirtualColumn.RAWDATASIZE)) {
+        long current = 0L;
+        SerDeStats stats = deserializer.getSerDeStats();
+        if(stats != null) {
+          current = stats.getRawDataSize();
+        }
+        LongWritable old = (LongWritable) vcValues[i];
+        if (old == null) {
+          old = new LongWritable(current);
+          vcValues[i] = old;
+          continue;
+        }
+        if (current != old.get()) {
+          old.set(current);
         }
       }
     }
+    return vcValues;
   }
 
   @Override

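A hedged sketch of what the now-shared populateVirtualColumnValues() enables on the fetch path
(the src table is an assumption): with hive.fetch.task.conversion=more, a query that reads
virtual columns such as INPUT__FILE__NAME or BLOCK__OFFSET__INSIDE__FILE no longer needs a
map-reduce job.

  set hive.fetch.task.conversion=more;
  SELECT INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE, key FROM src LIMIT 5;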
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Fri Aug 17 18:28:56 2012
@@ -339,7 +339,7 @@ public class HiveInputFormat<K extends W
     return partDesc;
   }
 
-  protected void pushFilters(JobConf jobConf, TableScanOperator tableScan) {
+  public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) {
 
     TableScanDesc scanDesc = tableScan.getConf();
     if (scanDesc == null) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Fri Aug 17 18:28:56 2012
@@ -39,6 +39,10 @@ public class IOContext {
     return IOContext.threadLocal.get();
   }
 
+  public static void clear() {
+    IOContext.threadLocal.remove();
+  }
+
   long currentBlockStart;
   long nextBlockStart;
   long currentRow;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Fri Aug 17 18:28:56 2012
@@ -77,6 +77,7 @@ public class Optimizer {
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVELIMITOPTENABLE)) {
       transformations.add(new GlobalLimitOptimizer());
     }
+    transformations.add(new SimpleFetchOptimizer());  // must be called last
   }
 
   /**

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java?rev=1374389&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java Fri Aug 17 18:28:56 2012
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
+import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.QB;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.SplitSample;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.ListSinkDesc;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+/**
+ * Tries to convert a simple fetch query into a single fetch task, which fetches rows directly
+ * from the location of the table/partition.
+ */
+public class SimpleFetchOptimizer implements Transform {
+
+  private final Log LOG = LogFactory.getLog(SimpleFetchOptimizer.class.getName());
+
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+    Map<String, Operator<? extends Serializable>> topOps = pctx.getTopOps();
+    if (pctx.getQB().isSimpleSelectQuery() && topOps.size() == 1) {
+      // no join, no groupby, no distinct, no lateral view, no subq,
+      // no CTAS or insert, not analyze command, and single sourced.
+      String alias = (String) pctx.getTopOps().keySet().toArray()[0];
+      Operator topOp = (Operator) pctx.getTopOps().values().toArray()[0];
+      if (topOp instanceof TableScanOperator) {
+        try {
+          FetchTask fetchTask = optimize(pctx, alias, (TableScanOperator) topOp);
+          if (fetchTask != null) {
+            pctx.setFetchTask(fetchTask);
+          }
+        } catch (HiveException e) {
+          // Has to use full name to make sure it does not conflict with
+          // org.apache.commons.lang.StringUtils
+          LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+          if (e instanceof SemanticException) {
+            throw (SemanticException) e;
+          }
+          throw new SemanticException(e.getMessage(), e);
+        }
+      }
+    }
+    return pctx;
+  }
+
+  // returns non-null FetchTask instance when succeeded
+  @SuppressWarnings("unchecked")
+  private FetchTask optimize(ParseContext pctx, String alias, TableScanOperator source)
+      throws HiveException {
+    String mode = HiveConf.getVar(
+        pctx.getConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION);
+
+    boolean aggressive = "more".equals(mode);
+    FetchData fetch = checkTree(aggressive, pctx, alias, source);
+    if (fetch != null) {
+      int limit = pctx.getQB().getParseInfo().getOuterQueryLimit();
+      FetchWork fetchWork = fetch.convertToWork();
+      FetchTask fetchTask = (FetchTask) TaskFactory.get(fetchWork, pctx.getConf());
+      fetchWork.setSink(fetch.completed(pctx, fetchWork));
+      fetchWork.setSource(source);
+      fetchWork.setLimit(limit);
+      return fetchTask;
+    }
+    return null;
+  }
+
+  // all we can handle is LimitOperator, FilterOperator, SelectOperator and the final FS
+  //
+  // for non-aggressive mode (minimal)
+  // 1. sampling is not allowed
+  // 2. for a partitioned table, all filters should target partition columns
+  // 3. SelectOperator should be select star
+  private FetchData checkTree(boolean aggressive, ParseContext pctx, String alias,
+      TableScanOperator ts) throws HiveException {
+    SplitSample splitSample = pctx.getNameToSplitSample().get(alias);
+    if (!aggressive && splitSample != null) {
+      return null;
+    }
+    QB qb = pctx.getQB();
+    if (!aggressive && qb.hasTableSample(alias)) {
+      return null;
+    }
+
+    Table table = qb.getMetaData().getAliasToTable().get(alias);
+    if (table == null) {
+      return null;
+    }
+    if (!table.isPartitioned()) {
+      return checkOperators(new FetchData(table, splitSample), ts, aggressive, false);
+    }
+
+    boolean bypassFilter = false;
+    if (HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.HIVEOPTPPD)) {
+      ExprNodeDesc pruner = pctx.getOpToPartPruner().get(ts);
+      bypassFilter = PartitionPruner.onlyContainsPartnCols(table, pruner);
+    }
+    if (aggressive || bypassFilter) {
+      PrunedPartitionList pruned = pctx.getPrunedPartitions(alias, ts);
+      if (aggressive || pruned.getUnknownPartns().isEmpty()) {
+        bypassFilter &= pruned.getUnknownPartns().isEmpty();
+        return checkOperators(new FetchData(pruned, splitSample), ts, aggressive, bypassFilter);
+      }
+    }
+    return null;
+  }
+
+  private FetchData checkOperators(FetchData fetch, TableScanOperator ts, boolean aggresive,
+      boolean bypassFilter) {
+    if (ts.getChildOperators().size() != 1) {
+      return null;
+    }
+    Operator<?> op = ts.getChildOperators().get(0);
+    for (; ; op = op.getChildOperators().get(0)) {
+      if (aggresive) {
+        if (!(op instanceof LimitOperator || op instanceof FilterOperator
+            || op instanceof SelectOperator)) {
+          break;
+        }
+      } else if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter)
+          || (op instanceof SelectOperator && ((SelectOperator) op).getConf().isSelectStar()))) {
+        break;
+      }
+      if (op.getChildOperators() == null || op.getChildOperators().size() != 1) {
+        return null;
+      }
+    }
+    if (op instanceof FileSinkOperator) {
+      fetch.fileSink = op;
+      return fetch;
+    }
+    return null;
+  }
+
+  private class FetchData {
+
+    private final Table table;
+    private final SplitSample splitSample;
+    private final PrunedPartitionList partsList;
+    private final HashSet<ReadEntity> inputs = new HashSet<ReadEntity>();
+
+    // this is always non-null when conversion is completed
+    private Operator<?> fileSink;
+
+    private FetchData(Table table, SplitSample splitSample) {
+      this.table = table;
+      this.partsList = null;
+      this.splitSample = splitSample;
+    }
+
+    private FetchData(PrunedPartitionList partsList, SplitSample splitSample) {
+      this.table = null;
+      this.partsList = partsList;
+      this.splitSample = splitSample;
+    }
+
+    private FetchWork convertToWork() throws HiveException {
+      inputs.clear();
+      if (table != null) {
+        inputs.add(new ReadEntity(table));
+        String path = table.getPath().toString();
+        FetchWork work = new FetchWork(path, Utilities.getTableDesc(table));
+        PlanUtils.configureInputJobPropertiesForStorageHandler(work.getTblDesc());
+        work.setSplitSample(splitSample);
+        return work;
+      }
+      List<String> listP = new ArrayList<String>();
+      List<PartitionDesc> partP = new ArrayList<PartitionDesc>();
+
+      for (Partition partition : partsList.getNotDeniedPartns()) {
+        inputs.add(new ReadEntity(partition));
+        listP.add(partition.getPartitionPath().toString());
+        partP.add(Utilities.getPartitionDesc(partition));
+      }
+      TableDesc table = Utilities.getTableDesc(partsList.getSourceTable());
+      FetchWork work = new FetchWork(listP, partP, table);
+      if (!work.getPartDesc().isEmpty()) {
+        PartitionDesc part0 = work.getPartDesc().get(0);
+        PlanUtils.configureInputJobPropertiesForStorageHandler(part0.getTableDesc());
+        work.setSplitSample(splitSample);
+      }
+      return work;
+    }
+
+    // this optimizer replaces "FS to temp + fetch from temp" with a single direct fetch,
+    // which means the FS is no longer needed once the conversion is completed.
+    // rows forwarded will be received by the ListSinkOperator, which replaces the FS
+    private ListSinkOperator completed(ParseContext pctx, FetchWork work) {
+      pctx.getSemanticInputs().addAll(inputs);
+      ListSinkOperator sink = new ListSinkOperator();
+      sink.setConf(new ListSinkDesc(work.getSerializationNullFormat()));
+      sink.setParentOperators(new ArrayList<Operator<? extends Serializable>>());
+      Operator<? extends Serializable> parent = fileSink.getParentOperators().get(0);
+      sink.getParentOperators().add(parent);
+      parent.replaceChild(fileSink, sink);
+      fileSink.setParentOperators(null);
+      return sink;
+    }
+  }
+}
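
For orientation, a hedged sketch of the kinds of operator chains the check above accepts. The queries and the hive.fetch.task.conversion values below are taken from the nonmr_fetch.q test added later in this commit; whether a particular query actually converts still depends on the chosen mode and on partition pruning.

  set hive.fetch.task.conversion=minimal;
  -- accepted: TS -> SEL(*) -> LIM -> FS, with filters only on partition columns
  select * from srcpart where ds='2008-04-08' and hr='11' limit 10;

  set hive.fetch.task.conversion=more;
  -- also accepted: arbitrary select expressions and filters on non-partition columns
  select cast(key as int) * 10, upper(value) from src where key < 100 limit 10;

  -- rejected in both modes: a group by introduces operators outside the allowed chain
  select key, count(value) from src group by key;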

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GlobalLimitCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GlobalLimitCtx.java?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GlobalLimitCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GlobalLimitCtx.java Fri Aug 17 18:28:56 2012
@@ -21,7 +21,7 @@ package org.apache.hadoop.hive.ql.parse;
 import org.apache.hadoop.hive.ql.plan.LimitDesc;
 
 /**
- *
+ * Context for pruning inputs. Populated by GlobalLimitOptimizer.
  */
 public class GlobalLimitCtx {
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java Fri Aug 17 18:28:56 2012
@@ -29,6 +29,7 @@ import java.util.Set;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -37,7 +38,9 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
@@ -100,6 +103,8 @@ public class ParseContext {
   private HashSet<ReadEntity> semanticInputs;
   private List<Task<? extends Serializable>> rootTasks;
 
+  private FetchTask fetchTask;
+
   public ParseContext() {
   }
 
@@ -533,4 +538,23 @@ public class ParseContext {
     this.rootTasks.remove(rootTask);
     this.rootTasks.addAll(tasks);
   }
+
+  public FetchTask getFetchTask() {
+    return fetchTask;
+  }
+
+  public void setFetchTask(FetchTask fetchTask) {
+    this.fetchTask = fetchTask;
+  }
+
+  public PrunedPartitionList getPrunedPartitions(String alias, TableScanOperator ts)
+      throws HiveException {
+    PrunedPartitionList partsList = opToPartList.get(ts);
+    if (partsList == null) {
+      partsList = PartitionPruner.prune(topToTable.get(ts),
+          opToPartPruner.get(ts), conf, alias, prunedPartitions);
+      opToPartList.put(ts, partsList);
+    }
+    return partsList;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java Fri Aug 17 18:28:56 2012
@@ -92,8 +92,8 @@ public class PrunedPartitionList {
    */
   public List<Partition> getNotDeniedPartns() {
     List<Partition> partitions = new ArrayList<Partition>();
-    partitions.addAll(unknownPartns);
     partitions.addAll(confirmedPartns);
+    partitions.addAll(unknownPartns);
     return partitions;
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java Fri Aug 17 18:28:56 2012
@@ -183,8 +183,13 @@ public class QB {
     return isQuery;
   }
 
-  public boolean isSelectStarQuery() {
-    return qbp.isSelectStarQuery() && aliasToSubq.isEmpty() && !isCTAS() && !qbp.isAnalyzeCommand();
+  public boolean isSimpleSelectQuery() {
+    return qbp.isSimpleSelectQuery() && aliasToSubq.isEmpty() && !isCTAS() &&
+        !qbp.isAnalyzeCommand();
+  }
+
+  public boolean hasTableSample(String alias) {
+    return qbp.getTabSample(alias) != null;
   }
 
   public CreateTableDesc getTableDesc() {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java Fri Aug 17 18:28:56 2012
@@ -376,8 +376,8 @@ public class QBParseInfo {
     this.outerQueryLimit = outerQueryLimit;
   }
 
-  public boolean isSelectStarQuery() {
-    if (isSubQ || (joinExpr != null) || (!nameToSample.isEmpty())
+  public boolean isSimpleSelectQuery() {
+    if (isSubQ || (joinExpr != null)
         || (!destToGroupby.isEmpty()) || (!destToClusterby.isEmpty())
         || (!aliasToLateralViews.isEmpty())) {
       return false;
@@ -413,23 +413,6 @@ public class QBParseInfo {
       }
     }
 
-    iter = destToSelExpr.entrySet().iterator();
-    while (iter.hasNext()) {
-      Map.Entry<String, ASTNode> entry = iter.next();
-      ASTNode selExprList = entry.getValue();
-      // Iterate over the selects
-      for (int i = 0; i < selExprList.getChildCount(); ++i) {
-
-        // list of the columns
-        ASTNode selExpr = (ASTNode) selExprList.getChild(i);
-        ASTNode sel = (ASTNode) selExpr.getChild(0);
-
-        if (sel.getToken().getType() != HiveParser.TOK_ALLCOLREF) {
-          return false;
-        }
-      }
-    }
-
     return true;
   }
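
The rename from isSelectStarQuery to isSimpleSelectQuery drops the check that every select item is TOK_ALLCOLREF (and the sampling restriction), so non-star selects can now be considered for fetch conversion. A minimal illustration, reusing a query from the new nonmr_fetch.q test:

  set hive.fetch.task.conversion=more;
  -- previously rejected by isSelectStarQuery (not a plain select *), now a conversion candidate
  explain select cast(key as int) * 10, upper(value) from src limit 10;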
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Fri Aug 17 18:28:56 2012
@@ -110,7 +110,6 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.optimizer.Optimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
-import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec.SpecType;
 import org.apache.hadoop.hive.ql.parse.QBParseInfo.ClauseType;
@@ -143,7 +142,6 @@ import org.apache.hadoop.hive.ql.plan.Lo
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ScriptDesc;
@@ -285,6 +283,7 @@ public class SemanticAnalyzer extends Ba
     qb = pctx.getQB();
     groupOpToInputTables = pctx.getGroupOpToInputTables();
     prunedPartitions = pctx.getPrunedPartitions();
+    fetchTask = pctx.getFetchTask();
     setLineageInfo(pctx.getLineageInfo());
   }
 
@@ -7015,95 +7014,16 @@ public class SemanticAnalyzer extends Ba
   }
 
   @SuppressWarnings("nls")
-  private void genMapRedTasks(QB qb) throws SemanticException {
-    FetchWork fetch = null;
-    List<Task<MoveWork>> mvTask = new ArrayList<Task<MoveWork>>();
-    FetchTask fetchTask = null;
-
-    QBParseInfo qbParseInfo = qb.getParseInfo();
-
-    // Does this query need reduce job
-    if (qb.isSelectStarQuery() && qbParseInfo.getDestToClusterBy().isEmpty()
-        && qbParseInfo.getDestToDistributeBy().isEmpty()
-        && qbParseInfo.getDestToOrderBy().isEmpty()
-        && qbParseInfo.getDestToSortBy().isEmpty()) {
-      boolean noMapRed = false;
-
-      Iterator<Map.Entry<String, Table>> iter = qb.getMetaData()
-          .getAliasToTable().entrySet().iterator();
-      Table tab = (iter.next()).getValue();
-      if (!tab.isPartitioned()) {
-        if (qbParseInfo.getDestToWhereExpr().isEmpty()) {
-          fetch = new FetchWork(tab.getPath().toString(), Utilities
-              .getTableDesc(tab), qb.getParseInfo().getOuterQueryLimit());
-          noMapRed = true;
-          inputs.add(new ReadEntity(tab));
-        }
-      } else {
-
-        if (topOps.size() == 1) {
-          TableScanOperator ts = (TableScanOperator) topOps.values().toArray()[0];
-
-          // check if the pruner only contains partition columns
-          if (PartitionPruner.onlyContainsPartnCols(topToTable.get(ts),
-              opToPartPruner.get(ts))) {
-
-            PrunedPartitionList partsList = null;
-            try {
-              partsList = opToPartList.get(ts);
-              if (partsList == null) {
-                partsList = PartitionPruner.prune(topToTable.get(ts),
-                    opToPartPruner.get(ts), conf, (String) topOps.keySet()
-                    .toArray()[0], prunedPartitions);
-                opToPartList.put(ts, partsList);
-              }
-            } catch (HiveException e) {
-              // Has to use full name to make sure it does not conflict with
-              // org.apache.commons.lang.StringUtils
-              LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
-              throw new SemanticException(e.getMessage(), e);
-            }
-
-            // If there is any unknown partition, create a map-reduce job for
-            // the filter to prune correctly
-            if ((partsList.getUnknownPartns().size() == 0)) {
-              List<String> listP = new ArrayList<String>();
-              List<PartitionDesc> partP = new ArrayList<PartitionDesc>();
-
-              Set<Partition> parts = partsList.getConfirmedPartns();
-              Iterator<Partition> iterParts = parts.iterator();
-              while (iterParts.hasNext()) {
-                Partition part = iterParts.next();
-
-                listP.add(part.getPartitionPath().toString());
-                try {
-                  partP.add(Utilities.getPartitionDesc(part));
-                } catch (HiveException e) {
-                  throw new SemanticException(e.getMessage(), e);
-                }
-                inputs.add(new ReadEntity(part));
-              }
-
-              TableDesc table = Utilities.getTableDesc(partsList.getSourceTable());
-              fetch = new FetchWork(listP, partP, table, qb.getParseInfo()
-                  .getOuterQueryLimit());
-              noMapRed = true;
-            }
-          }
-        }
-      }
-
-      if (noMapRed) {
-        PlanUtils.configureInputJobPropertiesForStorageHandler(fetch.getTblDesc());
-        fetchTask = (FetchTask) TaskFactory.get(fetch, conf);
-        setFetchTask(fetchTask);
-
-        // remove root tasks if any
-        rootTasks.clear();
-        return;
-      }
+  private void genMapRedTasks(ParseContext pCtx) throws SemanticException {
+    if (pCtx.getFetchTask() != null) {
+      // the whole query has already been converted to a single fetch task
+      init(pCtx);
+      return;
     }
 
+    init(pCtx);
+    List<Task<MoveWork>> mvTask = new ArrayList<Task<MoveWork>>();
+
     // In case of a select, use a fetch task instead of a move task
     if (qb.getIsQuery()) {
       if ((!loadTableWork.isEmpty()) || (loadFileWork.size() != 1)) {
@@ -7115,10 +7035,10 @@ public class SemanticAnalyzer extends Ba
       String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
       TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
 
-      fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(),
+      FetchWork fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(),
           resultTab, qb.getParseInfo().getOuterQueryLimit());
 
-      fetchTask = (FetchTask) TaskFactory.get(fetch, conf);
+      FetchTask fetchTask = (FetchTask) TaskFactory.get(fetch, conf);
       setFetchTask(fetchTask);
 
       // For the FetchTask, the limit optimization requires we fetch all the rows
@@ -7542,12 +7462,10 @@ public class SemanticAnalyzer extends Ba
     optm.setPctx(pCtx);
     optm.initialize(conf);
     pCtx = optm.optimize();
-    init(pCtx);
-    qb = pCtx.getQB();
 
     // At this point we have the complete operator tree
     // from which we want to find the reduce operator
-    genMapRedTasks(qb);
+    genMapRedTasks(pCtx);
 
     LOG.info("Completed plan generation");
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java Fri Aug 17 18:28:56 2012
@@ -23,6 +23,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
+import org.apache.hadoop.hive.ql.parse.SplitSample;
 
 /**
  * FetchWork.
@@ -38,9 +41,14 @@ public class FetchWork implements Serial
   private ArrayList<String> partDir;
   private ArrayList<PartitionDesc> partDesc;
 
+  private Operator<?> source;
+  private ListSinkOperator sink;
+
   private int limit;
   private int leastNumRows;
 
+  private SplitSample splitSample;
+
   /**
    * Serialization Null Format for the serde used to fetch data.
    */
@@ -71,6 +79,14 @@ public class FetchWork implements Serial
     this.limit = limit;
   }
 
+  public void initializeForFetch() {
+    if (source == null) {
+      sink = new ListSinkOperator();
+      sink.setConf(new ListSinkDesc(serializationNullFormat));
+      source = sink;
+    }
+  }
+
   public String getSerializationNullFormat() {
     return serializationNullFormat;
   }
@@ -79,6 +95,14 @@ public class FetchWork implements Serial
     serializationNullFormat = format;
   }
 
+  public boolean isNotPartitioned() {
+    return tblDir != null;
+  }
+
+  public boolean isPartitioned() {
+    return tblDir == null;
+  }
+
   /**
    * @return the tblDir
    */
@@ -200,6 +224,31 @@ public class FetchWork implements Serial
     this.leastNumRows = leastNumRows;
   }
 
+  @Explain(displayName = "Processor Tree")
+  public Operator<?> getSource() {
+    return source;
+  }
+
+  public void setSource(Operator<?> source) {
+    this.source = source;
+  }
+
+  public ListSinkOperator getSink() {
+    return sink;
+  }
+
+  public void setSink(ListSinkOperator sink) {
+    this.sink = sink;
+  }
+
+  public void setSplitSample(SplitSample splitSample) {
+    this.splitSample = splitSample;
+  }
+
+  public SplitSample getSplitSample() {
+    return splitSample;
+  }
+
   @Override
   public String toString() {
     if (tblDir != null) {

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ListSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ListSinkDesc.java?rev=1374389&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ListSinkDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ListSinkDesc.java Fri Aug 17 18:28:56 2012
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+
+/**
+ * Description for ListSinkOperator; used only for the explain result.
+ */
+@Explain(displayName = "ListSink")
+public class ListSinkDesc implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private String serializationNullFormat = "NULL";
+
+  public ListSinkDesc() {
+  }
+
+  public ListSinkDesc(String serializationNullFormat) {
+    this.serializationNullFormat = serializationNullFormat;
+  }
+
+  public String getSerializationNullFormat() {
+    return serializationNullFormat;
+  }
+
+  public void setSerializationNullFormat(String serializationNullFormat) {
+    this.serializationNullFormat = serializationNullFormat;
+  }
+}
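
ListSinkDesc exists only so the converted plan has something to show under explain. With the new "Processor Tree" section on FetchWork, a converted query should render roughly as in the .q.out diffs below; for example (a sketch using the input_limit.q query, with the exact column list depending on the query):

  explain select x.* from src x limit 20;
  -- Stage-0 is the only stage: a Fetch Operator whose Processor Tree is
  -- TableScan -> Select Operator -> Limit -> ListSink, with no MapReduce stage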

Added: hive/trunk/ql/src/test/queries/clientpositive/nonmr_fetch.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/nonmr_fetch.q?rev=1374389&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/nonmr_fetch.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/nonmr_fetch.q Fri Aug 17 18:28:56 2012
@@ -0,0 +1,83 @@
+set hive.fetch.task.conversion=minimal;
+
+-- backward compatible (minimal)
+explain select * from src limit 10;
+select * from src limit 10;
+
+explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10;
+select * from srcpart where ds='2008-04-08' AND hr='11' limit 10;
+
+-- negative, select expression
+explain select key from src limit 10;
+select key from src limit 10;
+
+-- negative, filter on non-partition column
+explain select * from srcpart where key > 100 limit 10;
+select * from srcpart where key > 100 limit 10;
+
+-- negative, table sampling
+explain select * from src TABLESAMPLE (0.25 PERCENT) limit 10;
+select * from src TABLESAMPLE (0.25 PERCENT) limit 10;
+
+set hive.fetch.task.conversion=more;
+
+-- backward compatible (more)
+explain select * from src limit 10;
+select * from src limit 10;
+
+explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10;
+select * from srcpart where ds='2008-04-08' AND hr='11' limit 10;
+
+-- select expression
+explain select cast(key as int) * 10, upper(value) from src limit 10;
+select cast(key as int) * 10, upper(value) from src limit 10;
+
+-- filter on non-partition column
+explain select key from src where key < 100 limit 10;
+select key from src where key < 100 limit 10;
+
+-- select expr for partitioned table
+explain select key from srcpart where ds='2008-04-08' AND hr='11' limit 10;
+select key from srcpart where ds='2008-04-08' AND hr='11' limit 10;
+
+-- virtual columns
+explain select *, BLOCK__OFFSET__INSIDE__FILE from src where key < 10 limit 10;
+select *, BLOCK__OFFSET__INSIDE__FILE from src where key < 100 limit 10;
+
+-- virtual columns on partitioned table
+explain select *, BLOCK__OFFSET__INSIDE__FILE from srcpart where key < 10 limit 30;
+select *, BLOCK__OFFSET__INSIDE__FILE from srcpart where key < 10 limit 30;
+
+-- bucket sampling
+explain select *, BLOCK__OFFSET__INSIDE__FILE from src TABLESAMPLE (BUCKET 1 OUT OF 40 ON key);
+select *, BLOCK__OFFSET__INSIDE__FILE from src TABLESAMPLE (BUCKET 1 OUT OF 40 ON key);
+explain select *, BLOCK__OFFSET__INSIDE__FILE from srcpart TABLESAMPLE (BUCKET 1 OUT OF 40 ON key);
+select *, BLOCK__OFFSET__INSIDE__FILE from srcpart TABLESAMPLE (BUCKET 1 OUT OF 40 ON key);
+
+-- split sampling
+explain select * from src TABLESAMPLE (0.25 PERCENT);
+select * from src TABLESAMPLE (0.25 PERCENT);
+explain select *, BLOCK__OFFSET__INSIDE__FILE from srcpart TABLESAMPLE (0.25 PERCENT);
+select *, BLOCK__OFFSET__INSIDE__FILE from srcpart TABLESAMPLE (0.25 PERCENT);
+
+-- non-deterministic function
+explain select key, value, BLOCK__OFFSET__INSIDE__FILE from srcpart where ds="2008-04-09" AND rand() > 1;
+select key, value, BLOCK__OFFSET__INSIDE__FILE from srcpart where ds="2008-04-09" AND rand() > 1;
+
+-- negative, groupby
+explain select key, count(value) from src group by key;
+
+-- negative, distinct
+explain select distinct key, value from src;
+
+-- negative, CTAS
+explain create table srcx as select distinct key, value from src;
+
+-- negative, analyze
+explain analyze table src compute statistics;
+
+-- negative, subq
+explain select a.* from (select * from src) a;
+
+-- negative, join
+explain select * from src join src src2 on src.key=src2.key;

Modified: hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat2.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat2.q?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat2.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat2.q Fri Aug 17 18:28:56 2012
@@ -8,5 +8,11 @@ insert overwrite table partition_test_pa
 alter table partition_test_partitioned set fileformat Sequencefile;
 insert overwrite table partition_test_partitioned partition(dt=102) select * from src1;
 
+set hive.fetch.task.conversion=minimal;
+explain select *, BLOCK__OFFSET__INSIDE__FILE from partition_test_partitioned where dt >=100 and dt <= 102;
+select * from partition_test_partitioned where dt >=100 and dt <= 102;
+
+set hive.fetch.task.conversion=more;
+explain select *, BLOCK__OFFSET__INSIDE__FILE from partition_test_partitioned where dt >=100 and dt <= 102;
 select * from partition_test_partitioned where dt >=100 and dt <= 102;
 

Modified: hive/trunk/ql/src/test/results/clientpositive/input.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/input.q.out?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/input.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/input.q.out Fri Aug 17 18:28:56 2012
@@ -14,6 +14,17 @@ STAGE PLANS:
   Stage: Stage-0
     Fetch Operator
       limit: -1
+      Processor Tree:
+        TableScan
+          alias: x
+          Select Operator
+            expressions:
+                  expr: key
+                  type: string
+                  expr: value
+                  type: string
+            outputColumnNames: _col0, _col1
+            ListSink
 
 
 PREHOOK: query: SELECT x.* FROM SRC x

Modified: hive/trunk/ql/src/test/results/clientpositive/input0.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/input0.q.out?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/input0.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/input0.q.out Fri Aug 17 18:28:56 2012
@@ -14,6 +14,17 @@ STAGE PLANS:
   Stage: Stage-0
     Fetch Operator
       limit: -1
+      Processor Tree:
+        TableScan
+          alias: src
+          Select Operator
+            expressions:
+                  expr: key
+                  type: string
+                  expr: value
+                  type: string
+            outputColumnNames: _col0, _col1
+            ListSink
 
 
 PREHOOK: query: SELECT * FROM src

Modified: hive/trunk/ql/src/test/results/clientpositive/input_limit.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/input_limit.q.out?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/input_limit.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/input_limit.q.out Fri Aug 17 18:28:56 2012
@@ -14,6 +14,18 @@ STAGE PLANS:
   Stage: Stage-0
     Fetch Operator
       limit: 20
+      Processor Tree:
+        TableScan
+          alias: x
+          Select Operator
+            expressions:
+                  expr: key
+                  type: string
+                  expr: value
+                  type: string
+            outputColumnNames: _col0, _col1
+            Limit
+              ListSink
 
 
 PREHOOK: query: SELECT x.* FROM SRC x LIMIT 20

Modified: hive/trunk/ql/src/test/results/clientpositive/input_part0.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/input_part0.q.out?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/input_part0.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/input_part0.q.out Fri Aug 17 18:28:56 2012
@@ -14,6 +14,21 @@ STAGE PLANS:
   Stage: Stage-0
     Fetch Operator
       limit: -1
+      Processor Tree:
+        TableScan
+          alias: x
+          Select Operator
+            expressions:
+                  expr: key
+                  type: string
+                  expr: value
+                  type: string
+                  expr: ds
+                  type: string
+                  expr: hr
+                  type: string
+            outputColumnNames: _col0, _col1, _col2, _col3
+            ListSink
 
 
 PREHOOK: query: SELECT x.* FROM SRCPART x WHERE x.ds = '2008-04-08'

Modified: hive/trunk/ql/src/test/results/clientpositive/input_part3.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/input_part3.q.out?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/input_part3.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/input_part3.q.out Fri Aug 17 18:28:56 2012
@@ -14,6 +14,21 @@ STAGE PLANS:
   Stage: Stage-0
     Fetch Operator
       limit: -1
+      Processor Tree:
+        TableScan
+          alias: x
+          Select Operator
+            expressions:
+                  expr: key
+                  type: string
+                  expr: value
+                  type: string
+                  expr: ds
+                  type: string
+                  expr: hr
+                  type: string
+            outputColumnNames: _col0, _col1, _col2, _col3
+            ListSink
 
 
 PREHOOK: query: SELECT x.* FROM SRCPART x WHERE x.ds = '2008-04-08' and x.hr = 11

Modified: hive/trunk/ql/src/test/results/clientpositive/input_part4.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/input_part4.q.out?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/input_part4.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/input_part4.q.out Fri Aug 17 18:28:56 2012
@@ -14,6 +14,25 @@ STAGE PLANS:
   Stage: Stage-0
     Fetch Operator
       limit: -1
+      Processor Tree:
+        TableScan
+          alias: x
+          Filter Operator
+            predicate:
+                expr: ((ds = '2008-04-08') and (hr = 15.0))
+                type: boolean
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+                    expr: ds
+                    type: string
+                    expr: hr
+                    type: string
+              outputColumnNames: _col0, _col1, _col2, _col3
+              ListSink
 
 
 PREHOOK: query: SELECT x.* FROM SRCPART x WHERE x.ds = '2008-04-08' and x.hr = 15

Modified: hive/trunk/ql/src/test/results/clientpositive/input_part8.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/input_part8.q.out?rev=1374389&r1=1374388&r2=1374389&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/input_part8.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/input_part8.q.out Fri Aug 17 18:28:56 2012
@@ -14,6 +14,22 @@ STAGE PLANS:
   Stage: Stage-0
     Fetch Operator
       limit: 10
+      Processor Tree:
+        TableScan
+          alias: x
+          Select Operator
+            expressions:
+                  expr: key
+                  type: string
+                  expr: value
+                  type: string
+                  expr: ds
+                  type: string
+                  expr: hr
+                  type: string
+            outputColumnNames: _col0, _col1, _col2, _col3
+            Limit
+              ListSink
 
 
 PREHOOK: query: SELECT x.* FROM SRCPART x WHERE ds = '2008-04-08' LIMIT 10


