pig-commits mailing list archives

From: cheol...@apache.org
Subject: svn commit: r1601268 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java test/org/apache/pig/test/TestSplitCombine.java
Date: Sun, 08 Jun 2014 23:02:50 GMT
Author: cheolsoo
Date: Sun Jun  8 23:02:49 2014
New Revision: 1601268

URL: http://svn.apache.org/r1601268
Log:
PIG-3986: PigSplit to support multiple split class (tongjie via cheolsoo)
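
PigSplit.write() previously recorded only wrappedSplits[0]'s class name and
reused a single Serializer for every wrapped split, so a combined split that
mixed split classes (e.g. a FileSplit plus a ParquetInputSplit) could not be
round-tripped. The new format writes each distinct split class name once,
then tags every wrapped split with an int index into that list. Informally,
the tail of the serialized record (leading fields unchanged) changes from

    splitLen, splitClassName, bytes of split[0..splitLen-1]

to

    splitLen, distinctCount, name[0..distinctCount-1],
    then for each split i: index(i), bytes of split[i]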

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
    pig/trunk/test/org/apache/pig/test/TestSplitCombine.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1601268&r1=1601267&r2=1601268&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Jun  8 23:02:49 2014
@@ -32,6 +32,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-3986: PigSplit to support multiple split class (tongjie via cheolsoo)
+
 PIG-3988: PigStorage: CommandLineParser is not thread safe (tmwoodruff via cheolsoo)
 
 PIG-2409: Pig show wrong tracking URL for hadoop 2 (lbendig via rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1601268&r1=1601267&r2=1601268&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Sun Jun  8 23:02:49 2014
@@ -209,7 +209,7 @@ public class PigSplit extends InputSplit
     public long getLength(int idx) throws IOException, InterruptedException {
         return wrappedSplits[idx].getLength();
     }
-    
+
     @SuppressWarnings("unchecked")
     public void readFields(DataInput is) throws IOException {
         disableCounter = is.readBoolean();
@@ -219,23 +219,31 @@ public class PigSplit extends InputSplit
         inputIndex = is.readInt();
         targetOps = (ArrayList<OperatorKey>) readObject(is);
         int splitLen = is.readInt();
-        String splitClassName = is.readUTF();
+        int distinctSplitClassCount = is.readInt();
+        //construct the input split class name list
+        String[] distinctSplitClassName = new String[distinctSplitClassCount];
+        for (int i = 0; i < distinctSplitClassCount; i++) {
+            distinctSplitClassName[i] = is.readUTF();
+        }
         try {
-            Class splitClass = conf.getClassByName(splitClassName);
             SerializationFactory sf = new SerializationFactory(conf);
             // The correct call sequence for Deserializer is, we shall open, then deserialize, but we shall not close
-            Deserializer d = sf.getDeserializer(splitClass);
-            d.open((InputStream) is);
             wrappedSplits = new InputSplit[splitLen];
             for (int i = 0; i < splitLen; i++)
             {
+                //read the className index
+                int index = is.readInt();
+                //get the split class name
+                String splitClassName = distinctSplitClassName[index];
+                Class splitClass = conf.getClassByName(splitClassName);
+                Deserializer d = sf.getDeserializer(splitClass);
+                d.open((InputStream) is);
                 wrappedSplits[i] = (InputSplit)ReflectionUtils.newInstance(splitClass, conf);
                 d.deserialize(wrappedSplits[i]);
             }
         } catch (ClassNotFoundException e) {
             throw new IOException(e);
         }
-        
     }
 
     @SuppressWarnings("unchecked")
@@ -247,22 +255,36 @@ public class PigSplit extends InputSplit
         os.writeInt(inputIndex);
         writeObject(targetOps, os);
         os.writeInt(wrappedSplits.length);
-        os.writeUTF(wrappedSplits[0].getClass().getName());
+        Set<String> splitClassNameSet = new HashSet<String>();
+        //first get the distinct split class name set
+        for ( int i= 0; i < wrappedSplits.length; i++) {
+            splitClassNameSet.add(wrappedSplits[i].getClass().getName());
+        }
+        List<String> distinctSplitClassList = new ArrayList<String>();
+        distinctSplitClassList.addAll(splitClassNameSet);
+        //write the distinct number of split class name
+        os.writeInt(distinctSplitClassList.size());
+        //write each classname once
+        for (int i = 0 ; i < distinctSplitClassList.size(); i++) {
+            os.writeUTF(distinctSplitClassList.get(i));
+        }
         SerializationFactory sf = new SerializationFactory(conf);
-        Serializer s = 
-            sf.getSerializer(wrappedSplits[0].getClass());
-         
-        //Checks if Serializer is NULL or not before calling open() method on it.       
 
-        if (s == null) {
-            	throw new IllegalArgumentException("Could not find Serializer for class "+wrappedSplits[0].getClass()+". InputSplits must implement Writable.");
-        }        
-        s.open((OutputStream) os);
+
         for (int i = 0; i < wrappedSplits.length; i++)
         {
+            //find out the index of the split class name
+            int index = distinctSplitClassList.indexOf(wrappedSplits[i].getClass().getName());
+            os.writeInt(index);
+            Serializer s = sf.getSerializer(wrappedSplits[i].getClass());
+            //Checks if Serializer is NULL or not before calling open() method on it.
+            if (s == null) {
+                throw new IllegalArgumentException("Could not find Serializer for class "+wrappedSplits[i].getClass()+". InputSplits must implement Writable.");
+            }
+            s.open((OutputStream) os);
             // The correct call sequence for Serializer is, we shall open, then serialize, but we shall not close
             s.serialize(wrappedSplits[i]);
         }
-        
+
     }
 
     private void writeObject(Serializable obj, DataOutput os)
@@ -376,7 +398,8 @@ public class PigSplit extends InputSplit
         try {
             st.append("Total Length = "+ getLength()+"\n");
             for (int i = 0; i < wrappedSplits.length; i++) {
-                st.append("Input split["+i+"]:\n   Length = "+ wrappedSplits[i].getLength()+"\n  Locations:\n");
+                st.append("Input split["+i+"]:\n   Length = "+ wrappedSplits[i].getLength()+"\n  ClassName: " +
+                    wrappedSplits[i].getClass().getName() + "\n   Locations:\n");
                 for (String location :  wrappedSplits[i].getLocations())
                     st.append("    "+location+"\n");
                 st.append("\n-----------------------\n"); 

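The heart of the patch is the small class-name table: write() records each
distinct split class name once, then tags every wrapped split with an int
index into that table, and readFields() resolves each index back to a class.
Below is a minimal standalone sketch of that encoding, outside Hadoop's
Serializer machinery; the class and method names are illustrative only, and
where the real patch writes each split's serialized bytes after its index,
the sketch only marks the spot with a comment.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class DistinctClassNameTableSketch {

        // Write each distinct class name once and return the table, so the
        // caller can emit one small int index per item instead of repeating
        // a full class name per item.
        static List<String> writeNameTable(DataOutput out, Object[] items)
                throws IOException {
            List<String> names = new ArrayList<String>();
            for (Object o : items) {
                String n = o.getClass().getName();
                if (!names.contains(n)) {
                    names.add(n);
                }
            }
            out.writeInt(names.size());
            for (String n : names) {
                out.writeUTF(n);
            }
            return names;
        }

        // Read the table back; per-item indexes are resolved against it.
        static String[] readNameTable(DataInput in) throws IOException {
            String[] names = new String[in.readInt()];
            for (int i = 0; i < names.length; i++) {
                names[i] = in.readUTF();
            }
            return names;
        }

        public static void main(String[] args) throws IOException {
            Object[] items = { "a", Integer.valueOf(1), "b" }; // two distinct classes
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            List<String> table = writeNameTable(out, items);
            for (Object o : items) {
                out.writeInt(table.indexOf(o.getClass().getName()));
                // ...the real patch serializes the split's bytes here...
            }
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(buf.toByteArray()));
            String[] names = readNameTable(in);
            for (int i = 0; i < items.length; i++) {
                System.out.println(names[in.readInt()]); // String, Integer, String
            }
        }
    }

Writing each name once per distinct class keeps the record compact, and
obtaining a Deserializer per split (instead of once, from wrappedSplits[0])
is what allows the wrapped splits to be heterogeneous.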
Modified: pig/trunk/test/org/apache/pig/test/TestSplitCombine.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSplitCombine.java?rev=1601268&r1=1601267&r2=1601268&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSplitCombine.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSplitCombine.java Sun Jun  8 23:02:49 2014
@@ -17,26 +17,33 @@
  */
 package org.apache.pig.test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.List;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+
+import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.impl.plan.OperatorKey;
-
-import junit.framework.Assert;
-import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
+import parquet.hadoop.ParquetInputSplit;
+import parquet.hadoop.metadata.BlockMetaData;
+
 public class TestSplitCombine {
     private Configuration conf;
     private TestPigInputFormat pigInputFormat;
@@ -76,6 +83,8 @@ public class TestSplitCombine {
     public void setUp() throws Exception {
         conf = new Configuration();
         conf.setLong("pig.maxCombinedSplitSize", 1000);
+      conf.setStrings("io.serializations",
+            "org.apache.hadoop.io.serializer.WritableSerialization");
         pigInputFormat = new TestPigInputFormat();
         ok = new ArrayList<OperatorKey>();
         ok.add(new OperatorKey());
@@ -459,6 +468,105 @@ public class TestSplitCombine {
         }
     }
     
+    @Test
+    public void test10() throws IOException, InterruptedException {
+        // verify locations in order
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+
+        rawSplits.add(new FileSplit(new Path("path1"), 0, 100, new String[] {
+                "l1", "l2", "l3" }));
+        rawSplits.add(new FileSplit(new Path("path2"), 0, 200, new String[] {
+                "l3", "l4", "l5" }));
+        rawSplits.add(new FileSplit(new Path("path3"), 0, 400, new String[] {
+                "l5", "l6", "l1" }));
+        List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                null, true, conf);
+
+        Assert.assertEquals(result.size(), 1);
+
+        for (InputSplit split : result) {
+            PigSplit pigSplit = (PigSplit) split;
+            // write to a byte array output stream
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+            DataOutput out = new DataOutputStream(outputStream);
+            pigSplit.write(out);
+            // restore the pig split from the byte array
+            ByteArrayInputStream inputStream = new ByteArrayInputStream(
+                    outputStream.toByteArray());
+
+            DataInput in = new DataInputStream(inputStream);
+            PigSplit anotherSplit = new PigSplit();
+            anotherSplit.setConf(conf);
+
+            anotherSplit.readFields(in);
+
+            Assert.assertEquals(700, anotherSplit.getLength());
+            checkLocationOrdering(pigSplit.getLocations(), new String[] { "l5",
+                    "l1", "l6", "l3", "l4" });
+
+            Assert.assertEquals(3, anotherSplit.getNumPaths());
+
+            Assert.assertEquals(
+                    "org.apache.hadoop.mapreduce.lib.input.FileSplit",
+                    (anotherSplit.getWrappedSplit(0).getClass().getName()));
+            Assert.assertEquals(
+                    "org.apache.hadoop.mapreduce.lib.input.FileSplit",
+                    (anotherSplit.getWrappedSplit(1).getClass().getName()));
+            Assert.assertEquals(
+                    "org.apache.hadoop.mapreduce.lib.input.FileSplit",
+                    (anotherSplit.getWrappedSplit(2).getClass().getName()));
+        }
+    }
+
+    @Test
+    public void test11() throws IOException, InterruptedException {
+        // verify locations in order
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+
+        // first split is parquetinputsplit
+        rawSplits.add(new ParquetInputSplit(new Path("path1"), 0, 100,
+                new String[] { "l1", "l2", "l3" },
+                new ArrayList<BlockMetaData>(), "", "",
+                new HashMap<String, String>(), new HashMap<String, String>()));
+        // second split is file split
+        rawSplits.add(new FileSplit(new Path("path2"), 0, 400, new String[] {
+                "l5", "l6", "l1" }));
+
+        List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                null, true, conf);
+
+        // pig combines two into one pigsplit
+        Assert.assertEquals(result.size(), 1);
+
+        for (InputSplit split : result) {
+            PigSplit pigSplit = (PigSplit) split;
+
+            // write to a byte array output stream
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+            DataOutput out = new DataOutputStream(outputStream);
+            pigSplit.write(out);
+            // restore the pig split from the byte array
+            ByteArrayInputStream inputStream = new ByteArrayInputStream(
+                    outputStream.toByteArray());
+
+            DataInput in = new DataInputStream(inputStream);
+            PigSplit anotherSplit = new PigSplit();
+            anotherSplit.setConf(conf);
+            anotherSplit.readFields(in);
+
+            Assert.assertEquals(500, anotherSplit.getLength());
+
+            Assert.assertEquals(2, anotherSplit.getNumPaths());
+            Assert.assertEquals("parquet.hadoop.ParquetInputSplit",
+                    (anotherSplit.getWrappedSplit(0).getClass().getName()));
+            Assert.assertEquals(
+                    "org.apache.hadoop.mapreduce.lib.input.FileSplit",
+                    (anotherSplit.getWrappedSplit(1).getClass().getName()));
+        }
+    }
+    
     private void checkLocations(String[] actual, String[] expected) {
         HashSet<String> expectedSet = new HashSet<String>();
         for (String str : expected)
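
Two details of the test changes are easy to miss. First, setUp() now sets
"io.serializations" because the write()/readFields() round trip goes through
Hadoop's SerializationFactory, which looks serializers up under that key.
The relevant lines, as in the diff above:

    // SerializationFactory resolves a (de)serializer for each wrapped
    // split class from the "io.serializations" list:
    conf.setStrings("io.serializations",
            "org.apache.hadoop.io.serializer.WritableSerialization");

Second, test11 exercises a heterogeneous combine (a ParquetInputSplit plus a
FileSplit), so parquet-hadoop must be on the test classpath to supply
ParquetInputSplit and BlockMetaData.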


