Return-Path: Delivered-To: apmail-pig-commits-archive@www.apache.org Received: (qmail 88517 invoked from network); 13 Dec 2010 19:11:30 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 13 Dec 2010 19:11:30 -0000 Received: (qmail 23670 invoked by uid 500); 13 Dec 2010 19:11:30 -0000 Delivered-To: apmail-pig-commits-archive@pig.apache.org Received: (qmail 23644 invoked by uid 500); 13 Dec 2010 19:11:29 -0000 Mailing-List: contact commits-help@pig.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pig.apache.org Delivered-To: mailing list commits@pig.apache.org Received: (qmail 23630 invoked by uid 99); 13 Dec 2010 19:11:29 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Dec 2010 19:11:29 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Dec 2010 19:11:26 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id F17872388A2C; Mon, 13 Dec 2010 19:11:05 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1045314 [1/5] - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src... Date: Mon, 13 Dec 2010 19:11:04 -0000 To: commits@pig.apache.org From: yanz@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101213191105.F17872388A2C@eris.apache.org> Author: yanz Date: Mon Dec 13 19:11:00 2010 New Revision: 1045314 URL: http://svn.apache.org/viewvc?rev=1045314&view=rev Log: PIG-1712: ILLUSTRATE rework (yanz) Added: pig/trunk/src/org/apache/pig/pen/FakeRawKeyValueIterator.java pig/trunk/src/org/apache/pig/pen/Illustrable.java pig/trunk/src/org/apache/pig/pen/Illustrator.java pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java pig/trunk/src/org/apache/pig/pen/PhysicalPlanResetter.java pig/trunk/src/org/apache/pig/pen/util/ReverseDepthFirstWalker.java pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput.txt pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput2.txt pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput_invalid.txt pig/trunk/test/org/apache/pig/test/data/illustrate.pig pig/trunk/test/org/apache/pig/test/data/illustrate2.pig pig/trunk/test/org/apache/pig/test/data/illustrate3.pig pig/trunk/test/org/apache/pig/test/data/illustrate4.pig pig/trunk/test/org/apache/pig/test/data/illustrate5.pig pig/trunk/test/org/apache/pig/test/data/illustrate6.pig Removed: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java pig/trunk/src/org/apache/pig/pen/LocalLogToPhyTranslationVisitor.java pig/trunk/src/org/apache/pig/pen/physicalOperators/ pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java pig/trunk/test/org/apache/pig/test/TestPOCogroup.java pig/trunk/test/org/apache/pig/test/TestPOCross.java Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/PigServer.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryComparisonOperator.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONot.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORead.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java pig/trunk/src/org/apache/pig/data/AccumulativeBag.java pig/trunk/src/org/apache/pig/data/TupleFactory.java pig/trunk/src/org/apache/pig/impl/PigContext.java pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java pig/trunk/src/org/apache/pig/pen/LineageTrimmingVisitor.java pig/trunk/src/org/apache/pig/pen/util/DisplayExamples.java pig/trunk/src/org/apache/pig/pen/util/ExampleTuple.java pig/trunk/src/org/apache/pig/pen/util/LineageTracer.java pig/trunk/src/org/apache/pig/pen/util/MetricEvaluation.java pig/trunk/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj pig/trunk/test/org/apache/pig/test/TestExampleGenerator.java pig/trunk/test/org/apache/pig/test/TestGrunt.java pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java pig/trunk/test/org/apache/pig/test/TestStore.java pig/trunk/test/org/apache/pig/test/utils/POCastDummy.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Mon Dec 13 19:11:00 2010 @@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES IMPROVEMENTS +PIG-1712: ILLUSTRATE rework (yanz) + PIG-1758: Deep cast of complex type (daijy) PIG-1728: doc updates (chandec via olgan) Modified: pig/trunk/src/org/apache/pig/PigServer.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/PigServer.java (original) +++ pig/trunk/src/org/apache/pig/PigServer.java Mon Dec 13 19:11:00 2010 @@ -1125,23 +1125,32 @@ public class PigServer { return currDAG.getAliasOp().keySet(); } - public Map getExamples(String alias) { + public Map getExamples(String alias) throws IOException { LogicalPlan plan = null; - try { - if (currDAG.isBatchOn()) { + if (currDAG.isBatchOn() && alias != null) { currDAG.execute(); } - - plan = getClonedGraph().getPlan(alias); + Graph g = getClonedGraph(); + plan = g.getPlan(alias); + plan = compileLp(plan, g, false); } catch (IOException e) { //Since the original script is parsed anyway, there should not be an //error in this parsing. The only reason there can be an error is when //the files being loaded in load don't exist anymore. e.printStackTrace(); } + ExampleGenerator exgen = new ExampleGenerator(plan, pigContext); - return exgen.getExamples(); + try { + return exgen.getExamples(); + } catch (ExecException e) { + e.printStackTrace(System.out); + throw new IOException("ExecException : "+ e.getMessage()); + } catch (Exception e) { + e.printStackTrace(System.out); + throw new IOException("Exception : "+ e.getMessage()); + } } private LogicalPlan getStorePlan(String alias) throws IOException { Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon Dec 13 19:11:00 2010 @@ -57,6 +57,8 @@ import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.logicalLayer.LOForEach; +import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.OperatorKey; @@ -69,17 +71,17 @@ import org.apache.pig.newplan.logical.Lo import org.apache.pig.newplan.logical.expression.ConstantExpression; import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan; import org.apache.pig.newplan.logical.optimizer.SchemaResetter; -import org.apache.pig.newplan.logical.optimizer.ProjectionPatcher.ProjectionFinder; import org.apache.pig.newplan.logical.relational.LOLimit; import org.apache.pig.newplan.logical.relational.LOSort; import org.apache.pig.newplan.logical.relational.LOSplit; import org.apache.pig.newplan.logical.relational.LOSplitOutput; import org.apache.pig.newplan.logical.relational.LOStore; import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor; +import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator; import org.apache.pig.newplan.logical.rules.InputOutputFileValidator; -import org.apache.pig.newplan.optimizer.Rule; import org.apache.pig.tools.pigstats.OutputStats; import org.apache.pig.tools.pigstats.PigStats; +import org.apache.pig.pen.POOptimizeDisabler; public class HExecutionEngine { @@ -105,6 +107,11 @@ public class HExecutionEngine { // map from LOGICAL key to into about the execution protected Map materializedResults; + protected Map logToPhyMap; + protected Map opsMap; + protected Map newLogToPhyMap; + private Map> forEachInnerOpMap; + public HExecutionEngine(PigContext pigContext) { this.pigContext = pigContext; this.logicalToPhysicalKeys = new HashMap(); @@ -255,8 +262,16 @@ public class HExecutionEngine { // translate old logical plan to new plan LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(plan); visitor.visit(); + opsMap = visitor.getOldToNewLOOpMap(); + forEachInnerOpMap = visitor.getForEachInnerMap(); org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan(); + if (pigContext.inIllustrator) { + // disable all PO-specific optimizations + POOptimizeDisabler pod = new POOptimizeDisabler(newPlan); + pod.visit(); + } + SchemaResetter schemaResetter = new SchemaResetter(newPlan); schemaResetter.visit(); @@ -271,6 +286,22 @@ public class HExecutionEngine { throw new FrontendException(msg, errCode, PigException.BUG, ioe); } + if (pigContext.inIllustrator) { + // disable MergeForEach in illustrator + if (optimizerRules == null) + optimizerRules = new HashSet(); + optimizerRules.add("MergeForEach"); + optimizerRules.add("PartitionFilterOptimizer"); + optimizerRules.add("LimitOptimizer"); + optimizerRules.add("SplitFilter"); + optimizerRules.add("PushUpFilter"); + optimizerRules.add("MergeFilter"); + optimizerRules.add("PushDownForEachFlatten"); + optimizerRules.add("ColumnMapKeyPrune"); + optimizerRules.add("AddForEach"); + optimizerRules.add("GroupByConstParallelSetter"); + } + // run optimizer org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer optimizer = new org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer(newPlan, 100, optimizerRules); @@ -294,6 +325,7 @@ public class HExecutionEngine { translator.setPigContext(pigContext); translator.visit(); + newLogToPhyMap = translator.getLogToPhyMap(); return translator.getPhysicalPlan(); }else{ @@ -303,13 +335,40 @@ public class HExecutionEngine { translator.visit(); return translator.getPhysicalPlan(); } - } catch (Exception ve) { + } catch (ExecException ve) { int errCode = 2042; String msg = "Error in new logical plan. Try -Dpig.usenewlogicalplan=false."; throw new FrontendException(msg, errCode, PigException.BUG, ve); } } + public Map getLogToPhyMap() { + if (logToPhyMap != null) + return logToPhyMap; + else if (newLogToPhyMap != null) { + Map result = new HashMap(); + for (LogicalOperator lo: opsMap.keySet()) { + result.put(lo, newLogToPhyMap.get(opsMap.get(lo))); + } + return result; + } else + return null; + } + + public Map> getForEachInnerLogToPhyMap() { + Map> result = + new HashMap>(); + for (Map.Entry> entry : + forEachInnerOpMap.entrySet()) { + Map innerOpMap = new HashMap(); + for (Map.Entry innerEntry : entry.getValue().entrySet()) { + innerOpMap.put(innerEntry.getKey(), newLogToPhyMap.get(innerEntry.getValue())); + } + result.put(entry.getKey(), innerOpMap); + } + return result; + } + public static class SortInfoSetter extends LogicalRelationalNodesVisitor { public SortInfoSetter(OperatorPlan plan) throws FrontendException { Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Dec 13 19:11:00 2010 @@ -174,7 +174,10 @@ public class JobControlCompiler{ UDFContext.getUDFContext().reset(); } - Map getJobMroMap() { + /** + * Gets the map of Job and the MR Operator + */ + public Map getJobMroMap() { return Collections.unmodifiableMap(jobMroMap); } @@ -378,19 +381,23 @@ public class JobControlCompiler{ inpTargets.add(ldSucKeys); inpSignatureLists.add(ld.getSignature()); //Remove the POLoad from the plan - mro.mapPlan.remove(ld); + if (!pigContext.inIllustrator) + mro.mapPlan.remove(ld); } } - //Create the jar of all functions and classes required - File submitJarFile = File.createTempFile("Job", ".jar"); - // ensure the job jar is deleted on exit - submitJarFile.deleteOnExit(); - FileOutputStream fos = new FileOutputStream(submitJarFile); - JarManager.createJar(fos, mro.UDFs, pigContext); + if (!pigContext.inIllustrator) + { + //Create the jar of all functions and classes required + File submitJarFile = File.createTempFile("Job", ".jar"); + // ensure the job jar is deleted on exit + submitJarFile.deleteOnExit(); + FileOutputStream fos = new FileOutputStream(submitJarFile); + JarManager.createJar(fos, mro.UDFs, pigContext); - //Start setting the JobConf properties - conf.set("mapred.jar", submitJarFile.getPath()); + //Start setting the JobConf properties + conf.set("mapred.jar", submitJarFile.getPath()); + } conf.set("pig.inputs", ObjectSerializer.serialize(inp)); conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets)); conf.set("pig.inpSignatures", ObjectSerializer.serialize(inpSignatureLists)); @@ -457,22 +464,23 @@ public class JobControlCompiler{ POStore st; if (reduceStores.isEmpty()) { st = mapStores.get(0); - mro.mapPlan.remove(st); + if(!pigContext.inIllustrator) + mro.mapPlan.remove(st); } else { st = reduceStores.get(0); - mro.reducePlan.remove(st); + if(!pigContext.inIllustrator) + mro.reducePlan.remove(st); } // set out filespecs String outputPath = st.getSFile().getFileName(); - FuncSpec outputFuncSpec = st.getSFile().getFuncSpec(); conf.set("pig.streaming.log.dir", new Path(outputPath, LOG_DIR).toString()); conf.set("pig.streaming.task.output.dir", outputPath); } - else { // multi store case + else if (mapStores.size() + reduceStores.size() > 0) { // multi store case log.info("Setting up multi store job"); String tmpLocationStr = FileLocalizer .getTemporaryPath(pigContext).toString(); @@ -513,7 +521,8 @@ public class JobControlCompiler{ //MapOnly Job nwJob.setMapperClass(PigMapOnly.Map.class); nwJob.setNumReduceTasks(0); - conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan)); + if(!pigContext.inIllustrator) + conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan)); if(mro.isEndOfAllInputSetInMap()) { // this is used in Map.close() to decide whether the // pipeline needs to be rerun one more time in the close() @@ -535,7 +544,8 @@ public class JobControlCompiler{ log.info("Setting identity combiner class."); } pack = (POPackage)mro.reducePlan.getRoots().get(0); - mro.reducePlan.remove(pack); + if(!pigContext.inIllustrator) + mro.reducePlan.remove(pack); nwJob.setMapperClass(PigMapReduce.Map.class); nwJob.setReducerClass(PigMapReduce.Reduce.class); @@ -550,21 +560,24 @@ public class JobControlCompiler{ if (mro.customPartitioner != null) nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner)); - conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan)); + if(!pigContext.inIllustrator) + conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan)); if(mro.isEndOfAllInputSetInMap()) { // this is used in Map.close() to decide whether the // pipeline needs to be rerun one more time in the close() // The pipeline is rerun only if there was a stream or merge-join. conf.set(END_OF_INP_IN_MAP, "true"); } - conf.set("pig.reducePlan", ObjectSerializer.serialize(mro.reducePlan)); + if(!pigContext.inIllustrator) + conf.set("pig.reducePlan", ObjectSerializer.serialize(mro.reducePlan)); if(mro.isEndOfAllInputSetInReduce()) { // this is used in Map.close() to decide whether the // pipeline needs to be rerun one more time in the close() // The pipeline is rerun only if there was a stream conf.set("pig.stream.in.reduce", "true"); } - conf.set("pig.reduce.package", ObjectSerializer.serialize(pack)); + if (!pigContext.inIllustrator) + conf.set("pig.reduce.package", ObjectSerializer.serialize(pack)); conf.set("pig.reduce.key.type", Byte.toString(pack.getKeyType())); if (mro.getUseSecondaryKey()) { @@ -631,9 +644,14 @@ public class JobControlCompiler{ nwJob.setGroupingComparatorClass(PigGroupingPartitionWritableComparator.class); } - // unset inputs for POStore, otherwise, map/reduce plan will be unnecessarily deserialized - for (POStore st: mapStores) { st.setInputs(null); st.setParentPlan(null);} - for (POStore st: reduceStores) { st.setInputs(null); st.setParentPlan(null);} + if (!pigContext.inIllustrator) + { + // unset inputs for POStore, otherwise, map/reduce plan will be unnecessarily deserialized + for (POStore st: mapStores) { st.setInputs(null); st.setParentPlan(null);} + for (POStore st: reduceStores) { st.setInputs(null); st.setParentPlan(null);} + conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores)); + conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores)); + } // tmp file compression setups if (Utils.tmpFileCompression(pigContext)) { @@ -641,8 +659,6 @@ public class JobControlCompiler{ conf.set("pig.tmpfilecompression.codec", Utils.tmpFileCompressionCodec(pigContext)); } - conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores)); - conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores)); String tmp; long maxCombinedSplitSize = 0; if (!mro.combineSmallSplits() || pigContext.getProperties().getProperty("pig.splitCombination", "true").equals("false")) @@ -661,7 +677,6 @@ public class JobControlCompiler{ UDFContext.getUDFContext().serialize(conf); Job cjob = new Job(new JobConf(nwJob.getConfiguration()), new ArrayList()); jobStoreMap.put(cjob,new Pair, Path>(storeLocations, tmpLocation)); - return cjob; } catch (JobCreationException jce) { @@ -1142,7 +1157,7 @@ public class JobControlCompiler{ PigContext pigContext, Configuration conf, String filename, String prefix) throws IOException { - if (!FileLocalizer.fileExists(filename, pigContext)) { + if (!pigContext.inIllustrator && !FileLocalizer.fileExists(filename, pigContext)) { throw new IOException( "Internal error: skew join partition file " + filename + " does not exist"); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Dec 13 19:11:00 2010 @@ -109,7 +109,6 @@ import org.apache.pig.impl.util.ObjectSe import org.apache.pig.impl.util.Pair; import org.apache.pig.impl.util.UriUtil; import org.apache.pig.impl.util.Utils; -import org.mortbay.util.URIUtil; /** * The compiler that compiles a given physical plan @@ -311,6 +310,7 @@ public class MRCompiler extends PhyPlanV public MROperPlan compile() throws IOException, PlanException, VisitorException { List leaves = plan.getLeaves(); + if (!pigContext.inIllustrator) for (PhysicalOperator op : leaves) { if (!(op instanceof POStore)) { int errCode = 2025; @@ -324,8 +324,14 @@ public class MRCompiler extends PhyPlanV // and compile their plans List stores = PlanHelper.getStores(plan); List nativeMRs= PlanHelper.getNativeMRs(plan); - List ops = new ArrayList(stores.size() + nativeMRs.size()); - ops.addAll(stores); + List ops; + if (!pigContext.inIllustrator) { + ops = new ArrayList(stores.size() + nativeMRs.size()); + ops.addAll(stores); + } else { + ops = new ArrayList(leaves.size() + nativeMRs.size()); + ops.addAll(leaves); + } ops.addAll(nativeMRs); Collections.sort(ops); @@ -1005,16 +1011,23 @@ public class MRCompiler extends PhyPlanV if (!mro.isMapDone()) { // if map plan is open, add a limit for optimization, eventually we // will add another limit to reduce plan - mro.mapPlan.addAsLeaf(op); - mro.setMapDone(true); + if (!pigContext.inIllustrator) + { + mro.mapPlan.addAsLeaf(op); + mro.setMapDone(true); + } if (mro.reducePlan.isEmpty()) { simpleConnectMapToReduce(mro); mro.requestedParallelism = 1; - POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope))); - pLimit2.setLimit(op.getLimit()); - mro.reducePlan.addAsLeaf(pLimit2); + if (!pigContext.inIllustrator) { + POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope))); + pLimit2.setLimit(op.getLimit()); + mro.reducePlan.addAsLeaf(pLimit2); + } else { + mro.reducePlan.addAsLeaf(op); + } } else { @@ -1848,6 +1861,7 @@ public class MRCompiler extends PhyPlanV curMROp.reducePlan.addAsLeaf(nfe1); curMROp.setNeedsDistinctCombiner(true); phyToMROpMap.put(op, curMROp); + curMROp.phyToMRMap.put(op, nfe1); }catch(Exception e){ int errCode = 2034; String msg = "Error compiling operator " + op.getClass().getSimpleName(); @@ -2221,12 +2235,13 @@ public class MRCompiler extends PhyPlanV POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps2,flattened); mro.reducePlan.add(nfe1); mro.reducePlan.connect(pkg, nfe1); - + mro.phyToMRMap.put(sort, nfe1); if (limit!=-1) { POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope))); pLimit2.setLimit(limit); mro.reducePlan.addAsLeaf(pLimit2); + mro.phyToMRMap.put(sort, pLimit2); } // ep1.add(innGen); @@ -2649,7 +2664,7 @@ public class MRCompiler extends PhyPlanV POPackage pack = (POPackage)op; List sucs = mr.reducePlan.getSuccessors(pack); - if (sucs.size()!=1) { + if (sucs == null || sucs.size()!=1) { return; } @@ -2739,12 +2754,12 @@ public class MRCompiler extends PhyPlanV { // Now we can optimize the map-reduce plan // Replace POPackage->POForeach to POJoinPackage - replaceWithPOJoinPackage(mr.reducePlan, pack, forEach, chunkSize); + replaceWithPOJoinPackage(mr.reducePlan, mr, pack, forEach, chunkSize); } } } - public static void replaceWithPOJoinPackage(PhysicalPlan plan, + public static void replaceWithPOJoinPackage(PhysicalPlan plan, MapReduceOper mr, POPackage pack, POForEach forEach, String chunkSize) throws VisitorException { String scope = pack.getOperatorKey().scope; NodeIdGenerator nig = NodeIdGenerator.getGenerator(); @@ -2772,7 +2787,7 @@ public class MRCompiler extends PhyPlanV String msg = "Error rewriting POJoinPackage."; throw new MRCompilerException(msg, errCode, PigException.BUG, e); } - + mr.phyToMRMap.put(forEach, joinPackage); LogFactory. getLog(LastInputStreamingOptimizer.class).info("Rewrite: POPackage->POForEach to POJoinPackage"); } @@ -2800,6 +2815,7 @@ public class MRCompiler extends PhyPlanV throw new MRCompilerException(msg, errCode, PigException.BUG); } PhysicalOperator mpLeaf = mpLeaves.get(0); + if (!pigContext.inIllustrator) if (!(mpLeaf instanceof POStore)) { int errCode = 2025; String msg = "Expected leaf of reduce plan to " + @@ -2885,6 +2901,7 @@ public class MRCompiler extends PhyPlanV throw new MRCompilerException(msg, errCode, PigException.BUG); } PhysicalOperator mpLeaf = mpLeaves.get(0); + if (!pigContext.inIllustrator) if (!(mpLeaf instanceof POStore)) { int errCode = 2025; String msg = "Expected leaf of reduce plan to " + Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon Dec 13 19:11:00 2010 @@ -461,7 +461,7 @@ public class MapReduceLauncher extends L } } - private MROperPlan compile( + public MROperPlan compile( PhysicalPlan php, PigContext pc) throws PlanException, IOException, VisitorException { MRCompiler comp = new MRCompiler(php, pc); @@ -479,7 +479,7 @@ public class MapReduceLauncher extends L //String prop = System.getProperty("pig.exec.nocombiner"); String prop = pc.getProperties().getProperty("pig.exec.nocombiner"); - if (!("true".equals(prop))) { + if (!pc.inIllustrator && !("true".equals(prop))) { CombinerOptimizer co = new CombinerOptimizer(plan, lastInputChunkSize); co.visit(); //display the warning message(s) from the CombinerOptimizer @@ -493,7 +493,7 @@ public class MapReduceLauncher extends L // Optimize to use secondary sort key if possible prop = pc.getProperties().getProperty("pig.exec.nosecondarykey"); - if (!("true".equals(prop))) { + if (!pc.inIllustrator && !("true".equals(prop))) { SecondaryKeyOptimizer skOptimizer = new SecondaryKeyOptimizer(plan); skOptimizer.visit(); } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Mon Dec 13 19:11:00 2010 @@ -20,6 +20,8 @@ package org.apache.pig.backend.hadoop.ex import java.io.ByteArrayOutputStream; import java.util.HashSet; import java.util.Set; +import java.util.Map; +import java.util.HashMap; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.NodeIdGenerator; @@ -146,6 +148,10 @@ public class MapReduceOper extends Opera // are NOT combinable for correctness. private boolean combineSmallSplits = true; + // Map of the physical operator in physical plan to the one in MR plan: only needed + // if the physical operator is changed/replaced in MR compilation due to, e.g., optimization + public Map phyToMRMap; + private static enum OPER_FEATURE { NONE, // Indicate if this job is a sampling job @@ -169,6 +175,7 @@ public class MapReduceOper extends Opera scalars = new HashSet(); nig = NodeIdGenerator.getGenerator(); scope = k.getScope(); + phyToMRMap = new HashMap(); } /*@Override Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Mon Dec 13 19:11:00 2010 @@ -276,13 +276,6 @@ public class PhyPlanSetter extends PhyPl stream.setParentPlan(parent); } - @Override - public void visitLocalRearrangeForIllustrate( - POLocalRearrangeForIllustrate lrfi) throws VisitorException { - super.visitLocalRearrangeForIllustrate(lrfi); - lrfi.setParentPlan(parent); - } - /* @Override public void visitPartitionRearrange(POPartitionRearrange lrfi) throws VisitorException { Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Mon Dec 13 19:11:00 2010 @@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -28,6 +29,8 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.log4j.PropertyConfigurator; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; @@ -40,6 +43,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.Tuple; +import org.apache.pig.data.DataBag; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.PigNullableWritable; @@ -48,6 +52,7 @@ import org.apache.pig.impl.plan.Operator import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.SpillableMemoryManager; +import org.apache.pig.impl.util.Pair; import org.apache.pig.tools.pigstats.PigStatusReporter; public abstract class PigMapBase extends Mapper { @@ -58,13 +63,15 @@ public abstract class PigMapBase extends protected byte keyType; //Map Plan - protected PhysicalPlan mp; + protected PhysicalPlan mp = null; // Store operators protected List stores; protected TupleFactory tf = TupleFactory.getInstance(); + boolean inIllustrator = false; + Context outputCollector; // Reporter that will be used by operators @@ -81,6 +88,14 @@ public abstract class PigMapBase extends private volatile boolean initialized = false; /** + * for local map/reduce simulation + * @param plan the map plan + */ + public void setMapPlan(PhysicalPlan plan) { + mp = plan; + } + + /** * Will be called when all the tuples in the input * are done. So reporter thread should be closed. */ @@ -142,14 +157,16 @@ public abstract class PigMapBase extends SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job)); PigMapReduce.sJobContext = context; PigMapReduce.sJobConf = context.getConfiguration(); + inIllustrator = (context instanceof IllustratorContext); PigContext.setPackageImportList((ArrayList)ObjectSerializer.deserialize(job.get("udf.import.list"))); pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext")); if (pigContext.getLog4jProperties()!=null) PropertyConfigurator.configure(pigContext.getLog4jProperties()); - mp = (PhysicalPlan) ObjectSerializer.deserialize( - job.get("pig.mapPlan")); + if (mp == null) + mp = (PhysicalPlan) ObjectSerializer.deserialize( + job.get("pig.mapPlan")); stores = PlanHelper.getStores(mp); // To be removed @@ -207,7 +224,8 @@ public abstract class PigMapBase extends MapReducePOStoreImpl impl = new MapReducePOStoreImpl(context); store.setStoreImpl(impl); - store.setUp(); + if (!pigContext.inIllustrator) + store.setUp(); } boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning")); @@ -225,7 +243,13 @@ public abstract class PigMapBase extends } for (PhysicalOperator root : roots) { - root.attachInput(tf.newTupleNoCopy(inpTuple.getAll())); + if (inIllustrator) { + if (root != null) { + root.attachInput(inpTuple); + } + } else { + root.attachInput(tf.newTupleNoCopy(inpTuple.getAll())); + } } runPipeline(leaf); @@ -284,4 +308,76 @@ public abstract class PigMapBase extends this.keyType = keyType; } + /** + * + * Get mapper's illustrator context + * + * @param conf Configuration + * @param input Input bag to serve as data source + * @param output Map output buffer + * @param split the split + * @return Illustrator's context + * @throws IOException + * @throws InterruptedException + */ + public Context getIllustratorContext(Configuration conf, DataBag input, + List> output, InputSplit split) + throws IOException, InterruptedException { + return new IllustratorContext(conf, input, output, split); + } + + public class IllustratorContext extends Context { + private DataBag input; + List> output; + private Iterator it = null; + private Tuple value = null; + private boolean init = false; + + public IllustratorContext(Configuration conf, DataBag input, + List> output, + InputSplit split) throws IOException, InterruptedException { + super(conf, new TaskAttemptID(), null, null, null, null, split); + if (output == null) + throw new IOException("Null output can not be used"); + this.input = input; this.output = output; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (input == null) { + if (!init) { + init = true; + return true; + } + return false; + } + if (it == null) + it = input.iterator(); + if (!it.hasNext()) + return false; + value = it.next(); + return true; + } + + @Override + public Text getCurrentKey() { + return null; + } + + @Override + public Tuple getCurrentValue() { + return value; + } + + @Override + public void write(PigNullableWritable key, Writable value) + throws IOException, InterruptedException { + output.add(new Pair(key, value)); + } + + @Override + public void progress() { + + } + } } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Mon Dec 13 19:11:00 2010 @@ -18,18 +18,24 @@ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Collections; +import java.util.Comparator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapreduce.Reducer; -import org.apache.log4j.PropertyConfigurator; +import org.apache.hadoop.mapreduce.TaskAttemptID; + import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.HDataType; @@ -43,6 +49,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; +import org.apache.pig.pen.FakeRawKeyValueIterator; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; @@ -55,6 +62,7 @@ import org.apache.pig.impl.plan.VisitorE import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.SpillableMemoryManager; import org.apache.pig.impl.util.UDFContext; +import org.apache.pig.impl.util.Pair; import org.apache.pig.tools.pigstats.PigStatusReporter; /** @@ -111,8 +119,8 @@ public class PigMapReduce { // value. The value needs it so that POPackage can properly // assign the tuple to its slot in the projection. key.setIndex(index); - val.setIndex(index); - + val.setIndex(index); + oc.write(key, val); } } @@ -194,7 +202,6 @@ public class PigMapReduce { // set the partition wrappedKey.setPartition(partitionIndex); val.setIndex(index); - oc.write(wrappedKey, val); } @@ -254,7 +261,7 @@ public class PigMapReduce { protected final Log log = LogFactory.getLog(getClass()); //The reduce plan - protected PhysicalPlan rp; + protected PhysicalPlan rp = null; // Store operators protected List stores; @@ -279,6 +286,16 @@ public class PigMapReduce { PigContext pigContext = null; protected volatile boolean initialized = false; + private boolean inIllustrator = false; + + /** + * Set the reduce plan: to be used by local runner for illustrator + * @param plan Reduce plan + */ + public void setReducePlan(PhysicalPlan plan) { + rp = plan; + } + /** * Configures the Reduce plan, the POPackage operator * and the reporter thread @@ -287,7 +304,9 @@ public class PigMapReduce { @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); - + inIllustrator = (context instanceof IllustratorContext); + if (inIllustrator) + pack = ((IllustratorContext) context).pack; Configuration jConf = context.getConfiguration(); SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf)); sJobContext = context; @@ -296,11 +315,13 @@ public class PigMapReduce { PigContext.setPackageImportList((ArrayList)ObjectSerializer.deserialize(jConf.get("udf.import.list"))); pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext")); - rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf - .get("pig.reducePlan")); + if (rp == null) + rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf + .get("pig.reducePlan")); stores = PlanHelper.getStores(rp); - pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.reduce.package")); + if (!inIllustrator) + pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.reduce.package")); // To be removed if(rp.isEmpty()) log.debug("Reduce Plan empty!"); @@ -352,12 +373,13 @@ public class PigMapReduce { PhysicalOperator.setPigLogger(pigHadoopLogger); - for (POStore store: stores) { - MapReducePOStoreImpl impl - = new MapReducePOStoreImpl(context); - store.setStoreImpl(impl); - store.setUp(); - } + if (!inIllustrator) + for (POStore store: stores) { + MapReducePOStoreImpl impl + = new MapReducePOStoreImpl(context); + store.setStoreImpl(impl); + store.setUp(); + } } // In the case we optimize the join, we combine @@ -512,6 +534,127 @@ public class PigMapReduce { PhysicalOperator.setReporter(null); initialized = false; } + + /** + * Get reducer's illustrator context + * + * @param input Input buffer as output by maps + * @param pkg package + * @return reducer's illustrator context + * @throws IOException + * @throws InterruptedException + */ + public Context getIllustratorContext(Job job, + List> input, POPackage pkg) throws IOException, InterruptedException { + return new IllustratorContext(job, input, pkg); + } + + @SuppressWarnings("unchecked") + public class IllustratorContext extends Context { + private PigNullableWritable currentKey = null, nextKey = null; + private NullableTuple nextValue = null; + private List currentValues = null; + private Iterator> it; + private final ByteArrayOutputStream bos; + private final DataOutputStream dos; + private final RawComparator sortComparator, groupingComparator; + POPackage pack = null; + + public IllustratorContext(Job job, + List> input, + POPackage pkg + ) throws IOException, InterruptedException { + super(job.getJobConf(), new TaskAttemptID(), new FakeRawKeyValueIterator(input.iterator().hasNext()), + null, null, null, null, null, null, PigNullableWritable.class, NullableTuple.class); + bos = new ByteArrayOutputStream(); + dos = new DataOutputStream(bos); + org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf()); + sortComparator = nwJob.getSortComparator(); + groupingComparator = nwJob.getGroupingComparator(); + + Collections.sort(input, new Comparator>() { + @Override + public int compare(Pair o1, + Pair o2) { + try { + o1.first.write(dos); + int l1 = bos.size(); + o2.first.write(dos); + int l2 = bos.size(); + byte[] bytes = bos.toByteArray(); + bos.reset(); + return sortComparator.compare(bytes, 0, l1, bytes, l1, l2-l1); + } catch (IOException e) { + throw new RuntimeException("Serialization exception in sort:"+e.getMessage()); + } + } + } + ); + currentValues = new ArrayList(); + it = input.iterator(); + if (it.hasNext()) { + Pair entry = it.next(); + nextKey = entry.first; + nextValue = (NullableTuple) entry.second; + } + pack = pkg; + } + + @Override + public PigNullableWritable getCurrentKey() { + return currentKey; + } + + @Override + public boolean nextKey() { + if (nextKey == null) + return false; + currentKey = nextKey; + currentValues.clear(); + currentValues.add(nextValue); + nextKey = null; + for(; it.hasNext(); ) { + Pair entry = it.next(); + /* Why can't raw comparison be used? + byte[] bytes; + int l1, l2; + try { + currentKey.write(dos); + l1 = bos.size(); + entry.first.write(dos); + l2 = bos.size(); + bytes = bos.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("nextKey exception : "+e.getMessage()); + } + bos.reset(); + if (groupingComparator.compare(bytes, 0, l1, bytes, l1, l2-l1) == 0) + */ + if (groupingComparator.compare(currentKey, entry.first) == 0) + { + currentValues.add((NullableTuple)entry.second); + } else { + nextKey = entry.first; + nextValue = (NullableTuple) entry.second; + break; + } + } + return true; + } + + @Override + public Iterable getValues() { + return currentValues; + } + + @Override + public void write(PigNullableWritable k, Writable t) { + } + + @Override + public void progress() { + } + } } /** Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Mon Dec 13 19:11:00 2010 @@ -91,6 +91,10 @@ public class LogToPhyTranslationVisitor return currentPlan; } + public Map getLogToPhyMap() { + return logToPhyMap; + } + @Override protected void visit(LOGreaterThan op) throws VisitorException { String scope = op.getOperatorKey().scope; Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Mon Dec 13 19:11:00 2010 @@ -35,6 +35,8 @@ import org.apache.pig.impl.plan.Operator import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.pen.util.LineageTracer; +import org.apache.pig.pen.Illustrator; +import org.apache.pig.pen.Illustrable; /** * @@ -58,7 +60,7 @@ import org.apache.pig.pen.util.LineageTr * only those types that are supported. * */ -public abstract class PhysicalOperator extends Operator implements Cloneable { +public abstract class PhysicalOperator extends Operator implements Illustrable, Cloneable { private Log log = LogFactory.getLog(getClass()); @@ -125,8 +127,14 @@ public abstract class PhysicalOperator e static final protected Map dummyMap = null; + // TODO: This is not needed. But a lot of tests check serialized physical plans + // that are sensitive to the serialized image of the contained physical operators. + // So for now, just keep it. Later it'll be cleansed along with those test golden + // files protected LineageTracer lineageTracer; + protected transient Illustrator illustrator = null; + private boolean accum; private transient boolean accumStart; @@ -149,8 +157,13 @@ public abstract class PhysicalOperator e res = new Result(); } - public void setLineageTracer(LineageTracer lineage) { - this.lineageTracer = lineage; + @Override + public void setIllustrator(Illustrator illustrator) { + this.illustrator = illustrator; + } + + public Illustrator getIllustrator() { + return illustrator; } public int getRequestedParallelism() { Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryComparisonOperator.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryComparisonOperator.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryComparisonOperator.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryComparisonOperator.java Mon Dec 13 19:11:00 2010 @@ -17,6 +17,7 @@ */ package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators; +import org.apache.pig.data.Tuple; import org.apache.pig.impl.plan.OperatorKey; /** @@ -66,4 +67,11 @@ public abstract class BinaryComparisonOp operandType = op.operandType; super.cloneHelper(op); } + + public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { + if (illustrator != null) { + illustrator.setSubExpResult(eqClassIndex == 0); + } + return null; + } } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java Mon Dec 13 19:11:00 2010 @@ -20,7 +20,9 @@ package org.apache.pig.backend.hadoop.ex import java.util.ArrayList; import java.util.List; +import org.apache.pig.data.Tuple; import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.util.IdentityHashSet; /** * A base class for all Binary expression operators. @@ -84,4 +86,9 @@ public abstract class BinaryExpressionOp rhs = op.rhs; super.cloneHelper(op); } -} + + @Override + public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { + return null; + } + } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java Mon Dec 13 19:11:00 2010 @@ -204,4 +204,9 @@ public class ConstantExpression extends public List getChildExpressions() { return null; } + + @Override + public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { + return (Tuple) out; + } } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java Mon Dec 13 19:11:00 2010 @@ -185,7 +185,7 @@ public class EqualToExpr extends BinaryC }else{ throw new ExecException("The left side and right side has the different types"); } - + illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1); return left; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java Mon Dec 13 19:11:00 2010 @@ -35,6 +35,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.pen.Illustrator; /** * A base class for all types of expressions. All expression @@ -55,6 +56,11 @@ public abstract class ExpressionOperator } @Override + public void setIllustrator(Illustrator illustrator) { + this.illustrator = illustrator; + } + + @Override public boolean supportsMultipleOutputs() { return false; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java Mon Dec 13 19:11:00 2010 @@ -154,6 +154,7 @@ public class GTOrEqualToExpr extends Bin } else { left.result = falseRef; } + illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1); return left; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java Mon Dec 13 19:11:00 2010 @@ -17,15 +17,11 @@ */ package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; -import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.NodeIdGenerator; @@ -37,7 +33,6 @@ public class GreaterThanExpr extends Bin * */ private static final long serialVersionUID = 1L; - transient private final Log log = LogFactory.getLog(getClass()); public GreaterThanExpr(OperatorKey k) { this(k, -1); @@ -60,7 +55,6 @@ public class GreaterThanExpr extends Bin @Override public Result getNext(Boolean bool) throws ExecException { - byte status; Result left, right; switch (operandType) { @@ -154,6 +148,7 @@ public class GreaterThanExpr extends Bin } else { left.result = falseRef; } + illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1); return left; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java Mon Dec 13 19:11:00 2010 @@ -154,6 +154,7 @@ public class LTOrEqualToExpr extends Bin } else { left.result = falseRef; } + illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1); return left; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java Mon Dec 13 19:11:00 2010 @@ -154,6 +154,7 @@ public class LessThanExpr extends Binary } else { left.result = falseRef; } + illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1); return left; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java Mon Dec 13 19:11:00 2010 @@ -184,6 +184,7 @@ public class NotEqualToExpr extends Bina }else{ throw new ExecException("The left side and right side has the different types"); } + illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1); return left; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java Mon Dec 13 19:11:00 2010 @@ -78,9 +78,20 @@ public class POAnd extends BinaryCompari // 3) f f f f // Short circuit - if lhs is false, return false; ROW 3 above is handled with this - if (left.result != null && !(((Boolean)left.result).booleanValue())) return left; + boolean returnLeft = false; + if (left.result != null && !(((Boolean)left.result).booleanValue())) { + if (illustrator == null) { + return left; + } + illustratorMarkup(null, left.result, 1); + returnLeft = true; + } Result right = rhs.getNext(dummyBool); + if (returnLeft) { + return left; + } + // pass on ERROR and EOP if(right.returnStatus != POStatus.STATUS_OK && right.returnStatus != POStatus.STATUS_NULL) { return right; @@ -94,6 +105,8 @@ public class POAnd extends BinaryCompari // No matter what, what we get from the right side is what we'll // return, null, true, or false. + if (right.result != null) + illustratorMarkup(null, right.result, (Boolean) right.result ? 0 : 1); return right; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java Mon Dec 13 19:11:00 2010 @@ -32,6 +32,7 @@ import org.apache.pig.data.Tuple; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.util.IdentityHashSet; public class POBinCond extends ExpressionOperator { @@ -65,7 +66,9 @@ public class POBinCond extends Expressio Result res = cond.getNext(b); if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res; - return ((Boolean)res.result) == true ? lhs.getNext(b) : rhs.getNext(b); + Result result = ((Boolean)res.result) == true ? lhs.getNext(b) : rhs.getNext(b); + illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1); + return result; } @@ -88,7 +91,9 @@ public class POBinCond extends Expressio Result res = cond.getNext(dummyBool); if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res; - return ((Boolean)res.result) == true ? lhs.getNext(db) : rhs.getNext(db); + Result result = ((Boolean)res.result) == true ? lhs.getNext(db) : rhs.getNext(db); + illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1); + return result; } @Override @@ -109,7 +114,9 @@ public class POBinCond extends Expressio } Result res = cond.getNext(dummyBool); if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res; - return ((Boolean)res.result) == true ? lhs.getNext(ba) : rhs.getNext(ba); + Result result = ((Boolean)res.result) == true ? lhs.getNext(ba) : rhs.getNext(ba); + illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1); + return result; } @Override @@ -130,7 +137,9 @@ public class POBinCond extends Expressio } Result res = cond.getNext(dummyBool); if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res; - return ((Boolean)res.result) == true ? lhs.getNext(d) : rhs.getNext(d); + Result result = ((Boolean)res.result) == true ? lhs.getNext(d) : rhs.getNext(d); + illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1); + return result; } @Override @@ -151,7 +160,9 @@ public class POBinCond extends Expressio } Result res = cond.getNext(dummyBool); if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res; - return ((Boolean)res.result) == true ? lhs.getNext(f) : rhs.getNext(f); + Result result = ((Boolean)res.result) == true ? lhs.getNext(f) : rhs.getNext(f); + illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1); + return result; } @Override @@ -171,7 +182,9 @@ public class POBinCond extends Expressio } Result res = cond.getNext(dummyBool); if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res; - return ((Boolean)res.result) == true ? lhs.getNext(i) : rhs.getNext(i); + Result result = ((Boolean)res.result) == true ? lhs.getNext(i) : rhs.getNext(i); + illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1); + return result; } @Override @@ -192,7 +205,9 @@ public class POBinCond extends Expressio } Result res = cond.getNext(dummyBool); if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res; - return ((Boolean)res.result) == true ? lhs.getNext(l) : rhs.getNext(l); + Result result = ((Boolean)res.result) == true ? lhs.getNext(l) : rhs.getNext(l); + illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1); + return result; } @Override @@ -213,7 +228,9 @@ public class POBinCond extends Expressio } Result res = cond.getNext(dummyBool); if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res; - return ((Boolean)res.result) == true ? lhs.getNext(m) : rhs.getNext(m); + Result result = ((Boolean)res.result) == true ? lhs.getNext(m) : rhs.getNext(m); + illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1); + return result; } @Override @@ -234,7 +251,9 @@ public class POBinCond extends Expressio } Result res = cond.getNext(dummyBool); if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res; - return ((Boolean)res.result) == true ? lhs.getNext(s) : rhs.getNext(s); + Result result = ((Boolean)res.result) == true ? lhs.getNext(s) : rhs.getNext(s); + illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1); + return result; } @Override @@ -255,7 +274,9 @@ public class POBinCond extends Expressio } Result res = cond.getNext(dummyBool); if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res; - return ((Boolean)res.result) == true ? lhs.getNext(t) : rhs.getNext(t); + Result result = ((Boolean)res.result) == true ? lhs.getNext(t) : rhs.getNext(t); + illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1); + return result; } @Override @@ -338,4 +359,11 @@ public class POBinCond extends Expressio return child; } + @Override + public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { + if(illustrator != null) { + + } + return null; + } } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Mon Dec 13 19:11:00 2010 @@ -1327,4 +1327,8 @@ public class POCast extends ExpressionOp return funcSpec; } + @Override + public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { + return (Tuple) out; + } } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java Mon Dec 13 19:11:00 2010 @@ -76,6 +76,7 @@ public class POIsNull extends UnaryCompa } else { res.result = false; } + illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1); } return res; case DataType.DOUBLE: @@ -86,6 +87,7 @@ public class POIsNull extends UnaryCompa } else { res.result = false; } + illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1); } return res; case DataType.INTEGER: @@ -96,6 +98,7 @@ public class POIsNull extends UnaryCompa } else { res.result = false; } + illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1); } return res; case DataType.CHARARRAY: @@ -106,6 +109,7 @@ public class POIsNull extends UnaryCompa } else { res.result = false; } + illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1); } return res; case DataType.BOOLEAN: @@ -116,6 +120,7 @@ public class POIsNull extends UnaryCompa } else { res.result = false; } + illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1); } return res; case DataType.LONG: @@ -126,6 +131,7 @@ public class POIsNull extends UnaryCompa } else { res.result = false; } + illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1); } return res; case DataType.FLOAT: @@ -136,6 +142,7 @@ public class POIsNull extends UnaryCompa } else { res.result = false; } + illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1); } return res; case DataType.MAP: @@ -146,6 +153,7 @@ public class POIsNull extends UnaryCompa } else { res.result = false; } + illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1); } return res; case DataType.TUPLE: @@ -156,6 +164,7 @@ public class POIsNull extends UnaryCompa } else { res.result = false; } + illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1); } return res; case DataType.BAG: @@ -166,6 +175,7 @@ public class POIsNull extends UnaryCompa } else { res.result = false; } + illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1); } return res; default: { Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java Mon Dec 13 19:11:00 2010 @@ -165,6 +165,8 @@ public class POMapLookUp extends Express return null; } - - + @Override + public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { + return (Tuple) out; + } } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java?rev=1045314&r1=1045313&r2=1045314&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java Mon Dec 13 19:11:00 2010 @@ -22,6 +22,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.VisitorException; @@ -101,4 +102,9 @@ public class PONegative extends UnaryExp clone.cloneHelper(this); return clone; } + + @Override + public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { + return (Tuple) out; + } }