pig-commits mailing list archives

From: roh...@apache.org
Subject: svn commit: r1795466 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java test/excluded-tests-spark
Date: Wed, 17 May 2017 21:52:20 GMT
Author: rohini
Date: Wed May 17 21:52:19 2017
New Revision: 1795466

URL: http://svn.apache.org/viewvc?rev=1795466&view=rev
Log:
PIG-5135: HDFS bytes read stats are always 0 in Spark mode (szita via rohini)
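The change below is essentially a factory-method refactoring: the reader setup in PigInputFormat.createRecordReader() moves into a protected RecordReaderFactory helper, and PigInputFormatSpark overrides it to hand back a SparkPigRecordReader while wrapping its splits in SparkPigSplit. A minimal, self-contained sketch of that pattern follows; the names in the sketch (BaseInputFormat, SparkInputFormat, Reader, ReaderFactory) are hypothetical stand-ins, not Pig classes:

    // Hypothetical stand-ins for PigInputFormat / PigInputFormatSpark, illustrating the
    // factory-method refactoring: shared setup lives in a protected factory, and the
    // Spark subclass only swaps the concrete reader it returns.
    interface Reader {
        String describe();
    }

    class BaseInputFormat {
        protected static class ReaderFactory {
            protected final String split;
            protected final long limit;

            protected ReaderFactory(String split, long limit) {
                // In the real code this is where the conf, LoadFunc and input limits are resolved.
                this.split = split;
                this.limit = limit;
            }

            protected Reader createReader() {
                return () -> "base reader for " + split + " (limit " + limit + ")";
            }
        }

        public Reader createRecordReader(String split, long limit) {
            return new ReaderFactory(split, limit).createReader();
        }
    }

    class SparkInputFormat extends BaseInputFormat {
        // Analogue of SparkRecordReaderFactory: same setup, different reader type.
        protected static class SparkReaderFactory extends ReaderFactory {
            protected SparkReaderFactory(String split, long limit) {
                super(split, limit);
            }

            @Override
            protected Reader createReader() {
                return () -> "spark reader for " + split + " (limit " + limit + ")";
            }
        }

        @Override
        public Reader createRecordReader(String split, long limit) {
            return new SparkReaderFactory(split, limit).createReader();
        }
    }

The point of the split is that the Spark path reuses all of the conf/LoadFunc setup and only swaps which reader class is instantiated.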

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
    pig/branches/spark/test/excluded-tests-spark

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1795466&r1=1795465&r2=1795466&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Wed May 17 21:52:19 2017
@@ -61,14 +61,6 @@ public class PigInputFormat extends Inpu
     public static final String PIG_INPUT_SIGNATURES = "pig.inpSignatures";
     public static final String PIG_INPUT_LIMITS = "pig.inpLimits";
 
-    /**
-     * @deprecated Use {@link UDFContext} instead in the following way to get
-     * the job's {@link Configuration}:
-     * <pre>UdfContext.getUdfContext().getJobConf()</pre>
-     */
-    @Deprecated
-    public static Configuration sJob;
-
     /* (non-Javadoc)
      * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
      */
@@ -78,43 +70,66 @@ public class PigInputFormat extends Inpu
             org.apache.hadoop.mapreduce.InputSplit split,
             TaskAttemptContext context) throws IOException,
             InterruptedException {
-        // We need to create a TaskAttemptContext based on the Configuration which
-        // was used in the getSplits() to produce the split supplied here. For
-        // this, let's find out the input of the script which produced the split
-        // supplied here and then get the corresponding Configuration and setup
-        // TaskAttemptContext based on it and then call the real InputFormat's
-        // createRecordReader() method
-
-        PigSplit pigSplit = (PigSplit)split;
-        activeSplit = pigSplit;
-        // XXX hadoop 20 new API integration: get around a hadoop 20 bug by
-        // passing total # of splits to each split so it can be retrieved
-        // here and set it to the configuration object. This number is needed
-        // by PoissonSampleLoader to compute the number of samples
-        int n = pigSplit.getTotalSplits();
-        context.getConfiguration().setInt("pig.mapsplits.count", n);
-        Configuration conf = context.getConfiguration();
-        PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
-                .deserialize(conf.get("udf.import.list")));
-        MapRedUtil.setupUDFContext(conf);
-        LoadFunc loadFunc = getLoadFunc(pigSplit.getInputIndex(), conf);
-        // Pass loader signature to LoadFunc and to InputFormat through
-        // the conf
-        passLoadSignature(loadFunc, pigSplit.getInputIndex(), conf);
-
-        // merge entries from split specific conf into the conf we got
-        PigInputFormat.mergeSplitSpecificConf(loadFunc, pigSplit, conf);
-
-        // for backward compatibility
-        PigInputFormat.sJob = conf;
+        RecordReaderFactory factory = new RecordReaderFactory(split, context);
+        return factory.createRecordReader();
+    }
 
-        InputFormat inputFormat = loadFunc.getInputFormat();
 
-        List<Long> inpLimitLists =
-                (ArrayList<Long>)ObjectSerializer.deserialize(
-                        conf.get(PIG_INPUT_LIMITS));
+    /**
+     * Helper class to create record reader
+     */
+    protected static class RecordReaderFactory {
+        protected InputFormat inputFormat;
+        protected PigSplit pigSplit;
+        protected LoadFunc loadFunc;
+        protected TaskAttemptContext context;
+        protected long limit;
+
+        public RecordReaderFactory(org.apache.hadoop.mapreduce.InputSplit split,
+                                   TaskAttemptContext context) throws IOException {
+
+            // We need to create a TaskAttemptContext based on the Configuration which
+            // was used in the getSplits() to produce the split supplied here. For
+            // this, let's find out the input of the script which produced the split
+            // supplied here and then get the corresponding Configuration and setup
+            // TaskAttemptContext based on it and then call the real InputFormat's
+            // createRecordReader() method
+
+            PigSplit pigSplit = (PigSplit)split;
+            // XXX hadoop 20 new API integration: get around a hadoop 20 bug by
+            // passing total # of splits to each split so it can be retrieved
+            // here and set it to the configuration object. This number is needed
+            // by PoissonSampleLoader to compute the number of samples
+            int n = pigSplit.getTotalSplits();
+            context.getConfiguration().setInt("pig.mapsplits.count", n);
+            Configuration conf = context.getConfiguration();
+            PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
+                    .deserialize(conf.get("udf.import.list")));
+            MapRedUtil.setupUDFContext(conf);
+            LoadFunc loadFunc = getLoadFunc(pigSplit.getInputIndex(), conf);
+            // Pass loader signature to LoadFunc and to InputFormat through
+            // the conf
+            passLoadSignature(loadFunc, pigSplit.getInputIndex(), conf);
+
+            // merge entries from split specific conf into the conf we got
+            PigInputFormat.mergeSplitSpecificConf(loadFunc, pigSplit, conf);
+
+            InputFormat inputFormat = loadFunc.getInputFormat();
+
+            List<Long> inpLimitLists =
+                    (ArrayList<Long>)ObjectSerializer.deserialize(
+                            conf.get(PIG_INPUT_LIMITS));
+
+            this.inputFormat = inputFormat;
+            this.pigSplit = pigSplit;
+            this.loadFunc = loadFunc;
+            this.context = context;
+            this.limit = inpLimitLists.get(pigSplit.getInputIndex());
+        }
 
-        return new PigRecordReader(inputFormat, pigSplit, loadFunc, context, inpLimitLists.get(pigSplit.getInputIndex()));
+        public org.apache.hadoop.mapreduce.RecordReader<Text, Tuple> createRecordReader() throws IOException, InterruptedException {
+            return new PigRecordReader(inputFormat, pigSplit, loadFunc, context, limit);
+        }
     }
 
 
@@ -339,10 +354,4 @@ public class PigInputFormat extends Inpu
         return pigSplit;
     }
 
-    public static PigSplit getActiveSplit() {
-        return activeSplit;
-    }
-
-    private static PigSplit activeSplit;
-
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1795466&r1=1795465&r2=1795466&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Wed May 17 21:52:19 2017
@@ -18,19 +18,25 @@
 package org.apache.pig.backend.hadoop.executionengine.spark.running;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigRecordReader;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigSplit;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
@@ -42,14 +48,13 @@ import org.apache.pig.tools.pigstats.spa
 
 public class PigInputFormatSpark extends PigInputFormat {
 
-	@Override
-	public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
-			TaskAttemptContext context) throws IOException,
-			InterruptedException {
+    @Override
+    public RecordReader<Text, Tuple> createRecordReader(InputSplit split, TaskAttemptContext context) throws
+            IOException, InterruptedException {
         resetUDFContext();
         //PigSplit#conf is the default hadoop configuration, we need get the configuration
         //from context.getConfigration() to retrieve pig properties
-        PigSplit pigSplit = (PigSplit) split;
+        PigSplit pigSplit = ((SparkPigSplit) split).getWrappedPigSplit();
         Configuration conf = context.getConfiguration();
         pigSplit.setConf(conf);
         //Set current splitIndex in PigMapReduce.sJobContext.getConfiguration.get(PigImplConstants.PIG_SPLIT_INDEX)
@@ -61,7 +66,42 @@ public class PigInputFormatSpark extends
         // Here JobConf is first available in spark Executor thread, we initialize PigContext,UDFContext and
         // SchemaTupleBackend by reading properties from JobConf
         initialize(conf);
-        return super.createRecordReader(split, context);
+
+        SparkRecordReaderFactory sparkRecordReaderFactory = new SparkRecordReaderFactory(pigSplit, context);
+        return sparkRecordReaderFactory.createRecordReader();
+    }
+
+    /**
+     * This is where we have to wrap PigSplits into SparkPigSplits
+     * @param jobcontext
+     * @return
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext jobcontext) throws IOException, InterruptedException {
+        List<InputSplit> sparkPigSplits = new ArrayList<>();
+        List<InputSplit> originalSplits = super.getSplits(jobcontext);
+
+        boolean isFileSplits = true;
+        for (InputSplit inputSplit : originalSplits) {
+            PigSplit split = (PigSplit)inputSplit;
+            if (!(split.getWrappedSplit() instanceof FileSplit)) {
+                isFileSplits = false;
+                break;
+            }
+        }
+
+        for (InputSplit inputSplit : originalSplits) {
+            PigSplit split = (PigSplit) inputSplit;
+            if (!isFileSplits) {
+                sparkPigSplits.add(new SparkPigSplit.GenericSparkPigSplit(split));
+            } else {
+                sparkPigSplits.add(new SparkPigSplit.FileSparkPigSplit(split));
+            }
+        }
+
+        return sparkPigSplits;
     }
 
     private void initialize(Configuration jobConf) throws IOException {
@@ -78,4 +118,17 @@ public class PigInputFormatSpark extends
     private void resetUDFContext() {
 		UDFContext.getUDFContext().reset();
 	}
+
+
+    static class SparkRecordReaderFactory extends PigInputFormat.RecordReaderFactory {
+
+        public SparkRecordReaderFactory(InputSplit split, TaskAttemptContext context) throws IOException {
+            super(split, context);
+        }
+
+        @Override
+        public RecordReader<Text, Tuple> createRecordReader() throws IOException, InterruptedException {
+            return new SparkPigRecordReader(inputFormat, pigSplit, loadFunc, context, limit);
+        }
+    }
 }
\ No newline at end of file
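For reference, the new getSplits() above makes an all-or-nothing choice: only when every underlying split is a FileSplit are the splits wrapped as SparkPigSplit.FileSparkPigSplit; a single non-file split pushes all of them to SparkPigSplit.GenericSparkPigSplit. A compact sketch of that decision, using hypothetical stand-in types rather than the real Pig/Hadoop classes:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-ins for the split-wrapping decision in PigInputFormatSpark.getSplits().
    class SplitWrappingSketch {
        interface Split {}
        static class FileBackedSplit implements Split {}   // stands in for a FileSplit-backed PigSplit
        static class OtherSplit implements Split {}        // stands in for any other split type

        interface WrappedSplit {}
        static class FileWrapped implements WrappedSplit {     // analogue of SparkPigSplit.FileSparkPigSplit
            final Split wrapped;
            FileWrapped(Split wrapped) { this.wrapped = wrapped; }
        }
        static class GenericWrapped implements WrappedSplit {  // analogue of SparkPigSplit.GenericSparkPigSplit
            final Split wrapped;
            GenericWrapped(Split wrapped) { this.wrapped = wrapped; }
        }

        static List<WrappedSplit> wrap(List<Split> originals) {
            // The file-specific wrapper is used only if *every* split is file-backed;
            // one non-file split makes all of them fall back to the generic wrapper.
            boolean allFileBacked = true;
            for (Split s : originals) {
                if (!(s instanceof FileBackedSplit)) {
                    allFileBacked = false;
                    break;
                }
            }
            List<WrappedSplit> wrapped = new ArrayList<>();
            for (Split s : originals) {
                if (allFileBacked) {
                    wrapped.add(new FileWrapped(s));
                } else {
                    wrapped.add(new GenericWrapped(s));
                }
            }
            return wrapped;
        }
    }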

Modified: pig/branches/spark/test/excluded-tests-spark
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/excluded-tests-spark?rev=1795466&r1=1795465&r2=1795466&view=diff
==============================================================================
--- pig/branches/spark/test/excluded-tests-spark (original)
+++ pig/branches/spark/test/excluded-tests-spark Wed May 17 21:52:19 2017
@@ -2,6 +2,3 @@
 **/tez/*.java
 **/TestNativeMapReduce.java
 **/TestCounters.java
-
-#TODO: PIG-5135 fix for Spark mode
-**/TestOrcStoragePushdown.java


