hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r781683 - in /hadoop/core/trunk: ./ src/mapred/ src/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/
Date Thu, 04 Jun 2009 08:34:00 GMT
Author: ddas
Date: Thu Jun  4 08:34:00 2009
New Revision: 781683

URL: http://svn.apache.org/viewvc?rev=781683&view=rev
Log:
HADOOP-5170. Allows jobs to set max maps/reduces per-node and per-cluster. Contributed by
Matei Zaharia.

Added:
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRunningTaskLimits.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/mapred-default.xml
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=781683&r1=781682&r2=781683&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Jun  4 08:34:00 2009
@@ -134,6 +134,9 @@
     HADOOP-5844. Use mysqldump when connecting to local mysql instance in Sqoop.
     (Aaron Kimball via tomwhite)
 
+    HADOOP-5170. Allows jobs to set max maps/reduces per-node and per-cluster.
+    (Matei Zaharia via ddas)
+
   IMPROVEMENTS
 
     HADOOP-4565. Added CombineFileInputFormat to use data locality information

Modified: hadoop/core/trunk/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/mapred-default.xml?rev=781683&r1=781682&r2=781683&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/mapred-default.xml (original)
+++ hadoop/core/trunk/src/mapred/mapred-default.xml Thu Jun  4 08:34:00 2009
@@ -941,4 +941,32 @@
   </description>
 </property>
 
+<property>
+  <name>mapred.max.maps.per.node</name>
+  <value>-1</value>
+  <description>Per-node limit on running map tasks for the job. A value
+    of -1 signifies no limit.</description>
+</property>
+
+<property>
+  <name>mapred.max.reduces.per.node</name>
+  <value>-1</value>
+  <description>Per-node limit on running reduce tasks for the job. A value
+    of -1 signifies no limit.</description>
+</property>
+
+<property>
+  <name>mapred.running.map.limit</name>
+  <value>-1</value>
+  <description>Cluster-wide limit on running map tasks for the job. A value
+    of -1 signifies no limit.</description>
+</property>
+
+<property>
+  <name>mapred.running.reduce.limit</name>
+  <value>-1</value>
+  <description>Cluster-wide limit on running reduce tasks for the job. A value
+    of -1 signifies no limit.</description>
+</property>
+
 </configuration>

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=781683&r1=781682&r2=781683&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java Thu Jun  4 08:34:00
2009
@@ -1430,6 +1430,78 @@
   }
   
   /**
+   * Get the per-node limit on running maps for the job
+   * 
+   * @return per-node running map limit
+   */
+  public int getMaxMapsPerNode() {
+    return getInt("mapred.max.maps.per.node", -1);
+  }
+  
+  /**
+   * Set the per-node limit on running maps for the job
+   * 
+   * @param limit per-node running map limit
+   */
+  public void setMaxMapsPerNode(int limit) {
+    setInt("mapred.max.maps.per.node", limit);
+  }
+  
+  /**
+   * Get the per-node limit on running reduces for the job
+   * 
+   * @return per-node running reduce limit
+   */
+  public int getMaxReducesPerNode() {
+    return getInt("mapred.max.reduces.per.node", -1);
+  }
+  
+  /**
+   * Set the per-node limit on running reduces for the job
+   * 
+   * @param limit per-node running reduce limit
+   */
+  public void setMaxReducesPerNode(int limit) {
+    setInt("mapred.max.reduces.per.node", limit);
+  }
+  
+  /**
+   * Get the cluster-wide limit on running maps for the job
+   * 
+   * @return cluster-wide running map limit
+   */
+  public int getRunningMapLimit() {
+    return getInt("mapred.running.map.limit", -1);
+  }
+  
+  /**
+   * Set the cluster-wide limit on running maps for the job
+   * 
+   * @param limit cluster-wide running map limit
+   */
+  public void setRunningMapLimit(int limit) {
+    setInt("mapred.running.map.limit", limit);
+  }
+  
+  /**
+   * Get the cluster-wide limit on running reduces for the job
+   * 
+   * @return cluster-wide running reduce limit
+   */
+  public int getRunningReduceLimit() {
+    return getInt("mapred.running.reduce.limit", -1);
+  }
+  
+  /**
+   * Set the cluster-wide limit on running reduces for the job
+   * 
+   * @param limit cluster-wide running reduce limit
+   */
+  public void setRunningReduceLimit(int limit) {
+    setInt("mapred.running.reduce.limit", limit);
+  }
+  
+  /**
    * Normalize the negative values in configuration
    * 
    * @param val

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=781683&r1=781682&r2=781683&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Thu Jun  4 08:34:00
2009
@@ -87,6 +87,12 @@
   int speculativeMapTasks = 0;
   int speculativeReduceTasks = 0;
   
+  // Limits on concurrent running tasks per-node and cluster-wide
+  private int maxMapsPerNode;
+  private int maxReducesPerNode;
+  private int runningMapLimit;
+  private int runningReduceLimit;
+  
   int mapFailuresPercent = 0;
   int reduceFailuresPercent = 0;
   int failedMapTIPs = 0;
@@ -257,6 +263,11 @@
 
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
+
+    this.maxMapsPerNode = conf.getMaxMapsPerNode();
+    this.maxReducesPerNode = conf.getMaxReducesPerNode();
+    this.runningMapLimit = conf.getRunningMapLimit();
+    this.runningReduceLimit = conf.getRunningReduceLimit();
         
     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
     this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
@@ -1679,6 +1690,10 @@
     //
     this.clusterSize = clusterSize;
 
+    if (!belowRunningTaskLimit(tts, true)) {
+      return -1;
+    }
+    
     if (!shouldRunOnTaskTracker(taskTracker)) {
       return -1;
     }
@@ -1883,9 +1898,13 @@
 
     String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
-    
+
     // Update the last-known clusterSize
     this.clusterSize = clusterSize;
+    
+    if (!belowRunningTaskLimit(tts, false)) {
+      return -1;
+    }
 
     if (!shouldRunOnTaskTracker(taskTracker)) {
       return -1;
@@ -1939,6 +1958,42 @@
     }
     return true;
   }
+  
+  /**
+   * Check whether we are below the running task limits (per node and cluster
+   * wide) for a given type of task on a given task tracker.
+   * 
+   * @param tts task tracker to check on
+   * @param map true if looking at map tasks, false for reduce tasks
+   * @return true if we are below both the cluster-wide and the per-node 
+   *         running task limit for the given type of task
+   */
+  private boolean belowRunningTaskLimit(TaskTrackerStatus tts, boolean map) {
+    int runningTasks = map ? runningMapTasks : runningReduceTasks;
+    int clusterLimit = map ? runningMapLimit : runningReduceLimit;
+    int perNodeLimit = map ? maxMapsPerNode  : maxReducesPerNode;
+    
+    // Check cluster-wide limit
+    if (clusterLimit != -1 && runningTasks >= clusterLimit) {
+      return false;
+    }
+    
+    // Check per-node limit
+    if (perNodeLimit != -1) {
+      int runningTasksOnNode = 0;
+      for (TaskStatus ts: tts.getTaskReports()) {
+        if (ts.getTaskID().getJobID().equals(jobId) && ts.getIsMap() == map &&
+            ts.getRunState().equals(TaskStatus.State.RUNNING)) {
+          runningTasksOnNode++;
+        }
+      }
+      if (runningTasksOnNode >= perNodeLimit) {
+        return false;
+      }
+    }
+    
+    return true;
+  }
 
   /**
    * A taskid assigned to this JobInProgress has reported in successfully.

Added: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRunningTaskLimits.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRunningTaskLimits.java?rev=781683&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRunningTaskLimits.java
(added)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRunningTaskLimits.java
Thu Jun  4 08:34:00 2009
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
+import org.apache.hadoop.mapred.UtilsForTests.RandomInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+/**
+ * Test the running task limits - mapred.max.maps.per.node,
+ * mapred.max.reduces.per.node, mapred.max.running.maps and
+ * mapred.max.running.reduces.
+ */
+public class TestRunningTaskLimits extends TestCase {
+  private static final Log LOG = 
+    LogFactory.getLog(TestRunningTaskLimits.class);
+  private static final Path TEST_DIR = 
+    new Path(System.getProperty("test.build.data", "/tmp"), 
+             "test-running-task-limits");
+  
+  /**
+   * This test creates a cluster with 1 tasktracker with 3 map and 3 reduce 
+   * slots. We then submit a job with a limit of 2 maps and 1 reduce per
+   * node, and check that these limits are obeyed in launching tasks.
+   */
+  public void testPerNodeLimits() throws Exception {
+    LOG.info("Running testPerNodeLimits");
+    FileSystem fs = FileSystem.get(new Configuration());
+    fs.delete(TEST_DIR, true); // cleanup test dir
+    
+    // Create a cluster with 1 tasktracker with 3 map slots and 3 reduce slots
+    JobConf conf = new JobConf();
+    conf.setInt("mapred.tasktracker.map.tasks.maximum", 3);
+    conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 3);
+    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
+    
+    // Create a job with limits of 3 maps/node and 2 reduces/node 
+    JobConf jobConf = createWaitJobConf(mr, "job1", 20, 20);
+    jobConf.setMaxMapsPerNode(2);
+    jobConf.setMaxReducesPerNode(1);
+    
+    // Submit the job
+    RunningJob rJob = (new JobClient(jobConf)).submitJob(jobConf);
+    
+    // Wait 20 seconds for it to start up
+    UtilsForTests.waitFor(20000);
+    
+    // Check the number of running tasks
+    JobTracker jobTracker = mr.getJobTrackerRunner().getJobTracker();
+    JobInProgress jip = jobTracker.getJob(rJob.getID());
+    assertEquals(2, jip.runningMaps());
+    assertEquals(1, jip.runningReduces());
+    
+    rJob.killJob();
+    mr.shutdown();
+  }
+  
+  /**
+   * This test creates a cluster with 2 tasktrackers with 3 map and 3 reduce 
+   * slots each. We then submit a job with a limit of 5 maps and 3 reduces
+   * cluster-wide, and check that these limits are obeyed in launching tasks.
+   */
+  public void testClusterWideLimits() throws Exception {
+    LOG.info("Running testClusterWideLimits");
+    FileSystem fs = FileSystem.get(new Configuration());
+    fs.delete(TEST_DIR, true); // cleanup test dir
+    
+    // Create a cluster with 2 tasktrackers with 3 map and reduce slots each
+    JobConf conf = new JobConf();
+    conf.setInt("mapred.tasktracker.map.tasks.maximum", 3);
+    conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 3);
+    MiniMRCluster mr = new MiniMRCluster(2, "file:///", 1, null, null, conf);
+    
+    // Create a job with limits of 10 maps and 5 reduces on the entire cluster 
+    JobConf jobConf = createWaitJobConf(mr, "job1", 20, 20);
+    jobConf.setRunningMapLimit(5);
+    jobConf.setRunningReduceLimit(3);
+    
+    // Submit the job
+    RunningJob rJob = (new JobClient(jobConf)).submitJob(jobConf);
+    
+    // Wait 20 seconds for it to start up
+    UtilsForTests.waitFor(20000);
+    
+    // Check the number of running tasks
+    JobTracker jobTracker = mr.getJobTrackerRunner().getJobTracker();
+    JobInProgress jip = jobTracker.getJob(rJob.getID());
+    assertEquals(5, jip.runningMaps());
+    assertEquals(3, jip.runningReduces());
+    
+    rJob.killJob();
+    mr.shutdown();
+  }
+  
+  /**
+   * This test creates a cluster with 2 tasktrackers with 3 map and 3 reduce 
+   * slots each. We then submit a job with a limit of 5 maps and 3 reduces
+   * cluster-wide, and 2 maps and 2 reduces per node. We should end up with
+   * 4 maps and 3 reduces running: the maps hit the per-node limit first,
+   * while the reduces hit the cluster-wide limit.
+   */
+  public void testClusterWideAndPerNodeLimits() throws Exception {
+    LOG.info("Running testClusterWideAndPerNodeLimits");
+    FileSystem fs = FileSystem.get(new Configuration());
+    fs.delete(TEST_DIR, true); // cleanup test dir
+    
+    // Create a cluster with 2 tasktrackers with 3 map and reduce slots each
+    JobConf conf = new JobConf();
+    conf.setInt("mapred.tasktracker.map.tasks.maximum", 3);
+    conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 3);
+    MiniMRCluster mr = new MiniMRCluster(2, "file:///", 1, null, null, conf);
+    
+    // Create a job with limits of 10 maps and 5 reduces on the entire cluster 
+    JobConf jobConf = createWaitJobConf(mr, "job1", 20, 20);
+    jobConf.setRunningMapLimit(5);
+    jobConf.setRunningReduceLimit(3);
+    jobConf.setMaxMapsPerNode(2);
+    jobConf.setMaxReducesPerNode(2);
+    
+    // Submit the job
+    RunningJob rJob = (new JobClient(jobConf)).submitJob(jobConf);
+    
+    // Wait 20 seconds for it to start up
+    UtilsForTests.waitFor(20000);
+    
+    // Check the number of running tasks
+    JobTracker jobTracker = mr.getJobTrackerRunner().getJobTracker();
+    JobInProgress jip = jobTracker.getJob(rJob.getID());
+    assertEquals(4, jip.runningMaps());
+    assertEquals(3, jip.runningReduces());
+    
+    rJob.killJob();
+    mr.shutdown();
+  }
+  
+  /**
+   * Create a JobConf for a job using the WaitingMapper and IdentityReducer,
+   * which will sleep until a signal file is created. In this test we never
+   * create the signal file so the job just occupies slots for the duration
+   * of the test as they are assigned to it. 
+   */
+  JobConf createWaitJobConf(MiniMRCluster mr, String jobName,
+      int numMaps, int numRed)
+  throws IOException {
+    JobConf jobConf = mr.createJobConf();
+    Path inDir = new Path(TEST_DIR, "input");
+    Path outDir = new Path(TEST_DIR, "output-" + jobName);
+    String signalFile = new Path(TEST_DIR, "signal").toString();
+    jobConf.setJobName(jobName);
+    jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    FileInputFormat.setInputPaths(jobConf, inDir);
+    FileOutputFormat.setOutputPath(jobConf, outDir);
+    jobConf.setMapperClass(UtilsForTests.WaitingMapper.class);
+    jobConf.setReducerClass(IdentityReducer.class);
+    jobConf.setOutputKeyClass(BytesWritable.class);
+    jobConf.setOutputValueClass(BytesWritable.class);
+    jobConf.setInputFormat(RandomInputFormat.class);
+    jobConf.setNumMapTasks(numMaps);
+    jobConf.setNumReduceTasks(numRed);
+    jobConf.setJar("build/test/mapred/testjar/testjob.jar");
+    jobConf.set(UtilsForTests.getTaskSignalParameter(true), signalFile);
+    jobConf.set(UtilsForTests.getTaskSignalParameter(false), signalFile);
+    // Disable reduce slow start to begin reduces ASAP
+    jobConf.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f);
+    return jobConf;
+  }
+}



Mime
View raw message