Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8A342115D7 for ; Wed, 10 Sep 2014 07:41:46 +0000 (UTC) Received: (qmail 30431 invoked by uid 500); 10 Sep 2014 07:41:46 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 30391 invoked by uid 500); 10 Sep 2014 07:41:46 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 30380 invoked by uid 99); 10 Sep 2014 07:41:46 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Sep 2014 07:41:46 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Sep 2014 07:41:41 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 3B642238899C; Wed, 10 Sep 2014 07:41:21 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: svn commit: r1623929 [1/2] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/ ql/src/java/org/apache/hadoop/hive/ql/plan/ ql/src/te... Date: Wed, 10 Sep 2014 07:41:19 -0000 To: commits@hive.apache.org From: hashutosh@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140910074121.3B642238899C@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: hashutosh Date: Wed Sep 10 07:41:19 2014 New Revision: 1623929 URL: http://svn.apache.org/r1623929 Log: HIVE-7405 : Vectorize GROUP BY on the Reduce-Side (Part 1 – Basic) (Matt McCline via Ashutosh Chauhan) Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractVectorDesc.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorDesc.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q hive/trunk/ql/src/test/results/clientpositive/dynpart_sort_opt_vectorization.q.out hive/trunk/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out hive/trunk/ql/src/test/results/clientpositive/tez/tez_join_hash.q.out hive/trunk/ql/src/test/results/clientpositive/tez/vector_left_outer_join.q.out hive/trunk/ql/src/test/results/clientpositive/tez/vectorized_nested_mapjoin.q.out hive/trunk/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1623929&r1=1623928&r2=1623929&view=diff ============================================================================== --- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original) +++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Sep 10 07:41:19 2014 @@ -1710,6 +1710,9 @@ public class HiveConf extends Configurat HIVE_VECTORIZATION_ENABLED("hive.vectorized.execution.enabled", false, "This flag should be set to true to enable vectorized mode of query execution.\n" + "The default value is false."), + HIVE_VECTORIZATION_REDUCE_ENABLED("hive.vectorized.execution.reduce.enabled", true, + "This flag should be set to true to enable vectorized mode of the reduce-side of query execution.\n" + + "The default value is true."), HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL("hive.vectorized.groupby.checkinterval", 100000, "Number of entries added to the group by aggregation hash before a recomputation of average entry size is performed."), HIVE_VECTORIZATION_GROUPBY_MAXENTRIES("hive.vectorized.groupby.maxentries", 1000000, Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java?rev=1623929&r1=1623928&r2=1623929&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java Wed Sep 10 07:41:19 2014 @@ -19,16 +19,20 @@ package org.apache.hadoop.hive.ql.exec.vector; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; +import org.apache.hadoop.hive.ql.plan.GroupByDesc; class AggregateDefinition { + private String name; private VectorExpressionDescriptor.ArgumentType type; + private GroupByDesc.Mode mode; private Class aggClass; AggregateDefinition(String name, VectorExpressionDescriptor.ArgumentType type, - Class aggClass) { + GroupByDesc.Mode mode, Class aggClass) { this.name = name; this.type = type; + this.mode = mode; this.aggClass = aggClass; } @@ -38,6 +42,9 @@ class AggregateDefinition { VectorExpressionDescriptor.ArgumentType getType() { return type; } + GroupByDesc.Mode getMode() { + return mode; + } Class getAggClass() { return aggClass; } Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java?rev=1623929&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java Wed Sep 10 07:41:19 2014 @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector; + +import java.util.Arrays; + +import org.apache.hadoop.hive.ql.metadata.HiveException; + +/** + * Class to keep information on a set of typed vector columns. Used by + * other classes to efficiently access the set of columns. + */ +public class VectorColumnSetInfo { + + // For simpler access, we make these members protected instead of + // providing get methods. + + /** + * indices of LONG primitive keys. + */ + protected int[] longIndices; + + /** + * indices of DOUBLE primitive keys. + */ + protected int[] doubleIndices; + + /** + * indices of string (byte[]) primitive keys. + */ + protected int[] stringIndices; + + /** + * indices of decimal primitive keys. + */ + protected int[] decimalIndices; + + /** + * Helper class for looking up a key value based on key index. + */ + public class KeyLookupHelper { + public int longIndex; + public int doubleIndex; + public int stringIndex; + public int decimalIndex; + + private static final int INDEX_UNUSED = -1; + + private void resetIndices() { + this.longIndex = this.doubleIndex = this.stringIndex = this.decimalIndex = INDEX_UNUSED; + } + public void setLong(int index) { + resetIndices(); + this.longIndex= index; + } + + public void setDouble(int index) { + resetIndices(); + this.doubleIndex = index; + } + + public void setString(int index) { + resetIndices(); + this.stringIndex = index; + } + + public void setDecimal(int index) { + resetIndices(); + this.decimalIndex = index; + } + } + + /** + * Lookup vector to map from key index to primitive type index. + */ + protected KeyLookupHelper[] indexLookup; + + private int keyCount; + private int addIndex; + + protected int longIndicesIndex; + protected int doubleIndicesIndex; + protected int stringIndicesIndex; + protected int decimalIndicesIndex; + + protected VectorColumnSetInfo(int keyCount) { + this.keyCount = keyCount; + this.addIndex = 0; + + // We'll over allocate and then shrink the array for each type + longIndices = new int[this.keyCount]; + longIndicesIndex = 0; + doubleIndices = new int[this.keyCount]; + doubleIndicesIndex = 0; + stringIndices = new int[this.keyCount]; + stringIndicesIndex = 0; + decimalIndices = new int[this.keyCount]; + decimalIndicesIndex = 0; + indexLookup = new KeyLookupHelper[this.keyCount]; + } + + protected void addKey(String outputType) throws HiveException { + indexLookup[addIndex] = new KeyLookupHelper(); + if (VectorizationContext.isIntFamily(outputType) || + VectorizationContext.isDatetimeFamily(outputType)) { + longIndices[longIndicesIndex] = addIndex; + indexLookup[addIndex].setLong(longIndicesIndex); + ++longIndicesIndex; + } else if (VectorizationContext.isFloatFamily(outputType)) { + doubleIndices[doubleIndicesIndex] = addIndex; + indexLookup[addIndex].setDouble(doubleIndicesIndex); + ++doubleIndicesIndex; + } else if (VectorizationContext.isStringFamily(outputType)) { + stringIndices[stringIndicesIndex]= addIndex; + indexLookup[addIndex].setString(stringIndicesIndex); + ++stringIndicesIndex; + } else if (VectorizationContext.isDecimalFamily(outputType)) { + decimalIndices[decimalIndicesIndex]= addIndex; + indexLookup[addIndex].setDecimal(decimalIndicesIndex); + ++decimalIndicesIndex; + } + else { + throw new HiveException("Unsuported vector output type: " + outputType); + } + addIndex++; + } + + protected void finishAdding() { + longIndices = Arrays.copyOf(longIndices, longIndicesIndex); + doubleIndices = Arrays.copyOf(doubleIndices, doubleIndicesIndex); + stringIndices = Arrays.copyOf(stringIndices, stringIndicesIndex); + decimalIndices = Arrays.copyOf(decimalIndices, decimalIndicesIndex); + } +} \ No newline at end of file Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1623929&r1=1623928&r2=1623929&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Wed Sep 10 07:41:19 2014 @@ -32,8 +32,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.KeyWrapper; +import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory; @@ -47,13 +50,17 @@ import org.apache.hadoop.hive.ql.plan.ap import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.DataOutputBuffer; /** * Vectorized GROUP BY operator implementation. Consumes the vectorized input and * stores the aggregate operators' intermediate states. Emits row mode output. * */ -public class VectorGroupByOperator extends GroupByOperator { +public class VectorGroupByOperator extends GroupByOperator implements VectorizationContextRegion { private static final Log LOG = LogFactory.getLog( VectorGroupByOperator.class.getName()); @@ -70,6 +77,17 @@ public class VectorGroupByOperator exten */ private VectorExpression[] keyExpressions; + private boolean isVectorOutput; + + // Create a new outgoing vectorization context because column name map will change. + private VectorizationContext vOutContext = null; + + private String fileKey; + + // The above members are initialized by the constructor and must not be + // transient. + //--------------------------------------------------------------------------- + private transient VectorExpressionWriter[] keyOutputWriters; /** @@ -85,11 +103,18 @@ public class VectorGroupByOperator exten private transient Object[] forwardCache; + private transient VectorizedRowBatch outputBatch; + private transient VectorizedRowBatchCtx vrbCtx; + + private transient VectorColumnAssign[] vectorColumnAssign; + /** - * Interface for processing mode: global, hash or streaming + * Interface for processing mode: global, hash, unsorted streaming, or group batch */ private static interface IProcessingMode { public void initialize(Configuration hconf) throws HiveException; + public void startGroup() throws HiveException; + public void endGroup() throws HiveException; public void processBatch(VectorizedRowBatch batch) throws HiveException; public void close(boolean aborted) throws HiveException; } @@ -98,6 +123,15 @@ public class VectorGroupByOperator exten * Base class for all processing modes */ private abstract class ProcessingModeBase implements IProcessingMode { + + // Overridden and used in sorted reduce group batch processing mode. + public void startGroup() throws HiveException { + // Do nothing. + } + public void endGroup() throws HiveException { + // Do nothing. + } + /** * Evaluates the aggregators on the current batch. * The aggregationBatchInfo must have been prepared @@ -170,7 +204,7 @@ public class VectorGroupByOperator exten @Override public void close(boolean aborted) throws HiveException { if (!aborted) { - flushSingleRow(null, aggregationBuffers); + writeSingleRow(null, aggregationBuffers); } } } @@ -426,7 +460,7 @@ public class VectorGroupByOperator exten while(iter.hasNext()) { Map.Entry pair = iter.next(); - flushSingleRow((VectorHashKeyWrapper) pair.getKey(), pair.getValue()); + writeSingleRow((VectorHashKeyWrapper) pair.getKey(), pair.getValue()); if (!all) { iter.remove(); @@ -501,20 +535,21 @@ public class VectorGroupByOperator exten if (numEntriesHashTable > sumBatchSize * minReductionHashAggr) { flush(true); - changeToStreamingMode(); + changeToUnsortedStreamingMode(); } } } } /** - * Streaming processing mode. Intermediate values are flushed each time key changes. - * In this mode we're relying on the MR shuffle and merge the intermediates in the reduce. + * Unsorted streaming processing mode. Each input VectorizedRowBatch may have + * a mix of different keys (hence unsorted). Intermediate values are flushed + * each time key changes. */ - private class ProcessingModeStreaming extends ProcessingModeBase { + private class ProcessingModeUnsortedStreaming extends ProcessingModeBase { /** - * The aggreagation buffers used in streaming mode + * The aggregation buffers used in streaming mode */ private VectorAggregationBufferRow currentStreamingAggregators; @@ -557,7 +592,7 @@ public class VectorGroupByOperator exten // Nothing to do } }); - LOG.info("using streaming aggregation processing mode"); + LOG.info("using unsorted streaming aggregation processing mode"); } @Override @@ -601,7 +636,7 @@ public class VectorGroupByOperator exten // Now flush/forward all keys/rows, except the last (current) one for (int i = 0; i < flushMark; ++i) { - flushSingleRow(keysToFlush[i], rowsToFlush[i]); + writeSingleRow(keysToFlush[i], rowsToFlush[i]); rowsToFlush[i].reset(); streamAggregationBufferRowPool.putInPool(rowsToFlush[i]); } @@ -610,7 +645,79 @@ public class VectorGroupByOperator exten @Override public void close(boolean aborted) throws HiveException { if (!aborted && null != streamingKey) { - flushSingleRow(streamingKey, currentStreamingAggregators); + writeSingleRow(streamingKey, currentStreamingAggregators); + } + } + } + + /** + * Sorted reduce group batch processing mode. Each input VectorizedRowBatch will have the + * same key. On endGroup (or close), the intermediate values are flushed. + */ + private class ProcessingModeGroupBatches extends ProcessingModeBase { + + private boolean inGroup; + private boolean first; + + /** + * The group vector key helper. + */ + VectorGroupKeyHelper groupKeyHelper; + + /** + * The group vector aggregation buffers. + */ + private VectorAggregationBufferRow groupAggregators; + + /** + * Buffer to hold string values. + */ + private DataOutputBuffer buffer; + + @Override + public void initialize(Configuration hconf) throws HiveException { + inGroup = false; + groupKeyHelper = new VectorGroupKeyHelper(keyExpressions.length); + groupKeyHelper.init(keyExpressions); + groupAggregators = allocateAggregationBuffer(); + buffer = new DataOutputBuffer(); + LOG.info("using sorted group batch aggregation processing mode"); + } + + @Override + public void startGroup() throws HiveException { + inGroup = true; + first = true; + } + + @Override + public void endGroup() throws HiveException { + if (inGroup && !first) { + writeGroupRow(groupAggregators, buffer); + groupAggregators.reset(); + } + inGroup = false; + } + + @Override + public void processBatch(VectorizedRowBatch batch) throws HiveException { + assert(inGroup); + if (first) { + // Copy the group key to output batch now. We'll copy in the aggregates at the end of the group. + first = false; + groupKeyHelper.copyGroupKey(batch, outputBatch, buffer); + } + + // Aggregate this batch. + for (int i = 0; i < aggregators.length; ++i) { + aggregators[i].aggregateInput(groupAggregators.getAggregationBuffer(i), batch); + } + } + + @Override + public void close(boolean aborted) throws HiveException { + if (!aborted && inGroup && !first) { + writeGroupRow(groupAggregators, buffer); } } } @@ -633,8 +740,20 @@ public class VectorGroupByOperator exten aggregators = new VectorAggregateExpression[aggrDesc.size()]; for (int i = 0; i < aggrDesc.size(); ++i) { AggregationDesc aggDesc = aggrDesc.get(i); - aggregators[i] = vContext.getAggregatorExpression(aggDesc); + aggregators[i] = vContext.getAggregatorExpression(aggDesc, desc.getVectorDesc().isReduce()); } + + isVectorOutput = desc.getVectorDesc().isVectorOutput(); + + List outColNames = desc.getOutputColumnNames(); + Map mapOutCols = new HashMap(outColNames.size()); + int outColIndex = 0; + for(String outCol: outColNames) { + mapOutCols.put(outCol, outColIndex++); + } + vOutContext = new VectorizationContext(mapOutCols, outColIndex); + vOutContext.setFileKey(vContext.getFileKey() + "/_GROUPBY_"); + fileKey = vOutContext.getFileKey(); } public VectorGroupByOperator() { @@ -662,13 +781,23 @@ public class VectorGroupByOperator exten objectInspectors.add(aggregators[i].getOutputObjectInspector()); } - keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions); - aggregationBatchInfo = new VectorAggregationBufferBatch(); - aggregationBatchInfo.compileAggregationBatchInfo(aggregators); - + if (!conf.getVectorDesc().isVectorGroupBatches()) { + // These data structures are only used by the map-side processing modes. + keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions); + aggregationBatchInfo = new VectorAggregationBufferBatch(); + aggregationBatchInfo.compileAggregationBatchInfo(aggregators); + } + LOG.warn("VectorGroupByOperator is vector output " + isVectorOutput); List outputFieldNames = conf.getOutputColumnNames(); outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector( outputFieldNames, objectInspectors); + if (isVectorOutput) { + vrbCtx = new VectorizedRowBatchCtx(); + vrbCtx.init(hconf, fileKey, (StructObjectInspector) outputObjInspector); + outputBatch = vrbCtx.createVectorizedRowBatch(); + vectorColumnAssign = VectorColumnAssignFactory.buildAssigners( + outputBatch, outputObjInspector, vOutContext.getColumnMap(), conf.getOutputColumnNames()); + } } catch (HiveException he) { throw he; @@ -678,32 +807,43 @@ public class VectorGroupByOperator exten initializeChildren(hconf); - forwardCache =new Object[keyExpressions.length + aggregators.length]; + forwardCache = new Object[keyExpressions.length + aggregators.length]; if (keyExpressions.length == 0) { - processingMode = this.new ProcessingModeGlobalAggregate(); - } - else { - //TODO: consider if parent can offer order guarantees - // If input is sorted, is more efficient to use the streaming mode + processingMode = this.new ProcessingModeGlobalAggregate(); + } else if (conf.getVectorDesc().isVectorGroupBatches()) { + // Sorted GroupBy of vector batches where an individual batch has the same group key (e.g. reduce). + processingMode = this.new ProcessingModeGroupBatches(); + } else { + // We start in hash mode and may dynamically switch to unsorted stream mode. processingMode = this.new ProcessingModeHashAggregate(); } processingMode.initialize(hconf); } /** - * changes the processing mode to streaming + * changes the processing mode to unsorted streaming * This is done at the request of the hash agg mode, if the number of keys * exceeds the minReductionHashAggr factor * @throws HiveException */ - private void changeToStreamingMode() throws HiveException { - processingMode = this.new ProcessingModeStreaming(); + private void changeToUnsortedStreamingMode() throws HiveException { + processingMode = this.new ProcessingModeUnsortedStreaming(); processingMode.initialize(null); LOG.trace("switched to streaming mode"); } @Override + public void startGroup() throws HiveException { + processingMode.startGroup(); + } + + @Override + public void endGroup() throws HiveException { + processingMode.endGroup(); + } + + @Override public void processOp(Object row, int tag) throws HiveException { VectorizedRowBatch batch = (VectorizedRowBatch) row; @@ -719,26 +859,72 @@ public class VectorGroupByOperator exten * @param agg * @throws HiveException */ - private void flushSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow agg) + private void writeSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow agg) throws HiveException { int fi = 0; - for (int i = 0; i < keyExpressions.length; ++i) { - forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue ( - kw, i, keyOutputWriters[i]); + if (!isVectorOutput) { + // Output row. + for (int i = 0; i < keyExpressions.length; ++i) { + forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue ( + kw, i, keyOutputWriters[i]); + } + for (int i = 0; i < aggregators.length; ++i) { + forwardCache[fi++] = aggregators[i].evaluateOutput(agg.getAggregationBuffer(i)); + } + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("forwarding keys: %s: %s", + kw, Arrays.toString(forwardCache))); + } + forward(forwardCache, outputObjInspector); + } else { + // Output keys and aggregates into the output batch. + for (int i = 0; i < keyExpressions.length; ++i) { + vectorColumnAssign[fi++].assignObjectValue(keyWrappersBatch.getWritableKeyValue ( + kw, i, keyOutputWriters[i]), outputBatch.size); + } + for (int i = 0; i < aggregators.length; ++i) { + vectorColumnAssign[fi++].assignObjectValue(aggregators[i].evaluateOutput( + agg.getAggregationBuffer(i)), outputBatch.size); + } + ++outputBatch.size; + if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) { + flushOutput(); + } } + } + + /** + * Emits a (reduce) group row, made from the key (copied in at the beginning of the group) and + * the row aggregation buffers values + * @param agg + * @param buffer + * @throws HiveException + */ + private void writeGroupRow(VectorAggregationBufferRow agg, DataOutputBuffer buffer) + throws HiveException { + int fi = keyExpressions.length; // Start after group keys. for (int i = 0; i < aggregators.length; ++i) { - forwardCache[fi++] = aggregators[i].evaluateOutput(agg.getAggregationBuffer(i)); + vectorColumnAssign[fi++].assignObjectValue(aggregators[i].evaluateOutput( + agg.getAggregationBuffer(i)), outputBatch.size); } - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("forwarding keys: %s: %s", - kw, Arrays.toString(forwardCache))); + ++outputBatch.size; + if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) { + flushOutput(); + buffer.reset(); } - forward(forwardCache, outputObjInspector); + } + + private void flushOutput() throws HiveException { + forward(outputBatch, null); + outputBatch.reset(); } @Override public void closeOp(boolean aborted) throws HiveException { processingMode.close(aborted); + if (!aborted && isVectorOutput && outputBatch.size > 0) { + flushOutput(); + } } static public String getOperatorName() { @@ -761,4 +947,8 @@ public class VectorGroupByOperator exten this.aggregators = aggregators; } + @Override + public VectorizationContext getOuputVectorizationContext() { + return vOutContext; + } } Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java?rev=1623929&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java Wed Sep 10 07:41:19 2014 @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.io.DataOutputBuffer; + +/** + * Class for copying the group key from an input batch to an output batch. + */ +public class VectorGroupKeyHelper extends VectorColumnSetInfo { + + public VectorGroupKeyHelper(int keyCount) { + super(keyCount); + } + + void init(VectorExpression[] keyExpressions) throws HiveException { + // Inspect the output type of each key expression. + for(int i=0; i < keyExpressions.length; ++i) { + addKey(keyExpressions[i].getOutputType()); + } + finishAdding(); + } + + public void copyGroupKey(VectorizedRowBatch inputBatch, VectorizedRowBatch outputBatch, + DataOutputBuffer buffer) throws HiveException { + // Grab the key at index 0. We don't care about selected or repeating since all keys in the input batch are the same. + for(int i = 0; i< longIndices.length; ++i) { + int keyIndex = longIndices[i]; + LongColumnVector inputColumnVector = (LongColumnVector) inputBatch.cols[keyIndex]; + LongColumnVector outputColumnVector = (LongColumnVector) outputBatch.cols[keyIndex]; + if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) { + outputColumnVector.vector[outputBatch.size] = inputColumnVector.vector[0]; + } else if (inputColumnVector.noNulls ){ + outputColumnVector.noNulls = false; + outputColumnVector.isNull[outputBatch.size] = true; + } else { + outputColumnVector.isNull[outputBatch.size] = true; + } + } + for(int i=0;i aggregatesDefinition = new ArrayList() {{ - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFMinLong.class)); - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFMinDouble.class)); - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, VectorUDAFMinString.class)); - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFMinDecimal.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFMaxLong.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFMaxDouble.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, VectorUDAFMaxString.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFMaxDecimal.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.NONE, VectorUDAFCountStar.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFCount.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFCount.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, VectorUDAFCount.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFCount.class)); - add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFSumLong.class)); - add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFSumDouble.class)); - add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFSumDecimal.class)); - add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFAvgLong.class)); - add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFAvgDouble.class)); - add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFAvgDecimal.class)); - add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFVarPopLong.class)); - add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFVarPopLong.class)); - add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFVarPopDouble.class)); - add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFVarPopDouble.class)); - add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFVarPopDecimal.class)); - add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFVarPopDecimal.class)); - add(new AggregateDefinition("var_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFVarSampLong.class)); - add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFVarSampDouble.class)); - add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFVarSampDecimal.class)); - add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFStdPopLong.class)); - add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFStdPopLong.class)); - add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFStdPopLong.class)); - add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFStdPopDouble.class)); - add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFStdPopDouble.class)); - add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFStdPopDouble.class)); - add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFStdPopDecimal.class)); - add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFStdPopDecimal.class)); - add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFStdPopDecimal.class)); - add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFStdSampLong.class)); - add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFStdSampDouble.class)); - add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFStdSampDecimal.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFMinLong.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMinDouble.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMinString.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMinDecimal.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFMaxLong.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMaxDouble.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMaxString.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMaxDecimal.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.NONE, GroupByDesc.Mode.HASH, VectorUDAFCountStar.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFSumLong.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFSumLong.class)); + add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFSumDouble.class)); + add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFSumDecimal.class)); + add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgLong.class)); + add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgDouble.class)); + add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFAvgDecimal.class)); + add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class)); + add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class)); + add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class)); + add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class)); + add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class)); + add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class)); + add(new AggregateDefinition("var_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampLong.class)); + add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampDouble.class)); + add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarSampDecimal.class)); + add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); + add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); + add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); + add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); + add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); + add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); + add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); + add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); + add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); + add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampLong.class)); + add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampDouble.class)); + add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdSampDecimal.class)); }}; - public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc) + public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc, boolean isReduce) throws HiveException { ArrayList paramDescList = desc.getParameters(); @@ -1948,8 +1956,15 @@ public class VectorizationContext { for (AggregateDefinition aggDef : aggregatesDefinition) { if (aggregateName.equalsIgnoreCase(aggDef.getName()) && ((aggDef.getType() == VectorExpressionDescriptor.ArgumentType.NONE && - inputType == VectorExpressionDescriptor.ArgumentType.NONE) || + inputType == VectorExpressionDescriptor.ArgumentType.NONE) || (aggDef.getType().isSameTypeOrFamily(inputType)))) { + + if (aggDef.getMode() == GroupByDesc.Mode.HASH && isReduce) { + continue; + } else if (aggDef.getMode() == GroupByDesc.Mode.MERGEPARTIAL && !isReduce) { + continue; + } + Class aggClass = aggDef.getAggClass(); try { @@ -1967,7 +1982,7 @@ public class VectorizationContext { } throw new HiveException("Vector aggregate not implemented: \"" + aggregateName + - "\" for type: \"" + inputType.name() + ""); + "\" for type: \"" + inputType.name() + " (reduce-side = " + isReduce + ")"); } public Map getOutputColumnTypeMap() { Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1623929&r1=1623928&r2=1623929&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Wed Sep 10 07:41:19 2014 @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.exec.Ut import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.IOPrepareCache; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.Deserializer; @@ -124,15 +125,20 @@ public class VectorizedRowBatchCtx { * Used by non-tablescan operators when they change the vectorization context * @param hiveConf * @param fileKey - * The key on which to retrieve the extra column mapping from the map scratch + * The key on which to retrieve the extra column mapping from the map/reduce scratch * @param rowOI * Object inspector that shapes the column types */ public void init(Configuration hiveConf, String fileKey, StructObjectInspector rowOI) { - columnTypeMap = Utilities - .getMapRedWork(hiveConf).getMapWork().getScratchColumnVectorTypes() - .get(fileKey); + MapredWork mapredWork = Utilities.getMapRedWork(hiveConf); + Map> scratchColumnVectorTypes; + if (mapredWork.getMapWork() != null) { + scratchColumnVectorTypes = mapredWork.getMapWork().getScratchColumnVectorTypes(); + } else { + scratchColumnVectorTypes = mapredWork.getReduceWork().getScratchColumnVectorTypes(); + } + columnTypeMap = scratchColumnVectorTypes.get(fileKey); this.rowOI= rowOI; this.rawRowOI = rowOI; } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1623929&r1=1623928&r2=1623929&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Wed Sep 10 07:41:19 2014 @@ -38,9 +38,11 @@ import org.apache.hadoop.hive.ql.exec.mr import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor; +import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; @@ -61,6 +63,7 @@ import org.apache.hadoop.hive.ql.plan.Ba import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; @@ -71,6 +74,7 @@ import org.apache.hadoop.hive.ql.plan.SM import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.udf.UDFAcos; import org.apache.hadoop.hive.ql.udf.UDFAsin; @@ -290,23 +294,26 @@ public class Vectorizer implements Physi throws SemanticException { Task currTask = (Task) nd; if (currTask instanceof MapRedTask) { - convertMapWork(((MapRedTask) currTask).getWork().getMapWork()); + convertMapWork(((MapRedTask) currTask).getWork().getMapWork(), false); } else if (currTask instanceof TezTask) { TezWork work = ((TezTask) currTask).getWork(); for (BaseWork w: work.getAllWork()) { if (w instanceof MapWork) { - convertMapWork((MapWork)w); + convertMapWork((MapWork) w, true); } else if (w instanceof ReduceWork) { // We are only vectorizing Reduce under Tez. - convertReduceWork((ReduceWork)w); + if (HiveConf.getBoolVar(pctx.getConf(), + HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) { + convertReduceWork((ReduceWork) w); + } } } } return null; } - private void convertMapWork(MapWork mapWork) throws SemanticException { - boolean ret = validateMapWork(mapWork); + private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException { + boolean ret = validateMapWork(mapWork, isTez); if (ret) { vectorizeMapWork(mapWork); } @@ -319,7 +326,8 @@ public class Vectorizer implements Physi + ReduceSinkOperator.getOperatorName()), np); } - private boolean validateMapWork(MapWork mapWork) throws SemanticException { + private boolean validateMapWork(MapWork mapWork, boolean isTez) throws SemanticException { + LOG.info("Validating MapWork..."); // Validate the input format for (String path : mapWork.getPathToPartitionInfo().keySet()) { @@ -333,7 +341,7 @@ public class Vectorizer implements Physi } } Map opRules = new LinkedHashMap(); - MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(); + MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(isTez); addMapWorkRules(opRules, vnp); Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -417,9 +425,12 @@ public class Vectorizer implements Physi private void addReduceWorkRules(Map opRules, NodeProcessor np) { opRules.put(new RuleRegExp("R1", ExtractOperator.getOperatorName() + ".*"), np); opRules.put(new RuleRegExp("R2", GroupByOperator.getOperatorName() + ".*"), np); + opRules.put(new RuleRegExp("R3", SelectOperator.getOperatorName() + ".*"), np); } private boolean validateReduceWork(ReduceWork reduceWork) throws SemanticException { + LOG.info("Validating ReduceWork..."); + // Validate input to ReduceWork. if (!getOnlyStructObjectInspectors(reduceWork)) { return false; @@ -487,16 +498,21 @@ public class Vectorizer implements Physi class MapWorkValidationNodeProcessor implements NodeProcessor { + private boolean isTez; + + public MapWorkValidationNodeProcessor(boolean isTez) { + this.isTez = isTez; + } + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { for (Node n : stack) { Operator op = (Operator) n; - if ((op.getType().equals(OperatorType.REDUCESINK) || op.getType().equals(OperatorType.FILESINK)) && - op.getParentOperators().get(0).getType().equals(OperatorType.GROUPBY)) { + if (nonVectorizableChildOfGroupBy(op)) { return new Boolean(true); } - boolean ret = validateMapWorkOperator(op); + boolean ret = validateMapWorkOperator(op, isTez); if (!ret) { LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized."); return new Boolean(false); @@ -513,6 +529,9 @@ public class Vectorizer implements Physi Object... nodeOutputs) throws SemanticException { for (Node n : stack) { Operator op = (Operator) n; + if (nonVectorizableChildOfGroupBy(op)) { + return new Boolean(true); + } boolean ret = validateReduceWorkOperator(op); if (!ret) { LOG.info("ReduceWork Operator: " + op.getName() + " could not be vectorized."); @@ -579,21 +598,6 @@ public class Vectorizer implements Physi return vContext; } - public Boolean nonVectorizableChildOfGroupBy(Operator op) { - Operator currentOp = op; - while (currentOp.getParentOperators().size() > 0) { - currentOp = currentOp.getParentOperators().get(0); - if (currentOp.getType().equals(OperatorType.GROUPBY)) { - // No need to vectorize - if (!opsDone.contains(op)) { - opsDone.add(op); - } - return true; - } - } - return false; - } - public Operator doVectorize(Operator op, VectorizationContext vContext) throws SemanticException { Operator vectorOp = op; @@ -665,9 +669,13 @@ public class Vectorizer implements Physi assert vContext != null; - // Currently, Vectorized GROUPBY outputs rows, not vectorized row batchs. So, don't vectorize - // any operators below GROUPBY. + // When Vectorized GROUPBY outputs rows instead of vectorized row batchs, we don't + // vectorize the operators below it. if (nonVectorizableChildOfGroupBy(op)) { + // No need to vectorize + if (!opsDone.contains(op)) { + opsDone.add(op); + } return null; } @@ -719,13 +727,22 @@ public class Vectorizer implements Physi assert vContext != null; - // Currently, Vectorized GROUPBY outputs rows, not vectorized row batchs. So, don't vectorize - // any operators below GROUPBY. + // When Vectorized GROUPBY outputs rows instead of vectorized row batchs, we don't + // vectorize the operators below it. if (nonVectorizableChildOfGroupBy(op)) { + // No need to vectorize + if (!opsDone.contains(op)) { + opsDone.add(op); + } return null; } Operator vectorOp = doVectorize(op, vContext); + if (vectorOp instanceof VectorGroupByOperator) { + VectorGroupByOperator groupBy = (VectorGroupByOperator) vectorOp; + VectorGroupByDesc vectorDesc = groupBy.getConf().getVectorDesc(); + vectorDesc.setVectorGroupBatches(true); + } if (saveRootVectorOp && op != vectorOp) { rootVectorOp = vectorOp; } @@ -772,7 +789,7 @@ public class Vectorizer implements Physi return pctx; } - boolean validateMapWorkOperator(Operator op) { + boolean validateMapWorkOperator(Operator op, boolean isTez) { boolean ret = false; switch (op.getType()) { case MAPJOIN: @@ -783,7 +800,7 @@ public class Vectorizer implements Physi } break; case GROUPBY: - ret = validateGroupByOperator((GroupByOperator) op); + ret = validateGroupByOperator((GroupByOperator) op, false, isTez); break; case FILTER: ret = validateFilterOperator((FilterOperator) op); @@ -814,6 +831,17 @@ public class Vectorizer implements Physi case EXTRACT: ret = validateExtractOperator((ExtractOperator) op); break; + case MAPJOIN: + // Does MAPJOIN actually get planned in Reduce? + if (op instanceof MapJoinOperator) { + ret = validateMapJoinOperator((MapJoinOperator) op); + } else if (op instanceof SMBMapJoinOperator) { + ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op); + } + break; + case GROUPBY: + ret = validateGroupByOperator((GroupByOperator) op, true, true); + break; case FILTER: ret = validateFilterOperator((FilterOperator) op); break; @@ -836,6 +864,23 @@ public class Vectorizer implements Physi return ret; } + public Boolean nonVectorizableChildOfGroupBy(Operator op) { + Operator currentOp = op; + while (currentOp.getParentOperators().size() > 0) { + currentOp = currentOp.getParentOperators().get(0); + if (currentOp.getType().equals(OperatorType.GROUPBY)) { + GroupByDesc desc = (GroupByDesc)currentOp.getConf(); + boolean isVectorOutput = desc.getVectorDesc().isVectorOutput(); + if (isVectorOutput) { + // This GROUP BY does vectorize its output. + return false; + } + return true; + } + } + return false; + } + private boolean validateSMBMapJoinOperator(SMBMapJoinOperator op) { SMBJoinDesc desc = op.getConf(); // Validation is the same as for map join, since the 'small' tables are not vectorized @@ -886,16 +931,57 @@ public class Vectorizer implements Physi return validateExprNodeDesc(desc, VectorExpressionDescriptor.Mode.FILTER); } - private boolean validateGroupByOperator(GroupByOperator op) { - if (op.getConf().isGroupingSetsPresent()) { - LOG.warn("Grouping sets not supported in vector mode"); + private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTez) { + GroupByDesc desc = op.getConf(); + VectorGroupByDesc vectorDesc = desc.getVectorDesc(); + + if (desc.isGroupingSetsPresent()) { + LOG.info("Grouping sets not supported in vector mode"); return false; } - boolean ret = validateExprNodeDesc(op.getConf().getKeys()); + boolean ret = validateExprNodeDesc(desc.getKeys()); if (!ret) { return false; } - return validateAggregationDesc(op.getConf().getAggregators()); + ret = validateAggregationDesc(desc.getAggregators(), isReduce); + if (!ret) { + return false; + } + boolean isVectorOutput = isTez && aggregatorsOutputIsPrimitive(desc.getAggregators(), isReduce); + vectorDesc.setVectorOutput(isVectorOutput); + if (isReduce) { + if (desc.isDistinct()) { + LOG.info("Distinct not supported in reduce vector mode"); + return false; + } + // Sort-based GroupBy? + if (desc.getMode() != GroupByDesc.Mode.COMPLETE && + desc.getMode() != GroupByDesc.Mode.PARTIAL1 && + desc.getMode() != GroupByDesc.Mode.PARTIAL2 && + desc.getMode() != GroupByDesc.Mode.MERGEPARTIAL) { + LOG.info("Reduce vector mode not supported when input for GROUP BY not sorted"); + return false; + } + LOG.info("Reduce GROUP BY mode is " + desc.getMode().name()); + if (desc.getGroupKeyNotReductionKey()) { + LOG.info("Reduce vector mode not supported when group key is not reduction key"); + return false; + } + if (!isVectorOutput) { + LOG.info("Reduce vector mode only supported when aggregate outputs are primitive types"); + return false; + } + if (desc.getKeys().size() > 0) { + LOG.info("Reduce-side GROUP BY will process key groups"); + vectorDesc.setVectorGroupBatches(true); + } else { + LOG.info("Reduce-side GROUP BY will do global aggregation"); + } + vectorDesc.setIsReduce(true); + } else { + LOG.info("Downstream operators of map-side GROUP BY will be vectorized: " + isVectorOutput); + } + return true; } private boolean validateExtractOperator(ExtractOperator op) { @@ -930,9 +1016,9 @@ public class Vectorizer implements Physi return true; } - private boolean validateAggregationDesc(List descs) { + private boolean validateAggregationDesc(List descs, boolean isReduce) { for (AggregationDesc d : descs) { - boolean ret = validateAggregationDesc(d); + boolean ret = validateAggregationDesc(d, isReduce); if (!ret) { return false; } @@ -952,9 +1038,7 @@ public class Vectorizer implements Physi String typeName = desc.getTypeInfo().getTypeName(); boolean ret = validateDataType(typeName); if (!ret) { - if (LOG.isDebugEnabled()) { - LOG.debug("Cannot vectorize " + desc.toString() + " of type " + typeName); - } + LOG.info("Cannot vectorize " + desc.toString() + " of type " + typeName); return false; } if (desc instanceof ExprNodeGenericFuncDesc) { @@ -987,12 +1071,11 @@ public class Vectorizer implements Physi VectorizationContext vc = new ValidatorVectorizationContext(); if (vc.getVectorExpression(desc, mode) == null) { // TODO: this cannot happen - VectorizationContext throws in such cases. + LOG.info("getVectorExpression returned null"); return false; } } catch (Exception e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to vectorize", e); - } + LOG.info("Failed to vectorize", e); return false; } return true; @@ -1011,16 +1094,56 @@ public class Vectorizer implements Physi } } - private boolean validateAggregationDesc(AggregationDesc aggDesc) { + private boolean validateAggregationDesc(AggregationDesc aggDesc, boolean isReduce) { if (!supportedAggregationUdfs.contains(aggDesc.getGenericUDAFName().toLowerCase())) { return false; } if (aggDesc.getParameters() != null) { return validateExprNodeDesc(aggDesc.getParameters()); } + // See if we can vectorize the aggregation. + try { + VectorizationContext vc = new ValidatorVectorizationContext(); + if (vc.getAggregatorExpression(aggDesc, isReduce) == null) { + // TODO: this cannot happen - VectorizationContext throws in such cases. + LOG.info("getAggregatorExpression returned null"); + return false; + } + } catch (Exception e) { + LOG.info("Failed to vectorize", e); + return false; + } + return true; + } + + private boolean aggregatorsOutputIsPrimitive(List descs, boolean isReduce) { + for (AggregationDesc d : descs) { + boolean ret = aggregatorsOutputIsPrimitive(d, isReduce); + if (!ret) { + return false; + } + } return true; } + private boolean aggregatorsOutputIsPrimitive(AggregationDesc aggDesc, boolean isReduce) { + VectorizationContext vc = new ValidatorVectorizationContext(); + VectorAggregateExpression vectorAggrExpr; + try { + vectorAggrExpr = vc.getAggregatorExpression(aggDesc, isReduce); + } catch (Exception e) { + // We should have already attempted to vectorize in validateAggregationDesc. + LOG.info("Vectorization of aggreation should have succeeded ", e); + return false; + } + + ObjectInspector outputObjInspector = vectorAggrExpr.getOutputObjectInspector(); + if (outputObjInspector.getCategory() == ObjectInspector.Category.PRIMITIVE) { + return true; + } + return false; + } + private boolean validateDataType(String type) { return supportedDataTypesPattern.matcher(type.toLowerCase()).matches(); } Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractVectorDesc.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractVectorDesc.java?rev=1623929&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractVectorDesc.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractVectorDesc.java Wed Sep 10 07:41:19 2014 @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +public class AbstractVectorDesc implements VectorDesc { + + @Override + public Object clone() throws CloneNotSupportedException { + throw new CloneNotSupportedException("clone not supported"); + } +} Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java?rev=1623929&r1=1623928&r2=1623929&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java Wed Sep 10 07:41:19 2014 @@ -69,7 +69,11 @@ public class GroupByDesc extends Abstrac transient private boolean isDistinct; private boolean dontResetAggrsDistinct; + // Extra parameters only for vectorization. + private VectorGroupByDesc vectorDesc; + public GroupByDesc() { + vectorDesc = new VectorGroupByDesc(); } public GroupByDesc( @@ -102,6 +106,7 @@ public class GroupByDesc extends Abstrac final boolean groupingSetsPresent, final int groupingSetsPosition, final boolean isDistinct) { + vectorDesc = new VectorGroupByDesc(); this.mode = mode; this.outputColumnNames = outputColumnNames; this.keys = keys; @@ -116,6 +121,14 @@ public class GroupByDesc extends Abstrac this.isDistinct = isDistinct; } + public void setVectorDesc(VectorGroupByDesc vectorDesc) { + this.vectorDesc = vectorDesc; + } + + public VectorGroupByDesc getVectorDesc() { + return vectorDesc; + } + public Mode getMode() { return mode; } @@ -268,6 +281,14 @@ public class GroupByDesc extends Abstrac this.groupingSetPosition = groupingSetPosition; } + public boolean isDontResetAggrsDistinct() { + return dontResetAggrsDistinct; + } + + public void setDontResetAggrsDistinct(boolean dontResetAggrsDistinct) { + this.dontResetAggrsDistinct = dontResetAggrsDistinct; + } + public boolean isDistinct() { return isDistinct; } @@ -276,11 +297,4 @@ public class GroupByDesc extends Abstrac this.isDistinct = isDistinct; } - public boolean isDontResetAggrsDistinct() { - return dontResetAggrsDistinct; - } - - public void setDontResetAggrsDistinct(boolean dontResetAggrsDistinct) { - this.dontResetAggrsDistinct = dontResetAggrsDistinct; - } } Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorDesc.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorDesc.java?rev=1623929&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorDesc.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorDesc.java Wed Sep 10 07:41:19 2014 @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; + +public interface VectorDesc extends Serializable, Cloneable { + public Object clone() throws CloneNotSupportedException; +} Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java?rev=1623929&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java Wed Sep 10 07:41:19 2014 @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +/** + * VectorGroupByDesc. + * + * Extra parameters beyond GroupByDesc just for the VectorGroupByOperator. + * + * We don't extend GroupByDesc because the base OperatorDesc doesn't support + * clone and adding it is a lot work for little gain. + */ +public class VectorGroupByDesc extends AbstractVectorDesc { + + private static long serialVersionUID = 1L; + + private boolean isReduce; + private boolean isVectorGroupBatches; + private boolean isVectorOutput; + + public VectorGroupByDesc() { + this.isReduce = false; + this.isVectorGroupBatches = false; + this.isVectorOutput = false; + } + + public boolean isReduce() { + return isReduce; + } + + public void setIsReduce(boolean isReduce) { + this.isReduce = isReduce; + } + + public boolean isVectorGroupBatches() { + return isVectorGroupBatches; + } + + public void setVectorGroupBatches(boolean isVectorGroupBatches) { + this.isVectorGroupBatches = isVectorGroupBatches; + } + + public boolean isVectorOutput() { + return isVectorOutput; + } + + public void setVectorOutput(boolean isVectorOutput) { + this.isVectorOutput = isVectorOutput; + } +} Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java?rev=1623929&r1=1623928&r2=1623929&view=diff ============================================================================== --- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java (original) +++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java Wed Sep 10 07:41:19 2014 @@ -107,7 +107,7 @@ public class TestVectorizer { gbyOp.setConf(desc); Vectorizer v = new Vectorizer(); - Assert.assertTrue(v.validateMapWorkOperator(gbyOp)); + Assert.assertTrue(v.validateMapWorkOperator(gbyOp, false)); VectorGroupByOperator vectorOp = (VectorGroupByOperator) v.vectorizeOperator(gbyOp, vContext); Assert.assertEquals(VectorUDAFSumLong.class, vectorOp.getAggregators()[0].getClass()); VectorUDAFSumLong udaf = (VectorUDAFSumLong) vectorOp.getAggregators()[0]; @@ -150,7 +150,7 @@ public class TestVectorizer { /** * prepareAbstractMapJoin prepares a join operator descriptor, used as helper by SMB and Map join tests. */ - private void prepareAbstractMapJoin(AbstractMapJoinOperator mop, MapJoinDesc mjdesc) { + private void prepareAbstractMapJoin(AbstractMapJoinOperator map, MapJoinDesc mjdesc) { mjdesc.setPosBigTable(0); List expr = new ArrayList(); expr.add(new ExprNodeColumnDesc(Integer.class, "col1", "T", false)); @@ -180,14 +180,14 @@ public class TestVectorizer { */ @Test public void testValidateMapJoinOperator() { - MapJoinOperator mop = new MapJoinOperator(); + MapJoinOperator map = new MapJoinOperator(); MapJoinDesc mjdesc = new MapJoinDesc(); - prepareAbstractMapJoin(mop, mjdesc); - mop.setConf(mjdesc); + prepareAbstractMapJoin(map, mjdesc); + map.setConf(mjdesc); Vectorizer vectorizer = new Vectorizer(); - Assert.assertTrue(vectorizer.validateMapWorkOperator(mop)); + Assert.assertTrue(vectorizer.validateMapWorkOperator(map, false)); } @@ -196,13 +196,13 @@ public class TestVectorizer { */ @Test public void testValidateSMBJoinOperator() { - SMBMapJoinOperator mop = new SMBMapJoinOperator(); + SMBMapJoinOperator map = new SMBMapJoinOperator(); SMBJoinDesc mjdesc = new SMBJoinDesc(); - prepareAbstractMapJoin(mop, mjdesc); - mop.setConf(mjdesc); + prepareAbstractMapJoin(map, mjdesc); + map.setConf(mjdesc); Vectorizer vectorizer = new Vectorizer(); - Assert.assertTrue(vectorizer.validateMapWorkOperator(mop)); + Assert.assertTrue(vectorizer.validateMapWorkOperator(map, false)); } } Modified: hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q?rev=1623929&r1=1623928&r2=1623929&view=diff ============================================================================== --- hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q (original) +++ hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q Wed Sep 10 07:41:19 2014 @@ -150,7 +150,9 @@ insert overwrite table over1k_part_buck_ desc formatted over1k_part_buck_sort2_orc partition(t=27); desc formatted over1k_part_buck_sort2_orc partition(t="__HIVE_DEFAULT_PARTITION__"); +explain select * from over1k_part_buck_sort2_orc; select * from over1k_part_buck_sort2_orc; +explain select count(*) from over1k_part_buck_sort2_orc; select count(*) from over1k_part_buck_sort2_orc; set hive.optimize.sort.dynamic.partition=true; @@ -159,5 +161,7 @@ insert overwrite table over1k_part_buck_ desc formatted over1k_part_buck_sort2_orc partition(t=27); desc formatted over1k_part_buck_sort2_orc partition(t="__HIVE_DEFAULT_PARTITION__"); +explain select * from over1k_part_buck_sort2_orc; select * from over1k_part_buck_sort2_orc; +explain select count(*) from over1k_part_buck_sort2_orc; select count(*) from over1k_part_buck_sort2_orc;