pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From roh...@apache.org
Subject svn commit: r1724131 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/util/ test/org/apache/pig/tez/
Date Mon, 11 Jan 2016 21:18:06 GMT
Author: rohini
Date: Mon Jan 11 21:18:06 2016
New Revision: 1724131

URL: http://svn.apache.org/viewvc?rev=1724131&view=rev
Log:
PIG-4411: Support for vertex level configuration like speculative execution (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
    pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java
    pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1724131&r1=1724130&r2=1724131&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jan 11 21:18:06 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4411: Support for vertex level configuration like speculative execution (rohini)
+
 PIG-4775: Better default values for shuffle bytes per reducer (rohini)
 
 PIG-4753: Pigmix should have option to delete outputs after completing the tests (mitdesai via rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1724131&r1=1724130&r2=1724131&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon Jan 11 21:18:06 2016
@@ -172,6 +172,7 @@ public class TezDagBuilder extends TezOp
     private Map<String, LocalResource> localResources;
     private PigContext pc;
     private Configuration globalConf;
+    private Configuration pigContextConf;
     private FileSystem fs;
     private long intermediateTaskInputSize;
     private Set<String> inputSplitInDiskVertices;
@@ -181,21 +182,98 @@ public class TezDagBuilder extends TezOp
     private String serializedPigContext;
     private String serializedUDFImportList;
 
+    // Map corresponds to root vertices, reduce to intermediate and leaf vertices
+    private Resource mapTaskResource;
+    private Resource reduceTaskResource;
+    private Map<String, String> mapTaskEnv = new HashMap<String, String>();
+    private Map<String, String> reduceTaskEnv = new HashMap<String, String>();
+    private String mapTaskLaunchCmdOpts;
+    private String reduceTaskLaunchCmdOpts;
+
     public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
             Map<String, LocalResource> localResources) {
         super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
         this.pc = pc;
-        this.globalConf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
         this.localResources = localResources;
         this.dag = dag;
         this.inputSplitInDiskVertices = new HashSet<String>();
 
         try {
-            // Add credentials from binary token file and get tokens for namenodes
-            // specified in mapreduce.job.hdfs-servers
-            SecurityHelper.populateTokenCache(globalConf, dag.getCredentials());
+            initialize(pc);
+
+            udfContextSeparator = new TezUDFContextSeparator(plan,
+                    new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+            udfContextSeparator.visit();
         } catch (IOException e) {
-            throw new RuntimeException("Error while fetching delegation tokens", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void initialize(PigContext pc) throws IOException {
+
+        this.globalConf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
+
+        this.pigContextConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
+        MRToTezHelper.processMRSettings(pigContextConf, globalConf);
+
+        // Add credentials from binary token file and get tokens for namenodes
+        // specified in mapreduce.job.hdfs-servers
+        SecurityHelper.populateTokenCache(globalConf, dag.getCredentials());
+
+        // All these classes are @InterfaceAudience.Private in Hadoop. Switch to Tez methods in TEZ-1012
+        // set the timestamps, public/private visibility of the archives and files
+        ClientDistributedCacheManager
+                .determineTimestampsAndCacheVisibilities(globalConf);
+        // get DelegationToken for each cached file
+        ClientDistributedCacheManager.getDelegationTokens(globalConf,
+                dag.getCredentials());
+        MRApps.setupDistributedCache(globalConf, this.localResources);
+        dag.addTaskLocalFiles(this.localResources);
+
+        int mapMemoryMB;
+        int reduceMemoryMB;
+        int mapVCores;
+        int reduceVCores;
+        if (globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB) != null) {
+            mapMemoryMB = globalConf.getInt(
+                    TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB,
+                    TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT);
+            reduceMemoryMB = mapMemoryMB;
+        } else {
+            // If tez setting is not defined, try MR setting
+            mapMemoryMB = globalConf.getInt(MRJobConfig.MAP_MEMORY_MB,
+                    MRJobConfig.DEFAULT_MAP_MEMORY_MB);
+            reduceMemoryMB = globalConf.getInt(MRJobConfig.REDUCE_MEMORY_MB,
+                    MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);
+        }
+
+        if (globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES) != null) {
+            mapVCores = globalConf.getInt(
+                    TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES,
+                    TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT);
+            reduceVCores = mapVCores;
+        } else {
+            mapVCores = globalConf.getInt(MRJobConfig.MAP_CPU_VCORES,
+                    MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+            reduceVCores = globalConf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
+                    MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+        }
+        mapTaskResource = Resource.newInstance(mapMemoryMB, mapVCores);
+        reduceTaskResource = Resource.newInstance(reduceMemoryMB, reduceVCores);
+
+        if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) == null) {
+            // If tez setting is not defined
+            MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, mapTaskEnv, true);
+            MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, true);
+        }
+
+        if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) != null) {
+            mapTaskLaunchCmdOpts = globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS);
+            reduceTaskLaunchCmdOpts = mapTaskLaunchCmdOpts;
+        } else {
+            // If tez setting is not defined, try MR setting
+            mapTaskLaunchCmdOpts = MRHelpers.getJavaOptsForMRMapper(globalConf);
+            reduceTaskLaunchCmdOpts = MRHelpers.getJavaOptsForMRReducer(globalConf);
         }
 
         try {
@@ -212,21 +290,13 @@ public class TezDagBuilder extends TezOp
                         InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                         InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
 
-        try {
-            serializedPigContext = ObjectSerializer.serialize(pc);
-            serializedUDFImportList = ObjectSerializer.serialize(PigContext.getPackageImportList());
-
-            udfContextSeparator = new TezUDFContextSeparator(plan,
-                    new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
-            udfContextSeparator.visit();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+        serializedPigContext = ObjectSerializer.serialize(pc);
+        serializedUDFImportList = ObjectSerializer.serialize(PigContext.getPackageImportList());
     }
 
-    public String getSerializedTezPlan() throws IOException {
+    private String getSerializedTezPlan() throws IOException {
         if (serializedTezPlan == null) {
-            // Initialize lazy as auto parallelism might not be in play
+            // Initialize lazy instead of constructor as this might not be needed
             serializedTezPlan = ObjectSerializer.serialize(getPlan());
         }
         return serializedTezPlan;
@@ -368,7 +438,7 @@ public class TezDagBuilder extends TezOp
         InputDescriptor in = InputDescriptor.create(edge.inputClassName);
         OutputDescriptor out = OutputDescriptor.create(edge.outputClassName);
 
-        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
+        Configuration conf = new Configuration(pigContextConf);
 
         if (!combinePlan.isEmpty()) {
             udfContextSeparator.serializeUDFContextForEdge(conf, from, to, UDFType.USERFUNC);
@@ -437,8 +507,6 @@ public class TezDagBuilder extends TezOp
                     edge.partitionerClass.getName());
         }
 
-        MRToTezHelper.processMRSettings(conf, globalConf);
-
         in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
         out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
 
@@ -485,7 +553,7 @@ public class TezDagBuilder extends TezOp
                 tezOp.getProcessorName());
 
         // Pass physical plans to vertex as user payload.
-        JobConf payloadConf = new JobConf(ConfigurationUtil.toConfiguration(pc.getProperties(),
false));
+        JobConf payloadConf = new JobConf(pigContextConf);
 
         // We do this so that dag.getCredentials(), job.getCredentials(),
         // job.getConfiguration().getCredentials() all reference the same Credentials object
@@ -502,7 +570,6 @@ public class TezDagBuilder extends TezOp
         setOutputFormat(job);
         payloadConf.set("udf.import.list", serializedUDFImportList);
         payloadConf.set("exectype", "TEZ");
-        MRToTezHelper.processMRSettings(payloadConf, globalConf);
 
         // Process stores
         LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
@@ -707,7 +774,7 @@ public class TezDagBuilder extends TezOp
                     }
                 }
                 if (containScatterGather && !containCustomPartitioner) {
-                    vmPluginConf = (vmPluginConf == null) ? ConfigurationUtil.toConfiguration(pc.getProperties(),
false) : vmPluginConf;
+                    vmPluginConf = (vmPluginConf == null) ? new Configuration(pigContextConf)
: vmPluginConf;
                     // Use auto-parallelism feature of ShuffleVertexManager to dynamically
                     // reduce the parallelism of the vertex
                     if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM,
true)
@@ -750,7 +817,7 @@ public class TezDagBuilder extends TezOp
                 // limit job starts when 1 source task finishes.
                 // If limit is part of a group by or join because their parallelism is 1,
                 // we should leave the configuration with the defaults.
-                vmPluginConf = (vmPluginConf == null) ? ConfigurationUtil.toConfiguration(pc.getProperties(),
false) : vmPluginConf;
+                vmPluginConf = (vmPluginConf == null) ? new Configuration(pigContextConf)
: vmPluginConf;
                 vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
"0.00001");
                 vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
"0.00001");
                 log.info("Set " + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION
+ " to 0.00001 for limit vertex " + tezOp.getOperatorKey().toString());
@@ -761,41 +828,19 @@ public class TezDagBuilder extends TezOp
         if (tezOp.isUseGraceParallelism()) {
             parallel = -1;
         }
-        Resource resource;
-        if (globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB)!=null &&
-                globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES)!=null) {
-            resource = Resource.newInstance(globalConf.getInt(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB,
-                    TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT),
-                    globalConf.getInt(TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES,
-                    TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT));
-        } else {
-            // If tez setting is not defined, try MR setting
-            resource = tezOp.isUseMRMapSettings() ? MRHelpers.getResourceForMRMapper(globalConf)
: MRHelpers.getResourceForMRReducer(globalConf);
-        }
+        Resource resource = tezOp.isUseMRMapSettings() ? mapTaskResource : reduceTaskResource;
+
         Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(), procDesc, parallel,
resource);
-        Map<String, String> taskEnv = new HashMap<String, String>();
-        MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, taskEnv, tezOp.isUseMRMapSettings());
-        vertex.setTaskEnvironment(taskEnv);
 
-        // All these classes are @InterfaceAudience.Private in Hadoop. Switch to Tez methods in TEZ-1012
-        // set the timestamps, public/private visibility of the archives and files
-        ClientDistributedCacheManager
-                .determineTimestampsAndCacheVisibilities(globalConf);
-        // get DelegationToken for each cached file
-        ClientDistributedCacheManager.getDelegationTokens(globalConf,
-                job.getCredentials());
-        MRApps.setupDistributedCache(globalConf, localResources);
-        vertex.addTaskLocalFiles(localResources);
-
-        String javaOpts;
-        if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS)!=null) {
-            javaOpts = globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS);
+        if (tezOp.isUseMRMapSettings()) {
+            vertex.setTaskLaunchCmdOpts(mapTaskLaunchCmdOpts);
+            vertex.setTaskEnvironment(mapTaskEnv);
         } else {
-            // If tez setting is not defined, try MR setting
-            javaOpts = tezOp.isUseMRMapSettings() ? MRHelpers.getJavaOptsForMRMapper(globalConf)
-                    : MRHelpers.getJavaOptsForMRReducer(globalConf);
+            vertex.setTaskLaunchCmdOpts(reduceTaskLaunchCmdOpts);
+            vertex.setTaskEnvironment(reduceTaskEnv);
         }
-        vertex.setTaskLaunchCmdOpts(javaOpts);
+
+        MRToTezHelper.setVertexConfig(vertex, tezOp.isUseMRMapSettings(), globalConf);
 
         log.info("For vertex - " + tezOp.getOperatorKey().toString()
                 + ": parallelism=" + tezOp.getVertexParallelism()

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1724131&r1=1724130&r2=1724131&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Mon Jan 11 21:18:06 2016
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -48,6 +49,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
@@ -60,6 +62,10 @@ public class MRToTezHelper {
     private static final String JOB_SPLIT_RESOURCE_NAME = MRJobConfig.JOB_SPLIT;
     private static final String JOB_SPLIT_METAINFO_RESOURCE_NAME = MRJobConfig.JOB_SPLIT_METAINFO;
 
+    private static Map<String, String> mrAMParamToTezAMParamMap = new HashMap<String,
String>();
+    private static Map<String, String> mrMapParamToTezVertexParamMap = new HashMap<String,
String>();
+    private static Map<String, String> mrReduceParamToTezVertexParamMap = new HashMap<String,
String>();
+
     private static List<String> mrSettingsToRetain = new ArrayList<String>();
 
     private static List<String> mrSettingsToRemove = new ArrayList<String>();
@@ -68,10 +74,38 @@ public class MRToTezHelper {
     }
 
     static {
+        populateMRToTezParamsMap();
         populateMRSettingsToRetain();
         populateMRSettingsToRemove();
     }
 
+    private static void populateMRToTezParamsMap() {
+
+        //AM settings
+        mrAMParamToTezAMParamMap.put(MRJobConfig.MR_AM_VMEM_MB, TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB);
+        mrAMParamToTezAMParamMap.put(MRJobConfig.MR_AM_CPU_VCORES, TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES);
+        mrAMParamToTezAMParamMap.put(MRJobConfig.MR_AM_MAX_ATTEMPTS, TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS);
+        mrAMParamToTezAMParamMap.put(MRConfiguration.JOB_CREDENTIALS_BINARY, TezConfiguration.TEZ_CREDENTIALS_PATH);
+        mrAMParamToTezAMParamMap.put(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, TezConfiguration.TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION);
+
+        //Map settings
+        mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_MAX_ATTEMPTS, TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS);
+        mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_SPECULATIVE, TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
+        mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL);
+        //TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY TEZ-2914 in Tez 0.8
+        mrMapParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", "tez.am.vertex.max-task-concurrency");
+        //TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS TEZ-808 in Tez 0.8
+        mrMapParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.am.progress.stuck.interval-ms");
+
+        //Reduce settings
+        mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_MAX_ATTEMPTS, TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS);
+        mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE, TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
+        mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL);
+        mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit", "tez.am.vertex.max-task-concurrency");
+        mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", "tez.am.vertex.max-task-concurrency");
+        mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.am.progress.stuck.interval-ms");
+    }
+
     private static void populateMRSettingsToRetain() {
 
         // FileInputFormat
@@ -93,23 +127,57 @@ public class MRToTezHelper {
 
     private static void populateMRSettingsToRemove() {
 
-        // TODO: Add all unwanted MR config once Tez UI starts showing config
-
         // FileInputFormat.listStatus() on a task can cause job failure when run from Oozie
         mrSettingsToRemove.add(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
+
+        mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES_SIZES);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_FILES);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_FILES_SIZES);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_FILE_VISIBILITIES);
+        mrSettingsToRemove.add(MRJobConfig.CLASSPATH_FILES);
     }
 
-    private static void removeUnwantedMRSettings(Configuration tezConf) {
+    private static void removeUnwantedSettings(Configuration tezConf, boolean isAMConf) {
+
+        // It is good to clean up as much of the unapplicable settings as possible.
+        // Tez has configs set on multiple places AM, DAG, Vertex, VertexManager
+        // Plugin, Tasks (Processor, Edge, every input and output, combiner)
+        // If conf size is bigger, it places heavy pressure on AM memory and is
+        // inefficient while sending over RPC to tasks
 
-        Iterator<Entry<String, String>> iter = tezConf.iterator();
+        for (String mrSetting : mrSettingsToRemove) {
+            tezConf.unset(mrSetting);
+        }
+
+        Iterator<Entry<String, String>> iter = new Configuration(tezConf).iterator();
         while (iter.hasNext()) {
-            Entry<String, String> next = iter.next();
-            for (String mrSetting : mrSettingsToRemove) {
-                if (next.getKey().equals(mrSetting)) {
-                    iter.remove();
-                    break;
+            String key = iter.next().getKey();
+            if (!isAMConf) {
+                // Keep the setting in AM conf to be able to connect back to the
+                // Oozie launcher job and look at the parameter values passed,
+                // but get rid of for others
+                if (key.startsWith("oozie.")) {
+                    tezConf.unset(key);
+                    continue;
                 }
             }
+            if (key.startsWith("dfs.datanode")) {
+                tezConf.unset(key);
+            } else if (key.startsWith("dfs.namenode")) {
+                tezConf.unset(key);
+            } else if (key.startsWith("yarn.nodemanager")) {
+                tezConf.unset(key);
+            } else if (key.startsWith("mapreduce.jobhistory")) {
+                tezConf.unset(key);
+            } else if (key.startsWith("mapreduce.jobtracker")) {
+                tezConf.unset(key);
+            } else if (key.startsWith("mapreduce.tasktracker")) {
+                tezConf.unset(key);
+            }
         }
     }
 
@@ -118,20 +186,10 @@ public class MRToTezHelper {
 
         // Set Tez parameters based on MR parameters.
         TezConfiguration dagAMConf = new TezConfiguration(tezConf);
-        Map<String, String> mrParamToDAGParamMap = DeprecatedKeys
-                .getMRToDAGParamMap();
 
-        for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) {
-            if (dagAMConf.get(entry.getKey()) != null) {
-                dagAMConf.setIfUnset(entry.getValue(), dagAMConf.get(entry.getKey()));
-                dagAMConf.unset(entry.getKey());
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("MR->DAG Translating MR key: " + entry.getKey()
-                            + " to Tez key: " + entry.getValue()
-                            + " with value " + dagAMConf.get(entry.getValue()));
-                }
-            }
-        }
+
+        convertMRToTezConf(dagAMConf, dagAMConf, DeprecatedKeys.getMRToDAGParamMap());
+        convertMRToTezConf(dagAMConf, dagAMConf, mrAMParamToTezAMParamMap);
 
         String env = tezConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV);
         if (tezConf.get(MRJobConfig.MR_AM_ENV) != null) {
@@ -151,77 +209,81 @@ public class MRToTezHelper {
                 YarnConfiguration.DEFAULT_QUEUE_NAME);
         dagAMConf.setIfUnset(TezConfiguration.TEZ_QUEUE_NAME, queueName);
 
-        int amMemMB = tezConf.getInt(MRJobConfig.MR_AM_VMEM_MB,
-                MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
-        int amCores = tezConf.getInt(MRJobConfig.MR_AM_CPU_VCORES,
-                MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);
-        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, ""
-                + amMemMB);
-        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES, ""
-                + amCores);
-
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_VIEW_ACLS,
                 tezConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
 
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MODIFY_ACLS,
                 tezConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
 
-        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, ""
-                + dagAMConf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
-                        MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
-
-        if (tezConf.get(MRConfiguration.JOB_CREDENTIALS_BINARY) != null) {
-            dagAMConf.setIfUnset(TezConfiguration.TEZ_CREDENTIALS_PATH,
-                    tezConf.get(MRConfiguration.JOB_CREDENTIALS_BINARY));
-        }
-
-        if (tezConf.get(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN) != null) {
-            dagAMConf.setIfUnset(TezConfiguration.TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION,
-                    tezConf.get(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN));
-        }
-
         // Hardcoding at AM level instead of setting per vertex till TEZ-2710 is available
         dagAMConf.setIfUnset(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, "0.5");
 
-        removeUnwantedMRSettings(dagAMConf);
+        removeUnwantedSettings(dagAMConf, true);
 
         return dagAMConf;
     }
 
     /**
+     * Set config with Scope.Vertex in TezConfiguration on the vertex
+     *
+     * @param vertex Vertex on which config is to be set
+     * @param isMapVertex Whether map or reduce vertex. i.e root or intermediate/leaf vertex
+     * @param conf Config that contains the tez or equivalent mapreduce settings.
+     */
+    public static void setVertexConfig(Vertex vertex, boolean isMapVertex,
+            Configuration conf) {
+        Map<String, String> configMapping = isMapVertex ? mrMapParamToTezVertexParamMap
+                : mrReduceParamToTezVertexParamMap;
+        for (Entry<String, String> dep : configMapping.entrySet()) {
+
+            String value = conf.get(dep.getValue(), conf.get(dep.getKey()));
+            if (value != null) {
+                vertex.setConf(dep.getValue(), value);
+                LOG.debug("Setting " + dep.getValue() + " to " + value
+                        + " for the vertex " + vertex.getName());
+            }
+        }
+    }
+
+    /**
      * Process the mapreduce configuration settings and
      *    - copy as is the still required ones (like those used by FileInputFormat/FileOutputFormat)
      *    - convert and set equivalent tez runtime settings
      *    - handle compression related settings
      *
-     * @param conf Configuration on which the mapreduce settings will have to be transferred
+     * @param tezConf Configuration on which the mapreduce settings will have to be transferred
      * @param mrConf Configuration that contains mapreduce settings
      */
-    public static void processMRSettings(Configuration conf, Configuration mrConf) {
+    public static void processMRSettings(Configuration tezConf, Configuration mrConf) {
         for (String mrSetting : mrSettingsToRetain) {
             if (mrConf.get(mrSetting) != null) {
-                conf.set(mrSetting, mrConf.get(mrSetting));
+                tezConf.set(mrSetting, mrConf.get(mrSetting));
             }
         }
-        JobControlCompiler.configureCompression(conf);
-        convertMRToTezRuntimeConf(conf, mrConf);
-        removeUnwantedMRSettings(conf);
+        JobControlCompiler.configureCompression(tezConf);
+        convertMRToTezConf(tezConf, mrConf, DeprecatedKeys.getMRToTezRuntimeParamMap());
+        removeUnwantedSettings(tezConf, false);
     }
 
     /**
      * Convert MR settings to Tez settings and set on conf.
      *
-     * @param conf  Configuration on which MR equivalent Tez settings should be set
+     * @param tezConf  Configuration on which MR equivalent Tez settings should be set
      * @param mrConf Configuration that contains MR settings
+     * @param mrToTezConfigMapping  Mapping of MR config to equivalent Tez config
      */
-    private static void convertMRToTezRuntimeConf(Configuration conf, Configuration mrConf)
{
-        for (Entry<String, String> dep : DeprecatedKeys.getMRToTezRuntimeParamMap().entrySet())
{
+    private static void convertMRToTezConf(Configuration tezConf, Configuration mrConf, Map<String,
String> mrToTezConfigMapping) {
+        for (Entry<String, String> dep : mrToTezConfigMapping.entrySet()) {
             if (mrConf.get(dep.getKey()) != null) {
-                conf.unset(dep.getKey());
-                LOG.info("Setting " + dep.getValue() + " to "
-                        + mrConf.get(dep.getKey()) + " from MR setting "
-                        + dep.getKey());
-                conf.setIfUnset(dep.getValue(), mrConf.get(dep.getKey()));
+                if (tezConf.get(dep.getValue()) == null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Setting " + dep.getValue() + " to "
+                                + mrConf.get(dep.getKey()) + " from MR setting "
+                                + dep.getKey());
+                    }
+                    tezConf.set(dep.getValue(), mrConf.get(dep.getKey()));
+                }
+                tezConf.unset(dep.getKey());
             }
         }
     }

Modified: pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java?rev=1724131&r1=1724130&r2=1724131&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java Mon Jan 11 21:18:06 2016
@@ -20,6 +20,9 @@ package org.apache.pig.tez;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.HashMap;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezCompiler;
@@ -68,7 +71,7 @@ public class TestGroupConstParallelTez e
         parallelismSetter.visit();
 
         DAG tezDag = getTezDAG(tezPlan, pc);
-        TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, null);
+        TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, new HashMap<String,
LocalResource>());
         dagBuilder.visit();
         for (Vertex v : tezDag.getVertices()) {
             if (!v.getInputVertices().isEmpty()) {
@@ -88,7 +91,7 @@ public class TestGroupConstParallelTez e
         parallelismSetter.visit();
 
         DAG tezDag = getTezDAG(tezPlan, pc);
-        TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, null);
+        TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, new HashMap<String,
LocalResource>());
         dagBuilder.visit();
         for (Vertex v : tezDag.getVertices()) {
             if (!v.getInputVertices().isEmpty()) {

Modified: pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java?rev=1724131&r1=1724130&r2=1724131&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java Mon Jan 11 21:18:06 2016
@@ -20,7 +20,10 @@ package org.apache.pig.tez;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.HashMap;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder;
@@ -62,7 +65,7 @@ public class TestJobSubmissionTez extend
         parallelismSetter.visit();
 
         DAG tezDag = getTezDAG(tezPlan, pc);
-        TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, null);
+        TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, new HashMap<String,
LocalResource>());
         try {
             dagBuilder.visit();
         } catch (VisitorException jce) {
@@ -81,7 +84,7 @@ public class TestJobSubmissionTez extend
         parallelismSetter.visit();
 
         DAG tezDag = getTezDAG(tezPlan, pc);
-        TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, null);
+        TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, new HashMap<String,
LocalResource>());
         dagBuilder.visit();
         for (Vertex v : tezDag.getVertices()) {
             if (!v.getInputVertices().isEmpty()) {



Mime
View raw message