pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jcove...@apache.org
Subject svn commit: r1403934 - in /pig/branches/branch-0.11: CHANGES.txt src/org/apache/pig/impl/util/ObjectSerializer.java test/org/apache/pig/test/TestMRCompiler.java test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
Date Tue, 30 Oct 2012 23:52:08 GMT
Author: jcoveney
Date: Tue Oct 30 23:52:07 2012
New Revision: 1403934

URL: http://svn.apache.org/viewvc?rev=1403934&view=rev
Log:
PIG-3017: Pig's object serialization should use compression (jcoveney)

Modified:
    pig/branches/branch-0.11/CHANGES.txt
    pig/branches/branch-0.11/src/org/apache/pig/impl/util/ObjectSerializer.java
    pig/branches/branch-0.11/test/org/apache/pig/test/TestMRCompiler.java
    pig/branches/branch-0.11/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld

Modified: pig/branches/branch-0.11/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/CHANGES.txt?rev=1403934&r1=1403933&r2=1403934&view=diff
==============================================================================
--- pig/branches/branch-0.11/CHANGES.txt (original)
+++ pig/branches/branch-0.11/CHANGES.txt Tue Oct 30 23:52:07 2012
@@ -310,6 +310,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-3017: Pig's object serialization should use compression (jcoveney)
+
 PIG-2968: ColumnMapKeyPrune fails to prune a subtree inside foreach (knoguchi via cheolsoo)
 
 PIG-2999: Regression after PIG-2975: BinInterSedesTupleRawComparator secondary sort failing
(knoguchi via azaroth)

Modified: pig/branches/branch-0.11/src/org/apache/pig/impl/util/ObjectSerializer.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/impl/util/ObjectSerializer.java?rev=1403934&r1=1403933&r2=1403934&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/impl/util/ObjectSerializer.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/impl/util/ObjectSerializer.java Tue Oct 30
23:52:07 2012
@@ -1,14 +1,12 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,19 +22,25 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
 
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 public class ObjectSerializer {
-
     private static final Log log = LogFactory.getLog(ObjectSerializer.class);
-    
+
     public static String serialize(Serializable obj) throws IOException {
-        if (obj == null) return "";
+        if (obj == null)
+            return "";
         try {
             ByteArrayOutputStream serialObj = new ByteArrayOutputStream();
-            ObjectOutputStream objStream = new ObjectOutputStream(serialObj);
+            Deflater def = new Deflater(Deflater.BEST_COMPRESSION);
+            ObjectOutputStream objStream = new ObjectOutputStream(new DeflaterOutputStream(
+                    serialObj, def));
             objStream.writeObject(obj);
             objStream.close();
             return encodeBytes(serialObj.toByteArray());
@@ -44,38 +48,24 @@ public class ObjectSerializer {
             throw new IOException("Serialization error: " + e.getMessage(), e);
         }
     }
-    
+
     public static Object deserialize(String str) throws IOException {
-        if (str == null || str.length() == 0) return null;
+        if (str == null || str.length() == 0)
+            return null;
         try {
             ByteArrayInputStream serialObj = new ByteArrayInputStream(decodeBytes(str));
-            ObjectInputStream objStream = new ObjectInputStream(serialObj);
+            ObjectInputStream objStream = new ObjectInputStream(new InflaterInputStream(serialObj));
             return objStream.readObject();
         } catch (Exception e) {
             throw new IOException("Deserialization error: " + e.getMessage(), e);
         }
     }
-    
+
     public static String encodeBytes(byte[] bytes) {
-        StringBuffer strBuf = new StringBuffer();
-    
-        for (int i = 0; i < bytes.length; i++) {
-            strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ((int) 'a')));
-            strBuf.append((char) (((bytes[i]) & 0xF) + ((int) 'a')));
-        }
-        
-        return strBuf.toString();
+        return Base64.encodeBase64URLSafeString(bytes);
     }
-    
+
     public static byte[] decodeBytes(String str) {
-        byte[] bytes = new byte[str.length() / 2];
-        for (int i = 0; i < str.length(); i+=2) {
-            char c = str.charAt(i);
-            bytes[i/2] = (byte) ((c - 'a') << 4);
-            c = str.charAt(i+1);
-            bytes[i/2] += (c - 'a');
-        }
-        return bytes;
+        return Base64.decodeBase64(str);
     }
-
 }

Modified: pig/branches/branch-0.11/test/org/apache/pig/test/TestMRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/org/apache/pig/test/TestMRCompiler.java?rev=1403934&r1=1403933&r2=1403934&view=diff
==============================================================================
--- pig/branches/branch-0.11/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ pig/branches/branch-0.11/test/org/apache/pig/test/TestMRCompiler.java Tue Oct 30 23:52:07
2012
@@ -19,7 +19,6 @@ package org.apache.pig.test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.ByteArrayOutputStream;
 import java.io.FileInputStream;
@@ -39,34 +38,45 @@ import org.apache.pig.FuncSpec;
 import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.builtin.AVG;
-import org.apache.pig.builtin.COUNT;
-import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.builtin.SUM;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.builtin.GFCross;
-import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LimitAdjuster;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.builtin.AVG;
+import org.apache.pig.builtin.COUNT;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.SUM;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.GFCross;
 import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.test.junit.OrderedJUnit4Runner;
 import org.apache.pig.test.junit.OrderedJUnit4Runner.TestOrder;
 import org.apache.pig.test.utils.GenPhyOp;
-
-import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
@@ -74,14 +84,13 @@ import org.junit.runner.RunWith;
  * Test cases to test the MRCompiler.
  * VERY IMPORTANT NOTE: The tests here compare results with a
  * "golden" set of outputs. In each testcase here, the operators
- * generated have a random operator key which uses Java's Random 
+ * generated have a random operator key which uses Java's Random
  * class. So if there is a code change which changes the number of
  * operators created in a plan, then not only will the "golden" file
  * for that test case need to be changed, but also for the tests
  * that follow it since the operator keys that will be generated through
  * Random will be different.
  */
-
 @RunWith(OrderedJUnit4Runner.class)
 @TestOrder({
     "testRun1",
@@ -101,7 +110,7 @@ import org.junit.runner.RunWith;
     "testDistinct1",
     "testLimit",
     "testMRCompilerErr",
-    "testMRCompilerErr1",      
+    "testMRCompilerErr1",
     "testNumReducersInLimit",
     "testNumReducersInLimitWithParallel",
     "testUDFInJoin",
@@ -115,27 +124,16 @@ import org.junit.runner.RunWith;
     "testSchemaInStoreForDistinctLimit" })
 public class TestMRCompiler {
     static MiniCluster cluster = MiniCluster.buildCluster();
-    
+
     static PigContext pc;
     static PigContext pcMR;
 
     static final int MAX_SIZE = 100000;
 
     static final long SEED = 1013;
-    
-    static Random r;
-    static{
-        pc = new PigContext(ExecType.LOCAL, new Properties());
-        pcMR = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
-        try {
-            pc.connect();
-        } catch (ExecException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-        r = new Random(SEED);
-    }
-    
+
+    static final Random r = new Random(SEED);
+
     PigServer pigServer = null;
     PigServer pigServerMR = null;
 
@@ -146,20 +144,23 @@ public class TestMRCompiler {
     // and are sure of
     private boolean generate = false;
 
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        pc = new PigContext(ExecType.LOCAL, new Properties());
+        pcMR = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
+        pc.connect();
+    }
+
     @Before
     public void setUp() throws ExecException {
         GenPhyOp.setR(r);
-        
+
         GenPhyOp.setPc(pc);
         NodeIdGenerator.getGenerator().reset("");
-        pigServer = new PigServer( pc );
-        pigServerMR = new PigServer( pcMR );
+        pigServer = new PigServer(pc);
+        pigServerMR = new PigServer(pcMR);
     }
 
-    @After
-    public void tearDown() throws Exception {
-    }
-    
     @Test
     public void testRun1() throws Exception {
         PhysicalPlan php = new PhysicalPlan();
@@ -447,7 +448,6 @@ public class TestMRCompiler {
         POStore st = GenPhyOp.topStoreOp();
         php.addAsLeaf(st);
         run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC13.gld");
-
     }
 
     @Test
@@ -527,7 +527,6 @@ public class TestMRCompiler {
         POStore st = GenPhyOp.topStoreOp();
         php.addAsLeaf(st);
         run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC14.gld");
-
     }
 
      // Tests Single input case for both blocking and non-blocking
@@ -560,7 +559,6 @@ public class TestMRCompiler {
 
         php.connect(fl, st);
         run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC1.gld");
-
     }
 
     @Test
@@ -581,7 +579,6 @@ public class TestMRCompiler {
 
         php.connect(un, st);
         run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC2.gld");
-
     }
 
     @Test
@@ -774,13 +771,13 @@ public class TestMRCompiler {
         php.addAsLeaf(st);
         run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC9.gld");
     }
-    
+
     @Test
     public void testSortUDF1() throws Exception {
         PhysicalPlan php = new PhysicalPlan();
         PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
         php.merge(ldFil1);
-        
+
         // set up order by *
         String funcName = WeirdComparator.class.getName();
         POUserComparisonFunc comparator = new POUserComparisonFunc(
@@ -794,18 +791,18 @@ public class TestMRCompiler {
         topPrj.setOverloaded(true);
         topPrj.setResultType(DataType.TUPLE);
         nesSortPlan.add(topPrj);
-        
+
         POProject prjStar2 = new POProject(new OperatorKey("", r.nextLong()));
         prjStar2.setResultType(DataType.TUPLE);
         prjStar2.setStar(true);
         nesSortPlan.add(prjStar2);
-        
+
         nesSortPlan.connect(topPrj, prjStar2);
         List<PhysicalPlan> nesSortPlanLst = new ArrayList<PhysicalPlan>();
         nesSortPlanLst.add(nesSortPlan);
-        
+
         sort.setSortPlans(nesSortPlanLst);
-        
+
         php.add(sort);
         php.connect(ldFil1.getLeaves().get(0), sort);
         // have a foreach which takes the sort output
@@ -816,18 +813,18 @@ public class TestMRCompiler {
         POForEach fe3 = GenPhyOp.topForEachOPWithUDF(udfs);
         php.add(fe3);
         php.connect(sort, fe3);
-        
+
         // add a group above the foreach
         PhysicalPlan grpChain1 = GenPhyOp.grpChain();
         php.merge(grpChain1);
         php.connect(fe3,grpChain1.getRoots().get(0));
-        
-        
+
+
         udfs.clear();
         udfs.add(AVG.class.getName());
         POForEach fe4 = GenPhyOp.topForEachOPWithUDF(udfs);
         php.addAsLeaf(fe4);
-        
+
         PhysicalPlan grpChain2 = GenPhyOp.grpChain();
         php.merge(grpChain2);
         php.connect(fe4,grpChain2.getRoots().get(0));
@@ -836,36 +833,36 @@ public class TestMRCompiler {
         udfs.add(GFCross.class.getName());
         POForEach fe5 = GenPhyOp.topForEachOPWithUDF(udfs);
         php.addAsLeaf(fe5);
-        
+
         POStore st = GenPhyOp.topStoreOp();
         php.addAsLeaf(st);
         run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC15.gld");
     }
-    
+
     @Test
     public void testDistinct1() throws Exception {
         PhysicalPlan php = new PhysicalPlan();
         PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
         php.merge(ldFil1);
-        
+
         PODistinct op = new PODistinct(new OperatorKey("", r.nextLong()),
                 -1, null);
-        
+
         php.addAsLeaf(op);
-        
+
         PhysicalPlan grpChain1 = GenPhyOp.grpChain();
         php.merge(grpChain1);
         php.connect(op,grpChain1.getRoots().get(0));
-        
+
         PODistinct op1 = new PODistinct(new OperatorKey("", r.nextLong()),
                 -1, null);
-        
+
         php.addAsLeaf(op1);
         POStore st = GenPhyOp.topStoreOp();
         php.addAsLeaf(st);
         run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC16.gld");
     }
-    
+
     @Test
     public void testLimit() throws Exception {
         PhysicalPlan php = new PhysicalPlan();
@@ -883,37 +880,37 @@ public class TestMRCompiler {
         php.addAsLeaf(st);
         run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC17.gld");
     }
-    
-    @Test
+
+    @Test(expected = MRCompilerException.class)
     public void testMRCompilerErr() throws Exception {
     	String query = "a = load 'input';" +
     	"b = filter a by $0 > 5;" +
     	"store b into 'output';";
-    	
+
     	PhysicalPlan pp = Util.buildPp(pigServer, query);
     	pp.remove(pp.getRoots().get(0));
     	try {
     		Util.buildMRPlan(new PhysicalPlan(), pc);
-    		fail("Expected failure.");
     	} catch (MRCompilerException mrce) {
-    		assertTrue(mrce.getErrorCode() == 2053);
+    		assertEquals(2053, mrce.getErrorCode());
+    		throw mrce;
     	}
     }
 
-    @Test
-    public void testMRCompilerErr1() throws Exception {   	
+    @Test(expected = MRCompilerException.class)
+    public void testMRCompilerErr1() throws Exception {
         PhysicalPlan pp = new PhysicalPlan();
         PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
         pp.merge(ldFil1);
-        
+
         POSplit op = GenPhyOp.topSplitOp();
         pp.addAsLeaf(op);
 
     	try {
     		Util.buildMRPlan(pp, pc);
-    		fail("Expected failure.");
     	} catch (MRCompilerException mrce) {
-    		assertTrue(mrce.getErrorCode() == 2025);
+    		assertEquals(2025, mrce.getErrorCode());
+    		throw mrce;
     	}
     }
 
@@ -929,44 +926,44 @@ public class TestMRCompiler {
     	"b = order a by $0;" +
     	"c = limit b 10;" +
     	"store c into 'output';";
-    	
+
     	PhysicalPlan pp = Util.buildPp(pigServer, query);
     	MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
     	MapReduceOper mrOper = mrPlan.getRoots().get(0);
     	int count = 1;
-    	
+
     	while(mrPlan.getSuccessors(mrOper) != null) {
     		mrOper = mrPlan.getSuccessors(mrOper).get(0);
     		++count;
-    	}        
-    	assertTrue(count == 3);
+    	}
+    	assertEquals(3, count);
     }
-    
+
     /**
      * Test to ensure that the order by with parallel followed by a limit, i.e., top k
      * always produces the correct number of map reduce jobs
      */
     @Test
     public void testNumReducersInLimitWithParallel() throws Exception {
-    	String query = "a = load 'input';" + 
+    	String query = "a = load 'input';" +
     	"b = order a by $0 parallel 2;" +
     	"c = limit b 10;" + "store c into 'output';";
-    	
+
     	PhysicalPlan pp = Util.buildPp(pigServerMR, query);
     	MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-    	
+
     	LimitAdjuster la = new LimitAdjuster(mrPlan, pc);
         la.visit();
         la.adjust();
 
     	MapReduceOper mrOper = mrPlan.getRoots().get(0);
     	int count = 1;
-    	
+
     	while(mrPlan.getSuccessors(mrOper) != null) {
     		mrOper = mrPlan.getSuccessors(mrOper).get(0);
     		++count;
-    	}        
-    	assertTrue(count == 4);
+    	}
+    	assertEquals(4, count);
     }
 
     @Test
@@ -974,13 +971,13 @@ public class TestMRCompiler {
         String query = "a = load 'input1' using BinStorage();" +
         "b = load 'input2';" +
         "c = join a by $0, b by $0;" + "store c into 'output';";
-        
+
         PhysicalPlan pp = Util.buildPp(pigServer, query);
         MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
         MapReduceOper mrOper = mrPlan.getRoots().get(0);
-        
-        assertTrue(mrOper.UDFs.size()==2);
-        assertTrue(mrOper.UDFs.size()==2);
+
+        assertEquals(2, mrOper.UDFs.size());
+        assertEquals(2, mrOper.UDFs.size());
         assertTrue(mrOper.UDFs.contains("BinStorage"));
         assertTrue(mrOper.UDFs.contains("org.apache.pig.builtin.PigStorage"));
     }
@@ -991,47 +988,42 @@ public class TestMRCompiler {
         "b = load '/tmp/input2';" +
         "c = join a by $0, b by $0 using 'merge';" +
         "store c into '/tmp/output1';";
-        
+
         PhysicalPlan pp = Util.buildPp(pigServer, query);
         run(pp, "test/org/apache/pig/test/data/GoldenFiles/MRC18.gld");
     }
-    
-    public static class WeirdComparator extends ComparisonFunc {
 
+    public static class WeirdComparator extends ComparisonFunc {
         @Override
         public int compare(Tuple t1, Tuple t2) {
-            // TODO Auto-generated method stub
             int result = 0;
             try {
                 int i1 = (Integer) t1.get(1);
                 int i2 = (Integer) t2.get(1);
                 result = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
             } catch (ExecException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
+                throw new RuntimeException(e);
             }
             return result;
         }
-
     }
-    
+
     @Test
     public void testMergeJoinWithIndexableLoadFunc() throws Exception{
         String query = "a = load 'input1';" +
         "b = load 'input2' using " +
             TestMergeJoin.DummyIndexableLoader.class.getName() + ";" +
         "c = join a by $0, b by $0 using 'merge';" + "store c into 'output';";
-        
+
         PhysicalPlan pp = Util.buildPp(pigServer, query);
         MROperPlan mp = Util.buildMRPlan(pp, pc);
         assertEquals("Checking number of MR Jobs for merge join with " +
         		"IndexableLoadFunc:", 1, mp.size());
-        
     }
-    
+
     @Test
     public void testCastFuncShipped() throws Exception{
-        String query = "a = load 'input1' using " + PigStorageNoDefCtor.class.getName() +

+        String query = "a = load 'input1' using " + PigStorageNoDefCtor.class.getName() +
                 "('\t') as (a0, a1, a2);" +
         "b = group a by a0;" +
         "c = foreach b generate flatten(a);" +
@@ -1043,34 +1035,34 @@ public class TestMRCompiler {
         MapReduceOper op = mp.getLeaves().get(0);
         assertTrue(op.UDFs.contains(new FuncSpec(PigStorageNoDefCtor.class.getName())+"('\t')"));
     }
-    
+
     @Test
     public void testLimitAdjusterFuncShipped() throws Exception{
-        String query = "a = load 'input';" + 
+        String query = "a = load 'input';" +
         "b = order a by $0 parallel 2;" +
         "c = limit b 7;" + "store c into 'output' using "
                 + PigStorageNoDefCtor.class.getName() + "('\t');";
-         
+
         PhysicalPlan pp = Util.buildPp(pigServerMR, query);
         MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-        
+
         LimitAdjuster la = new LimitAdjuster(mrPlan, pc);
         la.visit();
         la.adjust();
-        
+
         MapReduceOper mrOper = mrPlan.getRoots().get(0);
         int count = 1;
-        
+
         while(mrPlan.getSuccessors(mrOper) != null) {
             mrOper = mrPlan.getSuccessors(mrOper).get(0);
             ++count;
-        }        
-        assertTrue(count == 4);
+        }
+        assertEquals(4, count);
 
         MapReduceOper op = mrPlan.getLeaves().get(0);
         assertTrue(op.UDFs.contains(new FuncSpec(PigStorageNoDefCtor.class.getName())+"('\t')"));
     }
-    
+
     /**
      * Test that POSortedDistinct gets printed as POSortedDistinct
      * @throws Exception
@@ -1080,7 +1072,7 @@ public class TestMRCompiler {
         PhysicalPlan php = new PhysicalPlan();
         PhysicalPlan grpChain1 = GenPhyOp.loadedGrpChain();
         php.merge(grpChain1);
-        
+
         List<PhysicalPlan> inputs = new LinkedList<PhysicalPlan>();
         PhysicalPlan inplan = new PhysicalPlan();
         PODistinct op1 = new POSortedDistinct(new OperatorKey("", r.nextLong()),
@@ -1089,7 +1081,7 @@ public class TestMRCompiler {
         inputs.add(inplan);
         List<Boolean> toFlattens = new ArrayList<Boolean>();
         toFlattens.add(false);
-        POForEach pofe = new POForEach(new OperatorKey("", r.nextLong()), 1, 
+        POForEach pofe = new POForEach(new OperatorKey("", r.nextLong()), 1,
                 inputs, toFlattens);
 
         php.addAsLeaf(pofe);
@@ -1097,7 +1089,7 @@ public class TestMRCompiler {
         php.addAsLeaf(st);
         run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC19.gld");
     }
-    
+
     private void run(PhysicalPlan pp, String expectedFile) throws Exception {
         String compiledPlan, goldenPlan = null;
         int MAX_SIZE = 100000;
@@ -1110,7 +1102,7 @@ public class TestMRCompiler {
         ppp.print(baos);
         compiledPlan = baos.toString();
 
-        if(generate ){
+        if(generate){
             FileOutputStream fos = new FileOutputStream(expectedFile);
             fos.write(baos.toByteArray());
             return;
@@ -1147,7 +1139,7 @@ public class TestMRCompiler {
         public void ensureAllKeyInstancesInSameSplit() throws IOException {
         }
     }
-    
+
     public static class TestIndexableLoadFunc extends PigStorage implements IndexableLoadFunc
{
         @Override
         public void initialize(Configuration conf) throws IOException {
@@ -1161,56 +1153,56 @@ public class TestMRCompiler {
         public void close() throws IOException {
         }
     }
-    
+
     @Test
     public void testUDFInMergedCoGroup() throws Exception {
         String query = "a = load 'input1' using " + TestCollectableLoadFunc.class.getName()
+ "();" +
             "b = load 'input2' using " + TestIndexableLoadFunc.class.getName() + "();" +
             "c = cogroup a by $0, b by $0 using 'merge';" +
             "store c into 'output';";
-        
+
         PhysicalPlan pp = Util.buildPp(pigServer, query);
         MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
         MapReduceOper mrOper = mrPlan.getRoots().get(0);
-        
+
         assertTrue(mrOper.UDFs.contains(TestCollectableLoadFunc.class.getName()));
         mrOper = mrPlan.getSuccessors(mrOper).get(0);
         assertTrue(mrOper.UDFs.contains(TestCollectableLoadFunc.class.getName()));
         assertTrue(mrOper.UDFs.contains(TestIndexableLoadFunc.class.getName()));
     }
-    
+
     @Test
     public void testUDFInMergedJoin() throws Exception {
-        String query = "a = load 'input1';" + 
+        String query = "a = load 'input1';" +
             "b = load 'input2' using " + TestIndexableLoadFunc.class.getName() + "();" +
             "c = join a by $0, b by $0 using 'merge';" +
             "store c into 'output';";
-        
+
         PhysicalPlan pp = Util.buildPp(pigServer, query);
         MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
         MapReduceOper mrOper = mrPlan.getRoots().get(0);
-        
+
         assertTrue(mrOper.UDFs.contains(TestIndexableLoadFunc.class.getName()));
     }
-    
+
     //PIG-2146
     @Test
     public void testSchemaInStoreForDistinctLimit() throws Exception {
         //test if the POStore in the 2nd mr plan (that stores the actual output)
-        // has a schema 
-        String query = "a = load 'input1' as (a : int,b :float ,c : int);" + 
+        // has a schema
+        String query = "a = load 'input1' as (a : int,b :float ,c : int);" +
             "b  = distinct a;" +
             "c = limit b 10;" +
             "store c into 'output';";
-        
+
         PhysicalPlan pp = Util.buildPp(pigServer, query);
         MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
         MapReduceOper secondMrOper = mrPlan.getLeaves().get(0);
         POStore store = (POStore)secondMrOper.reducePlan.getLeaves().get(0);
         assertEquals(
-                "compare load and store schema", 
-                store.getSchema(), 
+                "compare load and store schema",
+                store.getSchema(),
                 Utils.getSchemaFromString("a : int,b :float ,c : int")
         );
     }
-}
+}
\ No newline at end of file

Modified: pig/branches/branch-0.11/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld?rev=1403934&r1=1403933&r2=1403934&view=diff
==============================================================================
--- pig/branches/branch-0.11/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld (original)
+++ pig/branches/branch-0.11/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld Tue Oct 30
23:52:07 2012
@@ -18,4 +18,4 @@ Reduce Plan Empty
     |   |   |
     |   |   Project[tuple][*] - scope-111
     |   |
-    |   |---b: Load(/tmp/input2:org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer('org.apache.pig.builtin.PigStorage','kmonaaafhdhcaabdgkgbhggbcohfhegjgmcoebhchcgbhjemgjhdhehiibncbnjjmhgbjnadaaabejaaaehdgjhkgfhihaaaaaaaabhhaeaaaaaaabhdhcaaeogphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccohagmgbgohdcofagihjhdgjgdgbgmfagmgbgoaaaaaaaaaaaaaaabacaaacfkaaangfgogeepggebgmgmejgohahfheemaaafgphagngbhaheaacdemgphcghcpgbhagbgdgigfcphagjghcpgjgnhagmcphfhegjgmcpenhfgmhegjengbhadlhihcaacfgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcfagmgbgoimlohdjmoaemmdgiacaaaiemaaakgneghcgpgnefgeghgfhdhbaahoaaademaaafgnelgfhjhdheaaapemgkgbhggbcphfhegjgmcpengbhadlemaaahgnemgfgbhggfhdheaabaemgkgbhggbcphfhegjgmcpemgjhdhedlemaaaegnephahdhbaahoaaafemaaaggnfcgpgphehdhbaahoaaagemaaaognfdgpggheeghcgpgnefgeghgfhdhbaahoaaademaaamgnfdgpgghefegpefgeghgfhdhbaahoaaademaaaignfegpefgeghgfhdhbaahoaaad
 hihahdhcaacbgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohfhegjgmcoenhfgmhegjengbhaaaaaaaaaaaaaaaacacaaabemaaaegnengbhahbaahoaaafhihahdhcaabbgkgbhggbcohfhegjgmcoeigbhdgiengbhaafahnkmbmdbgganbadaaacegaaakgmgpgbgeeggbgdhegphcejaaajhegihcgfhdgigpgmgehihadpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaabhdhcaacegphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcelgfhjaaaaaaaaaaaaaaabacaaacekaaacgjgeemaaafhdgdgphagfheaabcemgkgbhggbcpgmgbgoghcpfdhehcgjgoghdlhihaaaaaaaaaaaaaaagiheaaafhdgdgphagfhdhcaafjgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcofaepfahcgpgkgfgdheaaaaaaaaaaaaaaabacaaagfkaaaogjhdfahcgpgkgfgdhefegpefgogefkaaakgphggfhcgmgpgbgegfgefkaabfhahcgpgdgfhdhdgjgoghecgbghepggfehfhagmgfhdfkaabehcgfhdhfgmhefdgjgoghgmgffehfhagmgfecgbghejaaaihdhegbhcheedgpgmemaaahgdgpgmhfgngohdheaabfemgkgbhggbcphfhegjgmcpebhchcgbhjemgjhdhed
 lhihcaagcgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcoefhihahcgfhdhdgjgpgoephagfhcgbhegphcaaaaaaaaaaaaaaabacaaaahihcaaemgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccofagihjhdgjgdgbgmephagfhcgbhegphcaaaaaaaaaaaaaaabacaaamfkaaafgbgdgdhfgnfkaaangjgohahfheebhehegbgdgigfgeejaabehcgfhbhfgfhdhegfgefagbhcgbgmgmgfgmgjhdgnecaaakhcgfhdhfgmhefehjhagfemaaafgbgmgjgbhdhbaahoaaaoemaaafgjgohahfheheaablemgphcghcpgbhagbgdgigfcphagjghcpgegbhegbcpfehfhagmgfdlemaaaggjgohahfhehdhbaahoaaagemaaangmgjgogfgbghgffehcgbgdgfhcheaachemgphcghcpgbhagbgdgigfcphagjghcphagfgocphfhegjgmcpemgjgogfgbghgffehcgbgdgfhcdlemaabbgphcgjghgjgogbgmemgpgdgbhegjgpgohdhbaahoaaagemaaahgphfhehahfhehdhbaahoaaagemaaakhagbhcgfgohefagmgbgoheaafaemgphcghcpgbhagbgdgigfcphagjghcpgcgbgdglgfgogecpgigbgegpgphacpgfhigfgdhfhegjgpgogfgoghgjgogfcphagihjhd
 gjgdgbgmemgbhjgfhccphagmgbgohdcpfagihjhdgjgdgbgmfagmgbgodlemaaadhcgfhdheaaeeemgphcghcpgbhagbgdgigfcphagjghcpgcgbgdglgfgogecpgigbgegpgphacpgfhigfgdhfhegjgpgogfgoghgjgogfcphagihjhdgjgdgbgmemgbhjgfhccpfcgfhdhfgmhedlhihcaacbgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcaaaaaaaaaaaaaaabacaaabemaaaegnelgfhjheaacgemgphcghcpgbhagbgdgigfcphagjghcpgjgnhagmcphagmgbgocpephagfhcgbhegphcelgfhjdlhihahbaahoaaapaaaappppppppdchahahahahdhbaahoaaaaaaaaaaaahhaeaaaaaaakhihahahdhcaaecgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccofcgfhdhfgmheaaaaaaaaaaaaaaabacaaacecaaamhcgfhehfhcgofdhegbhehfhdemaaaghcgfhdhfgmheheaabcemgkgbhggbcpgmgbgoghcpepgcgkgfgdhedlhihaachaaaaaaaaaaaaaaaaahdhbaahoaaaaaaaaaaabhhaeaaaaaaakhdhcaabbgkgbhggbcogmgbgoghcoejgohegfghgfhcbcockakephibihdiacaaabejaaafhggbgmhfgfhihcaabagkgbhggbcogmgbgoghcoeohfgngcgfhcigkmjfbnaljeoailacaaaahihaaaaaaaaahihihdhbaahoaaaaaaaaaaabhhaeaaaaaaakhbaahoaablhih
 dhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaabhbaahoaablhbaahoaaaphihdhbaahoaaaaaaaaaaaahhaeaaaaaaakhihdhbaahoaaaihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahihdhbaahoaaaihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahihdhbaahoaaaihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahiaahahi','','b','scope','true'))
- scope-102
\ No newline at end of file
+    |   |---b: Load(/tmp/input2:org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer('org.apache.pig.builtin.PigStorage','eNqtVTtvE0EQnlzixJiQhGeDKBCv7k6iQqKABIgwHNgiaXDF5G5zPti7XXb3wpkCiQYKKKFAAomCkt-AhCiooaRC9NSUMLu248MBUZgtLO_M7jy-_b65t9-hphXsv41b6Bcm5f6yUtgLU23Kh5-PvPiIr6ZhqgkzOr3PSgkAU_dm7C9dui5U4qPEqMt8mSb-BkZ3WB77XYyFkD4rWUQRRc7yJM3pSLen0wh5iD2mfMkx1357YGvTDvprygOvA3soUGtzmfNmLgsTQk3IDKWBYyElDfpJA0oapJnkgS08uFZwk15DebZUcGKsNHvKpfRbkik0QtmMT9_pl1_DD10P6iE0slUlsktxwvRdeADTlDO7ynrawGJo0RkkofghzGUhwy1GvqWKz4JGzpmsJV2IWgiz2Q0hjNvNhrCQrYlNM55m3lnXRdVWz6r7UhLaR__UknuxYeMDAD0PpmwVZHFVuNt7Rw98GXWXfLW5L-8_HLr1aRq8VWhwgfEqRgRME3aZrmK6K3hcynPnXcz5e3X6XbJ_S1dTY4fDMuL4P2EnRCvvfAW8NCagdSQkM7CvDyadT4I1o9I8OVsOu-qawTFKc3MS4hGLqTtN7mFNRMNWW4nbLDKj2mY7sJDqgZkeI4870BBbTFmkGG0OSiUiGyhPVjBpba4XkjPdgQMUnR5kjeycOSO5m1DXBpW5IDixJxK8yHJiz8EKe7Z1Z_m78b87vLTDNmoVKGE4ScKhineGnu9ADaOoyEjRqVXysjE2R9y0ON0tSEIsbqNCzhlPdbYCjT586z3JiBjIU3R8W6CNC2Dg8PgIiNFg4JAm9c26U0PF7eFULSZsXW
 HElIFT43cly4firRykOHuFSqlValFEaDsfhpwThalkaEhULDd2nBhoj4cfwBj0YQzGYAx-gzFwIzGojkSqY1rZMXNxosA3HKSWV0f_KdDR27khQno1cPKPQ9deCyq6Jq1aTBYBftI6LWk5kNxyH41GKe0sWpmEa_1eKjNkBeYVM4XK1wyaQhMD-gz6fZy0NqyOqURvOFBgu7j-F62xPSXtBb-ZG5Ywte_b6zc_Hj4-49mPYG0LecEIxqXRuetFtsHUo7fPj-x-9vWJFZNLUJbj4e328F-Hp_M6-ModoDlD_S83YUI3yPIXqjl9HQ','','b_43-1','scope','true'))
- scope-102
\ No newline at end of file



Mime
View raw message