pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xu...@apache.org
Subject svn commit: r1734188 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark: SparkLauncher.java converter/FilterConverter.java
Date Wed, 09 Mar 2016 04:39:25 GMT
Author: xuefu
Date: Wed Mar  9 04:39:24 2016
New Revision: 1734188

URL: http://svn.apache.org/viewvc?rev=1734188&view=rev
Log:
PIG-4827: Fix TestSample UT failure (Pallavi via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1734188&r1=1734187&r2=1734188&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Mar  9 04:39:24 2016
@@ -46,6 +46,7 @@ import org.apache.pig.backend.BackendExc
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
@@ -192,7 +193,7 @@ public class SparkLauncher extends Launc
                 physicalPlan, sparkContext.sc()));
         convertMap.put(POStore.class, new StoreConverter(pigContext));
         convertMap.put(POForEach.class, new ForEachConverter(confBytes));
-        convertMap.put(POFilter.class, new FilterConverter());
+        convertMap.put(POFilter.class, new FilterConverter(confBytes));
         convertMap.put(POPackage.class, new PackageConverter(confBytes));
         convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
         convertMap.put(POGlobalRearrangeSpark.class, new GlobalRearrangeConverter());

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java?rev=1734188&r1=1734187&r2=1734188&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java Wed Mar  9 04:39:24 2016
@@ -17,17 +17,28 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark.converter;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+import java.util.UUID;
 
 import scala.runtime.AbstractFunction1;
 
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.PigConstants;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.spark.rdd.RDD;
 
 /**
@@ -36,12 +47,18 @@ import org.apache.spark.rdd.RDD;
 @SuppressWarnings({ "serial" })
 public class FilterConverter implements RDDConverter<Tuple, Tuple, POFilter> {
 
+    private byte[] confBytes;
+
+    public FilterConverter(byte[] confBytes) {
+        this.confBytes = confBytes;
+    }
+
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
             POFilter physicalOperator) {
         SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
         RDD<Tuple> rdd = predecessors.get(0);
-        FilterFunction filterFunction = new FilterFunction(physicalOperator);
+        FilterFunction filterFunction = new FilterFunction(physicalOperator, confBytes);
         return rdd.filter(filterFunction);
     }
 
@@ -49,13 +66,17 @@ public class FilterConverter implements
             AbstractFunction1<Tuple, Object> implements Serializable {
 
         private POFilter poFilter;
+        private byte[] confBytes;
+        private transient JobConf jobConf;
 
-        private FilterFunction(POFilter poFilter) {
+        private FilterFunction(POFilter poFilter, byte[] confBytes) {
             this.poFilter = poFilter;
+            this.confBytes = confBytes;
         }
 
         @Override
         public Boolean apply(Tuple v1) {
+            initializeJobConf();
             Result result;
             try {
                 poFilter.setInputs(null);
@@ -80,5 +101,24 @@ public class FilterConverter implements
                         "Unexpected response code from filter: " + result);
             }
         }
+
+        void initializeJobConf() {
+            if (this.jobConf != null) {
+                return;
+            }
+            this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
+            PigMapReduce.sJobConfInternal.set(jobConf);
+            try {
+                MapRedUtil.setupUDFContext(jobConf);
+                PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
+                SchemaTupleBackend.initialize(jobConf, pc);
+                // Although Job ID and task index are not really applicable for spark,
+                // set them here to overcome PIG-4827
+                jobConf.set(MRConfiguration.JOB_ID, UUID.randomUUID().toString());
+                jobConf.set(PigConstants.TASK_INDEX, "0");
+            } catch (IOException ioe) {
+                throw new RuntimeException("Problem while configuring UDFContext from FilterConverter.", ioe);
+            }
+        }
     }
 }



Mime
View raw message