Subject: svn commit: r1669775 [14/35] - in /hive/branches/spark: ./ ant/src/org/apache/hadoop/hive/ant/ beeline/src/java/org/apache/hive/beeline/ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/common/type/ common/src/java/...
Date: Sat, 28 Mar 2015 14:03:49 -0000
From: xuefu@apache.org
To: commits@hive.apache.org

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java Sat Mar 28 14:03:43 2015 @@ -18,17 +18,24 @@ package org.apache.hadoop.hive.ql.exec.vector; +import java.util.Arrays; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor; +import org.apache.hadoop.hive.ql.exec.persistence.ObjectContainer; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory; @@ -38,6 +45,7 @@ import org.apache.hadoop.hive.ql.plan.Ma import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import
org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.DataOutputBuffer; /** * The vectorized version of the MapJoinOperator. @@ -56,7 +64,7 @@ public class VectorMapJoinOperator exten private VectorExpression[] bigTableFilterExpressions; private VectorExpression[] bigTableValueExpressions; - + private VectorizationContext vOutContext; // The above members are initialized by the constructor and must not be @@ -64,6 +72,7 @@ public class VectorMapJoinOperator exten //--------------------------------------------------------------------------- private transient VectorizedRowBatch outputBatch; + private transient VectorizedRowBatch scratchBatch; // holds restored (from disk) big table rows private transient VectorExpressionWriter[] valueWriters; private transient Map outputVectorAssigners; @@ -76,7 +85,11 @@ public class VectorMapJoinOperator exten private transient VectorExpressionWriter[] keyOutputWriters; private transient VectorizedRowBatchCtx vrbCtx = null; - + + private transient int tag; // big table alias + private VectorExpressionWriter[] rowWriters; // Writer for producing row from input batch + protected transient Object[] singleRow; + public VectorMapJoinOperator() { super(); } @@ -112,9 +125,22 @@ public class VectorMapJoinOperator exten } @Override - public void initializeOp(Configuration hconf) throws HiveException { - super.initializeOp(hconf); - + public Collection> initializeOp(Configuration hconf) throws HiveException { + // Code borrowed from VectorReduceSinkOperator.initializeOp + VectorExpressionWriterFactory.processVectorInspector( + (StructObjectInspector) inputObjInspectors[0], + new VectorExpressionWriterFactory.SingleOIDClosure() { + @Override + public void assign(VectorExpressionWriter[] writers, + ObjectInspector objectInspector) { + rowWriters = writers; + inputObjInspectors[0] = objectInspector; + } + }); + singleRow = new Object[rowWriters.length]; + + Collection> result = super.initializeOp(hconf); + List keyDesc = conf.getKeys().get(posBigTable); keyOutputWriters = VectorExpressionWriterFactory.getExpressionWriters(keyDesc); @@ -178,6 +204,7 @@ public class VectorMapJoinOperator exten filterMaps[posBigTable] = null; outputVectorAssigners = new HashMap(); + return result; } /** @@ -208,22 +235,34 @@ public class VectorMapJoinOperator exten @Override public void closeOp(boolean aborted) throws HiveException { + super.closeOp(aborted); + for (MapJoinTableContainer tableContainer : mapJoinTables) { + if (tableContainer != null) { + tableContainer.dumpMetrics(); + } + } if (!aborted && 0 < outputBatch.size) { flushOutput(); } } @Override - protected void setMapJoinKey(ReusableGetAdaptor dest, Object row, byte alias) + protected JoinUtil.JoinResult setMapJoinKey(ReusableGetAdaptor dest, Object row, byte alias) throws HiveException { - dest.setFromVector(keyValues[batchIndex], keyOutputWriters, keyWrapperBatch); + return dest.setFromVector(keyValues[batchIndex], keyOutputWriters, keyWrapperBatch); } @Override - public void processOp(Object row, int tag) throws HiveException { + public void process(Object row, int tag) throws HiveException { byte alias = (byte) tag; VectorizedRowBatch inBatch = (VectorizedRowBatch) row; + // Preparation for hybrid grace hash join + this.tag = tag; + if (scratchBatch == null) { + scratchBatch = makeLike(inBatch); + } + if (null != bigTableFilterExpressions) { for(VectorExpression ve:bigTableFilterExpressions) { ve.evaluate(inBatch); @@ -246,7 +285,7 @@ public class VectorMapJoinOperator exten 
// of row-mode small-tables) this is a reasonable trade-off. // for(batchIndex=0; batchIndex < inBatch.size; ++batchIndex) { - super.processOp(row, tag); + super.process(row, tag); } // Set these two to invalid values so any attempt to use them @@ -259,4 +298,94 @@ public class VectorMapJoinOperator exten public VectorizationContext getOuputVectorizationContext() { return vOutContext; } + + @Override + protected void spillBigTableRow(MapJoinTableContainer hybridHtContainer, Object row) + throws HiveException { + // Extract the actual row from row batch + VectorizedRowBatch inBatch = (VectorizedRowBatch) row; + Object[] actualRow = getRowObject(inBatch, batchIndex); + super.spillBigTableRow(hybridHtContainer, actualRow); + } + + @Override + protected void reProcessBigTable(HybridHashTableContainer.HashPartition partition) + throws HiveException { + ObjectContainer bigTable = partition.getMatchfileObjContainer(); + + DataOutputBuffer dataOutputBuffer = new DataOutputBuffer(); + while (bigTable.hasNext()) { + Object row = bigTable.next(); + VectorizedBatchUtil.addProjectedRowToBatchFrom(row, + (StructObjectInspector) inputObjInspectors[posBigTable], + scratchBatch.size, scratchBatch, dataOutputBuffer); + scratchBatch.size++; + + if (scratchBatch.size == VectorizedRowBatch.DEFAULT_SIZE) { + process(scratchBatch, tag); // call process once we have a full batch + scratchBatch.reset(); + dataOutputBuffer.reset(); + } + } + // Process the row batch that has less than DEFAULT_SIZE rows + if (scratchBatch.size > 0) { + process(scratchBatch, tag); + scratchBatch.reset(); + dataOutputBuffer.reset(); + } + bigTable.clear(); + } + + // Code borrowed from VectorReduceSinkOperator + private Object[] getRowObject(VectorizedRowBatch vrb, int rowIndex) throws HiveException { + int batchIndex = rowIndex; + if (vrb.selectedInUse) { + batchIndex = vrb.selected[rowIndex]; + } + for (int i = 0; i < vrb.projectionSize; i++) { + ColumnVector vectorColumn = vrb.cols[vrb.projectedColumns[i]]; + if (vectorColumn != null) { + singleRow[i] = rowWriters[i].writeValue(vectorColumn, batchIndex); + } else { + // Some columns from tables are not used. 
+ singleRow[i] = null; + } + } + return singleRow; + } + + /** + * Make a new (scratch) batch, which is exactly "like" the batch provided, except that it's empty + * @param batch the batch to imitate + * @return the new batch + * @throws HiveException + */ + VectorizedRowBatch makeLike(VectorizedRowBatch batch) throws HiveException { + VectorizedRowBatch newBatch = new VectorizedRowBatch(batch.numCols); + for (int i = 0; i < batch.numCols; i++) { + ColumnVector colVector = batch.cols[i]; + if (colVector != null) { + ColumnVector newColVector; + if (colVector instanceof LongColumnVector) { + newColVector = new LongColumnVector(); + } else if (colVector instanceof DoubleColumnVector) { + newColVector = new DoubleColumnVector(); + } else if (colVector instanceof BytesColumnVector) { + newColVector = new BytesColumnVector(); + } else if (colVector instanceof DecimalColumnVector) { + DecimalColumnVector decColVector = (DecimalColumnVector) colVector; + newColVector = new DecimalColumnVector(decColVector.precision, decColVector.scale); + } else { + throw new HiveException("Column vector class " + colVector.getClass().getName() + + " is not supported!"); + } + newBatch.cols[i] = newColVector; + newBatch.cols[i].init(); + } + } + newBatch.projectedColumns = Arrays.copyOf(batch.projectedColumns, batch.projectedColumns.length); + newBatch.projectionSize = batch.projectionSize; + newBatch.reset(); + return newBatch; + } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Sat Mar 28 14:03:43 2015 @@ -18,6 +18,9 @@ package org.apache.hadoop.hive.ql.exec.vector; +import java.util.Collection; +import java.util.concurrent.Future; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -34,7 +37,7 @@ public class VectorReduceSinkOperator ex // Writer for producing row from input batch. private VectorExpressionWriter[] rowWriters; - + protected transient Object[] singleRow; public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf) @@ -49,7 +52,7 @@ public class VectorReduceSinkOperator ex } @Override - protected void initializeOp(Configuration hconf) throws HiveException { + protected Collection> initializeOp(Configuration hconf) throws HiveException { // We need a input object inspector that is for the row we will extract out of the // vectorized row batch, not for example, an original inspector for an ORC table, etc. VectorExpressionWriterFactory.processVectorInspector( @@ -64,17 +67,16 @@ public class VectorReduceSinkOperator ex }); singleRow = new Object[rowWriters.length]; - // Call ReduceSinkOperator with new input inspector. 
- super.initializeOp(hconf); + return super.initializeOp(hconf); } @Override - public void processOp(Object data, int tag) throws HiveException { + public void process(Object data, int tag) throws HiveException { VectorizedRowBatch vrg = (VectorizedRowBatch) data; for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) { Object row = getRowObject(vrg, batchIndex); - super.processOp(row, tag); + super.process(row, tag); } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java Sat Mar 28 14:03:43 2015 @@ -19,9 +19,11 @@ package org.apache.hadoop.hive.ql.exec.vector; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -47,8 +49,8 @@ import org.apache.hadoop.hive.serde2.obj public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements VectorizationContextRegion { private static final Log LOG = LogFactory.getLog( - VectorSMBMapJoinOperator.class.getName()); - + VectorSMBMapJoinOperator.class.getName()); + private static final long serialVersionUID = 1L; private VectorExpression[] bigTableValueExpressions; @@ -65,7 +67,7 @@ public class VectorSMBMapJoinOperator ex // transient. 
//--------------------------------------------------------------------------- - private transient VectorizedRowBatch outputBatch; + private transient VectorizedRowBatch outputBatch; private transient VectorizedRowBatchCtx vrbCtx = null; @@ -78,23 +80,23 @@ public class VectorSMBMapJoinOperator ex private transient VectorHashKeyWrapper[] keyValues; private transient SMBJoinKeyEvaluator keyEvaluator; - + private transient VectorExpressionWriter[] valueWriters; - + private interface SMBJoinKeyEvaluator { List evaluate(VectorHashKeyWrapper kw) throws HiveException; -} +} public VectorSMBMapJoinOperator() { super(); } - + public VectorSMBMapJoinOperator(VectorizationContext vContext, OperatorDesc conf) throws HiveException { this(); SMBJoinDesc desc = (SMBJoinDesc) conf; this.conf = desc; - + order = desc.getTagOrder(); numAliases = desc.getExprs().size(); posBigTable = (byte) desc.getPosBigTable(); @@ -118,7 +120,7 @@ public class VectorSMBMapJoinOperator ex vOutContext = new VectorizationContext(desc.getOutputColumnNames()); vOutContext.setFileKey(vContext.getFileKey() + "/SMB_JOIN_" + desc.getBigTableAlias()); } - + @Override protected List smbJoinComputeKeys(Object row, byte alias) throws HiveException { if (alias == this.posBigTable) { @@ -127,21 +129,21 @@ public class VectorSMBMapJoinOperator ex } else { return super.smbJoinComputeKeys(row, alias); } - } - + } + @Override - protected void initializeOp(Configuration hconf) throws HiveException { - super.initializeOp(hconf); + protected Collection> initializeOp(Configuration hconf) throws HiveException { + Collection> result = super.initializeOp(hconf); vrbCtx = new VectorizedRowBatchCtx(); vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector); - + outputBatch = vrbCtx.createVectorizedRowBatch(); - + keyWrapperBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions); - + outputVectorAssigners = new HashMap(); - + // This key evaluator translates from the vectorized VectorHashKeyWrapper format // into the row-mode MapJoinKey keyEvaluator = new SMBJoinKeyEvaluator() { @@ -163,14 +165,14 @@ public class VectorSMBMapJoinOperator ex return key; }; }.init(); - + Map> valueExpressions = conf.getExprs(); - List bigTableExpressions = valueExpressions.get(posBigTable); - + List bigTableExpressions = valueExpressions.get(posBigTable); + // We're hijacking the big table evaluators and replacing them with our own custom ones // which are going to return values from the input batch vector expressions List vectorNodeEvaluators = new ArrayList(bigTableExpressions.size()); - + VectorExpressionWriterFactory.processVectorExpressions( bigTableExpressions, new VectorExpressionWriterFactory.ListOIDClosure() { @@ -180,7 +182,7 @@ public class VectorSMBMapJoinOperator ex valueWriters = writers; joinValuesObjectInspectors[posBigTable] = oids; } - }); + }); for(int i=0; i implements + VectorizationContextRegion { private static final long serialVersionUID = 1L; @@ -62,7 +65,7 @@ public class VectorSelectOperator extend } /** - * Create a new vectorization context to create a new projection, but keep + * Create a new vectorization context to create a new projection, but keep * same output column manager must be inherited to track the scratch the columns. 
*/ vOutContext = new VectorizationContext(vContext); @@ -74,7 +77,7 @@ public class VectorSelectOperator extend for (int i=0; i < colList.size(); ++i) { String columnName = this.conf.getOutputColumnNames().get(i); VectorExpression ve = vExpressions[i]; - vOutContext.addProjectionColumn(columnName, + vOutContext.addProjectionColumn(columnName, ve.getOutputColumn()); } } @@ -83,11 +86,11 @@ public class VectorSelectOperator extend } @Override - protected void initializeOp(Configuration hconf) throws HiveException { + protected Collection> initializeOp(Configuration hconf) throws HiveException { + Collection> result = super.initializeOp(hconf); // Just forward the row as is if (conf.isSelStarNoCompute()) { - initializeChildren(hconf); - return; + return null; } List objectInspectors = new ArrayList(); @@ -102,15 +105,15 @@ public class VectorSelectOperator extend outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector( outputFieldNames, objectInspectors); - initializeChildren(hconf); projectedColumns = new int [vExpressions.length]; for (int i = 0; i < projectedColumns.length; i++) { projectedColumns[i] = vExpressions[i].getOutputColumn(); } + return result; } @Override - public void processOp(Object row, int tag) throws HiveException { + public void process(Object row, int tag) throws HiveException { // Just forward the row as is if (conf.isSelStarNoCompute()) { @@ -167,4 +170,9 @@ public class VectorSelectOperator extend public VectorizationContext getOuputVectorizationContext() { return vOutContext; } + + @Override + public OperatorType getType() { + return OperatorType.SELECT; + } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Sat Mar 28 14:03:43 2015 @@ -38,6 +38,8 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.common.type.HiveChar; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; +import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; @@ -113,6 +115,7 @@ import org.apache.hadoop.hive.serde2.typ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.util.StringUtils; +import org.apache.hive.common.util.DateUtils; /** * Context class for vectorization execution. 
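The interval imports added above (HiveIntervalDayTime, HiveIntervalYearMonth, DateUtils) feed the constant-folding and scalar-conversion hunks that follow, where all of these types are carried in a LongColumnVector. As a rough illustration only (not part of r1669775), the single-long encodings look like this, assuming the helper methods keep the signatures they are given elsewhere in this diff:

  // Sketch: how a scalar of each long-backed type is folded into one long value.
  // Types and helpers (DateWritable, TimestampUtils, DateUtils, VectorExpression.Type)
  // are the ones referenced in the surrounding file.
  long encodeAsLong(Object scalarValue, VectorExpression.Type type) {
    switch (type) {
      case DATE:                 // days since the epoch
        return DateWritable.dateToDays((java.sql.Date) scalarValue);
      case TIMESTAMP:            // nanoseconds since the epoch
        return TimestampUtils.getTimeNanoSec((java.sql.Timestamp) scalarValue);
      case INTERVAL_YEAR_MONTH:  // total months
        return ((HiveIntervalYearMonth) scalarValue).getTotalMonths();
      case INTERVAL_DAY_TIME:    // total nanoseconds
        return DateUtils.getIntervalDayTimeTotalNanos((HiveIntervalDayTime) scalarValue);
      default:
        throw new IllegalArgumentException("Not a long-encoded type: " + type);
    }
  }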
@@ -253,6 +256,8 @@ public class VectorizationContext { castExpressionUdfs.add(GenericUDFToChar.class); castExpressionUdfs.add(GenericUDFToVarchar.class); castExpressionUdfs.add(GenericUDFTimestamp.class); + castExpressionUdfs.add(GenericUDFToIntervalYearMonth.class); + castExpressionUdfs.add(GenericUDFToIntervalDayTime.class); castExpressionUdfs.add(UDFToByte.class); castExpressionUdfs.add(UDFToBoolean.class); castExpressionUdfs.add(UDFToDouble.class); @@ -658,6 +663,12 @@ public class VectorizationContext { case TIMESTAMP: genericUdf = new GenericUDFToUnixTimeStamp(); break; + case INTERVAL_YEAR_MONTH: + genericUdf = new GenericUDFToIntervalYearMonth(); + break; + case INTERVAL_DAY_TIME: + genericUdf = new GenericUDFToIntervalDayTime(); + break; case BINARY: genericUdf = new GenericUDFToBinary(); break; @@ -871,8 +882,16 @@ public class VectorizationContext { switch (vectorArgType) { case INT_FAMILY: return new ConstantVectorExpression(outCol, ((Number) constantValue).longValue()); + case DATE: + return new ConstantVectorExpression(outCol, DateWritable.dateToDays((Date) constantValue)); case TIMESTAMP: return new ConstantVectorExpression(outCol, TimestampUtils.getTimeNanoSec((Timestamp) constantValue)); + case INTERVAL_YEAR_MONTH: + return new ConstantVectorExpression(outCol, + ((HiveIntervalYearMonth) constantValue).getTotalMonths()); + case INTERVAL_DAY_TIME: + return new ConstantVectorExpression(outCol, + DateUtils.getIntervalDayTimeTotalNanos((HiveIntervalDayTime) constantValue)); case FLOAT_FAMILY: return new ConstantVectorExpression(outCol, ((Number) constantValue).doubleValue()); case DECIMAL: @@ -1773,6 +1792,14 @@ public class VectorizationContext { return resultType.equalsIgnoreCase("date"); } + public static boolean isIntervalYearMonthFamily(String resultType) { + return resultType.equalsIgnoreCase("interval_year_month"); + } + + public static boolean isIntervalDayTimeFamily(String resultType) { + return resultType.equalsIgnoreCase("interval_day_time"); + } + // return true if this is any kind of float public static boolean isFloatFamily(String resultType) { return resultType.equalsIgnoreCase("double") @@ -1843,12 +1870,19 @@ public class VectorizationContext { private Object getVectorTypeScalarValue(ExprNodeConstantDesc constDesc) throws HiveException { String t = constDesc.getTypeInfo().getTypeName(); - if (isTimestampFamily(t)) { - return TimestampUtils.getTimeNanoSec((Timestamp) getScalarValue(constDesc)); - } else if (isDateFamily(t)) { - return DateWritable.dateToDays((Date) getScalarValue(constDesc)); - } else { - return getScalarValue(constDesc); + VectorExpression.Type type = VectorExpression.Type.getValue(t); + Object scalarValue = getScalarValue(constDesc); + switch (type) { + case TIMESTAMP: + return TimestampUtils.getTimeNanoSec((Timestamp) scalarValue); + case DATE: + return DateWritable.dateToDays((Date) scalarValue); + case INTERVAL_YEAR_MONTH: + return ((HiveIntervalYearMonth) scalarValue).getTotalMonths(); + case INTERVAL_DAY_TIME: + return DateUtils.getIntervalDayTimeTotalNanos((HiveIntervalDayTime) scalarValue); + default: + return scalarValue; } } @@ -1935,6 +1969,9 @@ public class VectorizationContext { return "Date"; case TIMESTAMP: return "Timestamp"; + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY_TIME: + return hiveTypeName; default: return "None"; } @@ -1959,6 +1996,9 @@ public class VectorizationContext { return "Date"; case TIMESTAMP: return "Timestamp"; + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY_TIME: + return hiveTypeName; default: return 
"None"; } @@ -1969,16 +2009,16 @@ public class VectorizationContext { // TODO: And, investigate if different reduce-side versions are needed for var* and std*, or if map-side aggregate can be used.. Right now they are conservatively // marked map-side (HASH). static ArrayList aggregatesDefinition = new ArrayList() {{ - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY, null, VectorUDAFMinLong.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_INTERVAL_FAMILY, null, VectorUDAFMinLong.class)); add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMinDouble.class)); add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMinString.class)); add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMinDecimal.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY, null, VectorUDAFMaxLong.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_INTERVAL_FAMILY, null, VectorUDAFMaxLong.class)); add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMaxDouble.class)); add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMaxString.class)); add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMaxDecimal.class)); add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.NONE, GroupByDesc.Mode.HASH, VectorUDAFCountStar.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_INTERVAL_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFCountMerge.class)); add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java Sat Mar 28 14:03:43 2015 @@ -26,6 +26,8 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; +import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ 
-34,6 +36,8 @@ import org.apache.hadoop.hive.serde2.io. import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.HiveCharWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable; +import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable; import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; import org.apache.hadoop.hive.serde2.io.TimestampWritable; @@ -50,6 +54,7 @@ import org.apache.hadoop.io.FloatWritabl import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hive.common.util.DateUtils; public class VectorizedBatchUtil { private static final Log LOG = LogFactory.getLog(VectorizedBatchUtil.class); @@ -126,6 +131,8 @@ public class VectorizedBatchUtil { case LONG: case TIMESTAMP: case DATE: + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY_TIME: cvList.add(new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE)); break; case FLOAT: @@ -235,11 +242,31 @@ public class VectorizedBatchUtil { final int off = colOffset; // Iterate thru the cols and load the batch for (int i = 0; i < fieldRefs.size(); i++) { - setVector(row, oi, fieldRefs, batch, buffer, rowIndex, i, off); + setVector(row, oi, fieldRefs.get(i), batch, buffer, rowIndex, i, off); } } /** + * Add only the projected column of a regular row to the specified vectorized row batch + * @param row the regular row + * @param oi object inspector for the row + * @param rowIndex the offset to add in the batch + * @param batch vectorized row batch + * @param buffer data output buffer + * @throws HiveException + */ + public static void addProjectedRowToBatchFrom(Object row, StructObjectInspector oi, + int rowIndex, VectorizedRowBatch batch, DataOutputBuffer buffer) throws HiveException { + List fieldRefs = oi.getAllStructFieldRefs(); + for (int i = 0; i < fieldRefs.size(); i++) { + int projectedOutputCol = batch.projectedColumns[i]; + if (batch.cols[projectedOutputCol] == null) { + continue; + } + setVector(row, oi, fieldRefs.get(i), batch, buffer, rowIndex, projectedOutputCol, 0); + } + } + /** * Iterates thru all the columns in a given row and populates the batch * from a given offset * @@ -268,21 +295,21 @@ public class VectorizedBatchUtil { // The value will have already been set before we're called, so don't overwrite it continue; } - setVector(row, oi, fieldRefs, batch, buffer, rowIndex, i, 0); + setVector(row, oi, fieldRefs.get(i), batch, buffer, rowIndex, i, 0); } } private static void setVector(Object row, StructObjectInspector oi, - List fieldRefs, + StructField field, VectorizedRowBatch batch, DataOutputBuffer buffer, int rowIndex, int colIndex, int offset) throws HiveException { - Object fieldData = oi.getStructFieldData(row, fieldRefs.get(colIndex)); - ObjectInspector foi = fieldRefs.get(colIndex).getFieldObjectInspector(); + Object fieldData = oi.getStructFieldData(row, field); + ObjectInspector foi = field.getFieldObjectInspector(); // Vectorization only supports PRIMITIVE data types. 
Assert the same assert (foi.getCategory() == Category.PRIMITIVE); @@ -390,6 +417,30 @@ public class VectorizedBatchUtil { lcv.isNull[rowIndex] = false; } else { lcv.vector[rowIndex] = 1; + setNullColIsNullValue(lcv, rowIndex); + } + } + break; + case INTERVAL_YEAR_MONTH: { + LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex]; + if (writableCol != null) { + HiveIntervalYearMonth i = ((HiveIntervalYearMonthWritable) writableCol).getHiveIntervalYearMonth(); + lcv.vector[rowIndex] = i.getTotalMonths(); + lcv.isNull[rowIndex] = false; + } else { + lcv.vector[rowIndex] = 1; + setNullColIsNullValue(lcv, rowIndex); + } + } + break; + case INTERVAL_DAY_TIME: { + LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex]; + if (writableCol != null) { + HiveIntervalDayTime i = ((HiveIntervalDayTimeWritable) writableCol).getHiveIntervalDayTime(); + lcv.vector[rowIndex] = DateUtils.getIntervalDayTimeTotalNanos(i); + lcv.isNull[rowIndex] = false; + } else { + lcv.vector[rowIndex] = 1; setNullColIsNullValue(lcv, rowIndex); } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Sat Mar 28 14:03:43 2015 @@ -35,6 +35,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; +import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; @@ -42,6 +44,7 @@ import org.apache.hadoop.hive.ql.io.IOPr import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; @@ -61,6 +64,7 @@ import org.apache.hadoop.hive.serde2.typ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileSplit; +import org.apache.hive.common.util.DateUtils; /** * Context for Vectorized row batch. 
this calss does eager deserialization of row data using serde @@ -301,6 +305,8 @@ public class VectorizedRowBatchCtx { case LONG: case TIMESTAMP: case DATE: + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY_TIME: result.cols[j] = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE); break; case FLOAT: @@ -503,7 +509,31 @@ public class VectorizedRowBatchCtx { } } break; - + + case INTERVAL_YEAR_MONTH: { + LongColumnVector lcv = (LongColumnVector) batch.cols[colIndex]; + if (value == null) { + lcv.noNulls = false; + lcv.isNull[0] = true; + lcv.isRepeating = true; + } else { + lcv.fill(((HiveIntervalYearMonth) value).getTotalMonths()); + lcv.isNull[0] = false; + } + } + + case INTERVAL_DAY_TIME: { + LongColumnVector lcv = (LongColumnVector) batch.cols[colIndex]; + if (value == null) { + lcv.noNulls = false; + lcv.isNull[0] = true; + lcv.isRepeating = true; + } else { + lcv.fill(DateUtils.getIntervalDayTimeTotalNanos((HiveIntervalDayTime) value)); + lcv.isNull[0] = false; + } + } + case FLOAT: { DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[colIndex]; if (value == null) { @@ -637,7 +667,9 @@ public class VectorizedRowBatchCtx { return new DecimalColumnVector(defaultSize, precisionScale[0], precisionScale[1]); } else if (type.equalsIgnoreCase("long") || type.equalsIgnoreCase("date") || - type.equalsIgnoreCase("timestamp")) { + type.equalsIgnoreCase("timestamp") || + type.equalsIgnoreCase(serdeConstants.INTERVAL_YEAR_MONTH_TYPE_NAME) || + type.equalsIgnoreCase(serdeConstants.INTERVAL_DAY_TIME_TYPE_NAME)) { return new LongColumnVector(defaultSize); } else { throw new Error("Cannot allocate vector column for " + type); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetBytes.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetBytes.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetBytes.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetBytes.java Sat Mar 28 14:03:43 2015 @@ -206,7 +206,7 @@ public class CuckooSetBytes { // Save original values if (prev1 == null) { prev1 = t1; - prev1 = t2; + prev2 = t2; } t1 = new byte[n][]; t2 = new byte[n][]; Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetDouble.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetDouble.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetDouble.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetDouble.java Sat Mar 28 14:03:43 2015 @@ -18,8 +18,6 @@ package org.apache.hadoop.hive.ql.exec.vector.expressions; -import java.util.Arrays; -import java.util.Random; /** * A high-performance set implementation used to support fast set membership testing, Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetLong.java URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetLong.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetLong.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/CuckooSetLong.java Sat Mar 28 14:03:43 2015 @@ -244,7 +244,7 @@ public class CuckooSetLong { // Save original values if (prev1 == null) { prev1 = t1; - prev1 = t2; + prev2 = t2; } t1 = new long[n]; t2 = new long[n]; Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/MathExpr.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/MathExpr.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/MathExpr.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/MathExpr.java Sat Mar 28 14:03:43 2015 @@ -75,11 +75,11 @@ public class MathExpr { return v == 0.0D ? 0L : 1L; } - /* Convert an integer value in miliseconds since the epoch to a timestamp value + /* Convert an integer value in seconds since the epoch to a timestamp value * for use in a long column vector, which is represented in nanoseconds since the epoch. */ public static long longToTimestamp(long v) { - return v * 1000000; + return v * 1000000000; } // Convert seconds since the epoch (with fraction) to nanoseconds, as a long integer. Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java Sat Mar 28 14:03:43 2015 @@ -30,7 +30,8 @@ import org.apache.hadoop.hive.ql.exec.ve */ public abstract class VectorExpression implements Serializable { public enum Type { - STRING, CHAR, VARCHAR, TIMESTAMP, DATE, LONG, DOUBLE, DECIMAL, OTHER; + STRING, CHAR, VARCHAR, TIMESTAMP, DATE, LONG, DOUBLE, DECIMAL, + INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME, OTHER; private static Map types = ImmutableMap.builder() .put("string", STRING) .put("char", CHAR) @@ -40,6 +41,8 @@ public abstract class VectorExpression i .put("long", LONG) .put("double", DOUBLE) .put("decimal", DECIMAL) + .put("interval_year_month", INTERVAL_YEAR_MONTH) + .put("interval_day_time", INTERVAL_DAY_TIME) .build(); public static Type getValue(String name) { Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java Sat Mar 28 14:03:43 2015 @@ -28,6 +28,8 @@ import org.apache.commons.lang.ArrayUtil import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.common.type.HiveChar; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; +import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hadoop.hive.ql.exec.vector.*; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -47,6 +49,8 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableFloatObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveCharObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveDecimalObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveIntervalDayTimeObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveIntervalYearMonthObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveVarcharObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableIntObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableLongObjectInspector; @@ -56,6 +60,7 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.Text; +import org.apache.hive.common.util.DateUtils; /** * VectorExpressionWritableFactory helper class for generating VectorExpressionWritable objects. 
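The writers produced by this factory are consumed by the vectorized operators earlier in this diff: one writer per projected column turns a vector cell back into an ordinary row object. As a usage illustration only (not introduced by this commit), the pattern mirrors VectorMapJoinOperator.getRowObject():

  // Sketch: typical consumption of writers produced by this factory. 'writers' is
  // assumed to come from VectorExpressionWriterFactory.processVectorInspector(...).
  Object[] extractRow(VectorizedRowBatch batch, int rowIndex,
      VectorExpressionWriter[] writers) throws HiveException {
    int batchIndex = batch.selectedInUse ? batch.selected[rowIndex] : rowIndex;
    Object[] row = new Object[batch.projectionSize];
    for (int i = 0; i < batch.projectionSize; i++) {
      ColumnVector col = batch.cols[batch.projectedColumns[i]];
      // Columns the query does not use may be null in the batch.
      row[i] = (col == null) ? null : writers[i].writeValue(col, batchIndex);
    }
    return row;
  }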
@@ -430,6 +435,12 @@ public final class VectorExpressionWrite case DATE: return genVectorExpressionWritableDate( (SettableDateObjectInspector) fieldObjInspector); + case INTERVAL_YEAR_MONTH: + return genVectorExpressionWritableIntervalYearMonth( + (SettableHiveIntervalYearMonthObjectInspector) fieldObjInspector); + case INTERVAL_DAY_TIME: + return genVectorExpressionWritableIntervalDayTime( + (SettableHiveIntervalDayTimeObjectInspector) fieldObjInspector); case DECIMAL: return genVectorExpressionWritableDecimal( (SettableHiveDecimalObjectInspector) fieldObjInspector); @@ -586,6 +597,84 @@ public final class VectorExpressionWrite } }.init(fieldObjInspector); } + + private static VectorExpressionWriter genVectorExpressionWritableIntervalYearMonth( + SettableHiveIntervalYearMonthObjectInspector fieldObjInspector) throws HiveException { + return new VectorExpressionWriterLong() { + private Object obj; + private HiveIntervalYearMonth interval; + + public VectorExpressionWriter init(SettableHiveIntervalYearMonthObjectInspector objInspector) + throws HiveException { + super.init(objInspector); + interval = new HiveIntervalYearMonth(); + obj = initValue(null); + return this; + } + + @Override + public Object writeValue(long value) { + interval.set((int) value); + ((SettableHiveIntervalYearMonthObjectInspector) this.objectInspector).set(obj, interval); + return obj; + } + + @Override + public Object setValue(Object field, long value) { + if (null == field) { + field = initValue(null); + } + interval.set((int) value); + ((SettableHiveIntervalYearMonthObjectInspector) this.objectInspector).set(field, interval); + return field; + } + + @Override + public Object initValue(Object ignored) { + return ((SettableHiveIntervalYearMonthObjectInspector) this.objectInspector) + .create(new HiveIntervalYearMonth()); + } + }.init(fieldObjInspector); + } + + private static VectorExpressionWriter genVectorExpressionWritableIntervalDayTime( + SettableHiveIntervalDayTimeObjectInspector fieldObjInspector) throws HiveException { + return new VectorExpressionWriterLong() { + private Object obj; + private HiveIntervalDayTime interval; + + public VectorExpressionWriter init(SettableHiveIntervalDayTimeObjectInspector objInspector) + throws HiveException { + super.init(objInspector); + interval = new HiveIntervalDayTime(); + obj = initValue(null); + return this; + } + + @Override + public Object writeValue(long value) { + DateUtils.setIntervalDayTimeTotalNanos(interval, value); + ((SettableHiveIntervalDayTimeObjectInspector) this.objectInspector).set(obj, interval); + return obj; + } + + @Override + public Object setValue(Object field, long value) { + if (null == field) { + field = initValue(null); + } + DateUtils.setIntervalDayTimeTotalNanos(interval, value); + ((SettableHiveIntervalDayTimeObjectInspector) this.objectInspector).set(field, interval); + return field; + } + + @Override + public Object initValue(Object ignored) { + return ((SettableHiveIntervalDayTimeObjectInspector) this.objectInspector) + .create(new HiveIntervalDayTime()); + } + }.init(fieldObjInspector); + } private static VectorExpressionWriter genVectorExpressionWritableChar( SettableHiveCharObjectInspector fieldObjInspector) throws HiveException { Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java?rev=1669775&r1=1669774&r2=1669775&view=diff 
============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java Sat Mar 28 14:03:43 2015 @@ -65,4 +65,19 @@ public class HookUtils { return hooks; } + public static String redactLogString(HiveConf conf, String logString) + throws InstantiationException, IllegalAccessException, ClassNotFoundException { + + String redactedString = logString; + + if (conf != null && logString != null) { + List queryRedactors = getHooks(conf, ConfVars.QUERYREDACTORHOOKS, Redactor.class); + for (Redactor redactor : queryRedactors) { + redactor.setConf(conf); + redactedString = redactor.redactQuery(redactedString); + } + } + + return redactedString; + } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java Sat Mar 28 14:03:43 2015 @@ -146,11 +146,11 @@ public class VectorizedRCFileRecordReade @Override public VectorizedRowBatch createValue() { - VectorizedRowBatch result = null; + VectorizedRowBatch result; try { result = rbCtx.createVectorizedRowBatch(); } catch (HiveException e) { - new RuntimeException("Error creating a batch", e); + throw new RuntimeException("Error creating a batch", e); } return result; } @@ -193,7 +193,7 @@ public class VectorizedRCFileRecordReade } } } catch (Exception e) { - new RuntimeException("Error while getting next row", e); + throw new RuntimeException("Error while getting next row", e); } value.size = i; return more; Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java Sat Mar 28 14:03:43 2015 @@ -97,7 +97,7 @@ public class MergeFileMapper extends Map row[0] = key; row[1] = value; try { - mergeOp.processOp(row, 0); + mergeOp.process(row, 0); } catch (HiveException e) { abort = true; throw new IOException(e); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java Sat Mar 28 14:03:43 2015 @@ -23,7 +23,7 @@ import java.util.EnumSet; import javax.annotation.Nullable; -interface CompressionCodec { +public interface CompressionCodec { public enum 
Modifier { /* speed/compression tradeoffs */ Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java Sat Mar 28 14:03:43 2015 @@ -50,7 +50,7 @@ import org.codehaus.jettison.json.JSONWr * A tool for printing out the file structure of ORC files. */ public final class FileDump { - private static final String ROWINDEX_PREFIX = "--rowindex="; + private static final String UNKNOWN = "UNKNOWN"; // not used private FileDump() {} @@ -77,9 +77,13 @@ public final class FileDump { } } + boolean printTimeZone = false; + if (cli.hasOption('t')) { + printTimeZone = true; + } String[] files = cli.getArgs(); if (dumpData) printData(Arrays.asList(files), conf); - else printMetaData(Arrays.asList(files), conf, rowIndexCols); + else printMetaData(Arrays.asList(files), conf, rowIndexCols, printTimeZone); } private static void printData(List files, Configuration conf) throws IOException, @@ -90,7 +94,7 @@ public final class FileDump { } private static void printMetaData(List files, Configuration conf, - List rowIndexCols) throws IOException { + List rowIndexCols, boolean printTimeZone) throws IOException { for (String filename : files) { System.out.println("Structure for " + filename); Path path = new Path(filename); @@ -125,11 +129,19 @@ public final class FileDump { for (StripeInformation stripe : reader.getStripes()) { ++stripeIx; long stripeStart = stripe.getOffset(); - System.out.println(" Stripe: " + stripe.toString()); OrcProto.StripeFooter footer = rows.readStripeFooter(stripe); + if (printTimeZone) { + String tz = footer.getWriterTimezone(); + if (tz == null || tz.isEmpty()) { + tz = UNKNOWN; + } + System.out.println(" Stripe: " + stripe.toString() + " timezone: " + tz); + } else { + System.out.println(" Stripe: " + stripe.toString()); + } long sectionStart = stripeStart; for(OrcProto.Stream section: footer.getStreamsList()) { - String kind = section.hasKind() ? section.getKind().name() : "UNKNOWN"; + String kind = section.hasKind() ? 
section.getKind().name() : UNKNOWN; System.out.println(" Stream: column " + section.getColumn() + " section " + kind + " start: " + sectionStart + " length " + section.getLength()); @@ -157,7 +169,7 @@ public final class FileDump { for (int colIdx : rowIndexCols) { sargColumns[colIdx] = true; } - RecordReaderImpl.Index indices = rows.readRowIndex(stripeIx, sargColumns); + RecordReaderImpl.Index indices = rows.readRowIndex(stripeIx, null, sargColumns); for (int col : rowIndexCols) { StringBuilder buf = new StringBuilder(); String rowIdxString = getFormattedRowIndices(col, indices.getRowGroupIndex()); @@ -278,6 +290,13 @@ public final class FileDump { .withDescription("Should the data be printed") .create('d')); + // to avoid breaking unit tests (when run in different time zones) for file dump, printing + // of timezone is made optional + result.addOption(OptionBuilder + .withLongOpt("timezone") + .withDescription("Print writer's time zone") + .create('t')); + result.addOption(OptionBuilder .withLongOpt("help") .withDescription("print help message") Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Sat Mar 28 14:03:43 2015 @@ -20,28 +20,50 @@ package org.apache.hadoop.hive.ql.io.orc import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.ListIterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.DiskRange; +import org.apache.hadoop.hive.common.DiskRangeList; +import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk; +import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim; -abstract class InStream extends InputStream { +import com.google.common.annotations.VisibleForTesting; + +public abstract class InStream extends InputStream { private static final Log LOG = LogFactory.getLog(InStream.class); + protected final String name; + protected final long length; + + public InStream(String name, long length) { + this.name = name; + this.length = length; + } + + public String getStreamName() { + return name; + } + + public long getStreamLength() { + return length; + } + private static class UncompressedStream extends InStream { - private final String name; - private final ByteBuffer[] bytes; - private final long[] offsets; + private final List bytes; private final long length; private long currentOffset; private ByteBuffer range; private int currentRange; - public UncompressedStream(String name, ByteBuffer[] input, long[] offsets, - long length) { - this.name = name; + public UncompressedStream(String name, List input, long length) { + super(name, length); this.bytes = input; - this.offsets = offsets; this.length = length; currentRange = 0; currentOffset = 0; @@ -83,12 +105,10 @@ abstract class InStream extends InputStr @Override public void close() { - currentRange = bytes.length; + currentRange = bytes.size(); currentOffset = length; // explicit de-ref of bytes[] - for(int i = 0; i < bytes.length; i++) { - bytes[i] = null; - } + 
bytes.clear(); } @Override @@ -97,23 +117,27 @@ abstract class InStream extends InputStr } public void seek(long desired) { - for(int i = 0; i < bytes.length; ++i) { - if (desired == 0 && bytes[i].remaining() == 0) { - if (LOG.isWarnEnabled()) { - LOG.warn("Attempting seek into empty stream (" + name + ") Skipping stream."); - } + if (desired == 0 && bytes.isEmpty()) { + logEmptySeek(name); + return; + } + int i = 0; + for(DiskRange curRange : bytes) { + if (desired == 0 && curRange.getData().remaining() == 0) { + logEmptySeek(name); return; } - if (offsets[i] <= desired && - desired - offsets[i] < bytes[i].remaining()) { + if (curRange.getOffset() <= desired && + (desired - curRange.getOffset()) < curRange.getLength()) { currentOffset = desired; currentRange = i; - this.range = bytes[i].duplicate(); + this.range = curRange.getData().duplicate(); int pos = range.position(); - pos += (int)(desired - offsets[i]); // this is why we duplicate + pos += (int)(desired - curRange.getOffset()); // this is why we duplicate this.range.position(pos); return; } + ++i; } throw new IllegalArgumentException("Seek in " + name + " to " + desired + " is outside of the data"); @@ -127,50 +151,40 @@ abstract class InStream extends InputStr } } + private static ByteBuffer allocateBuffer(int size, boolean isDirect) { + // TODO: use the same pool as the ORC readers + if (isDirect) { + return ByteBuffer.allocateDirect(size); + } else { + return ByteBuffer.allocate(size); + } + } + private static class CompressedStream extends InStream { - private final String name; - private final ByteBuffer[] bytes; - private final long[] offsets; + private final List bytes; private final int bufferSize; - private final long length; private ByteBuffer uncompressed; private final CompressionCodec codec; private ByteBuffer compressed; private long currentOffset; private int currentRange; private boolean isUncompressedOriginal; - private boolean isDirect = false; - public CompressedStream(String name, ByteBuffer[] input, - long[] offsets, long length, - CompressionCodec codec, int bufferSize - ) { + public CompressedStream(String name, List input, long length, + CompressionCodec codec, int bufferSize) { + super(name, length); this.bytes = input; - this.name = name; this.codec = codec; - this.length = length; - if(this.length > 0) { - isDirect = this.bytes[0].isDirect(); - } - this.offsets = offsets; this.bufferSize = bufferSize; currentOffset = 0; currentRange = 0; } - private ByteBuffer allocateBuffer(int size) { - // TODO: use the same pool as the ORC readers - if(isDirect == true) { - return ByteBuffer.allocateDirect(size); - } else { - return ByteBuffer.allocate(size); - } - } - private void readHeader() throws IOException { if (compressed == null || compressed.remaining() <= 0) { seek(currentOffset); } + long originalOffset = currentOffset; if (compressed.remaining() > OutStream.HEADER_SIZE) { int b0 = compressed.get() & 0xff; int b1 = compressed.get() & 0xff; @@ -193,10 +207,10 @@ abstract class InStream extends InputStr isUncompressedOriginal = true; } else { if (isUncompressedOriginal) { - uncompressed = allocateBuffer(bufferSize); + uncompressed = allocateBuffer(bufferSize, slice.isDirect()); isUncompressedOriginal = false; } else if (uncompressed == null) { - uncompressed = allocateBuffer(bufferSize); + uncompressed = allocateBuffer(bufferSize, slice.isDirect()); } else { uncompressed.clear(); } @@ -246,11 +260,9 @@ abstract class InStream extends InputStr public void close() { uncompressed = null; compressed = null; - 
currentRange = bytes.length; + currentRange = bytes.size(); currentOffset = length; - for(int i = 0; i < bytes.length; i++) { - bytes[i] = null; - } + bytes.clear(); } @Override @@ -267,7 +279,7 @@ abstract class InStream extends InputStr } } - /* slices a read only contigous buffer of chunkLength */ + /* slices a read only contiguous buffer of chunkLength */ private ByteBuffer slice(int chunkLength) throws IOException { int len = chunkLength; final long oldOffset = currentOffset; @@ -279,7 +291,7 @@ abstract class InStream extends InputStr currentOffset += len; compressed.position(compressed.position() + len); return slice; - } else if (currentRange >= (bytes.length - 1)) { + } else if (currentRange >= (bytes.size() - 1)) { // nothing has been modified yet throw new IOException("EOF in " + this + " while trying to read " + chunkLength + " bytes"); @@ -293,16 +305,19 @@ abstract class InStream extends InputStr // we need to consolidate 2 or more buffers into 1 // first copy out compressed buffers - ByteBuffer copy = allocateBuffer(chunkLength); + ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect()); currentOffset += compressed.remaining(); len -= compressed.remaining(); copy.put(compressed); + ListIterator iter = bytes.listIterator(currentRange); - while (len > 0 && (++currentRange) < bytes.length) { + while (len > 0 && iter.hasNext()) { + ++currentRange; if (LOG.isDebugEnabled()) { LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString())); } - compressed = bytes[currentRange].duplicate(); + DiskRange range = iter.next(); + compressed = range.getData().duplicate(); if (compressed.remaining() >= len) { slice = compressed.slice(); slice.limit(len); @@ -323,40 +338,46 @@ abstract class InStream extends InputStr } private void seek(long desired) throws IOException { - for(int i = 0; i < bytes.length; ++i) { - if (offsets[i] <= desired && - desired - offsets[i] < bytes[i].remaining()) { + if (desired == 0 && bytes.isEmpty()) { + logEmptySeek(name); + return; + } + int i = 0; + for (DiskRange range : bytes) { + if (range.getOffset() <= desired && desired < range.getEnd()) { currentRange = i; - compressed = bytes[i].duplicate(); + compressed = range.getData().duplicate(); int pos = compressed.position(); - pos += (int)(desired - offsets[i]); + pos += (int)(desired - range.getOffset()); compressed.position(pos); currentOffset = desired; return; } + ++i; } // if they are seeking to the precise end, go ahead and let them go there - int segments = bytes.length; - if (segments != 0 && - desired == offsets[segments - 1] + bytes[segments - 1].remaining()) { + int segments = bytes.size(); + if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) { + DiskRange range = bytes.get(segments - 1); currentRange = segments - 1; - compressed = bytes[currentRange].duplicate(); + compressed = range.getData().duplicate(); compressed.position(compressed.limit()); currentOffset = desired; return; } - throw new IOException("Seek outside of data in " + this + " to " + - desired); + throw new IOException("Seek outside of data in " + this + " to " + desired); } private String rangeString() { StringBuilder builder = new StringBuilder(); - for(int i=0; i < offsets.length; ++i) { + int i = 0; + for (DiskRange range : bytes) { if (i != 0) { builder.append("; "); } - builder.append(" range " + i + " = " + offsets[i] + " to " + - bytes[i].remaining()); + builder.append(" range " + i + " = " + range.getOffset() + + " to " + (range.getEnd() - range.getOffset())); + ++i; } 
return builder.toString(); } @@ -375,10 +396,16 @@ abstract class InStream extends InputStr public abstract void seek(PositionProvider index) throws IOException; + private static void logEmptySeek(String name) { + if (LOG.isWarnEnabled()) { + LOG.warn("Attempting seek into empty stream (" + name + ") Skipping stream."); + } + } + /** * Create an input stream from a list of buffers. - * @param name the name of the stream - * @param input the list of ranges of bytes for the stream + * @param streamName the name of the stream + * @param buffers the list of ranges of bytes for the stream * @param offsets a list of offsets (the same length as input) that must * contain the first offset of the each set of bytes in input * @param length the length in bytes of the stream @@ -387,17 +414,40 @@ abstract class InStream extends InputStr * @return an input stream * @throws IOException */ - public static InStream create(String name, - ByteBuffer[] input, + @VisibleForTesting + @Deprecated + public static InStream create(String streamName, + ByteBuffer[] buffers, long[] offsets, long length, CompressionCodec codec, int bufferSize) throws IOException { + List input = new ArrayList(buffers.length); + for (int i = 0; i < buffers.length; ++i) { + input.add(new BufferChunk(buffers[i], offsets[i])); + } + return create(streamName, input, length, codec, bufferSize); + } + + /** + * Create an input stream from a list of disk ranges with data. + * @param name the name of the stream + * @param input the list of ranges of bytes for the stream; from disk or cache + * @param length the length in bytes of the stream + * @param codec the compression codec + * @param bufferSize the compression buffer size + * @return an input stream + * @throws IOException + */ + public static InStream create(String name, + List input, + long length, + CompressionCodec codec, + int bufferSize) throws IOException { if (codec == null) { - return new UncompressedStream(name, input, offsets, length); + return new UncompressedStream(name, input, length); } else { - return new CompressedStream(name, input, offsets, length, codec, - bufferSize); + return new CompressedStream(name, input, length, codec, bufferSize); } } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Sat Mar 28 14:03:43 2015 @@ -193,7 +193,7 @@ public final class OrcFile { private ReaderImpl.FileMetaInfo fileMetaInfo; private long maxLength = Long.MAX_VALUE; - ReaderOptions(Configuration conf) { + public ReaderOptions(Configuration conf) { this.conf = conf; } ReaderOptions fileMetaInfo(ReaderImpl.FileMetaInfo info) { Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java 
Sat Mar 28 14:03:43 2015 @@ -44,6 +44,9 @@ import org.apache.hadoop.hive.conf.HiveC import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; @@ -102,8 +105,7 @@ import com.google.common.util.concurrent */ public class OrcInputFormat implements InputFormat, InputFormatChecker, VectorizedInputFormatInterface, - AcidInputFormat, - CombineHiveInputFormat.AvoidSplitCombination { + AcidInputFormat, CombineHiveInputFormat.AvoidSplitCombination { private static final Log LOG = LogFactory.getLog(OrcInputFormat.class); static final HadoopShims SHIMS = ShimLoader.getHadoopShims(); @@ -111,7 +113,6 @@ public class OrcInputFormat implements SHIMS.getHadoopConfNames().get("MAPREDMINSPLITSIZE"); static final String MAX_SPLIT_SIZE = SHIMS.getHadoopConfNames().get("MAPREDMAXSPLITSIZE"); - static final String SARG_PUSHDOWN = "sarg.pushdown"; private static final long DEFAULT_MIN_SPLIT_SIZE = 16 * 1024 * 1024; private static final long DEFAULT_MAX_SPLIT_SIZE = 256 * 1024 * 1024; @@ -217,14 +218,17 @@ public class OrcInputFormat implements long offset, long length ) throws IOException { Reader.Options options = new Reader.Options().range(offset, length); - boolean isOriginal = - !file.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME); + boolean isOriginal = isOriginal(file); List types = file.getTypes(); - setIncludedColumns(options, types, conf, isOriginal); + options.include(genIncludedColumns(types, conf, isOriginal)); setSearchArgument(options, types, conf, isOriginal); return file.rowsOptions(options); } + public static boolean isOriginal(Reader file) { + return !file.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME); + } + /** * Recurse down into a type subtree turning on all of the sub-columns. * @param types the types of the file @@ -244,6 +248,21 @@ public class OrcInputFormat implements } } + public static boolean[] genIncludedColumns( + List types, List included, boolean isOriginal) { + int rootColumn = getRootColumn(isOriginal); + int numColumns = types.size() - rootColumn; + boolean[] result = new boolean[numColumns]; + result[0] = true; + OrcProto.Type root = types.get(rootColumn); + for(int i=0; i < root.getSubtypesCount(); ++i) { + if (included.contains(i)) { + includeColumnRecursive(types, result, root.getSubtypes(i), + rootColumn); + } + } + return result; + } /** * Take the configuration and figure out which columns we need to include. * @param options the options to update @@ -251,64 +270,51 @@ public class OrcInputFormat implements * @param conf the configuration * @param isOriginal is the file in the original format? 
*/ - static void setIncludedColumns(Reader.Options options, - List types, - Configuration conf, - boolean isOriginal) { - int rootColumn = getRootColumn(isOriginal); - if (!ColumnProjectionUtils.isReadAllColumns(conf)) { - int numColumns = types.size() - rootColumn; - boolean[] result = new boolean[numColumns]; - result[0] = true; - OrcProto.Type root = types.get(rootColumn); + public static boolean[] genIncludedColumns( + List types, Configuration conf, boolean isOriginal) { + if (!ColumnProjectionUtils.isReadAllColumns(conf)) { List included = ColumnProjectionUtils.getReadColumnIDs(conf); - for(int i=0; i < root.getSubtypesCount(); ++i) { - if (included.contains(i)) { - includeColumnRecursive(types, result, root.getSubtypes(i), - rootColumn); - } - } - options.include(result); + return genIncludedColumns(types, included, isOriginal); } else { - options.include(null); + return null; } } + public static String[] getSargColumnNames(String[] originalColumnNames, + List types, boolean[] includedColumns, boolean isOriginal) { + int rootColumn = getRootColumn(isOriginal); + String[] columnNames = new String[types.size() - rootColumn]; + int i = 0; + for(int columnId: types.get(rootColumn).getSubtypesList()) { + if (includedColumns == null || includedColumns[columnId - rootColumn]) { + // this is guaranteed to be positive because types only have children + // ids greater than their own id. + columnNames[columnId - rootColumn] = originalColumnNames[i++]; + } + } + return columnNames; + } + static void setSearchArgument(Reader.Options options, List types, Configuration conf, boolean isOriginal) { - int rootColumn = getRootColumn(isOriginal); - String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR); - String sargPushdown = conf.get(SARG_PUSHDOWN); - String columnNamesString = - conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR); - if ((sargPushdown == null && serializedPushdown == null) - || columnNamesString == null) { + String columnNamesString = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR); + if (columnNamesString == null) { + LOG.debug("No ORC pushdown predicate - no column names"); + options.searchArgument(null, null); + return; + } + SearchArgument sarg = SearchArgumentFactory.createFromConf(conf); + if (sarg == null) { LOG.debug("No ORC pushdown predicate"); options.searchArgument(null, null); - } else { - SearchArgument sarg; - if (serializedPushdown != null) { - sarg = SearchArgumentFactory.create - (Utilities.deserializeExpression(serializedPushdown)); - } else { - sarg = SearchArgumentFactory.create(sargPushdown); - } - LOG.info("ORC pushdown predicate: " + sarg); - String[] neededColumnNames = columnNamesString.split(","); - String[] columnNames = new String[types.size() - rootColumn]; - boolean[] includedColumns = options.getInclude(); - int i = 0; - for(int columnId: types.get(rootColumn).getSubtypesList()) { - if (includedColumns == null || includedColumns[columnId - rootColumn]) { - // this is guaranteed to be positive because types only have children - // ids greater than their own id. - columnNames[columnId - rootColumn] = neededColumnNames[i++]; - } - } - options.searchArgument(sarg, columnNames); + return; } + + LOG.info("ORC pushdown predicate: " + sarg); + options.searchArgument(sarg, getSargColumnNames( + columnNamesString.split(","), types, options.getInclude(), isOriginal)); } @Override @@ -776,7 +782,7 @@ public class OrcInputFormat implements // deltas may change the rows making them match the predicate. 
if (deltas.isEmpty()) { Reader.Options options = new Reader.Options(); - setIncludedColumns(options, types, context.conf, isOriginal); + options.include(genIncludedColumns(types, context.conf, isOriginal)); setSearchArgument(options, types, context.conf, isOriginal); // only do split pruning if HIVE-8732 has been fixed in the writer if (options.getSearchArgument() != null && @@ -1124,7 +1130,7 @@ public class OrcInputFormat implements .getBucket(); reader = OrcFile.createReader(path, OrcFile.readerOptions(conf)); final List types = reader.getTypes(); - setIncludedColumns(readOptions, types, conf, split.isOriginal()); + readOptions.include(genIncludedColumns(types, conf, split.isOriginal())); setSearchArgument(readOptions, types, conf, split.isOriginal()); } else { bucket = (int) split.getStart(); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionProvider.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionProvider.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionProvider.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionProvider.java Sat Mar 28 14:03:43 2015 @@ -21,6 +21,6 @@ package org.apache.hadoop.hive.ql.io.orc /** * An interface used for seeking to a row index. */ -interface PositionProvider { +public interface PositionProvider { long getNext(); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java?rev=1669775&r1=1669774&r2=1669775&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java Sat Mar 28 14:03:43 2015 @@ -318,4 +318,5 @@ public interface Reader { boolean[] include, SearchArgument sarg, String[] neededColumns) throws IOException; + MetadataReader metadata() throws IOException; }
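
The InStream refactor above replaces the parallel ByteBuffer[]/long[] arrays with a single list of DiskRange chunks; the old array-based create(...) overload survives only as a deprecated, @VisibleForTesting shim. Below is a minimal caller-side sketch of the new factory, assuming the list parameter is List<DiskRange> (generic arguments are elided in this mail) and that the sketch lives in org.apache.hadoop.hive.ql.io.orc so the package-level RecordReaderImpl.BufferChunk and CompressionCodec types are visible; the names InStreamUsageSketch and fromBuffers are illustrative and not part of the patch.

package org.apache.hadoop.hive.ql.io.orc;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;

class InStreamUsageSketch {
  // Wraps raw buffers and their stream offsets into DiskRange chunks and hands them to
  // the list-based factory, mirroring what the deprecated ByteBuffer[] overload now does.
  static InStream fromBuffers(String streamName, ByteBuffer[] buffers, long[] offsets,
      long length, CompressionCodec codec, int bufferSize) throws IOException {
    List<DiskRange> ranges = new ArrayList<DiskRange>(buffers.length);
    for (int i = 0; i < buffers.length; ++i) {
      // Each BufferChunk records the data plus its starting offset within the ORC stream.
      ranges.add(new BufferChunk(buffers[i], offsets[i]));
    }
    // A null codec yields an UncompressedStream; otherwise a CompressedStream is returned.
    return InStream.create(streamName, ranges, length, codec, bufferSize);
  }
}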
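
The OrcInputFormat changes above also replace the old setIncludedColumns(...) with public static helpers (isOriginal, genIncludedColumns, getSargColumnNames) that return the column projection instead of mutating the options in place. A rough usage sketch of that public surface follows, assuming the elided generic type is List<OrcProto.Type>; the names OrcProjectionSketch and openProjectedReader are illustrative only.

package org.apache.hadoop.hive.ql.io.orc;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

class OrcProjectionSketch {
  // Opens an ORC reader over a byte range and applies the same column projection
  // that the reader-creation path in this patch builds via genIncludedColumns.
  static RecordReader openProjectedReader(Path path, Configuration conf,
      long offset, long length) throws IOException {
    Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
    // An "original" file is one without the ACID key index in its metadata.
    boolean isOriginal = OrcInputFormat.isOriginal(file);
    List<OrcProto.Type> types = file.getTypes();
    Reader.Options options = new Reader.Options()
        .range(offset, length)
        // genIncludedColumns returns null when all columns should be read.
        .include(OrcInputFormat.genIncludedColumns(types, conf, isOriginal));
    return file.rowsOptions(options);
  }
}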