Return-Path: Delivered-To: apmail-hadoop-pig-commits-archive@www.apache.org Received: (qmail 45990 invoked from network); 5 Aug 2010 19:25:31 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 5 Aug 2010 19:25:31 -0000 Received: (qmail 51691 invoked by uid 500); 5 Aug 2010 19:25:31 -0000 Delivered-To: apmail-hadoop-pig-commits-archive@hadoop.apache.org Received: (qmail 51672 invoked by uid 500); 5 Aug 2010 19:25:31 -0000 Mailing-List: contact pig-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: pig-dev@hadoop.apache.org Delivered-To: mailing list pig-commits@hadoop.apache.org Received: (qmail 51665 invoked by uid 500); 5 Aug 2010 19:25:31 -0000 Delivered-To: apmail-incubator-pig-commits@incubator.apache.org Received: (qmail 51662 invoked by uid 99); 5 Aug 2010 19:25:31 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Aug 2010 19:25:31 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Aug 2010 19:25:30 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id C9BF223889E3; Thu, 5 Aug 2010 19:24:13 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r982739 - in /hadoop/pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java test/org/apache/pig/test/TestSampleOptimizer.java Date: Thu, 05 Aug 2010 19:24:13 -0000 To: pig-commits@incubator.apache.org From: pradeepkth@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100805192413.C9BF223889E3@eris.apache.org> Author: pradeepkth Date: Thu Aug 5 19:24:13 2010 New Revision: 982739 URL: 
http://svn.apache.org/viewvc?rev=982739&view=rev Log: PIG-1534: Code discovering UDFs in the script has a bug in a order by case (pradeepkth) Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=982739&r1=982738&r2=982739&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Aug 5 19:24:13 2010 @@ -114,6 +114,9 @@ PIG-1309: Map-side Cogroup (ashutoshc) BUG FIXES +PIG-1534: Code discovering UDFs in the script has a bug in a order by case +(pradeepkth) + PIG-1533: Compression codec should be a per-store property (rding) PIG-1527: No need to deserialize UDFContext on the client side (rding) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=982739&r1=982738&r2=982739&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java Thu Aug 5 19:24:13 2010 @@ -63,6 +63,7 @@ public class SampleOptimizer extends MRO this.mPlan.remove(op); } + @Override public void visitMROp(MapReduceOper mr) throws VisitorException { // See if this is a sampling job. List pos = mr.mapPlan.getRoots(); @@ -168,6 +169,8 @@ public class SampleOptimizer extends MRO // First argument is FuncSpec of loader function to subsume, this we want to set for // ourselves. 
rslargs[0] = predFs.getFuncSpec().toString(); + // Add the loader's funcspec to the list of udf's associated with this mr operator + mr.UDFs.add(rslargs[0]); // Second argument is the number of samples per block, read this from the original. rslargs[1] = load.getLFile().getFuncSpec().getCtorArgs()[1]; FileSpec fs = new FileSpec(predFs.getFileName(),new FuncSpec(loadFunc, rslargs)); @@ -191,6 +194,8 @@ public class SampleOptimizer extends MRO newLoad.setSignature(predLoad.getSignature()); try { succ.mapPlan.replace(succLoad, newLoad); + // Add the loader's funcspec to the list of udf's associated with this mr operator + succ.UDFs.add(newLoad.getLFile().getFuncSpec().toString()); } catch (PlanException e) { throw new VisitorException(e); } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java?rev=982739&r1=982738&r2=982739&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java Thu Aug 5 19:24:13 2010 @@ -223,4 +223,46 @@ public class TestSampleOptimizer { // After optimizer visits, number of MR jobs = 2 assertEquals(2,count); } + + @Test + public void testOrderByUDFSet() throws Exception { + LogicalPlanTester planTester = new LogicalPlanTester() ; + planTester.buildPlan("a = load 'input1' using BinStorage();"); + planTester.buildPlan("b = order a by $0;"); + LogicalPlan lp = planTester.buildPlan("store b into '/tmp';"); + + PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc); + MROperPlan mrPlan = Util.buildMRPlan(pp, pc); + + int count = 1; + MapReduceOper mrOper = mrPlan.getRoots().get(0); + while(mrPlan.getSuccessors(mrOper) != null) { + mrOper = mrPlan.getSuccessors(mrOper).get(0); + ++count; + } + // Before optimizer visits, number of MR jobs = 3. 
+ assertEquals(3,count); + + SampleOptimizer so = new SampleOptimizer(mrPlan); + so.visit(); + + count = 1; + mrOper = mrPlan.getRoots().get(0); + // the first mrOper should be the sampling job - its udf list should only + // contain BinStorage + assertTrue(mrOper.UDFs.size()==1); + assertTrue(mrOper.UDFs.contains("BinStorage")); + while(mrPlan.getSuccessors(mrOper) != null) { + mrOper = mrPlan.getSuccessors(mrOper).get(0); + // the second mr oper is the real order by job - its udf list should + // contain BinStorage corresponding to the load and PigStorage + // corresponding to the store + assertTrue(mrOper.UDFs.size()==2); + assertTrue(mrOper.UDFs.contains("BinStorage")); + assertTrue(mrOper.UDFs.contains("org.apache.pig.builtin.PigStorage")); + ++count; + } + // After optimizer visits, number of MR jobs = 2 + assertEquals(2,count); + } }