Return-Path: X-Original-To: apmail-pig-commits-archive@www.apache.org Delivered-To: apmail-pig-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 75ACB10714 for ; Mon, 17 Feb 2014 08:27:32 +0000 (UTC) Received: (qmail 16158 invoked by uid 500); 17 Feb 2014 08:27:32 -0000 Delivered-To: apmail-pig-commits-archive@pig.apache.org Received: (qmail 15050 invoked by uid 500); 17 Feb 2014 08:27:26 -0000 Mailing-List: contact commits-help@pig.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pig.apache.org Delivered-To: mailing list commits@pig.apache.org Received: (qmail 15026 invoked by uid 99); 17 Feb 2014 08:27:24 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Feb 2014 08:27:24 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Feb 2014 08:27:15 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 2441623888E2; Mon, 17 Feb 2014 08:26:53 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1568901 - in /pig/branches/tez/src/org/apache/pig: backend/hadoop/executionengine/physicalLayer/expressionOperators/ backend/hadoop/executionengine/physicalLayer/plans/ backend/hadoop/executionengine/physicalLayer/relationalOperators/ back... Date: Mon, 17 Feb 2014 08:26:52 -0000 To: commits@pig.apache.org From: daijy@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140217082653.2441623888E2@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: daijy Date: Mon Feb 17 08:26:52 2014 New Revision: 1568901 URL: http://svn.apache.org/r1568901 Log: PIG-3757: Make scalar work Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1568901&r1=1568900&r2=1568901&view=diff ============================================================================== --- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original) +++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Mon Feb 17 08:26:52 2014 @@ -538,6 +538,11 @@ public class POUserFunc extends Expressi public FuncSpec getFuncSpec() { return funcSpec; } + + public void setFuncSpec(FuncSpec funcSpec) { + this.funcSpec = funcSpec; + instantiateFunc(funcSpec); + } public String[] getCacheFiles() { return cacheFiles; Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1568901&r1=1568900&r2=1568901&view=diff ============================================================================== --- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original) +++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Mon Feb 17 08:26:52 2014 @@ -294,7 +294,12 @@ public class PhyPlanVisitor extends Plan } public void visitLimit(POLimit lim) throws VisitorException{ - //do nothing + PhysicalPlan inpPlan = lim.getLimitPlan(); + if (inpPlan!=null) { + pushWalker(mCurrentWalker.spawnChildWalker(inpPlan)); + visit(); + popWalker(); + } } public void visitCross(POCross cross) throws VisitorException{ Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1568901&r1=1568900&r2=1568901&view=diff ============================================================================== --- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original) +++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Mon Feb 17 08:26:52 2014 @@ -59,7 +59,7 @@ public class CombinerPackager extends Pa super(); keyType = pkgr.keyType; numInputs = 1; - inner = new boolean[1]; + inner = new boolean[pkgr.inner.length]; for (int i = 0; i < pkgr.inner.length; i++) { inner[i] = true; } Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java?rev=1568901&view=auto ============================================================================== --- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java (added) +++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java Mon Feb 17 08:26:52 2014 @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.tez; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; +import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.PlanException; +import org.apache.pig.impl.plan.ReverseDependencyOrderWalker; +import org.apache.pig.impl.plan.VisitorException; + +public class MultiQueryOptimizerTez extends TezOpPlanVisitor { + public MultiQueryOptimizerTez(TezOperPlan plan) { + super(plan, new ReverseDependencyOrderWalker(plan)); + } + + @Override + public void visitTezOp(TezOperator tezOp) throws VisitorException { + try { + if (!tezOp.isSplitter()) { + return; + } + + List splittees = new ArrayList(); + + List successors = getPlan().getSuccessors(tezOp); + List succ_successors = new ArrayList(); + for (TezOperator successor : successors) { + // don't want to be complicated by nested split + if (successor.isSplitter()) { + continue; + } + // If has other dependency, don't merge into split, + if (getPlan().getPredecessors(successor).size()!=1) { + continue; + } + boolean containsBlacklistedOp = false; + for (PhysicalOperator op : successor.plan) { + if (op instanceof POReservoirSample || op instanceof POPoissonSample) { + containsBlacklistedOp = true; + break; + } + } + if (containsBlacklistedOp) { + continue; + } + // Detect diamond shape, we cannot merge it into split, since Tez + // does not handle double edge between vertexes + boolean sharedSucc = false; + if (getPlan().getSuccessors(successor)!=null) { + for (TezOperator succ_successor : getPlan().getSuccessors(successor)) { + if (succ_successors.contains(succ_successor)) { + sharedSucc = true; + break; + } + } + succ_successors.addAll(getPlan().getSuccessors(successor)); + } + if (sharedSucc) { + continue; + } + splittees.add(successor); + } + + if (splittees.size()==0) { + return; + } + + if (splittees.size()==1 && successors.size()==1) { + // We don't need a POSplit here, we can merge the splittee into spliter + PhysicalOperator firstNodeLeaf = tezOp.plan.getLeaves().get(0); + PhysicalOperator firstNodeLeafPred = tezOp.plan.getPredecessors(firstNodeLeaf).get(0); + + TezOperator singleSplitee = splittees.get(0); + PhysicalOperator secondNodeRoot = singleSplitee.plan.getRoots().get(0); + PhysicalOperator secondNodeSucc = singleSplitee.plan.getSuccessors(secondNodeRoot).get(0); + + tezOp.plan.remove(firstNodeLeaf); + singleSplitee.plan.remove(secondNodeRoot); + + //TODO remove filter all + + tezOp.plan.merge(singleSplitee.plan); + tezOp.plan.connect(firstNodeLeafPred, secondNodeSucc); + + addSubPlanPropertiesToParent(tezOp, singleSplitee); + + removeSplittee(getPlan(), tezOp, singleSplitee); + } else { + POValueOutputTez valueOutput = (POValueOutputTez)tezOp.plan.getLeaves().get(0); + POSplit split = new POSplit(OperatorKey.genOpKey(valueOutput.getOperatorKey().getScope())); + for (TezOperator splitee : splittees) { + PhysicalOperator spliteeRoot = splitee.plan.getRoots().get(0); + splitee.plan.remove(spliteeRoot); + split.addPlan(splitee.plan); + + addSubPlanPropertiesToParent(tezOp, splitee); + + removeSplittee(getPlan(), tezOp, splitee); + valueOutput.outputKeys.remove(splitee.getOperatorKey().toString()); + } + if (!valueOutput.outputKeys.isEmpty()) { + // We still need valueOutput + PhysicalPlan phyPlan = new PhysicalPlan(); + phyPlan.addAsLeaf(valueOutput); + split.addPlan(phyPlan); + } + PhysicalOperator pred = tezOp.plan.getPredecessors(valueOutput).get(0); + tezOp.plan.disconnect(pred, valueOutput); + tezOp.plan.remove(valueOutput); + tezOp.plan.add(split); + tezOp.plan.connect(pred, split); + } + } catch (PlanException e) { + throw new VisitorException(e); + } + } + + static public void removeSplittee(TezOperPlan plan, TezOperator splitter, TezOperator splittee) throws PlanException { + if (plan.getSuccessors(splittee)!=null) { + List succs = new ArrayList(); + succs.addAll(plan.getSuccessors(splittee)); + plan.disconnect(splitter, splittee); + for (TezOperator succTezOperator : succs) { + TezEdgeDescriptor edge = succTezOperator.inEdges.get(splittee.getOperatorKey()); + + splitter.outEdges.remove(splittee.getOperatorKey()); + succTezOperator.inEdges.remove(splittee.getOperatorKey()); + plan.disconnect(splittee, succTezOperator); + TezCompilerUtil.connect(plan, splitter, succTezOperator, edge); + } + } + plan.remove(splittee); + } + + static public void addSubPlanPropertiesToParent(TezOperator parentOper, TezOperator subPlanOper) { + if (subPlanOper.getRequestedParallelism() > parentOper.getRequestedParallelism()) { + parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism()); + } + subPlanOper.setRequestedParallelismByReference(parentOper); + if (subPlanOper.UDFs != null) { + parentOper.UDFs.addAll(subPlanOper.UDFs); + } + if (subPlanOper.scalars != null) { + parentOper.scalars.addAll(subPlanOper.scalars); + } + if (subPlanOper.outEdges != null) { + for (Entry entry: subPlanOper.outEdges.entrySet()) { + parentOper.outEdges.put(entry.getKey(), entry.getValue()); + } + } + } +} Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1568901&r1=1568900&r2=1568901&view=diff ============================================================================== --- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original) +++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Mon Feb 17 08:26:52 2014 @@ -33,12 +33,14 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.data.BinSedesTuple; import org.apache.pig.data.SchemaTupleBackend; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.builtin.ReadScalarsTez; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.UDFContext; import org.apache.tez.common.TezUtils; @@ -158,6 +160,12 @@ public class PigProcessor implements Log for (POValueInputTez input : valueInputs){ input.attachInputs(inputs, conf); } + LinkedList scalarInputs = PlanHelper.getPhysicalOperators(execPlan, POUserFunc.class); + for (POUserFunc userFunc : scalarInputs ) { + if (userFunc.getFunc() instanceof ReadScalarsTez) { + ((ReadScalarsTez)userFunc.getFunc()).attachInputs(inputs, conf); + } + } LinkedList broadcasts = PlanHelper.getPhysicalOperators(execPlan, POFRJoinTez.class); for (POFRJoinTez broadcast : broadcasts){ broadcast.attachInputs(inputs, conf); Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java?rev=1568901&r1=1568900&r2=1568901&view=diff ============================================================================== --- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java (original) +++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java Mon Feb 17 08:26:52 2014 @@ -65,6 +65,10 @@ public class SecondaryKeyOptimizerTez ex break; } } + + if (connectingLR == null) { + continue; + } // Detected the POLocalRearrange -> POPackage pattern. Let's add // combiner if possible. Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1568901&r1=1568900&r2=1568901&view=diff ============================================================================== --- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original) +++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Mon Feb 17 08:26:52 2014 @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -84,6 +85,7 @@ import org.apache.pig.impl.builtin.Defau import org.apache.pig.impl.builtin.FindQuantiles; import org.apache.pig.impl.builtin.GetMemNumRows; import org.apache.pig.impl.builtin.PartitionSkewedKeys; +import org.apache.pig.impl.builtin.ReadScalarsTez; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.plan.DepthFirstWalker; @@ -223,17 +225,6 @@ public class TezCompiler extends PhyPlan for (PhysicalOperator op : ops) { compile(op); - if (curTezOp.isSplitSubPlan()) { - // Set inputs to null as POSplit will attach input to roots - for (PhysicalOperator root : curTezOp.plan.getRoots()) { - root.setInputs(null); - } - TezOperator splitOp = splitsSeen.get(curTezOp.getSplitOperatorKey()); - POSplit split = findPOSplit(splitOp, curTezOp.getSplitOperatorKey()); - split.addPlan(curTezOp.plan); - addSubPlanPropertiesToParent(splitOp, curTezOp); - curTezOp = splitOp; - } } for (TezOperator tezOper : splitsSeen.values()) { @@ -245,39 +236,42 @@ public class TezCompiler extends PhyPlan } tezOper.setClosed(true); } - - connectSoftLink(); + + fixScalar(); return tezPlan; } + + private void fixScalar() throws VisitorException, PlanException { + // Mapping POStore to POValueOuptut + Map storeSeen = new HashMap(); + + for (TezOperator tezOp : tezPlan) { + List userFuncs = PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class); + for (POUserFunc userFunc : userFuncs) { + if (userFunc.getReferencedOperator()!=null) { // Scalar + POStore store = (POStore)userFunc.getReferencedOperator(); + + TezOperator from = phyToTezOpMap.get(store); - private void addSubPlanPropertiesToParent(TezOperator parentOper, TezOperator subPlanOper) { - if (subPlanOper.getRequestedParallelism() > parentOper.getRequestedParallelism()) { - parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism()); - } - subPlanOper.setRequestedParallelismByReference(parentOper); - if (subPlanOper.UDFs != null) { - parentOper.UDFs.addAll(subPlanOper.UDFs); - } - if (subPlanOper.outEdges != null) { - for (Entry entry: subPlanOper.outEdges.entrySet()) { - parentOper.outEdges.put(entry.getKey(), entry.getValue()); - } - } - } + FuncSpec newSpec = new FuncSpec(ReadScalarsTez.class.getName(), from.getOperatorKey().toString()); + userFunc.setFuncSpec(newSpec); - private void connectSoftLink() throws PlanException, IOException { - for (PhysicalOperator op : plan) { - if (plan.getSoftLinkPredecessors(op)!=null) { - for (PhysicalOperator pred : plan.getSoftLinkPredecessors(op)) { - TezOperator from = phyToTezOpMap.get(pred); - TezOperator to = phyToTezOpMap.get(op); - if (from==to) { - continue; - } - if (tezPlan.getPredecessors(to)==null || !tezPlan.getPredecessors(to).contains(from)) { - tezPlan.connect(from, to); + if (storeSeen.containsKey(store)) { + storeSeen.get(store).outputKeys.add(tezOp.getOperatorKey().toString()); + } else { + POValueOutputTez output = new POValueOutputTez(OperatorKey.genOpKey(scope)); + output.addOutputKey(tezOp.getOperatorKey().toString()); + from.plan.remove(from.plan.getOperator(store.getOperatorKey())); + from.plan.addAsLeaf(output); + storeSeen.put(store, output); } + + TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, from, tezOp); + //TODO shared edge once support is available in Tez + edge.dataMovementType = DataMovementType.BROADCAST; + edge.outputClassName = OnFileUnorderedKVOutput.class.getName(); + edge.inputClassName = ShuffledUnorderedKVInput.class.getName(); } } } @@ -314,33 +308,66 @@ public class TezCompiler extends PhyPlan } PhysicalOperator p = predecessors.get(0); - TezOperator oper = null; + TezOperator storeTezOper = null; if (p instanceof POStore) { - oper = phyToTezOpMap.get(p); + storeTezOper = phyToTezOpMap.get(p); } else { int errCode = 2126; - String msg = "Predecessor of load should be a store. Got "+p.getClass(); + String msg = "Predecessor of load should be a store. Got " + p.getClass(); throw new PlanException(msg, errCode, PigException.BUG); } - - // Need new operator + PhysicalOperator store = storeTezOper.plan.getOperator(p.getOperatorKey()); + // replace POStore to POValueOutputTez, convert the tezOperator to splitter + storeTezOper.plan.disconnect(storeTezOper.plan.getPredecessors(store).get(0), store); + storeTezOper.plan.remove(store); + POValueOutputTez valueOutput = new POValueOutputTez(new OperatorKey(scope,nig.getNextNodeId(scope))); + storeTezOper.plan.addAsLeaf(valueOutput); + storeTezOper.setSplitter(true); + + // Create a splittee of store only + TezOperator storeOnlyTezOperator = getTezOp(); + PhysicalPlan storeOnlyPhyPlan = new PhysicalPlan(); + POValueInputTez valueInput = new POValueInputTez(new OperatorKey(scope,nig.getNextNodeId(scope))); + valueInput.setInputKey(storeTezOper.getOperatorKey().toString()); + storeOnlyPhyPlan.addAsLeaf(valueInput); + storeOnlyPhyPlan.addAsLeaf(store); + storeOnlyTezOperator.plan = storeOnlyPhyPlan; + tezPlan.add(storeOnlyTezOperator); + phyToTezOpMap.put(store, storeOnlyTezOperator); + + // Create new operator as second splittee curTezOp = getTezOp(); - curTezOp.plan.add(op); + POValueInputTez valueInput2 = new POValueInputTez(new OperatorKey(scope,nig.getNextNodeId(scope))); + valueInput2.setInputKey(storeTezOper.getOperatorKey().toString()); + curTezOp.plan.add(valueInput2); tezPlan.add(curTezOp); - plan.disconnect(op, p); - oper.segmentBelow = true; - tezPlan.connect(oper, curTezOp); - phyToTezOpMap.put(op, curTezOp); + // Connect splitter to splittee + TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, storeTezOper, storeOnlyTezOperator); + edge.dataMovementType = DataMovementType.ONE_TO_ONE; + edge.outputClassName = OnFileUnorderedKVOutput.class.getName(); + edge.inputClassName = ShuffledUnorderedKVInput.class.getName(); + storeOnlyTezOperator.setRequestedParallelismByReference(storeTezOper); + + edge = TezCompilerUtil.connect(tezPlan, storeTezOper, curTezOp); + edge.dataMovementType = DataMovementType.ONE_TO_ONE; + edge.outputClassName = OnFileUnorderedKVOutput.class.getName(); + edge.inputClassName = ShuffledUnorderedKVInput.class.getName(); + curTezOp.setRequestedParallelismByReference(storeTezOper); + return; } Collections.sort(predecessors); - compiledInputs = new TezOperator[predecessors.size()]; - int i = -1; - for (PhysicalOperator pred : predecessors) { - compile(pred); - compiledInputs[++i] = curTezOp; + if(op instanceof POSplit && splitsSeen.containsKey(op.getOperatorKey())){ + // skip follow up POSplit + } else { + compiledInputs = new TezOperator[predecessors.size()]; + int i = -1; + for (PhysicalOperator pred : predecessors) { + compile(pred); + compiledInputs[++i] = curTezOp; + } } } else { // No predecessors. Mostly a load. But this is where we start. We @@ -365,20 +392,6 @@ public class TezCompiler extends PhyPlan compiledInputs = prevCompInp; } - /** - * Start a new TezOperator whose plan will be the sub-plan of POSplit - * - * @param splitOperatorKey - * OperatorKey of the POSplit for which the new plan is a sub-plan - * @return the new TezOperator - * @throws PlanException - */ - private TezOperator startNew(OperatorKey splitOperatorKey) throws PlanException { - TezOperator ret = getTezOp(); - ret.setSplitOperatorKey(splitOperatorKey); - return ret; - } - private void nonBlocking(PhysicalOperator op) throws PlanException, IOException { TezOperator tezOp; if (compiledInputs.length == 1) { @@ -400,46 +413,11 @@ public class TezCompiler extends PhyPlan tezPlan.add(newTezOp); for (TezOperator tezOp : compiledInputs) { tezOp.setClosed(true); - handleSplitAndConnect(tezPlan, tezOp, newTezOp); + TezCompilerUtil.connect(tezPlan, tezOp, newTezOp); } curTezOp = newTezOp; } - private TezEdgeDescriptor handleSplitAndConnect(TezOperPlan tezPlan, TezOperator from, TezOperator to) - throws PlanException { - return handleSplitAndConnect(tezPlan, from, to, true); - } - - private TezEdgeDescriptor handleSplitAndConnect(TezOperPlan tezPlan, - TezOperator from, TezOperator to, boolean addToSplitPlan) - throws PlanException { - // Add edge descriptors from POLocalRearrange in POSplit - // sub-plan to new operators - PhysicalOperator leaf = from.plan.getLeaves().get(0); - // It could be POStoreTez incase of sampling job in order by - if (leaf instanceof POLocalRearrangeTez) { - POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf; - lr.setOutputKey(to.getOperatorKey().toString()); - } - TezEdgeDescriptor edge = null; - if (from.isSplitSubPlan()) { - TezOperator splitOp = splitsSeen.get(from.getSplitOperatorKey()); - if (addToSplitPlan) { - // Set inputs to null as POSplit will attach input to roots - for (PhysicalOperator root : from.plan.getRoots()) { - root.setInputs(null); - } - POSplit split = findPOSplit(splitOp, from.getSplitOperatorKey()); - split.addPlan(from.plan); - addSubPlanPropertiesToParent(splitOp, curTezOp); - } - edge = TezCompilerUtil.connect(tezPlan, splitOp, to); - } else { - edge = TezCompilerUtil.connect(tezPlan, from, to); - } - return edge; - } - private POSplit findPOSplit(TezOperator tezOp, OperatorKey splitKey) throws PlanException { POSplit split = (POSplit) tezOp.plan.getOperator(splitKey); @@ -473,105 +451,6 @@ public class TezCompiler extends PhyPlan } /** - * Remove the operator and the whole tree connected to that operator from - * the plan. Only remove corresponding connected sub-plan if you encounter - * another Split operator in the predecessor. - * - * @param op Operator to remove - * @throws VisitorException - */ - private void removeDupOpTreeOfSplit(TezOperPlan plan, TezOperator op, boolean isMultiQuery) - throws VisitorException { - Stack stack = new Stack(); - stack.push(op); - while (!stack.isEmpty()) { - op = stack.pop(); - List predecessors = plan.getPredecessors(op); - if (predecessors != null) { - if (isMultiQuery) { - for (TezOperator pred : predecessors) { - if (!pred.isSplitOperator()) { - stack.push(pred); - } else { - List splits = PlanHelper.getPhysicalOperators( - pred.plan, POSplit.class); - for (POSplit split : splits) { - PhysicalPlan planToRemove = null; - for (PhysicalPlan splitPlan : split.getPlans()) { - PhysicalOperator phyOp = splitPlan - .getLeaves().get(0); - if (phyOp instanceof POLocalRearrangeTez) { - POLocalRearrangeTez lr = (POLocalRearrangeTez) phyOp; - if (lr.getOutputKey().equals( - op.getOperatorKey().toString())) { - planToRemove = splitPlan; - break; - } - } - } - if (planToRemove != null) { - split.getPlans().remove(planToRemove); - break; - } - } - } - } - } else { - for (TezOperator pred : predecessors) { - // Remove everything till we encounter another split - if (!pred.isSplitOperator()) { - stack.push(pred); - } else { - // If split operator, just remove from the output - POValueOutputTez valueOut = (POValueOutputTez)pred.plan.getLeaves().get(0); - valueOut.removeOutputKey(op.getOperatorKey().toString()); - //TODO Handle shared edge when available in Tez - pred.outEdges.remove(op.getOperatorKey().toString()); - } - } - } - } - plan.remove(op); - } - } - - /** - * In case of mulitple levels of split, after removing duplicate tree we need to reset - * input of operators in the old tree as some of the inputs of the PhysicalOperator in - * original tree will now be overwritten and referring to operators in - * duplicate tree. For eg: POFilter inputs will refer to the duplicate tree's - * POValueInputTez even though it is connected to a original split tree's POValueInputTez - */ - private void resetInputsOfPredecessors(TezOperPlan plan, TezOperator op) { - Stack stack = new Stack(); - stack.push(op); - while (!stack.isEmpty()) { - op = stack.pop(); - List predecessors = plan.getPredecessors(op); - if (predecessors != null) { - for (TezOperator pred : predecessors) { - resetInputs(pred.plan, pred.plan.getLeaves()); - if (!pred.isSplitOperator()) { - stack.push(pred); - } - } - } - } - } - - private void resetInputs(PhysicalPlan plan, List ops) { - for (PhysicalOperator op : ops) { - List preds = plan.getPredecessors(op); - if (preds != null) { - for (PhysicalOperator pred : preds) { - pred.setInputs(plan.getPredecessors(pred)); - resetInputs(plan, plan.getPredecessors(pred)); - } - } - } - } - - /** * Merges the TezOperators in the compiledInputs into a single merged * TezOperator. * @@ -702,7 +581,7 @@ public class TezCompiler extends PhyPlan @Override public void visitDistinct(PODistinct op) throws VisitorException { try { - POLocalRearrange lr = localRearrangeFactory.create(); + POLocalRearrangeTez lr = localRearrangeFactory.create(); lr.setDistinct(true); lr.setAlias(op.getAlias()); curTezOp.plan.addAsLeaf(lr); @@ -751,23 +630,6 @@ public class TezCompiler extends PhyPlan @Override public void visitFilter(POFilter op) throws VisitorException { try { - if (curTezOp.isSplitSubPlan() || curTezOp.getSplitParent() != null) { - // Do not add the filter. Refer NoopFilterRemover.java of MR - PhysicalPlan filterPlan = op.getPlan(); - if (filterPlan.size() == 1) { - PhysicalOperator fp = filterPlan.getRoots().get(0); - if (fp instanceof ConstantExpression) { - ConstantExpression exp = (ConstantExpression)fp; - Object value = exp.getValue(); - if (value instanceof Boolean) { - Boolean filterValue = (Boolean)value; - if (filterValue) { - return; - } - } - } - } - } nonBlocking(op); processUDFs(op.getPlan()); phyToTezOpMap.put(op, curTezOp); @@ -795,7 +657,7 @@ public class TezCompiler extends PhyPlan lr.setOutputKey(curTezOp.getOperatorKey().toString()); tezOp.plan.addAsLeaf(lr); - TezEdgeDescriptor edge = handleSplitAndConnect(tezPlan, tezOp, curTezOp); + TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, tezOp, curTezOp); if (tezOp.getSplitOperatorKey() != null) { inputKeys.add(tezOp.getSplitOperatorKey().toString()); } else { @@ -1983,7 +1845,7 @@ public class TezCompiler extends PhyPlan Pair quantJobParallelismPair = getOrderbySamplingAggregationJob(op, rp); TezOperator[] sortOpers = getSortJobs(prevOper, lr, op, keyType, fields); - TezEdgeDescriptor edge = handleSplitAndConnect(tezPlan, prevOper, sortOpers[0]); + TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, prevOper, sortOpers[0]); // Use 1-1 edge edge.dataMovementType = DataMovementType.ONE_TO_ONE; @@ -2003,7 +1865,7 @@ public class TezCompiler extends PhyPlan sortOpers[1].setRequestedParallelism(quantJobParallelismPair.second); */ - handleSplitAndConnect(tezPlan, prevOper, quantJobParallelismPair.first, false); + TezCompilerUtil.connect(tezPlan, prevOper, quantJobParallelismPair.first); lr.setOutputKey(sortOpers[0].getOperatorKey().toString()); lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString()); @@ -2036,66 +1898,30 @@ public class TezCompiler extends PhyPlan @Override public void visitSplit(POSplit op) throws VisitorException { try { - boolean isMultiQuery = "true".equalsIgnoreCase(pigContext - .getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true")); - - if (isMultiQuery) { - if (splitsSeen.containsKey(op.getOperatorKey())) { - // Since the plan for this split already exists in the tez plan, - // discard the hierarchy or tez operators we constructed so far - // till we encountered the split in this tree - removeDupOpTreeOfSplit(tezPlan, curTezOp, isMultiQuery); - curTezOp = startNew(op.getOperatorKey()); - } else { - nonBlocking(op); - if(curTezOp.isSplitSubPlan()) { - // Split followed by another split - // Set inputs to null as POSplit will attach input to roots - for (PhysicalOperator root : curTezOp.plan.getRoots()) { - root.setInputs(null); - } - TezOperator splitOp = splitsSeen.get(curTezOp.getSplitOperatorKey()); - POSplit split = findPOSplit(splitOp, curTezOp.getSplitOperatorKey()); - split.addPlan(curTezOp.plan); - addSubPlanPropertiesToParent(splitOp, curTezOp); - splitsSeen.put(op.getOperatorKey(), splitOp); - phyToTezOpMap.put(op, splitOp); - } else { - curTezOp.setSplitOperator(true); - splitsSeen.put(op.getOperatorKey(), curTezOp); - phyToTezOpMap.put(op, curTezOp); - } - curTezOp = startNew(op.getOperatorKey()); - } + TezOperator splitOp = curTezOp; + POValueOutputTez output = null; + if (splitsSeen.containsKey(op.getOperatorKey())) { + splitOp = splitsSeen.get(op.getOperatorKey()); + output = (POValueOutputTez)splitOp.plan.getLeaves().get(0); } else { - TezOperator splitOp = curTezOp; - POValueOutputTez output = null; - if (splitsSeen.containsKey(op.getOperatorKey())) { - removeDupOpTreeOfSplit(tezPlan, curTezOp, isMultiQuery); - splitOp = splitsSeen.get(op.getOperatorKey()); - resetInputsOfPredecessors(tezPlan, splitOp); - output = (POValueOutputTez)splitOp.plan.getLeaves().get(0); - } else { - splitOp.setSplitOperator(true); - splitsSeen.put(op.getOperatorKey(), splitOp); - phyToTezOpMap.put(op, splitOp); - output = new POValueOutputTez(OperatorKey.genOpKey(scope)); - splitOp.plan.addAsLeaf(output); - } - curTezOp = getTezOp(); - curTezOp.setSplitParent(splitOp.getOperatorKey()); - tezPlan.add(curTezOp); - output.addOutputKey(curTezOp.getOperatorKey().toString()); - TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, splitOp, curTezOp); - //TODO shared edge once support is available in Tez - edge.dataMovementType = DataMovementType.ONE_TO_ONE; - edge.outputClassName = OnFileUnorderedKVOutput.class.getName(); - edge.inputClassName = ShuffledUnorderedKVInput.class.getName(); - curTezOp.setRequestedParallelismByReference(splitOp); - POValueInputTez input = new POValueInputTez(OperatorKey.genOpKey(scope)); - input.setInputKey(splitOp.getOperatorKey().toString()); - curTezOp.plan.addAsLeaf(input); + splitsSeen.put(op.getOperatorKey(), splitOp); + splitOp.setSplitter(true); + phyToTezOpMap.put(op, splitOp); + output = new POValueOutputTez(OperatorKey.genOpKey(scope)); + splitOp.plan.addAsLeaf(output); } + curTezOp = getTezOp(); + tezPlan.add(curTezOp); + output.addOutputKey(curTezOp.getOperatorKey().toString()); + TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, splitOp, curTezOp); + //TODO shared edge once support is available in Tez + edge.dataMovementType = DataMovementType.ONE_TO_ONE; + edge.outputClassName = OnFileUnorderedKVOutput.class.getName(); + edge.inputClassName = ShuffledUnorderedKVInput.class.getName(); + curTezOp.setRequestedParallelismByReference(splitOp); + POValueInputTez input = new POValueInputTez(OperatorKey.genOpKey(scope)); + input.setInputKey(splitOp.getOperatorKey().toString()); + curTezOp.plan.addAsLeaf(input); } catch (Exception e) { int errCode = 2034; String msg = "Error compiling operator " Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1568901&r1=1568900&r2=1568901&view=diff ============================================================================== --- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original) +++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon Feb 17 08:26:52 2014 @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -200,6 +201,24 @@ public class TezDagBuilder extends TezOp break; } } + + List valueOutputs = PlanHelper.getPhysicalOperators(from.plan, + POValueOutputTez.class); + if (!valueOutputs.isEmpty()) { + POValueOutputTez valueOutput = valueOutputs.get(0); + for (String outputKey : valueOutput.outputKeys) { + if (outputKey.equals(to.getOperatorKey().toString())) { + conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, + POValueOutputTez.EmptyWritable.class.getName()); + conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, + BinSedesTuple.class.getName()); + conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, + POValueOutputTez.EmptyWritable.class.getName()); + conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, + BinSedesTuple.class.getName()); + } + } + } conf.setBoolean("mapred.mapper.new-api", true); conf.set("pig.pigContext", ObjectSerializer.serialize(pc)); @@ -240,17 +259,6 @@ public class TezDagBuilder extends TezOp edge.partitionerClass.getName()); } - if (from.plan.getLeaves().get(0) instanceof POValueOutputTez) { - conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, - POValueOutputTez.EmptyWritable.class.getName()); - conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, - BinSedesTuple.class.getName()); - conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, - POValueOutputTez.EmptyWritable.class.getName()); - conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, - BinSedesTuple.class.getName()); - } - MRToTezHelper.convertMRToTezRuntimeConf(conf, globalConf); in.setUserPayload(TezUtils.createUserPayloadFromConf(conf)); @@ -367,13 +375,22 @@ public class TezDagBuilder extends TezOp // Set input keys for POShuffleTezLoad. This is used to identify // the inputs that are attached to the POShuffleTezLoad in the // backend. + Map localRearrangeMap = new TreeMap(); for (TezOperator pred : mPlan.getPredecessors(tezOp)) { if (tezOp.sampleOperator != null && tezOp.sampleOperator == pred) { // skip sample vertex input } else { - newPack.addInputKey(pred.getOperatorKey().toString()); + LinkedList lrs = PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class); + for (POLocalRearrangeTez lr : lrs) { + if (lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) { + localRearrangeMap.put((int)lr.getIndex(), pred.getOperatorKey().toString()); + } + } } } + for (Map.Entry entry : localRearrangeMap.entrySet()) { + newPack.addInputKey(entry.getValue()); + } if (succsList != null) { for (PhysicalOperator succs : succsList) { @@ -383,8 +400,22 @@ public class TezDagBuilder extends TezOp setIntermediateInputKeyValue(pack.getPkgr().getKeyType(), payloadConf, tezOp); + } else if (roots.size() == 1 && roots.get(0) instanceof POIdentityInOutTez) { + POIdentityInOutTez identityInOut = (POIdentityInOutTez) roots.get(0); + // TODO Need to fix multiple input key mapping + TezOperator identityInOutPred = null; + for (TezOperator pred : mPlan.getPredecessors(tezOp)) { + if (!pred.isSampler()) { + identityInOutPred = pred; + break; + } + } + identityInOut.setInputKey(identityInOutPred.getOperatorKey().toString()); + } else if (roots.size() == 1 && roots.get(0) instanceof POValueInputTez) { + POValueInputTez valueInput = (POValueInputTez) roots.get(0); + TezOperator pred = mPlan.getPredecessors(tezOp).get(0); + valueInput.setInputKey(pred.getOperatorKey().toString()); } - payloadConf.setClass("mapreduce.outputformat.class", PigOutputFormat.class, OutputFormat.class); Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1568901&r1=1568900&r2=1568901&view=diff ============================================================================== --- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original) +++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Mon Feb 17 08:26:52 2014 @@ -168,6 +168,16 @@ public class TezLauncher extends Launche SecondaryKeyOptimizerTez skOptimizer = new SecondaryKeyOptimizerTez(tezPlan); skOptimizer.visit(); } + + boolean isMultiQuery = + "true".equalsIgnoreCase(pc.getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true")); + + if (isMultiQuery) { + // reduces the number of TezOpers in the Tez plan generated + // by multi-query (multi-store) script. + MultiQueryOptimizerTez mqOptimizer = new MultiQueryOptimizerTez(tezPlan); + mqOptimizer.visit(); + } // Run AccumulatorOptimizer on Tez plan boolean isAccum = Boolean.parseBoolean(pc.getProperties().getProperty( Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1568901&r1=1568900&r2=1568901&view=diff ============================================================================== --- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original) +++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Mon Feb 17 08:26:52 2014 @@ -71,12 +71,8 @@ public class TezOperator extends Operato // Only POStore or POLocalRearrange leaf can be a sub-plan of POSplit private OperatorKey splitOperatorKey = null; - // This indicates that this TezOper has POSplit as a parent. - // This is the case where multi-query is turned off. - private OperatorKey splitParent = null; - // This indicates that this TezOper is a split operator - private boolean isSplitOper; + private boolean splitter; // Indicates that the plan creation is complete boolean closed = false; @@ -179,24 +175,12 @@ public class TezOperator extends Operato this.splitOperatorKey = splitOperatorKey; } - public boolean isSplitSubPlan() { - return splitOperatorKey != null; - } - - public OperatorKey getSplitParent() { - return splitParent; - } - - public void setSplitParent(OperatorKey splitParent) { - this.splitParent = splitParent; - } - - public boolean isSplitOperator() { - return isSplitOper; + public void setSplitter(boolean spl) { + splitter = spl; } - public void setSplitOperator(boolean isSplitOperator) { - this.isSplitOper = isSplitOperator; + public boolean isSplitter() { + return splitter; } public boolean isClosed() { Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1568901&r1=1568900&r2=1568901&view=diff ============================================================================== --- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original) +++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Mon Feb 17 08:26:52 2014 @@ -3,9 +3,11 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map.Entry; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; @@ -76,12 +78,31 @@ public class TezCompilerUtil { static public TezEdgeDescriptor connect(TezOperPlan plan, TezOperator from, TezOperator to) throws PlanException { plan.connect(from, to); + PhysicalOperator leaf = from.plan.getLeaves().get(0); + // It could be POStoreTez incase of sampling job in order by + if (leaf instanceof POLocalRearrangeTez) { + POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf; + lr.setOutputKey(to.getOperatorKey().toString()); + } // Add edge descriptors to old and new operators TezEdgeDescriptor edge = new TezEdgeDescriptor(); to.inEdges.put(from.getOperatorKey(), edge); from.outEdges.put(to.getOperatorKey(), edge); return edge; } + + static public void connect(TezOperPlan plan, TezOperator from, TezOperator to, TezEdgeDescriptor edge) throws PlanException { + plan.connect(from, to); + PhysicalOperator leaf = from.plan.getLeaves().get(0); + // It could be POStoreTez incase of sampling job in order by + if (leaf instanceof POLocalRearrangeTez) { + POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf; + lr.setOutputKey(to.getOperatorKey().toString()); + } + // Add edge descriptors to old and new operators + to.inEdges.put(from.getOperatorKey(), edge); + from.outEdges.put(to.getOperatorKey(), edge); + } static public POForEach getForEach(POProject project, int rp, String scope, NodeIdGenerator nig) { PhysicalPlan forEachPlan = new PhysicalPlan(); Added: pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java?rev=1568901&view=auto ============================================================================== --- pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java (added) +++ pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java Mon Feb 17 08:26:52 2014 @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.builtin; + +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.EvalFunc; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.tez.TezLoad; +import org.apache.pig.data.Tuple; +import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.library.broadcast.input.BroadcastKVReader; + +public class ReadScalarsTez extends EvalFunc implements TezLoad { + private static final Log LOG = LogFactory.getLog(ReadScalarsTez.class); + String inputKey; + Tuple t; + public ReadScalarsTez(String inputKey) { + this.inputKey = inputKey; + } + public void attachInputs(Map inputs, Configuration conf) + throws ExecException { + LogicalInput input = inputs.get(inputKey); + if (input == null) { + throw new ExecException("Input from vertex " + inputKey + " is missing"); + } + try { + BroadcastKVReader reader = (BroadcastKVReader)input.getReader(); + reader.next(); + t = (Tuple)reader.getCurrentValue(); + LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader); + } catch (Exception e) { + throw new ExecException(e); + } + } + + @Override + public Object exec(Tuple input) throws IOException { + int pos = (Integer)input.get(0); + Object obj = t.get(pos); + return obj; + } +} Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java?rev=1568901&r1=1568900&r2=1568901&view=diff ============================================================================== --- pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java (original) +++ pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java Mon Feb 17 08:26:52 2014 @@ -98,9 +98,9 @@ public class ScalarVisitor extends AllEx store.setTmpStore(true); lp.add( store ); lp.connect( refOp, store ); - expr.setImplicitReferencedOperator(store); } - + + expr.setImplicitReferencedOperator(store); filenameConst.setValue( store.getOutputSpec().getFileName() ); if( lp.getSoftLinkSuccessors( store ) == null ||