pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From roh...@apache.org
Subject svn commit: r1729745 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
Date Wed, 10 Feb 2016 21:37:05 GMT
Author: rohini
Date: Wed Feb 10 21:37:05 2016
New Revision: 1729745

URL: http://svn.apache.org/viewvc?rev=1729745&view=rev
Log:
PIG-4801: Provide backward compatibility with mapreduce mapred.task settings (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1729745&r1=1729744&r2=1729745&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Feb 10 21:37:05 2016
@@ -91,6 +91,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4801: Provide backward compatibility with mapreduce mapred.task settings (rohini)
+
 PIG-4759: Fix Classresolution_1 e2e failure (rohini)
 
 PIG-4800: EvalFunc.getCacheFiles() fails for different namenode (rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1729745&r1=1729744&r2=1729745&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Feb 10 21:37:05 2016
@@ -563,6 +563,8 @@ public class TezDagBuilder extends TezOp
         @SuppressWarnings("deprecation")
         Job job = new Job(payloadConf);
         payloadConf = (JobConf) job.getConfiguration();
+        //TODO: Investigate. Setting as map writes empty output.
+        //payloadConf.setBoolean(MRConfig.IS_MAP_PROCESSOR, tezOp.isUseMRMapSettings());
         payloadConf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
         payloadConf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
         payloadConf.setClass(MRConfiguration.INPUTFORMAT_CLASS,

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1729745&r1=1729744&r2=1729745&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Wed Feb 10 21:37:05 2016
@@ -30,6 +30,8 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.pig.JVMReuseImpl;
 import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
@@ -59,6 +61,7 @@ import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
@@ -131,6 +134,22 @@ public class PigProcessor extends Abstra
 
         // To determine front-end in UDFContext
         conf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, getContext().getUniqueIdentifier());
+
+        // For compatibility with mapreduce. Some users use these configs in their UDF
+        // Copied logic from the tez class - org.apache.tez.mapreduce.output.MROutput
+        // Currently isMapperOutput is always false. Setting it to true produces empty output with MROutput
+        boolean isMapperOutput = conf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false);
+        TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
+                .createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(),
+                    getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(),
+                    getContext().getTaskIndex(), getContext().getTaskAttemptNumber(), isMapperOutput);
+        conf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
+        conf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
+        conf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
+        conf.setInt(JobContext.TASK_PARTITION,
+              taskAttemptId.getTaskID().getId());
+        conf.set(JobContext.ID, taskAttemptId.getJobID().toString());
+
         conf.set(PigConstants.TASK_INDEX, Integer.toString(getContext().getTaskIndex()));
         UDFContext.getUDFContext().addJobConf(conf);
         UDFContext.getUDFContext().deserialize();



Mime
View raw message