Return-Path: X-Original-To: apmail-pig-commits-archive@www.apache.org Delivered-To: apmail-pig-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 37DFB10678 for ; Sat, 8 Feb 2014 18:27:12 +0000 (UTC) Received: (qmail 6833 invoked by uid 500); 8 Feb 2014 18:27:11 -0000 Delivered-To: apmail-pig-commits-archive@pig.apache.org Received: (qmail 6781 invoked by uid 500); 8 Feb 2014 18:27:11 -0000 Mailing-List: contact commits-help@pig.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pig.apache.org Delivered-To: mailing list commits@pig.apache.org Received: (qmail 6774 invoked by uid 99); 8 Feb 2014 18:27:11 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 08 Feb 2014 18:27:11 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 08 Feb 2014 18:27:07 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 8EFDF23888FE; Sat, 8 Feb 2014 18:26:46 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1566082 - in /pig/branches/branch-0.12: CHANGES.txt src/org/apache/pig/newplan/logical/relational/LOForEach.java test/org/apache/pig/test/TestNewPlanFilterRule.java Date: Sat, 08 Feb 2014 18:26:46 -0000 To: commits@pig.apache.org From: daijy@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140208182646.8EFDF23888FE@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: daijy Date: Sat Feb 8 18:26:46 2014 New Revision: 1566082 URL: http://svn.apache.org/r1566082 Log: PIG-3347: Store invocation brings side effect Modified: pig/branches/branch-0.12/CHANGES.txt 
pig/branches/branch-0.12/src/org/apache/pig/newplan/logical/relational/LOForEach.java pig/branches/branch-0.12/test/org/apache/pig/test/TestNewPlanFilterRule.java Modified: pig/branches/branch-0.12/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/CHANGES.txt?rev=1566082&r1=1566081&r2=1566082&view=diff ============================================================================== --- pig/branches/branch-0.12/CHANGES.txt (original) +++ pig/branches/branch-0.12/CHANGES.txt Sat Feb 8 18:26:46 2014 @@ -32,6 +32,8 @@ PIG-3480: TFile-based tmpfile compressio BUG FIXES +PIG-3347: Store invocation brings side effect (daijy) + PIG-3670: Fix assert in Pig script (daijy) PIG-3741: Utils.setTmpFileCompressionOnConf can cause side effect for SequenceFileInterStorage (aniket486) Modified: pig/branches/branch-0.12/src/org/apache/pig/newplan/logical/relational/LOForEach.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/newplan/logical/relational/LOForEach.java?rev=1566082&r1=1566081&r2=1566082&view=diff ============================================================================== --- pig/branches/branch-0.12/src/org/apache/pig/newplan/logical/relational/LOForEach.java (original) +++ pig/branches/branch-0.12/src/org/apache/pig/newplan/logical/relational/LOForEach.java Sat Feb 8 18:26:46 2014 @@ -77,44 +77,25 @@ public class LOForEach extends LogicalRe } // Find the LOInnerLoad of the inner plan corresponding to the project, and - // also find whether there is a LOForEach in inner plan along the way + // also find whether there is a relational operator in inner plan along the way public static Pair, Boolean> findReacheableInnerLoadFromBoundaryProject(ProjectExpression project) throws FrontendException { boolean needNewUid = false; - LogicalRelationalOperator referred = project.findReferent(); - // If it is nested foreach, generate new uid - if (referred instanceof LOForEach) - needNewUid = true; - List srcs = 
referred.getPlan().getSources(); List innerLoads = new ArrayList(); - for (Operator src:srcs) { - if (src instanceof LOInnerLoad) { - if( src == referred ) { - innerLoads.add( (LOInnerLoad)src ); - continue; - } - - Deque stack = new LinkedList(); - List succs = referred.getPlan().getSuccessors( src ); - if( succs != null ) { - for( Operator succ : succs ) { - stack.push( succ ); - } - } - - while( !stack.isEmpty() ) { - Operator op = stack.pop(); - if( op == referred ) { - innerLoads.add((LOInnerLoad)src); - break; - } - else { - List ops = referred.getPlan().getSuccessors( op ); - if( ops != null ) { - for( Operator o : ops ) { - stack.push( o ); - } - } - } + LogicalRelationalOperator referred = project.findReferent(); + Deque stack = new LinkedList(); + stack.add(referred); + while( !stack.isEmpty() ) { + Operator op = stack.pop(); + if (op instanceof LOInnerLoad) { + innerLoads.add((LOInnerLoad)op); + } + else if (!(op instanceof LOGenerate)) { + needNewUid = true; + } + List ops = referred.getPlan().getPredecessors( op ); + if( ops != null ) { + for( Operator o : ops ) { + stack.push( o ); } } } Modified: pig/branches/branch-0.12/test/org/apache/pig/test/TestNewPlanFilterRule.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/test/org/apache/pig/test/TestNewPlanFilterRule.java?rev=1566082&r1=1566081&r2=1566082&view=diff ============================================================================== --- pig/branches/branch-0.12/test/org/apache/pig/test/TestNewPlanFilterRule.java (original) +++ pig/branches/branch-0.12/test/org/apache/pig/test/TestNewPlanFilterRule.java Sat Feb 8 18:26:46 2014 @@ -51,6 +51,7 @@ import org.apache.pig.newplan.logical.re import org.apache.pig.newplan.logical.relational.LogicalPlan; import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator; import org.apache.pig.newplan.logical.relational.LogicalSchema; +import org.apache.pig.newplan.logical.rules.FilterAboveForeach; import 
org.apache.pig.newplan.logical.rules.LoadTypeCastInserter; import org.apache.pig.newplan.logical.rules.MergeFilter; import org.apache.pig.newplan.logical.rules.PushUpFilter; @@ -570,6 +571,110 @@ public class TestNewPlanFilterRule { } + /** + * Test that filter cannot get pushed up over nested Distinct (see PIG-3347) + */ + @Test + public void testFilterAfterNestedDistinct() throws Exception { + String query = "a = LOAD 'file.txt';" + + "a_group = group a by $0;" + + "b = foreach a_group { a_distinct = distinct a.$0;generate group, a_distinct;}" + + "c = filter b by SIZE(a_distinct) == 1;" + + "store c into 'empty';"; + + // filter should not be pushed above nested distinct, + //ie expect - loload -> locogroup -> foreach -> filter + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); + newLogicalPlan.explain(System.out, "text", true); + + Operator load = newLogicalPlan.getSources().get( 0 ); + Assert.assertTrue( load instanceof LOLoad ); + Operator cogroup = newLogicalPlan.getSuccessors( load ).get( 0 ); + Assert.assertTrue( cogroup instanceof LOCogroup ); + Operator foreach = newLogicalPlan.getSuccessors(cogroup).get( 0 ); + Assert.assertTrue( foreach instanceof LOForEach ); + Operator filter = newLogicalPlan.getSuccessors(foreach).get( 0 ); + Assert.assertTrue( filter instanceof LOFilter ); + } + + /** + * Test that filter cannot get pushed up over nested Limit (see PIG-3347) + */ + @Test + public void testFilterAfterNestedLimit() throws Exception { + String query = "a = LOAD 'file.txt';" + + "a_group = group a by $0;" + + "b = foreach a_group { a_limit = limit a.$0 5;generate group, a_limit;}" + + "c = filter b by SIZE(a_limit) == 1;" + + "store c into 'empty';"; + + // filter should not be pushed above nested distinct, + //ie expect - loload -> locogroup -> foreach -> filter + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); + newLogicalPlan.explain(System.out, "text", true); + + Operator load = newLogicalPlan.getSources().get( 0 ); + 
Assert.assertTrue( load instanceof LOLoad ); + Operator cogroup = newLogicalPlan.getSuccessors( load ).get( 0 ); + Assert.assertTrue( cogroup instanceof LOCogroup ); + Operator foreach = newLogicalPlan.getSuccessors(cogroup).get( 0 ); + Assert.assertTrue( foreach instanceof LOForEach ); + Operator filter = newLogicalPlan.getSuccessors(foreach).get( 0 ); + Assert.assertTrue( filter instanceof LOFilter ); + } + + /** + * Test that filter cannot get pushed up over nested Filter (see PIG-3347) + */ + @Test + public void testFilterAfterNestedFilter() throws Exception { + String query = "a = LOAD 'file.txt';" + + "a_group = group a by $0;" + + "b = foreach a_group { a_filter = filter a by $0 == 1;generate group, a_filter;}" + + "c = filter b by SIZE(a_filter) == 1;" + + "store c into 'empty';"; + + // filter should not be pushed above nested distinct, + //ie expect - loload -> locogroup -> foreach -> filter + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); + newLogicalPlan.explain(System.out, "text", true); + + Operator load = newLogicalPlan.getSources().get( 0 ); + Assert.assertTrue( load instanceof LOLoad ); + Operator cogroup = newLogicalPlan.getSuccessors( load ).get( 0 ); + Assert.assertTrue( cogroup instanceof LOCogroup ); + Operator foreach = newLogicalPlan.getSuccessors(cogroup).get( 0 ); + Assert.assertTrue( foreach instanceof LOForEach ); + Operator filter = newLogicalPlan.getSuccessors(foreach).get( 0 ); + Assert.assertTrue( filter instanceof LOFilter ); + } + + /** + * Test that filter does not get blocked for PushUpFilter/FilterAboveForeach + * by an unrelated nested filter (see PIG-3347) + */ + @Test + public void testFilterAfterUnrelatedNestedFilter() throws Exception { + String query = "a = LOAD 'file.txt' as (a0:int, a1_bag:bag{(X:int)}, a2_bag:bag{(Y:int)});" + + "b = foreach a { a1_filter = filter a1_bag by X == 1; generate a0, a1_filter, a2_bag;}" + + "c = filter b by SIZE(a2_bag) == 1;" + + "store c into 'empty';"; + + // filter should 
be pushed above nested filter, + //ie expect - loload -> foreach -> filter -> foreach + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); + newLogicalPlan.explain(System.out, "text", true); + + Operator load = newLogicalPlan.getSources().get( 0 ); + Assert.assertTrue( load instanceof LOLoad ); + Operator foreach1 = newLogicalPlan.getSuccessors(load).get( 0 ); + Assert.assertTrue( foreach1 instanceof LOForEach ); + Operator filter = newLogicalPlan.getSuccessors( foreach1 ).get( 0 ); + Assert.assertTrue( filter instanceof LOFilter ); + Operator foreach2 = newLogicalPlan.getSuccessors(filter).get( 0 ); + Assert.assertTrue( foreach2 instanceof LOForEach ); + } + private LogicalPlan migrateAndOptimizePlan(String query) throws Exception { PigServer pigServer = new PigServer(pc); LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query); @@ -602,6 +707,11 @@ public class TestNewPlanFilterRule { r = new PushUpFilter( "PushUpFilter" ); s.add(r); ls.add(s); + + s = new HashSet(); + r = new FilterAboveForeach( "PushUpFilter" ); + s.add(r); + ls.add(s); return ls; }