From: spena@apache.org
To: commits@hive.apache.org
Reply-To: hive-dev@hive.apache.org
Date: Wed, 20 May 2015 16:01:53 -0000
Message-Id: <18ee1d01c89f4948acab79cbc59b75c2@git.apache.org>
In-Reply-To: <84cd9115ef1a4f41a6069936b2a085f3@git.apache.org>
References: <84cd9115ef1a4f41a6069936b2a085f3@git.apache.org>
Subject: [34/50] [abbrv] hive git commit: HIVE-10627: CBO - Queries fail with Failed to breakup Windowing invocations into Groups (Jesus Camacho Rodriguez via Laljo John Pullokkaran)

HIVE-10627: CBO - Queries fail with Failed to breakup Windowing invocations into Groups (Jesus Camacho Rodriguez via Laljo John Pullokkaran)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f4923cee
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f4923cee
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f4923cee

Branch: refs/heads/parquet
Commit: f4923ceeb3ea8ec544ae55236b65620307ce233b
Parents: c640a38
Author: jpullokk
Authored: Mon May 18 12:34:14 2015 -0700
Committer: jpullokk
Committed: Mon May 18 12:34:14 2015 -0700

----------------------------------------------------------------------
 .../calcite/rules/HiveWindowingFixRule.java   | 163 +++++++++++++++++++
 .../calcite/translator/ASTConverter.java      |   2 -
 .../hadoop/hive/ql/parse/CalcitePlanner.java  |  11 ++
 3 files changed, 174 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f4923cee/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveWindowingFixRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveWindowingFixRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveWindowingFixRule.java
new file mode 100644
index 0000000..ff203d3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveWindowingFixRule.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
+ * The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories.ProjectFactory;
+import org.apache.calcite.rex.RexFieldCollation;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexOver;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+
+/**
+ * Rule to fix windowing issue when it is done over
+ * aggregation columns (more info in HIVE-10627).
+ *
+ * This rule is applied as a post-processing step after
+ * optimization by Calcite in order to add columns
+ * that may be pruned by RelFieldTrimmer, but are
+ * still needed due to the concrete implementation of
+ * Windowing processing in Hive.
+ */
+public class HiveWindowingFixRule extends RelOptRule {
+
+  public static final HiveWindowingFixRule INSTANCE = new HiveWindowingFixRule();
+
+  private final ProjectFactory projectFactory;
+
+
+  private HiveWindowingFixRule() {
+    super(
+        operand(Project.class,
+            operand(Aggregate.class, any())));
+    this.projectFactory = HiveProject.DEFAULT_PROJECT_FACTORY;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Project project = call.rel(0);
+    Aggregate aggregate = call.rel(1);
+
+    // 1.
+    // We go over the expressions in the project operator
+    // and we separate the windowing nodes that are result
+    // of an aggregate expression from the rest of nodes
+    final int groupingFields = aggregate.getGroupCount() + aggregate.getIndicatorCount();
+    Set<String> projectExprsDigest = new HashSet<String>();
+    Map<String, RexNode> windowingExprsDigestToNodes = new HashMap<String, RexNode>();
+    for (RexNode r : project.getChildExps()) {
+      if (r instanceof RexOver) {
+        RexOver rexOverNode = (RexOver) r;
+        // Operands
+        for (RexNode operand : rexOverNode.getOperands()) {
+          if (operand instanceof RexInputRef &&
+                  ((RexInputRef)operand).getIndex() >= groupingFields) {
+            windowingExprsDigestToNodes.put(operand.toString(), operand);
+          }
+        }
+        // Partition keys
+        for (RexNode partitionKey : rexOverNode.getWindow().partitionKeys) {
+          if (partitionKey instanceof RexInputRef &&
+                  ((RexInputRef)partitionKey).getIndex() >= groupingFields) {
+            windowingExprsDigestToNodes.put(partitionKey.toString(), partitionKey);
+          }
+        }
+        // Order keys
+        for (RexFieldCollation orderKey : rexOverNode.getWindow().orderKeys) {
+          if (orderKey.left instanceof RexInputRef &&
+                  ((RexInputRef)orderKey.left).getIndex() >= groupingFields) {
+            windowingExprsDigestToNodes.put(orderKey.left.toString(), orderKey.left);
+          }
+        }
+      } else {
+        projectExprsDigest.add(r.toString());
+      }
+    }
+
+    // 2. We check whether there is a column needed by the
+    // windowing operation that is missing in the
+    // project expressions. For instance, if the windowing
+    // operation is over an aggregation column, Hive expects
+    // that column to be in the Select clause of the query.
+    // The idea is that if there is a column missing, we will
+    // replace the old project operator by two new project
+    // operators:
+    // - a project operator containing the original columns
+    //   of the project operator plus all the columns that were
+    //   missing
+    // - a project on top of the previous one, that will take
+    //   out the columns that were missing and were added by the
+    //   previous project
+
+    // These data structures are needed to create the new project
+    // operator (below)
+    final List<RexNode> belowProjectExprs = new ArrayList<RexNode>();
+    final List<String> belowProjectColumnNames = new ArrayList<String>();
+
+    // This data structure is needed to create the new project
+    // operator (top)
+    final List<RexNode> topProjectExprs = new ArrayList<RexNode>();
+
+    final int projectCount = project.getChildExps().size();
+    for (int i = 0; i < projectCount; i++) {
+      belowProjectExprs.add(project.getChildExps().get(i));
+      belowProjectColumnNames.add(project.getRowType().getFieldNames().get(i));
+      topProjectExprs.add(RexInputRef.of(i, project.getRowType()));
+    }
+    boolean windowingFix = false;
+    for (Entry<String, RexNode> windowingExpr : windowingExprsDigestToNodes.entrySet()) {
+      if (!projectExprsDigest.contains(windowingExpr.getKey())) {
+        windowingFix = true;
+        belowProjectExprs.add(windowingExpr.getValue());
+        int colIndex = 0;
+        String alias = "window_col_" + colIndex;
+        while (belowProjectColumnNames.contains(alias)) {
+          alias = "window_col_" + (colIndex++);
+        }
+        belowProjectColumnNames.add(alias);
+      }
+    }
+
+    if (!windowingFix) {
+      // We do not need to do anything, we bail out
+      return;
+    }
+
+    // 3.
+    // We need to fix it, we create the two replacement project
+    // operators
+    RelNode newProjectRel = projectFactory.createProject(
+        aggregate, belowProjectExprs, belowProjectColumnNames);
+    RelNode newTopProjectRel = projectFactory.createProject(
+        newProjectRel, topProjectExprs, project.getRowType().getFieldNames());
+
+    call.transformTo(newTopProjectRel);
+  }
+
+}


http://git-wip-us.apache.org/repos/asf/hive/blob/f4923cee/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
index 0ada068..95f43d4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
@@ -54,10 +54,8 @@ import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
-import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveGroupingID;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;


http://git-wip-us.apache.org/repos/asf/hive/blob/f4923cee/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 6e6923c..c412561 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -145,6 +145,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveInsertExchange4Join
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinAddNotNullRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinToMultiJoinRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HivePartitionPruneRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveWindowingFixRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverter;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.JoinCondTypeCheckProcFactory;
@@ -855,6 +856,16 @@ public class CalcitePlanner extends SemanticAnalyzer {
 
       calciteOptimizedPlan = hepPlanner.findBestExp();
 
+      // run rule to fix windowing issue when it is done over
+      // aggregation columns (HIVE-10627)
+      hepPgmBldr = new HepProgramBuilder().addMatchOrder(HepMatchOrder.BOTTOM_UP);
+      hepPgmBldr.addRuleInstance(HiveWindowingFixRule.INSTANCE);
+      hepPlanner = new HepPlanner(hepPgmBldr.build());
+      hepPlanner.registerMetadataProviders(list);
+      cluster.setMetadataProvider(new CachingRelMetadataProvider(chainedProvider, hepPlanner));
+      hepPlanner.setRoot(calciteOptimizedPlan);
+      calciteOptimizedPlan = hepPlanner.findBestExp();
+
       if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CBO_RETPATH_HIVEOP)) {
         // run rules to aid in translation from Optiq tree -> Hive tree
         hepPgmBldr = new HepProgramBuilder().addMatchOrder(HepMatchOrder.BOTTOM_UP);
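The CalcitePlanner hunk above wires the new rule in as its own bottom-up HepPlanner pass, run after the main optimization program. Below is a minimal standalone sketch of that wiring, not part of the patch: the class and method names and the basePlan parameter are placeholders, and the metadata-provider plumbing from the real planner (registerMetadataProviders, CachingRelMetadataProvider) is omitted for brevity.

    import org.apache.calcite.plan.hep.HepMatchOrder;
    import org.apache.calcite.plan.hep.HepPlanner;
    import org.apache.calcite.plan.hep.HepProgramBuilder;
    import org.apache.calcite.rel.RelNode;
    import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveWindowingFixRule;

    public final class WindowingFixPassSketch {

      // Hypothetical helper: applies HiveWindowingFixRule to basePlan as a
      // single bottom-up Hep pass, mirroring the block added to CalcitePlanner.
      static RelNode applyWindowingFix(RelNode basePlan) {
        HepProgramBuilder builder = new HepProgramBuilder()
            .addMatchOrder(HepMatchOrder.BOTTOM_UP);
        builder.addRuleInstance(HiveWindowingFixRule.INSTANCE);
        HepPlanner planner = new HepPlanner(builder.build());
        planner.setRoot(basePlan);
        // findBestExp() returns the plan after the rule has re-added any
        // aggregate columns needed by the windowing Project.
        return planner.findBestExp();
      }
    }

Running the rule as a separate post-processing pass matches its javadoc: it only needs to see the final Project/Aggregate shape left by RelFieldTrimmer, which is why CalcitePlanner applies it after the main optimization program.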