Mailing-List: contact pig-commits-help@hadoop.apache.org; run by ezmlm
Reply-To: pig-dev@hadoop.apache.org
Subject: svn commit: r771844 - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/impl/logicalLay...
Date: Tue, 05 May 2009 15:49:58 -0000
To: pig-commits@incubator.apache.org
From: gates@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20090505154959.4298B2388C4D@eris.apache.org>

Author: gates
Date: Tue May 5 15:49:58 2009
New Revision: 771844

URL: http://svn.apache.org/viewvc?rev=771844&view=rev
Log:
PIG-741 Allow limit to be nested in foreach (gates).

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue May 5 15:49:58 2009
@@ -34,6 +34,8 @@
 PIG-775: PORelationToExprProject should create a NonSpillableDataBag to create empty bags (pradeepkth)

+PIG-741: Allow limit to be nested in a foreach.
+
 PIG-743: To implement clover (gkesavan)

 PIG-701: Implement IVY for resolving pig dependencies (gkesavan)

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Tue May 5 15:49:58 2009
@@ -185,7 +185,7 @@
      * Starts batch execution mode.
      */
     public void setBatchOn() {
-        log.info("Create a new graph.");
+        log.debug("Create a new graph.");

         if (currDAG != null) {
             graphs.push(currDAG);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Tue May 5 15:49:58 2009
@@ -38,6 +38,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
@@ -762,6 +763,10 @@
             sawNonAlgebraic = true;
         }

+        public void visitLimit(POLimit limit) throws VisitorException {
+            sawNonAlgebraic = true;
+        }
+
         private boolean checkSuccessorIsLeaf(PhysicalOperator leaf, PhysicalOperator opToCheck) {
             List succs = mPlan.getSuccessors(opToCheck);
             if(succs.size() == 1) {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Tue May 5 15:49:58 2009
@@ -94,9 +94,7 @@
             res.result = it.next();
             if (res.result == null){
                 res.returnStatus = POStatus.STATUS_EOP;
-                inputsAccumulated = false;
-                distinctBag = null;
-                it = null;
+                reset();
             } else {
                 res.returnStatus = POStatus.STATUS_OK;
             }
@@ -119,6 +117,13 @@
     }

     @Override
+    public void reset() {
+        inputsAccumulated = false;
+        distinctBag = null;
+        it = null;
+    }
+
+    @Override
     public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitDistinct(this);
     }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Tue May 5 15:49:58 2009
@@ -201,7 +201,6 @@
                 for (PhysicalOperator po : opsToBeReset) {
                     po.reset();
                 }
-
                 res = processPlan();

                 processingPlan = true;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java Tue May 5 15:49:58 2009
@@ -29,6 +29,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ComparisonOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;

@@ -115,4 +116,18 @@
     public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitLimit(this);
     }
+
+    @Override
+    public void reset() {
+        soFar = 0;
+    }
+
+    @Override
+    public POLimit clone() throws CloneNotSupportedException {
+        POLimit newLimit = new POLimit(new OperatorKey(this.mKey.scope,
+            NodeIdGenerator.getGenerator().getNextNodeId(this.mKey.scope)),
+            this.requestedParallelism, this.inputs);
+        newLimit.mLimit = this.mLimit;
+        return newLimit;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Tue May 5 15:49:58 2009
@@ -282,9 +282,7 @@
             res.returnStatus = POStatus.STATUS_OK;
         } else {
             res.returnStatus = POStatus.STATUS_EOP;
-            inputsAccumulated = false;
-            sortedBag = null;
-            it = null;
+            reset();
         }
         return res;
     }
@@ -307,6 +305,13 @@
         v.visitSort(this);
     }

+    @Override
+    public void reset() {
+        inputsAccumulated = false;
+        sortedBag = null;
+        it = null;
+    }
+
     public List getSortPlans() {
         return sortPlans;
     }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue May 5 15:49:58 2009
@@ -1278,7 +1278,7 @@
 }
 {
     (
-    input = NestedExpr(lp) {log.debug("Filter input: " + input);}
+    input = NestedExpr(lp) {log.debug("Limit input: " + input);}
     t =
     )
     {
@@ -2184,6 +2184,7 @@
 |   item = NestedFilter(over,specs,lp, input)
 |   item = NestedSortOrArrange(over,specs,lp, input)
 |   item = NestedDistinct(over,specs,lp, input)
+|   item = NestedLimit(over,specs,lp, input)
     )
     )
     {
@@ -2411,7 +2412,40 @@
     }
 }

-
+LogicalOperator NestedLimit(Schema over, Map specs, LogicalPlan lp, LogicalOperator input):
+{
+    LogicalOperator eOp;
+    Schema subSchema = null;
+    Token t;
+    log.trace("Entering LimitClause");
+}
+{
+    (
+
+    (
+    LOOKAHEAD(NestedProject(over, specs, lp, input)) eOp = NestedProject(over, specs, lp, input)
+|   LOOKAHEAD({ null != specs.get(getToken(1).image) }) t = {eOp = specs.get(t.image);}
+|   eOp = BaseEvalSpec(over, specs, lp, input)
+    )
+    {subSchema = eOp.getSchema();}
+    t =
+    )
+    {
+        lp.add(eOp);
+        log.debug("Added " + eOp.getAlias() + " to the logical plan");
+        long l = Integer.parseInt(t.image);
+        LogicalOperator limit = new LOLimit(lp, new OperatorKey(scope, getNextId()), l);
+        lp.add(limit);
+        log.debug("Added operator " + limit.getClass().getName() + " to the logical plan");
+
+        lp.connect(eOp, limit);
+        log.debug("Connected the limit input to the limit");
+
+        log.trace("Exiting NestedLimit");
+        return limit;
+    }
+}
+
 LogicalOperator GenerateStatement(Schema over, Map specs, LogicalPlan lp, LogicalOperator input):
 {
     LogicalOperator spec = null;

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Tue May 5 15:49:58 2009
@@ -27,8 +27,7 @@
 import java.util.Iterator;
 import java.util.Random;

-import java.io.File;
-import java.io.IOException;
+import java.io.*;
 import java.text.DecimalFormat;

 public class TestForEachNestedPlanLocal extends TestCase {
@@ -65,6 +64,30 @@
         }
     }

+    @Test
+    public void testInnerLimit() throws Exception {
+        File tmpFile = genDataSetFileOneGroup();
+        pig.registerQuery("a = load '" + Util.generateURI(tmpFile.toString()) + "'; ");
+        pig.registerQuery("b = group a by $0; ");
+        pig.registerQuery("c = foreach b { " + " c1 = limit $1 5; " +
+            " generate COUNT(c1); " + "};");
+        Iterator it = pig.openIterator("c");
+        Tuple t = null;
+        long count[] = new long[3];
+        for (int i = 0; i < 3 && it.hasNext(); i++) {
+            t = it.next();
+            count[i] = (Long)t.get(0);
+        }
+
+        Assert.assertFalse(it.hasNext());
+
+        Assert.assertEquals(3L, count[0]);
+        Assert.assertEquals(5L, count[1]);
+        Assert.assertEquals(5L, count[2]);
+    }
+
+
+

     /*
     @Test
@@ -113,4 +136,34 @@
         return TestHelper.createTempFile(data) ;
     }

+
+    private File genDataSetFileOneGroup() throws IOException {
+
+        File fp1 = File.createTempFile("test", "txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(fp1));
+
+        ps.println("lost\tjack");
+        ps.println("lost\tkate");
+        ps.println("lost\tsawyer");
+        ps.println("lost\tdesmond");
+        ps.println("lost\thurley");
+        ps.println("lost\tlocke");
+        ps.println("lost\tsun");
+        ps.println("lost\tcharlie");
+        ps.println("lost\tjin");
+        ps.println("lost\tben");
+        ps.println("lotr\tfrodo");
+        ps.println("lotr\tsam");
+        ps.println("lotr\tmerry");
+        ps.println("lotr\tpippen");
+        ps.println("lotr\tbilbo");
+        ps.println("3stooges\tlarry");
+        ps.println("3stooges\tmoe");
+        ps.println("3stooges\tcurly");
+
+        ps.close();
+
+        return fp1;
+    }
+
 }
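testInnerLimit expects the counts 3, 5 and 5. The reason is that each group's bag (3stooges: 3 rows, lost: 10, lotr: 5) is limited to 5 tuples independently. That independence is why POLimit now implements reset() (setting soFar back to 0), and why POForEach calls reset() on every operator in opsToBeReset before calling processPlan().

The following is a minimal, self-contained Java sketch of that behaviour. It is not Pig code: NestedLimitSketch, SimpleLimit and countLimited are hypothetical names. Grouping with a TreeMap is an assumption made only so the groups come out in the same order the test asserts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of "c1 = limit $1 5" nested in a foreach.
// NestedLimitSketch and SimpleLimit are illustrative names, not Pig classes.
public class NestedLimitSketch {

    // Passes through at most `limit` tuples, tracking progress in soFar.
    static final class SimpleLimit {
        private final long limit;
        private long soFar = 0;

        SimpleLimit(long limit) {
            this.limit = limit;
        }

        // Returns true if the next tuple should be emitted.
        boolean accept() {
            if (soFar < limit) {
                soFar++;
                return true;
            }
            return false;
        }

        // Clears per-group state, like POLimit.reset() setting soFar = 0.
        void reset() {
            soFar = 0;
        }
    }

    // Groups "key\tvalue" rows by key and returns the size of each limited bag.
    public static List<Long> countLimited(List<String> rows, long n) {
        // A TreeMap orders the groups by key ("3stooges" < "lost" < "lotr").
        Map<String, List<String>> groups = new TreeMap<>();
        for (String row : rows) {
            String[] fields = row.split("\t", 2);
            groups.computeIfAbsent(fields[0], k -> new ArrayList<>()).add(fields[1]);
        }

        // One limit instance is reused for every group, so its state must be reset.
        SimpleLimit limit = new SimpleLimit(n);
        List<Long> counts = new ArrayList<>();
        for (List<String> bag : groups.values()) {
            // Without this reset, soFar would carry over from one group to the
            // next and this data would yield [3, 2, 0] instead of [3, 5, 5].
            limit.reset();
            long count = 0;
            for (int i = 0; i < bag.size(); i++) {
                if (limit.accept()) {
                    count++;
                }
            }
            counts.add(count);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same rows as genDataSetFileOneGroup() in the test.
        List<String> rows = List.of(
            "lost\tjack", "lost\tkate", "lost\tsawyer", "lost\tdesmond", "lost\thurley",
            "lost\tlocke", "lost\tsun", "lost\tcharlie", "lost\tjin", "lost\tben",
            "lotr\tfrodo", "lotr\tsam", "lotr\tmerry", "lotr\tpippen", "lotr\tbilbo",
            "3stooges\tlarry", "3stooges\tmoe", "3stooges\tcurly");
        System.out.println(countLimited(rows, 5)); // prints [3, 5, 5]
    }
}
```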