Subject: svn commit: r1637444 [6/20] - in /hive/branches/spark: ./ cli/src/test/org/apache/hadoop/hive/cli/ common/ common/src/java/org/apache/hadoop/hive/common/type/ common/src/java/org/apache/hadoop/hive/conf/ common/src/test/org/apache/hadoop/hive/conf/ com...
Date: Fri, 07 Nov 2014 20:41:45 -0000
To: commits@hive.apache.org
From: brock@apache.org

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java Fri Nov 7 20:41:34 2014
@@ -21,9 +21,9 @@ package org.apache.hadoop.hive.ql.exec.v
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.hadoop.hive.common.type.Decimal128;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.DecimalUtil;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
@@ -41,7 +41,6 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hive.common.util.Decimal128FastBuffer;
 /**
  * Generated from template
VectorUDAFAvg.txt. @@ -57,24 +56,45 @@ public class VectorUDAFAvgDecimal extend private static final long serialVersionUID = 1L; - transient private final Decimal128 sum = new Decimal128(); + transient private final HiveDecimalWritable sum = new HiveDecimalWritable(); transient private long count; transient private boolean isNull; - public void sumValueWithCheck(Decimal128 value, short scale) { + // We use this to catch overflow. + transient private boolean isOutOfRange; + + public void sumValueWithNullCheck(HiveDecimalWritable writable, short scale) { + if (isOutOfRange) { + return; + } + HiveDecimal value = writable.getHiveDecimal(); if (isNull) { - sum.update(value); - sum.changeScaleDestructive(scale); + sum.set(value); count = 1; isNull = false; } else { - sum.addDestructive(value, scale); + HiveDecimal result; + try { + result = sum.getHiveDecimal().add(value); + } catch (ArithmeticException e) { // catch on overflow + isOutOfRange = true; + return; + } + sum.set(result); count++; } } - public void sumValueNoCheck(Decimal128 value, short scale) { - sum.addDestructive(value, scale); + public void sumValueNoNullCheck(HiveDecimalWritable writable, short scale) { + HiveDecimal value = writable.getHiveDecimal(); + HiveDecimal result; + try { + result = sum.getHiveDecimal().add(value); + } catch (ArithmeticException e) { // catch on overflow + isOutOfRange = true; + return; + } + sum.set(result); count++; } @@ -87,7 +107,8 @@ public class VectorUDAFAvgDecimal extend @Override public void reset() { isNull = true; - sum.zeroClear(); + isOutOfRange = false; + sum.set(HiveDecimal.ZERO); count = 0L; } } @@ -98,8 +119,6 @@ public class VectorUDAFAvgDecimal extend transient private HiveDecimalWritable resultSum; transient private StructObjectInspector soi; - transient private final Decimal128FastBuffer scratch; - /** * The scale of the SUM in the partial output */ @@ -120,12 +139,6 @@ public class VectorUDAFAvgDecimal extend */ private short inputPrecision; - /** - * A value used as scratch to avoid allocating at runtime. - * Needed by computations like vector[0] * batchSize - */ - transient private Decimal128 scratchDecimal = new Decimal128(); - public VectorUDAFAvgDecimal(VectorExpression inputExpression) { this(); this.inputExpression = inputExpression; @@ -138,7 +151,6 @@ public class VectorUDAFAvgDecimal extend resultSum = new HiveDecimalWritable(); partialResult[0] = resultCount; partialResult[1] = resultSum; - scratch = new Decimal128FastBuffer(); } @@ -185,7 +197,7 @@ public class VectorUDAFAvgDecimal extend DecimalColumnVector inputVector = ( DecimalColumnVector)batch. 
cols[this.inputExpression.getOutputColumn()]; - Decimal128[] vector = inputVector.vector; + HiveDecimalWritable[] vector = inputVector.vector; if (inputVector.noNulls) { if (inputVector.isRepeating) { @@ -231,7 +243,7 @@ public class VectorUDAFAvgDecimal extend private void iterateNoNullsRepeatingWithAggregationSelection( VectorAggregationBufferRow[] aggregationBufferSets, int bufferIndex, - Decimal128 value, + HiveDecimalWritable value, int batchSize) { for (int i=0; i < batchSize; ++i) { @@ -239,14 +251,14 @@ public class VectorUDAFAvgDecimal extend aggregationBufferSets, bufferIndex, i); - myagg.sumValueWithCheck(value, this.sumScale); + myagg.sumValueWithNullCheck(value, this.sumScale); } } private void iterateNoNullsSelectionWithAggregationSelection( VectorAggregationBufferRow[] aggregationBufferSets, int bufferIndex, - Decimal128[] values, + HiveDecimalWritable[] values, int[] selection, int batchSize) { @@ -255,28 +267,28 @@ public class VectorUDAFAvgDecimal extend aggregationBufferSets, bufferIndex, i); - myagg.sumValueWithCheck(values[selection[i]], this.sumScale); + myagg.sumValueWithNullCheck(values[selection[i]], this.sumScale); } } private void iterateNoNullsWithAggregationSelection( VectorAggregationBufferRow[] aggregationBufferSets, int bufferIndex, - Decimal128[] values, + HiveDecimalWritable[] values, int batchSize) { for (int i=0; i < batchSize; ++i) { Aggregation myagg = getCurrentAggregationBuffer( aggregationBufferSets, bufferIndex, i); - myagg.sumValueWithCheck(values[i], this.sumScale); + myagg.sumValueWithNullCheck(values[i], this.sumScale); } } private void iterateHasNullsRepeatingSelectionWithAggregationSelection( VectorAggregationBufferRow[] aggregationBufferSets, int bufferIndex, - Decimal128 value, + HiveDecimalWritable value, int batchSize, int[] selection, boolean[] isNull) { @@ -287,7 +299,7 @@ public class VectorUDAFAvgDecimal extend aggregationBufferSets, bufferIndex, i); - myagg.sumValueWithCheck(value, this.sumScale); + myagg.sumValueWithNullCheck(value, this.sumScale); } } @@ -296,7 +308,7 @@ public class VectorUDAFAvgDecimal extend private void iterateHasNullsRepeatingWithAggregationSelection( VectorAggregationBufferRow[] aggregationBufferSets, int bufferIndex, - Decimal128 value, + HiveDecimalWritable value, int batchSize, boolean[] isNull) { @@ -306,7 +318,7 @@ public class VectorUDAFAvgDecimal extend aggregationBufferSets, bufferIndex, i); - myagg.sumValueWithCheck(value, this.sumScale); + myagg.sumValueWithNullCheck(value, this.sumScale); } } } @@ -314,7 +326,7 @@ public class VectorUDAFAvgDecimal extend private void iterateHasNullsSelectionWithAggregationSelection( VectorAggregationBufferRow[] aggregationBufferSets, int bufferIndex, - Decimal128[] values, + HiveDecimalWritable[] values, int batchSize, int[] selection, boolean[] isNull) { @@ -326,7 +338,7 @@ public class VectorUDAFAvgDecimal extend aggregationBufferSets, bufferIndex, j); - myagg.sumValueWithCheck(values[i], this.sumScale); + myagg.sumValueWithNullCheck(values[i], this.sumScale); } } } @@ -334,7 +346,7 @@ public class VectorUDAFAvgDecimal extend private void iterateHasNullsWithAggregationSelection( VectorAggregationBufferRow[] aggregationBufferSets, int bufferIndex, - Decimal128[] values, + HiveDecimalWritable[] values, int batchSize, boolean[] isNull) { @@ -344,7 +356,7 @@ public class VectorUDAFAvgDecimal extend aggregationBufferSets, bufferIndex, i); - myagg.sumValueWithCheck(values[i], this.sumScale); + myagg.sumValueWithNullCheck(values[i], this.sumScale); } } } @@ -367,18 
+379,31 @@ public class VectorUDAFAvgDecimal extend Aggregation myagg = (Aggregation)agg; - Decimal128[] vector = inputVector.vector; + HiveDecimalWritable[] vector = inputVector.vector; if (inputVector.isRepeating) { if (inputVector.noNulls) { if (myagg.isNull) { myagg.isNull = false; - myagg.sum.zeroClear(); + myagg.sum.set(HiveDecimal.ZERO); myagg.count = 0; } - scratchDecimal.update(batchSize); - scratchDecimal.multiplyDestructive(vector[0], vector[0].getScale()); - myagg.sum.update(scratchDecimal); + HiveDecimal value = vector[0].getHiveDecimal(); + HiveDecimal multiple; + try { + multiple = value.multiply(HiveDecimal.create(batchSize)); + } catch (ArithmeticException e) { // catch on overflow + myagg.isOutOfRange = true; + return; + } + HiveDecimal result; + try { + result = myagg.sum.getHiveDecimal().add(multiple); + } catch (ArithmeticException e) { // catch on overflow + myagg.isOutOfRange = true; + return; + } + myagg.sum.set(result); myagg.count += batchSize; } return; @@ -400,7 +425,7 @@ public class VectorUDAFAvgDecimal extend private void iterateSelectionHasNulls( Aggregation myagg, - Decimal128[] vector, + HiveDecimalWritable[] vector, int batchSize, boolean[] isNull, int[] selected) { @@ -408,57 +433,57 @@ public class VectorUDAFAvgDecimal extend for (int j=0; j< batchSize; ++j) { int i = selected[j]; if (!isNull[i]) { - Decimal128 value = vector[i]; - myagg.sumValueWithCheck(value, this.sumScale); + HiveDecimalWritable value = vector[i]; + myagg.sumValueWithNullCheck(value, this.sumScale); } } } private void iterateSelectionNoNulls( Aggregation myagg, - Decimal128[] vector, + HiveDecimalWritable[] vector, int batchSize, int[] selected) { if (myagg.isNull) { myagg.isNull = false; - myagg.sum.zeroClear(); + myagg.sum.set(HiveDecimal.ZERO); myagg.count = 0; } for (int i=0; i< batchSize; ++i) { - Decimal128 value = vector[selected[i]]; - myagg.sumValueNoCheck(value, this.sumScale); + HiveDecimalWritable value = vector[selected[i]]; + myagg.sumValueNoNullCheck(value, this.sumScale); } } private void iterateNoSelectionHasNulls( Aggregation myagg, - Decimal128[] vector, + HiveDecimalWritable[] vector, int batchSize, boolean[] isNull) { for(int i=0;i tRealOutputFormat = new ThreadLocal() { + @Override + protected String initialValue() { + return null; + } + }; @SuppressWarnings("unchecked") private static Map, Class> @@ -105,11 +110,9 @@ public final class HiveFileFormatUtils { } Class result = outputFormatSubstituteMap .get(origin); - //register this output format into the map for the first time - if ((storagehandlerflag == true) && (result == null)) { + if ((storagehandlerflag == true) && (result == null || result == HivePassThroughOutputFormat.class)) { HiveFileFormatUtils.setRealOutputFormatClassName(origin.getName()); result = HivePassThroughOutputFormat.class; - HiveFileFormatUtils.registerOutputFormatSubstitute((Class) origin,HivePassThroughOutputFormat.class); } return result; } @@ -120,7 +123,7 @@ public final class HiveFileFormatUtils { @SuppressWarnings("unchecked") public static String getRealOutputFormatClassName() { - return realoutputFormat; + return tRealOutputFormat.get(); } /** @@ -129,7 +132,7 @@ public final class HiveFileFormatUtils { public static void setRealOutputFormatClassName( String destination) { if (destination != null){ - realoutputFormat = destination; + tRealOutputFormat.set(destination); } else { return; Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java?rev=1637444&r1=1637443&r2=1637444&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java Fri Nov 7 20:41:34 2014 @@ -30,14 +30,17 @@ import org.apache.avro.file.CodecFactory import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable; import org.apache.hadoop.hive.serde2.avro.AvroSerdeException; import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reporter; @@ -47,7 +50,9 @@ import org.apache.hadoop.util.Progressab * Write to an Avro file from a Hive process. */ public class AvroContainerOutputFormat - implements HiveOutputFormat { + implements HiveOutputFormat { + + public static final Log LOG = LogFactory.getLog(AvroContainerOutputFormat.class); @Override public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, @@ -75,21 +80,62 @@ public class AvroContainerOutputFormat return new AvroGenericRecordWriter(dfw); } - //no records will be emitted from Hive - @Override - public RecordWriter - getRecordWriter(FileSystem ignored, JobConf job, String name, - Progressable progress) { - return new RecordWriter() { - @Override - public void write(LongWritable key, AvroGenericRecordWritable value) { - throw new RuntimeException("Should not be called"); - } + class WrapperRecordWriter implements RecordWriter { + FileSinkOperator.RecordWriter hiveWriter = null; + JobConf jobConf; + Progressable progressable; + String fileName; + + public WrapperRecordWriter(JobConf jobConf, Progressable progressable, String fileName){ + this.progressable = progressable; + this.jobConf = jobConf; + this.fileName = fileName; + } + + private FileSinkOperator.RecordWriter getHiveWriter() throws IOException { + if (this.hiveWriter == null){ + Properties properties = new Properties(); + for (AvroSerdeUtils.AvroTableProperties tableProperty : AvroSerdeUtils.AvroTableProperties.values()){ + String propVal; + if((propVal = jobConf.get(tableProperty.getPropName())) != null){ + properties.put(tableProperty.getPropName(),propVal); + } + } + + Boolean isCompressed = jobConf.getBoolean("mapreduce.output.fileoutputformat.compress", false); + Path path = new Path(this.fileName); + if(path.getFileSystem(jobConf).isDirectory(path)){ + // This path is only potentially encountered during setup + // Otherwise, a specific part_xxxx file name is generated and passed in. 
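Aside on the WrapperRecordWriter hunk around this point: the real Hive record writer is created lazily on the first write() (or on close()) and then reused, rather than being built when the wrapper is constructed. Below is a minimal, JDK-only sketch of that lazy-delegate pattern; all class and method names in it are illustrative and are not Hive APIs.

// Illustrative, JDK-only sketch of the lazy-delegate pattern used by the new
// WrapperRecordWriter: the underlying writer is created on the first write()
// and reused for later writes and for close(). Hypothetical names throughout.
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class LazyDelegatingWriter implements AutoCloseable {
  private final Path target;
  private Writer delegate;           // created lazily, exactly once

  public LazyDelegatingWriter(Path target) {
    this.target = target;
  }

  private Writer delegate() throws IOException {
    if (delegate == null) {
      // Deferred until the first record arrives, mirroring getHiveWriter()
      delegate = Files.newBufferedWriter(target, StandardCharsets.UTF_8);
    }
    return delegate;
  }

  public void write(String record) throws IOException {
    delegate().write(record);
    delegate().write('\n');
  }

  @Override
  public void close() throws IOException {
    // close() also goes through delegate(), so a writer that never received a
    // record still produces a (possibly empty) output file on close.
    delegate().close();
  }

  public static void main(String[] args) throws IOException {
    try (LazyDelegatingWriter w = new LazyDelegatingWriter(Paths.get("out.txt"))) {
      w.write("first record triggers creation of the real writer");
    }
  }
}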
+ path = new Path(path,"_dummy"); + } - @Override - public void close(Reporter reporter) { + this.hiveWriter = getHiveRecordWriter(jobConf,path,null,isCompressed, properties, progressable); } - }; + return this.hiveWriter; + } + + @Override + public void write(K key, V value) throws IOException { + getHiveWriter().write(value); + } + + @Override + public void close(Reporter reporter) throws IOException { + // Normally, I'd worry about the blanket false being passed in here, and that + // it'd need to be integrated into an abort call for an OutputCommitter, but the + // underlying recordwriter ignores it and throws it away, so it's irrelevant. + getHiveWriter().close(false); + } + + } + + //no records will be emitted from Hive + @Override + public RecordWriter + getRecordWriter(FileSystem ignored, JobConf job, String fileName, + Progressable progress) throws IOException { + return new WrapperRecordWriter(job,progress,fileName); } @Override Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1637444&r1=1637443&r2=1637444&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Fri Nov 7 20:41:34 2014 @@ -1259,12 +1259,9 @@ class RecordReaderImpl implements Record if (!result.isNull[0]) { BigInteger bInt = SerializationUtils.readBigInteger(valueStream); short scaleInData = (short) scaleStream.next(); - result.vector[0].update(bInt, scaleInData); - - // Change the scale to match the schema if the scale in data is different. - if (scale != scaleInData) { - result.vector[0].changeScaleDestructive((short) scale); - } + HiveDecimal dec = HiveDecimal.create(bInt, scaleInData); + dec = HiveDecimalUtils.enforcePrecisionScale(dec, precision, scale); + result.set(0, dec); } } else { // result vector has isNull values set, use the same to read scale vector. @@ -1273,13 +1270,10 @@ class RecordReaderImpl implements Record for (int i = 0; i < batchSize; i++) { if (!result.isNull[i]) { BigInteger bInt = SerializationUtils.readBigInteger(valueStream); - result.vector[i].update(bInt, (short) scratchScaleVector.vector[i]); - - // Change the scale to match the schema if the scale is less than in data. 
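In the RecordReaderImpl change above and continuing below, the decimal read path now materializes each value from its unscaled digits and in-file scale and then enforces the schema's precision and scale, instead of destructively rescaling a Decimal128. The following JDK-only sketch illustrates the idea, with BigDecimal standing in for HiveDecimal; the exact rounding and null-out behavior of HiveDecimalUtils.enforcePrecisionScale is an assumption here, not taken from the patch.

// Sketch: build a decimal from unscaled digits plus scale, rescale to the
// schema's scale, and reject it if it no longer fits the schema's precision.
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;

public final class DecimalScaleDemo {

  /** Returns the adjusted value, or null if it cannot fit (precision, scale). */
  static BigDecimal enforcePrecisionScale(BigDecimal dec, int precision, int scale) {
    BigDecimal adjusted = dec.setScale(scale, RoundingMode.HALF_UP);
    // precision() counts significant digits; more than `precision` means overflow
    return adjusted.precision() > precision ? null : adjusted;
  }

  public static void main(String[] args) {
    // Unscaled digits and scale as they would be read from the value/scale streams
    BigInteger unscaled = new BigInteger("1234567");
    int scaleInData = 4;                                   // 123.4567
    BigDecimal dec = new BigDecimal(unscaled, scaleInData);

    System.out.println(enforcePrecisionScale(dec, 10, 2)); // 123.46
    System.out.println(enforcePrecisionScale(dec, 4, 2));  // null: needs 5 digits
  }
}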
- // (HIVE-7373) If scale is bigger, then it leaves the original trailing zeros - if (scale < scratchScaleVector.vector[i]) { - result.vector[i].changeScaleDestructive((short) scale); - } + short scaleInData = (short) scratchScaleVector.vector[i]; + HiveDecimal dec = HiveDecimal.create(bInt, scaleInData); + dec = HiveDecimalUtils.enforcePrecisionScale(dec, precision, scale); + result.set(i, dec); } } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java?rev=1637444&r1=1637443&r2=1637444&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java Fri Nov 7 20:41:34 2014 @@ -18,14 +18,14 @@ package org.apache.hadoop.hive.ql.log; -import java.util.HashMap; -import java.util.Map; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.session.SessionState; +import java.util.HashMap; +import java.util.Map; + /** * PerfLogger. * @@ -147,10 +147,37 @@ public class PerfLogger { } public Long getStartTime(String method) { - return startTimes.get(method); + long startTime = 0L; + + if (startTimes.containsKey(method)) { + startTime = startTimes.get(method); + } + return startTime; } public Long getEndTime(String method) { - return endTimes.get(method); + long endTime = 0L; + + if (endTimes.containsKey(method)) { + endTime = endTimes.get(method); + } + return endTime; } + + public boolean startTimeHasMethod(String method) { + return startTimes.containsKey(method); + } + + public boolean endTimeHasMethod(String method) { + return endTimes.containsKey(method); + } + + public Long getDuration(String method) { + long duration = 0; + if (startTimes.containsKey(method) && endTimes.containsKey(method)) { + duration = endTimes.get(method) - startTimes.get(method); + } + return duration; + } + } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1637444&r1=1637443&r2=1637444&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Fri Nov 7 20:41:34 2014 @@ -18,16 +18,6 @@ package org.apache.hadoop.hive.ql.optimizer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Stack; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; @@ -39,7 +29,6 @@ import org.apache.hadoop.hive.ql.exec.Jo import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator; import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator; import org.apache.hadoop.hive.ql.exec.LimitOperator; -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import 
org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.PTFOperator; @@ -76,6 +65,16 @@ import org.apache.hadoop.hive.ql.plan.pt import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; + /** * Factory for generating the different node processors used by ColumnPruner. */ @@ -600,8 +599,7 @@ public final class ColumnPrunerProcFacto // revert output cols of SEL(*) to ExprNodeColumnDesc String[] tabcol = rr.reverseLookup(col); ColumnInfo colInfo = rr.get(tabcol[0], tabcol[1]); - ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(colInfo.getType(), - colInfo.getInternalName(), colInfo.getTabAlias(), colInfo.getIsVirtualCol()); + ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(colInfo); colList.add(colExpr); outputColNames.add(col); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java?rev=1637444&r1=1637443&r2=1637444&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java Fri Nov 7 20:41:34 2014 @@ -31,11 +31,13 @@ import org.apache.hadoop.hive.ql.exec.Ut import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.optimizer.stats.annotation.StatsRulesProcFactory; import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc.ExprNodeDescEqualityWrapper; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.stats.StatsUtils; import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL; import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM; @@ -82,7 +84,8 @@ public class SetReducerParallelism imple for (Operator sibling: sink.getChildOperators().get(0).getParentOperators()) { if (sibling.getStatistics() != null) { - numberOfBytes += sibling.getStatistics().getDataSize(); + numberOfBytes = StatsUtils.safeAdd( + numberOfBytes, sibling.getStatistics().getDataSize()); } else { LOG.warn("No stats available from: "+sibling); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java?rev=1637444&r1=1637443&r2=1637444&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java Fri Nov 7 20:41:34 2014 @@ -49,6 +49,7 @@ import 
org.apache.hadoop.hive.ql.lib.Rul import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.QBJoinTree; import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -164,12 +165,23 @@ public class SkewJoinOptimizer implement return null; } + // have to create a QBJoinTree for the cloned join operator + QBJoinTree originJoinTree = parseContext.getJoinContext().get(joinOp); + QBJoinTree newJoinTree; + try { + newJoinTree = originJoinTree.clone(); + } catch (CloneNotSupportedException e) { + LOG.debug("QBJoinTree could not be cloned: ", e); + return null; + } + JoinOperator joinOpClone; if (processSelect) { joinOpClone = (JoinOperator)(currOpClone.getParentOperators().get(0)); } else { joinOpClone = (JoinOperator)currOpClone; } + parseContext.getJoinContext().put(joinOpClone, newJoinTree); List tableScanCloneOpsForJoin = new ArrayList(); @@ -201,6 +213,7 @@ public class SkewJoinOptimizer implement } parseContext.getTopOps().put(newAlias, tso); + setUpAlias(originJoinTree, newJoinTree, tabAlias, newAlias, tso); } // Now do a union of the select operators: selectOp and selectOpClone @@ -610,6 +623,48 @@ public class SkewJoinOptimizer implement } } } + + /** + * Set alias in the cloned join tree + */ + private static void setUpAlias(QBJoinTree origin, QBJoinTree cloned, String origAlias, + String newAlias, Operator topOp) { + cloned.getAliasToOpInfo().remove(origAlias); + cloned.getAliasToOpInfo().put(newAlias, topOp); + if (origin.getLeftAlias().equals(origAlias)) { + cloned.setLeftAlias(null); + cloned.setLeftAlias(newAlias); + } + replaceAlias(origin.getLeftAliases(), cloned.getLeftAliases(), origAlias, newAlias); + replaceAlias(origin.getRightAliases(), cloned.getRightAliases(), origAlias, newAlias); + replaceAlias(origin.getBaseSrc(), cloned.getBaseSrc(), origAlias, newAlias); + replaceAlias(origin.getMapAliases(), cloned.getMapAliases(), origAlias, newAlias); + replaceAlias(origin.getStreamAliases(), cloned.getStreamAliases(), origAlias, newAlias); + } + + private static void replaceAlias(String[] origin, String[] cloned, + String alias, String newAlias) { + if (origin == null || cloned == null || origin.length != cloned.length) { + return; + } + for (int i = 0; i < origin.length; i++) { + if (origin[i].equals(alias)) { + cloned[i] = newAlias; + } + } + } + + private static void replaceAlias(List origin, List cloned, + String alias, String newAlias) { + if (origin == null || cloned == null || origin.size() != cloned.size()) { + return; + } + for (int i = 0; i < origin.size(); i++) { + if (origin.get(i).equals(alias)) { + cloned.set(i, newAlias); + } + } + } } /* (non-Javadoc) Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java?rev=1637444&r1=1637443&r2=1637444&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java Fri Nov 7 20:41:34 2014 @@ -18,14 +18,8 @@ package org.apache.hadoop.hive.ql.optimizer; -import 
java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Stack; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -72,8 +66,14 @@ import org.apache.hadoop.hive.ql.plan.Re import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; /** * When dynamic partitioning (with or without bucketing and sorting) is enabled, this optimization @@ -157,7 +157,11 @@ public class SortedDynPartitionOptimizer // the reduce sink key. Since both key columns are not prefix subset // ReduceSinkDeDuplication will not merge them together resulting in 2 MR jobs. // To avoid that we will remove the RS (and EX) inserted by enforce bucketing/sorting. - removeRSInsertedByEnforceBucketing(fsOp); + if (!removeRSInsertedByEnforceBucketing(fsOp)) { + LOG.debug("Bailing out of sort dynamic partition optimization as some partition columns " + + "got constant folded."); + return null; + } // unlink connection between FS and its parent Operator fsParent = fsOp.getParentOperators().get(0); @@ -209,8 +213,7 @@ public class SortedDynPartitionOptimizer ArrayList newValueCols = Lists.newArrayList(); Map colExprMap = Maps.newHashMap(); for (ColumnInfo ci : valColInfo) { - newValueCols.add(new ExprNodeColumnDesc(ci.getType(), ci.getInternalName(), ci - .getTabAlias(), ci.isHiddenVirtualCol())); + newValueCols.add(new ExprNodeColumnDesc(ci)); colExprMap.put(ci.getInternalName(), newValueCols.get(newValueCols.size() - 1)); } ReduceSinkDesc rsConf = getReduceSinkDesc(partitionPositions, sortPositions, sortOrder, @@ -263,7 +266,7 @@ public class SortedDynPartitionOptimizer // Remove RS and EX introduced by enforce bucketing/sorting config // Convert PARENT -> RS -> EX -> FS to PARENT -> FS - private void removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) { + private boolean removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) { HiveConf hconf = parseCtx.getConf(); boolean enforceBucketing = HiveConf.getBoolVar(hconf, ConfVars.HIVEENFORCEBUCKETING); boolean enforceSorting = HiveConf.getBoolVar(hconf, ConfVars.HIVEENFORCESORTING); @@ -298,17 +301,27 @@ public class SortedDynPartitionOptimizer Operator rsGrandChild = rsChild.getChildOperators().get(0); if (rsChild instanceof ExtractOperator) { + // if schema size cannot be matched, then it could be because of constant folding + // converting partition column expression to constant expression. The constant + // expression will then get pruned by column pruner since it will not reference to + // any columns. 
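A recurring change in this commit (SetReducerParallelism above, StatsRulesProcFactory further down) replaces raw additions and multiplications on row counts and data sizes with StatsUtils.safeAdd/safeMult. Judging from the setMaxIfInvalid logic replaced later in this message, the intent is arithmetic that clamps to Long.MAX_VALUE on overflow rather than wrapping negative; the sketch below shows what such helpers can look like and is an assumption, not Hive's implementation.

// Sketch of overflow-safe ("saturating") long arithmetic in the spirit of the
// StatsUtils.safeAdd/safeMult calls in this commit; behavior is assumed.
public final class SaturatingMath {

  static long safeAdd(long a, long b) {
    try {
      return Math.addExact(a, b);
    } catch (ArithmeticException overflow) {
      return Long.MAX_VALUE;
    }
  }

  static long safeMult(long a, long b) {
    try {
      return Math.multiplyExact(a, b);
    } catch (ArithmeticException overflow) {
      return Long.MAX_VALUE;
    }
  }

  public static void main(String[] args) {
    long rows = Long.MAX_VALUE - 10;
    // Plain addition would wrap to a negative row count; the safe variant clamps.
    System.out.println(rows + 100);                   // negative (wrapped)
    System.out.println(safeAdd(rows, 100));           // 9223372036854775807
    System.out.println(safeMult(1L << 40, 1L << 40)); // clamped as well
  }
}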
+ if (rsParent.getSchema().getSignature().size() != + rsChild.getSchema().getSignature().size()) { + return false; + } rsParent.getChildOperators().clear(); rsParent.getChildOperators().add(rsGrandChild); rsGrandChild.getParentOperators().clear(); rsGrandChild.getParentOperators().add(rsParent); parseCtx.removeOpParseCtx(rsToRemove); parseCtx.removeOpParseCtx(rsChild); - LOG.info("Removed " + rsParent.getOperatorId() + " and " + rsChild.getOperatorId() + LOG.info("Removed " + rsToRemove.getOperatorId() + " and " + rsChild.getOperatorId() + " as it was introduced by enforce bucketing/sorting."); } } } + + return true; } private List getPartitionPositions(DynamicPartitionCtx dpCtx, RowSchema schema) { @@ -476,8 +489,7 @@ public class SortedDynPartitionOptimizer for (Integer idx : pos) { ColumnInfo ci = colInfos.get(idx); - ExprNodeColumnDesc encd = new ExprNodeColumnDesc(ci.getType(), ci.getInternalName(), - ci.getTabAlias(), ci.isHiddenVirtualCol()); + ExprNodeColumnDesc encd = new ExprNodeColumnDesc(ci); cols.add(encd); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/HiveOptiqUtil.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/HiveOptiqUtil.java?rev=1637444&r1=1637443&r2=1637444&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/HiveOptiqUtil.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/HiveOptiqUtil.java Fri Nov 7 20:41:34 2014 @@ -75,7 +75,7 @@ public class HiveOptiqUtil { return vCols; } - public static boolean validateASTForCBO(ASTNode ast) { + public static boolean validateASTForUnsupportedTokens(ASTNode ast) { String astTree = ast.toStringTree(); // if any of following tokens are present in AST, bail out String[] tokens = { "TOK_CHARSETLITERAL","TOK_TABLESPLITSAMPLE" }; Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/PartitionPruner.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/PartitionPruner.java?rev=1637444&r1=1637443&r2=1637444&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/PartitionPruner.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/PartitionPruner.java Fri Nov 7 20:41:34 2014 @@ -108,7 +108,7 @@ public class PartitionPruner { boolean argsPruned = false; GenericUDF hiveUDF = SqlFunctionConverter.getHiveUDF(call.getOperator(), - call.getType()); + call.getType(), call.operands.size()); if (hiveUDF != null && !FunctionRegistry.isDeterministic(hiveUDF)) { return null; Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ExprNodeConverter.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ExprNodeConverter.java?rev=1637444&r1=1637443&r2=1637444&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ExprNodeConverter.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ExprNodeConverter.java Fri Nov 7 20:41:34 2014 @@ -89,17 +89,17 @@ public 
class ExprNodeConverter extends R ArrayList tmpExprArgs = new ArrayList(); tmpExprArgs.addAll(args.subList(0, 2)); gfDesc = new ExprNodeGenericFuncDesc(TypeConverter.convert(call.getType()), - SqlFunctionConverter.getHiveUDF(call.getOperator(), call.getType()), tmpExprArgs); + SqlFunctionConverter.getHiveUDF(call.getOperator(), call.getType(), 2), tmpExprArgs); for (int i = 2; i < call.operands.size(); i++) { tmpExprArgs = new ArrayList(); tmpExprArgs.add(gfDesc); tmpExprArgs.add(args.get(i)); gfDesc = new ExprNodeGenericFuncDesc(TypeConverter.convert(call.getType()), - SqlFunctionConverter.getHiveUDF(call.getOperator(), call.getType()), tmpExprArgs); + SqlFunctionConverter.getHiveUDF(call.getOperator(), call.getType(), 2), tmpExprArgs); } } else { gfDesc = new ExprNodeGenericFuncDesc(TypeConverter.convert(call.getType()), - SqlFunctionConverter.getHiveUDF(call.getOperator(), call.getType()), args); + SqlFunctionConverter.getHiveUDF(call.getOperator(), call.getType(), args.size()), args); } return gfDesc; Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/JoinCondTypeCheckProcFactory.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/JoinCondTypeCheckProcFactory.java?rev=1637444&r1=1637443&r2=1637444&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/JoinCondTypeCheckProcFactory.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/JoinCondTypeCheckProcFactory.java Fri Nov 7 20:41:34 2014 @@ -17,15 +17,6 @@ */ package org.apache.hadoop.hive.ql.optimizer.optiq.translator; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Stack; - import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.FunctionInfo; @@ -47,6 +38,15 @@ import org.apache.hadoop.hive.ql.udf.gen import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; + /** * JoinCondTypeCheckProcFactory is used by Optiq planner(CBO) to generate Join Conditions from Join Condition AST. * Reasons for sub class: @@ -99,8 +99,7 @@ public class JoinCondTypeCheckProcFactor if (!qualifiedAccess) { colInfo = getColInfo(ctx, null, tableOrCol, expr); // It's a column. 
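The getHiveUDF calls in the ExprNodeConverter hunk above now pass the operand count through to SqlFunctionConverter (its implementation appears further down), so that "+" and "-" can be resolved to the unary plus/minus functions when they take a single argument. Here is a small sketch of that arity-aware dispatch; the function names used are placeholders rather than Hive's registry keys.

// Sketch: resolve an operator symbol to a function name, special-casing arity 1
// so unary "+"/"-" do not collide with their binary counterparts.
import java.util.Map;

public final class OperatorResolutionDemo {

  private static final Map<String, String> BINARY_OPS =
      Map.of("+", "plus", "-", "minus", "*", "multiply");

  static String resolve(String operator, int argCount) {
    if (argCount == 1) {
      if ("+".equals(operator)) {
        return "positive";   // unary plus
      } else if ("-".equals(operator)) {
        return "negative";   // unary minus
      }
    }
    return BINARY_OPS.getOrDefault(operator, operator);
  }

  public static void main(String[] args) {
    System.out.println(resolve("-", 2)); // minus
    System.out.println(resolve("-", 1)); // negative
  }
}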
- return new ExprNodeColumnDesc(colInfo.getType(), colInfo.getInternalName(), - colInfo.getTabAlias(), colInfo.getIsVirtualCol()); + return new ExprNodeColumnDesc(colInfo); } else if (hasTableAlias(ctx, tableOrCol, expr)) { return null; } else { Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java?rev=1637444&r1=1637443&r2=1637444&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java Fri Nov 7 20:41:34 2014 @@ -98,10 +98,19 @@ public class SqlFunctionConverter { return getOptiqFn(name, optiqArgTypes, retType); } - public static GenericUDF getHiveUDF(SqlOperator op, RelDataType dt) { + public static GenericUDF getHiveUDF(SqlOperator op, RelDataType dt, int argsLength) { String name = reverseOperatorMap.get(op); - if (name == null) + if (name == null) { name = op.getName(); + } + // Make sure we handle unary + and - correctly. + if (argsLength == 1) { + if (name == "+") { + name = FunctionRegistry.UNARY_PLUS_FUNC_NAME; + } else if (name == "-") { + name = FunctionRegistry.UNARY_MINUS_FUNC_NAME; + } + } FunctionInfo hFn = name != null ? FunctionRegistry.getFunctionInfo(name) : null; if (hFn == null) hFn = handleExplicitCast(op, dt); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/TypeConverter.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/TypeConverter.java?rev=1637444&r1=1637443&r2=1637444&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/TypeConverter.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/TypeConverter.java Fri Nov 7 20:41:34 2014 @@ -187,7 +187,7 @@ public class TypeConverter { throw new RuntimeException("Unsupported Type : " + type.getTypeName()); } - return convertedType; + return dtFactory.createTypeWithNullability(convertedType, true); } public static RelDataType convert(ListTypeInfo lstType, Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java?rev=1637444&r1=1637443&r2=1637444&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java Fri Nov 7 20:41:34 2014 @@ -18,13 +18,6 @@ package org.apache.hadoop.hive.ql.optimizer.physical; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ColumnInfo; @@ -60,6 +53,13 @@ 
import org.apache.hadoop.hive.ql.plan.Ta import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + /** * GenMRSkewJoinProcessor. * @@ -192,9 +192,7 @@ public final class GenMRSkewJoinProcesso String newColName = i + "_VALUE_" + k; // any name, it does not matter. ColumnInfo columnInfo = new ColumnInfo(newColName, type, alias.toString(), false); columnInfos.add(columnInfo); - newValueExpr.add(new ExprNodeColumnDesc( - columnInfo.getType(), columnInfo.getInternalName(), - columnInfo.getTabAlias(), false)); + newValueExpr.add(new ExprNodeColumnDesc(columnInfo)); if (!first) { colNames = colNames + ","; colTypes = colTypes + ","; @@ -216,9 +214,7 @@ public final class GenMRSkewJoinProcesso ColumnInfo columnInfo = new ColumnInfo(joinKeys.get(k), TypeInfoFactory .getPrimitiveTypeInfo(joinKeyTypes.get(k)), alias.toString(), false); columnInfos.add(columnInfo); - newKeyExpr.add(new ExprNodeColumnDesc( - columnInfo.getType(), columnInfo.getInternalName(), - columnInfo.getTabAlias(), false)); + newKeyExpr.add(new ExprNodeColumnDesc(columnInfo)); } newJoinValues.put(alias, newValueExpr); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java?rev=1637444&r1=1637443&r2=1637444&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java Fri Nov 7 20:41:34 2014 @@ -18,13 +18,8 @@ package org.apache.hadoop.hive.ql.optimizer.stats.annotation; -import java.lang.reflect.Field; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Stack; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Re import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.SelectOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; @@ -77,8 +73,13 @@ import org.apache.hadoop.hive.ql.udf.gen import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; public class StatsRulesProcFactory { @@ -170,7 +171,7 @@ public class StatsRulesProcFactory { // in case of select(*) the data size does not change if (!sop.getConf().isSelectStar() && !sop.getConf().isSelStarNoCompute()) { long dataSize = 
StatsUtils.getDataSizeFromColumnStats(stats.getNumRows(), colStats); - stats.setDataSize(setMaxIfInvalid(dataSize)); + stats.setDataSize(dataSize); } sop.setStatistics(stats); @@ -322,8 +323,8 @@ public class StatsRulesProcFactory { } else if (udf instanceof GenericUDFOPOr) { // for OR condition independently compute and update stats for (ExprNodeDesc child : genFunc.getChildren()) { - newNumRows += evaluateChildExpr(stats, child, aspCtx, neededCols, - fop); + newNumRows = StatsUtils.safeAdd( + evaluateChildExpr(stats, child, aspCtx, neededCols, fop), newNumRows); } } else if (udf instanceof GenericUDFOPNot) { newNumRows = evaluateNotExpr(stats, pred, aspCtx, neededCols, fop); @@ -677,9 +678,9 @@ public class StatsRulesProcFactory { if (cs != null) { long ndv = cs.getCountDistint(); if (cs.getNumNulls() > 0) { - ndv += 1; + ndv = StatsUtils.safeAdd(ndv, 1); } - ndvProduct *= ndv; + ndvProduct = StatsUtils.safeMult(ndvProduct, ndv); } else { if (parentStats.getColumnStatsState().equals(Statistics.State.COMPLETE)) { // the column must be an aggregate column inserted by GBY. We @@ -714,15 +715,16 @@ public class StatsRulesProcFactory { if (mapSideHashAgg) { if (containsGroupingSet) { // Case 4: column stats, hash aggregation, grouping sets - cardinality = Math.min((parentNumRows * sizeOfGroupingSet) / 2, - ndvProduct * parallelism * sizeOfGroupingSet); + cardinality = Math.min( + (StatsUtils.safeMult(parentNumRows, sizeOfGroupingSet)) / 2, + StatsUtils.safeMult(StatsUtils.safeMult(ndvProduct, parallelism), sizeOfGroupingSet)); if (isDebugEnabled) { LOG.debug("[Case 4] STATS-" + gop.toString() + ": cardinality: " + cardinality); } } else { // Case 3: column stats, hash aggregation, NO grouping sets - cardinality = Math.min(parentNumRows / 2, ndvProduct * parallelism); + cardinality = Math.min(parentNumRows / 2, StatsUtils.safeMult(ndvProduct, parallelism)); if (isDebugEnabled) { LOG.debug("[Case 3] STATS-" + gop.toString() + ": cardinality: " + cardinality); @@ -731,7 +733,7 @@ public class StatsRulesProcFactory { } else { if (containsGroupingSet) { // Case 6: column stats, NO hash aggregation, grouping sets - cardinality = parentNumRows * sizeOfGroupingSet; + cardinality = StatsUtils.safeMult(parentNumRows, sizeOfGroupingSet); if (isDebugEnabled) { LOG.debug("[Case 6] STATS-" + gop.toString() + ": cardinality: " + cardinality); @@ -758,7 +760,7 @@ public class StatsRulesProcFactory { if (containsGroupingSet) { // Case 8: column stats, grouping sets - cardinality = Math.min(parentNumRows, ndvProduct * sizeOfGroupingSet); + cardinality = Math.min(parentNumRows, StatsUtils.safeMult(ndvProduct, sizeOfGroupingSet)); if (isDebugEnabled) { LOG.debug("[Case 8] STATS-" + gop.toString() + ": cardinality: " + cardinality); @@ -789,7 +791,7 @@ public class StatsRulesProcFactory { if (containsGroupingSet) { // Case 2: NO column stats, NO hash aggregation, grouping sets - cardinality = parentNumRows * sizeOfGroupingSet; + cardinality = StatsUtils.safeMult(parentNumRows, sizeOfGroupingSet); if (isDebugEnabled) { LOG.debug("[Case 2] STATS-" + gop.toString() + ": cardinality: " + cardinality); @@ -828,7 +830,6 @@ public class StatsRulesProcFactory { // for those newly added columns if (!colExprMap.containsKey(ci.getInternalName())) { String colName = ci.getInternalName(); - colName = StatsUtils.stripPrefixFromColumnName(colName); String tabAlias = ci.getTabAlias(); String colType = ci.getTypeName(); ColStatistics cs = new ColStatistics(tabAlias, colName, colType); @@ -902,7 +903,7 @@ public class 
StatsRulesProcFactory { long avgKeySize = 0; for (ColStatistics cs : colStats) { if (cs != null) { - numEstimatedRows *= cs.getCountDistint(); + numEstimatedRows = StatsUtils.safeMult(numEstimatedRows, cs.getCountDistint()); avgKeySize += Math.ceil(cs.getAvgColLen()); } } @@ -956,7 +957,7 @@ public class StatsRulesProcFactory { long hashEntrySize = gop.javaHashEntryOverHead + avgKeySize + avgValSize; // estimated hash table size - long estHashTableSize = numEstimatedRows * hashEntrySize; + long estHashTableSize = StatsUtils.safeMult(numEstimatedRows, hashEntrySize); if (estHashTableSize < maxMemHashAgg) { return true; @@ -1065,7 +1066,7 @@ public class StatsRulesProcFactory { // detect if there are multiple attributes in join key ReduceSinkOperator rsOp = (ReduceSinkOperator) jop.getParentOperators().get(0); - List keyExprs = rsOp.getConf().getKeyCols(); + List keyExprs = rsOp.getConf().getOutputKeyColumnNames(); numAttr = keyExprs.size(); // infer PK-FK relationship in single attribute join case @@ -1077,7 +1078,7 @@ public class StatsRulesProcFactory { ReduceSinkOperator parent = (ReduceSinkOperator) jop.getParentOperators().get(pos); Statistics parentStats = parent.getStatistics(); - keyExprs = parent.getConf().getKeyCols(); + keyExprs = parent.getConf().getOutputKeyColumnNames(); // Parent RS may have column statistics from multiple parents. // Populate table alias to row count map, this will be used later to @@ -1096,8 +1097,8 @@ public class StatsRulesProcFactory { // used to quickly look-up for column statistics of join key. // TODO: expressions in join condition will be ignored. assign // internal name for expressions and estimate column statistics for expression. - List fqCols = - StatsUtils.getFullQualifedColNameFromExprs(keyExprs, parent.getColumnExprMap()); + List fqCols = StatsUtils.getFullyQualifedReducerKeyNames(keyExprs, + parent.getColumnExprMap()); joinKeys.put(pos, fqCols); // get column statistics for all output columns @@ -1119,7 +1120,6 @@ public class StatsRulesProcFactory { for (int idx = 0; idx < numAttr; idx++) { for (Integer i : joinKeys.keySet()) { String col = joinKeys.get(i).get(idx); - col = StatsUtils.stripPrefixFromColumnName(col); ColStatistics cs = joinedColStats.get(col); if (cs != null) { perAttrDVs.add(cs.getCountDistint()); @@ -1136,13 +1136,12 @@ public class StatsRulesProcFactory { denom = getEasedOutDenominator(distinctVals); } else { for (Long l : distinctVals) { - denom *= l; + denom = StatsUtils.safeMult(denom, l); } } } else { for (List jkeys : joinKeys.values()) { for (String jk : jkeys) { - jk = StatsUtils.stripPrefixFromColumnName(jk); ColStatistics cs = joinedColStats.get(jk); if (cs != null) { distinctVals.add(cs.getCountDistint()); @@ -1166,7 +1165,6 @@ public class StatsRulesProcFactory { ExprNodeDesc end = colExprMap.get(key); if (end instanceof ExprNodeColumnDesc) { String colName = ((ExprNodeColumnDesc) end).getColumn(); - colName = StatsUtils.stripPrefixFromColumnName(colName); String tabAlias = ((ExprNodeColumnDesc) end).getTabAlias(); String fqColName = StatsUtils.getFullyQualifiedColumnName(tabAlias, colName); ColStatistics cs = joinedColStats.get(fqColName); @@ -1214,13 +1212,13 @@ public class StatsRulesProcFactory { } long maxDataSize = parentSizes.get(maxRowIdx); - long newNumRows = (long) (joinFactor * maxRowCount * (numParents - 1)); - long newDataSize = (long) (joinFactor * maxDataSize * (numParents - 1)); + long newNumRows = StatsUtils.safeMult(StatsUtils.safeMult(maxRowCount, (numParents - 1)), joinFactor); + long 
newDataSize = StatsUtils.safeMult(StatsUtils.safeMult(maxDataSize, (numParents - 1)), joinFactor); Statistics wcStats = new Statistics(); - wcStats.setNumRows(setMaxIfInvalid(newNumRows)); - wcStats.setDataSize(setMaxIfInvalid(newDataSize)); + wcStats.setNumRows(newNumRows); + wcStats.setDataSize(newDataSize); jop.setStatistics(wcStats); - + if (isDebugEnabled) { LOG.debug("[1] STATS-" + jop.toString() + ": " + wcStats.extendedToString()); } @@ -1339,6 +1337,7 @@ public class StatsRulesProcFactory { } } + // No need for overflow checks, assume selectivity is always <= 1.0 float selMultiParent = 1.0f; for(Operator parent : multiParentOp.getParentOperators()) { // In the above example, TS-1 -> RS-1 and TS-2 -> RS-2 are simple trees @@ -1369,8 +1368,8 @@ public class StatsRulesProcFactory { Operator op = ops.get(i); if (op != null && op instanceof ReduceSinkOperator) { ReduceSinkOperator rsOp = (ReduceSinkOperator) op; - List keys = rsOp.getConf().getKeyCols(); - List fqCols = StatsUtils.getFullQualifedColNameFromExprs(keys, + List keys = rsOp.getConf().getOutputKeyColumnNames(); + List fqCols = StatsUtils.getFullyQualifedReducerKeyNames(keys, rsOp.getColumnExprMap()); if (fqCols.size() == 1) { String joinCol = fqCols.get(0); @@ -1400,8 +1399,8 @@ public class StatsRulesProcFactory { Operator op = ops.get(i); if (op instanceof ReduceSinkOperator) { ReduceSinkOperator rsOp = (ReduceSinkOperator) op; - List keys = rsOp.getConf().getKeyCols(); - List fqCols = StatsUtils.getFullQualifedColNameFromExprs(keys, + List keys = rsOp.getConf().getOutputKeyColumnNames(); + List fqCols = StatsUtils.getFullyQualifedReducerKeyNames(keys, rsOp.getColumnExprMap()); if (fqCols.size() == 1) { String joinCol = fqCols.get(0); @@ -1441,7 +1440,7 @@ public class StatsRulesProcFactory { LOG.info("STATS-" + jop.toString() + ": Overflow in number of rows." 
+ newNumRows + " rows will be set to Long.MAX_VALUE"); } - newNumRows = setMaxIfInvalid(newNumRows); + newNumRows = StatsUtils.getMaxIfOverflow(newNumRows); stats.setNumRows(newNumRows); // scale down/up the column statistics based on the changes in number of @@ -1472,7 +1471,7 @@ public class StatsRulesProcFactory { stats.setColumnStats(colStats); long newDataSize = StatsUtils .getDataSizeFromColumnStats(newNumRows, colStats); - stats.setDataSize(setMaxIfInvalid(newDataSize)); + stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize)); } private long computeNewRowCount(List rowCountParents, long denom) { @@ -1494,7 +1493,7 @@ public class StatsRulesProcFactory { for (int i = 0; i < rowCountParents.size(); i++) { if (i != maxIdx) { - result *= rowCountParents.get(i); + result = StatsUtils.safeMult(result, rowCountParents.get(i)); } } @@ -1512,7 +1511,6 @@ public class StatsRulesProcFactory { // find min NDV for joining columns for (Map.Entry> entry : joinKeys.entrySet()) { String key = entry.getValue().get(joinColIdx); - key = StatsUtils.stripPrefixFromColumnName(key); ColStatistics cs = joinedColStats.get(key); if (cs != null && cs.getCountDistint() < minNDV) { minNDV = cs.getCountDistint(); @@ -1523,7 +1521,6 @@ public class StatsRulesProcFactory { if (minNDV != Long.MAX_VALUE) { for (Map.Entry> entry : joinKeys.entrySet()) { String key = entry.getValue().get(joinColIdx); - key = StatsUtils.stripPrefixFromColumnName(key); ColStatistics cs = joinedColStats.get(key); if (cs != null) { cs.setCountDistint(minNDV); @@ -1569,7 +1566,7 @@ public class StatsRulesProcFactory { long denom = 1; for (int i = 0; i < distinctVals.size(); i++) { if (i != minIdx) { - denom *= distinctVals.get(i); + denom = StatsUtils.safeMult(denom, distinctVals.get(i)); } } return denom; @@ -1613,12 +1610,13 @@ public class StatsRulesProcFactory { // in the absence of column statistics, compute data size based on // based on average row size Statistics wcStats = parentStats.clone(); + limit = StatsUtils.getMaxIfOverflow(limit); if (limit <= parentStats.getNumRows()) { long numRows = limit; long avgRowSize = parentStats.getAvgRowSize(); - long dataSize = avgRowSize * limit; - wcStats.setNumRows(setMaxIfInvalid(numRows)); - wcStats.setDataSize(setMaxIfInvalid(dataSize)); + long dataSize = StatsUtils.safeMult(avgRowSize, limit); + wcStats.setNumRows(numRows); + wcStats.setDataSize(dataSize); } lop.setStatistics(wcStats); @@ -1662,26 +1660,26 @@ public class StatsRulesProcFactory { if (satisfyPrecondition(parentStats)) { List colStats = Lists.newArrayList(); for (String key : outKeyColNames) { - String prefixedKey = "KEY." + key; + String prefixedKey = Utilities.ReduceField.KEY.toString() + "." + key; ExprNodeDesc end = colExprMap.get(prefixedKey); if (end != null) { ColStatistics cs = StatsUtils .getColStatisticsFromExpression(conf, parentStats, end); if (cs != null) { - cs.setColumnName(key); + cs.setColumnName(prefixedKey); colStats.add(cs); } } } for (String val : outValueColNames) { - String prefixedVal = "VALUE." + val; + String prefixedVal = Utilities.ReduceField.VALUE.toString() + "." 
+              + val;
           ExprNodeDesc end = colExprMap.get(prefixedVal);
           if (end != null) {
             ColStatistics cs = StatsUtils
                 .getColStatisticsFromExpression(conf, parentStats, end);
             if (cs != null) {
-              cs.setColumnName(val);
+              cs.setColumnName(prefixedVal);
               colStats.add(cs);
             }
           }
@@ -1815,7 +1813,7 @@ public class StatsRulesProcFactory {
             + newNumRows + " rows will be set to Long.MAX_VALUE");
       }
 
-      newNumRows = setMaxIfInvalid(newNumRows);
+      newNumRows = StatsUtils.getMaxIfOverflow(newNumRows);
       long oldRowCount = stats.getNumRows();
       double ratio = (double) newNumRows / (double) oldRowCount;
       stats.setNumRows(newNumRows);
@@ -1842,10 +1840,10 @@ public class StatsRulesProcFactory {
         }
         stats.setColumnStats(colStats);
         long newDataSize = StatsUtils.getDataSizeFromColumnStats(newNumRows, colStats);
-        stats.setDataSize(setMaxIfInvalid(newDataSize));
+        stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
       } else {
         long newDataSize = (long) (ratio * stats.getDataSize());
-        stats.setDataSize(setMaxIfInvalid(newDataSize));
+        stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
       }
     }
@@ -1853,14 +1851,4 @@ public class StatsRulesProcFactory {
      return stats != null && stats.getBasicStatsState().equals(Statistics.State.COMPLETE)
          && !stats.getColumnStatsState().equals(Statistics.State.NONE);
    }
-
-  /**
-   * negative number of rows or data sizes are invalid. It could be because of
-   * long overflow in which case return Long.MAX_VALUE
-   * @param val - input value
-   * @return Long.MAX_VALUE if val is negative else val
-   */
-  static long setMaxIfInvalid(long val) {
-    return val < 0 ? Long.MAX_VALUE : val;
-  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java Fri Nov 7 20:41:34 2014
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.exec.ExplainSQRewriteTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.plan.ExplainSQRewriteWork;
@@ -55,7 +56,7 @@ public class ExplainSQRewriteSemanticAna
         ctx
     );
 
-    Task explTask = TaskFactory.get(work, conf);
+    ExplainSQRewriteTask explTask = (ExplainSQRewriteTask) TaskFactory.get(work, conf);
 
     fieldList = explTask.getResultSchema();
     rootTasks.add(explTask);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java Fri Nov 7 20:41:34 2014
@@ -106,7 +106,7 @@ public class ExplainSemanticAnalyzer ext
     work.setAppendTaskType(
         HiveConf.getBoolVar(conf,
            HiveConf.ConfVars.HIVEEXPLAINDEPENDENCYAPPENDTASKTYPES));
 
-    Task explTask = TaskFactory.get(work, conf);
+    ExplainTask explTask = (ExplainTask) TaskFactory.get(work, conf);
     fieldList = explTask.getResultSchema();
     rootTasks.add(explTask);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/JoinCond.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/JoinCond.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/JoinCond.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/JoinCond.java Fri Nov 7 20:41:34 2014
@@ -79,4 +79,7 @@ public class JoinCond {
     this.joinType = joinType;
   }
 
+  public void setPreserved(boolean preserved) {
+    this.preserved = preserved;
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java Fri Nov 7 20:41:34 2014
@@ -238,6 +238,8 @@ public class LoadSemanticAnalyzer extend
 
     // create final load/move work
 
+    boolean preservePartitionSpecs = false;
+
     Map partSpec = ts.getPartSpec();
     if (partSpec == null) {
       partSpec = new LinkedHashMap();
@@ -252,9 +254,14 @@ public class LoadSemanticAnalyzer extend
         throw new SemanticException(ErrorMsg.OFFLINE_TABLE_OR_PARTITION.
            getMsg(ts.tableName + ":" + part.getName()));
       }
-      outputs.add(new WriteEntity(part,
-          (isOverWrite ? WriteEntity.WriteType.INSERT_OVERWRITE :
-              WriteEntity.WriteType.INSERT)));
+      if (isOverWrite) {
+        outputs.add(new WriteEntity(part, WriteEntity.WriteType.INSERT_OVERWRITE));
+      } else {
+        outputs.add(new WriteEntity(part, WriteEntity.WriteType.INSERT));
+        // If partition already exists and we aren't overwriting it, then respect
+        // its current location info rather than picking it from the parent TableDesc
+        preservePartitionSpecs = true;
+      }
     } else {
       outputs.add(new WriteEntity(ts.tableHandle,
           (isOverWrite ? WriteEntity.WriteType.INSERT_OVERWRITE :
@@ -269,6 +276,12 @@ public class LoadSemanticAnalyzer extend
     LoadTableDesc loadTableWork;
     loadTableWork = new LoadTableDesc(new Path(fromURI),
         Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite);
+    if (preservePartitionSpecs) {
+      // Note : preservePartitionSpecs=true implies inheritTableSpecs=false, but
+      // preservePartitionSpecs=false (the default) here is not sufficient
+      // information to set inheritTableSpecs=true
+      loadTableWork.setInheritTableSpecs(false);
+    }
 
     Task childTask = TaskFactory.get(new MoveWork(getInputs(), getOutputs(),
         loadTableWork, null, true, isLocal), conf);
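
The StatsRulesProcFactory hunks above replace unchecked multiplications and the removed setMaxIfInvalid() helper with StatsUtils.safeMult() and StatsUtils.getMaxIfOverflow(), whose bodies are not part of this file's diff. Below is a minimal, self-contained sketch of saturating helpers with that shape; the method names come from the diff, but the implementations are assumptions for illustration, not the actual Hive StatsUtils code.

// SaturatingStatsSketch.java
// Illustrative sketch only. safeMult and getMaxIfOverflow are named after the
// methods referenced in the diff; the bodies below are assumptions about the
// intended semantics (clamp to Long.MAX_VALUE instead of wrapping on overflow).
public final class SaturatingStatsSketch {

  // Multiply two non-negative counters, saturating at Long.MAX_VALUE.
  public static long safeMult(long a, long b) {
    if (a == 0 || b == 0) {
      return 0;
    }
    long result = a * b;
    // If the product wrapped around, dividing back will not recover 'a'.
    if (result / b != a) {
      return Long.MAX_VALUE;
    }
    return result;
  }

  // Mirrors the removed setMaxIfInvalid(): a negative row count or data size
  // can only come from an earlier wrap-around, so treat it as overflow.
  public static long getMaxIfOverflow(long val) {
    return val < 0 ? Long.MAX_VALUE : val;
  }

  public static void main(String[] args) {
    System.out.println(safeMult(3_000_000_000L, 4_000_000_000L)); // 9223372036854775807
    System.out.println(getMaxIfOverflow(-1L));                    // 9223372036854775807
  }
}

Under these assumptions, getMaxIfOverflow keeps the contract of the removed setMaxIfInvalid (a negative value is treated as an overflowed result), while safeMult clamps at the point of multiplication so later arithmetic never sees a wrapped-around intermediate.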