Return-Path: Delivered-To: apmail-hadoop-pig-commits-archive@www.apache.org Received: (qmail 21959 invoked from network); 4 Aug 2010 17:48:34 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 4 Aug 2010 17:48:34 -0000 Received: (qmail 88531 invoked by uid 500); 4 Aug 2010 17:48:34 -0000 Delivered-To: apmail-hadoop-pig-commits-archive@hadoop.apache.org Received: (qmail 88512 invoked by uid 500); 4 Aug 2010 17:48:34 -0000 Mailing-List: contact pig-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: pig-dev@hadoop.apache.org Delivered-To: mailing list pig-commits@hadoop.apache.org Received: (qmail 88505 invoked by uid 500); 4 Aug 2010 17:48:34 -0000 Delivered-To: apmail-incubator-pig-commits@incubator.apache.org Received: (qmail 88502 invoked by uid 99); 4 Aug 2010 17:48:34 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Aug 2010 17:48:34 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Aug 2010 17:48:30 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 2428F2388A90; Wed, 4 Aug 2010 17:46:50 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r982345 [6/13] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/ src/org/apache/pig/newplan/ src/org/apache/pig/newplan/logical/ src/org/apache/pig/newplan/log... 
Date: Wed, 04 Aug 2010 17:46:48 -0000 To: pig-commits@incubator.apache.org From: daijy@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100804174650.2428F2388A90@eris.apache.org> Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=982345&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Wed Aug 4 17:46:42 2010 @@ -0,0 +1,1327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.newplan.logical.relational; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +import org.apache.pig.FuncSpec; +import org.apache.pig.PigException; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; +import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.builtin.GFCross; +import org.apache.pig.impl.io.FileLocalizer; +import org.apache.pig.impl.io.FileSpec; +import org.apache.pig.impl.io.InterStorage; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; +import org.apache.pig.impl.plan.NodeIdGenerator; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.PlanException; +import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.util.CompilerUtils; +import org.apache.pig.impl.util.LinkedMultiMap; +import org.apache.pig.impl.util.MultiMap; +import org.apache.pig.newplan.DependencyOrderWalker; +import org.apache.pig.newplan.Operator; +import org.apache.pig.newplan.OperatorPlan; +import org.apache.pig.newplan.PlanWalker; +import org.apache.pig.newplan.ReverseDependencyOrderWalker; +import org.apache.pig.newplan.SubtreeDependencyOrderWalker; +import 
org.apache.pig.newplan.logical.expression.ExpToPhyTranslationVisitor; +import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan; +import org.apache.pig.newplan.logical.expression.ProjectExpression; +import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema; + +public class LogToPhyTranslationVisitor extends LogicalRelationalNodesVisitor { + + public LogToPhyTranslationVisitor(OperatorPlan plan) { + super(plan, new DependencyOrderWalker(plan)); + currentPlan = new PhysicalPlan(); + logToPhyMap = new HashMap(); + currentPlans = new Stack(); + } + + protected Map logToPhyMap; + + protected Stack currentPlans; + + protected PhysicalPlan currentPlan; + + protected NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator(); + + protected PigContext pc; + + public void setPigContext(PigContext pc) { + this.pc = pc; + } + + public PhysicalPlan getPhysicalPlan() { + return currentPlan; + } + + @Override + public void visit(LOLoad loLoad) throws IOException { + String scope = DEFAULT_SCOPE; +// System.err.println("Entering Load"); + // The last parameter here is set to true as we assume all files are + // splittable due to LoadStore Refactor + POLoad load = new POLoad(new OperatorKey(scope, nodeGen + .getNextNodeId(scope)), loLoad.getLoadFunc()); + load.setAlias(loLoad.getAlias()); + load.setLFile(loLoad.getFileSpec()); + load.setPc(pc); + load.setResultType(DataType.BAG); + load.setSignature(loLoad.getAlias()); + currentPlan.add(load); + logToPhyMap.put(loLoad, load); + + // Load is typically a root operator, but in the multiquery + // case it might have a store as a predecessor. 
+ List op = loLoad.getPlan().getPredecessors(loLoad); + PhysicalOperator from; + + if(op != null) { + from = logToPhyMap.get(op.get(0)); + try { + currentPlan.connect(from, load); + } catch (PlanException e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + } +// System.err.println("Exiting Load"); + } + + @Override + public void visit(LOFilter filter) throws IOException { + String scope = DEFAULT_SCOPE; +// System.err.println("Entering Filter"); + POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen + .getNextNodeId(scope)), filter.getRequestedParallelisam()); + poFilter.setAlias(filter.getAlias()); + poFilter.setResultType(DataType.BAG); + currentPlan.add(poFilter); + logToPhyMap.put(filter, poFilter); + currentPlans.push(currentPlan); + + currentPlan = new PhysicalPlan(); + +// PlanWalker childWalker = currentWalker +// .spawnChildWalker(filter.getFilterPlan()); + PlanWalker childWalker = new ReverseDependencyOrderWalker(filter.getFilterPlan()); + pushWalker(childWalker); + //currentWalker.walk(this); + currentWalker.walk( + new ExpToPhyTranslationVisitor( currentWalker.getPlan(), + childWalker, filter, currentPlan, logToPhyMap ) ); + popWalker(); + + poFilter.setPlan(currentPlan); + currentPlan = currentPlans.pop(); + + List op = filter.getPlan().getPredecessors(filter); + + PhysicalOperator from; + if(op != null) { + from = logToPhyMap.get(op.get(0)); + } else { + int errCode = 2051; + String msg = "Did not find a predecessor for Filter." 
; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG); + } + + try { + currentPlan.connect(from, poFilter); + } catch (PlanException e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } +// System.err.println("Exiting Filter"); + } + + @Override + public void visit(LOSort sort) throws IOException { + String scope = DEFAULT_SCOPE; + List logPlans = sort.getSortColPlans(); + List sortPlans = new ArrayList(logPlans.size()); + + // convert all the logical expression plans to physical expression plans + currentPlans.push(currentPlan); + for (LogicalExpressionPlan plan : logPlans) { + currentPlan = new PhysicalPlan(); + PlanWalker childWalker = new ReverseDependencyOrderWalker(plan); + pushWalker(childWalker); + childWalker.walk(new ExpToPhyTranslationVisitor( currentWalker.getPlan(), + childWalker, sort, currentPlan, logToPhyMap )); + sortPlans.add(currentPlan); + popWalker(); + } + currentPlan = currentPlans.pop(); + + // get the physical operator for sort + POSort poSort; + if (sort.getUserFunc() == null) { + poSort = new POSort(new OperatorKey(scope, nodeGen + .getNextNodeId(scope)), sort.getRequestedParallelisam(), null, + sortPlans, sort.getAscendingCols(), null); + } else { + POUserComparisonFunc comparator = new POUserComparisonFunc(new OperatorKey( + scope, nodeGen.getNextNodeId(scope)), sort + .getRequestedParallelisam(), null, sort.getUserFunc()); + poSort = new POSort(new OperatorKey(scope, nodeGen + .getNextNodeId(scope)), sort.getRequestedParallelisam(), null, + sortPlans, sort.getAscendingCols(), comparator); + } + poSort.setAlias(sort.getAlias()); + poSort.setLimit(sort.getLimit()); + // sort.setRequestedParallelism(s.getType()); + logToPhyMap.put(sort, poSort); + currentPlan.add(poSort); + List op = sort.getPlan().getPredecessors(sort); + PhysicalOperator from; + + if(op != null) { + from = 
logToPhyMap.get(op.get(0)); + } else { + int errCode = 2051; + String msg = "Did not find a predecessor for Sort." ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG); + } + + try { + currentPlan.connect(from, poSort); + } catch (PlanException e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + + poSort.setResultType(DataType.BAG); + try { + poSort.setSortInfo(sort.getSortInfo()); + } catch (FrontendException e) { + throw new LogicalToPhysicalTranslatorException(e); + } + } + + @Override + public void visit(LOCross cross) throws IOException { + String scope = DEFAULT_SCOPE; + List inputs = cross.getPlan().getPredecessors(cross); + + POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey( + scope, nodeGen.getNextNodeId(scope)), cross + .getRequestedParallelisam()); + poGlobal.setAlias(cross.getAlias()); + POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen + .getNextNodeId(scope)), cross.getRequestedParallelisam()); + poGlobal.setAlias(cross.getAlias()); + currentPlan.add(poGlobal); + currentPlan.add(poPackage); + + int count = 0; + + try { + currentPlan.connect(poGlobal, poPackage); + List flattenLst = Arrays.asList(true, true); + + for (Operator op : inputs) { + PhysicalPlan fep1 = new PhysicalPlan(); + ConstantExpression ce1 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam()); + ce1.setValue(inputs.size()); + ce1.setResultType(DataType.INTEGER); + fep1.add(ce1); + + ConstantExpression ce2 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam()); + ce2.setValue(count); + ce2.setResultType(DataType.INTEGER); + fep1.add(ce2); + /*Tuple ce1val = TupleFactory.getInstance().newTuple(2); + ce1val.set(0,inputs.size()); + ce1val.set(1,count); + ce1.setValue(ce1val); + 
ce1.setResultType(DataType.TUPLE);*/ + + + + POUserFunc gfc = new POUserFunc(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam(), Arrays.asList((PhysicalOperator)ce1,(PhysicalOperator)ce2), new FuncSpec(GFCross.class.getName())); + gfc.setAlias(cross.getAlias()); + gfc.setResultType(DataType.BAG); + fep1.addAsLeaf(gfc); + gfc.setInputs(Arrays.asList((PhysicalOperator)ce1,(PhysicalOperator)ce2)); + /*fep1.add(gfc); + fep1.connect(ce1, gfc); + fep1.connect(ce2, gfc);*/ + + PhysicalPlan fep2 = new PhysicalPlan(); + POProject feproj = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam()); + feproj.setAlias(cross.getAlias()); + feproj.setResultType(DataType.TUPLE); + feproj.setStar(true); + feproj.setOverloaded(false); + fep2.add(feproj); + List fePlans = Arrays.asList(fep1, fep2); + + POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), fePlans, flattenLst ); + fe.setAlias(cross.getAlias()); + currentPlan.add(fe); + currentPlan.connect(logToPhyMap.get(op), fe); + + POLocalRearrange physOp = new POLocalRearrange(new OperatorKey( + scope, nodeGen.getNextNodeId(scope)), cross + .getRequestedParallelisam()); + physOp.setAlias(cross.getAlias()); + List lrPlans = new ArrayList(); + for(int i=0;i fePlans = new ArrayList(); + List flattenLst = new ArrayList(); + for(int i=1;i<=count;i++){ + PhysicalPlan fep1 = new PhysicalPlan(); + POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), i); + feproj1.setAlias(cross.getAlias()); + feproj1.setResultType(DataType.BAG); + feproj1.setOverloaded(false); + fep1.add(feproj1); + fePlans.add(fep1); + flattenLst.add(true); + } + + POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), fePlans, flattenLst ); + fe.setAlias(cross.getAlias()); + currentPlan.add(fe); + try{ 
+ currentPlan.connect(poPackage, fe); + }catch (PlanException e1) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1); + } + logToPhyMap.put(cross, fe); + } + + @Override + public void visit(LOStream stream) throws IOException { + String scope = DEFAULT_SCOPE; + POStream poStream = new POStream(new OperatorKey(scope, nodeGen + .getNextNodeId(scope)), stream.getExecutableManager(), + stream.getStreamingCommand(), this.pc.getProperties()); + poStream.setAlias(stream.getAlias()); + currentPlan.add(poStream); + logToPhyMap.put(stream, poStream); + + List op = stream.getPlan().getPredecessors(stream); + + PhysicalOperator from; + if(op != null) { + from = logToPhyMap.get(op.get(0)); + } else { + int errCode = 2051; + String msg = "Did not find a predecessor for Stream." ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG); + } + + try { + currentPlan.connect(from, poStream); + } catch (PlanException e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visit(LOInnerLoad load) throws IOException { + String scope = DEFAULT_SCOPE; + + POProject exprOp = new POProject(new OperatorKey(scope, nodeGen + .getNextNodeId(scope))); + + LogicalSchema s = load.getSchema(); + if (s != null) { + if (s.getField(0).type == DataType.BAG) { + Operator succ = load.getPlan().getSuccessors(load).get(0); + if (succ instanceof LOGenerate) { + exprOp.setResultType(DataType.BAG); + } + else { + exprOp.setResultType(DataType.TUPLE); + } + } + else { + exprOp.setResultType(s.getField(0).type); + } + } + exprOp.setColumn(load.getColNum()); + exprOp.setStar(load.getProjection().isProjectStar()); + + // set input to POProject to the predecessor of foreach + + logToPhyMap.put(load, exprOp); + 
currentPlan.add(exprOp); + } + + @Override + public void visit(LOForEach foreach) throws IOException { + String scope = DEFAULT_SCOPE; + + List innerPlans = new ArrayList(); + + org.apache.pig.newplan.logical.relational.LogicalPlan inner = foreach.getInnerPlan(); + LOGenerate gen = (LOGenerate)inner.getSinks().get(0); + + List exps = gen.getOutputPlans(); + List preds = inner.getPredecessors(gen); + + currentPlans.push(currentPlan); + + // we need to translate each predecessor of LOGenerate into a physical plan. + // The physical plan should contain the expression plan for this predecessor plus + // the subtree starting with this predecessor + for (int i=0; i leaves = exps.get(i).getSinks(); + for(Operator l: leaves) { + PhysicalOperator op = logToPhyMap.get(l); + if (l instanceof ProjectExpression ) { + int input = ((ProjectExpression)l).getInputNum(); + + // for each sink projection, get its input logical plan and translate it + Operator pred = preds.get(input); + childWalker = new SubtreeDependencyOrderWalker(inner, pred); + pushWalker(childWalker); + childWalker.walk(this); + popWalker(); + + // get the physical operator of the leaf of input logical plan + PhysicalOperator leaf = logToPhyMap.get(pred); + + if (pred instanceof LOInnerLoad) { + // if predecessor is only an LOInnerLoad, remove the project that + // comes from LOInnerLoad and change the column of project that + // comes from expression plan + currentPlan.remove(leaf); + logToPhyMap.remove(pred); + + ((POProject)op).setColumn( ((POProject)leaf).getColumn() ); + ((POProject)op).setStar(((POProject)leaf).isStar()); + + }else{ + currentPlan.connect(leaf, op); + } + } + } + innerPlans.add(currentPlan); + } + + currentPlan = currentPlans.pop(); + + // PhysicalOperator poGen = new POGenerate(new OperatorKey("", + // r.nextLong()), inputs, toBeFlattened); + boolean[] flatten = gen.getFlattenFlags(); + List flattenList = new ArrayList(); + for(boolean fl: flatten) { + flattenList.add(fl); + } + POForEach 
poFE = new POForEach(new OperatorKey(scope, nodeGen + .getNextNodeId(scope)), foreach.getRequestedParallelisam(), innerPlans, flattenList); + poFE.setAlias(foreach.getAlias()); + poFE.setResultType(DataType.BAG); + logToPhyMap.put(foreach, poFE); + currentPlan.add(poFE); + + // generate cannot have multiple inputs + List op = foreach.getPlan().getPredecessors(foreach); + + // generate may not have any predecessors + if (op == null) + return; + + PhysicalOperator from = logToPhyMap.get(op.get(0)); + try { + currentPlan.connect(from, poFE); + } catch (Exception e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + + } + + + /** + * This function translates the new LogicalSchema into old Schema format required + * by PhysicalOperators + * @param lSchema LogicalSchema to be converted to Schema + * @return Schema that is converted from LogicalSchema + * @throws FrontendException + */ + private static Schema translateSchema( LogicalSchema lSchema ) throws FrontendException { + if( lSchema == null ) { + return null; + } + Schema schema = new Schema(); + List lFields = lSchema.getFields(); + for( LogicalFieldSchema lField : lFields ) { + FieldSchema field = new FieldSchema( lField.alias, translateSchema(lField.schema),lField.type ); + field.canonicalName = ((Long)lField.uid).toString(); + schema.add(field); + } + return schema; + } + + /** + * This function takes in a List of LogicalExpressionPlan and converts them to + * a list of PhysicalPlans + * @param plans + * @return + * @throws IOException + */ + private List translateExpressionPlans(LogicalRelationalOperator loj, + List plans ) throws IOException { + List exprPlans = new ArrayList(); + if( plans == null || plans.size() == 0 ) { + return exprPlans; + } + + // Save the current plan onto stack + currentPlans.push(currentPlan); + + for( LogicalExpressionPlan lp : plans ) { + currentPlan = 
new PhysicalPlan(); + + // We spawn a new Dependency Walker and use it + // PlanWalker childWalker = currentWalker.spawnChildWalker(lp); + PlanWalker childWalker = new ReverseDependencyOrderWalker(lp); + + // Save the old walker and use childWalker as current Walker + pushWalker(childWalker); + + // We create a new ExpToPhyTranslationVisitor to walk the ExpressionPlan + currentWalker.walk( + new ExpToPhyTranslationVisitor( + currentWalker.getPlan(), + childWalker, loj, currentPlan, logToPhyMap ) ); + + exprPlans.add(currentPlan); + popWalker(); + } + + // Pop the current plan back out + currentPlan = currentPlans.pop(); + + return exprPlans; + } + + @Override + public void visit(LOStore loStore) throws IOException { + String scope = DEFAULT_SCOPE; +// System.err.println("Entering Store"); + POStore store = new POStore(new OperatorKey(scope, nodeGen + .getNextNodeId(scope))); + store.setAlias(((LogicalRelationalOperator)loStore.getPlan(). + getPredecessors(loStore).get(0)).getAlias()); + store.setSFile(loStore.getOutputSpec()); + // TODO Implement this + //store.setInputSpec(loStore.getInputSpec()); +// try { + // create a new schema for ourselves so that when + // we serialize we are not serializing objects that + // contain the schema - apparently Java tries to + // serialize the object containing the schema if + // we are trying to serialize the schema reference in + // the containing object. 
The schema here will be serialized + // in JobControlCompiler + store.setSchema(translateSchema( loStore.getSchema() )); +// } catch (FrontendException e1) { +// int errorCode = 1060; +// String message = "Cannot resolve Store output schema"; +// throw new VisitorException(message, errorCode, PigException.BUG, e1); +// } + currentPlan.add(store); + + List op = loStore.getPlan().getPredecessors(loStore); + PhysicalOperator from = null; + + if(op != null) { + from = logToPhyMap.get(op.get(0)); + // TODO Implement sorting when we have a LOSort (new) and LOLimit (new) operator ready +// SortInfo sortInfo = null; +// // if store's predecessor is limit, +// // check limit's predecessor +// if(op.get(0) instanceof LOLimit) { +// op = loStore.getPlan().getPredecessors(op.get(0)); +// } +// PhysicalOperator sortPhyOp = logToPhyMap.get(op.get(0)); +// // if this predecessor is a sort, get +// // the sort info. +// if(op.get(0) instanceof LOSort) { +// sortInfo = ((POSort)sortPhyOp).getSortInfo(); +// } +// store.setSortInfo(sortInfo); +// } else { +// int errCode = 2051; +// String msg = "Did not find a predecessor for Store." 
; +// throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG); + } + + try { + currentPlan.connect(from, store); + } catch (PlanException e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + logToPhyMap.put(loStore, store); +// System.err.println("Exiting Store"); + } + + @Override + public void visit( LOCogroup cg ) throws IOException { + if (cg.getGroupType() == LOCogroup.GROUPTYPE.COLLECTED) { + translateCollectedCogroup(cg); + } else { + translateRegularCogroup(cg); + } + } + + private void translateRegularCogroup(LOCogroup cg) throws IOException { + List preds = plan.getPredecessors(cg); + + POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey( + DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)), cg.getRequestedParallelisam() ); + poGlobal.setAlias(cg.getAlias()); + POPackage poPackage = new POPackage(new OperatorKey(DEFAULT_SCOPE, nodeGen + .getNextNodeId(DEFAULT_SCOPE)), cg.getRequestedParallelisam()); + poPackage.setAlias(cg.getAlias()); + currentPlan.add(poGlobal); + currentPlan.add(poPackage); + + try { + currentPlan.connect(poGlobal, poPackage); + } catch (PlanException e1) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1); + } + + Byte type = null; + for( int i = 0 ; i < preds.size(); i++ ) { + ArrayList exprPlans = + (ArrayList) cg.getExpressionPlans().get(i); + + POLocalRearrange physOp = new POLocalRearrange(new OperatorKey( + DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)), cg.getRequestedParallelisam() ); + physOp.setAlias(cg.getAlias()); + + List pExprPlans = translateExpressionPlans( cg, exprPlans ); + + try { + physOp.setPlans(pExprPlans); + } catch (PlanException pe) { + int errCode = 2071; + String msg = "Problem with setting up local rearrange's 
plans."; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, pe); + } + try { + physOp.setIndex(i); + } catch (ExecException e1) { + // int errCode = 2058; + String msg = "Unable to set index on newly create POLocalRearrange."; + throw new IOException(msg); + } + if (exprPlans.size() > 1) { + type = DataType.TUPLE; + physOp.setKeyType(type); + } else { + type = pExprPlans.get(0).getLeaves().get(0).getResultType(); + physOp.setKeyType(type); + } + physOp.setResultType(DataType.TUPLE); + + currentPlan.add(physOp); + + try { + currentPlan.connect(logToPhyMap.get(preds.get(i)), physOp); + currentPlan.connect(physOp, poGlobal); + } catch (PlanException e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + } + + poPackage.setKeyType(type); + poPackage.setResultType(DataType.TUPLE); + poPackage.setNumInps(preds.size()); + poPackage.setInner(cg.getInner()); + logToPhyMap.put(cg, poPackage); + } + + private void translateCollectedCogroup(LOCogroup cg) throws IOException { + // can have only one input + LogicalRelationalOperator pred = (LogicalRelationalOperator) plan.getPredecessors(cg).get(0); + List exprPlans = (List) cg.getExpressionPlans().get(0); + POCollectedGroup physOp = new POCollectedGroup(new OperatorKey( + DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); + physOp.setAlias(cg.getAlias()); + List pExprPlans = translateExpressionPlans(cg, exprPlans); + + try { + physOp.setPlans(pExprPlans); + } catch (PlanException pe) { + int errCode = 2071; + String msg = "Problem with setting up map group's plans."; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, pe); + } + Byte type = null; + if (exprPlans.size() > 1) { + type = DataType.TUPLE; + physOp.setKeyType(type); + } else { + type = pExprPlans.get(0).getLeaves().get(0).getResultType(); + physOp.setKeyType(type); + } + 
physOp.setResultType(DataType.TUPLE); + + currentPlan.add(physOp); + + try { + currentPlan.connect(logToPhyMap.get(pred), physOp); + } catch (PlanException e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + + logToPhyMap.put(cg, physOp); + } + + @Override + public void visit(LOJoin loj) throws IOException { + String scope = DEFAULT_SCOPE; +// System.err.println("Entering Join"); + + // List of join predicates + List inputs = plan.getPredecessors(loj); + + // mapping of inner join physical plans corresponding to inner physical operators. + MultiMap joinPlans = new LinkedMultiMap(); + + // Outer list corresponds to join predicates. Inner list is inner physical plan of each predicate. + List> ppLists = new ArrayList>(); + + // List of physical operator corresponding to join predicates. + List inp = new ArrayList(); + + // Outer list corresponds to join predicates and inner list corresponds to type of keys for each predicate. + List> keyTypes = new ArrayList>(); + + for (int i=0; i plans = (List) loj.getJoinPlan(i); + + // Convert the expression plan into physical Plan + List exprPlans = translateExpressionPlans(loj, plans); + + ppLists.add(exprPlans); + joinPlans.put(physOp, exprPlans); + + // Key could potentially be a tuple. So, we visit all exprPlans to get types of members of tuples. 
+ List tupleKeyMemberTypes = new ArrayList(); + for(PhysicalPlan exprPlan : exprPlans) + tupleKeyMemberTypes.add(exprPlan.getLeaves().get(0).getResultType()); + keyTypes.add(tupleKeyMemberTypes); + } + + if (loj.getJoinType() == LOJoin.JOINTYPE.SKEWED) { + POSkewedJoin skj; + try { + skj = new POSkewedJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelisam(), + inp, loj.getInnerFlags()); + skj.setAlias(loj.getAlias()); + skj.setJoinPlans(joinPlans); + } + catch (Exception e) { + int errCode = 2015; + String msg = "Skewed Join creation failed"; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + skj.setResultType(DataType.TUPLE); + + boolean[] innerFlags = loj.getInnerFlags(); + for (int i=0; i < inputs.size(); i++) { + LogicalRelationalOperator op = (LogicalRelationalOperator) inputs.get(i); + if (!innerFlags[i]) { + try { + LogicalSchema s = op.getSchema(); + // if the schema cannot be determined + if (s == null) { + throw new FrontendException(); + } + skj.addSchema(translateSchema(s)); + } catch (FrontendException e) { + int errCode = 2015; + String msg = "Couldn't set the schema for outer join" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + } else { + // This will never be retrieved. 
It just guarantees that the index will be valid when + // MRCompiler is trying to read the schema + skj.addSchema(null); + } + } + + currentPlan.add(skj); + + for (Operator op : inputs) { + try { + currentPlan.connect(logToPhyMap.get(op), skj); + } catch (PlanException e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + } + logToPhyMap.put(loj, skj); + } + else if(loj.getJoinType() == LOJoin.JOINTYPE.REPLICATED) { + + int fragment = 0; + POFRJoin pfrj; + try { + boolean []innerFlags = loj.getInnerFlags(); + boolean isLeftOuter = false; + // We dont check for bounds issue as we assume that a join + // involves atleast two inputs + isLeftOuter = !innerFlags[1]; + + Tuple nullTuple = null; + if( isLeftOuter ) { + try { + // We know that in a Left outer join its only a two way + // join, so we assume index of 1 for the right input + LogicalSchema inputSchema = ((LogicalRelationalOperator)inputs.get(1)).getSchema(); + + // We check if we have a schema before the join + if(inputSchema == null) { + int errCode = 1109; + String msg = "Input (" + ((LogicalRelationalOperator) inputs.get(1)).getAlias() + ") " + + "on which outer join is desired should have a valid schema"; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.INPUT); + } + + // Using the schema we decide the number of columns/fields + // in the nullTuple + nullTuple = TupleFactory.getInstance().newTuple(inputSchema.size()); + for(int j = 0; j < inputSchema.size(); j++) { + nullTuple.set(j, null); + } + + } catch( FrontendException e ) { + int errCode = 2104; + String msg = "Error while determining the schema of input"; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + } + + pfrj = new POFRJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelisam(), + inp, ppLists, keyTypes, null, fragment, 
isLeftOuter, nullTuple); + pfrj.setAlias(loj.getAlias()); + } catch (ExecException e1) { + int errCode = 2058; + String msg = "Unable to set index on newly create POLocalRearrange."; + throw new VisitorException(msg, errCode, PigException.BUG, e1); + } + pfrj.setResultType(DataType.TUPLE); + currentPlan.add(pfrj); + for (Operator op : inputs) { + try { + currentPlan.connect(logToPhyMap.get(op), pfrj); + } catch (PlanException e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + } + logToPhyMap.put(loj, pfrj); + } + + else if (loj.getJoinType() == LOJoin.JOINTYPE.MERGE && validateMergeJoin(loj)) { + + POMergeJoin smj; + try { + smj = new POMergeJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelisam(),inp,joinPlans,keyTypes); + } + catch (Exception e) { + int errCode = 2042; + String msg = "Merge Join creation failed"; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + + smj.setResultType(DataType.TUPLE); + currentPlan.add(smj); + + for (Operator op : inputs) { + try { + currentPlan.connect(logToPhyMap.get(op), smj); + } catch (PlanException e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + } + logToPhyMap.put(loj, smj); + return; + } + else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){ + POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey( + scope, nodeGen.getNextNodeId(scope)), loj + .getRequestedParallelisam()); + poGlobal.setAlias(loj.getAlias()); + POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen + .getNextNodeId(scope)), loj.getRequestedParallelisam()); + poPackage.setAlias(loj.getAlias()); + currentPlan.add(poGlobal); + currentPlan.add(poPackage); + + int count = 0; + Byte type = null; + + try 
{ + currentPlan.connect(poGlobal, poPackage); + for (int i=0; i plans = + (List) loj.getJoinPlan(i); + POLocalRearrange physOp = new POLocalRearrange(new OperatorKey( + scope, nodeGen.getNextNodeId(scope)), loj + .getRequestedParallelisam()); + List exprPlans = translateExpressionPlans(loj, plans); +// currentPlans.push(currentPlan); +// for (LogicalExpressionPlan lp : plans) { +// currentPlan = new PhysicalPlan(); +// PlanWalker childWalker = currentWalker +// .spawnChildWalker(lp); +// pushWalker(childWalker); +// //currentWalker.walk(this); +// currentWalker.walk( +// new ExpToPhyTranslationVisitor(currentWalker.getPlan(), +// childWalker) ); +// exprPlans.add(currentPlan); +// popWalker(); +// +// } +// currentPlan = currentPlans.pop(); + try { + physOp.setPlans(exprPlans); + } catch (PlanException pe) { + int errCode = 2071; + String msg = "Problem with setting up local rearrange's plans."; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, pe); + } + try { + physOp.setIndex(count++); + } catch (ExecException e1) { + int errCode = 2058; + String msg = "Unable to set index on newly create POLocalRearrange."; + throw new VisitorException(msg, errCode, PigException.BUG, e1); + } + if (plans.size() > 1) { + type = DataType.TUPLE; + physOp.setKeyType(type); + } else { + type = exprPlans.get(0).getLeaves().get(0).getResultType(); + physOp.setKeyType(type); + } + physOp.setResultType(DataType.TUPLE); + + currentPlan.add(physOp); + + try { + currentPlan.connect(logToPhyMap.get(op), physOp); + currentPlan.connect(physOp, poGlobal); + } catch (PlanException e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + + } + + } catch (PlanException e1) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, 
PigException.BUG, e1); + } + + poPackage.setKeyType(type); + poPackage.setResultType(DataType.TUPLE); + poPackage.setNumInps(count); + + boolean[] innerFlags = loj.getInnerFlags(); + poPackage.setInner(innerFlags); + + List fePlans = new ArrayList(); + List flattenLst = new ArrayList(); + + try{ + for(int i=0;i< count;i++){ + PhysicalPlan fep1 = new PhysicalPlan(); + POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), + loj.getRequestedParallelisam(), i+1); //i+1 since the first column is the "group" field + feproj1.setAlias(loj.getAlias()); + feproj1.setResultType(DataType.BAG); + feproj1.setOverloaded(false); + fep1.add(feproj1); + fePlans.add(fep1); + // the parser would have marked the side + // where we need to keep empty bags on + // non matched as outer (innerFlags[i] would be + // false) + if(!(innerFlags[i])) { + Operator joinInput = inputs.get(i); + // for outer join add a bincond + // which will project nulls when bag is + // empty + updateWithEmptyBagCheck(fep1, joinInput); + } + flattenLst.add(true); + } + + POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), + loj.getRequestedParallelisam(), fePlans, flattenLst ); + fe.setAlias(loj.getAlias()); + currentPlan.add(fe); + currentPlan.connect(poPackage, fe); + logToPhyMap.put(loj, fe); + }catch (PlanException e1) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1); + } + } +// System.err.println("Exiting Join"); + } + + @Override + public void visit(LOUnion loUnion) throws IOException { + String scope = DEFAULT_SCOPE; + POUnion physOp = new POUnion(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), loUnion.getRequestedParallelisam()); + physOp.setAlias(loUnion.getAlias()); + currentPlan.add(physOp); + physOp.setResultType(DataType.BAG); + logToPhyMap.put(loUnion, physOp); + List ops = 
loUnion.getPlan().getPredecessors(loUnion); + + for (Operator l : ops) { + PhysicalOperator from = logToPhyMap.get(l); + try { + currentPlan.connect(from, physOp); + } catch (PlanException e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + } + } + + @Override + public void visit(LODistinct loDistinct) throws IOException { + String scope = DEFAULT_SCOPE; + PODistinct physOp = new PODistinct(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), loDistinct.getRequestedParallelisam()); + physOp.setAlias(loDistinct.getAlias()); + currentPlan.add(physOp); + physOp.setResultType(DataType.BAG); + logToPhyMap.put(loDistinct, physOp); + Operator op = loDistinct.getPlan().getPredecessors(loDistinct).get(0); + + PhysicalOperator from = logToPhyMap.get(op); + try { + currentPlan.connect(from, physOp); + } catch (PlanException e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visit(LOLimit loLimit) throws IOException { + String scope = DEFAULT_SCOPE; + POLimit physOp = new POLimit(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), loLimit.getRequestedParallelisam()); + physOp.setLimit(loLimit.getLimit()); + physOp.setAlias(loLimit.getAlias()); + currentPlan.add(physOp); + physOp.setResultType(DataType.BAG); + logToPhyMap.put(loLimit, physOp); + Operator op = loLimit.getPlan().getPredecessors(loLimit).get(0); + + PhysicalOperator from = logToPhyMap.get(op); + try { + currentPlan.connect(from, physOp); + } catch (PlanException e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visit(LOSplit loSplit) throws IOException { + 
String scope = DEFAULT_SCOPE; + POSplit physOp = new POSplit(new OperatorKey(scope, nodeGen + .getNextNodeId(scope)), loSplit.getRequestedParallelisam()); + physOp.setAlias(loSplit.getAlias()); + FileSpec splStrFile; + try { + splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(InterStorage.class.getName())); + } catch (IOException e1) { + byte errSrc = pc.getErrorSource(); + int errCode = 0; + switch(errSrc) { + case PigException.BUG: + errCode = 2016; + break; + case PigException.REMOTE_ENVIRONMENT: + errCode = 6002; + break; + case PigException.USER_ENVIRONMENT: + errCode = 4003; + break; + } + String msg = "Unable to obtain a temporary path." ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, errSrc, e1); + + } + physOp.setSplitStore(splStrFile); + logToPhyMap.put(loSplit, physOp); + + currentPlan.add(physOp); + + List op = loSplit.getPlan().getPredecessors(loSplit); + PhysicalOperator from; + + if(op != null) { + from = logToPhyMap.get(op.get(0)); + } else { + int errCode = 2051; + String msg = "Did not find a predecessor for Split." 
; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG); + } + + try { + currentPlan.connect(from, physOp); + } catch (PlanException e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visit(LOSplitOutput loSplitOutput) throws IOException { + String scope = DEFAULT_SCOPE; +// System.err.println("Entering Filter"); + POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen + .getNextNodeId(scope)), loSplitOutput.getRequestedParallelisam()); + poFilter.setAlias(loSplitOutput.getAlias()); + poFilter.setResultType(DataType.BAG); + currentPlan.add(poFilter); + logToPhyMap.put(loSplitOutput, poFilter); + currentPlans.push(currentPlan); + + currentPlan = new PhysicalPlan(); + +// PlanWalker childWalker = currentWalker +// .spawnChildWalker(filter.getFilterPlan()); + PlanWalker childWalker = new ReverseDependencyOrderWalker(loSplitOutput.getFilterPlan()); + pushWalker(childWalker); + //currentWalker.walk(this); + currentWalker.walk( + new ExpToPhyTranslationVisitor( currentWalker.getPlan(), + childWalker, loSplitOutput, currentPlan, logToPhyMap ) ); + popWalker(); + + poFilter.setPlan(currentPlan); + currentPlan = currentPlans.pop(); + + List op = loSplitOutput.getPlan().getPredecessors(loSplitOutput); + + PhysicalOperator from; + if(op != null) { + from = logToPhyMap.get(op.get(0)); + } else { + int errCode = 2051; + String msg = "Did not find a predecessor for Filter." 
; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG); + } + + try { + currentPlan.connect(from, poFilter); + } catch (PlanException e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } +// System.err.println("Exiting Filter"); + } + + /** + * updates plan with check for empty bag and if bag is empty to flatten a bag + * with as many null's as dictated by the schema + * @param fePlan the plan to update + * @param joinInput the relation for which the corresponding bag is being checked + * @throws FrontendException + */ + public static void updateWithEmptyBagCheck(PhysicalPlan fePlan, Operator joinInput) throws FrontendException { + LogicalSchema inputSchema = null; + try { + inputSchema = ((LogicalRelationalOperator) joinInput).getSchema(); + + + if(inputSchema == null) { + int errCode = 1109; + String msg = "Input (" + ((LogicalRelationalOperator) joinInput).getAlias() + ") " + + "on which outer join is desired should have a valid schema"; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.INPUT); + } + } catch (FrontendException e) { + int errCode = 2104; + String msg = "Error while determining the schema of input"; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + + CompilerUtils.addEmptyBagOuterJoin(fePlan, translateSchema(inputSchema)); + + } + + private boolean validateMergeJoin(LOJoin loj) throws IOException{ + + List preds = plan.getPredecessors(loj); + + int errCode = 1101; + String errMsg = "Merge Join must have exactly two inputs."; + if(preds.size() != 2) + throw new LogicalToPhysicalTranslatorException(errMsg+" Found: "+preds.size(),errCode); + + return mergeJoinValidator(preds,loj.getPlan()); + } + + private boolean mergeJoinValidator(List preds,OperatorPlan lp) throws IOException { + + int errCode = 1103; + String errMsg = "Merge 
join only supports Filter, Foreach, filter and Load as its predecessor. Found : "; + if(preds != null && !preds.isEmpty()){ + for(Operator lo : preds){ + // TODO Need to add LOForEach in this statement + if (!(lo instanceof LOFilter || lo instanceof LOLoad)) // || lo instanceof LOForEach + throw new LogicalToPhysicalTranslatorException(errMsg + lo.getClass().getSimpleName(), errCode); /* errMsg ends with "Found : ", so append the class name of the rejected operator */ + // All is good at this level. Visit predecessors now. + mergeJoinValidator(lp.getPredecessors(lo),lp); + } + } + // We visited everything and all is good. + return true; + } +} Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java?rev=982345&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java Wed Aug 4 17:46:42 2010 @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.pig.newplan.logical.relational; + +import java.io.IOException; +import java.io.PrintStream; + +import org.apache.pig.newplan.BaseOperatorPlan; +import org.apache.pig.newplan.OperatorPlan; +import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter; + +/** + * LogicalPlan is the logical view of relational operations Pig will execute + * for a given script. Note that it contains only relational operations. + * All expressions will be contained in LogicalExpressionPlans inside + * each relational operator. + */ +public class LogicalPlan extends BaseOperatorPlan { + + /** + * Equality is checked by calling equals on every leaf in the plan. This + * assumes that plans are always connected graphs. It is somewhat + * inefficient since every leaf will test equality all the way to + * every root. But it is only intended for use in testing, so that + * should be ok. Checking predecessors (as opposed to successors) was + * chosen because splits (which have multiple successors) do not depend + * on order of outputs for correctness, whereas joins (with multiple + * predecessors) do. That is, reversing the outputs of split in the + * graph has no correctness implications, whereas reversing the inputs + * of join can. This method of doing equals will detect predecessors + * in different orders but not successors in different orders. 
+ */ + @Override + public boolean isEqual(OperatorPlan other) { + if (other == null || !(other instanceof LogicalPlan)) { + return false; + } + + return super.isEqual(other); + } + + @Override + public void explain(PrintStream ps, String format, boolean verbose) + throws IOException { + ps.println("#-----------------------------------------------"); + ps.println("# New Logical Plan:"); + ps.println("#-----------------------------------------------"); + + LogicalPlanPrinter npp = new LogicalPlanPrinter(this, ps); + npp.visit(); + } +} Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlanVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlanVisitor.java?rev=982345&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlanVisitor.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlanVisitor.java Wed Aug 4 17:46:42 2010 @@ -0,0 +1 @@ +package org.apache.pig.newplan.logical.relational; Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java?rev=982345&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java Wed Aug 4 17:46:42 2010 @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.newplan.logical.relational; + +import java.io.IOException; + +import org.apache.pig.newplan.OperatorPlan; +import org.apache.pig.newplan.PlanVisitor; +import org.apache.pig.newplan.PlanWalker; + +/** + * A visitor for logical plans. + */ +public abstract class LogicalRelationalNodesVisitor extends PlanVisitor { + + protected LogicalRelationalNodesVisitor(OperatorPlan plan, PlanWalker walker) { + super(plan, walker); + /* + Iterator iter = plan.getOperators(); + while(iter.hasNext()) { + if (!(iter.next() instanceof LogicalRelationalOperator)) { + throw new RuntimeException("LogicalPlanVisitor can only visit logical plan"); + } + }*/ + } + + public void visit(LOLoad load) throws IOException { + } + + public void visit(LOFilter filter) throws IOException { + } + + public void visit(LOStore store) throws IOException { + } + + public void visit(LOJoin join) throws IOException { + } + + public void visit(LOForEach foreach) throws IOException { + } + + public void visit(LOGenerate gen) throws IOException { + } + + public void visit(LOInnerLoad load) throws IOException { + } + + public void visit(LOCogroup loCogroup) throws IOException { + } + + public void visit(LOSplit loSplit) throws IOException { + } + + public void visit(LOSplitOutput loSplitOutput) throws IOException { + } + + public void visit(LOUnion loUnion) throws IOException { + } + + public void 
visit(LOSort loSort) throws IOException { + } + + public void visit(LODistinct loDistinct) throws IOException { + } + + public void visit(LOLimit loLimit) throws IOException { + } + + public void visit(LOCross loCross) throws IOException { + } + + public void visit(LOStream loStream) throws IOException { + } +} Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java?rev=982345&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java Wed Aug 4 17:46:42 2010 @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.newplan.logical.relational; + +import java.util.Map; + +import org.apache.pig.newplan.Operator; +import org.apache.pig.newplan.OperatorPlan; + +/** + * Logical representation of relational operators. Relational operators have + * a schema. 
+ */ +abstract public class LogicalRelationalOperator extends Operator { + + protected LogicalSchema schema; + protected int requestedParallelism; + protected String alias; + protected int lineNum; + + /** + * Create an operator with a requested parallelism of -1. + * @param name name of this operator + * @param plan plan this operator is in + */ + public LogicalRelationalOperator(String name, OperatorPlan plan) { + this(name, plan, -1); + } + + /** + * Create an operator with the given requested parallelism. + * @param name name of this operator + * @param plan plan this operator is in + * @param rp requested parallelism + */ + public LogicalRelationalOperator(String name, + OperatorPlan plan, + int rp) { + super(name, plan); + requestedParallelism = rp; + } + + /** + * Get the schema for the output of this relational operator. This does + * not merely return the schema variable. If schema is not yet set, this + * will attempt to construct it. Therefore it is abstract since each + * operator will need to construct its schema differently. + * @return the schema + */ + abstract public LogicalSchema getSchema(); + + public void setSchema(LogicalSchema schema) { + this.schema = schema; + } + + /** + * Reset the schema to null so that the next time getSchema is called + * the schema will be regenerated from scratch. + */ + public void resetSchema() { + schema = null; + } + + + /** + * Get the requestedParallelism for this operator. + * @return requestedParallelism + */ + public int getRequestedParallelisam() { + return requestedParallelism; + } + + /** + * Get the alias of this operator. That is, if the Pig Latin for this operator + * was 'X = sort W by $0' then the alias will be X. For store and split it will + * be the alias being stored or split. Note that because of this, the alias + * is not guaranteed to be unique to a single operator.
+ * @return alias + */ + + public String getAlias() { + return alias; + } + + public void setAlias(String alias) { + this.alias = alias; + } + + public void setRequestedParallelism(int parallel) { + this.requestedParallelism = parallel; + } + + /** + * Get the line number in the submitted Pig Latin script where this operator + * occurred. + * @return line number + */ + public int getLineNumber() { + return lineNum; + } + + /** + * Only to be used by unit tests. This is a back door cheat to set the schema + * without having to calculate it. This should never be called by production + * code, only by tests. + * @param schema schema to set + */ + public void neverUseForRealSetSchema(LogicalSchema schema) { + this.schema = schema; + } + + /** + * Do some basic equality checks on two relational operators. Equality + * is defined here as both schemas being null or equal per LogicalSchema.isEqual. + * This is intended to be used by operators' equals methods. + * @param other LogicalRelationalOperator to compare schemas against + * @return true if other is non-null and the two schemas are both null or + * are equal; predecessors are not compared by this method.
+ */ + protected boolean checkEquality(LogicalRelationalOperator other) { + if (other == null) return false; + LogicalSchema s = getSchema(); + LogicalSchema os = other.getSchema(); + if (s == null && os == null) { + // intentionally blank + } else if (s == null || os == null) { + // one of them is null and one isn't + return false; + } else { + if (!s.isEqual(os)) return false; + } + return true; + } + + public String toString() { + StringBuilder msg = new StringBuilder(); + + msg.append("(Name: " + name + " Schema: "); + if (schema!=null) + msg.append(schema); + else + msg.append("null"); + msg.append(")"); + if (annotations!=null) { + for (Map.Entry entry : annotations.entrySet()) { + msg.append(entry); + } + } + return msg.toString(); + } +} Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=982345&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Wed Aug 4 17:46:42 2010 @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.newplan.logical.relational; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.pig.data.DataType; +import org.apache.pig.impl.util.Pair; +import org.apache.pig.newplan.logical.expression.LogicalExpression; + +/** + * Schema, from a logical perspective. + */ +public class LogicalSchema { + + public static class LogicalFieldSchema { + public String alias; + public byte type; + public long uid; + public LogicalSchema schema; + + public LogicalFieldSchema(String alias, LogicalSchema schema, byte type) { + this(alias, schema, type, -1); + } + + public LogicalFieldSchema(LogicalFieldSchema fs) { + this(fs.alias, fs.schema, fs.type, fs.uid); + } + + public LogicalFieldSchema(String alias, LogicalSchema schema, byte type, long uid) { + this.alias = alias; + this.type = type; + this.schema = schema; + this.uid = uid; + } + + /** + * Equality is defined as having the same type and either the same schema + * or both null schema. Alias and uid are not checked. 
+ */ + public boolean isEqual(Object other) { + if (other instanceof LogicalFieldSchema) { + LogicalFieldSchema ofs = (LogicalFieldSchema)other; + if (type != ofs.type) return false; + if (schema == null && ofs.schema == null) return true; + if (schema == null) return false; + else return schema.isEqual(ofs.schema); + } else { + return false; + } + } + + public String toString() { + if( type == DataType.BAG ) { + if( schema == null ) { + return ( alias + "#" + uid + ":bag{}#" ); + } + return ( alias + "#" + uid + ":bag{" + schema.toString() + "}" ); + } else if( type == DataType.TUPLE ) { + if( schema == null ) { + return ( alias + "#" + uid + ":tuple{}" ); + } + return ( alias + "#" + uid + ":tuple(" + schema.toString() + ")" ); + } + return ( alias + "#" + uid + ":" + DataType.findTypeName(type) ); + } + + public void stampFieldSchema() { + if (uid==-1) + uid = LogicalExpression.getNextUid(); + if (schema!=null) { + for (LogicalFieldSchema fs : schema.getFields()) { + fs.stampFieldSchema(); + } + } + } + + private boolean compatible(LogicalFieldSchema uidOnlyFieldSchema) { + if (uidOnlyFieldSchema==null) + return false; + if (this.schema==null && uidOnlyFieldSchema.schema!=null || + this.schema!=null && uidOnlyFieldSchema==null) + return false; + if (this.schema!=null) { + if (this.schema.size()!=uidOnlyFieldSchema.schema.size()) + return false; + for (int i=0;i fields; + private Map> aliases; + + private boolean twoLevelAccessRequired = false; + + public LogicalSchema() { + fields = new ArrayList(); + aliases = new HashMap>(); + } + + /** + * Add a field to this schema. 
+ * @param field to be added to the schema + */ + public void addField(LogicalFieldSchema field) { + fields.add(field); + if (field.alias != null && !field.alias.equals("")) { + // put the full name of this field into aliases map + // boolean in the pair indicates if this alias is full name + aliases.put(field.alias, new Pair(fields.size()-1, true)); + int index = 0; + + // check and put short names into alias map if there is no conflict + + while(index != -1) { + index = field.alias.indexOf("::", index); + if (index != -1) { + String a = field.alias.substring(index+2); + if (aliases.containsKey(a)) { + // remove conflict if the conflict is not full name + // we can never remove full name + if (!aliases.get(a).second) { + aliases.remove(a); + } + }else{ + // put alias into map and indicate it is a short name + aliases.put(a, new Pair(fields.size()-1, false)); + } + + index = index +2; + } + } + } + } + + /** + * Fetch a field by alias + * @param alias + * @return field associated with alias, or null if no such field + */ + public LogicalFieldSchema getField(String alias) { + Pair p = aliases.get(alias); + if (p == null) { + return null; + } + + return fields.get(p.first); + } + + /** + * Fetch a field by field number + * @param fieldNum field number to fetch + * @return field + */ + public LogicalFieldSchema getField(int fieldNum) { + return fields.get(fieldNum); + } + + /** + * Get all fields + * @return list of all fields + */ + public List getFields() { + return fields; + } + + /** + * Get the size of the schema. + * @return size + */ + public int size() { + return fields.size(); + } + + /** + * Two schemas are equal if they are of equal size and their fields + * schemas considered in order are equal. 
+ */ + public boolean isEqual(Object other) { + if (other != null && other instanceof LogicalSchema) { + LogicalSchema os = (LogicalSchema)other; + if (size() != os.size()) return false; + for (int i = 0; i < size(); i++) { + if (!getField(i).isEqual(os.getField(i))) return false; + } + return true; + } else { + return false; + } + + } + + /** + * Look for the index of the field that contains the specified uid + * @param uid the uid to look for + * @return the index of the field, -1 if not found + */ + public int findField(long uid) { + + for(int i=0; i< size(); i++) { + LogicalFieldSchema f = getField(i); + // if this field has the same uid, then return this field + if (f.uid == uid) { + return i; + } + + // if this field has a schema, check its schema + if (f.schema != null) { + if (f.schema.findField(uid) != -1) { + return i; + } + } + } + + return -1; + } + + + /** + * Merge two schemas. + * @param s1 + * @param s2 + * @return a merged schema, or null if the merge fails + */ + public static LogicalSchema merge(LogicalSchema s1, LogicalSchema s2) { + // TODO + return null; + } + + public String toString() { + StringBuilder str = new StringBuilder(); + + for( LogicalFieldSchema field : fields ) { + str.append( field.toString() + "," ); + } + if( fields.size() != 0 ) { + str.deleteCharAt( str.length() -1 ); + } + return str.toString(); + } + + public void setTwoLevelAccessRequired(boolean flag) { + twoLevelAccessRequired = flag; + } + + public boolean isTwoLevelAccessRequired() { + return twoLevelAccessRequired; + } + + public LogicalSchema mergeUid(LogicalSchema uidOnlySchema) throws IOException { + if (uidOnlySchema!=null) { + if (size()!=uidOnlySchema.size()) { + throw new IOException("structure of schema change"); + } + for (int i=0;i