tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1477916 - in /incubator/tez/branches/TEZ-1: tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/ tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/ tez-ma...
Date Wed, 01 May 2013 07:02:33 GMT
Author: sseth
Date: Wed May  1 07:02:32 2013
New Revision: 1477916

URL: http://svn.apache.org/r1477916
Log:
TEZ-95. MRPartitioner should get num partitions from TezTaskContext (sseth)

Added:
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
Modified:
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
    incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java?rev=1477916&r1=1477915&r2=1477916&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
(original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
Wed May  1 07:02:32 2013
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.dag.api.TezConfiguration;
 import com.google.common.collect.Maps;
@@ -59,6 +60,7 @@ public class DeprecatedKeys {
     populateMRToEngineParamMap();
     populateMRToDagParamMap();
     populateMultiStageParamMap();
+    addDeprecatedKeys();
   }
   
   
@@ -143,11 +145,7 @@ public class DeprecatedKeys {
     registerMRToEngineKeyTranslation(MRJobConfig.RECORDS_BEFORE_PROGRESS, TezJobConfig.RECORDS_BEFORE_PROGRESS);
     
     registerMRToEngineKeyTranslation(MRJobConfig.JOB_LOCAL_DIR, MRConfig.LOCAL_DIR);
-        
-    registerMRToEngineKeyTranslation(MRJobConfig.NUM_REDUCES, TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE);
 
-    registerMRToEngineKeyTranslation(MRJobConfig.NUM_MAPS, TezJobConfig.TEZ_ENGINE_TASK_INDEGREE);
-    
     registerMRToEngineKeyTranslation(MRJobConfig.IO_SORT_FACTOR, TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR);
     
     registerMRToEngineKeyTranslation(MRJobConfig.MAP_SORT_SPILL_PERCENT, TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT);
@@ -195,12 +193,27 @@ public class DeprecatedKeys {
     registerMRToEngineKeyTranslation("map.sort.class", TezJobConfig.TEZ_ENGINE_INTERNAL_SORTER_CLASS);
     
     registerMRToEngineKeyTranslation(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_ENGINE_GROUP_COMPARATOR_CLASS);
+    
+    registerMRToEngineKeyTranslation(MRJobConfig.NUM_REDUCES, TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE);
+
+    registerMRToEngineKeyTranslation(MRJobConfig.NUM_MAPS, TezJobConfig.TEZ_ENGINE_TASK_INDEGREE);
+  }
+  
+  private static void addDeprecatedKeys() {
+    // Adding deprecation for num_maps, num_reduces - in case some component in
+    // Hadoop MR land uses these. Tez components already use tez-in/out-degree.
+    _(MRJobConfig.NUM_REDUCES, TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE);
+    _(MRJobConfig.NUM_MAPS, TezJobConfig.TEZ_ENGINE_TASK_INDEGREE);
   }
 
   private static void registerMRToEngineKeyTranslation(String mrKey,
       String tezKey) {
     mrParamToEngineParamMap.put(mrKey, tezKey);
   }
+  
+  private static void _(String mrKey, String tezKey) {
+    Configuration.addDeprecation(mrKey, tezKey);
+  }
 
   public static Map<String, String> getMRToDAGParamMap() {
     return Collections.unmodifiableMap(mrParamToDAGParamMap);

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java?rev=1477916&r1=1477915&r2=1477916&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
(original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
Wed May  1 07:02:32 2013
@@ -52,9 +52,11 @@ public class MultiStageMRConfToTezTransl
 
     for (int i = 0; i < allConfs.length; i++) {
       processDirectConversion(allConfs[i]);
+      // XXX How is the number of reducers being set correctly in YARNRunner?
     }
     for (int i = 0; i < allConfs.length - 1; i++) {
       processMultiStageDepreaction(allConfs[i], allConfs[i + 1]);
+      
     }
     // Unset unnecessary keys in the last stage. Will end up being called for
     // single stage as well which should be harmless.
@@ -111,9 +113,12 @@ public class MultiStageMRConfToTezTransl
         .entrySet()) {
       if (conf.get(dep.getKey()) != null) {
         // TODO Deprecation reason does not seem to reflect in the config ?
-        conf.set(dep.getValue(), conf.get(dep.getKey()),
-            DeprecationReason.DEPRECATED_DIRECT_TRANSLATION.name());
+        // The ordering is important in case of keys which are also deprecated.
+        // Unset will unset the deprecated keys and all its variants.
+        String value = conf.get(dep.getKey());
         conf.unset(dep.getKey());
+        conf.set(dep.getValue(), value,
+            DeprecationReason.DEPRECATED_DIRECT_TRANSLATION.name());
       }
     }
   }
@@ -129,11 +134,12 @@ public class MultiStageMRConfToTezTransl
         .getMultiStageParamMap().entrySet()) {
       if (srcConf.get(dep.getKey()) != null) {
         if (destConf != null) {
-          srcConf.set(dep.getValue().get(MultiStageKeys.OUTPUT),
-              srcConf.get(dep.getKey()));
-          destConf.set(dep.getValue().get(MultiStageKeys.INPUT),
-              srcConf.get(dep.getKey()));
+          String value = srcConf.get(dep.getKey());
           srcConf.unset(dep.getKey());
+          srcConf.set(dep.getValue().get(MultiStageKeys.OUTPUT), value,
+              DeprecationReason.DEPRECATED_MULTI_STAGE.name());
+          destConf.set(dep.getValue().get(MultiStageKeys.INPUT), value,
+              DeprecationReason.DEPRECATED_MULTI_STAGE.name());
         } else { // Last stage. Just remove the key reference.
           srcConf.unset(dep.getKey());
         }

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java?rev=1477916&r1=1477915&r2=1477916&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
(original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
Wed May  1 07:02:32 2013
@@ -54,7 +54,8 @@ public class MRPartitioner implements or
     }
     
     useNewApi = jobConf.getUseNewMapper();
-    final int partitions = jobConf.getNumReduceTasks();
+    final int partitions = this.task.getTezEngineTaskContext()
+        .getOutputSpecList().get(0).getNumOutputs();
     if (useNewApi) {
       if (partitions > 1) {
         try {

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java?rev=1477916&r1=1477915&r2=1477916&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
(original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
Wed May  1 07:02:32 2013
@@ -90,7 +90,7 @@ extends RunningTaskContext {
   protected GcTimeUpdater gcUpdater;
   private ResourceCalculatorProcessTree pTree;
   private long initCpuCumulativeTime = 0;
-  protected TezEngineTaskContext tezTaskContext;
+  protected TezEngineTaskContext tezEngineTaskContext;
   protected TezTaskAttemptID taskAttemptId;
   
   /* flag to track whether task is done */
@@ -120,7 +120,7 @@ extends RunningTaskContext {
      new HashMap<String, FileSystemStatisticUpdater>();
 
   public MRTask(TezEngineTaskContext context) {
-    tezTaskContext = context;
+    tezEngineTaskContext = context;
     this.taskAttemptId = context.getTaskAttemptId();
     // TODO TEZAM4 Figure out initialization / run sequence of Input, Process,
     // Output. Phase is MR specific.
@@ -711,4 +711,8 @@ extends RunningTaskContext {
   public TezTaskAttemptID getTaskAttemptId() {
     return taskAttemptId;
   }
+  
+  public TezEngineTaskContext getTezEngineTaskContext() {
+    return tezEngineTaskContext;
+  }
 }

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java?rev=1477916&r1=1477915&r2=1477916&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
(original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
Wed May  1 07:02:32 2013
@@ -76,7 +76,7 @@ public class MapProcessor extends MRTask
   InterruptedException {
     super.initialize(conf, master);
     TaskSplitMetaInfo[] allMetaInfo = readSplits();
-    TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[tezTaskContext
+    TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[tezEngineTaskContext
         .getTaskAttemptId().getTaskID().getId()];
     splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
         thisTaskMetaInfo.getStartOffset());
@@ -117,7 +117,7 @@ public class MapProcessor extends MRTask
 
     // If there are no reducers then there won't be any sort. Hence the map 
     // phase will govern the entire attempt's progress.
-    if (jobConf.getNumReduceTasks() == 0) {
+    if (tezEngineTaskContext.getOutputSpecList().get(0).getNumOutputs() == 0) {
       mapPhase = getProgress().addPhase("map", 1.0f);
     } else {
       // If there are reducers then the entire attempt's progress will be 
@@ -155,7 +155,8 @@ public class MapProcessor extends MRTask
     
     RecordReader in = new OldRecordReader(input);
         
-    int numReduceTasks = job.getNumReduceTasks();
+    int numReduceTasks = tezEngineTaskContext.getOutputSpecList().get(0)
+        .getNumOutputs();
     LOG.info("numReduceTasks: " + numReduceTasks);
 
     OutputCollector collector = new OldOutputCollector(output);
@@ -383,5 +384,4 @@ public class MapProcessor extends MRTask
         FileSystem.getLocal(getConf()));
     return allTaskSplitMetaInfo;
   }
-  
 }

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java?rev=1477916&r1=1477915&r2=1477916&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
(original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
Wed May  1 07:02:32 2013
@@ -91,7 +91,6 @@ implements Processor {
   public void initialize(Configuration conf, Master master) throws IOException,
       InterruptedException {
     super.initialize(conf, master);
-    
   }
 
   @Override

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java?rev=1477916&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
(added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
Wed May  1 07:02:32 2013
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.hadoop;
+
+import org.junit.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.common.TezJobConfig;
+import org.junit.Test;
+
+public class TestDeprecatedKeys {
+
+  @Test
+  public void testNumTasksDeprecation() {
+    
+    Configuration conf = new Configuration(false);
+    conf.setInt(TezJobConfig.TEZ_ENGINE_TASK_INDEGREE, 6);
+    conf.setInt(TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE, 3);
+    
+    
+    DeprecatedKeys.init();
+    JobConf jobConf = new JobConf(conf);
+    
+    Assert.assertEquals(6, jobConf.getNumMapTasks());
+    Assert.assertEquals(3, jobConf.getNumReduceTasks());
+  }
+  
+}

Modified: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java?rev=1477916&r1=1477915&r2=1477916&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
(original)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
Wed May  1 07:02:32 2013
@@ -573,7 +573,7 @@ public class YARNRunner implements Clien
     LOG.info("XXXX Parsing job config"
         + ", numMaps=" + numMaps
         + ", numReduces=" + numReduces
-        + ", intermediateReduces=" + intermediateReduces);
+        + ", intermediateReduceStages=" + intermediateReduces);
 
     // configure map vertex
     String mapProcessor = "org.apache.tez.mapreduce.processor.map.MapProcessor";
@@ -865,13 +865,16 @@ public class YARNRunner implements Clien
     FileSystem fs = FileSystem.get(conf);
     JobConf jobConf = new JobConf(new TezConfiguration(conf));
     Configuration tezJobConf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
-    
+
     // This will replace job.xml in the staging dir.
     writeTezConf(jobSubmitDir, fs, tezJobConf);    
 
     // FIXME set up job resources
     Map<String, LocalResource> jobLocalResources =
         createJobLocalResources(tezJobConf, jobSubmitDir);
+    
+    // FIXME createDAG should take the tezConf as a parameter, instead of using
+    // MR keys.
     DAG dag = createDAG(fs, jobId, jobConf, jobSubmitDir, ts,
         jobLocalResources);
     ApplicationSubmissionContext appContext =



Mime
View raw message