Return-Path: Delivered-To: apmail-hadoop-pig-commits-archive@www.apache.org Received: (qmail 48046 invoked from network); 10 Jun 2010 18:10:39 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 10 Jun 2010 18:10:39 -0000 Received: (qmail 93895 invoked by uid 500); 10 Jun 2010 18:10:39 -0000 Delivered-To: apmail-hadoop-pig-commits-archive@hadoop.apache.org Received: (qmail 93869 invoked by uid 500); 10 Jun 2010 18:10:39 -0000 Mailing-List: contact pig-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: pig-dev@hadoop.apache.org Delivered-To: mailing list pig-commits@hadoop.apache.org Received: (qmail 93862 invoked by uid 500); 10 Jun 2010 18:10:39 -0000 Delivered-To: apmail-incubator-pig-commits@incubator.apache.org Received: (qmail 93859 invoked by uid 99); 10 Jun 2010 18:10:39 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 Jun 2010 18:10:39 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 Jun 2010 18:10:38 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 21961238899C; Thu, 10 Jun 2010 18:09:55 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r953414 - in /hadoop/pig/branches/branch-0.7: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java test/org/apache/pig/test/TestMultiQuery.java Date: Thu, 10 Jun 2010 18:09:55 -0000 To: pig-commits@incubator.apache.org From: rding@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100610180955.21961238899C@eris.apache.org> Author: rding Date: Thu Jun 10 18:09:54 2010 New Revision: 953414 URL: http://svn.apache.org/viewvc?rev=953414&view=rev Log: PIG-1438: [Performance] MultiQueryOptimizer should also merge DISTINCT jobs Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestMultiQuery.java Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/CHANGES.txt?rev=953414&r1=953413&r2=953414&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.7/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.7/CHANGES.txt Thu Jun 10 18:09:54 2010 @@ -22,6 +22,9 @@ Release 0.7.0 - 2010-05-03 INCOMPATIBLE CHANGES +PIG-1438: [Performance] MultiQueryOptimizer should also merge DISTINCT jobs +(rding) + PIG-1292: Interface Refinements (hashutosh) PIG-1259: ResourceFieldSchema.setSchema should not allow a bag field without a Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=953414&r1=953413&r2=953414&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original) +++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Thu Jun 10 18:09:54 2010 @@ -455,10 +455,12 @@ class MultiQueryOptimizer extends MROpPl return true; } - private List getMergeList(List mapReducers) { + private List getMergeList(MapReduceOper splitter, + List mapReducers) { List mergeNoCmbList = new ArrayList(); List mergeCmbList = new ArrayList(); - + List mergeDistList = new ArrayList(); + for (MapReduceOper mrOp : mapReducers) { if (isSplitteeMergeable(mrOp)) { if (mrOp.combinePlan.isEmpty()) { @@ -466,16 +468,26 @@ class MultiQueryOptimizer extends MROpPl } else { mergeCmbList.add(mrOp); } - } - } - return (mergeNoCmbList.size() > mergeCmbList.size()) ? - mergeNoCmbList : mergeCmbList; + } else if (splitter.reducePlan.isEmpty() + || splitter.needsDistinctCombiner()) { + if (mrOp.needsDistinctCombiner()) { + mergeDistList.add(mrOp); + } + } + } + + int max = Math.max(mergeNoCmbList.size(), mergeCmbList.size()); + max = Math.max(max, mergeDistList.size()); + + if (max == mergeDistList.size()) return mergeDistList; + else if (max == mergeNoCmbList.size()) return mergeNoCmbList; + else return mergeCmbList; } private int mergeMapReduceSplittees(List mapReducers, MapReduceOper splitter, POSplit splitOp) throws VisitorException { - List mergeList = getMergeList(mapReducers); + List mergeList = getMergeList(splitter, mapReducers); if (mergeList.size() <= 1) { @@ -507,7 +519,7 @@ class MultiQueryOptimizer extends MROpPl // MR splittees into the splitter. What we'll do is to merge multiple // splittees (if exists) into a new MR operator and connect it to the splitter. - List mergeList = getMergeList(mapReducers); + List mergeList = getMergeList(splitter, mapReducers); if (mergeList.size() <= 1) { // nothing to merge, just return Modified: hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestMultiQuery.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestMultiQuery.java?rev=953414&r1=953413&r2=953414&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestMultiQuery.java (original) +++ hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestMultiQuery.java Thu Jun 10 18:09:54 2010 @@ -112,6 +112,122 @@ public class TestMultiQuery { } @Test + public void testMultiQueryJiraPig1438() { + + // test case: merge multiple distinct jobs + + String INPUT_FILE = "abc"; + + try { + + PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE)); + w.println("1\t2\t3"); + w.println("2\t3\t4"); + w.println("1\t2\t3"); + w.println("2\t3\t4"); + w.println("1\t2\t3"); + w.close(); + + Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE); + + myPig.setBatchOn(); + + myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1:int, col2:int, col3:int);"); + myPig.registerQuery("B1 = foreach A generate col1, col2;"); + myPig.registerQuery("B2 = foreach A generate col2, col3;"); + myPig.registerQuery("C1 = distinct B1;"); + myPig.registerQuery("C2 = distinct B2;"); + myPig.registerQuery("D1 = foreach C1 generate col1, col2;"); + myPig.registerQuery("D2 = foreach C2 generate col2, col3;"); + myPig.registerQuery("store D1 into '/tmp/output1';"); + myPig.registerQuery("store D2 into '/tmp/output2';"); + + LogicalPlan lp = checkLogicalPlan(1, 2, 13); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 13); + + checkMRPlan(pp, 1, 1, 1); + + myPig.executeBatch(); + + myPig.registerQuery("E = load '/tmp/output1' as (a:int, b:int);"); + Iterator iter = myPig.openIterator("E"); + + List expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[] { + "(1,2)", + "(2,3)" + }); + + int counter = 0; + while (iter.hasNext()) { + assertEquals(expectedResults.get(counter++).toString(), iter.next().toString()); + } + assertEquals(expectedResults.size(), counter); + + myPig.registerQuery("E = load '/tmp/output2' as (a:int, b:int);"); + iter = myPig.openIterator("E"); + + expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[] { + "(2,3)", + "(3,4)" + }); + + counter = 0; + while (iter.hasNext()) { + assertEquals(expectedResults.get(counter++).toString(), iter.next().toString()); + } + + assertEquals(expectedResults.size(), counter); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } finally { + new File(INPUT_FILE).delete(); + try { + Util.deleteFile(cluster, INPUT_FILE); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + } + } + + @Test + public void testMultiQueryJiraPig1438_2() { + + // test case: merge multiple distinct jobs -- one group by job, one distinct job + + String INPUT_FILE = "abc"; + + try { + myPig.setBatchOn(); + + myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1:int, col2:int, col3:int);"); + myPig.registerQuery("B1 = foreach A generate col1, col2;"); + myPig.registerQuery("B2 = foreach A generate col2, col3;"); + myPig.registerQuery("C1 = distinct B1;"); + myPig.registerQuery("C2 = group B2 by (col2, col3);"); + myPig.registerQuery("D1 = foreach C1 generate col1, col2;"); + myPig.registerQuery("D2 = foreach C2 generate B2.col2, B2.col3;"); + myPig.registerQuery("store D1 into '/tmp/output1';"); + myPig.registerQuery("store D2 into '/tmp/output2';"); + + LogicalPlan lp = checkLogicalPlan(1, 2, 13); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 15); + + checkMRPlan(pp, 1, 1, 2); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test public void testMultiQueryJiraPig1252() { // test case: Problems with secondary key optimization and multiquery