Return-Path: X-Original-To: apmail-asterixdb-commits-archive@minotaur.apache.org Delivered-To: apmail-asterixdb-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7BEDE18C37 for ; Tue, 25 Aug 2015 16:41:27 +0000 (UTC) Received: (qmail 69886 invoked by uid 500); 25 Aug 2015 16:41:27 -0000 Delivered-To: apmail-asterixdb-commits-archive@asterixdb.apache.org Received: (qmail 69853 invoked by uid 500); 25 Aug 2015 16:41:27 -0000 Mailing-List: contact commits-help@asterixdb.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.incubator.apache.org Delivered-To: mailing list commits@asterixdb.incubator.apache.org Received: (qmail 69844 invoked by uid 99); 25 Aug 2015 16:41:27 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Aug 2015 16:41:27 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id C4F8B1AACD1 for ; Tue, 25 Aug 2015 16:41:26 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.774 X-Spam-Level: * X-Spam-Status: No, score=1.774 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.006] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id SFkZy5ZNxVe1 for ; Tue, 25 Aug 2015 16:41:14 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id B330C261A5 for ; Tue, 25 Aug 2015 16:41:14 +0000 (UTC) Received: (qmail 69476 invoked by uid 99); 25 Aug 2015 16:41:14 
-0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Aug 2015 16:41:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7A507E00D5; Tue, 25 Aug 2015 16:41:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: imaxon@apache.org To: commits@asterixdb.incubator.apache.org Date: Tue, 25 Aug 2015 16:41:22 -0000 Message-Id: <3c2d0002c0bd4bad9e2093fb15e3487c@git.apache.org> In-Reply-To: <339f74e1fc5c4aabb18d5e46ecf6ba11@git.apache.org> References: <339f74e1fc5c4aabb18d5e46ecf6ba11@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/51] [partial] incubator-asterixdb-hyracks git commit: Change folder structure for Java repackage http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java ---------------------------------------------------------------------- diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java new file mode 100644 index 0000000..2f080fb --- /dev/null +++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java @@ -0,0 +1,105 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.uci.ics.hyracks.algebricks.rewriter.rules; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; + +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable; +import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression; +import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions; +import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; +import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; + +public class CopyLimitDownRule implements IAlgebraicRewriteRule { + + @Override + public 
boolean rewritePost(Mutable opRef, IOptimizationContext context) { + return false; + } + + @Override + public boolean rewritePre(Mutable opRef, IOptimizationContext context) throws AlgebricksException { + AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); + if (op.getOperatorTag() != LogicalOperatorTag.LIMIT) { + return false; + } + LimitOperator limitOp = (LimitOperator) op; + if (!limitOp.isTopmostLimitOp()) { + return false; + } + + List limitUsedVars = new ArrayList<>(); + VariableUtilities.getUsedVariables(limitOp, limitUsedVars); + + Mutable safeOpRef = null; + Mutable candidateOpRef = limitOp.getInputs().get(0); + + List candidateProducedVars = new ArrayList<>(); + while (true) { + candidateProducedVars.clear(); + ILogicalOperator candidateOp = candidateOpRef.getValue(); + LogicalOperatorTag candidateOpTag = candidateOp.getOperatorTag(); + if (candidateOp.getInputs().size() > 1 || !candidateOp.isMap() + || candidateOpTag == LogicalOperatorTag.SELECT || candidateOpTag == LogicalOperatorTag.LIMIT + || !OperatorPropertiesUtil.disjoint(limitUsedVars, candidateProducedVars)) { + break; + } + + safeOpRef = candidateOpRef; + candidateOpRef = safeOpRef.getValue().getInputs().get(0); + } + + if (safeOpRef != null) { + ILogicalOperator safeOp = safeOpRef.getValue(); + Mutable unsafeOpRef = safeOp.getInputs().get(0); + ILogicalOperator unsafeOp = unsafeOpRef.getValue(); + LimitOperator limitCloneOp = null; + if (limitOp.getOffset().getValue() == null) { + limitCloneOp = new LimitOperator(limitOp.getMaxObjects().getValue(), false); + } else { + IFunctionInfo finfoAdd = context.getMetadataProvider().lookupFunction( + AlgebricksBuiltinFunctions.NUMERIC_ADD); + List> addArgs = new ArrayList<>(); + addArgs.add(new MutableObject(limitOp.getMaxObjects().getValue().cloneExpression())); + addArgs.add(new MutableObject(limitOp.getOffset().getValue().cloneExpression())); + ScalarFunctionCallExpression maxPlusOffset = new 
ScalarFunctionCallExpression(finfoAdd, addArgs); + limitCloneOp = new LimitOperator(maxPlusOffset, false); + } + limitCloneOp.setPhysicalOperator(new StreamLimitPOperator()); + limitCloneOp.getInputs().add(new MutableObject(unsafeOp)); + limitCloneOp.setExecutionMode(unsafeOp.getExecutionMode()); + limitCloneOp.recomputeSchema(); + unsafeOpRef.setValue(limitCloneOp); + context.computeAndSetTypeEnvironmentForOperator(limitCloneOp); + context.addToDontApplySet(this, limitOp); + } + + return safeOpRef != null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java ---------------------------------------------------------------------- diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java new file mode 100644 index 0000000..e93fdd1 --- /dev/null +++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java @@ -0,0 +1,86 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package edu.uci.ics.hyracks.algebricks.rewriter.rules; + +import java.util.List; + +import org.apache.commons.lang3.mutable.Mutable; + +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; +import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; + +/** + * This rule lift out the aggregate operator out from a group-by operator + * if the gby operator groups-by on empty key, e.g., the group-by variables are empty. 
+ * + * @author yingyib + */ +public class EliminateGroupByEmptyKeyRule implements IAlgebraicRewriteRule { + + @Override + public boolean rewritePre(Mutable opRef, IOptimizationContext context) throws AlgebricksException { + return false; + } + + @Override + public boolean rewritePost(Mutable opRef, IOptimizationContext context) + throws AlgebricksException { + AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); + if (op.getOperatorTag() != LogicalOperatorTag.GROUP) { + return false; + } + GroupByOperator groupOp = (GroupByOperator) op; + List groupVars = groupOp.getGbyVarList(); + if (groupVars.size() > 0) { + return false; + } + List nestedPlans = groupOp.getNestedPlans(); + if (nestedPlans.size() > 1) { + return false; + } + ILogicalPlan nestedPlan = nestedPlans.get(0); + if (nestedPlan.getRoots().size() > 1) { + return false; + } + Mutable topOpRef = nestedPlan.getRoots().get(0); + ILogicalOperator topOp = nestedPlan.getRoots().get(0).getValue(); + Mutable nestedTupleSourceRef = getNestedTupleSourceReference(topOpRef); + /** + * connect nested top op into the plan + */ + opRef.setValue(topOp); + /** + * connect child op into the plan + */ + nestedTupleSourceRef.setValue(groupOp.getInputs().get(0).getValue()); + return true; + } + + private Mutable getNestedTupleSourceReference(Mutable nestedTopOperatorRef) { + Mutable currentOpRef = nestedTopOperatorRef; + while (currentOpRef.getValue().getInputs() != null && currentOpRef.getValue().getInputs().size() > 0) { + currentOpRef = currentOpRef.getValue().getInputs().get(0); + } + return currentOpRef; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanRule.java ---------------------------------------------------------------------- diff --git 
a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanRule.java new file mode 100644 index 0000000..8a381f7 --- /dev/null +++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanRule.java @@ -0,0 +1,127 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package edu.uci.ics.hyracks.algebricks.rewriter.rules; + +import java.util.LinkedList; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; + +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil; +import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; +import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; + +public class EliminateSubplanRule implements IAlgebraicRewriteRule { + + @Override + public boolean rewritePost(Mutable opRef, IOptimizationContext context) { + return false; + } + + /** + * Eliminate Subplan above ETS + * and Subplan that has only ops. with one input and no free vars. (could we + * modify it to consider free vars which are sources of Unnest or Assign, if + * there are no aggregates?) 
+ */ + @Override + public boolean rewritePre(Mutable opRef, IOptimizationContext context) throws AlgebricksException { + AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); + if (op.getOperatorTag() != LogicalOperatorTag.SUBPLAN) { + return false; + } + SubplanOperator subplan = (SubplanOperator) op; + + Mutable outerRef = subplan.getInputs().get(0); + AbstractLogicalOperator outerRefOp = (AbstractLogicalOperator) outerRef.getValue(); + if (outerRefOp.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE) { + elimSubplanOverEts(opRef, context); + return true; + } + if (subplan.getNestedPlans().size() == 1 && subplan.getNestedPlans().get(0).getRoots().size() == 1 + && !OperatorPropertiesUtil.hasFreeVariables(subplan)) { + if (elimOneSubplanWithNoFreeVars(opRef)) { + return true; + } + } + + return false; + } + + private boolean elimOneSubplanWithNoFreeVars(Mutable opRef) { + SubplanOperator subplan = (SubplanOperator) opRef.getValue(); + AbstractLogicalOperator rootOp = (AbstractLogicalOperator) subplan.getNestedPlans().get(0).getRoots().get(0) + .getValue(); + if (rootOp.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE + || rootOp.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) { + opRef.setValue(subplan.getInputs().get(0).getValue()); + return true; + } else { + AbstractLogicalOperator botOp = rootOp; + if (botOp.getInputs().size() != 1) { + return false; + } + do { + Mutable botRef = botOp.getInputs().get(0); + botOp = (AbstractLogicalOperator) botRef.getValue(); + if (botOp.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE) { + botRef.setValue(subplan.getInputs().get(0).getValue()); + opRef.setValue(rootOp); + return true; + } + } while (botOp.getInputs().size() == 1); + return false; + } + } + + private void elimSubplanOverEts(Mutable opRef, IOptimizationContext ctx) + throws AlgebricksException { + SubplanOperator subplan = (SubplanOperator) opRef.getValue(); + for (ILogicalPlan p : subplan.getNestedPlans()) { + 
for (Mutable r : p.getRoots()) { + OperatorManipulationUtil.ntsToEts(r, ctx); + } + } + LinkedList> allRoots = subplan.allRootsInReverseOrder(); + if (allRoots.size() == 1) { + opRef.setValue(allRoots.get(0).getValue()); + } else { + ILogicalOperator topOp = null; + for (Mutable r : allRoots) { + if (topOp == null) { + topOp = r.getValue(); + } else { + LeftOuterJoinOperator j = new LeftOuterJoinOperator(new MutableObject( + ConstantExpression.TRUE)); + j.getInputs().add(new MutableObject(topOp)); + j.getInputs().add(r); + ctx.setOutputTypeEnvironment(j, j.computeOutputTypeEnvironment(ctx)); + topOp = j; + } + } + opRef.setValue(topOp); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanWithInputCardinalityOneRule.java ---------------------------------------------------------------------- diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanWithInputCardinalityOneRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanWithInputCardinalityOneRule.java new file mode 100644 index 0000000..1397956 --- /dev/null +++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanWithInputCardinalityOneRule.java @@ -0,0 +1,199 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.uci.ics.hyracks.algebricks.rewriter.rules; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.commons.lang3.mutable.Mutable; + +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.common.utils.ListSet; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities; +import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; +import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; + +/** + * This rule eliminates a subplan with the following pattern: + * -- SUBPLAN + * -- OP (where OP produces exactly one tuple) + * The live variables at OP will not be used after SUBPLAN. + * Note: This rule must be applied after + * the RemoveRedundantVariablesRule (to avoid the lineage analysis of variable cardinality). 
+ * + * @author yingyib + */ +public class EliminateSubplanWithInputCardinalityOneRule implements IAlgebraicRewriteRule { + /** The pointer to the topmost operator */ + private Mutable rootRef; + /** Whether the rule has even been invoked */ + private boolean invoked = false; + + @Override + public boolean rewritePost(Mutable opRef, IOptimizationContext context) { + return false; + } + + @Override + public boolean rewritePre(Mutable opRef, IOptimizationContext context) throws AlgebricksException { + if (!invoked) { + rootRef = opRef; + invoked = true; + } + AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); + if (op.getInputs().size() <= 0) { + return false; + } + boolean changed = false; + for (Mutable subplanRef : op.getInputs()) { + AbstractLogicalOperator op1 = (AbstractLogicalOperator) subplanRef.getValue(); + if (op1.getOperatorTag() != LogicalOperatorTag.SUBPLAN) { + continue; + } + + SubplanOperator subplan = (SubplanOperator) op1; + Set usedVarsUp = new ListSet(); + OperatorPropertiesUtil.getFreeVariablesInPath(rootRef.getValue(), subplan, usedVarsUp); + // TODO(buyingyi): figure out the rewriting for subplan operators with multiple subplans. + if (subplan.getNestedPlans().size() != 1) { + continue; + } + + ILogicalOperator subplanInputOperator = subplan.getInputs().get(0).getValue(); + Set subplanInputVars = new ListSet(); + VariableUtilities.getLiveVariables(subplanInputOperator, subplanInputVars); + int subplanInputVarSize = subplanInputVars.size(); + subplanInputVars.removeAll(usedVarsUp); + // Makes sure the free variables are only used in the subplan. + if (subplanInputVars.size() < subplanInputVarSize) { + continue; + } + Set freeVars = new ListSet(); + OperatorPropertiesUtil.getFreeVariablesInSubplans(subplan, freeVars); + boolean cardinalityOne = isCardinalityOne(subplan.getInputs().get(0), freeVars); + if (cardinalityOne) { + /** If the cardinality of freeVars in the subplan is one, the subplan can be removed. 
*/ + ILogicalPlan plan = subplan.getNestedPlans().get(0); + + List> rootRefs = plan.getRoots(); + // TODO(buyingyi): investigate the case of multi-root plans. + if (rootRefs.size() != 1) { + continue; + } + Set> ntsSet = new ListSet>(); + findNts(rootRefs.get(0), ntsSet); + + /** Replaces nts with the input operator of the subplan. */ + for (Mutable nts : ntsSet) { + nts.setValue(subplanInputOperator); + } + subplanRef.setValue(rootRefs.get(0).getValue()); + changed = true; + } else { + continue; + } + } + return changed; + } + + /** + * Whether the cardinality of the input free variables are one. + * + * @param opRef + * the operator to be checked (including its input operators) + * @param freeVars + * variables to be checked for produced operators + * @return true if every input variable has cardinality one; false otherwise. + * @throws AlgebricksException + */ + private boolean isCardinalityOne(Mutable opRef, Set freeVars) + throws AlgebricksException { + Set varsWithCardinalityOne = new ListSet(); + Set varsLiveAtUnnestAndJoin = new ListSet(); + isCardinalityOne(opRef, freeVars, varsWithCardinalityOne, varsLiveAtUnnestAndJoin); + varsWithCardinalityOne.removeAll(varsLiveAtUnnestAndJoin); + return varsWithCardinalityOne.equals(freeVars); + } + + /** + * Recursively adding variables which has cardinality one and in int the input free variable set. + * + * @param opRef + * , the current operator reference. + * @param freeVars + * , a set of variables. + * @param varsWithCardinalityOne + * , variables in the free variable set with cardinality one at the time they are created. + * @param varsLiveAtUnnestAndJoin + * , live variables at Unnest and Join. The cardinalities of those variables can become more than one + * even if their cardinalities were one at the time those variables were created. 
+ * @throws AlgebricksException + */ + private void isCardinalityOne(Mutable opRef, Set freeVars, + Set varsWithCardinalityOne, Set varsLiveAtUnnestAndJoin) + throws AlgebricksException { + AbstractLogicalOperator operator = (AbstractLogicalOperator) opRef.getValue(); + List producedVars = new ArrayList(); + VariableUtilities.getProducedVariables(operator, producedVars); + if (operator.getOperatorTag() == LogicalOperatorTag.UNNEST + || operator.getOperatorTag() == LogicalOperatorTag.INNERJOIN + || operator.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) { + VariableUtilities.getLiveVariables(operator, varsLiveAtUnnestAndJoin); + } + if (operator.getOperatorTag() == LogicalOperatorTag.AGGREGATE) { + for (LogicalVariable producedVar : producedVars) { + if (freeVars.contains(producedVar)) { + varsWithCardinalityOne.add(producedVar); + } + } + } + if (varsWithCardinalityOne.size() == freeVars.size()) { + return; + } + for (Mutable childRef : operator.getInputs()) { + isCardinalityOne(childRef, freeVars, varsWithCardinalityOne, varsLiveAtUnnestAndJoin); + } + } + + /** + * Find the NestedTupleSource operator in the direct/undirect input operators of opRef. + * + * @param opRef + * , the current operator reference. + * @param ntsSet + * , the set NestedTupleSource operator references. 
+ */ + private void findNts(Mutable opRef, Set> ntsSet) { + int childSize = opRef.getValue().getInputs().size(); + if (childSize == 0) { + AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); + if (op.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) { + ntsSet.add(opRef); + } + return; + } + for (Mutable childRef : opRef.getValue().getInputs()) { + findNts(childRef, ntsSet); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java ---------------------------------------------------------------------- diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java new file mode 100644 index 0000000..d4834a3 --- /dev/null +++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java @@ -0,0 +1,165 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package edu.uci.ics.hyracks.algebricks.rewriter.rules; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; + +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.common.utils.Pair; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; +import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; + +/** + * If there is any ordering property before the subplan operator, the ordering should + * be kept after the subplan. + * This rule adds a redundant order operator after those cases, to guarantee the correctness. 
+ * + * @author yingyib + */ +public class EnforceOrderByAfterSubplan implements IAlgebraicRewriteRule { + /** a set of order-breaking operators */ + private final Set orderBreakingOps = new HashSet(); + /** a set of order-sensitive operators */ + private final Set orderSensitiveOps = new HashSet(); + + public EnforceOrderByAfterSubplan() { + /** add operators that break the ordering */ + orderBreakingOps.add(LogicalOperatorTag.INNERJOIN); + orderBreakingOps.add(LogicalOperatorTag.LEFTOUTERJOIN); + orderBreakingOps.add(LogicalOperatorTag.UNIONALL); + orderBreakingOps.add(LogicalOperatorTag.AGGREGATE); + + /** add operators that are sensitive to the ordering */ + orderSensitiveOps.add(LogicalOperatorTag.LIMIT); + } + + @Override + public boolean rewritePre(Mutable opRef, IOptimizationContext context) throws AlgebricksException { + return false; + } + + @Override + public boolean rewritePost(Mutable opRef, IOptimizationContext context) + throws AlgebricksException { + AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue(); + if (context.checkIfInDontApplySet(this, op1)) { + return false; + } + List> inputs = op1.getInputs(); + context.addToDontApplySet(this, op1); + if (op1.getOperatorTag() == LogicalOperatorTag.ORDER || inputs == null) { + /** + * does not apply if + * 1. there is yet-another order operator on-top-of the subplan, because the downstream order operator's ordering will be broken anyway + * 2. 
the input operator(s) is null + */ + return false; + } + boolean changed = false; + for (int i = 0; i < inputs.size(); i++) { + Mutable inputOpRef = inputs.get(i); + AbstractLogicalOperator op = (AbstractLogicalOperator) inputOpRef.getValue(); + context.addToDontApplySet(this, op); + if (op.getOperatorTag() != LogicalOperatorTag.SUBPLAN) { + continue; + } + + /** + * check the order operators whose ordering is not broken before the subplan operator, and then + * duplicate them on-top-of the subplan operator + */ + boolean foundTarget = true; + boolean orderSensitive = false; + Mutable childRef = op.getInputs().get(0); + AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue(); + while (child.getOperatorTag() != LogicalOperatorTag.ORDER) { + context.addToDontApplySet(this, child); + if (orderBreakingOps.contains(child.getOperatorTag())) { + foundTarget = false; + break; + } + if(child.getOperatorTag() == LogicalOperatorTag.GROUP){ + foundTarget = false; + break; + } + if (orderSensitiveOps.contains(child.getOperatorTag())) { + orderSensitive = true; + } + List> childInputs = child.getInputs(); + if (childInputs == null || childInputs.size() > 2 || childInputs.size() < 1) { + foundTarget = false; + break; + } else { + childRef = childInputs.get(0); + child = (AbstractLogicalOperator) childRef.getValue(); + } + } + /** the target order-by operator has not been found. 
*/ + if (!foundTarget) { + return false; + } + + /** copy the original order-by operator and insert on-top-of the subplan operator */ + context.addToDontApplySet(this, child); + OrderOperator sourceOrderOp = (OrderOperator) child; + for (Pair> expr : sourceOrderOp.getOrderExpressions()) { + if (!expr.second.getValue().isFunctional()) { + return false; + } + } + List>> orderExprs = deepCopyOrderAndExpression(sourceOrderOp + .getOrderExpressions()); + OrderOperator newOrderOp = new OrderOperator(orderExprs); + context.addToDontApplySet(this, newOrderOp); + inputs.set(i, new MutableObject(newOrderOp)); + newOrderOp.getInputs().add(inputOpRef); + context.computeAndSetTypeEnvironmentForOperator(newOrderOp); + + if (!orderSensitive) { + /** remove the original order-by */ + childRef.setValue(sourceOrderOp.getInputs().get(0).getValue()); + } + changed = true; + } + return changed; + } + + private Mutable deepCopyExpressionRef(Mutable oldExpr) { + return new MutableObject(((AbstractLogicalExpression) oldExpr.getValue()).cloneExpression()); + } + + private List>> deepCopyOrderAndExpression( + List>> ordersAndExprs) { + List>> newOrdersAndExprs = new ArrayList>>(); + for (Pair> pair : ordersAndExprs) + newOrdersAndExprs.add(new Pair>(pair.first, + deepCopyExpressionRef(pair.second))); + return newOrdersAndExprs; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java ---------------------------------------------------------------------- diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java new file mode 100644 index 0000000..ebd7da1 --- /dev/null +++ 
b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java @@ -0,0 +1,614 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.uci.ics.hyracks.algebricks.rewriter.rules; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; + +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException; +import edu.uci.ics.hyracks.algebricks.common.utils.Pair; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.EquivalenceClass; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable; +import 
edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.FDsAndEquivClassesVisitor; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.BroadcastPOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RandomMergeExchangePOperator; +import 
edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionPOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergePOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor; +import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.FunctionalDependency; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty.PartitioningType; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalGroupingProperty; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PropertiesUtil; +import 
edu.uci.ics.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import edu.uci.ics.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
import edu.uci.ics.hyracks.dataflow.common.data.partition.range.IRangeMap;

/**
 * Rewrite rule that walks a plan whose physical operators are already set and, for every operator, compares
 * the physical properties each child delivers with the properties the operator requires from that child.
 * Where they differ it inserts enforcers: ORDER operators with a stable-sort physical operator for local
 * properties and EXCHANGE operators for partitioning properties. ORDER operators whose ordering is already
 * delivered by their input are removed.
 */
public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {

    /** Node domain used when the children of an operator do not all report the same domain. */
    private static final INodeDomain DEFAULT_DOMAIN = new DefaultNodeGroupDomain("__DEFAULT");

    /** Refreshed from the optimization context on every {@link #rewritePre} that gets past its guards. */
    private PhysicalOptimizationConfig physicalOptimizationConfig;

    /** This rule does all of its work in {@link #rewritePre}. */
    @Override
    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
            throws AlgebricksException {
        return false;
    }

    /**
     * Enforces structural properties on the plan rooted at {@code opRef}.
     *
     * @param opRef
     *            reference to the root operator; may be re-pointed if the root turns out to be a redundant sort
     * @param context
     *            optimization context
     * @return true if the plan was modified
     * @throws AlgebricksException
     *             propagated from property computation or enforcer construction
     */
    @Override
    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
            throws AlgebricksException {
        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
        // wait for the physical operators to be set first
        if (op.getPhysicalOperator() == null) {
            return false;
        }
        if (context.checkIfInDontApplySet(this, op)) {
            return false;
        }

        // a non-empty FD list is treated as a sign that this operator has been handled already
        List<FunctionalDependency> fds = context.getFDList(op);
        if (fds != null && !fds.isEmpty()) {
            return false;
        }
        // These are actually logical constraints, so they could be pre-computed
        // somewhere else, too.

        physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
        AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Optimizing operator " + op.getPhysicalOperator() + ".\n");

        PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses(op, context);

        StructuralPropertiesVector pvector = new StructuralPropertiesVector(new RandomPartitioningProperty(null),
                new LinkedList<ILocalStructuralProperty>());
        boolean changed = physOptimizeOp(opRef, pvector, false, context);
        op.computeDeliveredPhysicalProperties(context);
        AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Structural properties for " + op.getPhysicalOperator() + ": "
                + op.getDeliveredPhysicalProperties() + "\n");

        context.addToDontApplySet(this, opRef.getValue());

        return changed;
    }

    /**
     * Runs {@link #physOptimizeOp} on every root of a (nested) plan and recomputes the delivered properties of
     * each root.
     *
     * @return true if any root was modified
     */
    private boolean physOptimizePlan(ILogicalPlan plan, IPhysicalPropertiesVector pvector, boolean nestedPlan,
            IOptimizationContext context) throws AlgebricksException {
        boolean changed = false;
        for (Mutable<ILogicalOperator> root : plan.getRoots()) {
            if (physOptimizeOp(root, pvector, nestedPlan, context)) {
                changed = true;
            }
            AbstractLogicalOperator op = (AbstractLogicalOperator) root.getValue();
            op.computeDeliveredPhysicalProperties(context);
            AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Structural properties for " + op.getPhysicalOperator()
                    + ": " + op.getDeliveredPhysicalProperties() + "\n");
        }
        return changed;
    }

    /**
     * Recursively optimizes the children of the operator behind {@code opRef}, adds the enforcers needed so
     * that each child satisfies what the operator requires, optimizes nested plans, and finally drops the
     * operator itself if it is a sort whose ordering its input already delivers.
     *
     * @param opRef
     *            reference to the operator to optimize; re-pointed when the operator is removed
     * @param required
     *            properties required from this operator by its parent
     * @param nestedPlan
     *            true when optimizing inside a nested plan
     * @param context
     *            optimization context
     * @return true if the plan was modified
     */
    private boolean physOptimizeOp(Mutable<ILogicalOperator> opRef, IPhysicalPropertiesVector required,
            boolean nestedPlan, IOptimizationContext context) throws AlgebricksException {

        boolean changed = false;
        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
        optimizeUsingConstraintsAndEquivClasses(op);
        PhysicalRequirements pr = op.getRequiredPhysicalPropertiesForChildren(required);
        // NOTE(review): reqdProperties stays null when pr is null, yet it is indexed for every input below;
        // this presumably relies on only input-less operators returning null requirements -- confirm.
        IPhysicalPropertiesVector[] reqdProperties = null;
        if (pr != null) {
            reqdProperties = pr.getRequiredProperties();
        }
        boolean opIsRedundantSort = false;

        // compute properties and figure out the domain
        INodeDomain childrenDomain = null;
        {
            int j = 0;
            for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
                AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
                // recursive call
                if (physOptimizeOp(childRef, reqdProperties[j], nestedPlan, context)) {
                    changed = true;
                }
                child.computeDeliveredPhysicalProperties(context);
                IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties();
                if (childrenDomain == null) {
                    childrenDomain = delivered.getPartitioningProperty().getNodeDomain();
                } else {
                    INodeDomain dom2 = delivered.getPartitioningProperty().getNodeDomain();
                    if (!childrenDomain.sameAs(dom2)) {
                        // children disagree on the domain: fall back to the default one
                        childrenDomain = DEFAULT_DOMAIN;
                    }
                }
                j++;
            }
        }

        // fill in the node domain of every required partitioning property that does not name one
        if (reqdProperties != null) {
            for (int k = 0; k < reqdProperties.length; k++) {
                IPhysicalPropertiesVector pv = reqdProperties[k];
                IPartitioningProperty pp = pv.getPartitioningProperty();
                if (pp != null && pp.getNodeDomain() == null) {
                    pp.setNodeDomain(childrenDomain);
                }
            }
        }

        IPartitioningProperty firstDeliveredPartitioning = null;
        int i = 0;
        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
            AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
            IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties();

            AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Properties delivered by " + child.getPhysicalOperator()
                    + ": " + delivered + "\n");
            IPartitioningRequirementsCoordinator prc = pr.getPartitioningCoordinator();
            Pair<Boolean, IPartitioningProperty> pbpp = prc.coordinateRequirements(
                    reqdProperties[i].getPartitioningProperty(), firstDeliveredPartitioning, op, context);
            boolean mayExpandPartitioningProperties = pbpp.first;
            IPhysicalPropertiesVector rqd = new StructuralPropertiesVector(pbpp.second,
                    reqdProperties[i].getLocalProperties());

            AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Required properties for " + child.getPhysicalOperator()
                    + ": " + rqd + "\n");
            IPhysicalPropertiesVector diff = delivered.getUnsatisfiedPropertiesFrom(rqd,
                    mayExpandPartitioningProperties, context.getEquivalenceClassMap(child),
                    context.getFDList(child));

            if (isRedundantSort(opRef, delivered, diff, context)) {
                opIsRedundantSort = true;
            }

            if (diff != null) {
                changed = true;
                addEnforcers(op, i, diff, rqd, delivered, childrenDomain, nestedPlan, context);

                AbstractLogicalOperator newChild = ((AbstractLogicalOperator) op.getInputs().get(i).getValue());

                if (newChild != child) {
                    // an enforcer now sits on top of the old child: re-evaluate against what it delivers
                    delivered = newChild.getDeliveredPhysicalProperties();
                    IPhysicalPropertiesVector newDiff = newPropertiesDiff(newChild, rqd,
                            mayExpandPartitioningProperties, context);
                    AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> New properties diff: " + newDiff + "\n");

                    if (isRedundantSort(opRef, delivered, newDiff, context)) {
                        opIsRedundantSort = true;
                        break;
                    }
                }

            }
            if (firstDeliveredPartitioning == null) {
                IPartitioningProperty dpp = delivered.getPartitioningProperty();
                if (dpp.getPartitioningType() == PartitioningType.ORDERED_PARTITIONED
                        || dpp.getPartitioningType() == PartitioningType.UNORDERED_PARTITIONED) {
                    firstDeliveredPartitioning = dpp;
                }
            }

            i++;
        }

        if (op.hasNestedPlans()) {
            AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
            for (ILogicalPlan p : nested.getNestedPlans()) {
                if (physOptimizePlan(p, required, true, context)) {
                    changed = true;
                }
            }
        }

        if (opIsRedundantSort) {
            if (AlgebricksConfig.DEBUG) {
                AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Removing redundant SORT operator "
                        + op.getPhysicalOperator() + "\n");
                printOp(op);
            }
            changed = true;
            AbstractLogicalOperator nextOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
            if (nextOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
                nextOp = (AbstractLogicalOperator) nextOp.getInputs().get(0).getValue();
            }
            opRef.setValue(nextOp);
            // Now, transfer annotations from the original sort op. to this one.
            AbstractLogicalOperator transferTo = nextOp;
            if (transferTo.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
                //
                // remove duplicate exchange operator
                transferTo = (AbstractLogicalOperator) transferTo.getInputs().get(0).getValue();
            }
            transferTo.getAnnotations().putAll(op.getAnnotations());
            // re-run on the operator that replaced the sort
            physOptimizeOp(opRef, required, nestedPlan, context);
        }
        return changed;
    }

    /**
     * Computes which of the {@code required} properties are still not satisfied by {@code newChild}, first
     * computing the child's equivalence classes and functional dependencies if the context has none.
     *
     * @return whatever {@code getUnsatisfiedPropertiesFrom} returns for the child's delivered properties
     */
    private IPhysicalPropertiesVector newPropertiesDiff(AbstractLogicalOperator newChild,
            IPhysicalPropertiesVector required, boolean mayExpandPartitioningProperties,
            IOptimizationContext context) throws AlgebricksException {
        IPhysicalPropertiesVector newDelivered = newChild.getDeliveredPhysicalProperties();

        Map<LogicalVariable, EquivalenceClass> newChildEqClasses = context.getEquivalenceClassMap(newChild);
        List<FunctionalDependency> newChildFDs = context.getFDList(newChild);
        if (newChildEqClasses == null || newChildFDs == null) {
            FDsAndEquivClassesVisitor fdsVisitor = new FDsAndEquivClassesVisitor();
            newChild.accept(fdsVisitor, context);
            newChildEqClasses = context.getEquivalenceClassMap(newChild);
            newChildFDs = context.getFDList(newChild);
        }
        AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Required properties for new op. "
                + newChild.getPhysicalOperator() + ": " + required + "\n");

        return newDelivered.getUnsatisfiedPropertiesFrom(required, mayExpandPartitioningProperties,
                newChildEqClasses, newChildFDs);
    }

    /**
     * Pushes the grouping / distinct columns of the logical operator into its physical operator for the
     * group-by and pre-sorted distinct physical operators; other physical operators are left untouched.
     */
    private void optimizeUsingConstraintsAndEquivClasses(AbstractLogicalOperator op) {
        IPhysicalOperator pOp = op.getPhysicalOperator();
        switch (pOp.getOperatorTag()) {
            case HASH_GROUP_BY:
            case EXTERNAL_GROUP_BY: {
                GroupByOperator gby = (GroupByOperator) op;
                ExternalGroupByPOperator hgbyOp = (ExternalGroupByPOperator) pOp;
                hgbyOp.computeColumnSet(gby.getGroupByList());
                break;
            }
            case PRE_CLUSTERED_GROUP_BY: {
                GroupByOperator gby = (GroupByOperator) op;
                PreclusteredGroupByPOperator preSortedGby = (PreclusteredGroupByPOperator) pOp;
                preSortedGby.setGbyColumns(gby.getGbyVarList());
                break;
            }
            case PRE_SORTED_DISTINCT_BY: {
                DistinctOperator d = (DistinctOperator) op;
                PreSortedDistinctByPOperator preSortedDistinct = (PreSortedDistinctByPOperator) pOp;
                preSortedDistinct.setDistinctByColumns(d.getDistinctByVarList());
                break;
            }
            default: {
                // nothing to adjust for other physical operators
                break;
            }
        }
    }

    /**
     * Builds the order columns to be used when merging: the delivered order columns up to and including the
     * last delivered column that is also required, followed by the remaining delivered columns. Returns an
     * empty list when no delivered column is required.
     *
     * NOTE(review): the order columns are taken from the first delivered property only, while the column
     * positions are computed over the variables of all delivered properties; this presumably assumes a
     * single delivered order property -- confirm.
     */
    private List<OrderColumn> getOrderColumnsFromGroupingProperties(List<ILocalStructuralProperty> reqd,
            List<ILocalStructuralProperty> dlvd) {
        List<OrderColumn> returnedProperties = new ArrayList<OrderColumn>();
        List<LogicalVariable> rqdCols = new ArrayList<LogicalVariable>();
        List<LogicalVariable> dlvdCols = new ArrayList<LogicalVariable>();
        for (ILocalStructuralProperty r : reqd) {
            r.getVariables(rqdCols);
        }
        for (ILocalStructuralProperty d : dlvd) {
            d.getVariables(dlvdCols);
        }

        // index of the last delivered column that is also required, or -1 if there is none
        int prefix = dlvdCols.size() - 1;
        while (prefix >= 0 && !rqdCols.contains(dlvdCols.get(prefix))) {
            prefix--;
        }

        LocalOrderProperty orderProp = (LocalOrderProperty) dlvd.get(0);
        List<OrderColumn> orderColumns = orderProp.getOrderColumns();
        for (int j = 0; j <= prefix; j++) {
            OrderColumn oc = orderColumns.get(j);
            returnedProperties.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
        }
        // maintain other order columns after the required order columns
        if (!returnedProperties.isEmpty()) {
            for (int j = prefix + 1; j < dlvdCols.size(); j++) {
                OrderColumn oc = orderColumns.get(j);
                returnedProperties.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
            }
        }
        return returnedProperties;
    }

    /*
     * We assume delivered to be already normalized.
     *
     * Returns true when the operator is an ORDER with a (in-memory) stable sort physical operator and the
     * order property of that sort is matched by the delivered local properties. The diffOfProperties
     * argument is not consulted.
     */
    private boolean isRedundantSort(Mutable<ILogicalOperator> opRef, IPhysicalPropertiesVector delivered,
            IPhysicalPropertiesVector diffOfProperties, IOptimizationContext context) {
        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
        if (op.getOperatorTag() != LogicalOperatorTag.ORDER
                || (op.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.STABLE_SORT && op
                        .getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.IN_MEMORY_STABLE_SORT)
                || delivered.getLocalProperties() == null) {
            return false;
        }
        AbstractStableSortPOperator sortOp = (AbstractStableSortPOperator) op.getPhysicalOperator();
        sortOp.computeLocalProperties(op);
        ILocalStructuralProperty orderProp = sortOp.getOrderProperty();
        return PropertiesUtil.matchLocalProperties(Collections.singletonList(orderProp),
                delivered.getLocalProperties(), context.getEquivalenceClassMap(op), context.getFDList(op));
    }

    /**
     * Adds the enforcers described by {@code diffPropertiesVector} on top of input {@code childIndex} of
     * {@code op}. When the missing partitioning is absent or UNPARTITIONED, local enforcers are added first
     * and the partitioning enforcer is built from what the new child delivers; otherwise the partitioning
     * enforcer is added first and local enforcers are added for whatever is still unsatisfied afterwards.
     */
    private void addEnforcers(AbstractLogicalOperator op, int childIndex,
            IPhysicalPropertiesVector diffPropertiesVector, IPhysicalPropertiesVector required,
            IPhysicalPropertiesVector deliveredByChild, INodeDomain domain, boolean nestedPlan,
            IOptimizationContext context) throws AlgebricksException {

        IPartitioningProperty pp = diffPropertiesVector.getPartitioningProperty();
        if (pp == null || pp.getPartitioningType() == PartitioningType.UNPARTITIONED) {
            addLocalEnforcers(op, childIndex, diffPropertiesVector.getLocalProperties(), nestedPlan, context);
            // read the properties of the input that was just modified (childIndex), not of the first input
            IPhysicalPropertiesVector deliveredByNewChild = ((AbstractLogicalOperator) op.getInputs()
                    .get(childIndex).getValue()).getDeliveredPhysicalProperties();
            addPartitioningEnforcers(op, childIndex, pp, required, deliveredByNewChild, domain, context);
        } else {
            addPartitioningEnforcers(op, childIndex, pp, required, deliveredByChild, domain, context);
            AbstractLogicalOperator newChild = (AbstractLogicalOperator) op.getInputs().get(childIndex).getValue();
            IPhysicalPropertiesVector newDiff = newPropertiesDiff(newChild, required, true, context);
            AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> New properties diff: " + newDiff + "\n");
            if (newDiff != null) {
                addLocalEnforcers(op, childIndex, newDiff.getLocalProperties(), nestedPlan, context);
            }
        }
    }

    /**
     * Places a sort on top of input {@code i} of {@code op} that enforces the given local properties.
     * Grouping properties are enforced by an ascending sort on their preferred order enforcer, or on their
     * column set when no preferred enforcer is set. Does nothing when {@code localProperties} is null or
     * empty.
     *
     * @throws IllegalStateException
     *             if a property is neither a local order nor a local grouping property
     */
    private void addLocalEnforcers(AbstractLogicalOperator op, int i,
            List<ILocalStructuralProperty> localProperties, boolean nestedPlan, IOptimizationContext context)
            throws AlgebricksException {
        if (AlgebricksConfig.DEBUG) {
            AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Adding local enforcers for local props = " + localProperties
                    + "\n");
        }

        if (localProperties == null || localProperties.isEmpty()) {
            return;
        }

        Mutable<ILogicalOperator> topOp = new MutableObject<ILogicalOperator>();
        topOp.setValue(op.getInputs().get(i).getValue());
        LinkedList<LocalOrderProperty> oList = new LinkedList<LocalOrderProperty>();

        for (ILocalStructuralProperty prop : localProperties) {
            switch (prop.getPropertyType()) {
                case LOCAL_ORDER_PROPERTY: {
                    oList.add((LocalOrderProperty) prop);
                    break;
                }
                case LOCAL_GROUPING_PROPERTY: {
                    LocalGroupingProperty g = (LocalGroupingProperty) prop;
                    Collection<LogicalVariable> vars = (g.getPreferredOrderEnforcer() != null) ? g
                            .getPreferredOrderEnforcer() : g.getColumnSet();
                    List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
                    for (LogicalVariable v : vars) {
                        OrderColumn oc = new OrderColumn(v, OrderKind.ASC);
                        orderColumns.add(oc);
                    }
                    LocalOrderProperty lop = new LocalOrderProperty(orderColumns);
                    oList.add(lop);
                    break;
                }
                default: {
                    throw new IllegalStateException();
                }
            }
        }
        if (!oList.isEmpty()) {
            topOp = enforceOrderProperties(oList, topOp, nestedPlan, context);
        }

        op.getInputs().set(i, topOp);
        OperatorPropertiesUtil.computeSchemaAndPropertiesRecIfNull((AbstractLogicalOperator) topOp.getValue(), context);
        // pretty-printing the operator is only worth its cost when debugging, as at the other call sites
        if (AlgebricksConfig.DEBUG) {
            printOp((AbstractLogicalOperator) topOp.getValue());
        }
    }

    /**
     * Creates one local ORDER operator over {@code topOp} that sorts on the columns of all given order
     * properties, in list order. Inside a nested plan ({@code isMicroOp}) an in-memory stable sort is used,
     * otherwise a stable sort sized by the configured number of external-sort frames.
     *
     * @return a new reference holding the created ORDER operator
     */
    private Mutable<ILogicalOperator> enforceOrderProperties(List<LocalOrderProperty> oList,
            Mutable<ILogicalOperator> topOp, boolean isMicroOp, IOptimizationContext context)
            throws AlgebricksException {
        List<Pair<IOrder, Mutable<ILogicalExpression>>> oe =
                new LinkedList<Pair<IOrder, Mutable<ILogicalExpression>>>();
        for (LocalOrderProperty orderProperty : oList) {
            for (OrderColumn oc : orderProperty.getOrderColumns()) {
                IOrder ordType = (oc.getOrder() == OrderKind.ASC) ? OrderOperator.ASC_ORDER
                        : OrderOperator.DESC_ORDER;
                Pair<IOrder, Mutable<ILogicalExpression>> pair = new Pair<IOrder, Mutable<ILogicalExpression>>(
                        ordType,
                        new MutableObject<ILogicalExpression>(new VariableReferenceExpression(oc.getColumn())));
                oe.add(pair);
            }
        }
        OrderOperator oo = new OrderOperator(oe);
        oo.setExecutionMode(AbstractLogicalOperator.ExecutionMode.LOCAL);
        if (isMicroOp) {
            oo.setPhysicalOperator(new InMemoryStableSortPOperator());
        } else {
            oo.setPhysicalOperator(new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort()));
        }
        oo.getInputs().add(topOp);
        context.computeAndSetTypeEnvironmentForOperator(oo);
        if (AlgebricksConfig.DEBUG) {
            AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Added sort enforcer " + oo.getPhysicalOperator() + ".\n");
        }
        return new MutableObject<ILogicalOperator>(oo);
    }

    /**
     * Inserts an EXCHANGE operator between {@code op} and its input {@code i}, with a physical operator
     * chosen from the partitioning type of {@code pp}. Does nothing when {@code pp} is null.
     *
     * @throws NotImplementedException
     *             if there is no enforcer for the partitioning type
     */
    private void addPartitioningEnforcers(ILogicalOperator op, int i, IPartitioningProperty pp,
            IPhysicalPropertiesVector required, IPhysicalPropertiesVector deliveredByChild, INodeDomain domain,
            IOptimizationContext context) throws AlgebricksException {
        if (pp == null) {
            return;
        }
        IPhysicalOperator pop;
        switch (pp.getPartitioningType()) {
            case UNPARTITIONED: {
                List<OrderColumn> ordCols = computeOrderColumns(deliveredByChild);
                if (ordCols == null || ordCols.size() == 0) {
                    pop = new RandomMergeExchangePOperator();
                } else {
                    // the child delivers an ordering: merge in a way that keeps it
                    if (op.getAnnotations().containsKey(OperatorAnnotations.USE_RANGE_CONNECTOR)) {
                        IRangeMap rangeMap = (IRangeMap) op.getAnnotations().get(
                                OperatorAnnotations.USE_RANGE_CONNECTOR);
                        pop = new RangePartitionMergePOperator(ordCols, domain, rangeMap);
                    } else {
                        OrderColumn[] sortColumns = new OrderColumn[ordCols.size()];
                        sortColumns = ordCols.toArray(sortColumns);
                        pop = new SortMergeExchangePOperator(sortColumns);
                    }
                }
                break;
            }
            case UNORDERED_PARTITIONED: {
                List<LogicalVariable> varList = new ArrayList<LogicalVariable>(
                        ((UnorderedPartitionedProperty) pp).getColumnSet());
                List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties();
                List<ILocalStructuralProperty> reqdLocals = required.getLocalProperties();
                pop = null;
                if (reqdLocals != null && cldLocals != null && allAreOrderProps(cldLocals)) {
                    AbstractLogicalOperator c = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
                    Map<LogicalVariable, EquivalenceClass> ecs = context.getEquivalenceClassMap(c);
                    List<FunctionalDependency> fds = context.getFDList(c);
                    if (PropertiesUtil.matchLocalProperties(reqdLocals, cldLocals, ecs, fds)) {
                        // the child's ordering already matches what is required: use a merging exchange
                        List<OrderColumn> orderColumns = getOrderColumnsFromGroupingProperties(reqdLocals,
                                cldLocals);
                        pop = new HashPartitionMergeExchangePOperator(orderColumns, varList, domain);
                    }
                }
                if (pop == null) {
                    pop = new HashPartitionExchangePOperator(varList, domain);
                }
                break;
            }
            case ORDERED_PARTITIONED: {
                pop = new RangePartitionPOperator(((OrderedPartitionedProperty) pp).getOrderColumns(), domain, null);
                break;
            }
            case BROADCAST: {
                pop = new BroadcastPOperator(domain);
                break;
            }
            case RANDOM: {
                RandomPartitioningProperty rpp = (RandomPartitioningProperty) pp;
                INodeDomain nd = rpp.getNodeDomain();
                pop = new RandomPartitionPOperator(nd);
                break;
            }
            default: {
                throw new NotImplementedException("Enforcer for " + pp.getPartitioningType()
                        + " partitioning type has not been implemented.");
            }
        }
        Mutable<ILogicalOperator> ci = op.getInputs().get(i);
        ExchangeOperator exchg = new ExchangeOperator();
        exchg.setPhysicalOperator(pop);
        setNewOp(ci, exchg, context);
        exchg.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
        OperatorPropertiesUtil.computeSchemaAndPropertiesRecIfNull(exchg, context);
        context.computeAndSetTypeEnvironmentForOperator(exchg);
        if (AlgebricksConfig.DEBUG) {
            AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Added partitioning enforcer "
                    + exchg.getPhysicalOperator() + ".\n");
            printOp((AbstractLogicalOperator) op);
        }
    }

    /** Returns true if the list is non-empty and every property in it is a local order property. */
    private boolean allAreOrderProps(List<ILocalStructuralProperty> cldLocals) {
        for (ILocalStructuralProperty lsp : cldLocals) {
            if (lsp.getPropertyType() != PropertyType.LOCAL_ORDER_PROPERTY) {
                return false;
            }
        }
        return !cldLocals.isEmpty();
    }

    /** Pretty-prints the operator and logs the result at FINE level. */
    private void printOp(AbstractLogicalOperator op) throws AlgebricksException {
        StringBuilder sb = new StringBuilder();
        LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
        PlanPrettyPrinter.printOperator(op, sb, pvisitor, 0);
        AlgebricksConfig.ALGEBRICKS_LOGGER.fine(sb.toString());
    }

    /**
     * Concatenates the order columns of all local properties of {@code pv}.
     *
     * @return the collected columns, or null when there are no local properties or when any of them is not a
     *         local order property
     */
    private List<OrderColumn> computeOrderColumns(IPhysicalPropertiesVector pv) {
        List<ILocalStructuralProperty> localProps = pv.getLocalProperties();
        if (localProps == null || localProps.size() == 0) {
            return null;
        }
        List<OrderColumn> ordCols = new ArrayList<OrderColumn>();
        for (ILocalStructuralProperty p : localProps) {
            if (p.getPropertyType() != PropertyType.LOCAL_ORDER_PROPERTY) {
                return null;
            }
            LocalOrderProperty lop = (LocalOrderProperty) p;
            ordCols.addAll(lop.getOrderColumns());
        }
        return ordCols;
    }

    /**
     * Re-points {@code opRef} at {@code newOp} and makes the operator it previously held the input of
     * {@code newOp}; then recomputes schema, delivered properties, type environment, FDs and equivalence
     * classes for {@code newOp}.
     */
    private void setNewOp(Mutable<ILogicalOperator> opRef, AbstractLogicalOperator newOp,
            IOptimizationContext context) throws AlgebricksException {
        ILogicalOperator oldOp = opRef.getValue();
        opRef.setValue(newOp);
        newOp.getInputs().add(new MutableObject<ILogicalOperator>(oldOp));
        newOp.recomputeSchema();
        newOp.computeDeliveredPhysicalProperties(context);
        context.computeAndSetTypeEnvironmentForOperator(newOp);
        AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Structural properties for " + newOp.getPhysicalOperator()
                + ": " + newOp.getDeliveredPhysicalProperties() + "\n");

        PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses(newOp, context);
    }

}