hive-commits mailing list archives

From heyongqi...@apache.org
Subject svn commit: r1202525 [1/2] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/index/compact/ ql/src/java/org/apache/hadoop/hive/ql/io/ ql/src/java/org/a...
Date Wed, 16 Nov 2011 04:42:04 GMT
Author: heyongqiang
Date: Wed Nov 16 04:42:03 2011
New Revision: 1202525

URL: http://svn.apache.org/viewvc?rev=1202525&view=rev
Log:
HIVE-2535: Use sorted nature of compact indexes (Kevin Wilfong via He Yongqiang)

Added:
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
    hive/trunk/ql/src/test/queries/clientpositive/index_compact_binary_search.q
    hive/trunk/ql/src/test/results/clientpositive/index_compact_binary_search.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseCompare.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Nov 16 04:42:03 2011
@@ -407,6 +407,7 @@ public class HiveConf extends Configurat
     HIVEOPTINDEXFILTER_COMPACT_MAXSIZE("hive.optimize.index.filter.compact.maxsize", (long) -1), // infinity
     HIVE_INDEX_COMPACT_QUERY_MAX_ENTRIES("hive.index.compact.query.max.entries", (long) 10000000), // 10M
     HIVE_INDEX_COMPACT_QUERY_MAX_SIZE("hive.index.compact.query.max.size", (long) 10 * 1024 * 1024 * 1024), // 10G
+    HIVE_INDEX_COMPACT_BINARY_SEARCH("hive.index.compact.binary.search", true),
 
     // Statistics
     HIVESTATSAUTOGATHER("hive.stats.autogather", true),

Modified: hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml (original)
+++ hive/trunk/conf/hive-default.xml Wed Nov 16 04:42:03 2011
@@ -1071,6 +1071,12 @@
 </property>
 
 <property>
+  <name>hive.index.compact.binary.search</name>
+  <value>true</value>
+  <description>Whether or not to use a binary search to find the entries in an index table that match the filter, where possible</description>
+</property>
+
+<property>
   <name>hive.exim.uri.scheme.whitelist</name>
   <value>hdfs,pfile</value>
   <description>A comma separated list of acceptable URI schemes for import and export.</description>

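The flag added above is exposed through the new ConfVars entry HIVE_INDEX_COMPACT_BINARY_SEARCH in HiveConf.java. A minimal sketch of reading it back (the wrapper class and method name are illustrative only; the getBoolVar call mirrors the check CompactIndexHandler performs later in this commit):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    public class BinarySearchFlagSketch {
      // Returns whether the compact index binary search is enabled in this conf.
      // Defaults to true, per the ConfVars entry and hive-default.xml above.
      public static boolean binarySearchEnabled(HiveConf conf) {
        return conf.getBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH);
      }
    }
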
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Wed Nov 16 04:42:03 2011
@@ -527,6 +527,9 @@ public class ExecDriver extends Task<Map
       conf.set("hive.index.compact.file", work.getIndexIntermediateFile());
       conf.set("hive.index.blockfilter.file", work.getIndexIntermediateFile());
     }
+
+    // Intentionally overwrites anything the user may have put here
+    conf.setBoolean("hive.input.format.sorted", work.isInputFormatSorted());
   }
 
   public boolean mapStarted() {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java Wed Nov 16 04:42:03 2011
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCase;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFWhen;
 import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
@@ -81,12 +82,12 @@ public class ExprNodeGenericFuncEvaluato
     void evaluate() throws HiveException {
       obj = eval.evaluate(rowObject);
     }
-    
+
     public Object get() throws HiveException {
       return obj;
     }
   }
-  
+
   public ExprNodeGenericFuncEvaluator(ExprNodeGenericFuncDesc expr) {
     this.expr = expr;
     children = new ExprNodeEvaluator[expr.getChildExprs().size()];
@@ -162,4 +163,35 @@ public class ExprNodeGenericFuncEvaluato
     return genericUDF.evaluate(deferredChildren);
   }
 
+  /**
+   * If the genericUDF is a base comparison, it returns an integer based on the result of comparing
+   * the two sides of the UDF, like the compareTo method in Comparable.
+   *
+   * If the genericUDF is not a base comparison, or there is an error executing the comparison, it
+   * returns null.
+   * @param row
+   * @return
+   * @throws HiveException
+   */
+  public Integer compare(Object row) throws HiveException {
+    if (!expr.isSortedExpr() || !(genericUDF instanceof GenericUDFBaseCompare)) {
+      for (ExprNodeEvaluator evaluator: children) {
+        if (evaluator instanceof ExprNodeGenericFuncEvaluator) {
+          Integer comparison = ((ExprNodeGenericFuncEvaluator) evaluator).compare(row);
+          if (comparison != null) {
+            return comparison;
+          }
+        }
+      }
+      return null;
+    }
+
+    rowObject = row;
+    if (isEager) {
+      for (int i = 0; i < deferredChildren.length; i++) {
+        ((EagerExprObject) deferredChildren[i]).evaluate();
+      }
+    }
+    return ((GenericUDFBaseCompare)genericUDF).compare(deferredChildren);
+  }
 }

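The new compare() method above follows the Comparable.compareTo convention and returns null when no sorted base comparison can be evaluated. A hedged sketch of how a caller might interpret the result (the helper class is illustrative only; FilterOperator below is the actual caller in this commit):

    import org.apache.hadoop.hive.ql.exec.ExprNodeGenericFuncEvaluator;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class CompareContractSketch {
      // Describes the result of compare() for one row: negative, zero, positive, or null.
      public static String describe(ExprNodeGenericFuncEvaluator evaluator, Object row)
          throws HiveException {
        Integer cmp = evaluator.compare(row);
        if (cmp == null) {
          return "no sorted base comparison; fall back to evaluate()";
        } else if (cmp < 0) {
          return "row value sorts before the constant in the predicate";
        } else if (cmp > 0) {
          return "row value sorts after the constant in the predicate";
        } else {
          return "row value equals the constant in the predicate";
        }
      }
    }
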
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Wed Nov 16 04:42:03 2011
@@ -22,6 +22,7 @@ import java.io.Serializable;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -49,6 +50,8 @@ public class FilterOperator extends Oper
   private transient ExprNodeEvaluator conditionEvaluator;
   private transient PrimitiveObjectInspector conditionInspector;
   private transient int consecutiveFails;
+  private transient int consecutiveSearches;
+  private transient IOContext ioContext;
   transient int heartbeatInterval;
 
   public FilterOperator() {
@@ -56,6 +59,7 @@ public class FilterOperator extends Oper
     filtered_count = new LongWritable();
     passed_count = new LongWritable();
     consecutiveFails = 0;
+    consecutiveSearches = 0;
   }
 
   @Override
@@ -67,6 +71,7 @@ public class FilterOperator extends Oper
       statsMap.put(Counter.FILTERED, filtered_count);
       statsMap.put(Counter.PASSED, passed_count);
       conditionInspector = null;
+      ioContext = IOContext.get();
     } catch (Throwable e) {
       throw new HiveException(e);
     }
@@ -80,7 +85,47 @@ public class FilterOperator extends Oper
       conditionInspector = (PrimitiveObjectInspector) conditionEvaluator
           .initialize(rowInspector);
     }
+
+    // If the input is sorted, and we are executing a search based on the arguments to this filter,
+    // set the comparison in the IOContext and the type of the UDF
+    if (conf.isSortedFilter() && ioContext.useSorted()) {
+      if (!(conditionEvaluator instanceof ExprNodeGenericFuncEvaluator)) {
+        LOG.error("Attempted to use the fact the data is sorted when the conditionEvaluator is not " +
+                  "of type ExprNodeGenericFuncEvaluator");
+        ioContext.setUseSorted(false);
+        return;
+      } else {
+        ioContext.setComparison(((ExprNodeGenericFuncEvaluator)conditionEvaluator).compare(row));
+      }
+
+      if (ioContext.getGenericUDFClassName() == null) {
+        ioContext.setGenericUDFClassName(
+            ((ExprNodeGenericFuncEvaluator)conditionEvaluator).genericUDF.getClass().getName());
+      }
+
+      // If we are currently searching the data for a place to begin, do not return data yet
+      if (ioContext.isBinarySearching()) {
+        consecutiveSearches++;
+        // In case we're searching through an especially large set of data, send a heartbeat in
+        // order to avoid timeout
+        if (((consecutiveSearches % heartbeatInterval) == 0) && (reporter != null)) {
+          reporter.progress();
+        }
+        return;
+      }
+    }
+
     Object condition = conditionEvaluator.evaluate(row);
+
+    // If we are currently performing a binary search on the input, don't forward the results
+    // Currently this value is set when a query is optimized using a compact index.  The map reduce
+    // job responsible for scanning and filtering the index sets this value.  It remains set
+    // throughout the binary search executed by the HiveBinarySearchRecordReader until a starting
+    // point for a linear scan has been identified, at which point this value is unset.
+    if (ioContext.isBinarySearching()) {
+      return;
+    }
+
     Boolean ret = (Boolean) conditionInspector
         .getPrimitiveJavaObject(condition);
     if (Boolean.TRUE.equals(ret)) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java Wed Nov 16 04:42:03 2011
@@ -18,7 +18,10 @@
 
 package org.apache.hadoop.hive.ql.index.compact;
 
+import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Set;
@@ -27,11 +30,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -39,6 +45,7 @@ import org.apache.hadoop.hive.ql.index.H
 import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
 import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
 import org.apache.hadoop.hive.ql.index.TableBasedIndexHandler;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
@@ -46,7 +53,10 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
@@ -57,6 +67,10 @@ import org.apache.hadoop.hive.ql.udf.gen
 public class CompactIndexHandler extends TableBasedIndexHandler {
 
   private Configuration configuration;
+  // The names of the partition columns
+  private Set<String> partitionCols;
+  // Whether or not the conditions have been met to use the fact the index is sorted
+  private boolean useSorted;
   private static final Log LOG = LogFactory.getLog(CompactIndexHandler.class.getName());
 
 
@@ -172,6 +186,58 @@ public class CompactIndexHandler extends
     Driver driver = new Driver(queryConf);
     driver.compile(qlCommand.toString(), false);
 
+    if (pctx.getConf().getBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH) && useSorted) {
+      // For now, only works if the predicate is a single condition
+      MapredWork work = null;
+      String originalInputFormat = null;
+      for (Task task : driver.getPlan().getRootTasks()) {
+        // The index query should have one and only one map reduce task in the root tasks
+        // Otherwise something is wrong, log the problem and continue using the default format
+        if (task.getWork() instanceof MapredWork) {
+          if (work != null) {
+            LOG.error("Tried to use a binary search on a compact index but there was an " +
+                      "unexpected number (>1) of root level map reduce tasks in the " +
+                      "reentrant query plan.");
+            work.setInputformat(null);
+            work.setInputFormatSorted(false);
+            break;
+          }
+          work = (MapredWork)task.getWork();
+          String inputFormat = work.getInputformat();
+          originalInputFormat = inputFormat;
+          if (inputFormat == null) {
+            inputFormat = HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVEINPUTFORMAT);
+          }
+
+          // We can only perform a binary search with HiveInputFormat and CombineHiveInputFormat
+          // and BucketizedHiveInputFormat
+          try {
+            if (!HiveInputFormat.class.isAssignableFrom(Class.forName(inputFormat))) {
+              work = null;
+              break;
+            }
+          } catch (ClassNotFoundException e) {
+            LOG.error("Map reduce work's input format class: " + inputFormat + " was not found. " +
+                       "Cannot use the fact the compact index is sorted.");
+            work = null;
+            break;
+          }
+
+          work.setInputFormatSorted(true);
+        }
+      }
+
+      if (work != null) {
+        // Find the filter operator and expr node which act on the index column and mark them
+        if (!findIndexColumnFilter(work.getAliasToWork().values())) {
+          LOG.error("Could not locate the index column's filter operator and expr node. Cannot " +
+                    "use the fact the compact index is sorted.");
+          work.setInputformat(originalInputFormat);
+          work.setInputFormatSorted(false);
+        }
+      }
+    }
+
 
     queryContext.addAdditionalSemanticInputs(driver.getPlan().getInputs());
     queryContext.setQueryTasks(driver.getPlan().getRootTasks());
@@ -179,6 +245,62 @@ public class CompactIndexHandler extends
   }
 
   /**
+   * Does a depth first search on the operator tree looking for a filter operator whose predicate
+   * has one child which is a column which is not in the partition
+   * @param operators
+   * @return whether or not it has found its target
+   */
+  private boolean findIndexColumnFilter(Collection<Operator<? extends Serializable>> operators) {
+    for (Operator<? extends Serializable> op : operators) {
+      if (op instanceof FilterOperator && ((FilterOperator)op).getConf().getPredicate().getChildren() != null) {
+        // Is this the target
+        if (findIndexColumnExprNodeDesc(((FilterOperator)op).getConf().getPredicate())) {
+          ((FilterOperator)op).getConf().setSortedFilter(true);
+          return true;
+        }
+      }
+
+      // If the target has been found, no need to continue
+      if (findIndexColumnFilter(op.getChildOperators())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean findIndexColumnExprNodeDesc(ExprNodeDesc expression) {
+    if (expression.getChildren() == null) {
+      return false;
+    }
+
+    if (expression.getChildren().size() == 2) {
+      ExprNodeColumnDesc columnDesc = null;
+      if (expression.getChildren().get(0) instanceof ExprNodeColumnDesc) {
+        columnDesc = (ExprNodeColumnDesc)expression.getChildren().get(0);
+      } else if (expression.getChildren().get(1) instanceof ExprNodeColumnDesc) {
+        columnDesc = (ExprNodeColumnDesc)expression.getChildren().get(1);
+      }
+
+      // Is this the target
+      if (columnDesc != null && !partitionCols.contains(columnDesc.getColumn())) {
+        assert expression instanceof ExprNodeGenericFuncDesc :
+               "Expression containing the index column does not support sorting; should not " +
+               "try to sort";
+        ((ExprNodeGenericFuncDesc)expression).setSortedExpr(true);
+        return true;
+      }
+    }
+
+    for (ExprNodeDesc child : expression.getChildren()) {
+      // If the target has been found, no need to continue
+      if (findIndexColumnExprNodeDesc(child)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
    * Split the predicate into the piece we can deal with (pushed), and the one we can't (residual)
    * @param predicate
    * @param index
@@ -195,6 +317,20 @@ public class CompactIndexHandler extends
       return null;
     }
 
+    int numIndexCols = 0;
+    for (IndexSearchCondition searchCondition : searchConditions) {
+      if (!partitionCols.contains(searchCondition.getColumnDesc().getColumn())) {
+        numIndexCols++;
+      }
+    }
+
+    // For now, only works if the predicate has a single condition on an index column
+    if (numIndexCols == 1) {
+      useSorted = true;
+    } else {
+      useSorted = false;
+    }
+
     DecomposedPredicate decomposedPredicate = new DecomposedPredicate();
     decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(searchConditions);
     decomposedPredicate.residualPredicate = residualPredicate;
@@ -226,12 +362,14 @@ public class CompactIndexHandler extends
 
     // partitioned columns are treated as if they have indexes so that the partitions
     // are used during the index query generation
+    partitionCols = new HashSet<String>();
     for (Partition part : queryPartitions) {
       if (part.getSpec().isEmpty()) {
         continue; // empty partitions are from whole tables, so we don't want to add them in
       }
       for (String column : part.getSpec().keySet()) {
         analyzer.allowColumnName(column);
+        partitionCols.add(column);
       }
     }
 

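The input format check in CompactIndexHandler above relies on HiveInputFormat being the common ancestor of the formats that can drive a binary search. A minimal illustration of that isAssignableFrom test (the wrapper class is hypothetical; the reflection call is the same one used in the handler):

    import org.apache.hadoop.hive.ql.io.HiveInputFormat;

    public class InputFormatCheckSketch {
      // True when the configured input format is HiveInputFormat or one of its
      // subclasses, such as CombineHiveInputFormat or BucketizedHiveInputFormat.
      public static boolean supportsBinarySearch(String inputFormatClassName)
          throws ClassNotFoundException {
        return HiveInputFormat.class.isAssignableFrom(Class.forName(inputFormatClassName));
      }
    }
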
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java Wed Nov 16 04:42:03 2011
@@ -23,12 +23,10 @@ import java.io.IOException;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
 
 /**
  * BucketizedHiveRecordReader is a wrapper on a list of RecordReader. It behaves
@@ -39,68 +37,66 @@ public class BucketizedHiveRecordReader<
     extends HiveContextAwareRecordReader<K, V> {
   protected final BucketizedHiveInputSplit split;
   protected final InputFormat inputFormat;
-  protected final JobConf jobConf;
   protected final Reporter reporter;
-  protected RecordReader<K,V> curReader;
   protected long progress;
   protected int idx;
-  
+
   public BucketizedHiveRecordReader(InputFormat inputFormat,
       BucketizedHiveInputSplit bucketizedSplit, JobConf jobConf,
       Reporter reporter) throws IOException {
+    super(jobConf);
+
     this.split = bucketizedSplit;
     this.inputFormat = inputFormat;
-    this.jobConf = jobConf;
     this.reporter = reporter;
     initNextRecordReader();
   }
 
+  @Override
   public void doClose() throws IOException {
-    if (curReader != null) {
-      curReader.close();
-      curReader = null;
+    if (recordReader != null) {
+      recordReader.close();
+      recordReader = null;
     }
     idx = 0;
   }
 
   public K createKey() {
-    return (K) curReader.createKey();
+    return (K) recordReader.createKey();
   }
 
   public V createValue() {
-    return (V) curReader.createValue();
+    return (V) recordReader.createValue();
   }
 
   public long getPos() throws IOException {
-    if (curReader != null) {
-      return curReader.getPos();
+    if (recordReader != null) {
+      return recordReader.getPos();
     } else {
       return 0;
     }
   }
 
+  @Override
   public float getProgress() throws IOException {
     // The calculation is strongly dependent on the assumption that all splits
     // came from the same file
-    return Math.min(1.0f, ((curReader == null) ? progress : curReader.getPos())
-        / (float) (split.getLength()));
+    return Math.min(1.0f, ((recordReader == null || this.getIOContext().isBinarySearching()) ?
+        progress : recordReader.getPos()) / (float) (split.getLength()));
   }
 
+  @Override
   public boolean doNext(K key, V value) throws IOException {
-    while ((curReader == null) || !doNextWithExceptionHandler(key, value)) {
+    while ((recordReader == null) || !doNextWithExceptionHandler(key, value)) {
       if (!initNextRecordReader()) {
         return false;
       }
     }
     return true;
   }
-  
+
   private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
-    try {
-      return curReader.next(key, value);
-    } catch (Exception e) {
-      return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jobConf);
-    }
+    return super.doNext(key, value);
   }
 
   /**
@@ -108,9 +104,9 @@ public class BucketizedHiveRecordReader<
    * BucketizedHiveRecordReader.
    */
   protected boolean initNextRecordReader() throws IOException {
-    if (curReader != null) {
-      curReader.close();
-      curReader = null;
+    if (recordReader != null) {
+      recordReader.close();
+      recordReader = null;
       if (idx > 0) {
         progress += split.getLength(idx - 1); // done processing so far
       }
@@ -123,10 +119,15 @@ public class BucketizedHiveRecordReader<
 
     // get a record reader for the idx-th chunk
     try {
-      curReader = inputFormat.getRecordReader(split.getSplit(idx), jobConf,
+      recordReader = inputFormat.getRecordReader(split.getSplit(idx), jobConf,
           reporter);
     } catch (Exception e) {
-      curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(e, jobConf);
+      recordReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(e, jobConf);
+    }
+
+    // if we're performing a binary search, we need to restart it
+    if (isSorted) {
+      initIOContextSortedProps((FileSplit) split.getSplit(idx), recordReader, jobConf);
     }
     idx++;
     return true;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Wed Nov 16 04:42:03 2011
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.io;
 
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -32,8 +31,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,9 +43,9 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.parse.SplitSample;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.HadoopShims.CombineFileInputFormatShim;
 import org.apache.hadoop.hive.shims.HadoopShims.InputSplitShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
@@ -59,7 +56,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.util.StringUtils;
 
 
 /**

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java Wed Nov 16 04:42:03 2011
@@ -30,7 +30,6 @@ import org.apache.hadoop.mapred.FileSpli
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
@@ -42,12 +41,10 @@ import org.apache.hadoop.mapred.Reporter
 public class CombineHiveRecordReader<K extends WritableComparable, V extends Writable>
     extends HiveContextAwareRecordReader<K, V> {
 
-  private final RecordReader recordReader;
-
   public CombineHiveRecordReader(InputSplit split, Configuration conf,
       Reporter reporter, Integer partition) throws IOException {
-    JobConf job = (JobConf) conf;
-    CombineHiveInputSplit hsplit = new CombineHiveInputSplit(job,
+    super((JobConf)conf);
+    CombineHiveInputSplit hsplit = new CombineHiveInputSplit(jobConf,
         (InputSplitShim) split);
     String inputFormatClassName = hsplit.inputFormatClassName();
     Class inputFormatClass = null;
@@ -58,15 +55,16 @@ public class CombineHiveRecordReader<K e
           + inputFormatClassName);
     }
     InputFormat inputFormat = HiveInputFormat.getInputFormatFromCache(
-        inputFormatClass, job);
+        inputFormatClass, jobConf);
 
     // create a split for the given partition
     FileSplit fsplit = new FileSplit(hsplit.getPaths()[partition], hsplit
         .getStartOffsets()[partition], hsplit.getLengths()[partition], hsplit
         .getLocations());
 
-    this.recordReader = inputFormat.getRecordReader(fsplit, job, reporter);
-    this.initIOContext(fsplit, job, inputFormatClass, this.recordReader);
+    this.setRecordReader(inputFormat.getRecordReader(fsplit, jobConf, reporter));
+
+    this.initIOContext(fsplit, jobConf, inputFormatClass, this.recordReader);
   }
 
   @Override
@@ -86,7 +84,12 @@ public class CombineHiveRecordReader<K e
     return recordReader.getPos();
   }
 
+  @Override
   public float getProgress() throws IOException {
+    if (isSorted) {
+      return super.getProgress();
+    }
+
     return recordReader.getProgress();
   }
 
@@ -95,6 +98,6 @@ public class CombineHiveRecordReader<K e
     if (ExecMapper.getDone()) {
       return false;
     }
-    return recordReader.next(key, value);
+    return super.doNext(key, value);
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Wed Nov 16 04:42:03 2011
@@ -19,27 +19,68 @@
 package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
+import org.apache.hadoop.hive.ql.io.IOContext.Comparison;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 
+/** This class prepares an IOContext, and provides the ability to perform a binary search on the
+  * data.  The binary search can be used by setting the value of inputFormatSorted in the
+  * MapreduceWork to true, but it should only be used if the data is going to a FilterOperator,
+  * which filters by comparing a value in the data with a constant, using one of the comparisons
+  * =, <, >, <=, >=.  If the RecordReader's underlying format is an RCFile, this object can perform
+  * a binary search to find the block to begin reading from, and stop reading once it can be
+  * determined no other entries will match the filter.
+  */
 public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader<K, V> {
 
+  private static final Log LOG = LogFactory.getLog(HiveContextAwareRecordReader.class.getName());
+
   private boolean initDone = false;
+  private long rangeStart;
+  private long rangeEnd;
+  private long splitEnd;
+  private long previousPosition = -1;
+  private boolean wasUsingSortedSearch = false;
+  private String genericUDFClassName = null;
+  private final List<Comparison> stopComparisons = new ArrayList<Comparison>();
+
+  protected RecordReader recordReader;
+  protected JobConf jobConf;
+  protected boolean isSorted = false;
 
-  /**
-   * Reads the next key/value pair from the input for processing.
-   *
-   * @param key the key to read data into
-   * @param value the value to read data into
-   * @return true if a key/value was read, false if at EOF
-   */
-  public abstract boolean doNext(K key, V value) throws IOException;
+  public HiveContextAwareRecordReader(JobConf conf) throws IOException {
+    this(null, conf);
+  }
+
+  public HiveContextAwareRecordReader(RecordReader recordReader) {
+    this.recordReader = recordReader;
+  }
+
+  public HiveContextAwareRecordReader(RecordReader recordReader, JobConf conf)
+      throws IOException {
+    this.recordReader = recordReader;
+    this.jobConf = conf;
+  }
+
+  public void setRecordReader(RecordReader recordReader) {
+    this.recordReader = recordReader;
+  }
 
   /**
    * Close this {@link InputSplit} to future operations.
@@ -120,6 +161,7 @@ public abstract class HiveContextAwareRe
 
   public void initIOContext(FileSplit split, JobConf job,
       Class inputFormatClass, RecordReader recordReader) throws IOException {
+
     boolean blockPointer = false;
     long blockStart = -1;
     FileSplit fileSplit = (FileSplit) split;
@@ -142,5 +184,162 @@ public abstract class HiveContextAwareRe
       in.close();
     }
     this.initIOContext(blockStart, blockPointer, split.getPath().toString());
+
+    this.initIOContextSortedProps(split, recordReader, job);
+  }
+
+  public void initIOContextSortedProps(FileSplit split, RecordReader recordReader, JobConf job) {
+    this.getIOContext().resetSortingValues();
+    this.isSorted = jobConf.getBoolean("hive.input.format.sorted", false);
+
+    this.rangeStart = split.getStart();
+    this.rangeEnd = split.getStart() + split.getLength();
+    this.splitEnd = rangeEnd;
+    if (recordReader instanceof RCFileRecordReader && rangeEnd != 0 && this.isSorted) {
+      // Binary search only works if we know the size of the split, and the recordReader is an
+      // RCFileRecordReader
+      this.getIOContext().setUseSorted(true);
+      this.getIOContext().setIsBinarySearching(true);
+      this.wasUsingSortedSearch = true;
+    } else {
+      // Use the default methods for next in the child class
+      this.isSorted = false;
+    }
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    if (this.getIOContext().isBinarySearching()) {
+      return 0;
+    } else {
+      return recordReader.getProgress();
+    }
+  }
+
+  public boolean doNext(K key, V value) throws IOException {
+    if (this.isSorted) {
+      if (this.getIOContext().shouldEndBinarySearch() ||
+          (!this.getIOContext().useSorted() && this.wasUsingSortedSearch)) {
+        beginLinearSearch();
+        this.wasUsingSortedSearch = false;
+        this.getIOContext().setEndBinarySearch(false);
+      }
+
+      if (this.getIOContext().useSorted()) {
+        if (this.genericUDFClassName == null &&
+            this.getIOContext().getGenericUDFClassName() != null) {
+          setGenericUDFClassName(this.getIOContext().getGenericUDFClassName());
+        }
+
+        if (this.getIOContext().isBinarySearching()) {
+          // Proceed with a binary search
+          if (this.getIOContext().getComparison() != null) {
+            switch (this.getIOContext().getComparison()) {
+              case GREATER:
+              case EQUAL:
+                // Indexes have only one entry per value, so we could go linear from here; if we
+                // want to use this for any sorted table, we'll need to continue the search
+                rangeEnd = previousPosition;
+                break;
+              case LESS:
+                rangeStart = previousPosition;
+                break;
+              default:
+                break;
+            }
+          }
+
+          long position = (rangeStart + rangeEnd) / 2;
+          sync(position);
+
+          long newPosition = getSyncedPosition();
+          // If the newPosition is the same as the previousPosition, we've reached the end of the
+          // binary search; if the new position is at least as big as the size of the split, any
+          // matching rows must be in the final block, so we can end the binary search.
+          if (newPosition == previousPosition || newPosition >= splitEnd) {
+            this.getIOContext().setIsBinarySearching(false);
+            sync(rangeStart);
+          }
+
+          previousPosition = newPosition;
+        }  else if (foundAllTargets()) {
+          // Found all possible rows which will not be filtered
+          return false;
+        }
+      }
+    }
+
+    try {
+      return recordReader.next(key,  value);
+    } catch (Exception e) {
+      return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jobConf);
+    }
+  }
+
+  private void sync(long position) throws IOException {
+     ((RCFileRecordReader)recordReader).sync(position);
+     ((RCFileRecordReader)recordReader).resetBuffer();
+  }
+
+  private long getSyncedPosition() throws IOException {
+    return recordReader.getPos();
+  }
+  /**
+   * This uses the name of the generic UDF being used by the filter to determine whether we should
+   * perform a binary search, and which comparisons should signal the end of the
+   * linear scan.
+   * @param genericUDFClassName
+   * @throws IOException
+   */
+  private void setGenericUDFClassName(String genericUDFClassName) throws IOException {
+    this.genericUDFClassName = genericUDFClassName;
+    if (genericUDFClassName.equals(GenericUDFOPEqual.class.getName())) {
+      stopComparisons.add(Comparison.GREATER);
+    } else if (genericUDFClassName.equals(GenericUDFOPLessThan.class.getName())) {
+      stopComparisons.add(Comparison.EQUAL);
+      stopComparisons.add(Comparison.GREATER);
+      if (this.getIOContext().isBinarySearching()) {
+        beginLinearSearch();
+      }
+    } else if (genericUDFClassName.equals(GenericUDFOPEqualOrLessThan.class.getName())) {
+      stopComparisons.add(Comparison.GREATER);
+      if (this.getIOContext().isBinarySearching()) {
+        beginLinearSearch();
+      }
+    } else if (genericUDFClassName.equals(GenericUDFOPGreaterThan.class.getName()) ||
+        genericUDFClassName.equals(GenericUDFOPEqualOrGreaterThan.class.getName())) {
+      // Do nothing
+    } else {
+      // This is an unsupported operator
+      LOG.debug(genericUDFClassName + " is not the name of a supported class.  " +
+                "Continuing linearly.");
+      if (this.getIOContext().isBinarySearching()) {
+        beginLinearSearch();
+      }
+    }
+  }
+
+  /**
+   * This should be called after the binary search is finished and before the linear scan begins
+   * @throws IOException
+   */
+  private void beginLinearSearch() throws IOException {
+    sync(rangeStart);
+    this.getIOContext().setIsBinarySearching(false);
+    this.wasUsingSortedSearch = false;
+  }
+
+  /**
+   * Returns true if the current comparison is in the list of stop comparisons, i.e. we've found
+   * all records which won't be filtered
+   * @return
+   */
+  public boolean foundAllTargets() {
+    if (this.getIOContext().getComparison() == null ||
+        !stopComparisons.contains(this.getIOContext().getComparison())) {
+      return false;
+    }
+
+    return true;
   }
 }

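The doNext() implementation above interleaves the range narrowing with actual record reads, with the comparison fed back through the IOContext by the FilterOperator. As a standalone illustration only, here is a sketch of the same midpoint-and-sync pattern over byte offsets; sync() and compareAtPosition() are hypothetical stand-ins for RCFileRecordReader.sync plus resetBuffer and for reading a row and comparing it against the filter constant:

    import java.io.IOException;

    public abstract class OffsetBinarySearchSketch {
      // Stand-in: jump to the next sync mark at or after position and return the new offset.
      protected abstract long sync(long position) throws IOException;
      // Stand-in: read a row at the current position and compare it to the filter constant.
      protected abstract int compareAtPosition() throws IOException;

      // Narrows [rangeStart, rangeEnd) to an offset from which a linear scan can begin,
      // mirroring the GREATER/EQUAL -> move rangeEnd, LESS -> move rangeStart handling above.
      public long findScanStart(long rangeStart, long rangeEnd) throws IOException {
        long previousPosition = -1;
        while (true) {
          long position = (rangeStart + rangeEnd) / 2;
          long newPosition = sync(position);
          // Stop when the sync mark did not move or landed past the end of the search range.
          if (newPosition == previousPosition || newPosition >= rangeEnd) {
            return rangeStart;
          }
          previousPosition = newPosition;
          int cmp = compareAtPosition();
          if (cmp >= 0) {
            rangeEnd = newPosition;    // row >= constant: keep searching earlier
          } else {
            rangeStart = newPosition;  // row < constant: keep searching later
          }
        }
      }
    }
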
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Wed Nov 16 04:42:03 2011
@@ -33,8 +33,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.io.HiveIOExceptionHandler;
-import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -174,7 +172,7 @@ public class HiveInputFormat<K extends W
   }
 
   JobConf job;
-  
+
   public void configure(JobConf job) {
     this.job = job;
   }
@@ -207,7 +205,7 @@ public class HiveInputFormat<K extends W
       Reporter reporter) throws IOException {
 
     HiveInputSplit hsplit = (HiveInputSplit) split;
-    
+
     InputSplit inputSplit = hsplit.getInputSplit();
     String inputFormatClassName = null;
     Class inputFormatClass = null;
@@ -249,7 +247,7 @@ public class HiveInputFormat<K extends W
     rr.initIOContext(hsplit, job, inputFormatClass, innerReader);
     return rr;
   }
-  
+
   protected Map<String, PartitionDesc> pathToPartitionInfo;
   MapredWork mrwork = null;
 
@@ -382,11 +380,11 @@ public class HiveInputFormat<K extends W
     if (this.mrwork == null) {
       init(job);
     }
-    
+
     if(this.mrwork.getPathToAliases() == null) {
       return;
     }
-    
+
     ArrayList<String> aliases = new ArrayList<String>();
     Iterator<Entry<String, ArrayList<String>>> iterator = this.mrwork
         .getPathToAliases().entrySet().iterator();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java Wed Nov 16 04:42:03 2011
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.ql.exec.ExecMapper;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -34,21 +33,19 @@ import org.apache.hadoop.mapred.RecordRe
 public class HiveRecordReader<K extends WritableComparable, V extends Writable>
     extends HiveContextAwareRecordReader<K, V> {
 
-  private final RecordReader recordReader;
-  
-  private JobConf jobConf;
+
 
   public HiveRecordReader(RecordReader recordReader)
       throws IOException {
-    this.recordReader = recordReader;
+    super(recordReader);
   }
-  
+
   public HiveRecordReader(RecordReader recordReader, JobConf conf)
       throws IOException {
-    this.recordReader = recordReader;
-    this.jobConf = conf;
+    super(recordReader, conf);
   }
 
+  @Override
   public void doClose() throws IOException {
     recordReader.close();
   }
@@ -65,7 +62,12 @@ public class HiveRecordReader<K extends 
     return recordReader.getPos();
   }
 
+  @Override
   public float getProgress() throws IOException {
+    if (isSorted) {
+      return super.getProgress();
+    }
+
     return recordReader.getProgress();
   }
 
@@ -74,11 +76,7 @@ public class HiveRecordReader<K extends 
     if (ExecMapper.getDone()) {
       return false;
     }
-    try {
-      return recordReader.next(key, value);
-    } catch (Exception e) {
-      return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jobConf);
-    }
+    return super.doNext(key, value);
   }
 
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Wed Nov 16 04:42:03 2011
@@ -45,6 +45,24 @@ public class IOContext {
   boolean isBlockPointer;
   boolean ioExceptions;
 
+  // Are we using the fact the input is sorted
+  boolean useSorted = false;
+  // Are we currently performing a binary search
+  boolean isBinarySearching = false;
+  // Do we want to end the binary search
+  boolean endBinarySearch = false;
+  // The result of the comparison of the last row processed
+  Comparison comparison = null;
+  // The class name of the generic UDF being used by the filter
+  String genericUDFClassName = null;
+
+  public static enum Comparison {
+    GREATER,
+    LESS,
+    EQUAL,
+    UNKNOWN
+  }
+
   String inputFile;
 
   public IOContext() {
@@ -102,4 +120,69 @@ public class IOContext {
   public boolean getIOExceptions() {
     return ioExceptions;
   }
+
+  public boolean useSorted() {
+    return useSorted;
+  }
+
+  public void setUseSorted(boolean useSorted) {
+    this.useSorted = useSorted;
+  }
+
+  public boolean isBinarySearching() {
+    return isBinarySearching;
+  }
+
+  public void setIsBinarySearching(boolean isBinarySearching) {
+    this.isBinarySearching = isBinarySearching;
+  }
+
+  public boolean shouldEndBinarySearch() {
+    return endBinarySearch;
+  }
+
+  public void setEndBinarySearch(boolean endBinarySearch) {
+    this.endBinarySearch = endBinarySearch;
+  }
+
+  public Comparison getComparison() {
+    return comparison;
+  }
+
+  public void setComparison(Integer comparison) {
+    if (comparison == null && this.isBinarySearching) {
+      // Nothing we can do here, so just proceed normally from now on
+      endBinarySearch = true;
+    } else {
+      if (comparison == null) {
+        this.comparison = Comparison.UNKNOWN;
+      } else if (comparison.intValue() < 0) {
+        this.comparison = Comparison.LESS;
+      } else if (comparison.intValue() > 0) {
+        this.comparison = Comparison.GREATER;
+      } else {
+        this.comparison = Comparison.EQUAL;
+      }
+    }
+  }
+
+  public String getGenericUDFClassName() {
+    return genericUDFClassName;
+  }
+
+  public void setGenericUDFClassName(String genericUDFClassName) {
+    this.genericUDFClassName = genericUDFClassName;
+  }
+
+  /**
+   * The thread local IOContext is static; we may need to restart the search if, for instance,
+   * multiple files are being searched as part of a CombineHiveRecordReader
+   */
+  public void resetSortingValues() {
+    this.useSorted = false;
+    this.isBinarySearching = false;
+    this.endBinarySearch = false;
+    this.comparison = null;
+    this.genericUDFClassName = null;
+  }
 }

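A short illustration of the Integer-to-Comparison mapping performed by setComparison() above (the main method is illustrative only; IOContext.get() is the same thread-local accessor FilterOperator uses):

    import org.apache.hadoop.hive.ql.io.IOContext;

    public class ComparisonMappingSketch {
      public static void main(String[] args) {
        IOContext ioContext = IOContext.get();
        ioContext.setComparison(-3);   // any negative value -> Comparison.LESS
        ioContext.setComparison(7);    // any positive value -> Comparison.GREATER
        ioContext.setComparison(0);    // zero               -> Comparison.EQUAL
        // While a binary search is in progress, null asks the reader to end the search.
        ioContext.setComparison(null);
      }
    }
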
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Wed Nov 16 04:42:03 2011
@@ -43,12 +43,12 @@ import org.apache.hadoop.hive.serde2.col
 import org.apache.hadoop.hive.serde2.columnar.LazyDecompressionCallback;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.VersionMismatchException;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
@@ -61,7 +61,7 @@ import org.apache.hadoop.util.Reflection
  * <code>RCFile</code>s, short of Record Columnar File, are flat files
  * consisting of binary key/value pairs, which shares much similarity with
  * <code>SequenceFile</code>.
- * 
+ *
  * RCFile stores columns of a table in a record columnar way. It first
  * partitions rows horizontally into row splits. and then it vertically
  * partitions each row split in a columnar way. RCFile first stores the meta
@@ -75,7 +75,7 @@ import org.apache.hadoop.util.Reflection
  * <code>RCFile</code> provides {@link Writer}, {@link Reader} and classes for
  * writing, reading respectively.
  * </p>
- * 
+ *
  * <p>
  * RCFile stores columns of a table in a record columnar way. It first
  * partitions rows horizontally into row splits. and then it vertically
@@ -83,21 +83,21 @@ import org.apache.hadoop.util.Reflection
  * data of a row split, as the key part of a record, and all the data of a row
  * split as the value part.
  * </p>
- * 
+ *
  * <p>
  * RCFile compresses values in a more fine-grained manner then record level
  * compression. However, It currently does not support compress the key part
  * yet. The actual compression algorithm used to compress key and/or values can
  * be specified by using the appropriate {@link CompressionCodec}.
  * </p>
- * 
+ *
  * <p>
  * The {@link Reader} is used to read and explain the bytes of RCFile.
  * </p>
- * 
+ *
  * <h4 id="Formats">RCFile Formats</h4>
- * 
- * 
+ *
+ *
  * <h5 id="Header">RC Header</h5>
  * <ul>
  * <li>version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of
@@ -113,7 +113,7 @@ import org.apache.hadoop.util.Reflection
  * <li>metadata - {@link Metadata} for this file.</li>
  * <li>sync - A sync marker to denote end of the header.</li>
  * </ul>
- * 
+ *
  * <h5>RCFile Format</h5>
  * <ul>
  * <li><a href="#Header">Header</a></li>
@@ -143,7 +143,7 @@ import org.apache.hadoop.util.Reflection
  * </ul>
  * </li>
  * </ul>
- * 
+ *
  */
 public class RCFile {
 
@@ -177,7 +177,7 @@ public class RCFile {
   /**
    * KeyBuffer is the key of each record in RCFile. Its on-disk layout is as
    * below:
-   * 
+   *
    * <ul>
    * <li>record length in bytes,it is the sum of bytes used to store the key
    * part and the value part.</li>
@@ -204,7 +204,7 @@ public class RCFile {
     private int numberRows = 0;
     // how many columns
     private int columnNumber = 0;
-    
+
     // return the number of columns recorded in this file's header
     public int getColumnNumber() {
       return columnNumber;
@@ -227,7 +227,7 @@ public class RCFile {
 
     /**
      * add in a new column's meta data.
-     * 
+     *
      * @param columnValueLen
      *          this total bytes number of this column's values in this split
      * @param colValLenBuffer
@@ -277,7 +277,7 @@ public class RCFile {
 
     /**
      * get number of bytes to store the keyBuffer.
-     * 
+     *
      * @return number of bytes used to store this KeyBuffer on disk
      * @throws IOException
      */
@@ -370,7 +370,7 @@ public class RCFile {
     Decompressor valDecompressor = null;
     NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer();
     CompressionInputStream deflatFilter = null;
-    
+
     public ValueBuffer() throws IOException {
     }
 
@@ -522,7 +522,7 @@ public class RCFile {
         CodecPool.returnDecompressor(valDecompressor);
       }
     }
-    
+
     @Override
     public int compareTo(Object arg0) {
       throw new RuntimeException("compareTo not supported in class "
@@ -533,7 +533,7 @@ public class RCFile {
   /**
    * Write KeyBuffer/ValueBuffer pairs to a RCFile. RCFile's format is
    * compatible with SequenceFile's.
-   * 
+   *
    */
   public static class Writer {
 
@@ -664,7 +664,7 @@ public class RCFile {
 
     /**
      * Constructs a RCFile Writer.
-     * 
+     *
      * @param fs
      *          the file system used
      * @param conf
@@ -680,7 +680,7 @@ public class RCFile {
 
     /**
      * Constructs a RCFile Writer.
-     * 
+     *
      * @param fs
      *          the file system used
      * @param conf
@@ -699,9 +699,9 @@ public class RCFile {
     }
 
     /**
-     * 
+     *
      * Constructs a RCFile Writer.
-     * 
+     *
      * @param fs
      *          the file system used
      * @param conf
@@ -837,7 +837,7 @@ public class RCFile {
      * column number in the file, zero bytes are appended for the empty columns.
      * If its size() is greater then the column number in the file, the exceeded
      * columns' bytes are ignored.
-     * 
+     *
      * @param val
      * @throws IOException
      */
@@ -936,7 +936,7 @@ public class RCFile {
       bufferedRecords = 0;
       columnBufferSize = 0;
     }
-    
+
     /**
      * flush a block out without doing anything except compressing the key part.
      */
@@ -945,7 +945,7 @@ public class RCFile {
       checkAndWriteSync(); // sync
       out.writeInt(recordLen); // total record length
       out.writeInt(keyLength); // key portion length
-      
+
       if(this.isCompressed()) {
         //compress key and write key out
         keyCompressionBuffer.reset();
@@ -1001,7 +1001,7 @@ public class RCFile {
 
   /**
    * Read KeyBuffer/ValueBuffer pairs from a RCFile.
-   * 
+   *
    */
   public static class Reader {
     private static class SelectedColumn {
@@ -1032,7 +1032,7 @@ public class RCFile {
 
     private final ValueBuffer currentValue;
 
-    private boolean[] skippedColIDs = null;
+    private final boolean[] skippedColIDs = null;
 
     private int readRowsIndexInBuffer = 0;
 
@@ -1242,7 +1242,7 @@ public class RCFile {
 
     /**
      * Set the current byte position in the input file.
-     * 
+     *
      * <p>
      * The position passed must be a position returned by
      * {@link RCFile.Writer#getLength()} when writing this file. To seek to an
@@ -1254,6 +1254,18 @@ public class RCFile {
       in.seek(position);
     }
 
+    /**
+     * Resets the values which determine if there are more rows in the buffer
+     *
+     * This can be used after one calls seek or sync, if one called next before that.
+     * Otherwise, the seek or sync will have no effect; the reader will continue to get rows from
+     * the buffer built up from the call to next.
+     */
+    public synchronized void resetBuffer() {
+      readRowsIndexInBuffer = 0;
+      recordsNumInValBuffer = 0;
+    }
+
     /** Seek to the next sync mark past a given position. */
     public synchronized void sync(long position) throws IOException {
       if (position + SYNC_SIZE >= end) {
@@ -1309,7 +1321,7 @@ public class RCFile {
     /**
      * Read and return the next record length, potentially skipping over a sync
      * block.
-     * 
+     *
      * @return the length of the next record or -1 if there is no next record
      * @throws IOException
      */
@@ -1407,7 +1419,7 @@ public class RCFile {
       currentValue.readFields(in);
       currentValue.inited = true;
     }
-    
+
     public boolean nextBlock() throws IOException {
       int keyLength = nextKeyBuffer();
       if(keyLength > 0) {
@@ -1430,7 +1442,7 @@ public class RCFile {
      * Calling getColumn() will not change the result of
      * {@link #next(LongWritable)} and
      * {@link #getCurrentRow(BytesRefArrayWritable)}.
-     * 
+     *
      * @param columnID
      * @throws IOException
      */
@@ -1459,7 +1471,7 @@ public class RCFile {
       ValueBuffer.LazyDecompressionCallbackImpl decompCallBack = null;
       boolean decompressed = currentValue.decompressedFlag[selColIdx];
       if (decompressed) {
-        uncompData = 
+        uncompData =
               currentValue.loadedColumnsValueBuffer[selColIdx].getData();
       } else {
         decompCallBack = currentValue.lazyDecompressCallbackObjs[selColIdx];
@@ -1485,7 +1497,7 @@ public class RCFile {
      * current value buffer. It will influence the result of
      * {@link #next(LongWritable)} and
      * {@link #getCurrentRow(BytesRefArrayWritable)}
-     * 
+     *
      * @return whether there are still records or not
      * @throws IOException
      */
@@ -1500,7 +1512,7 @@ public class RCFile {
      * of rows passed by, because {@link #seek(long)},
      * {@link #nextColumnsBatch()} can change the underlying key buffer and
      * value buffer.
-     * 
+     *
      * @return next row number
      * @throws IOException
      */
@@ -1567,7 +1579,7 @@ public class RCFile {
     /**
      * get the current row used, making sure {@link #next(LongWritable)} was called
      * first.
-     * 
+     *
      * @throws IOException
      */
     public synchronized void getCurrentRow(BytesRefArrayWritable ret) throws IOException {
@@ -1596,11 +1608,11 @@ public class RCFile {
         for (int j = 0; j < selectedColumns.length; ++j) {
           SelectedColumn col = selectedColumns[j];
           int i = col.colIndex;
-  
+
           BytesRefWritable ref = ret.unCheckedGet(i);
-  
+
           colAdvanceRow(j, col);
-  
+
           if (currentValue.decompressedFlag[j]) {
             ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
                 col.rowReadIndex, col.prvLength);
@@ -1611,14 +1623,14 @@ public class RCFile {
           col.rowReadIndex += col.prvLength;
         }
       } else {
-        // This version of the loop eliminates a condition check and branch 
+        // This version of the loop eliminates a condition check and branch
         // and is measurably faster (20% or so)
         for (int j = 0; j < selectedColumns.length; ++j) {
           SelectedColumn col = selectedColumns[j];
           int i = col.colIndex;
-  
+
           BytesRefWritable ref = ret.unCheckedGet(i);
-  
+
           colAdvanceRow(j, col);
           ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
                 col.rowReadIndex, col.prvLength);
@@ -1666,7 +1678,7 @@ public class RCFile {
     public String toString() {
       return file.toString();
     }
-    
+
     public boolean isCompressedRCFile() {
       return this.decompress;
     }
@@ -1689,7 +1701,7 @@ public class RCFile {
     public KeyBuffer getCurrentKeyBufferObj() {
       return this.currentKey;
     }
-    
+
     /**
      * return the ValueBuffer object used in the reader. Internally in each
      * reader, there is only one ValueBuffer object, which gets reused for every
@@ -1698,7 +1710,7 @@ public class RCFile {
     public ValueBuffer getCurrentValueBufferObj() {
       return this.currentValue;
     }
-    
+
     //return the current block's length
     public int getCurrentBlockLength() {
       return this.currentRecordLength;
@@ -1713,11 +1725,11 @@ public class RCFile {
     public int getCurrentCompressedKeyLen() {
       return this.compressedKeyLen;
     }
-    
+
     //return the CompressionCodec used for this file
     public CompressionCodec getCompressionCodec() {
       return this.codec;
     }
-    
+
   }
 }
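
The new Reader.resetBuffer() above is meant to be paired with seek() or sync(): once next() has filled the internal row buffer, a seek or sync on its own keeps serving rows out of that buffer. A minimal sketch of the intended call pattern (illustrative only; the file path and offset are invented, not taken from this commit):

    // Sketch: next() fills the row buffer, so a later sync() only takes effect
    // once resetBuffer() discards the buffered rows.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.RCFile;
    import org.apache.hadoop.io.LongWritable;

    public class RCFileResetBufferExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        RCFile.Reader reader = new RCFile.Reader(fs, new Path("/tmp/example.rc"), conf);
        LongWritable rowId = new LongWritable();
        reader.next(rowId);     // fills the internal row buffer
        reader.sync(4096L);     // jump to the next sync mark past an arbitrary offset
        reader.resetBuffer();   // discard buffered rows so the jump actually takes effect
        while (reader.next(rowId)) {
          // rows are now read from the new position
        }
        reader.close();
      }
    }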

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java Wed Nov 16 04:42:03 2011
@@ -129,6 +129,14 @@ public class RCFileRecordReader<K extend
     in.seek(pos);
   }
 
+  public void sync(long pos) throws IOException {
+    in.sync(pos);
+  }
+
+  public void resetBuffer() {
+    in.resetBuffer();
+  }
+
   public long getStart() {
     return start;
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java Wed Nov 16 04:42:03 2011
@@ -66,6 +66,8 @@ public class ExprNodeGenericFuncDesc ext
    * the canonical type information for this NodeDesc.
    */
   private ObjectInspector writableObjectInspector;
+  //Is this an expression that should perform a comparison for sorted searches
+  private boolean isSortedExpr;
 
   public ExprNodeGenericFuncDesc() {
   }
@@ -247,4 +249,12 @@ public class ExprNodeGenericFuncDesc ext
     return true;
   }
 
+  public boolean isSortedExpr() {
+    return isSortedExpr;
+  }
+
+  public void setSortedExpr(boolean isSortedExpr) {
+    this.isSortedExpr = isSortedExpr;
+  }
+
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java Wed Nov 16 04:42:03 2011
@@ -68,6 +68,8 @@ public class FilterDesc implements Seria
   private org.apache.hadoop.hive.ql.plan.ExprNodeDesc predicate;
   private boolean isSamplingPred;
   private transient sampleDesc sampleDescr;
+  //Is this a filter that should perform a comparison for sorted searches
+  private boolean isSortedFilter;
 
   public FilterDesc() {
   }
@@ -116,4 +118,12 @@ public class FilterDesc implements Seria
     this.sampleDescr = sampleDescr;
   }
 
+  public boolean isSortedFilter() {
+    return isSortedFilter;
+  }
+
+  public void setSortedFilter(boolean isSortedFilter) {
+    this.isSortedFilter = isSortedFilter;
+  }
+
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java Wed Nov 16 04:42:03 2011
@@ -87,6 +87,9 @@ public class MapredWork implements Seria
 
   private boolean mapperCannotSpanPartns;
 
+  // used to indicate the input is sorted, and so a BinarySearchRecordReader should be used
+  private boolean inputFormatSorted = false;
+
   public MapredWork() {
     aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>();
   }
@@ -437,6 +440,14 @@ public class MapredWork implements Seria
     this.opParseCtxMap = opParseCtxMap;
   }
 
+  public boolean isInputFormatSorted() {
+    return inputFormatSorted;
+  }
+
+  public void setInputFormatSorted(boolean inputFormatSorted) {
+    this.inputFormatSorted = inputFormatSorted;
+  }
+
   public void resolveDynamicPartitionMerge(HiveConf conf, Path path,
       TableDesc tblDesc, ArrayList<String> aliases, PartitionDesc partDesc) {
     pathToAliases.put(path.toString(), aliases);
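
The inputFormatSorted flag above is what the VerifyHiveSortedInputFormatUsedHook added below asserts on. The actual hand-off to the record readers is done in ExecDriver and the input formats (also modified in this revision); purely as a guess, based on the hive.input.format.sorted property the new unit test sets, the wiring could look roughly like this:

    // Guess at the hand-off, not the actual ExecDriver code from this commit: the
    // plan-level flag becomes a job property that record readers can check.
    import org.apache.hadoop.hive.ql.plan.MapredWork;
    import org.apache.hadoop.mapred.JobConf;

    public class SortedInputFlagSketch {
      static void propagateSortedFlag(MapredWork work, JobConf job) {
        if (work.isInputFormatSorted()) {
          // readers created for this job can then switch to the binary-search read path
          job.setBoolean("hive.input.format.sorted", true);
        }
      }
    }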

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseCompare.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseCompare.java?rev=1202525&r1=1202524&r2=1202525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseCompare.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseCompare.java Wed Nov 16 04:42:03 2011
@@ -22,11 +22,13 @@ import org.apache.hadoop.hive.ql.exec.De
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ReturnObjectInspectorResolver;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
@@ -159,6 +161,34 @@ public abstract class GenericUDFBaseComp
 
   }
 
+  public Integer compare(DeferredObject[] arguments) throws HiveException {
+    Object o0,o1;
+    o0 = arguments[0].get();
+    if (o0 == null) {
+      return null;
+    }
+    o1 = arguments[1].get();
+    if (o1 == null) {
+      return null;
+    }
+
+    if (compareType == CompareType.NEED_CONVERT) {
+      Object converted_o0 = converter0.convert(o0);
+      if (converted_o0 == null) {
+        return null;
+      }
+      Object converted_o1 = converter1.convert(o1);
+      if (converted_o1 == null) {
+        return null;
+      }
+      return ObjectInspectorUtils.compare(
+          converted_o0, compareOI,
+          converted_o1, compareOI);
+    } else {
+      return ObjectInspectorUtils.compare(
+          o0, argumentOIs[0], o1, argumentOIs[1]);
+    }
+  }
 
   @Override
   public String getDisplayString(String[] children) {
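
The compare() method added above returns an ordering sign (negative, zero, positive, or null when an operand is NULL) rather than the Boolean that evaluate() produces, which is what a binary search over sorted input needs in order to pick a half of the file. A rough usage sketch (illustrative only; the inline DeferredObject implementations and the values are invented):

    // Sketch: interpreting the Integer sign from the new compare() helper.
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class CompareSignExample {
      public static void main(String[] args) throws Exception {
        GenericUDFBaseCompare udf = new GenericUDFOPEqualOrGreaterThan();
        ObjectInspector stringOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        udf.initialize(new ObjectInspector[] {stringOI, stringOI});

        DeferredObject rowValue = new DeferredObject() {   // value read from the current row
          public Object get() { return "100"; }
        };
        DeferredObject constant = new DeferredObject() {   // constant from the filter predicate
          public Object get() { return "500"; }
        };

        Integer sign = udf.compare(new DeferredObject[] {rowValue, constant});
        // null      : a NULL operand, ordering unknown
        // sign < 0  : the row value sorts before the constant, so search to the right
        // sign == 0 : equal
        // sign > 0  : the row value sorts after the constant, so search to the left
        System.out.println(sign);
      }
    }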

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java?rev=1202525&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java Wed Nov 16 04:42:03 2011
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+
+public class VerifyHiveSortedInputFormatUsedHook implements ExecuteWithHookContext {
+
+  public void run(HookContext hookContext) {
+    if (hookContext.getHookType().equals(HookType.POST_EXEC_HOOK)) {
+
+      // Go through the root tasks, and verify the input format of the map reduce task(s) is
+      // HiveSortedInputFormat
+      ArrayList<Task<? extends Serializable>> rootTasks =
+          hookContext.getQueryPlan().getRootTasks();
+      for (Task<? extends Serializable> rootTask : rootTasks) {
+        if (rootTask.getWork() instanceof MapredWork) {
+          Assert.assertTrue("The root map reduce task's input was not marked as sorted.",
+              ((MapredWork)rootTask.getWork()).isInputFormatSorted());
+        }
+      }
+    }
+  }
+}

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java?rev=1202525&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java Wed Nov 16 04:42:03 2011
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.mockito.InOrder;
+
+/**
+ * TestHiveBinarySearchRecordReader.
+ *
+ */
+public class TestHiveBinarySearchRecordReader extends TestCase {
+
+  private RCFileRecordReader rcfReader;
+  private JobConf conf;
+  private TestHiveInputSplit hiveSplit;
+  private HiveContextAwareRecordReader hbsReader;
+  private IOContext ioContext;
+
+  private static class TestHiveInputSplit extends HiveInputSplit {
+
+    @Override
+    public long getStart() {
+      return 0;
+    }
+
+    @Override
+    public long getLength() {
+      return 100;
+    }
+
+    @Override
+    public Path getPath() {
+      return new Path("/");
+    }
+  }
+
+  private static class TestHiveRecordReader<K extends WritableComparable, V extends Writable>
+      extends HiveContextAwareRecordReader<K, V> {
+
+    public TestHiveRecordReader(RecordReader recordReader, JobConf conf) throws IOException {
+      super(recordReader, conf);
+    }
+
+    @Override
+    public K createKey() {
+      return null;
+    }
+
+    @Override
+    public V createValue() {
+      return null;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public boolean doNext(K key, V value) throws IOException {
+      return super.doNext(key, value);
+    }
+
+    @Override
+    public void doClose() throws IOException {
+
+    }
+
+  }
+
+  private void resetIOContext() {
+    ioContext = IOContext.get();
+    ioContext.setUseSorted(false);
+    ioContext.setIsBinarySearching(false);
+    ioContext.setEndBinarySearch(false);
+    ioContext.setComparison(null);
+    ioContext.setGenericUDFClassName(null);
+  }
+
+  private void init() throws IOException {
+    resetIOContext();
+    rcfReader = mock(RCFileRecordReader.class);
+    when(rcfReader.next((LongWritable)anyObject(),
+                        (BytesRefArrayWritable) anyObject())).thenReturn(true);
+    // Since the start is 0, and the length is 100, the first call to sync should be with the value
+    // 50 so return that for getPos()
+    when(rcfReader.getPos()).thenReturn(50L);
+    conf = new JobConf();
+    conf.setBoolean("hive.input.format.sorted", true);
+    hiveSplit = new TestHiveInputSplit();
+    hbsReader = new TestHiveRecordReader(rcfReader, conf);
+    hbsReader.initIOContext(hiveSplit, conf, Class.class, rcfReader);
+  }
+
+  private boolean executeDoNext(HiveContextAwareRecordReader hbsReader) throws IOException {
+     return hbsReader.next(hbsReader.createKey(), hbsReader.createValue());
+  }
+
+  public void testNonLinearGreaterThan() throws Exception {
+    init();
+    Assert.assertTrue(executeDoNext(hbsReader));
+    verify(rcfReader).sync(50);
+
+    ioContext.setComparison(1);
+    when(rcfReader.getPos()).thenReturn(25L);
+
+    // By setting the comparison to greater, the search should use the block [0, 50]
+    Assert.assertTrue(executeDoNext(hbsReader));
+    verify(rcfReader).sync(25);
+  }
+
+  public void testNonLinearLessThan() throws Exception {
+    init();
+    Assert.assertTrue(executeDoNext(hbsReader));
+    verify(rcfReader).sync(50);
+
+    ioContext.setComparison(-1);
+    when(rcfReader.getPos()).thenReturn(75L);
+
+    // By setting the comparison to less, the search should use the block [50, 100]
+    Assert.assertTrue(executeDoNext(hbsReader));
+    verify(rcfReader).sync(75);
+  }
+
+  public void testNonLinearEqualTo() throws Exception {
+    init();
+    Assert.assertTrue(executeDoNext(hbsReader));
+    verify(rcfReader).sync(50);
+
+    ioContext.setComparison(0);
+    when(rcfReader.getPos()).thenReturn(25L);
+
+    // By setting the comparison to equal, the search should use the block [0, 50]
+    Assert.assertTrue(executeDoNext(hbsReader));
+    verify(rcfReader).sync(25);
+  }
+
+  public void testHitLastBlock() throws Exception {
+    init();
+    Assert.assertTrue(executeDoNext(hbsReader));
+    verify(rcfReader).sync(50);
+
+    ioContext.setComparison(-1);
+    when(rcfReader.getPos()).thenReturn(100L);
+
+    // When getPos is called it will return 100, the value signaling the end of the file. This
+    // should result in a call to sync to the beginning of the block it was searching [50, 100],
+    // and the reader should continue normally
+    Assert.assertTrue(executeDoNext(hbsReader));
+    InOrder inOrder = inOrder(rcfReader);
+    inOrder.verify(rcfReader).sync(75);
+    inOrder.verify(rcfReader).sync(50);
+    Assert.assertFalse(ioContext.isBinarySearching());
+  }
+
+  public void testHitSamePositionTwice() throws Exception {
+    init();
+    Assert.assertTrue(executeDoNext(hbsReader));
+    verify(rcfReader).sync(50);
+
+    ioContext.setComparison(1);
+
+    // When getPos is called it should return the same value, signaling the end of the search, so
+    // the search should continue linearly and it should sync to the beginning of the block [0, 50]
+    Assert.assertTrue(executeDoNext(hbsReader));
+    InOrder inOrder = inOrder(rcfReader);
+    inOrder.verify(rcfReader).sync(25);
+    inOrder.verify(rcfReader).sync(0);
+    Assert.assertFalse(ioContext.isBinarySearching());
+  }
+
+  public void testResetRange() throws Exception {
+    init();
+    InOrder inOrder = inOrder(rcfReader);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    inOrder.verify(rcfReader).sync(50);
+
+    ioContext.setComparison(-1);
+    when(rcfReader.getPos()).thenReturn(75L);
+
+    Assert.assertTrue(executeDoNext(hbsReader));
+    inOrder.verify(rcfReader).sync(75);
+
+    ioContext.setEndBinarySearch(true);
+
+    // This should make the search linear, sync to the beginning of the block being searched
+    // [50, 100], set the comparison to null, and unset the flag that ends the binary search
+    Assert.assertTrue(executeDoNext(hbsReader));
+    inOrder.verify(rcfReader).sync(50);
+    Assert.assertFalse(ioContext.isBinarySearching());
+    Assert.assertFalse(ioContext.shouldEndBinarySearch());
+  }
+
+  public void testEqualOpClass() throws Exception {
+    init();
+    ioContext.setGenericUDFClassName(GenericUDFOPEqual.class.getName());
+    Assert.assertTrue(ioContext.isBinarySearching());
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setIsBinarySearching(false);
+    ioContext.setComparison(-1);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(0);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(1);
+    Assert.assertFalse(executeDoNext(hbsReader));
+  }
+
+  public void testLessThanOpClass() throws Exception {
+    init();
+    ioContext.setGenericUDFClassName(GenericUDFOPLessThan.class.getName());
+    Assert.assertTrue(executeDoNext(hbsReader));
+    Assert.assertFalse(ioContext.isBinarySearching());
+    ioContext.setComparison(-1);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(0);
+    Assert.assertFalse(executeDoNext(hbsReader));
+    ioContext.setComparison(1);
+    Assert.assertFalse(executeDoNext(hbsReader));
+  }
+
+  public void testLessThanOrEqualOpClass() throws Exception {
+    init();
+    ioContext.setGenericUDFClassName(GenericUDFOPEqualOrLessThan.class.getName());
+    Assert.assertTrue(executeDoNext(hbsReader));
+    Assert.assertFalse(ioContext.isBinarySearching());
+    ioContext.setComparison(-1);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(0);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(1);
+    Assert.assertFalse(executeDoNext(hbsReader));
+  }
+
+  public void testGreaterThanOpClass() throws Exception {
+    init();
+    ioContext.setGenericUDFClassName(GenericUDFOPGreaterThan.class.getName());
+    Assert.assertTrue(ioContext.isBinarySearching());
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setIsBinarySearching(false);
+    ioContext.setComparison(-1);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(0);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(1);
+    Assert.assertTrue(executeDoNext(hbsReader));
+  }
+
+  public void testGreaterThanOrEqualOpClass() throws Exception {
+    init();
+    ioContext.setGenericUDFClassName(GenericUDFOPEqualOrGreaterThan.class.getName());
+    Assert.assertTrue(ioContext.isBinarySearching());
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setIsBinarySearching(false);
+    ioContext.setComparison(-1);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(0);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(1);
+    Assert.assertTrue(executeDoNext(hbsReader));
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestHiveBinarySearchRecordReader().testNonLinearGreaterThan();
+    new TestHiveBinarySearchRecordReader().testNonLinearLessThan();
+    new TestHiveBinarySearchRecordReader().testNonLinearEqualTo();
+    new TestHiveBinarySearchRecordReader().testHitLastBlock();
+    new TestHiveBinarySearchRecordReader().testHitSamePositionTwice();
+    new TestHiveBinarySearchRecordReader().testResetRange();
+    new TestHiveBinarySearchRecordReader().testEqualOpClass();
+    new TestHiveBinarySearchRecordReader().testLessThanOpClass();
+    new TestHiveBinarySearchRecordReader().testLessThanOrEqualOpClass();
+    new TestHiveBinarySearchRecordReader().testGreaterThanOpClass();
+    new TestHiveBinarySearchRecordReader().testGreaterThanOrEqualOpClass();
+  }
+}
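
The tests above pin down how the modified HiveContextAwareRecordReader (changed elsewhere in this commit) is expected to behave: probe the midpoint of the split, halve the remaining [low, high] range according to the comparison sign, and fall back to a linear scan from the low end of the current block once the end of the split is hit, the same position comes back twice, or the search is explicitly ended. A standalone model of that decision logic, written only from what the assertions imply and not copied from the reader itself:

    // Simplified model of the binary search over a sorted split; illustrative only.
    public class SortedSplitBinarySearchSketch {

      /** Compares the row found at a byte offset against the filter constant:
       *  negative = row sorts before the constant, zero = equal, positive = after. */
      interface RowComparator {
        int compareAt(long offset);
      }

      /** Returns the offset from which a plain linear scan should continue. */
      static long search(long start, long length, RowComparator cmp) {
        long low = start;
        long high = start + length;
        long previous = -1;
        while (true) {
          long mid = (low + high) / 2;           // first probe of a [0, 100] split is 50
          if (mid == previous || mid >= start + length) {
            return low;                          // converged or ran off the end: scan from low
          }
          previous = mid;
          int sign = cmp.compareAt(mid);         // sync to mid, read one row, compare
          if (sign > 0) {
            high = mid;                          // row > constant: keep the block [low, mid]
          } else if (sign < 0) {
            low = mid;                           // row < constant: keep the block [mid, high]
          } else {
            high = mid;                          // equality keeps probing to the left
          }
        }
      }

      public static void main(String[] args) {
        // Every probed row compares greater than the constant, so the range collapses to
        // the left and the linear scan resumes at offset 0.
        long resume = search(0, 100, new RowComparator() {
          public int compareAt(long offset) {
            return 1;
          }
        });
        System.out.println("resume linear scan at " + resume);
      }
    }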

Added: hive/trunk/ql/src/test/queries/clientpositive/index_compact_binary_search.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/index_compact_binary_search.q?rev=1202525&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/index_compact_binary_search.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/index_compact_binary_search.q Wed Nov 16 04:42:03 2011
@@ -0,0 +1,132 @@
+SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+SET hive.default.fileformat=TextFile;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.optimize.index.filter=true;
+SET hive.optimize.index.filter.compact.minsize=1;
+SET hive.index.compact.binary.search=true;
+
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook;
+
+SELECT * FROM src WHERE key = '0';
+
+SELECT * FROM src WHERE key < '1';
+
+SELECT * FROM src WHERE key <= '0';
+
+SELECT * FROM src WHERE key > '8';
+
+SELECT * FROM src WHERE key >= '9';
+
+SET hive.exec.post.hooks=;
+
+DROP INDEX src_index ON src;
+
+SET hive.default.fileformat=RCFILE;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook;
+
+SELECT * FROM src WHERE key = '0';
+
+SELECT * FROM src WHERE key < '1';
+
+SELECT * FROM src WHERE key <= '0';
+
+SELECT * FROM src WHERE key > '8';
+
+SELECT * FROM src WHERE key >= '9';
+
+SET hive.exec.post.hooks=;
+
+DROP INDEX src_index ON src;
+
+SET hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+SET hive.default.fileformat=TextFile;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook;
+
+SELECT * FROM src WHERE key = '0';
+
+SELECT * FROM src WHERE key < '1';
+
+SELECT * FROM src WHERE key <= '0';
+
+SELECT * FROM src WHERE key > '8';
+
+SELECT * FROM src WHERE key >= '9';
+
+SET hive.exec.post.hooks=;
+
+DROP INDEX src_index ON src;
+
+SET hive.default.fileformat=RCFILE;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook;
+
+SELECT * FROM src WHERE key = '0';
+
+SELECT * FROM src WHERE key < '1';
+
+SELECT * FROM src WHERE key <= '0';
+
+SELECT * FROM src WHERE key > '8';
+
+SELECT * FROM src WHERE key >= '9';
+
+SET hive.exec.post.hooks=;
+
+DROP INDEX src_index ON src;
+
+SET hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+SET hive.default.fileformat=TextFile;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook;
+
+SELECT * FROM src WHERE key = '0';
+
+SELECT * FROM src WHERE key < '1';
+
+SELECT * FROM src WHERE key <= '0';
+
+SELECT * FROM src WHERE key > '8';
+
+SELECT * FROM src WHERE key >= '9';
+
+SET hive.exec.post.hooks=;
+
+DROP INDEX src_index ON src;
+
+SET hive.default.fileformat=RCFILE;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook;
+
+SELECT * FROM src WHERE key = '0';
+
+SELECT * FROM src WHERE key < '1';
+
+SELECT * FROM src WHERE key <= '0';
+
+SELECT * FROM src WHERE key > '8';
+
+SELECT * FROM src WHERE key >= '9';
+
+SET hive.exec.post.hooks=;
+
+DROP INDEX src_index ON src;
\ No newline at end of file


