From: jdere@apache.org
To: commits@hive.apache.org
Date: Tue, 24 Jan 2017 20:02:53 -0000
Subject: [6/6] hive git commit: HIVE-15269: Dynamic Min-Max/BloomFilter runtime-filtering for Tez (Deepak Jaiswal via Jason Dere)

HIVE-15269: Dynamic Min-Max/BloomFilter runtime-filtering for Tez (Deepak Jaiswal via Jason Dere)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cc3fd84e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cc3fd84e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cc3fd84e

Branch: refs/heads/master
Commit: cc3fd84ee3ac2f855e257a94bad894909bf628f4
Parents: 3040f6e
Author: Jason Dere
Authored: Tue Jan 24 12:01:41 2017 -0800
Committer: Jason Dere
Committed: Tue Jan 24 12:01:41 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java      |    5 +
 .../test/resources/testconfiguration.properties    |    2 +
 .../apache/orc/impl/TestRecordReaderImpl.java      |    2 +-
 .../hive/ql/exec/AbstractMapJoinOperator.java      |    2 +-
 .../hadoop/hive/ql/exec/CommonJoinOperator.java    |    4 +-
 .../hive/ql/exec/DynamicValueRegistry.java         |   30 +
 .../hive/ql/exec/ExprNodeColumnEvaluator.java      |    5 +-
 .../exec/ExprNodeConstantDefaultEvaluator.java     |    7 +-
 .../hive/ql/exec/ExprNodeConstantEvaluator.java    |    7 +-
 .../ql/exec/ExprNodeDynamicValueEvaluator.java     |   54 +
 .../hadoop/hive/ql/exec/ExprNodeEvaluator.java     |   13 +-
 .../hive/ql/exec/ExprNodeEvaluatorFactory.java     |   18 +-
 .../hive/ql/exec/ExprNodeEvaluatorHead.java        |    2 +-
 .../hive/ql/exec/ExprNodeEvaluatorRef.java         |    2 +-
 .../hive/ql/exec/ExprNodeFieldEvaluator.java       |    7 +-
 .../ql/exec/ExprNodeGenericFuncEvaluator.java      |    7 +-
 .../hadoop/hive/ql/exec/FilterOperator.java        |    2 +-
 .../hadoop/hive/ql/exec/FunctionRegistry.java      |    4 +-
 .../hadoop/hive/ql/exec/GroupByOperator.java       |   36 +-
 .../hive/ql/exec/HashTableSinkOperator.java        |    6 +-
 .../apache/hadoop/hive/ql/exec/JoinUtil.java       |    8 +-
 .../apache/hadoop/hive/ql/exec/ObjectCache.java    |   10 +
 .../hadoop/hive/ql/exec/ObjectCacheWrapper.java    |    5 +
 .../hadoop/hive/ql/exec/SelectOperator.java        |    2 +-
 .../hadoop/hive/ql/exec/mr/ObjectCache.java        |    5 +
 .../ql/exec/tez/DynamicValueRegistryTez.java       |  131 ++
 .../hive/ql/exec/tez/LlapObjectCache.java          |   18 +
 .../hive/ql/exec/tez/MapRecordProcessor.java       |   29 +-
 .../hadoop/hive/ql/exec/tez/ObjectCache.java       |   16 +
 .../hive/ql/exec/tez/ReduceRecordProcessor.java    |   29 +-
 .../ql/exec/vector/VectorMapJoinOperator.java      |    2 +-
 .../exec/vector/VectorSMBMapJoinOperator.java      |    2 +-
 .../hive/ql/io/sarg/ConvertAstToSearchArg.java     |   56 +-
 .../DynamicPartitionPruningOptimization.java       |  342 +++-
 .../optimizer/FixedBucketPruningOptimizer.java     |    2 +-
 ...edundantDynamicPruningConditionsRemoval.java    |   24 +-
 .../stats/annotation/StatsRulesProcFactory.java    |   64 +-
 .../hadoop/hive/ql/parse/GenTezUtils.java          |  170 +-
 .../hadoop/hive/ql/parse/ParseContext.java         |   21 +
 .../hadoop/hive/ql/parse/RuntimeValuesInfo.java    |   62 +
 .../hadoop/hive/ql/parse/TaskCompiler.java         |    3 +
 .../hadoop/hive/ql/parse/TezCompiler.java          |  422 ++++-
 .../hadoop/hive/ql/plan/AggregationDesc.java       |    7 +
 .../apache/hadoop/hive/ql/plan/BaseWork.java       |   15 +
 .../hadoop/hive/ql/plan/DynamicValue.java          |  137 ++
 .../hive/ql/plan/ExprNodeDynamicValueDesc.java     |   76 +
 .../ql/udf/generic/GenericUDAFBloomFilter.java     |  267 +++
 .../ql/udf/generic/GenericUDAFEvaluator.java       |    7 +
 .../ql/udf/generic/GenericUDFInBloomFilter.java    |  168 ++
 .../ql/io/sarg/TestConvertAstToSearchArg.java      |   39 +-
 .../hive/ql/io/sarg/TestSearchArgumentImpl.java    |    2 +-
 .../ql/optimizer/physical/TestVectorizer.java      |   26 +-
 .../clientpositive/dynamic_semijoin_reduction.q    |   68 +
 .../llap/dynamic_partition_pruning.q.out           |   69 +-
 .../llap/dynamic_semijoin_reduction.q.out          | 1535 ++++++++++++++++++
 .../clientpositive/llap/join32_lessSize.q.out      |    2 +-
 .../clientpositive/llap/llap_partitioned.q.out     |    2 +-
 .../results/clientpositive/llap/mergejoin.q.out    |  561 ++++++-
 .../results/clientpositive/llap/orc_llap.q.out     |   78 +-
 .../clientpositive/llap/subquery_scalar.q.out      |   16 +-
 .../vectorized_dynamic_partition_pruning.q.out     |   71 +-
 .../results/clientpositive/perf/query16.q.out      |   14 +-
 .../results/clientpositive/perf/query6.q.out       |    8 +-
 .../results/clientpositive/perf/query83.q.out      |   16 +-
 .../results/clientpositive/show_functions.q.out    |    2 +
 .../clientpositive/tez/explainanalyze_3.q.out      |  127 +-
 .../clientpositive/tez/explainuser_3.q.out         |  131 +-
 .../hadoop/hive/ql/io/sarg/LiteralDelegate.java    |   31 +
 .../hive/ql/io/sarg/SearchArgumentFactory.java     |    9 +-
 .../hive/ql/io/sarg/SearchArgumentImpl.java        |   65 +-
 .../apache/hive/common/util/BloomFilter.java       |   51 +
 71 files changed, 4741 insertions(+), 497 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index fc9734b..8e319c6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2851,6 +2851,11 @@ public class HiveConf extends Configuration {
     TEZ_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE("hive.tez.dynamic.partition.pruning.max.data.size",
         100*1024*1024L, "Maximum total data size of events in dynamic pruning."),
+    TEZ_DYNAMIC_SEMIJOIN_REDUCTION("hive.tez.dynamic.semijoin.reduction", true,
+        "When dynamic semijoin is enabled, shuffle joins will perform a leaky semijoin before shuffle. This " +
+        "requires hive.tez.dynamic.partition.pruning to be enabled."),
+    TEZ_MAX_BLOOM_FILTER_ENTRIES("hive.tez.max.bloom.filter.entries", 100000000L,
+        "Bloom filter should be of at max certain size to be effective"),
     TEZ_SMB_NUMBER_WAVES(
         "hive.tez.smb.number.waves",
         (float) 0.5,
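[The two HiveConf entries above are the feature's switches: one gates the semijoin-reduction rewrite, the other caps bloom-filter size. A minimal, illustrative sketch of reading and setting them through the standard HiveConf API (not part of this commit):]

  import org.apache.hadoop.hive.conf.HiveConf;

  public class SemijoinConfSketch {
    public static void main(String[] args) {
      HiveConf conf = new HiveConf();
      // Defaults per the diff above: reduction enabled, up to 100M bloom-filter entries.
      boolean semijoinOn = conf.getBoolVar(HiveConf.ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION);
      long maxEntries = conf.getLongVar(HiveConf.ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES);
      // Per the description string, the feature also requires dynamic partition pruning.
      conf.setBoolVar(HiveConf.ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING, true);
      System.out.println("semijoin reduction=" + semijoinOn + ", max bloom entries=" + maxEntries);
    }
  }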
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 792de2e..bd76b7d 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -154,6 +154,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
   delete_whole_partition.q,\
   disable_merge_for_bucketing.q,\
   dynamic_partition_pruning.q,\
+  dynamic_semijoin_reduction.q,\
   dynpart_sort_opt_vectorization.q,\
   dynpart_sort_optimization.q,\
   dynpart_sort_optimization2.q,\
@@ -480,6 +481,7 @@ minillaplocal.query.files=acid_globallimit.q,\
   correlationoptimizer6.q,\
   disable_merge_for_bucketing.q,\
   dynamic_partition_pruning.q,\
+  dynamic_semijoin_reduction.q,\
   dynpart_sort_opt_vectorization.q,\
   dynpart_sort_optimization.q,\
   dynpart_sort_optimization_acid.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/orc/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestRecordReaderImpl.java b/orc/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
index cdd62ac..30b42ee 100644
--- a/orc/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
+++ b/orc/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
@@ -76,7 +76,7 @@ public class TestRecordReaderImpl {
       Object literal, List literalList) {
     return new SearchArgumentImpl.PredicateLeafImpl(operator, type, columnName,
-        literal, literalList);
+        literal, literalList, null);
   }

   // can add .verboseLogging() to cause Mockito to log invocations

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
index 69ba4a2..669e23e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
@@ -70,7 +70,7 @@ public abstract class AbstractMapJoinOperator extends Co
     if (conf.getGenJoinKeys()) {
       int tagLen = conf.getTagLength();
       joinKeys = new List[tagLen];
-      JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), NOTSKIPBIGTABLE);
+      JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), NOTSKIPBIGTABLE, hconf);
       joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys,
           inputObjInspectors,NOTSKIPBIGTABLE, tagLen);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
index 7e9007c..df1898e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
@@ -254,11 +254,11 @@ public abstract class CommonJoinOperator extends
     noOuterJoin = conf.isNoOuterJoin();

     totalSz = JoinUtil.populateJoinKeyValue(joinValues, conf.getExprs(),
-        order,NOTSKIPBIGTABLE);
+        order,NOTSKIPBIGTABLE, hconf);

     //process join filters
     joinFilters = new List[tagLen];
-    JoinUtil.populateJoinKeyValue(joinFilters, conf.getFilters(),order,NOTSKIPBIGTABLE);
+    JoinUtil.populateJoinKeyValue(joinFilters, conf.getFilters(),order,NOTSKIPBIGTABLE, hconf);

     joinValuesObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinValues,

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/DynamicValueRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DynamicValueRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DynamicValueRegistry.java
new file mode 100644
index 0000000..63336bd
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DynamicValueRegistry.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+public interface DynamicValueRegistry {
+
+  // Abstract class to hold info required for the implementation
+  public static abstract class RegistryConf {
+  }
+
+  Object getValue(String key) throws Exception;
+
+  void init(RegistryConf conf) throws Exception;
+}
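[The interface above is the whole contract: init() populates the registry from some source, getValue() hands values out by ID. A hypothetical, minimal implementation — e.g. a map-backed test double, in contrast to the Tez-fed DynamicValueRegistryTez later in this diff; all names here are illustrative:]

  import java.util.HashMap;
  import java.util.Map;

  import org.apache.hadoop.hive.ql.exec.DynamicValueRegistry;

  // Hypothetical test double: values are seeded up front instead of being
  // read from a Tez runtime-value input.
  public class StaticDynamicValueRegistry implements DynamicValueRegistry {

    public static class StaticRegistryConf extends RegistryConf {
      public final Map<String, Object> seedValues = new HashMap<>();
    }

    private final Map<String, Object> values = new HashMap<>();

    @Override
    public void init(RegistryConf conf) {
      values.putAll(((StaticRegistryConf) conf).seedValues);
    }

    @Override
    public Object getValue(String key) {
      if (!values.containsKey(key)) {
        throw new IllegalStateException("Value does not exist in registry: " + key);
      }
      return values.get(key);
    }
  }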
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
index 24c8281..b0384df 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec;

+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -38,8 +39,8 @@ public class ExprNodeColumnEvaluator extends ExprNodeEvaluator {

+  transient ObjectInspector oi;
+
+  public ExprNodeDynamicValueEvaluator(ExprNodeDynamicValueDesc expr, Configuration conf) {
+    super(expr, conf);
+    oi = ObjectInspectorUtils.getStandardObjectInspector(expr.getWritableObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
+  }
+
+  @Override
+  public ObjectInspector initialize(ObjectInspector rowInspector) throws HiveException {
+    return oi;
+  }
+
+  @Override
+  protected Object _evaluate(Object row, int version) throws HiveException {
+    DynamicValue dynamicValue = expr.getDynamicValue();
+    dynamicValue.setConf(conf);
+    return dynamicValue.getWritableValue();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java
index b8d6ab7..375d65f 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec;

+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -30,9 +31,11 @@ public abstract class ExprNodeEvaluator {

   protected final T expr;
   protected ObjectInspector outputOI;
+  protected Configuration conf;

-  public ExprNodeEvaluator(T expr) {
+  public ExprNodeEvaluator(T expr, Configuration conf) {
     this.expr = expr;
+    this.conf = conf;
   }

   /**
@@ -109,4 +112,12 @@ public abstract class ExprNodeEvaluator {
   public String toString() {
     return "ExprNodeEvaluator[" + expr + "]";
   }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java
index 0d03d8f..34aec55 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java
@@ -21,11 +21,13 @@ package org.apache.hadoop.hive.ql.exec;
 import java.util.HashMap;
 import java.util.Map;

+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDefaultDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;

@@ -39,9 +41,13 @@ public final class ExprNodeEvaluatorFactory {
   }

   public static ExprNodeEvaluator get(ExprNodeDesc desc) throws HiveException {
+    return get(desc, null);
+  }
+
+  public static ExprNodeEvaluator get(ExprNodeDesc desc, Configuration conf) throws HiveException {
     // Constant node
     if (desc instanceof ExprNodeConstantDesc) {
-      return new ExprNodeConstantEvaluator((ExprNodeConstantDesc) desc);
+      return new ExprNodeConstantEvaluator((ExprNodeConstantDesc) desc, conf);
     }

     // Special 'default' constant node
@@ -51,15 +57,19 @@ public final class ExprNodeEvaluatorFactory {

     // Column-reference node, e.g. a column in the input row
     if (desc instanceof ExprNodeColumnDesc) {
-      return new ExprNodeColumnEvaluator((ExprNodeColumnDesc) desc);
+      return new ExprNodeColumnEvaluator((ExprNodeColumnDesc) desc, conf);
     }

     // Generic Function node, e.g. CASE, an operator or a UDF node
     if (desc instanceof ExprNodeGenericFuncDesc) {
-      return new ExprNodeGenericFuncEvaluator((ExprNodeGenericFuncDesc) desc);
+      return new ExprNodeGenericFuncEvaluator((ExprNodeGenericFuncDesc) desc, conf);
     }

     // Field node, e.g. get a.myfield1 from a
     if (desc instanceof ExprNodeFieldDesc) {
-      return new ExprNodeFieldEvaluator((ExprNodeFieldDesc) desc);
+      return new ExprNodeFieldEvaluator((ExprNodeFieldDesc) desc, conf);
+    }
+    // Dynamic value which will be determined during query runtime
+    if (desc instanceof ExprNodeDynamicValueDesc) {
+      return new ExprNodeDynamicValueEvaluator((ExprNodeDynamicValueDesc) desc, conf);
     }

     throw new RuntimeException(
         "Cannot find ExprNodeEvaluator for the exprNodeDesc = " + desc);
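[The factory change above threads a Configuration into every evaluator it builds, which is what lets ExprNodeDynamicValueEvaluator resolve its value at runtime. An illustrative sketch of the call pattern, using a constant expression for simplicity (assumes constant evaluators ignore the row inspector, which holds for ExprNodeConstantEvaluator):]

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
  import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
  import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
  import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

  public class EvaluatorFactorySketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      ExprNodeConstantDesc fortyTwo = new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, 42);
      // New overload: the conf rides along into the evaluator.
      ExprNodeEvaluator eval = ExprNodeEvaluatorFactory.get(fortyTwo, conf);
      eval.initialize(null);              // no input row needed for a constant
      System.out.println(eval.evaluate(null)); // prints 42 (as a writable)
    }
  }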
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorHead.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorHead.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorHead.java
index 42685fb..991bc13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorHead.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorHead.java
@@ -30,7 +30,7 @@ public class ExprNodeEvaluatorHead extends ExprNodeEvaluator {
   private final ExprNodeEvaluator referencing;

   public ExprNodeEvaluatorHead(ExprNodeEvaluator referencing) {
-    super(referencing.getExpr());
+    super(referencing.getExpr(), referencing.getConf());
     this.referencing = referencing;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorRef.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorRef.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorRef.java
index 0a6b66a..625d486 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorRef.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorRef.java
@@ -30,7 +30,7 @@ public class ExprNodeEvaluatorRef extends ExprNodeEvaluator {
   private final ExprNodeEvaluator referencing;

   public ExprNodeEvaluatorRef(ExprNodeEvaluator referencing) {
-    super(referencing.getExpr());
+    super(referencing.getExpr(), referencing.getConf());
     this.referencing = referencing;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java
index ff32626..1241343 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec;
 import java.util.ArrayList;
 import java.util.List;

+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
@@ -43,9 +44,9 @@ public class ExprNodeFieldEvaluator extends ExprNodeEvaluator
   transient ObjectInspector structFieldObjectInspector;
   transient ObjectInspector resultObjectInspector;

-  public ExprNodeFieldEvaluator(ExprNodeFieldDesc desc) throws HiveException {
-    super(desc);
-    leftEvaluator = ExprNodeEvaluatorFactory.get(desc.getDesc());
+  public ExprNodeFieldEvaluator(ExprNodeFieldDesc desc, Configuration conf) throws HiveException {
+    super(desc, conf);
+    leftEvaluator = ExprNodeEvaluatorFactory.get(desc.getDesc(), conf);
   }

   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java
index 221abd9..8b9baa6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
@@ -91,13 +92,13 @@ public class ExprNodeGenericFuncEvaluator extends ExprNodeEvaluator implements
     try {
       heartbeatInterval = HiveConf.getIntVar(hconf,
           HiveConf.ConfVars.HIVESENDHEARTBEAT);
-      conditionEvaluator = ExprNodeEvaluatorFactory.get(conf.getPredicate());
+      conditionEvaluator = ExprNodeEvaluatorFactory.get(conf.getPredicate(), hconf);
       if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEEXPREVALUATIONCACHE)) {
         conditionEvaluator = ExprNodeEvaluatorFactory.toCachedEval(conditionEvaluator);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index 4fce1ac..e166eee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -370,6 +370,7 @@ public final class FunctionRegistry {
     system.registerGenericUDF("not", GenericUDFOPNot.class);
     system.registerGenericUDF("!", GenericUDFOPNot.class);
     system.registerGenericUDF("between", GenericUDFBetween.class);
+    system.registerGenericUDF("in_bloom_filter", GenericUDFInBloomFilter.class);

     system.registerGenericUDF("ewah_bitmap_and", GenericUDFEWAHBitmapAnd.class);
     system.registerGenericUDF("ewah_bitmap_or", GenericUDFEWAHBitmapOr.class);
@@ -427,7 +428,7 @@ public final class FunctionRegistry {
     system.registerGenericUDAF("ewah_bitmap", new GenericUDAFEWAHBitmap());

     system.registerGenericUDAF("compute_stats", new GenericUDAFComputeStats());
-
+    system.registerGenericUDAF("bloom_filter", new GenericUDAFBloomFilter());

     system.registerUDAF("percentile", UDAFPercentile.class);

@@ -472,7 +473,6 @@ public final class FunctionRegistry {
     system.registerGenericUDF("to_unix_timestamp", GenericUDFToUnixTimeStamp.class);
     system.registerGenericUDF("internal_interval", GenericUDFInternalInterval.class);

-
     // Generic UDTF's
     system.registerGenericUDTF("explode", GenericUDTFExplode.class);
     system.registerGenericUDTF("replicate_rows", GenericUDTFReplicateRows.class);
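[The two functions registered above are the building blocks of semijoin reduction: the small side aggregates its join keys with the bloom_filter UDAF, and the probe side is filtered with a predicate roughly of the form (col BETWEEN min AND max) AND in_bloom_filter(col, bloom). A small, illustrative sketch of resolving both through the registry (not part of this commit):]

  import org.apache.hadoop.hive.ql.exec.FunctionRegistry;

  public class BloomFunctionLookupSketch {
    public static void main(String[] args) throws Exception {
      // Both names are registered by the FunctionRegistry change above.
      System.out.println(FunctionRegistry.getFunctionInfo("in_bloom_filter").getDisplayName());
      System.out.println(FunctionRegistry.getFunctionInfo("bloom_filter").getDisplayName());
    }
  }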
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 073147f..be561ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -212,7 +212,7 @@ public class GroupByOperator extends Operator {
     keyObjectInspectors = new ObjectInspector[numKeys];
     currentKeyObjectInspectors = new ObjectInspector[numKeys];
     for (int i = 0; i < numKeys; i++) {
-      keyFields[i] = ExprNodeEvaluatorFactory.get(conf.getKeys().get(i));
+      keyFields[i] = ExprNodeEvaluatorFactory.get(conf.getKeys().get(i), hconf);
       keyObjectInspectors[i] = keyFields[i].initialize(rowInspector);
       currentKeyObjectInspectors[i] = ObjectInspectorUtils
           .getStandardObjectInspector(keyObjectInspectors[i],
@@ -258,7 +258,7 @@ public class GroupByOperator extends Operator {
           new ExprNodeColumnDesc(TypeInfoUtils.getTypeInfoFromObjectInspector(
               sf.getFieldObjectInspector()), keyField.getFieldName() + "." + sf.getFieldName(), null,
-              false));
+              false), hconf);
       unionExprEval.initialize(rowInspector);
     }
   }
@@ -283,7 +283,7 @@ public class GroupByOperator extends Operator {
       aggregationParameterObjects[i] = new Object[parameters.size()];
       for (int j = 0; j < parameters.size(); j++) {
         aggregationParameterFields[i][j] = ExprNodeEvaluatorFactory
-            .get(parameters.get(j));
+            .get(parameters.get(j), hconf);
         aggregationParameterObjectInspectors[i][j] = aggregationParameterFields[i][j]
             .initialize(rowInspector);
         if (unionExprEval != null) {
@@ -352,6 +352,21 @@ public class GroupByOperator extends Operator {
     }
   }

+  // grouping id should be pruned, which is the last of key columns
+  // see ColumnPrunerGroupByProc
+  outputKeyLength = conf.pruneGroupingSetId() ? keyFields.length - 1 : keyFields.length;
+
+  // init objectInspectors
+  ObjectInspector[] objectInspectors =
+      new ObjectInspector[outputKeyLength + aggregationEvaluators.length];
+  for (int i = 0; i < outputKeyLength; i++) {
+    objectInspectors[i] = currentKeyObjectInspectors[i];
+  }
+  for (int i = 0; i < aggregationEvaluators.length; i++) {
+    objectInspectors[outputKeyLength + i] = aggregationEvaluators[i].init(conf.getAggregators()
+        .get(i).getMode(), aggregationParameterObjectInspectors[i]);
+  }
+
   aggregationsParametersLastInvoke = new Object[conf.getAggregators().size()][];
   if ((conf.getMode() != GroupByDesc.Mode.HASH || conf.getBucketGroup()) &&
       (!groupingSetsPresent)) {
@@ -374,21 +389,6 @@ public class GroupByOperator extends Operator {
     List fieldNames = new ArrayList(conf.getOutputColumnNames());

-    // grouping id should be pruned, which is the last of key columns
-    // see ColumnPrunerGroupByProc
-    outputKeyLength = conf.pruneGroupingSetId() ? keyFields.length - 1 : keyFields.length;
-
-    // init objectInspectors
-    ObjectInspector[] objectInspectors =
-        new ObjectInspector[outputKeyLength + aggregationEvaluators.length];
-    for (int i = 0; i < outputKeyLength; i++) {
-      objectInspectors[i] = currentKeyObjectInspectors[i];
-    }
-    for (int i = 0; i < aggregationEvaluators.length; i++) {
-      objectInspectors[outputKeyLength + i] = aggregationEvaluators[i].init(conf.getAggregators()
-          .get(i).getMode(), aggregationParameterObjectInspectors[i]);
-    }
-
     outputObjInspector = ObjectInspectorFactory
         .getStandardStructObjectInspector(fieldNames, Arrays.asList(objectInspectors));

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
index ac5331e..3a366f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
@@ -143,19 +143,19 @@ public class HashTableSinkOperator extends TerminalOperator i
     // process join keys
     joinKeys = new List[tagLen];
-    JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), posBigTableAlias);
+    JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), posBigTableAlias, hconf);
     joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys,
         inputObjInspectors, posBigTableAlias, tagLen);

     // process join values
     joinValues = new List[tagLen];
-    JoinUtil.populateJoinKeyValue(joinValues, conf.getExprs(), posBigTableAlias);
+    JoinUtil.populateJoinKeyValue(joinValues, conf.getExprs(), posBigTableAlias, hconf);
     joinValuesObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinValues,
         inputObjInspectors, posBigTableAlias, tagLen);

     // process join filters
     joinFilters = new List[tagLen];
-    JoinUtil.populateJoinKeyValue(joinFilters, conf.getFilters(), posBigTableAlias);
+    JoinUtil.populateJoinKeyValue(joinFilters, conf.getFilters(), posBigTableAlias, hconf);
     joinFilterObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinFilters,
         inputObjInspectors, posBigTableAlias, tagLen);

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
index 9718c48..07a3dc6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
@@ -121,14 +121,14 @@ public class JoinUtil {
   }

   public static int populateJoinKeyValue(List[] outMap,
-      Map> inputMap, int posBigTableAlias) throws HiveException {
-    return populateJoinKeyValue(outMap, inputMap, null, posBigTableAlias);
+      Map> inputMap, int posBigTableAlias, Configuration conf) throws HiveException {
+    return populateJoinKeyValue(outMap, inputMap, null, posBigTableAlias, conf);
   }

   public static int populateJoinKeyValue(List[] outMap,
       Map> inputMap, Byte[] order,
-      int posBigTableAlias) throws HiveException {
+      int posBigTableAlias, Configuration conf) throws HiveException {
     int total = 0;
     for (Entry> e : inputMap.entrySet()) {
       if (e.getValue() == null) {
@@ -140,7 +140,7 @@ public class JoinUtil {
         if (key == (byte) posBigTableAlias) {
           valueFields.add(null);
         } else {
-          valueFields.add(ExprNodeEvaluatorFactory.get(expr));
+          valueFields.add(ExprNodeEvaluatorFactory.get(expr, conf));
         }
       }
       outMap[key] = valueFields;

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
index 440e0a1..b931c95 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
@@ -44,6 +44,16 @@ public interface ObjectCache {
   public T retrieve(String key, Callable fn) throws HiveException;

   /**
+   * Retrieve object from cache.
+   *
+   * @param
+   * @param key
+   *          function to generate the object if it's not there
+   * @return the last cached object with the key, null if none.
+   */
+  public T retrieve(String key) throws HiveException;
+
+  /**
    * Retrieve object from cache asynchronously.
    *
    * @param

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheWrapper.java
index 9768efa..71bcd98 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheWrapper.java
@@ -36,6 +36,11 @@ public class ObjectCacheWrapper implements ObjectCache {
   }

   @Override
+  public T retrieve(String key) throws HiveException {
+    return globalCache.retrieve(makeKey(key));
+  }
+
+  @Override
   public T retrieve(String key, Callable fn) throws HiveException {
     return globalCache.retrieve(makeKey(key), fn);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
index 9049ddd..a30c771 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
@@ -63,7 +63,7 @@ public class SelectOperator extends Operator implements Serializable
     eval = new ExprNodeEvaluator[colList.size()];
     for (int i = 0; i < colList.size(); i++) {
       assert (colList.get(i) != null);
-      eval[i] = ExprNodeEvaluatorFactory.get(colList.get(i));
+      eval[i] = ExprNodeEvaluatorFactory.get(colList.get(i), hconf);
     }
     if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEEXPREVALUATIONCACHE)) {
       eval = ExprNodeEvaluatorFactory.toCachedEvals(eval);
     }
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
index 008f8a4..cfe1750 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
@@ -47,6 +47,11 @@ public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
   }

   @Override
+  public T retrieve(String key) throws HiveException {
+    return retrieve(key, null);
+  }
+
+  @Override
   public T retrieve(String key, Callable fn) throws HiveException {
     try {
       if (isDebugEnabled) {
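[The new single-argument retrieve is a pure lookup: unlike retrieve(key, fn) it never constructs the object, returning null on a miss. A hedged sketch of the two call shapes against the interface (hypothetical helper, any ObjectCache implementation):]

  import java.util.concurrent.Callable;

  import org.apache.hadoop.hive.ql.exec.ObjectCache;
  import org.apache.hadoop.hive.ql.metadata.HiveException;

  public class CacheContractSketch {
    // Get-or-create: the callable runs only on a cache miss.
    static Object getOrCreate(ObjectCache cache, String key) throws HiveException {
      return cache.retrieve(key, new Callable<Object>() {
        @Override
        public Object call() {
          return new Object(); // stands in for expensive construction
        }
      });
    }

    // Pure lookup: null if nothing was cached under the key.
    static Object lookup(ObjectCache cache, String key) throws HiveException {
      return cache.retrieve(key);
    }
  }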
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
new file mode 100644
index 0000000..7bbedf6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
+import org.apache.hadoop.hive.ql.exec.DynamicValueRegistry;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DynamicValueRegistryTez implements DynamicValueRegistry {
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicValueRegistryTez.class);
+
+  public static class RegistryConfTez extends RegistryConf {
+    public Configuration conf;
+    public BaseWork baseWork;
+    public ProcessorContext processorContext;
+    public Map inputs;
+
+    public RegistryConfTez(Configuration conf, BaseWork baseWork,
+        ProcessorContext processorContext, Map inputs) {
+      super();
+      this.conf = conf;
+      this.baseWork = baseWork;
+      this.processorContext = processorContext;
+      this.inputs = inputs;
+    }
+  }
+
+  protected Map values = Collections.synchronizedMap(new HashMap());
+
+  public DynamicValueRegistryTez() {
+  }
+
+  @Override
+  public Object getValue(String key) {
+    if (!values.containsKey(key)) {
+      throw new IllegalStateException("Value does not exist in registry: " + key);
+    }
+    return values.get(key);
+  }
+
+  protected void setValue(String key, Object value) {
+    values.put(key, value);
+  }
+
+  @Override
+  public void init(RegistryConf conf) throws Exception {
+    RegistryConfTez rct = (RegistryConfTez) conf;
+
+    for (String inputSourceName : rct.baseWork.getInputSourceToRuntimeValuesInfo().keySet()) {
+      LOG.info("Runtime value source: " + inputSourceName);
+
+      LogicalInput runtimeValueInput = rct.inputs.get(inputSourceName);
+      RuntimeValuesInfo runtimeValuesInfo = rct.baseWork.getInputSourceToRuntimeValuesInfo().get(inputSourceName);
+
+      // Setup deserializer/obj inspectors for the incoming data source
+      Deserializer deserializer = ReflectionUtils.newInstance(runtimeValuesInfo.getTableDesc().getDeserializerClass(), null);
+      deserializer.initialize(rct.conf, runtimeValuesInfo.getTableDesc().getProperties());
+      ObjectInspector inspector = deserializer.getObjectInspector();
+
+      // Set up col expressions for the dynamic values using this input
+      List colExprEvaluators = new ArrayList();
+      for (ExprNodeDesc expr : runtimeValuesInfo.getColExprs()) {
+        ExprNodeEvaluator exprEval = ExprNodeEvaluatorFactory.get(expr, null);
+        exprEval.initialize(inspector);
+        colExprEvaluators.add(exprEval);
+      }
+
+      runtimeValueInput.start();
+      List inputList = new ArrayList();
+      inputList.add(runtimeValueInput);
+      rct.processorContext.waitForAllInputsReady(inputList);
+
+      KeyValueReader kvReader = (KeyValueReader) runtimeValueInput.getReader();
+      long rowCount = 0;
+      while (kvReader.next()) {
+        Object row = deserializer.deserialize((Writable) kvReader.getCurrentValue());
+        rowCount++;
+        for (int colIdx = 0; colIdx < colExprEvaluators.size(); ++colIdx) {
+          // Read each expression and save it to the value registry
+          ExprNodeEvaluator eval = colExprEvaluators.get(colIdx);
+          Object val = eval.evaluate(row);
+          setValue(runtimeValuesInfo.getDynamicValueIDs().get(colIdx), val);
+        }
+      }
+      // For now, expecting a single row (min/max, aggregated bloom filter)
+      if (rowCount != 1) {
+        throw new IllegalStateException("Expected 1 row from " + inputSourceName + ", got " + rowCount);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java
index 0141230..1ce8ee9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java
@@ -60,6 +60,24 @@ public class LlapObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCac

   @SuppressWarnings("unchecked")
   @Override
+  public T retrieve(String key) throws HiveException {
+
+    T value = null;
+
+    lock.lock();
+    try {
+      value = (T) registry.getIfPresent(key);
+      if (value != null && isLogDebugEnabled) {
+        LOG.debug("Found " + key + " in cache");
+      }
+      return value;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
   public T retrieve(String key, Callable fn) throws HiveException {

     T value = null;
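[Once init() has drained a runtime-value input (exactly one aggregated row of min/max/bloom values per source), consumers fetch values by the dynamic value IDs the planner recorded in RuntimeValuesInfo. A hypothetical fragment; the ID naming is made up for illustration:]

  import org.apache.hadoop.hive.ql.exec.DynamicValueRegistry;
  import org.apache.hadoop.hive.ql.exec.tez.DynamicValueRegistryTez;
  import org.apache.hadoop.hive.ql.exec.tez.DynamicValueRegistryTez.RegistryConfTez;

  public class RegistryLookupSketch {
    // conf would be assembled from the Tez processor context,
    // exactly as MapRecordProcessor does further down in this diff.
    static Object lookupDynamicValue(RegistryConfTez conf, String dynamicValueId) throws Exception {
      DynamicValueRegistry registry = new DynamicValueRegistryTez();
      registry.init(conf);
      // dynamicValueId comes from RuntimeValuesInfo.getDynamicValueIDs(),
      // e.g. a per-key "<keyBaseAlias>_min" style identifier (illustrative).
      return registry.getValue(dynamicValueId);
    }
  }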
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 955fa80..790c9d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -51,11 +51,13 @@ import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.tez.DynamicValueRegistryTez.RegistryConfTez;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.serde2.Deserializer;
@@ -88,8 +90,8 @@ public class MapRecordProcessor extends RecordProcessor {
   private final ExecMapperContext execContext;
   private MapWork mapWork;
   List mergeWorkList;
-  List cacheKeys;
-  ObjectCache cache;
+  List cacheKeys, dynamicValueCacheKeys;
+  ObjectCache cache, dynamicValueCache;
   private int nRows;

   public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
@@ -99,9 +101,11 @@ public class MapRecordProcessor extends RecordProcessor {
       setLlapOfFragmentId(context);
     }
     cache = ObjectCacheFactory.getCache(jconf, queryId, true);
+    dynamicValueCache = ObjectCacheFactory.getCache(jconf, queryId, false);
     execContext = new ExecMapperContext(jconf);
     execContext.setJc(jconf);
     cacheKeys = new ArrayList();
+    dynamicValueCacheKeys = new ArrayList();
     nRows = 0;
   }

@@ -295,6 +299,21 @@ public class MapRecordProcessor extends RecordProcessor {

     mapOp.initializeLocalWork(jconf);

+    // Setup values registry
+    checkAbortCondition();
+    String valueRegistryKey = DynamicValue.DYNAMIC_VALUE_REGISTRY_CACHE_KEY;
+    // On LLAP dynamic value registry might already be cached.
+    final DynamicValueRegistryTez registryTez = dynamicValueCache.retrieve(valueRegistryKey,
+        new Callable() {
+          @Override
+          public DynamicValueRegistryTez call() {
+            return new DynamicValueRegistryTez();
+          }
+        });
+    dynamicValueCacheKeys.add(valueRegistryKey);
+    RegistryConfTez registryConf = new RegistryConfTez(jconf, mapWork, processorContext, inputs);
+    registryTez.init(registryConf);
+    checkAbortCondition();
+
     initializeMapRecordSources();
     mapOp.initializeMapOperator(jconf);
@@ -435,6 +454,12 @@ public class MapRecordProcessor extends RecordProcessor {
       }
     }

+    if (dynamicValueCache != null && dynamicValueCacheKeys != null) {
+      for (String k: dynamicValueCacheKeys) {
+        dynamicValueCache.release(k);
+      }
+    }
+
     // detecting failed executions by exceptions thrown by the operator tree
     try {
       if (mapOp == null || mapWork == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
index 06dca00..72dcdd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
@@ -65,6 +65,22 @@ public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
     LOG.info("Releasing key: " + key);
   }

+  @SuppressWarnings("unchecked")
+  @Override
+  public T retrieve(String key) throws HiveException {
+    T value = null;
+    try {
+      value = (T) registry.get(key);
+      if ( value != null) {
+        LOG.info("Found " + key + " in cache with value: " + value);
+      }
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+    return value;
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public T retrieve(String key, Callable fn) throws HiveException {
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index d80f201..2d06545 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -40,9 +40,11 @@ import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
+import org.apache.hadoop.hive.ql.exec.tez.DynamicValueRegistryTez.RegistryConfTez;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -64,14 +66,14 @@ public class ReduceRecordProcessor extends RecordProcessor{

   private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__";

-  private ObjectCache cache;
+  private ObjectCache cache, dynamicValueCache;

   public static final Logger l4j = LoggerFactory.getLogger(ReduceRecordProcessor.class);

   private ReduceWork reduceWork;

   List mergeWorkList = null;
-  List cacheKeys;
+  List cacheKeys, dynamicValueCacheKeys;

   private final Map connectOps = new TreeMap();

@@ -91,9 +93,11 @@ public class ReduceRecordProcessor extends RecordProcessor{

     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
     cache = ObjectCacheFactory.getCache(jconf, queryId, true);
+    dynamicValueCache = ObjectCacheFactory.getCache(jconf, queryId, false);

     String cacheKey = processorContext.getTaskVertexName() + REDUCE_PLAN_KEY;
     cacheKeys = Lists.newArrayList(cacheKey);
+    dynamicValueCacheKeys = new ArrayList();

     reduceWork = (ReduceWork) cache.retrieve(cacheKey, new Callable() {
       @Override
       public Object call() {
@@ -169,6 +173,21 @@ public class ReduceRecordProcessor extends RecordProcessor{
       l4j.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask));
     }
     OperatorUtils.setMemoryAvailable(reducer.getChildOperators(), memoryAvailableToTask);
+
+    // Setup values registry
+    String valueRegistryKey = DynamicValue.DYNAMIC_VALUE_REGISTRY_CACHE_KEY;
+    DynamicValueRegistryTez registryTez = dynamicValueCache.retrieve(valueRegistryKey,
+        new Callable() {
+          @Override
+          public DynamicValueRegistryTez call() {
+            return new DynamicValueRegistryTez();
+          }
+        });
+    dynamicValueCacheKeys.add(valueRegistryKey);
+    RegistryConfTez registryConf = new RegistryConfTez(jconf, reduceWork, processorContext, inputs);
+    registryTez.init(registryConf);
+    checkAbortCondition();
+
     if (numTags > 1) {
       sources = new ReduceRecordSource[numTags];
       mainWorkOIs = new ObjectInspector[numTags];
@@ -348,6 +367,12 @@ public class ReduceRecordProcessor extends RecordProcessor{
       }
     }

+    if (dynamicValueCache != null && dynamicValueCacheKeys != null) {
+      for (String k: dynamicValueCacheKeys) {
+        dynamicValueCache.release(k);
+      }
+    }
+
     try {
       for (ReduceRecordSource rs: sources) {
         abort = abort && rs.close();

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index 0cb6c8a..848fc8e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@ -153,7 +153,7 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator {
       VectorExpression vectorExpr = bigTableValueExpressions[i];

       // This is a vectorized aware evaluator
-      ExprNodeEvaluator eval = new ExprNodeEvaluator(desc) {
+      ExprNodeEvaluator eval = new ExprNodeEvaluator(desc, hconf) {
         int columnIndex;
         int writerIndex;

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
index 80b0a14..ac3363e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
@@ -207,7 +207,7 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect
       VectorExpression vectorExpr = bigTableValueExpressions[i];

       // This is a vectorized aware evaluator
-      ExprNodeEvaluator eval = new ExprNodeEvaluator(desc) {
+      ExprNodeEvaluator eval = new ExprNodeEvaluator(desc, hconf) {
        int columnIndex;;
        int writerIndex;
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java b/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java
index 9d900e4..997334b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java
@@ -27,9 +27,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.io.sarg.LiteralDelegate;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
@@ -58,14 +60,16 @@ import com.esotericsoftware.kryo.io.Input;
 public class ConvertAstToSearchArg {
   private static final Logger LOG = LoggerFactory.getLogger(ConvertAstToSearchArg.class);
-  private final SearchArgument.Builder builder =
-      SearchArgumentFactory.newBuilder();
+  private final SearchArgument.Builder builder;
+  private final Configuration conf;

   /**
    * Builds the expression and leaf list from the original predicate.
    * @param expression the expression to translate.
    */
-  ConvertAstToSearchArg(ExprNodeGenericFuncDesc expression) {
+  ConvertAstToSearchArg(Configuration conf, ExprNodeGenericFuncDesc expression) {
+    this.conf = conf;
+    builder = SearchArgumentFactory.newBuilder(conf);
     parse(expression);
   }
@@ -182,7 +186,7 @@ public class ConvertAstToSearchArg {
    * @param type the type of the expression
    * @return the literal boxed if found or null
    */
-  private static Object findLiteral(ExprNodeGenericFuncDesc expr,
+  private static Object findLiteral(Configuration conf, ExprNodeGenericFuncDesc expr,
                                     PredicateLeaf.Type type) {
     List children = expr.getChildren();
     if (children.size() != 2) {
@@ -190,16 +194,29 @@ public class ConvertAstToSearchArg {
     }
     Object result = null;
     for(ExprNodeDesc child: children) {
-      if (child instanceof ExprNodeConstantDesc) {
+      Object currentResult = getLiteral(conf, child, type);
+      if (currentResult != null) {
+        // Both children in the expression should not be literal
         if (result != null) {
           return null;
         }
-        result = boxLiteral((ExprNodeConstantDesc) child, type);
+        result = currentResult;
       }
     }
     return result;
   }

+  private static Object getLiteral(Configuration conf, ExprNodeDesc child, PredicateLeaf.Type type) {
+    if (child instanceof ExprNodeConstantDesc) {
+      return boxLiteral((ExprNodeConstantDesc) child, type);
+    } else if (child instanceof ExprNodeDynamicValueDesc) {
+      LiteralDelegate value = ((ExprNodeDynamicValueDesc) child).getDynamicValue();
+      value.setConf(conf);
+      return value;
+    }
+    return null;
+  }
+
   /**
    * Return the boxed literal at the given position
    * @param expr the parent node
@@ -207,15 +224,12 @@ public class ConvertAstToSearchArg {
    * @param position the child position to check
    * @return the boxed literal if found otherwise null
    */
-  private static Object getLiteral(ExprNodeGenericFuncDesc expr,
+  private static Object getLiteral(Configuration conf, ExprNodeGenericFuncDesc expr,
                                    PredicateLeaf.Type type, int position) {
     List children = expr.getChildren();
-    Object child = children.get(position);
-    if (child instanceof ExprNodeConstantDesc) {
-      return boxLiteral((ExprNodeConstantDesc) child, type);
-    }
-    return null;
+    ExprNodeDesc child = children.get(position);
+    return getLiteral(conf, child, type);
   }

   private static Object[] getLiteralList(ExprNodeGenericFuncDesc expr,
@@ -272,16 +286,16 @@ public class ConvertAstToSearchArg {
           builder.isNull(columnName, type);
           break;
         case EQUALS:
-          builder.equals(columnName, type, findLiteral(expression, type));
+          builder.equals(columnName, type, findLiteral(conf, expression, type));
           break;
         case NULL_SAFE_EQUALS:
-          builder.nullSafeEquals(columnName, type, findLiteral(expression, type));
+          builder.nullSafeEquals(columnName, type, findLiteral(conf, expression, type));
           break;
         case LESS_THAN:
-          builder.lessThan(columnName, type, findLiteral(expression, type));
+          builder.lessThan(columnName, type, findLiteral(conf, expression, type));
           break;
         case LESS_THAN_EQUALS:
-          builder.lessThanEquals(columnName, type, findLiteral(expression, type));
+          builder.lessThanEquals(columnName, type, findLiteral(conf, expression, type));
           break;
         case IN:
           builder.in(columnName, type,
@@ -289,8 +303,8 @@ public class ConvertAstToSearchArg {
           break;
         case BETWEEN:
           builder.between(columnName, type,
-              getLiteral(expression, type, variable + 1),
-              getLiteral(expression, type, variable + 2));
+              getLiteral(conf, expression, type, variable + 1),
+              getLiteral(conf, expression, type, variable + 2));
           break;
       }
     } catch (Exception e) {
@@ -425,8 +439,8 @@ public class ConvertAstToSearchArg {

   public static final String SARG_PUSHDOWN = "sarg.pushdown";

-  public static SearchArgument create(ExprNodeGenericFuncDesc expression) {
-    return new ConvertAstToSearchArg(expression).buildSearchArgument();
+  public static SearchArgument create(Configuration conf, ExprNodeGenericFuncDesc expression) {
+    return new ConvertAstToSearchArg(conf, expression).buildSearchArgument();
   }

@@ -445,7 +459,7 @@ public class ConvertAstToSearchArg {
   public static SearchArgument createFromConf(Configuration conf) {
     String sargString;
     if ((sargString = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR)) != null) {
-      return create(SerializationUtilities.deserializeExpression(sargString));
+      return create(conf, SerializationUtilities.deserializeExpression(sargString));
     } else if ((sargString = conf.get(SARG_PUSHDOWN)) != null) {
       return create(sargString);
     }
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,15 +142,13 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor { FilterOperator filter = (FilterOperator) nd; FilterDesc desc = filter.getConf(); - TableScanOperator ts = null; - if (!parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING) && !parseContext.getConf().getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) { // nothing to do when the optimization is off return null; } - DynamicPartitionPrunerContext removerContext = new DynamicPartitionPrunerContext(); + TableScanOperator ts = null; if (filter.getParentOperators().size() == 1 && filter.getParentOperators().get(0) instanceof TableScanOperator) { @@ -169,14 +161,32 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor { LOG.debug("TableScan: " + ts); } + DynamicPartitionPrunerContext removerContext = new DynamicPartitionPrunerContext(); + // collect the dynamic pruning conditions removerContext.dynLists.clear(); collectDynamicPruningConditions(desc.getPredicate(), removerContext); + if (ts == null) { + // Replace the synthetic predicate with true and bail out + for (DynamicListContext ctx : removerContext) { + ExprNodeDesc constNode = + new ExprNodeConstantDesc(ctx.parent.getTypeInfo(), true); + replaceExprNode(ctx, desc, constNode); + } + return false; + } + + final boolean semiJoin = parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION); + for (DynamicListContext ctx : removerContext) { String column = ExprNodeDescUtils.extractColName(ctx.parent); + boolean semiJoinAttempted = false; + + if (column != null) { + // Need unique IDs to refer to each min/max key value in the DynamicValueRegistry + String keyBaseAlias = ""; - if (ts != null && column != null) { Table table = ts.getConf().getTableMetadata(); if (table != null && table.isPartitionKey(column)) { @@ -203,20 +213,56 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor { } } else { LOG.debug("Column " + column + " is not a partition column"); + if (semiJoin && ts.getConf().getFilterExpr() != null) { + LOG.debug("Initiate semijoin reduction for " + column); + // Get the table name from which the min-max values will come. + Operator op = ctx.generator; + while (!(op == null || op instanceof TableScanOperator)) { + op = op.getParentOperators().get(0); + } + String tableAlias = (op == null ? 
"" : ((TableScanOperator) op).getConf().getAlias()); + keyBaseAlias = ctx.generator.getOperatorId() + "_" + tableAlias + "_" + column; + + semiJoinAttempted = generateSemiJoinOperatorPlan(ctx, parseContext, ts, keyBaseAlias); + } } - } - // we always remove the condition by replacing it with "true" - ExprNodeDesc constNode = new ExprNodeConstantDesc(ctx.parent.getTypeInfo(), true); - if (ctx.grandParent == null) { - desc.setPredicate(constNode); + // If semijoin is attempted then replace the condition with a min-max filter + // and bloom filter else, + // we always remove the condition by replacing it with "true" + if (semiJoinAttempted) { + List betweenArgs = new ArrayList(); + betweenArgs.add(new ExprNodeConstantDesc(Boolean.FALSE)); // Do not invert between result + // add column expression here + betweenArgs.add(ctx.parent.getChildren().get(0)); + betweenArgs.add(new ExprNodeDynamicValueDesc(new DynamicValue(keyBaseAlias + "_min", ctx.desc.getTypeInfo()))); + betweenArgs.add(new ExprNodeDynamicValueDesc(new DynamicValue(keyBaseAlias + "_max", ctx.desc.getTypeInfo()))); + ExprNodeDesc betweenNode = ExprNodeGenericFuncDesc.newInstance( + FunctionRegistry.getFunctionInfo("between").getGenericUDF(), betweenArgs); + replaceExprNode(ctx, desc, betweenNode); + // add column expression for bloom filter + List bloomFilterArgs = new ArrayList(); + bloomFilterArgs.add(ctx.parent.getChildren().get(0)); + bloomFilterArgs.add(new ExprNodeDynamicValueDesc( + new DynamicValue(keyBaseAlias + "_bloom_filter", + TypeInfoFactory.binaryTypeInfo))); + ExprNodeDesc bloomFilterNode = ExprNodeGenericFuncDesc.newInstance( + FunctionRegistry.getFunctionInfo("in_bloom_filter"). + getGenericUDF(), bloomFilterArgs); + // ctx may not have the grandparent but it is set in filterDesc by now. + ExprNodeDesc grandParent = ctx.grandParent == null ? + desc.getPredicate() : ctx.grandParent; + grandParent.getChildren().add(bloomFilterNode); + } else { + ExprNodeDesc replaceNode = new ExprNodeConstantDesc(ctx.parent.getTypeInfo(), true); + replaceExprNode(ctx, desc, replaceNode); + } } else { - int i = ctx.grandParent.getChildren().indexOf(ctx.parent); - ctx.grandParent.getChildren().remove(i); - ctx.grandParent.getChildren().add(i, constNode); + ExprNodeDesc constNode = + new ExprNodeConstantDesc(ctx.parent.getTypeInfo(), true); + replaceExprNode(ctx, desc, constNode); } } - // if we pushed the predicate into the table scan we need to remove the // synthetic conditions there. cleanTableScanFilters(ts); @@ -224,6 +270,16 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor { return false; } + private void replaceExprNode(DynamicListContext ctx, FilterDesc desc, ExprNodeDesc node) { + if (ctx.grandParent == null) { + desc.setPredicate(node); + } else { + int i = ctx.grandParent.getChildren().indexOf(ctx.parent); + ctx.grandParent.getChildren().remove(i); + ctx.grandParent.getChildren().add(i, node); + } + } + private void cleanTableScanFilters(TableScanOperator ts) throws SemanticException { if (ts == null || ts.getConf() == null || ts.getConf().getFilterExpr() == null) { @@ -327,6 +383,228 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor { } } + // Generates plan for min/max when dynamic partition pruning is ruled out. 
+ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContext parseContext, + TableScanOperator ts, String keyBaseAlias) throws SemanticException { + + // we will put a fork in the plan at the source of the reduce sink + Operator parentOfRS = ctx.generator.getParentOperators().get(0); + + // we need the expr that generated the key of the reduce sink + ExprNodeDesc key = ctx.generator.getConf().getKeyCols().get(ctx.desc.getKeyIndex()); + + if (parentOfRS instanceof SelectOperator) { + // Make sure the semijoin branch is not on a partition column. + String internalColName = null; + ExprNodeDesc exprNodeDesc = key; + // Find the ExprNodeColumnDesc + while (!(exprNodeDesc instanceof ExprNodeColumnDesc)) { + exprNodeDesc = exprNodeDesc.getChildren().get(0); + } + internalColName = ((ExprNodeColumnDesc) exprNodeDesc).getColumn(); + + ExprNodeColumnDesc colExpr = ((ExprNodeColumnDesc)(parentOfRS. + getColumnExprMap().get(internalColName))); + String colName = ExprNodeDescUtils.extractColName(colExpr); + + // Fetch the TableScan Operator. + Operator op = parentOfRS.getParentOperators().get(0); + while (op != null && !(op instanceof TableScanOperator)) { + op = op.getParentOperators().get(0); + } + assert op != null; + + Table table = ((TableScanOperator) op).getConf().getTableMetadata(); + if (table.isPartitionKey(colName)) { + // The column is a partition column, skip the optimization. + return false; + } + } + List keyExprs = new ArrayList(); + keyExprs.add(key); + + // group by requires "ArrayList", don't ask. + ArrayList outputNames = new ArrayList(); + outputNames.add(HiveConf.getColumnInternalName(0)); + + // project the relevant key column + SelectDesc select = new SelectDesc(keyExprs, outputNames); + SelectOperator selectOp = + (SelectOperator) OperatorFactory.getAndMakeChild(select, + new RowSchema(parentOfRS.getSchema()), parentOfRS); + + // do a group by to aggregate min, max and bloom filter.
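
The remainder of this method builds the branch announced by the comment above as SEL (project the key), then GBY in Mode.PARTIAL1 computing min/max/bloom_filter, then RS, then GBY in Mode.FINAL, then RS; this is Hive's usual two-phase aggregation. A rough, self-contained sketch of the partial/final semantics, under hypothetical names (the real evaluators are GenericUDAFMin, GenericUDAFMax and the GenericUDAFBloomFilter added by this patch):

    // Hypothetical illustration of the PARTIAL1/FINAL split; not Hive code.
    import java.util.Arrays;
    import java.util.BitSet;

    public class TwoPhaseMinMaxBloom {
      static class Partial {
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        final BitSet bloom = new BitSet(1 << 16);

        // Mode.PARTIAL1: each map-side group-by consumes raw key values.
        void iterate(long key) {
          min = Math.min(min, key);
          max = Math.max(max, key);
          bloom.set((int) ((key * 0x9E3779B97F4A7C15L) >>> 48));
        }

        // Mode.FINAL: the reducer-side group-by merges the partial triples.
        void merge(Partial other) {
          min = Math.min(min, other.min);
          max = Math.max(max, other.max);
          bloom.or(other.bloom);
        }
      }

      public static void main(String[] args) {
        Partial task1 = new Partial();
        task1.iterate(3);
        task1.iterate(8);
        Partial task2 = new Partial();
        task2.iterate(5);

        Partial merged = new Partial();
        for (Partial p : Arrays.asList(task1, task2)) {
          merged.merge(p);
        }
        System.out.println(merged.min + ".." + merged.max); // prints 3..8
      }
    }

Because min, max and the bloom filter are all mergeable, the partial phase can run wherever the keys are produced, and only a small aggregate (two values and a bit array) crosses the network.
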
+ float groupByMemoryUsage = + HiveConf.getFloatVar(parseContext.getConf(), HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); + float memoryThreshold = + HiveConf.getFloatVar(parseContext.getConf(), + HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); + + ArrayList groupByExprs = new ArrayList(); + + // Add min/max and bloom filter aggregations + List aggFnOIs = new ArrayList(); + aggFnOIs.add(key.getWritableObjectInspector()); + ArrayList params = new ArrayList(); + params.add( + new ExprNodeColumnDesc(key.getTypeInfo(), outputNames.get(0), + "", false)); + + ArrayList aggs = new ArrayList(); + try { + AggregationDesc min = new AggregationDesc("min", + FunctionRegistry.getGenericUDAFEvaluator("min", aggFnOIs, false, false), + params, false, Mode.PARTIAL1); + AggregationDesc max = new AggregationDesc("max", + FunctionRegistry.getGenericUDAFEvaluator("max", aggFnOIs, false, false), + params, false, Mode.PARTIAL1); + AggregationDesc bloomFilter = new AggregationDesc("bloom_filter", + FunctionRegistry.getGenericUDAFEvaluator("bloom_filter", aggFnOIs, false, false), + params, false, Mode.PARTIAL1); + GenericUDAFBloomFilterEvaluator bloomFilterEval = (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator(); + bloomFilterEval.setSourceOperator(selectOp); + bloomFilterEval.setMaxEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES)); + bloomFilter.setGenericUDAFWritableEvaluator(bloomFilterEval); + aggs.add(min); + aggs.add(max); + aggs.add(bloomFilter); + } catch (SemanticException e) { + LOG.error("Error creating min/max aggregations on key", e); + throw new IllegalStateException("Error creating min/max aggregations on key", e); + } + + // Create the Group by Operator + ArrayList gbOutputNames = new ArrayList(); + gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(0)); + gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(1)); + gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(2)); + GroupByDesc groupBy = new GroupByDesc(GroupByDesc.Mode.HASH, + gbOutputNames, new ArrayList(), aggs, false, + groupByMemoryUsage, memoryThreshold, null, false, 0, false); + + ArrayList groupbyColInfos = new ArrayList(); + groupbyColInfos.add(new ColumnInfo(gbOutputNames.get(0), key.getTypeInfo(), "", false)); + groupbyColInfos.add(new ColumnInfo(gbOutputNames.get(1), key.getTypeInfo(), "", false)); + groupbyColInfos.add(new ColumnInfo(gbOutputNames.get(2), key.getTypeInfo(), "", false)); + + GroupByOperator groupByOp = (GroupByOperator)OperatorFactory.getAndMakeChild( + groupBy, new RowSchema(groupbyColInfos), selectOp); + + groupByOp.setColumnExprMap(new HashMap()); + + // Get the column names of the aggregations for reduce sink + int colPos = 0; + ArrayList rsValueCols = new ArrayList(); + for (int i = 0; i < aggs.size() - 1; i++) { + ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(key.getTypeInfo(), + gbOutputNames.get(colPos++), "", false); + rsValueCols.add(colExpr); + } + + // Bloom Filter uses binary + ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(TypeInfoFactory.binaryTypeInfo, + gbOutputNames.get(colPos++), "", false); + rsValueCols.add(colExpr); + + // Create the reduce sink operator + ReduceSinkDesc rsDesc = PlanUtils.getReduceSinkDesc( + new ArrayList(), rsValueCols, gbOutputNames, false, + -1, 0, 1, Operation.NOT_ACID); + ReduceSinkOperator rsOp = (ReduceSinkOperator)OperatorFactory.getAndMakeChild( + rsDesc, new RowSchema(groupByOp.getSchema()), groupByOp); + Map columnExprMap = new HashMap(); + rsOp.setColumnExprMap(columnExprMap); + + // 
Create the final Group By Operator + ArrayList aggsFinal = new ArrayList(); + try { + List minFinalFnOIs = new ArrayList(); + List maxFinalFnOIs = new ArrayList(); + List bloomFilterFinalFnOIs = new ArrayList(); + ArrayList minFinalParams = new ArrayList(); + ArrayList maxFinalParams = new ArrayList(); + ArrayList bloomFilterFinalParams = new ArrayList(); + // Use the expressions from Reduce Sink. + minFinalFnOIs.add(rsValueCols.get(0).getWritableObjectInspector()); + maxFinalFnOIs.add(rsValueCols.get(1).getWritableObjectInspector()); + bloomFilterFinalFnOIs.add(rsValueCols.get(2).getWritableObjectInspector()); + // Coming from a ReduceSink the aggregations would be in the form VALUE._col0, VALUE._col1 + minFinalParams.add( + new ExprNodeColumnDesc( + rsValueCols.get(0).getTypeInfo(), + Utilities.ReduceField.VALUE + "." + + gbOutputNames.get(0), "", false)); + maxFinalParams.add( + new ExprNodeColumnDesc( + rsValueCols.get(1).getTypeInfo(), + Utilities.ReduceField.VALUE + "." + + gbOutputNames.get(1), "", false)); + bloomFilterFinalParams.add( + new ExprNodeColumnDesc( + rsValueCols.get(2).getTypeInfo(), + Utilities.ReduceField.VALUE + "." + + gbOutputNames.get(2), "", false)); + + AggregationDesc min = new AggregationDesc("min", + FunctionRegistry.getGenericUDAFEvaluator("min", minFinalFnOIs, + false, false), + minFinalParams, false, Mode.FINAL); + AggregationDesc max = new AggregationDesc("max", + FunctionRegistry.getGenericUDAFEvaluator("max", maxFinalFnOIs, + false, false), + maxFinalParams, false, Mode.FINAL); + AggregationDesc bloomFilter = new AggregationDesc("bloom_filter", + FunctionRegistry.getGenericUDAFEvaluator("bloom_filter", bloomFilterFinalFnOIs, + false, false), + bloomFilterFinalParams, false, Mode.FINAL); + GenericUDAFBloomFilterEvaluator bloomFilterEval = (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator(); + bloomFilterEval.setSourceOperator(selectOp); + bloomFilterEval.setMaxEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES)); + bloomFilter.setGenericUDAFWritableEvaluator(bloomFilterEval); + + aggsFinal.add(min); + aggsFinal.add(max); + aggsFinal.add(bloomFilter); + } catch (SemanticException e) { + LOG.error("Error creating min/max aggregations on key", e); + throw new IllegalStateException("Error creating min/max aggregations on key", e); + } + + GroupByDesc groupByDescFinal = new GroupByDesc(GroupByDesc.Mode.FINAL, + gbOutputNames, new ArrayList(), aggsFinal, false, + groupByMemoryUsage, memoryThreshold, null, false, 0, false); + GroupByOperator groupByOpFinal = (GroupByOperator)OperatorFactory.getAndMakeChild( + groupByDescFinal, new RowSchema(rsOp.getSchema()), rsOp); + groupByOpFinal.setColumnExprMap(new HashMap()); + + // Create the final Reduce Sink Operator + ReduceSinkDesc rsDescFinal = PlanUtils.getReduceSinkDesc( + new ArrayList(), rsValueCols, gbOutputNames, false, + -1, 0, 1, Operation.NOT_ACID); + ReduceSinkOperator rsOpFinal = (ReduceSinkOperator)OperatorFactory.getAndMakeChild( + rsDescFinal, new RowSchema(groupByOpFinal.getSchema()), groupByOpFinal); + rsOpFinal.setColumnExprMap(columnExprMap); + + LOG.debug("DynamicMinMaxPushdown: Saving RS to TS mapping: " + rsOpFinal + ": " + ts); + parseContext.getRsOpToTsOpMap().put(rsOpFinal, ts); + + // Save the info that is required at query time to resolve dynamic/runtime values. 
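
For orientation before the bookkeeping code that follows: RuntimeValuesInfo records, per final ReduceSink, the value ids (<keyBaseAlias>_min, _max and _bloom_filter), their column expressions and a table descriptor, so that at execution time the DynamicValueRegistryTez added by this patch can map each id back to the value produced by the semijoin branch. The Map-based sketch below is a simplification with hypothetical names; the real registry reads the ReduceSink output that RuntimeValuesInfo describes:

    // Hypothetical illustration of deferred runtime-value resolution; not the
    // actual DynamicValue/DynamicValueRegistry implementation.
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class ToyDynamicValue {
      // Stand-in for the per-task registry populated from the semijoin branch.
      static final Map<String, Object> REGISTRY = new ConcurrentHashMap<String, Object>();

      private final String id;  // e.g. "<operatorId>_<tableAlias>_<column>_min"
      private Object resolved;  // cached after the first successful lookup

      ToyDynamicValue(String id) {
        this.id = id;
      }

      Object getValue() {
        if (resolved == null) {
          // Deferred on purpose: the value does not exist at compile time; it
          // is produced by the semijoin branch before the probe side needs it.
          resolved = REGISTRY.get(id);
          if (resolved == null) {
            throw new IllegalStateException("Runtime value not available: " + id);
          }
        }
        return resolved;
      }

      public static void main(String[] args) {
        ToyDynamicValue min = new ToyDynamicValue("RS_3_src_key_min");
        REGISTRY.put("RS_3_src_key_min", Long.valueOf(42)); // value arrives at run time
        System.out.println(min.getValue());                 // prints 42
      }
    }
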
+ RuntimeValuesInfo runtimeValuesInfo = new RuntimeValuesInfo(); + TableDesc rsFinalTableDesc = PlanUtils.getReduceValueTableDesc( + PlanUtils.getFieldSchemasFromColumnList(rsValueCols, "_col")); + List dynamicValueIDs = new ArrayList(); + dynamicValueIDs.add(keyBaseAlias + "_min"); + dynamicValueIDs.add(keyBaseAlias + "_max"); + dynamicValueIDs.add(keyBaseAlias + "_bloom_filter"); + + runtimeValuesInfo.setTableDesc(rsFinalTableDesc); + runtimeValuesInfo.setDynamicValueIDs(dynamicValueIDs); + runtimeValuesInfo.setColExprs(rsValueCols); + parseContext.getRsToRuntimeValuesInfoMap().put(rsOpFinal, runtimeValuesInfo); + + return true; + } + private Map collectDynamicPruningConditions(ExprNodeDesc pred, NodeProcessorCtx ctx) throws SemanticException { http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java index 9e9beb0..b853a06 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java @@ -135,7 +135,7 @@ public class FixedBucketPruningOptimizer extends Transform { return; } // the sargs are closely tied to hive.optimize.index.filter - SearchArgument sarg = ConvertAstToSearchArg.create(filter); + SearchArgument sarg = ConvertAstToSearchArg.create(ctxt.pctx.getConf(), filter); if (sarg == null) { return; }
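
The FixedBucketPruningOptimizer hunk above shows the caller-side effect of the ConvertAstToSearchArg change: a Configuration now has to be threaded through, because a dynamic-value leaf carries a LiteralDelegate instead of a boxed constant and resolves it only when the predicate is finally evaluated (for example during ORC row-group elimination). Below is a self-contained sketch of that deferred-literal pattern; all names are hypothetical stand-ins, not Hive's actual sarg classes:

    // Hypothetical illustration of a predicate leaf whose literal may be deferred.
    import java.util.function.Supplier;

    public class DeferredLiteralSketch {
      // A leaf like "key <= literal"; the literal is either a boxed constant
      // (the ExprNodeConstantDesc case) or a deferred value (the
      // ExprNodeDynamicValueDesc/LiteralDelegate case).
      static final class LessThanEqualsLeaf {
        private final Object literal;

        LessThanEqualsLeaf(Object literal) {
          this.literal = literal;
        }

        boolean test(long key) {
          Object boxed = literal instanceof Supplier
              ? ((Supplier<?>) literal).get() // unwrap only at evaluation time
              : literal;
          return key <= ((Long) boxed).longValue();
        }
      }

      public static void main(String[] args) {
        final long[] runtimeMax = {Long.MAX_VALUE}; // filled once the semijoin branch runs
        LessThanEqualsLeaf leaf =
            new LessThanEqualsLeaf((Supplier<Long>) () -> runtimeMax[0]);

        runtimeMax[0] = 100L;               // the runtime value arrives
        System.out.println(leaf.test(42));  // true
        System.out.println(leaf.test(200)); // false: this row group can be skipped
      }
    }
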