Return-Path: X-Original-To: apmail-pig-commits-archive@www.apache.org Delivered-To: apmail-pig-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 19397D61A for ; Tue, 30 Oct 2012 23:52:54 +0000 (UTC) Received: (qmail 26237 invoked by uid 500); 30 Oct 2012 23:52:54 -0000 Delivered-To: apmail-pig-commits-archive@pig.apache.org Received: (qmail 26201 invoked by uid 500); 30 Oct 2012 23:52:54 -0000 Mailing-List: contact commits-help@pig.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pig.apache.org Delivered-To: mailing list commits@pig.apache.org Received: (qmail 26189 invoked by uid 99); 30 Oct 2012 23:52:53 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Oct 2012 23:52:53 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Oct 2012 23:52:51 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 4146523888E4 for ; Tue, 30 Oct 2012 23:52:08 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1403934 - in /pig/branches/branch-0.11: CHANGES.txt src/org/apache/pig/impl/util/ObjectSerializer.java test/org/apache/pig/test/TestMRCompiler.java test/org/apache/pig/test/data/GoldenFiles/MRC18.gld Date: Tue, 30 Oct 2012 23:52:08 -0000 To: commits@pig.apache.org From: jcoveney@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121030235208.4146523888E4@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jcoveney Date: Tue Oct 30 23:52:07 2012 New Revision: 1403934 URL: http://svn.apache.org/viewvc?rev=1403934&view=rev Log: PIG-3017: Pigs object serialization should use compression (jcoveney) Modified: pig/branches/branch-0.11/CHANGES.txt pig/branches/branch-0.11/src/org/apache/pig/impl/util/ObjectSerializer.java pig/branches/branch-0.11/test/org/apache/pig/test/TestMRCompiler.java pig/branches/branch-0.11/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld Modified: pig/branches/branch-0.11/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/CHANGES.txt?rev=1403934&r1=1403933&r2=1403934&view=diff ============================================================================== --- pig/branches/branch-0.11/CHANGES.txt (original) +++ pig/branches/branch-0.11/CHANGES.txt Tue Oct 30 23:52:07 2012 @@ -310,6 +310,8 @@ OPTIMIZATIONS BUG FIXES +PIG-3017: Pig's object serialization should use compression (jcoveney) + PIG-2968: ColumnMapKeyPrune fails to prune a subtree inside foreach (knoguchi via cheolsoo) PIG-2999: Regression after PIG-2975: BinInterSedesTupleRawComparator secondary sort failing (knoguchi via azaroth) Modified: pig/branches/branch-0.11/src/org/apache/pig/impl/util/ObjectSerializer.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/impl/util/ObjectSerializer.java?rev=1403934&r1=1403933&r2=1403934&view=diff ============================================================================== --- pig/branches/branch-0.11/src/org/apache/pig/impl/util/ObjectSerializer.java (original) +++ pig/branches/branch-0.11/src/org/apache/pig/impl/util/ObjectSerializer.java Tue Oct 30 23:52:07 2012 @@ -1,14 +1,12 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -24,19 +22,25 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.InflaterInputStream; +import org.apache.commons.codec.binary.Base64; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; public class ObjectSerializer { - private static final Log log = LogFactory.getLog(ObjectSerializer.class); - + public static String serialize(Serializable obj) throws IOException { - if (obj == null) return ""; + if (obj == null) + return ""; try { ByteArrayOutputStream serialObj = new ByteArrayOutputStream(); - ObjectOutputStream objStream = new ObjectOutputStream(serialObj); + Deflater def = new Deflater(Deflater.BEST_COMPRESSION); + ObjectOutputStream objStream = new ObjectOutputStream(new DeflaterOutputStream( + serialObj, def)); objStream.writeObject(obj); objStream.close(); return encodeBytes(serialObj.toByteArray()); @@ -44,38 +48,24 @@ public class ObjectSerializer { throw new IOException("Serialization error: " + e.getMessage(), e); } } - + public static Object deserialize(String str) throws IOException { - if (str == null || str.length() == 0) return null; + if (str == null || str.length() == 0) + return null; try { ByteArrayInputStream serialObj = new ByteArrayInputStream(decodeBytes(str)); - ObjectInputStream objStream = new ObjectInputStream(serialObj); + ObjectInputStream objStream = new ObjectInputStream(new InflaterInputStream(serialObj)); return objStream.readObject(); } catch (Exception e) { throw new IOException("Deserialization error: " + e.getMessage(), e); } } - + public static String encodeBytes(byte[] bytes) { - StringBuffer strBuf = new StringBuffer(); - - for (int i = 0; i < bytes.length; i++) { - strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ((int) 'a'))); - strBuf.append((char) (((bytes[i]) & 0xF) + ((int) 'a'))); - } - - return strBuf.toString(); + return Base64.encodeBase64URLSafeString(bytes); } - + public static byte[] decodeBytes(String str) { - byte[] bytes = new byte[str.length() / 2]; - for (int i = 0; i < str.length(); i+=2) { - char c = str.charAt(i); - bytes[i/2] = (byte) ((c - 'a') << 4); - c = str.charAt(i+1); - bytes[i/2] += (c - 'a'); - } - return bytes; + return Base64.decodeBase64(str); } - } Modified: pig/branches/branch-0.11/test/org/apache/pig/test/TestMRCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/org/apache/pig/test/TestMRCompiler.java?rev=1403934&r1=1403933&r2=1403934&view=diff ============================================================================== --- pig/branches/branch-0.11/test/org/apache/pig/test/TestMRCompiler.java (original) +++ pig/branches/branch-0.11/test/org/apache/pig/test/TestMRCompiler.java Tue Oct 30 23:52:07 2012 @@ -19,7 +19,6 @@ package org.apache.pig.test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.io.ByteArrayOutputStream; import java.io.FileInputStream; @@ -39,34 +38,45 @@ import org.apache.pig.FuncSpec; import org.apache.pig.IndexableLoadFunc; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.builtin.AVG; -import org.apache.pig.builtin.COUNT; -import org.apache.pig.builtin.PigStorage; -import org.apache.pig.builtin.SUM; -import org.apache.pig.data.DataType; -import org.apache.pig.data.Tuple; -import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.builtin.GFCross; -import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LimitAdjuster; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; +import org.apache.pig.builtin.AVG; +import org.apache.pig.builtin.COUNT; +import org.apache.pig.builtin.PigStorage; +import org.apache.pig.builtin.SUM; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.builtin.GFCross; import org.apache.pig.impl.plan.NodeIdGenerator; +import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.util.Utils; import org.apache.pig.test.junit.OrderedJUnit4Runner; import org.apache.pig.test.junit.OrderedJUnit4Runner.TestOrder; import org.apache.pig.test.utils.GenPhyOp; - -import org.junit.After; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -74,14 +84,13 @@ import org.junit.runner.RunWith; * Test cases to test the MRCompiler. * VERY IMPORTANT NOTE: The tests here compare results with a * "golden" set of outputs. In each testcase here, the operators - * generated have a random operator key which uses Java's Random + * generated have a random operator key which uses Java's Random * class. So if there is a code change which changes the number of * operators created in a plan, then not only will the "golden" file * for that test case need to be changed, but also for the tests * that follow it since the operator keys that will be generated through * Random will be different. */ - @RunWith(OrderedJUnit4Runner.class) @TestOrder({ "testRun1", @@ -101,7 +110,7 @@ import org.junit.runner.RunWith; "testDistinct1", "testLimit", "testMRCompilerErr", - "testMRCompilerErr1", + "testMRCompilerErr1", "testNumReducersInLimit", "testNumReducersInLimitWithParallel", "testUDFInJoin", @@ -115,27 +124,16 @@ import org.junit.runner.RunWith; "testSchemaInStoreForDistinctLimit" }) public class TestMRCompiler { static MiniCluster cluster = MiniCluster.buildCluster(); - + static PigContext pc; static PigContext pcMR; static final int MAX_SIZE = 100000; static final long SEED = 1013; - - static Random r; - static{ - pc = new PigContext(ExecType.LOCAL, new Properties()); - pcMR = new PigContext(ExecType.MAPREDUCE, cluster.getProperties()); - try { - pc.connect(); - } catch (ExecException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - r = new Random(SEED); - } - + + static final Random r = new Random(SEED); + PigServer pigServer = null; PigServer pigServerMR = null; @@ -146,20 +144,23 @@ public class TestMRCompiler { // and are sure of private boolean generate = false; + @BeforeClass + public static void setUpBeforeClass() throws Exception { + pc = new PigContext(ExecType.LOCAL, new Properties()); + pcMR = new PigContext(ExecType.MAPREDUCE, cluster.getProperties()); + pc.connect(); + } + @Before public void setUp() throws ExecException { GenPhyOp.setR(r); - + GenPhyOp.setPc(pc); NodeIdGenerator.getGenerator().reset(""); - pigServer = new PigServer( pc ); - pigServerMR = new PigServer( pcMR ); + pigServer = new PigServer(pc); + pigServerMR = new PigServer(pcMR); } - @After - public void tearDown() throws Exception { - } - @Test public void testRun1() throws Exception { PhysicalPlan php = new PhysicalPlan(); @@ -447,7 +448,6 @@ public class TestMRCompiler { POStore st = GenPhyOp.topStoreOp(); php.addAsLeaf(st); run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC13.gld"); - } @Test @@ -527,7 +527,6 @@ public class TestMRCompiler { POStore st = GenPhyOp.topStoreOp(); php.addAsLeaf(st); run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC14.gld"); - } // Tests Single input case for both blocking and non-blocking @@ -560,7 +559,6 @@ public class TestMRCompiler { php.connect(fl, st); run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC1.gld"); - } @Test @@ -581,7 +579,6 @@ public class TestMRCompiler { php.connect(un, st); run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC2.gld"); - } @Test @@ -774,13 +771,13 @@ public class TestMRCompiler { php.addAsLeaf(st); run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC9.gld"); } - + @Test public void testSortUDF1() throws Exception { PhysicalPlan php = new PhysicalPlan(); PhysicalPlan ldFil1 = GenPhyOp.loadedFilter(); php.merge(ldFil1); - + // set up order by * String funcName = WeirdComparator.class.getName(); POUserComparisonFunc comparator = new POUserComparisonFunc( @@ -794,18 +791,18 @@ public class TestMRCompiler { topPrj.setOverloaded(true); topPrj.setResultType(DataType.TUPLE); nesSortPlan.add(topPrj); - + POProject prjStar2 = new POProject(new OperatorKey("", r.nextLong())); prjStar2.setResultType(DataType.TUPLE); prjStar2.setStar(true); nesSortPlan.add(prjStar2); - + nesSortPlan.connect(topPrj, prjStar2); List nesSortPlanLst = new ArrayList(); nesSortPlanLst.add(nesSortPlan); - + sort.setSortPlans(nesSortPlanLst); - + php.add(sort); php.connect(ldFil1.getLeaves().get(0), sort); // have a foreach which takes the sort output @@ -816,18 +813,18 @@ public class TestMRCompiler { POForEach fe3 = GenPhyOp.topForEachOPWithUDF(udfs); php.add(fe3); php.connect(sort, fe3); - + // add a group above the foreach PhysicalPlan grpChain1 = GenPhyOp.grpChain(); php.merge(grpChain1); php.connect(fe3,grpChain1.getRoots().get(0)); - - + + udfs.clear(); udfs.add(AVG.class.getName()); POForEach fe4 = GenPhyOp.topForEachOPWithUDF(udfs); php.addAsLeaf(fe4); - + PhysicalPlan grpChain2 = GenPhyOp.grpChain(); php.merge(grpChain2); php.connect(fe4,grpChain2.getRoots().get(0)); @@ -836,36 +833,36 @@ public class TestMRCompiler { udfs.add(GFCross.class.getName()); POForEach fe5 = GenPhyOp.topForEachOPWithUDF(udfs); php.addAsLeaf(fe5); - + POStore st = GenPhyOp.topStoreOp(); php.addAsLeaf(st); run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC15.gld"); } - + @Test public void testDistinct1() throws Exception { PhysicalPlan php = new PhysicalPlan(); PhysicalPlan ldFil1 = GenPhyOp.loadedFilter(); php.merge(ldFil1); - + PODistinct op = new PODistinct(new OperatorKey("", r.nextLong()), -1, null); - + php.addAsLeaf(op); - + PhysicalPlan grpChain1 = GenPhyOp.grpChain(); php.merge(grpChain1); php.connect(op,grpChain1.getRoots().get(0)); - + PODistinct op1 = new PODistinct(new OperatorKey("", r.nextLong()), -1, null); - + php.addAsLeaf(op1); POStore st = GenPhyOp.topStoreOp(); php.addAsLeaf(st); run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC16.gld"); } - + @Test public void testLimit() throws Exception { PhysicalPlan php = new PhysicalPlan(); @@ -883,37 +880,37 @@ public class TestMRCompiler { php.addAsLeaf(st); run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC17.gld"); } - - @Test + + @Test(expected = MRCompilerException.class) public void testMRCompilerErr() throws Exception { String query = "a = load 'input';" + "b = filter a by $0 > 5;" + "store b into 'output';"; - + PhysicalPlan pp = Util.buildPp(pigServer, query); pp.remove(pp.getRoots().get(0)); try { Util.buildMRPlan(new PhysicalPlan(), pc); - fail("Expected failure."); } catch (MRCompilerException mrce) { - assertTrue(mrce.getErrorCode() == 2053); + assertEquals(2053, mrce.getErrorCode()); + throw mrce; } } - @Test - public void testMRCompilerErr1() throws Exception { + @Test(expected = MRCompilerException.class) + public void testMRCompilerErr1() throws Exception { PhysicalPlan pp = new PhysicalPlan(); PhysicalPlan ldFil1 = GenPhyOp.loadedFilter(); pp.merge(ldFil1); - + POSplit op = GenPhyOp.topSplitOp(); pp.addAsLeaf(op); try { Util.buildMRPlan(pp, pc); - fail("Expected failure."); } catch (MRCompilerException mrce) { - assertTrue(mrce.getErrorCode() == 2025); + assertEquals(2025, mrce.getErrorCode()); + throw mrce; } } @@ -929,44 +926,44 @@ public class TestMRCompiler { "b = order a by $0;" + "c = limit b 10;" + "store c into 'output';"; - + PhysicalPlan pp = Util.buildPp(pigServer, query); MROperPlan mrPlan = Util.buildMRPlan(pp, pc); MapReduceOper mrOper = mrPlan.getRoots().get(0); int count = 1; - + while(mrPlan.getSuccessors(mrOper) != null) { mrOper = mrPlan.getSuccessors(mrOper).get(0); ++count; - } - assertTrue(count == 3); + } + assertEquals(3, count); } - + /** * Test to ensure that the order by with parallel followed by a limit, i.e., top k * always produces the correct number of map reduce jobs */ @Test public void testNumReducersInLimitWithParallel() throws Exception { - String query = "a = load 'input';" + + String query = "a = load 'input';" + "b = order a by $0 parallel 2;" + "c = limit b 10;" + "store c into 'output';"; - + PhysicalPlan pp = Util.buildPp(pigServerMR, query); MROperPlan mrPlan = Util.buildMRPlan(pp, pc); - + LimitAdjuster la = new LimitAdjuster(mrPlan, pc); la.visit(); la.adjust(); MapReduceOper mrOper = mrPlan.getRoots().get(0); int count = 1; - + while(mrPlan.getSuccessors(mrOper) != null) { mrOper = mrPlan.getSuccessors(mrOper).get(0); ++count; - } - assertTrue(count == 4); + } + assertEquals(4, count); } @Test @@ -974,13 +971,13 @@ public class TestMRCompiler { String query = "a = load 'input1' using BinStorage();" + "b = load 'input2';" + "c = join a by $0, b by $0;" + "store c into 'output';"; - + PhysicalPlan pp = Util.buildPp(pigServer, query); MROperPlan mrPlan = Util.buildMRPlan(pp, pc); MapReduceOper mrOper = mrPlan.getRoots().get(0); - - assertTrue(mrOper.UDFs.size()==2); - assertTrue(mrOper.UDFs.size()==2); + + assertEquals(2, mrOper.UDFs.size()); + assertEquals(2, mrOper.UDFs.size()); assertTrue(mrOper.UDFs.contains("BinStorage")); assertTrue(mrOper.UDFs.contains("org.apache.pig.builtin.PigStorage")); } @@ -991,47 +988,42 @@ public class TestMRCompiler { "b = load '/tmp/input2';" + "c = join a by $0, b by $0 using 'merge';" + "store c into '/tmp/output1';"; - + PhysicalPlan pp = Util.buildPp(pigServer, query); run(pp, "test/org/apache/pig/test/data/GoldenFiles/MRC18.gld"); } - - public static class WeirdComparator extends ComparisonFunc { + public static class WeirdComparator extends ComparisonFunc { @Override public int compare(Tuple t1, Tuple t2) { - // TODO Auto-generated method stub int result = 0; try { int i1 = (Integer) t1.get(1); int i2 = (Integer) t2.get(1); result = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50); } catch (ExecException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + throw new RuntimeException(e); } return result; } - } - + @Test public void testMergeJoinWithIndexableLoadFunc() throws Exception{ String query = "a = load 'input1';" + "b = load 'input2' using " + TestMergeJoin.DummyIndexableLoader.class.getName() + ";" + "c = join a by $0, b by $0 using 'merge';" + "store c into 'output';"; - + PhysicalPlan pp = Util.buildPp(pigServer, query); MROperPlan mp = Util.buildMRPlan(pp, pc); assertEquals("Checking number of MR Jobs for merge join with " + "IndexableLoadFunc:", 1, mp.size()); - } - + @Test public void testCastFuncShipped() throws Exception{ - String query = "a = load 'input1' using " + PigStorageNoDefCtor.class.getName() + + String query = "a = load 'input1' using " + PigStorageNoDefCtor.class.getName() + "('\t') as (a0, a1, a2);" + "b = group a by a0;" + "c = foreach b generate flatten(a);" + @@ -1043,34 +1035,34 @@ public class TestMRCompiler { MapReduceOper op = mp.getLeaves().get(0); assertTrue(op.UDFs.contains(new FuncSpec(PigStorageNoDefCtor.class.getName())+"('\t')")); } - + @Test public void testLimitAdjusterFuncShipped() throws Exception{ - String query = "a = load 'input';" + + String query = "a = load 'input';" + "b = order a by $0 parallel 2;" + "c = limit b 7;" + "store c into 'output' using " + PigStorageNoDefCtor.class.getName() + "('\t');"; - + PhysicalPlan pp = Util.buildPp(pigServerMR, query); MROperPlan mrPlan = Util.buildMRPlan(pp, pc); - + LimitAdjuster la = new LimitAdjuster(mrPlan, pc); la.visit(); la.adjust(); - + MapReduceOper mrOper = mrPlan.getRoots().get(0); int count = 1; - + while(mrPlan.getSuccessors(mrOper) != null) { mrOper = mrPlan.getSuccessors(mrOper).get(0); ++count; - } - assertTrue(count == 4); + } + assertEquals(4, count); MapReduceOper op = mrPlan.getLeaves().get(0); assertTrue(op.UDFs.contains(new FuncSpec(PigStorageNoDefCtor.class.getName())+"('\t')")); } - + /** * Test that POSortedDistinct gets printed as POSortedDistinct * @throws Exception @@ -1080,7 +1072,7 @@ public class TestMRCompiler { PhysicalPlan php = new PhysicalPlan(); PhysicalPlan grpChain1 = GenPhyOp.loadedGrpChain(); php.merge(grpChain1); - + List inputs = new LinkedList(); PhysicalPlan inplan = new PhysicalPlan(); PODistinct op1 = new POSortedDistinct(new OperatorKey("", r.nextLong()), @@ -1089,7 +1081,7 @@ public class TestMRCompiler { inputs.add(inplan); List toFlattens = new ArrayList(); toFlattens.add(false); - POForEach pofe = new POForEach(new OperatorKey("", r.nextLong()), 1, + POForEach pofe = new POForEach(new OperatorKey("", r.nextLong()), 1, inputs, toFlattens); php.addAsLeaf(pofe); @@ -1097,7 +1089,7 @@ public class TestMRCompiler { php.addAsLeaf(st); run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC19.gld"); } - + private void run(PhysicalPlan pp, String expectedFile) throws Exception { String compiledPlan, goldenPlan = null; int MAX_SIZE = 100000; @@ -1110,7 +1102,7 @@ public class TestMRCompiler { ppp.print(baos); compiledPlan = baos.toString(); - if(generate ){ + if(generate){ FileOutputStream fos = new FileOutputStream(expectedFile); fos.write(baos.toByteArray()); return; @@ -1147,7 +1139,7 @@ public class TestMRCompiler { public void ensureAllKeyInstancesInSameSplit() throws IOException { } } - + public static class TestIndexableLoadFunc extends PigStorage implements IndexableLoadFunc { @Override public void initialize(Configuration conf) throws IOException { @@ -1161,56 +1153,56 @@ public class TestMRCompiler { public void close() throws IOException { } } - + @Test public void testUDFInMergedCoGroup() throws Exception { String query = "a = load 'input1' using " + TestCollectableLoadFunc.class.getName() + "();" + "b = load 'input2' using " + TestIndexableLoadFunc.class.getName() + "();" + "c = cogroup a by $0, b by $0 using 'merge';" + "store c into 'output';"; - + PhysicalPlan pp = Util.buildPp(pigServer, query); MROperPlan mrPlan = Util.buildMRPlan(pp, pc); MapReduceOper mrOper = mrPlan.getRoots().get(0); - + assertTrue(mrOper.UDFs.contains(TestCollectableLoadFunc.class.getName())); mrOper = mrPlan.getSuccessors(mrOper).get(0); assertTrue(mrOper.UDFs.contains(TestCollectableLoadFunc.class.getName())); assertTrue(mrOper.UDFs.contains(TestIndexableLoadFunc.class.getName())); } - + @Test public void testUDFInMergedJoin() throws Exception { - String query = "a = load 'input1';" + + String query = "a = load 'input1';" + "b = load 'input2' using " + TestIndexableLoadFunc.class.getName() + "();" + "c = join a by $0, b by $0 using 'merge';" + "store c into 'output';"; - + PhysicalPlan pp = Util.buildPp(pigServer, query); MROperPlan mrPlan = Util.buildMRPlan(pp, pc); MapReduceOper mrOper = mrPlan.getRoots().get(0); - + assertTrue(mrOper.UDFs.contains(TestIndexableLoadFunc.class.getName())); } - + //PIG-2146 @Test public void testSchemaInStoreForDistinctLimit() throws Exception { //test if the POStore in the 2nd mr plan (that stores the actual output) - // has a schema - String query = "a = load 'input1' as (a : int,b :float ,c : int);" + + // has a schema + String query = "a = load 'input1' as (a : int,b :float ,c : int);" + "b = distinct a;" + "c = limit b 10;" + "store c into 'output';"; - + PhysicalPlan pp = Util.buildPp(pigServer, query); MROperPlan mrPlan = Util.buildMRPlan(pp, pc); MapReduceOper secondMrOper = mrPlan.getLeaves().get(0); POStore store = (POStore)secondMrOper.reducePlan.getLeaves().get(0); assertEquals( - "compare load and store schema", - store.getSchema(), + "compare load and store schema", + store.getSchema(), Utils.getSchemaFromString("a : int,b :float ,c : int") ); } -} +} \ No newline at end of file Modified: pig/branches/branch-0.11/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld?rev=1403934&r1=1403933&r2=1403934&view=diff ============================================================================== --- pig/branches/branch-0.11/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld (original) +++ pig/branches/branch-0.11/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld Tue Oct 30 23:52:07 2012 @@ -18,4 +18,4 @@ Reduce Plan Empty | | | | | Project[tuple][*] - scope-111 | | - | |---b: Load(/tmp/input2:org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer('org.apache.pig.builtin.PigStorage','kmonaaafhdhcaabdgkgbhggbcohfhegjgmcoebhchcgbhjemgjhdhehiibncbnjjmhgbjnadaaabejaaaehdgjhkgfhihaaaaaaaabhhaeaaaaaaabhdhcaaeogphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccohagmgbgohdcofagihjhdgjgdgbgmfagmgbgoaaaaaaaaaaaaaaabacaaacfkaaangfgogeepggebgmgmejgohahfheemaaafgphagngbhaheaacdemgphcghcpgbhagbgdgigfcphagjghcpgjgnhagmcphfhegjgmcpenhfgmhegjengbhadlhihcaacfgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcfagmgbgoimlohdjmoaemmdgiacaaaiemaaakgneghcgpgnefgeghgfhdhbaahoaaademaaafgnelgfhjhdheaaapemgkgbhggbcphfhegjgmcpengbhadlemaaahgnemgfgbhggfhdheaabaemgkgbhggbcphfhegjgmcpemgjhdhedlemaaaegnephahdhbaahoaaafemaaaggnfcgpgphehdhbaahoaaagemaaaognfdgpggheeghcgpgnefgeghgfhdhbaahoaaademaaamgnfdgpgghefegpefgeghgfhdhbaahoaaademaaaignfegpefgeghgfhdhbaahoaaad hihahdhcaacbgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohfhegjgmcoenhfgmhegjengbhaaaaaaaaaaaaaaaacacaaabemaaaegnengbhahbaahoaaafhihahdhcaabbgkgbhggbcohfhegjgmcoeigbhdgiengbhaafahnkmbmdbgganbadaaacegaaakgmgpgbgeeggbgdhegphcejaaajhegihcgfhdgigpgmgehihadpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaabhdhcaacegphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcelgfhjaaaaaaaaaaaaaaabacaaacekaaacgjgeemaaafhdgdgphagfheaabcemgkgbhggbcpgmgbgoghcpfdhehcgjgoghdlhihaaaaaaaaaaaaaaagiheaaafhdgdgphagfhdhcaafjgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcofaepfahcgpgkgfgdheaaaaaaaaaaaaaaabacaaagfkaaaogjhdfahcgpgkgfgdhefegpefgogefkaaakgphggfhcgmgpgbgegfgefkaabfhahcgpgdgfhdhdgjgoghecgbghepggfehfhagmgfhdfkaabehcgfhdhfgmhefdgjgoghgmgffehfhagmgfecgbghejaaaihdhegbhcheedgpgmemaaahgdgpgmhfgngohdheaabfemgkgbhggbcphfhegjgmcpebhchcgbhjemgjhdhed lhihcaagcgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcoefhihahcgfhdhdgjgpgoephagfhcgbhegphcaaaaaaaaaaaaaaabacaaaahihcaaemgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccofagihjhdgjgdgbgmephagfhcgbhegphcaaaaaaaaaaaaaaabacaaamfkaaafgbgdgdhfgnfkaaangjgohahfheebhehegbgdgigfgeejaabehcgfhbhfgfhdhegfgefagbhcgbgmgmgfgmgjhdgnecaaakhcgfhdhfgmhefehjhagfemaaafgbgmgjgbhdhbaahoaaaoemaaafgjgohahfheheaablemgphcghcpgbhagbgdgigfcphagjghcpgegbhegbcpfehfhagmgfdlemaaaggjgohahfhehdhbaahoaaagemaaangmgjgogfgbghgffehcgbgdgfhcheaachemgphcghcpgbhagbgdgigfcphagjghcphagfgocphfhegjgmcpemgjgogfgbghgffehcgbgdgfhcdlemaabbgphcgjghgjgogbgmemgpgdgbhegjgpgohdhbaahoaaagemaaahgphfhehahfhehdhbaahoaaagemaaakhagbhcgfgohefagmgbgoheaafaemgphcghcpgbhagbgdgigfcphagjghcpgcgbgdglgfgogecpgigbgegpgphacpgfhigfgdhfhegjgpgogfgoghgjgogfcphagihjhd gjgdgbgmemgbhjgfhccphagmgbgohdcpfagihjhdgjgdgbgmfagmgbgodlemaaadhcgfhdheaaeeemgphcghcpgbhagbgdgigfcphagjghcpgcgbgdglgfgogecpgigbgegpgphacpgfhigfgdhfhegjgpgogfgoghgjgogfcphagihjhdgjgdgbgmemgbhjgfhccpfcgfhdhfgmhedlhihcaacbgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcaaaaaaaaaaaaaaabacaaabemaaaegnelgfhjheaacgemgphcghcpgbhagbgdgigfcphagjghcpgjgnhagmcphagmgbgocpephagfhcgbhegphcelgfhjdlhihahbaahoaaapaaaappppppppdchahahahahdhbaahoaaaaaaaaaaaahhaeaaaaaaakhihahahdhcaaecgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccofcgfhdhfgmheaaaaaaaaaaaaaaabacaaacecaaamhcgfhehfhcgofdhegbhehfhdemaaaghcgfhdhfgmheheaabcemgkgbhggbcpgmgbgoghcpepgcgkgfgdhedlhihaachaaaaaaaaaaaaaaaaahdhbaahoaaaaaaaaaaabhhaeaaaaaaakhdhcaabbgkgbhggbcogmgbgoghcoejgohegfghgfhcbcockakephibihdiacaaabejaaafhggbgmhfgfhihcaabagkgbhggbcogmgbgoghcoeohfgngcgfhcigkmjfbnaljeoailacaaaahihaaaaaaaaahihihdhbaahoaaaaaaaaaaabhhaeaaaaaaakhbaahoaablhih dhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaabhbaahoaablhbaahoaaaphihdhbaahoaaaaaaaaaaaahhaeaaaaaaakhihdhbaahoaaaihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahihdhbaahoaaaihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahihdhbaahoaaaihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahiaahahi','','b','scope','true')) - scope-102 \ No newline at end of file + | |---b: Load(/tmp/input2:org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer('org.apache.pig.builtin.PigStorage','eNqtVTtvE0EQnlzixJiQhGeDKBCv7k6iQqKABIgwHNgiaXDF5G5zPti7XXb3wpkCiQYKKKFAAomCkt-AhCiooaRC9NSUMLu248MBUZgtLO_M7jy-_b65t9-hphXsv41b6Bcm5f6yUtgLU23Kh5-PvPiIr6ZhqgkzOr3PSgkAU_dm7C9dui5U4qPEqMt8mSb-BkZ3WB77XYyFkD4rWUQRRc7yJM3pSLen0wh5iD2mfMkx1357YGvTDvprygOvA3soUGtzmfNmLgsTQk3IDKWBYyElDfpJA0oapJnkgS08uFZwk15DebZUcGKsNHvKpfRbkik0QtmMT9_pl1_DD10P6iE0slUlsktxwvRdeADTlDO7ynrawGJo0RkkofghzGUhwy1GvqWKz4JGzpmsJV2IWgiz2Q0hjNvNhrCQrYlNM55m3lnXRdVWz6r7UhLaR__UknuxYeMDAD0PpmwVZHFVuNt7Rw98GXWXfLW5L-8_HLr1aRq8VWhwgfEqRgRME3aZrmK6K3hcynPnXcz5e3X6XbJ_S1dTY4fDMuL4P2EnRCvvfAW8NCagdSQkM7CvDyadT4I1o9I8OVsOu-qawTFKc3MS4hGLqTtN7mFNRMNWW4nbLDKj2mY7sJDqgZkeI4870BBbTFmkGG0OSiUiGyhPVjBpba4XkjPdgQMUnR5kjeycOSO5m1DXBpW5IDixJxK8yHJiz8EKe7Z1Z_m78b87vLTDNmoVKGE4ScKhineGnu9ADaOoyEjRqVXysjE2R9y0ON0tSEIsbqNCzhlPdbYCjT586z3JiBjIU3R8W6CNC2Dg8PgIiNFg4JAm9c26U0PF7eFULSZsXW HElIFT43cly4firRykOHuFSqlValFEaDsfhpwThalkaEhULDd2nBhoj4cfwBj0YQzGYAx-gzFwIzGojkSqY1rZMXNxosA3HKSWV0f_KdDR27khQno1cPKPQ9deCyq6Jq1aTBYBftI6LWk5kNxyH41GKe0sWpmEa_1eKjNkBeYVM4XK1wyaQhMD-gz6fZy0NqyOqURvOFBgu7j-F62xPSXtBb-ZG5Ywte_b6zc_Hj4-49mPYG0LecEIxqXRuetFtsHUo7fPj-x-9vWJFZNLUJbj4e328F-Hp_M6-ModoDlD_S83YUI3yPIXqjl9HQ','','b_43-1','scope','true')) - scope-102 \ No newline at end of file