hive-commits mailing list archives

From: xu...@apache.org
Subject: svn commit: r1669775 [14/35] - in /hive/branches/spark: ./ ant/src/org/apache/hadoop/hive/ant/ beeline/src/java/org/apache/hive/beeline/ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/common/type/ common/src/java/...
Date: Sat, 28 Mar 2015 14:03:49 GMT
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java Sat Mar 28 14:03:43 2015
@@ -18,17 +18,24 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor;
+import org.apache.hadoop.hive.ql.exec.persistence.ObjectContainer;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
@@ -38,6 +45,7 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataOutputBuffer;
 
 /**
  * The vectorized version of the MapJoinOperator.
@@ -56,7 +64,7 @@ public class VectorMapJoinOperator exten
 
   private VectorExpression[] bigTableFilterExpressions;
   private VectorExpression[] bigTableValueExpressions;
-  
+
   private VectorizationContext vOutContext;
 
   // The above members are initialized by the constructor and must not be
@@ -64,6 +72,7 @@ public class VectorMapJoinOperator exten
   //---------------------------------------------------------------------------
 
   private transient VectorizedRowBatch outputBatch;
+  private transient VectorizedRowBatch scratchBatch;  // holds restored (from disk) big table rows
   private transient VectorExpressionWriter[] valueWriters;
   private transient Map<ObjectInspector, VectorColumnAssign[]> outputVectorAssigners;
 
@@ -76,7 +85,11 @@ public class VectorMapJoinOperator exten
   private transient VectorExpressionWriter[] keyOutputWriters;
 
   private transient VectorizedRowBatchCtx vrbCtx = null;
-  
+
+  private transient int tag;  // big table alias
+  private VectorExpressionWriter[] rowWriters;  // Writer for producing row from input batch
+  protected transient Object[] singleRow;
+
   public VectorMapJoinOperator() {
     super();
   }
@@ -112,9 +125,22 @@ public class VectorMapJoinOperator exten
   }
 
   @Override
-  public void initializeOp(Configuration hconf) throws HiveException {
-    super.initializeOp(hconf);
-    
+  public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    // Code borrowed from VectorReduceSinkOperator.initializeOp
+    VectorExpressionWriterFactory.processVectorInspector(
+        (StructObjectInspector) inputObjInspectors[0],
+        new VectorExpressionWriterFactory.SingleOIDClosure() {
+          @Override
+          public void assign(VectorExpressionWriter[] writers,
+                             ObjectInspector objectInspector) {
+            rowWriters = writers;
+            inputObjInspectors[0] = objectInspector;
+          }
+        });
+    singleRow = new Object[rowWriters.length];
+
+    Collection<Future<?>> result = super.initializeOp(hconf);
+
     List<ExprNodeDesc> keyDesc = conf.getKeys().get(posBigTable);
     keyOutputWriters = VectorExpressionWriterFactory.getExpressionWriters(keyDesc);
 
@@ -178,6 +204,7 @@ public class VectorMapJoinOperator exten
     filterMaps[posBigTable] = null;
 
     outputVectorAssigners = new HashMap<ObjectInspector, VectorColumnAssign[]>();
+    return result;
   }
 
   /**
@@ -208,22 +235,34 @@ public class VectorMapJoinOperator exten
 
   @Override
   public void closeOp(boolean aborted) throws HiveException {
+    super.closeOp(aborted);
+    for (MapJoinTableContainer tableContainer : mapJoinTables) {
+      if (tableContainer != null) {
+        tableContainer.dumpMetrics();
+      }
+    }
     if (!aborted && 0 < outputBatch.size) {
       flushOutput();
     }
   }
 
   @Override
-  protected void setMapJoinKey(ReusableGetAdaptor dest, Object row, byte alias)
+  protected JoinUtil.JoinResult setMapJoinKey(ReusableGetAdaptor dest, Object row, byte alias)
       throws HiveException {
-    dest.setFromVector(keyValues[batchIndex], keyOutputWriters, keyWrapperBatch);
+    return dest.setFromVector(keyValues[batchIndex], keyOutputWriters, keyWrapperBatch);
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     byte alias = (byte) tag;
     VectorizedRowBatch inBatch = (VectorizedRowBatch) row;
 
+    // Preparation for hybrid grace hash join
+    this.tag = tag;
+    if (scratchBatch == null) {
+      scratchBatch = makeLike(inBatch);
+    }
+
     if (null != bigTableFilterExpressions) {
       for(VectorExpression ve:bigTableFilterExpressions) {
         ve.evaluate(inBatch);
@@ -246,7 +285,7 @@ public class VectorMapJoinOperator exten
     // of row-mode small-tables) this is a reasonable trade-off.
     //
     for(batchIndex=0; batchIndex < inBatch.size; ++batchIndex) {
-      super.processOp(row, tag);
+      super.process(row, tag);
     }
 
     // Set these two to invalid values so any attempt to use them
@@ -259,4 +298,94 @@ public class VectorMapJoinOperator exten
   public VectorizationContext getOuputVectorizationContext() {
     return vOutContext;
   }
+
+  @Override
+  protected void spillBigTableRow(MapJoinTableContainer hybridHtContainer, Object row)
+      throws HiveException {
+    // Extract the actual row from row batch
+    VectorizedRowBatch inBatch = (VectorizedRowBatch) row;
+    Object[] actualRow = getRowObject(inBatch, batchIndex);
+    super.spillBigTableRow(hybridHtContainer, actualRow);
+  }
+
+  @Override
+  protected void reProcessBigTable(HybridHashTableContainer.HashPartition partition)
+      throws HiveException {
+    ObjectContainer bigTable = partition.getMatchfileObjContainer();
+
+    DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
+    while (bigTable.hasNext()) {
+      Object row = bigTable.next();
+      VectorizedBatchUtil.addProjectedRowToBatchFrom(row,
+          (StructObjectInspector) inputObjInspectors[posBigTable],
+          scratchBatch.size, scratchBatch, dataOutputBuffer);
+      scratchBatch.size++;
+
+      if (scratchBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
+        process(scratchBatch, tag); // call process once we have a full batch
+        scratchBatch.reset();
+        dataOutputBuffer.reset();
+      }
+    }
+    // Process the row batch that has less than DEFAULT_SIZE rows
+    if (scratchBatch.size > 0) {
+      process(scratchBatch, tag);
+      scratchBatch.reset();
+      dataOutputBuffer.reset();
+    }
+    bigTable.clear();
+  }
+
+  // Code borrowed from VectorReduceSinkOperator
+  private Object[] getRowObject(VectorizedRowBatch vrb, int rowIndex) throws HiveException {
+    int batchIndex = rowIndex;
+    if (vrb.selectedInUse) {
+      batchIndex = vrb.selected[rowIndex];
+    }
+    for (int i = 0; i < vrb.projectionSize; i++) {
+      ColumnVector vectorColumn = vrb.cols[vrb.projectedColumns[i]];
+      if (vectorColumn != null) {
+        singleRow[i] = rowWriters[i].writeValue(vectorColumn, batchIndex);
+      } else {
+        // Some columns from tables are not used.
+        singleRow[i] = null;
+      }
+    }
+    return singleRow;
+  }
+
+  /**
+   * Make a new (scratch) batch, which is exactly "like" the batch provided, except that it's empty
+   * @param batch the batch to imitate
+   * @return the new batch
+   * @throws HiveException
+   */
+  VectorizedRowBatch makeLike(VectorizedRowBatch batch) throws HiveException {
+    VectorizedRowBatch newBatch = new VectorizedRowBatch(batch.numCols);
+    for (int i = 0; i < batch.numCols; i++) {
+      ColumnVector colVector = batch.cols[i];
+      if (colVector != null) {
+        ColumnVector newColVector;
+        if (colVector instanceof LongColumnVector) {
+          newColVector = new LongColumnVector();
+        } else if (colVector instanceof DoubleColumnVector) {
+          newColVector = new DoubleColumnVector();
+        } else if (colVector instanceof BytesColumnVector) {
+          newColVector = new BytesColumnVector();
+        } else if (colVector instanceof DecimalColumnVector) {
+          DecimalColumnVector decColVector = (DecimalColumnVector) colVector;
+          newColVector = new DecimalColumnVector(decColVector.precision, decColVector.scale);
+        } else {
+          throw new HiveException("Column vector class " + colVector.getClass().getName() +
+          " is not supported!");
+        }
+        newBatch.cols[i] = newColVector;
+        newBatch.cols[i].init();
+      }
+    }
+    newBatch.projectedColumns = Arrays.copyOf(batch.projectedColumns, batch.projectedColumns.length);
+    newBatch.projectionSize = batch.projectionSize;
+    newBatch.reset();
+    return newBatch;
+  }
 }
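
The change above from "void initializeOp(...)" to "Collection<Future<?>> initializeOp(...)" recurs throughout this commit: each vectorized operator now performs its own setup, delegates to the parent's initializeOp(), and returns the parent's futures so callers can wait on any asynchronous initialization. A minimal sketch of the pattern, assuming a hypothetical subclass (MyVectorOperator is illustrative, not part of the patch):

import java.util.Collection;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.metadata.HiveException;

public class MyVectorOperator extends MapJoinOperator {
  @Override
  public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
    // Setup that must happen before the parent initializes (e.g. fixing up
    // inputObjInspectors, as VectorMapJoinOperator does above).
    Collection<Future<?>> result = super.initializeOp(hconf);  // parent may start async work
    // Setup that depends on the parent having initialized goes here.
    return result;  // propagate the parent's futures unchanged
  }
}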

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Sat Mar 28 14:03:43 2015
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.util.Collection;
+import java.util.concurrent.Future;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
@@ -34,7 +37,7 @@ public class VectorReduceSinkOperator ex
 
   // Writer for producing row from input batch.
   private VectorExpressionWriter[] rowWriters;
-  
+
   protected transient Object[] singleRow;
 
   public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
@@ -49,7 +52,7 @@ public class VectorReduceSinkOperator ex
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
     // We need a input object inspector that is for the row we will extract out of the
     // vectorized row batch, not for example, an original inspector for an ORC table, etc.
     VectorExpressionWriterFactory.processVectorInspector(
@@ -64,17 +67,16 @@ public class VectorReduceSinkOperator ex
             });
     singleRow = new Object[rowWriters.length];
 
-    // Call ReduceSinkOperator with new input inspector.
-    super.initializeOp(hconf);
+    return super.initializeOp(hconf);
   }
 
   @Override
-  public void processOp(Object data, int tag) throws HiveException {
+  public void process(Object data, int tag) throws HiveException {
     VectorizedRowBatch vrg = (VectorizedRowBatch) data;
 
     for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
       Object row = getRowObject(vrg, batchIndex);
-      super.processOp(row, tag);
+      super.process(row, tag);
     }
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java Sat Mar 28 14:03:43 2015
@@ -19,9 +19,11 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,8 +49,8 @@ import org.apache.hadoop.hive.serde2.obj
 public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements VectorizationContextRegion {
 
   private static final Log LOG = LogFactory.getLog(
-      VectorSMBMapJoinOperator.class.getName());  
-  
+      VectorSMBMapJoinOperator.class.getName());
+
   private static final long serialVersionUID = 1L;
 
   private VectorExpression[] bigTableValueExpressions;
@@ -65,7 +67,7 @@ public class VectorSMBMapJoinOperator ex
   // transient.
   //---------------------------------------------------------------------------
 
-  private transient VectorizedRowBatch outputBatch;  
+  private transient VectorizedRowBatch outputBatch;
 
   private transient VectorizedRowBatchCtx vrbCtx = null;
 
@@ -78,23 +80,23 @@ public class VectorSMBMapJoinOperator ex
   private transient VectorHashKeyWrapper[] keyValues;
 
   private transient SMBJoinKeyEvaluator keyEvaluator;
-  
+
   private transient VectorExpressionWriter[] valueWriters;
-  
+
   private interface SMBJoinKeyEvaluator {
     List<Object> evaluate(VectorHashKeyWrapper kw) throws HiveException;
-}  
+}
 
   public VectorSMBMapJoinOperator() {
     super();
   }
-  
+
   public VectorSMBMapJoinOperator(VectorizationContext vContext, OperatorDesc conf)
       throws HiveException {
     this();
     SMBJoinDesc desc = (SMBJoinDesc) conf;
     this.conf = desc;
-    
+
     order = desc.getTagOrder();
     numAliases = desc.getExprs().size();
     posBigTable = (byte) desc.getPosBigTable();
@@ -118,7 +120,7 @@ public class VectorSMBMapJoinOperator ex
     vOutContext = new VectorizationContext(desc.getOutputColumnNames());
     vOutContext.setFileKey(vContext.getFileKey() + "/SMB_JOIN_" + desc.getBigTableAlias());
   }
-  
+
   @Override
   protected List<Object> smbJoinComputeKeys(Object row, byte alias) throws HiveException {
     if (alias == this.posBigTable) {
@@ -127,21 +129,21 @@ public class VectorSMBMapJoinOperator ex
     } else {
       return super.smbJoinComputeKeys(row, alias);
     }
-  }  
-  
+  }
+
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
-    super.initializeOp(hconf);
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
 
     vrbCtx = new VectorizedRowBatchCtx();
     vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector);
-    
+
     outputBatch = vrbCtx.createVectorizedRowBatch();
-    
+
     keyWrapperBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
-    
+
     outputVectorAssigners = new HashMap<ObjectInspector, VectorColumnAssign[]>();
-    
+
     // This key evaluator translates from the vectorized VectorHashKeyWrapper format
     // into the row-mode MapJoinKey
     keyEvaluator = new SMBJoinKeyEvaluator() {
@@ -163,14 +165,14 @@ public class VectorSMBMapJoinOperator ex
         return key;
       };
     }.init();
-    
+
     Map<Byte, List<ExprNodeDesc>> valueExpressions = conf.getExprs();
-    List<ExprNodeDesc> bigTableExpressions = valueExpressions.get(posBigTable);    
-    
+    List<ExprNodeDesc> bigTableExpressions = valueExpressions.get(posBigTable);
+
     // We're hijacking the big table evaluators and replacing them with our own custom ones
     // which are going to return values from the input batch vector expressions
     List<ExprNodeEvaluator> vectorNodeEvaluators = new ArrayList<ExprNodeEvaluator>(bigTableExpressions.size());
-    
+
     VectorExpressionWriterFactory.processVectorExpressions(
         bigTableExpressions,
         new VectorExpressionWriterFactory.ListOIDClosure() {
@@ -180,7 +182,7 @@ public class VectorSMBMapJoinOperator ex
             valueWriters = writers;
             joinValuesObjectInspectors[posBigTable] = oids;
           }
-        });    
+        });
 
     for(int i=0; i<bigTableExpressions.size(); ++i) {
       ExprNodeDesc desc = bigTableExpressions.get(i);
@@ -213,51 +215,51 @@ public class VectorSMBMapJoinOperator ex
     }
     // Now replace the old evaluators with our own
     joinValues[posBigTable] = vectorNodeEvaluators;
-    
+    return result;
   }
-  
+
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     byte alias = (byte) tag;
-    
+
     if (alias != this.posBigTable) {
-      super.processOp(row, tag);
+      super.process(row, tag);
     } else {
-  
+
       VectorizedRowBatch inBatch = (VectorizedRowBatch) row;
-  
+
       if (null != bigTableFilterExpressions) {
         for(VectorExpression ve : bigTableFilterExpressions) {
           ve.evaluate(inBatch);
         }
       }
-  
+
       if (null != bigTableValueExpressions) {
         for(VectorExpression ve : bigTableValueExpressions) {
           ve.evaluate(inBatch);
         }
       }
-  
+
       keyWrapperBatch.evaluateBatch(inBatch);
       keyValues = keyWrapperBatch.getVectorHashKeyWrappers();
-  
+
       // This implementation of vectorized JOIN is delegating all the work
       // to the row-mode implementation by hijacking the big table node evaluators
       // and calling the row-mode join processOp for each row in the input batch.
-      // Since the JOIN operator is not fully vectorized anyway at the moment 
+      // Since the JOIN operator is not fully vectorized anyway at the moment
       // (due to the use of row-mode small-tables) this is a reasonable trade-off.
       //
       for(batchIndex=0; batchIndex < inBatch.size; ++batchIndex ) {
-        super.processOp(row, tag);
+        super.process(row, tag);
       }
-  
+
       // Set these two to invalid values so any attempt to use them
       // outside the inner loop results in NPE/OutOfBounds errors
       batchIndex = -1;
       keyValues = null;
     }
   }
-  
+
   @Override
   public void closeOp(boolean aborted) throws HiveException {
     super.closeOp(aborted);
@@ -265,7 +267,7 @@ public class VectorSMBMapJoinOperator ex
       flushOutput();
     }
   }
-  
+
   @Override
   protected void internalForward(Object row, ObjectInspector outputOI) throws HiveException {
     Object[] values = (Object[]) row;
@@ -283,11 +285,11 @@ public class VectorSMBMapJoinOperator ex
       flushOutput();
     }
   }
-  
+
   private void flushOutput() throws HiveException {
     forward(outputBatch, null);
     outputBatch.reset();
-  }  
+  }
 
   @Override
   public VectorizationContext getOuputVectorizationContext() {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java Sat Mar 28 14:03:43 2015
@@ -19,11 +19,12 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
@@ -31,13 +32,15 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 
 /**
  * Select operator implementation.
  */
-public class VectorSelectOperator extends SelectOperator implements VectorizationContextRegion {
+public class VectorSelectOperator extends Operator<SelectDesc> implements
+    VectorizationContextRegion {
 
   private static final long serialVersionUID = 1L;
 
@@ -62,7 +65,7 @@ public class VectorSelectOperator extend
     }
 
     /**
-     * Create a new vectorization context to create a new projection, but keep 
+     * Create a new vectorization context to create a new projection, but keep
      * same output column manager must be inherited to track the scratch the columns.
      */
     vOutContext = new VectorizationContext(vContext);
@@ -74,7 +77,7 @@ public class VectorSelectOperator extend
     for (int i=0; i < colList.size(); ++i) {
       String columnName = this.conf.getOutputColumnNames().get(i);
       VectorExpression ve = vExpressions[i];
-      vOutContext.addProjectionColumn(columnName, 
+      vOutContext.addProjectionColumn(columnName,
               ve.getOutputColumn());
     }
   }
@@ -83,11 +86,11 @@ public class VectorSelectOperator extend
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     // Just forward the row as is
     if (conf.isSelStarNoCompute()) {
-      initializeChildren(hconf);
-      return;
+      return null;
     }
 
     List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
@@ -102,15 +105,15 @@ public class VectorSelectOperator extend
     outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
         outputFieldNames, objectInspectors);
 
-    initializeChildren(hconf);
     projectedColumns = new int [vExpressions.length];
     for (int i = 0; i < projectedColumns.length; i++) {
       projectedColumns[i] = vExpressions[i].getOutputColumn();
     }
+    return result;
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
 
     // Just forward the row as is
     if (conf.isSelStarNoCompute()) {
@@ -167,4 +170,9 @@ public class VectorSelectOperator extend
   public VectorizationContext getOuputVectorizationContext() {
     return vOutContext;
   }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.SELECT;
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Sat Mar 28 14:03:43 2015
@@ -38,6 +38,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
@@ -113,6 +115,7 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.DateUtils;
 
 /**
  * Context class for vectorization execution.
@@ -253,6 +256,8 @@ public class VectorizationContext {
     castExpressionUdfs.add(GenericUDFToChar.class);
     castExpressionUdfs.add(GenericUDFToVarchar.class);
     castExpressionUdfs.add(GenericUDFTimestamp.class);
+    castExpressionUdfs.add(GenericUDFToIntervalYearMonth.class);
+    castExpressionUdfs.add(GenericUDFToIntervalDayTime.class);
     castExpressionUdfs.add(UDFToByte.class);
     castExpressionUdfs.add(UDFToBoolean.class);
     castExpressionUdfs.add(UDFToDouble.class);
@@ -658,6 +663,12 @@ public class VectorizationContext {
       case TIMESTAMP:
         genericUdf = new GenericUDFToUnixTimeStamp();
         break;
+      case INTERVAL_YEAR_MONTH:
+        genericUdf = new GenericUDFToIntervalYearMonth();
+        break;
+      case INTERVAL_DAY_TIME:
+        genericUdf = new GenericUDFToIntervalDayTime();
+        break;
       case BINARY:
         genericUdf = new GenericUDFToBinary();
         break;
@@ -871,8 +882,16 @@ public class VectorizationContext {
     switch (vectorArgType) {
     case INT_FAMILY:
       return new ConstantVectorExpression(outCol, ((Number) constantValue).longValue());
+    case DATE:
+      return new ConstantVectorExpression(outCol, DateWritable.dateToDays((Date) constantValue));
     case TIMESTAMP:
       return new ConstantVectorExpression(outCol, TimestampUtils.getTimeNanoSec((Timestamp) constantValue));
+    case INTERVAL_YEAR_MONTH:
+      return new ConstantVectorExpression(outCol,
+          ((HiveIntervalYearMonth) constantValue).getTotalMonths());
+    case INTERVAL_DAY_TIME:
+      return new ConstantVectorExpression(outCol,
+          DateUtils.getIntervalDayTimeTotalNanos((HiveIntervalDayTime) constantValue));
     case FLOAT_FAMILY:
       return new ConstantVectorExpression(outCol, ((Number) constantValue).doubleValue());
     case DECIMAL:
@@ -1773,6 +1792,14 @@ public class VectorizationContext {
     return resultType.equalsIgnoreCase("date");
   }
 
+  public static boolean isIntervalYearMonthFamily(String resultType) {
+    return resultType.equalsIgnoreCase("interval_year_month");
+  }
+
+  public static boolean isIntervalDayTimeFamily(String resultType) {
+    return resultType.equalsIgnoreCase("interval_day_time");
+  }
+
   // return true if this is any kind of float
   public static boolean isFloatFamily(String resultType) {
     return resultType.equalsIgnoreCase("double")
@@ -1843,12 +1870,19 @@ public class VectorizationContext {
 
   private Object getVectorTypeScalarValue(ExprNodeConstantDesc constDesc) throws HiveException {
     String t = constDesc.getTypeInfo().getTypeName();
-    if (isTimestampFamily(t)) {
-      return TimestampUtils.getTimeNanoSec((Timestamp) getScalarValue(constDesc));
-    } else if (isDateFamily(t)) {
-      return DateWritable.dateToDays((Date) getScalarValue(constDesc));
-    } else {
-      return getScalarValue(constDesc);
+    VectorExpression.Type type = VectorExpression.Type.getValue(t);
+    Object scalarValue = getScalarValue(constDesc);
+    switch (type) {
+      case TIMESTAMP:
+        return TimestampUtils.getTimeNanoSec((Timestamp) scalarValue);
+      case DATE:
+        return DateWritable.dateToDays((Date) scalarValue);
+      case INTERVAL_YEAR_MONTH:
+        return ((HiveIntervalYearMonth) scalarValue).getTotalMonths();
+      case INTERVAL_DAY_TIME:
+        return DateUtils.getIntervalDayTimeTotalNanos((HiveIntervalDayTime) scalarValue);
+      default:
+        return scalarValue;
     }
   }
 
@@ -1935,6 +1969,9 @@ public class VectorizationContext {
       return "Date";
     case TIMESTAMP:
       return "Timestamp";
+    case INTERVAL_YEAR_MONTH:
+    case INTERVAL_DAY_TIME:
+      return hiveTypeName;
     default:
       return "None";
     }
@@ -1959,6 +1996,9 @@ public class VectorizationContext {
       return "Date";
     case TIMESTAMP:
       return "Timestamp";
+    case INTERVAL_YEAR_MONTH:
+    case INTERVAL_DAY_TIME:
+      return hiveTypeName;
     default:
       return "None";
     }
@@ -1969,16 +2009,16 @@ public class VectorizationContext {
   // TODO:   And, investigate if different reduce-side versions are needed for var* and std*, or if map-side aggregate can be used..  Right now they are conservatively
   //         marked map-side (HASH).
   static ArrayList<AggregateDefinition> aggregatesDefinition = new ArrayList<AggregateDefinition>() {{
-    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY,    null,                          VectorUDAFMinLong.class));
+    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.INT_DATETIME_INTERVAL_FAMILY,    null,                          VectorUDAFMinLong.class));
     add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           null,                          VectorUDAFMinDouble.class));
     add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.STRING_FAMILY,          null,                          VectorUDAFMinString.class));
     add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.DECIMAL,                null,                          VectorUDAFMinDecimal.class));
-    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY,    null,                          VectorUDAFMaxLong.class));
+    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.INT_DATETIME_INTERVAL_FAMILY,    null,                          VectorUDAFMaxLong.class));
     add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           null,                          VectorUDAFMaxDouble.class));
     add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.STRING_FAMILY,          null,                          VectorUDAFMaxString.class));
     add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.DECIMAL,                null,                          VectorUDAFMaxDecimal.class));
     add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.NONE,                   GroupByDesc.Mode.HASH,         VectorUDAFCountStar.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.INT_DATETIME_INTERVAL_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
     add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.INT_FAMILY,             GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFCountMerge.class));
     add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
     add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.STRING_FAMILY,          GroupByDesc.Mode.HASH,         VectorUDAFCount.class));

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java Sat Mar 28 14:03:43 2015
@@ -26,6 +26,8 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -34,6 +36,8 @@ import org.apache.hadoop.hive.serde2.io.
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable;
+import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable;
 import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
@@ -50,6 +54,7 @@ import org.apache.hadoop.io.FloatWritabl
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hive.common.util.DateUtils;
 
 public class VectorizedBatchUtil {
   private static final Log LOG = LogFactory.getLog(VectorizedBatchUtil.class);
@@ -126,6 +131,8 @@ public class VectorizedBatchUtil {
         case LONG:
         case TIMESTAMP:
         case DATE:
+        case INTERVAL_YEAR_MONTH:
+        case INTERVAL_DAY_TIME:
           cvList.add(new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
           break;
         case FLOAT:
@@ -235,11 +242,31 @@ public class VectorizedBatchUtil {
     final int off = colOffset;
     // Iterate thru the cols and load the batch
     for (int i = 0; i < fieldRefs.size(); i++) {
-      setVector(row, oi, fieldRefs, batch, buffer, rowIndex, i, off);
+      setVector(row, oi, fieldRefs.get(i), batch, buffer, rowIndex, i, off);
     }
   }
 
   /**
+   * Add only the projected column of a regular row to the specified vectorized row batch
+   * @param row the regular row
+   * @param oi object inspector for the row
+   * @param rowIndex the offset to add in the batch
+   * @param batch vectorized row batch
+   * @param buffer data output buffer
+   * @throws HiveException
+   */
+  public static void addProjectedRowToBatchFrom(Object row, StructObjectInspector oi,
+      int rowIndex, VectorizedRowBatch batch, DataOutputBuffer buffer) throws HiveException {
+    List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
+    for (int i = 0; i < fieldRefs.size(); i++) {
+      int projectedOutputCol = batch.projectedColumns[i];
+      if (batch.cols[projectedOutputCol] == null) {
+        continue;
+      }
+      setVector(row, oi, fieldRefs.get(i), batch, buffer, rowIndex, projectedOutputCol, 0);
+    }
+  }
+  /**
    * Iterates thru all the columns in a given row and populates the batch
    * from a given offset
    *
@@ -268,21 +295,21 @@ public class VectorizedBatchUtil {
         // The value will have already been set before we're called, so don't overwrite it
         continue;
       }
-      setVector(row, oi, fieldRefs, batch, buffer, rowIndex, i, 0);
+      setVector(row, oi, fieldRefs.get(i), batch, buffer, rowIndex, i, 0);
     }
   }
 
   private static void setVector(Object row,
                                 StructObjectInspector oi,
-                                List<? extends StructField> fieldRefs,
+                                StructField field,
                                 VectorizedRowBatch batch,
                                 DataOutputBuffer buffer,
                                 int rowIndex,
                                 int colIndex,
                                 int offset) throws HiveException {
 
-    Object fieldData = oi.getStructFieldData(row, fieldRefs.get(colIndex));
-    ObjectInspector foi = fieldRefs.get(colIndex).getFieldObjectInspector();
+    Object fieldData = oi.getStructFieldData(row, field);
+    ObjectInspector foi = field.getFieldObjectInspector();
 
     // Vectorization only supports PRIMITIVE data types. Assert the same
     assert (foi.getCategory() == Category.PRIMITIVE);
@@ -390,6 +417,30 @@ public class VectorizedBatchUtil {
         lcv.isNull[rowIndex] = false;
       } else {
         lcv.vector[rowIndex] = 1;
+        setNullColIsNullValue(lcv, rowIndex);
+      }
+    }
+      break;
+    case INTERVAL_YEAR_MONTH: {
+      LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
+      if (writableCol != null) {
+        HiveIntervalYearMonth i = ((HiveIntervalYearMonthWritable) writableCol).getHiveIntervalYearMonth();
+        lcv.vector[rowIndex] = i.getTotalMonths();
+        lcv.isNull[rowIndex] = false;
+      } else {
+        lcv.vector[rowIndex] = 1;
+        setNullColIsNullValue(lcv, rowIndex);
+      }
+    }
+      break;
+    case INTERVAL_DAY_TIME: {
+      LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
+      if (writableCol != null) {
+        HiveIntervalDayTime i = ((HiveIntervalDayTimeWritable) writableCol).getHiveIntervalDayTime();
+        lcv.vector[rowIndex] = DateUtils.getIntervalDayTimeTotalNanos(i);
+        lcv.isNull[rowIndex] = false;
+      } else {
+        lcv.vector[rowIndex] = 1;
         setNullColIsNullValue(lcv, rowIndex);
       }
     }
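
As the new setVector() cases show, both interval types travel in a plain LongColumnVector: interval_year_month as a total month count and interval_day_time as total nanoseconds. A small sketch of that encoding, restricted to calls that appear in this patch (the concrete values are illustrative):

// Year-month interval: stored as total months.
HiveIntervalYearMonth ym = new HiveIntervalYearMonth();
ym.set(14);                                          // 1 year 2 months == 14 months
long ymEncoded = ym.getTotalMonths();                // value written into LongColumnVector.vector[rowIndex]

// Day-time interval: stored as total nanoseconds.
HiveIntervalDayTime dt = new HiveIntervalDayTime();
DateUtils.setIntervalDayTimeTotalNanos(dt, 90L * 1000000000L);  // 1 minute 30 seconds
long dtEncoded = DateUtils.getIntervalDayTimeTotalNanos(dt);    // 90000000000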

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Sat Mar 28 14:03:43 2015
@@ -35,6 +35,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -42,6 +44,7 @@ import org.apache.hadoop.hive.ql.io.IOPr
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -61,6 +64,7 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hive.common.util.DateUtils;
 
 /**
  * Context for Vectorized row batch. this calss does eager deserialization of row data using serde
@@ -301,6 +305,8 @@ public class VectorizedRowBatchCtx {
           case LONG:
           case TIMESTAMP:
           case DATE:
+          case INTERVAL_YEAR_MONTH:
+          case INTERVAL_DAY_TIME:
             result.cols[j] = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
             break;
           case FLOAT:
@@ -503,7 +509,31 @@ public class VectorizedRowBatchCtx {
           }
         }
         break;
-        
+
+        case INTERVAL_YEAR_MONTH: {
+          LongColumnVector lcv = (LongColumnVector) batch.cols[colIndex];
+          if (value == null) {
+            lcv.noNulls = false;
+            lcv.isNull[0] = true;
+            lcv.isRepeating = true;
+          } else {
+            lcv.fill(((HiveIntervalYearMonth) value).getTotalMonths());
+            lcv.isNull[0] = false;
+          }
+        }
+
+        case INTERVAL_DAY_TIME: {
+          LongColumnVector lcv = (LongColumnVector) batch.cols[colIndex];
+          if (value == null) {
+            lcv.noNulls = false;
+            lcv.isNull[0] = true;
+            lcv.isRepeating = true;
+          } else {
+            lcv.fill(DateUtils.getIntervalDayTimeTotalNanos((HiveIntervalDayTime) value));
+            lcv.isNull[0] = false;
+          }
+        }
+
         case FLOAT: {
           DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[colIndex];
           if (value == null) {
@@ -637,7 +667,9 @@ public class VectorizedRowBatchCtx {
       return new DecimalColumnVector(defaultSize, precisionScale[0], precisionScale[1]);
     } else if (type.equalsIgnoreCase("long") ||
                type.equalsIgnoreCase("date") ||
-               type.equalsIgnoreCase("timestamp")) {
+               type.equalsIgnoreCase("timestamp") ||
+               type.equalsIgnoreCase(serdeConstants.INTERVAL_YEAR_MONTH_TYPE_NAME) ||
+               type.equalsIgnoreCase(serdeConstants.INTERVAL_DAY_TIME_TYPE_NAME)) {
       return new LongColumnVector(defaultSize);
     } else {
       throw new Error("Cannot allocate vector column for " + type);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetBytes.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetBytes.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetBytes.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetBytes.java Sat Mar 28 14:03:43 2015
@@ -206,7 +206,7 @@ public class CuckooSetBytes {
     // Save original values
     if (prev1 == null) {
       prev1 = t1;
-      prev1 = t2;
+      prev2 = t2;
     }
     t1 = new byte[n][];
     t2 = new byte[n][];

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetDouble.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetDouble.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetDouble.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetDouble.java Sat Mar 28 14:03:43 2015
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec.vector.expressions;
 
-import java.util.Arrays;
-import java.util.Random;
 
 /**
  * A high-performance set implementation used to support fast set membership testing,

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetLong.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetLong.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetLong.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetLong.java Sat Mar 28 14:03:43 2015
@@ -244,7 +244,7 @@ public class CuckooSetLong {
     // Save original values
     if (prev1 == null) {
       prev1 = t1;
-      prev1 = t2;
+      prev2 = t2;
     }
     t1 = new long[n];
     t2 = new long[n];

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/MathExpr.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/MathExpr.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/MathExpr.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/MathExpr.java Sat Mar 28 14:03:43 2015
@@ -75,11 +75,11 @@ public class MathExpr {
     return v == 0.0D ? 0L : 1L;
   }
 
-  /* Convert an integer value in miliseconds since the epoch to a timestamp value
+  /* Convert an integer value in seconds since the epoch to a timestamp value
    * for use in a long column vector, which is represented in nanoseconds since the epoch.
    */
   public static long longToTimestamp(long v) {
-    return v * 1000000;
+    return v * 1000000000;
   }
 
   // Convert seconds since the epoch (with fraction) to nanoseconds, as a long integer.
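
The longToTimestamp() fix above corrects the scale factor: the argument is interpreted as whole seconds since the epoch, and timestamps in a long column vector are held as nanoseconds, so the multiplier must be 1000000000 rather than 1000000. A quick sanity check (the epoch second roughly matches this commit's timestamp):

long seconds = 1427551423L;                     // ~ Sat, 28 Mar 2015 14:03:43 GMT
long nanos = MathExpr.longToTimestamp(seconds); // 1427551423000000000 with the new factor
// The old factor of 1000000 would have yielded 1427551423000000, which a
// nanosecond-based timestamp column reads back as a date in mid-January 1970.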

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java Sat Mar 28 14:03:43 2015
@@ -30,7 +30,8 @@ import org.apache.hadoop.hive.ql.exec.ve
  */
 public abstract class VectorExpression implements Serializable {
   public enum Type {
-    STRING, CHAR, VARCHAR, TIMESTAMP, DATE, LONG, DOUBLE, DECIMAL, OTHER;
+    STRING, CHAR, VARCHAR, TIMESTAMP, DATE, LONG, DOUBLE, DECIMAL,
+    INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME, OTHER;
     private static Map<String, Type> types = ImmutableMap.<String, Type>builder()
         .put("string", STRING)
         .put("char", CHAR)
@@ -40,6 +41,8 @@ public abstract class VectorExpression i
         .put("long", LONG)
         .put("double", DOUBLE)
         .put("decimal", DECIMAL)
+        .put("interval_year_month", INTERVAL_YEAR_MONTH)
+        .put("interval_day_time", INTERVAL_DAY_TIME)
         .build();
 
     public static Type getValue(String name) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java Sat Mar 28 14:03:43 2015
@@ -28,6 +28,8 @@ import org.apache.commons.lang.ArrayUtil
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.ql.exec.vector.*;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -47,6 +49,8 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableFloatObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveCharObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveIntervalDayTimeObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveIntervalYearMonthObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveVarcharObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableIntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableLongObjectInspector;
@@ -56,6 +60,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hive.common.util.DateUtils;
 
 /**
  * VectorExpressionWritableFactory helper class for generating VectorExpressionWritable objects.
@@ -430,6 +435,12 @@ public final class VectorExpressionWrite
           case DATE:
             return genVectorExpressionWritableDate(
                 (SettableDateObjectInspector) fieldObjInspector);
+          case INTERVAL_YEAR_MONTH:
+            return genVectorExpressionWritableIntervalYearMonth(
+                (SettableHiveIntervalYearMonthObjectInspector) fieldObjInspector);
+          case INTERVAL_DAY_TIME:
+            return genVectorExpressionWritableIntervalDayTime(
+                (SettableHiveIntervalDayTimeObjectInspector) fieldObjInspector);
           case DECIMAL:
             return genVectorExpressionWritableDecimal(
                 (SettableHiveDecimalObjectInspector) fieldObjInspector);
@@ -586,6 +597,84 @@ public final class VectorExpressionWrite
       }
    }.init(fieldObjInspector);
   }
+
+  private static VectorExpressionWriter genVectorExpressionWritableIntervalYearMonth(
+      SettableHiveIntervalYearMonthObjectInspector fieldObjInspector) throws HiveException {
+    return new VectorExpressionWriterLong() {
+      private Object obj;
+      private HiveIntervalYearMonth interval;
+
+      public VectorExpressionWriter init(SettableHiveIntervalYearMonthObjectInspector objInspector)
+          throws HiveException {
+        super.init(objInspector);
+        interval = new HiveIntervalYearMonth();
+        obj = initValue(null);
+        return this;
+      }
+
+      @Override
+      public Object writeValue(long value) {
+        interval.set((int) value);
+        ((SettableHiveIntervalYearMonthObjectInspector) this.objectInspector).set(obj, interval);
+        return obj;
+      }
+
+      @Override
+      public Object setValue(Object field, long value) {
+        if (null == field) {
+          field = initValue(null);
+        }
+        interval.set((int) value);
+        ((SettableHiveIntervalYearMonthObjectInspector) this.objectInspector).set(field, interval);
+        return field;
+      }
+
+      @Override
+      public Object initValue(Object ignored) {
+        return ((SettableHiveIntervalYearMonthObjectInspector) this.objectInspector)
+            .create(new HiveIntervalYearMonth());
+      }
+   }.init(fieldObjInspector);
+  }
+
+  private static VectorExpressionWriter genVectorExpressionWritableIntervalDayTime(
+      SettableHiveIntervalDayTimeObjectInspector fieldObjInspector) throws HiveException {
+    return new VectorExpressionWriterLong() {
+      private Object obj;
+      private HiveIntervalDayTime interval;
+
+      public VectorExpressionWriter init(SettableHiveIntervalDayTimeObjectInspector objInspector)
+          throws HiveException {
+        super.init(objInspector);
+        interval = new HiveIntervalDayTime();
+        obj = initValue(null);
+        return this;
+      }
+
+      @Override
+      public Object writeValue(long value) {
+        DateUtils.setIntervalDayTimeTotalNanos(interval, value);
+        ((SettableHiveIntervalDayTimeObjectInspector) this.objectInspector).set(obj, interval);
+        return obj;
+      }
+
+      @Override
+      public Object setValue(Object field, long value) {
+        if (null == field) {
+          field = initValue(null);
+        }
+        DateUtils.setIntervalDayTimeTotalNanos(interval, value);
+        ((SettableHiveIntervalDayTimeObjectInspector) this.objectInspector).set(field, interval);
+        return field;
+      }
+
+      @Override
+      public Object initValue(Object ignored) {
+        return ((SettableHiveIntervalDayTimeObjectInspector) this.objectInspector)
+            .create(new HiveIntervalDayTime());
+      }
+   }.init(fieldObjInspector);
+  }
 
   private static VectorExpressionWriter genVectorExpressionWritableChar(
         SettableHiveCharObjectInspector fieldObjInspector) throws HiveException {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java Sat Mar 28 14:03:43 2015
@@ -65,4 +65,19 @@ public class HookUtils {
     return hooks;
   }
 
+  public static String redactLogString(HiveConf conf, String logString)
+      throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+
+    String redactedString = logString;
+
+    if (conf != null && logString != null) {
+      List<Redactor> queryRedactors = getHooks(conf, ConfVars.QUERYREDACTORHOOKS, Redactor.class);
+      for (Redactor redactor : queryRedactors) {
+        redactor.setConf(conf);
+        redactedString = redactor.redactQuery(redactedString);
+      }
+    }
+
+    return redactedString;
+  }
 }
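
The new redactLogString() helper runs every Redactor configured through ConfVars.QUERYREDACTORHOOKS over the supplied string before it is logged. A hedged usage sketch (the query text and logger are illustrative, not part of this patch):

HiveConf conf = new HiveConf();
String query = "SELECT name FROM users WHERE ssn = '123-45-6789'";
try {
  // Each configured redactor hook may rewrite sensitive parts of the string.
  String safe = HookUtils.redactLogString(conf, query);
  LOG.info("Executing: " + safe);
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
  throw new RuntimeException("Failed to load query redactor hooks", e);
}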

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java Sat Mar 28 14:03:43 2015
@@ -146,11 +146,11 @@ public class VectorizedRCFileRecordReade
 
   @Override
   public VectorizedRowBatch createValue() {
-    VectorizedRowBatch result = null;
+    VectorizedRowBatch result;
     try {
       result = rbCtx.createVectorizedRowBatch();
     } catch (HiveException e) {
-      new RuntimeException("Error creating a batch", e);
+      throw new RuntimeException("Error creating a batch", e);
     }
     return result;
   }
@@ -193,7 +193,7 @@ public class VectorizedRCFileRecordReade
         }
       }
     } catch (Exception e) {
-      new RuntimeException("Error while getting next row", e);
+      throw new RuntimeException("Error while getting next row", e);
     }
     value.size = i;
     return more;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java Sat Mar 28 14:03:43 2015
@@ -97,7 +97,7 @@ public class MergeFileMapper extends Map
     row[0] = key;
     row[1] = value;
     try {
-      mergeOp.processOp(row, 0);
+      mergeOp.process(row, 0);
     } catch (HiveException e) {
       abort = true;
       throw new IOException(e);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java Sat Mar 28 14:03:43 2015
@@ -23,7 +23,7 @@ import java.util.EnumSet;
 
 import javax.annotation.Nullable;
 
-interface CompressionCodec {
+public interface CompressionCodec {
 
   public enum Modifier {
     /* speed/compression tradeoffs */

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java Sat Mar 28 14:03:43 2015
@@ -50,7 +50,7 @@ import org.codehaus.jettison.json.JSONWr
  * A tool for printing out the file structure of ORC files.
  */
 public final class FileDump {
-  private static final String ROWINDEX_PREFIX = "--rowindex=";
+  private static final String UNKNOWN = "UNKNOWN";
 
   // not used
   private FileDump() {}
@@ -77,9 +77,13 @@ public final class FileDump {
       }
     }
 
+    boolean printTimeZone = false;
+    if (cli.hasOption('t')) {
+      printTimeZone = true;
+    }
     String[] files = cli.getArgs();
     if (dumpData) printData(Arrays.asList(files), conf);
-    else printMetaData(Arrays.asList(files), conf, rowIndexCols);
+    else printMetaData(Arrays.asList(files), conf, rowIndexCols, printTimeZone);
   }
 
   private static void printData(List<String> files, Configuration conf) throws IOException,
@@ -90,7 +94,7 @@ public final class FileDump {
   }
 
   private static void printMetaData(List<String> files, Configuration conf,
-                                    List<Integer> rowIndexCols) throws IOException {
+      List<Integer> rowIndexCols, boolean printTimeZone) throws IOException {
     for (String filename : files) {
       System.out.println("Structure for " + filename);
       Path path = new Path(filename);
@@ -125,11 +129,19 @@ public final class FileDump {
       for (StripeInformation stripe : reader.getStripes()) {
         ++stripeIx;
         long stripeStart = stripe.getOffset();
-        System.out.println("  Stripe: " + stripe.toString());
         OrcProto.StripeFooter footer = rows.readStripeFooter(stripe);
+        if (printTimeZone) {
+          String tz = footer.getWriterTimezone();
+          if (tz == null || tz.isEmpty()) {
+            tz = UNKNOWN;
+          }
+          System.out.println("  Stripe: " + stripe.toString() + " timezone: " + tz);
+        } else {
+          System.out.println("  Stripe: " + stripe.toString());
+        }
         long sectionStart = stripeStart;
         for(OrcProto.Stream section: footer.getStreamsList()) {
-          String kind = section.hasKind() ? section.getKind().name() : "UNKNOWN";
+          String kind = section.hasKind() ? section.getKind().name() : UNKNOWN;
           System.out.println("    Stream: column " + section.getColumn() +
               " section " + kind + " start: " + sectionStart +
               " length " + section.getLength());
@@ -157,7 +169,7 @@ public final class FileDump {
           for (int colIdx : rowIndexCols) {
             sargColumns[colIdx] = true;
           }
-          RecordReaderImpl.Index indices = rows.readRowIndex(stripeIx, sargColumns);
+          RecordReaderImpl.Index indices = rows.readRowIndex(stripeIx, null, sargColumns);
           for (int col : rowIndexCols) {
             StringBuilder buf = new StringBuilder();
             String rowIdxString = getFormattedRowIndices(col, indices.getRowGroupIndex());
@@ -278,6 +290,13 @@ public final class FileDump {
         .withDescription("Should the data be printed")
         .create('d'));
 
+    // to avoid breaking unit tests (when run in different time zones) for file dump, printing
+    // of timezone is made optional
+    result.addOption(OptionBuilder
+        .withLongOpt("timezone")
+        .withDescription("Print writer's time zone")
+        .create('t'));
+
     result.addOption(OptionBuilder
         .withLongOpt("help")
         .withDescription("print help message")

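The FileDump change adds an optional -t/--timezone flag: the writer time zone stored in each stripe footer is printed only on request (and as UNKNOWN when absent), which keeps existing unit tests stable across time zones. A rough sketch of driving the tool programmatically; the ORC path is hypothetical and it is assumed that FileDump.main remains the command-line entry point.

    import org.apache.hadoop.hive.ql.io.orc.FileDump;

    public class FileDumpSketch {
      public static void main(String[] args) throws Exception {
        // With -t each "  Stripe: ..." line also carries "timezone: <writer tz>".
        FileDump.main(new String[] { "-t", "/tmp/example.orc" });
      }
    }
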
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Sat Mar 28 14:03:43 2015
@@ -20,28 +20,50 @@ package org.apache.hadoop.hive.ql.io.orc
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
+import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
 
-abstract class InStream extends InputStream {
+import com.google.common.annotations.VisibleForTesting;
+
+public abstract class InStream extends InputStream {
 
   private static final Log LOG = LogFactory.getLog(InStream.class);
 
+  protected final String name;
+  protected final long length;
+
+  public InStream(String name, long length) {
+    this.name = name;
+    this.length = length;
+  }
+
+  public String getStreamName() {
+    return name;
+  }
+
+  public long getStreamLength() {
+    return length;
+  }
+
   private static class UncompressedStream extends InStream {
-    private final String name;
-    private final ByteBuffer[] bytes;
-    private final long[] offsets;
+    private final List<DiskRange> bytes;
     private final long length;
     private long currentOffset;
     private ByteBuffer range;
     private int currentRange;
 
-    public UncompressedStream(String name, ByteBuffer[] input, long[] offsets,
-                              long length) {
-      this.name = name;
+    public UncompressedStream(String name, List<DiskRange> input, long length) {
+      super(name, length);
       this.bytes = input;
-      this.offsets = offsets;
       this.length = length;
       currentRange = 0;
       currentOffset = 0;
@@ -83,12 +105,10 @@ abstract class InStream extends InputStr
 
     @Override
     public void close() {
-      currentRange = bytes.length;
+      currentRange = bytes.size();
       currentOffset = length;
       // explicit de-ref of bytes[]
-      for(int i = 0; i < bytes.length; i++) {
-        bytes[i] = null;
-      }
+      bytes.clear();
     }
 
     @Override
@@ -97,23 +117,27 @@ abstract class InStream extends InputStr
     }
 
     public void seek(long desired) {
-      for(int i = 0; i < bytes.length; ++i) {
-        if (desired == 0 && bytes[i].remaining() == 0) {
-          if (LOG.isWarnEnabled()) {
-            LOG.warn("Attempting seek into empty stream (" + name + ") Skipping stream.");
-          }
+      if (desired == 0 && bytes.isEmpty()) {
+        logEmptySeek(name);
+        return;
+      }
+      int i = 0;
+      for(DiskRange curRange : bytes) {
+        if (desired == 0 && curRange.getData().remaining() == 0) {
+          logEmptySeek(name);
           return;
         }
-        if (offsets[i] <= desired &&
-            desired - offsets[i] < bytes[i].remaining()) {
+        if (curRange.getOffset() <= desired &&
+            (desired - curRange.getOffset()) < curRange.getLength()) {
           currentOffset = desired;
           currentRange = i;
-          this.range = bytes[i].duplicate();
+          this.range = curRange.getData().duplicate();
           int pos = range.position();
-          pos += (int)(desired - offsets[i]); // this is why we duplicate
+          pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
           this.range.position(pos);
           return;
         }
+        ++i;
       }
       throw new IllegalArgumentException("Seek in " + name + " to " +
         desired + " is outside of the data");
@@ -127,50 +151,40 @@ abstract class InStream extends InputStr
     }
   }
 
+  private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
+    // TODO: use the same pool as the ORC readers
+    if (isDirect) {
+      return ByteBuffer.allocateDirect(size);
+    } else {
+      return ByteBuffer.allocate(size);
+    }
+  }
+
   private static class CompressedStream extends InStream {
-    private final String name;
-    private final ByteBuffer[] bytes;
-    private final long[] offsets;
+    private final List<DiskRange> bytes;
     private final int bufferSize;
-    private final long length;
     private ByteBuffer uncompressed;
     private final CompressionCodec codec;
     private ByteBuffer compressed;
     private long currentOffset;
     private int currentRange;
     private boolean isUncompressedOriginal;
-    private boolean isDirect = false;
 
-    public CompressedStream(String name, ByteBuffer[] input,
-                            long[] offsets, long length,
-                            CompressionCodec codec, int bufferSize
-                           ) {
+    public CompressedStream(String name, List<DiskRange> input, long length,
+                            CompressionCodec codec, int bufferSize) {
+      super(name, length);
       this.bytes = input;
-      this.name = name;
       this.codec = codec;
-      this.length = length;
-      if(this.length > 0) {
-        isDirect = this.bytes[0].isDirect();
-      }
-      this.offsets = offsets;
       this.bufferSize = bufferSize;
       currentOffset = 0;
       currentRange = 0;
     }
 
-    private ByteBuffer allocateBuffer(int size) {
-      // TODO: use the same pool as the ORC readers
-      if(isDirect == true) {
-        return ByteBuffer.allocateDirect(size);
-      } else {
-        return ByteBuffer.allocate(size);
-      }
-    }
-
     private void readHeader() throws IOException {
       if (compressed == null || compressed.remaining() <= 0) {
         seek(currentOffset);
       }
+      long originalOffset = currentOffset;
       if (compressed.remaining() > OutStream.HEADER_SIZE) {
         int b0 = compressed.get() & 0xff;
         int b1 = compressed.get() & 0xff;
@@ -193,10 +207,10 @@ abstract class InStream extends InputStr
           isUncompressedOriginal = true;
         } else {
           if (isUncompressedOriginal) {
-            uncompressed = allocateBuffer(bufferSize);
+            uncompressed = allocateBuffer(bufferSize, slice.isDirect());
             isUncompressedOriginal = false;
           } else if (uncompressed == null) {
-            uncompressed = allocateBuffer(bufferSize);
+            uncompressed = allocateBuffer(bufferSize, slice.isDirect());
           } else {
             uncompressed.clear();
           }
@@ -246,11 +260,9 @@ abstract class InStream extends InputStr
     public void close() {
       uncompressed = null;
       compressed = null;
-      currentRange = bytes.length;
+      currentRange = bytes.size();
       currentOffset = length;
-      for(int i = 0; i < bytes.length; i++) {
-        bytes[i] = null;
-      }
+      bytes.clear();
     }
 
     @Override
@@ -267,7 +279,7 @@ abstract class InStream extends InputStr
       }
     }
 
-    /* slices a read only contigous buffer of chunkLength */
+    /* slices a read only contiguous buffer of chunkLength */
     private ByteBuffer slice(int chunkLength) throws IOException {
       int len = chunkLength;
       final long oldOffset = currentOffset;
@@ -279,7 +291,7 @@ abstract class InStream extends InputStr
         currentOffset += len;
         compressed.position(compressed.position() + len);
         return slice;
-      } else if (currentRange >= (bytes.length - 1)) {
+      } else if (currentRange >= (bytes.size() - 1)) {
         // nothing has been modified yet
         throw new IOException("EOF in " + this + " while trying to read " +
             chunkLength + " bytes");
@@ -293,16 +305,19 @@ abstract class InStream extends InputStr
 
       // we need to consolidate 2 or more buffers into 1
       // first copy out compressed buffers
-      ByteBuffer copy = allocateBuffer(chunkLength);
+      ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
       currentOffset += compressed.remaining();
       len -= compressed.remaining();
       copy.put(compressed);
+      ListIterator<DiskRange> iter = bytes.listIterator(currentRange);
 
-      while (len > 0 && (++currentRange) < bytes.length) {
+      while (len > 0 && iter.hasNext()) {
+        ++currentRange;
         if (LOG.isDebugEnabled()) {
           LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString()));
         }
-        compressed = bytes[currentRange].duplicate();
+        DiskRange range = iter.next();
+        compressed = range.getData().duplicate();
         if (compressed.remaining() >= len) {
           slice = compressed.slice();
           slice.limit(len);
@@ -323,40 +338,46 @@ abstract class InStream extends InputStr
     }
 
     private void seek(long desired) throws IOException {
-      for(int i = 0; i < bytes.length; ++i) {
-        if (offsets[i] <= desired &&
-            desired - offsets[i] < bytes[i].remaining()) {
+      if (desired == 0 && bytes.isEmpty()) {
+        logEmptySeek(name);
+        return;
+      }
+      int i = 0;
+      for (DiskRange range : bytes) {
+        if (range.getOffset() <= desired && desired < range.getEnd()) {
           currentRange = i;
-          compressed = bytes[i].duplicate();
+          compressed = range.getData().duplicate();
           int pos = compressed.position();
-          pos += (int)(desired - offsets[i]);
+          pos += (int)(desired - range.getOffset());
           compressed.position(pos);
           currentOffset = desired;
           return;
         }
+        ++i;
       }
       // if they are seeking to the precise end, go ahead and let them go there
-      int segments = bytes.length;
-      if (segments != 0 &&
-          desired == offsets[segments - 1] + bytes[segments - 1].remaining()) {
+      int segments = bytes.size();
+      if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
+        DiskRange range = bytes.get(segments - 1);
         currentRange = segments - 1;
-        compressed = bytes[currentRange].duplicate();
+        compressed = range.getData().duplicate();
         compressed.position(compressed.limit());
         currentOffset = desired;
         return;
       }
-      throw new IOException("Seek outside of data in " + this + " to " +
-        desired);
+      throw new IOException("Seek outside of data in " + this + " to " + desired);
     }
 
     private String rangeString() {
       StringBuilder builder = new StringBuilder();
-      for(int i=0; i < offsets.length; ++i) {
+      int i = 0;
+      for (DiskRange range : bytes) {
         if (i != 0) {
           builder.append("; ");
         }
-        builder.append(" range " + i + " = " + offsets[i] + " to " +
-            bytes[i].remaining());
+        builder.append(" range " + i + " = " + range.getOffset()
+            + " to " + (range.getEnd() - range.getOffset()));
+        ++i;
       }
       return builder.toString();
     }
@@ -375,10 +396,16 @@ abstract class InStream extends InputStr
 
   public abstract void seek(PositionProvider index) throws IOException;
 
+  private static void logEmptySeek(String name) {
+    if (LOG.isWarnEnabled()) {
+      LOG.warn("Attempting seek into empty stream (" + name + ") Skipping stream.");
+    }
+  }
+
   /**
    * Create an input stream from a list of buffers.
-   * @param name the name of the stream
-   * @param input the list of ranges of bytes for the stream
+   * @param streamName the name of the stream
+   * @param buffers the list of ranges of bytes for the stream
    * @param offsets a list of offsets (the same length as input) that must
    *                contain the first offset of the each set of bytes in input
    * @param length the length in bytes of the stream
@@ -387,17 +414,40 @@ abstract class InStream extends InputStr
    * @return an input stream
    * @throws IOException
    */
-  public static InStream create(String name,
-                                ByteBuffer[] input,
+  @VisibleForTesting
+  @Deprecated
+  public static InStream create(String streamName,
+                                ByteBuffer[] buffers,
                                 long[] offsets,
                                 long length,
                                 CompressionCodec codec,
                                 int bufferSize) throws IOException {
+    List<DiskRange> input = new ArrayList<DiskRange>(buffers.length);
+    for (int i = 0; i < buffers.length; ++i) {
+      input.add(new BufferChunk(buffers[i], offsets[i]));
+    }
+    return create(streamName, input, length, codec, bufferSize);
+  }
+
+  /**
+   * Create an input stream from a list of disk ranges with data.
+   * @param name the name of the stream
+   * @param input the list of ranges of bytes for the stream; from disk or cache
+   * @param length the length in bytes of the stream
+   * @param codec the compression codec
+   * @param bufferSize the compression buffer size
+   * @return an input stream
+   * @throws IOException
+   */
+  public static InStream create(String name,
+                                List<DiskRange> input,
+                                long length,
+                                CompressionCodec codec,
+                                int bufferSize) throws IOException {
     if (codec == null) {
-      return new UncompressedStream(name, input, offsets, length);
+      return new UncompressedStream(name, input, length);
     } else {
-      return new CompressedStream(name, input, offsets, length, codec,
-          bufferSize);
+      return new CompressedStream(name, input, length, codec, bufferSize);
     }
   }
 }
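With this refactoring an InStream is built from a List<DiskRange> (offset plus data) rather than parallel ByteBuffer[]/long[] arrays; the old array-based create survives only as a deprecated, test-visible shim that wraps each buffer in a RecordReaderImpl.BufferChunk. A minimal sketch of the new entry point, assuming it is compiled inside the org.apache.hadoop.hive.ql.io.orc package (BufferChunk may not be visible elsewhere); the stream name and buffer contents are illustrative.

    package org.apache.hadoop.hive.ql.io.orc;

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.common.DiskRange;
    import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;

    class InStreamSketch {
      static InStream uncompressed(byte[] raw) throws IOException {
        // One contiguous chunk at stream offset 0; a real reader may pass several
        // non-contiguous DiskRanges read from disk or taken from a cache.
        List<DiskRange> ranges = new ArrayList<DiskRange>();
        ranges.add(new BufferChunk(ByteBuffer.wrap(raw), 0));
        // A null codec selects UncompressedStream; bufferSize is unused in that case.
        return InStream.create("data", ranges, raw.length, null, 256 * 1024);
      }
    }
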

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Sat Mar 28 14:03:43 2015
@@ -193,7 +193,7 @@ public final class OrcFile {
     private ReaderImpl.FileMetaInfo fileMetaInfo;
     private long maxLength = Long.MAX_VALUE;
 
-    ReaderOptions(Configuration conf) {
+    public ReaderOptions(Configuration conf) {
       this.conf = conf;
     }
     ReaderOptions fileMetaInfo(ReaderImpl.FileMetaInfo info) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Sat Mar 28 14:03:43 2015
@@ -44,6 +44,9 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
@@ -102,8 +105,7 @@ import com.google.common.util.concurrent
  */
 public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
   InputFormatChecker, VectorizedInputFormatInterface,
-    AcidInputFormat<NullWritable, OrcStruct>, 
-    CombineHiveInputFormat.AvoidSplitCombination {
+    AcidInputFormat<NullWritable, OrcStruct>, CombineHiveInputFormat.AvoidSplitCombination {
 
   private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
   static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
@@ -111,7 +113,6 @@ public class OrcInputFormat  implements
       SHIMS.getHadoopConfNames().get("MAPREDMINSPLITSIZE");
   static final String MAX_SPLIT_SIZE =
       SHIMS.getHadoopConfNames().get("MAPREDMAXSPLITSIZE");
-  static final String SARG_PUSHDOWN = "sarg.pushdown";
 
   private static final long DEFAULT_MIN_SPLIT_SIZE = 16 * 1024 * 1024;
   private static final long DEFAULT_MAX_SPLIT_SIZE = 256 * 1024 * 1024;
@@ -217,14 +218,17 @@ public class OrcInputFormat  implements
                                                   long offset, long length
                                                   ) throws IOException {
     Reader.Options options = new Reader.Options().range(offset, length);
-    boolean isOriginal =
-        !file.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME);
+    boolean isOriginal = isOriginal(file);
     List<OrcProto.Type> types = file.getTypes();
-    setIncludedColumns(options, types, conf, isOriginal);
+    options.include(genIncludedColumns(types, conf, isOriginal));
     setSearchArgument(options, types, conf, isOriginal);
     return file.rowsOptions(options);
   }
 
+  public static boolean isOriginal(Reader file) {
+    return !file.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME);
+  }
+
   /**
    * Recurse down into a type subtree turning on all of the sub-columns.
    * @param types the types of the file
@@ -244,6 +248,21 @@ public class OrcInputFormat  implements
     }
   }
 
+  public static boolean[] genIncludedColumns(
+      List<OrcProto.Type> types, List<Integer> included, boolean isOriginal) {
+    int rootColumn = getRootColumn(isOriginal);
+    int numColumns = types.size() - rootColumn;
+    boolean[] result = new boolean[numColumns];
+    result[0] = true;
+    OrcProto.Type root = types.get(rootColumn);
+    for(int i=0; i < root.getSubtypesCount(); ++i) {
+      if (included.contains(i)) {
+        includeColumnRecursive(types, result, root.getSubtypes(i),
+            rootColumn);
+      }
+    }
+    return result;
+  }
   /**
    * Take the configuration and figure out which columns we need to include.
    * @param options the options to update
@@ -251,64 +270,51 @@ public class OrcInputFormat  implements
    * @param conf the configuration
    * @param isOriginal is the file in the original format?
    */
-  static void setIncludedColumns(Reader.Options options,
-                                 List<OrcProto.Type> types,
-                                 Configuration conf,
-                                 boolean isOriginal) {
-    int rootColumn = getRootColumn(isOriginal);
-    if (!ColumnProjectionUtils.isReadAllColumns(conf)) {
-      int numColumns = types.size() - rootColumn;
-      boolean[] result = new boolean[numColumns];
-      result[0] = true;
-      OrcProto.Type root = types.get(rootColumn);
+  public static boolean[] genIncludedColumns(
+      List<OrcProto.Type> types, Configuration conf, boolean isOriginal) {
+     if (!ColumnProjectionUtils.isReadAllColumns(conf)) {
       List<Integer> included = ColumnProjectionUtils.getReadColumnIDs(conf);
-      for(int i=0; i < root.getSubtypesCount(); ++i) {
-        if (included.contains(i)) {
-          includeColumnRecursive(types, result, root.getSubtypes(i),
-              rootColumn);
-        }
-      }
-      options.include(result);
+      return genIncludedColumns(types, included, isOriginal);
     } else {
-      options.include(null);
+      return null;
     }
   }
 
+  public static String[] getSargColumnNames(String[] originalColumnNames,
+      List<OrcProto.Type> types, boolean[] includedColumns, boolean isOriginal) {
+    int rootColumn = getRootColumn(isOriginal);
+    String[] columnNames = new String[types.size() - rootColumn];
+    int i = 0;
+    for(int columnId: types.get(rootColumn).getSubtypesList()) {
+      if (includedColumns == null || includedColumns[columnId - rootColumn]) {
+        // this is guaranteed to be positive because types only have children
+        // ids greater than their own id.
+        columnNames[columnId - rootColumn] = originalColumnNames[i++];
+      }
+    }
+    return columnNames;
+  }
+
   static void setSearchArgument(Reader.Options options,
                                 List<OrcProto.Type> types,
                                 Configuration conf,
                                 boolean isOriginal) {
-    int rootColumn = getRootColumn(isOriginal);
-    String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
-    String sargPushdown = conf.get(SARG_PUSHDOWN);
-    String columnNamesString =
-        conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
-    if ((sargPushdown == null && serializedPushdown == null)
-        || columnNamesString == null) {
+    String columnNamesString = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+    if (columnNamesString == null) {
+      LOG.debug("No ORC pushdown predicate - no column names");
+      options.searchArgument(null, null);
+      return;
+    }
+    SearchArgument sarg = SearchArgumentFactory.createFromConf(conf);
+    if (sarg == null) {
       LOG.debug("No ORC pushdown predicate");
       options.searchArgument(null, null);
-    } else {
-      SearchArgument sarg;
-      if (serializedPushdown != null) {
-        sarg = SearchArgumentFactory.create
-            (Utilities.deserializeExpression(serializedPushdown));
-      } else {
-        sarg = SearchArgumentFactory.create(sargPushdown);
-      }
-      LOG.info("ORC pushdown predicate: " + sarg);
-      String[] neededColumnNames = columnNamesString.split(",");
-      String[] columnNames = new String[types.size() - rootColumn];
-      boolean[] includedColumns = options.getInclude();
-      int i = 0;
-      for(int columnId: types.get(rootColumn).getSubtypesList()) {
-        if (includedColumns == null || includedColumns[columnId - rootColumn]) {
-          // this is guaranteed to be positive because types only have children
-          // ids greater than their own id.
-          columnNames[columnId - rootColumn] = neededColumnNames[i++];
-        }
-      }
-      options.searchArgument(sarg, columnNames);
+      return;
     }
+
+    LOG.info("ORC pushdown predicate: " + sarg);
+    options.searchArgument(sarg, getSargColumnNames(
+        columnNamesString.split(","), types, options.getInclude(), isOriginal));
   }
 
   @Override
@@ -776,7 +782,7 @@ public class OrcInputFormat  implements
         // deltas may change the rows making them match the predicate.
         if (deltas.isEmpty()) {
           Reader.Options options = new Reader.Options();
-          setIncludedColumns(options, types, context.conf, isOriginal);
+          options.include(genIncludedColumns(types, context.conf, isOriginal));
           setSearchArgument(options, types, context.conf, isOriginal);
           // only do split pruning if HIVE-8732 has been fixed in the writer
           if (options.getSearchArgument() != null &&
@@ -1124,7 +1130,7 @@ public class OrcInputFormat  implements
           .getBucket();
       reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
       final List<OrcProto.Type> types = reader.getTypes();
-      setIncludedColumns(readOptions, types, conf, split.isOriginal());
+      readOptions.include(genIncludedColumns(types, conf, split.isOriginal()));
       setSearchArgument(readOptions, types, conf, split.isOriginal());
     } else {
       bucket = (int) split.getStart();

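The OrcInputFormat changes expose the column-projection and predicate-pushdown plumbing as static helpers (isOriginal, genIncludedColumns, getSargColumnNames), so callers compute the include array themselves instead of handing Reader.Options to setIncludedColumns, and the serialized-expression versus sarg.pushdown branching moves behind SearchArgumentFactory.createFromConf. A rough caller-side sketch mirroring the updated call sites; the chained Reader.Options call and the wrapper class around it are assumptions.

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
    import org.apache.hadoop.hive.ql.io.orc.OrcProto;
    import org.apache.hadoop.hive.ql.io.orc.Reader;
    import org.apache.hadoop.hive.ql.io.orc.RecordReader;

    public class ProjectionSketch {
      static RecordReader openProjected(Path path, Configuration conf) throws IOException {
        Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
        List<OrcProto.Type> types = reader.getTypes();
        boolean isOriginal = OrcInputFormat.isOriginal(reader);
        // genIncludedColumns returns null when no projection is configured,
        // which Reader.Options treats as "read all columns".
        Reader.Options options = new Reader.Options()
            .include(OrcInputFormat.genIncludedColumns(types, conf, isOriginal));
        return reader.rowsOptions(options);
      }
    }
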
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionProvider.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionProvider.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionProvider.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionProvider.java Sat Mar 28 14:03:43 2015
@@ -21,6 +21,6 @@ package org.apache.hadoop.hive.ql.io.orc
 /**
  * An interface used for seeking to a row index.
  */
-interface PositionProvider {
+public interface PositionProvider {
   long getNext();
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java Sat Mar 28 14:03:43 2015
@@ -318,4 +318,5 @@ public interface Reader {
                     boolean[] include, SearchArgument sarg,
                     String[] neededColumns) throws IOException;
 
+  MetadataReader metadata() throws IOException;
 }


