hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r792700 - in /hadoop/mapreduce/trunk/src: java/ java/org/apache/hadoop/mapred/ test/mapred/org/apache/hadoop/mapred/
Date Thu, 09 Jul 2009 21:16:09 GMT
Author: omalley
Date: Thu Jul  9 21:16:07 2009
New Revision: 792700

URL: http://svn.apache.org/viewvc?rev=792700&view=rev
Log:
HADOOP-5170. Reverting patch.

Modified:
    hadoop/mapreduce/trunk/src/java/mapred-default.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRunningTaskLimits.java

Modified: hadoop/mapreduce/trunk/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/mapred-default.xml?rev=792700&r1=792699&r2=792700&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/trunk/src/java/mapred-default.xml Thu Jul  9 21:16:07 2009
@@ -983,34 +983,6 @@
   </description>
 </property>
 
-<property>
-  <name>mapred.max.maps.per.node</name>
-  <value>-1</value>
-  <description>Per-node limit on running map tasks for the job. A value
-    of -1 signifies no limit.</description>
-</property>
-
-<property>
-  <name>mapred.max.reduces.per.node</name>
-  <value>-1</value>
-  <description>Per-node limit on running reduce tasks for the job. A value
-    of -1 signifies no limit.</description>
-</property>
-
-<property>
-  <name>mapred.running.map.limit</name>
-  <value>-1</value>
-  <description>Cluster-wide limit on running map tasks for the job. A value
-    of -1 signifies no limit.</description>
-</property>
-
-<property>
-  <name>mapred.running.reduce.limit</name>
-  <value>-1</value>
-  <description>Cluster-wide limit on running reduce tasks for the job. A value
-    of -1 signifies no limit.</description>
-</property>
-
 <!--  Node health script variables -->
 
 <property>

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=792700&r1=792699&r2=792700&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Thu Jul  9 21:16:07
2009
@@ -1430,78 +1430,6 @@
   }
   
   /**
-   * Get the per-node limit on running maps for the job
-   * 
-   * @return per-node running map limit
-   */
-  public int getMaxMapsPerNode() {
-    return getInt("mapred.max.maps.per.node", -1);
-  }
-  
-  /**
-   * Set the per-node limit on running maps for the job
-   * 
-   * @param limit per-node running map limit
-   */
-  public void setMaxMapsPerNode(int limit) {
-    setInt("mapred.max.maps.per.node", limit);
-  }
-  
-  /**
-   * Get the per-node limit on running reduces for the job
-   * 
-   * @return per-node running reduce limit
-   */
-  public int getMaxReducesPerNode() {
-    return getInt("mapred.max.reduces.per.node", -1);
-  }
-  
-  /**
-   * Set the per-node limit on running reduces for the job
-   * 
-   * @param limit per-node running reduce limit
-   */
-  public void setMaxReducesPerNode(int limit) {
-    setInt("mapred.max.reduces.per.node", limit);
-  }
-  
-  /**
-   * Get the cluster-wide limit on running maps for the job
-   * 
-   * @return cluster-wide running map limit
-   */
-  public int getRunningMapLimit() {
-    return getInt("mapred.running.map.limit", -1);
-  }
-  
-  /**
-   * Set the cluster-wide limit on running maps for the job
-   * 
-   * @param limit cluster-wide running map limit
-   */
-  public void setRunningMapLimit(int limit) {
-    setInt("mapred.running.map.limit", limit);
-  }
-  
-  /**
-   * Get the cluster-wide limit on running reduces for the job
-   * 
-   * @return cluster-wide running reduce limit
-   */
-  public int getRunningReduceLimit() {
-    return getInt("mapred.running.reduce.limit", -1);
-  }
-  
-  /**
-   * Set the cluster-wide limit on running reduces for the job
-   * 
-   * @param limit cluster-wide running reduce limit
-   */
-  public void setRunningReduceLimit(int limit) {
-    setInt("mapred.running.reduce.limit", limit);
-  }
-  
-  /**
    * Normalize the negative values in configuration
    * 
    * @param val

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=792700&r1=792699&r2=792700&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Jul  9
21:16:07 2009
@@ -97,12 +97,6 @@
   int speculativeMapTasks = 0;
   int speculativeReduceTasks = 0;
   
-  // Limits on concurrent running tasks per-node and cluster-wide
-  int maxMapsPerNode;
-  int maxReducesPerNode;
-  int runningMapLimit;
-  int runningReduceLimit;
-  
   int mapFailuresPercent = 0;
   int reduceFailuresPercent = 0;
   int failedMapTIPs = 0;
@@ -358,11 +352,6 @@
 
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
-
-    this.maxMapsPerNode = conf.getMaxMapsPerNode();
-    this.maxReducesPerNode = conf.getMaxReducesPerNode();
-    this.runningMapLimit = conf.getRunningMapLimit();
-    this.runningReduceLimit = conf.getRunningReduceLimit();
         
     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
     this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
@@ -1985,10 +1974,6 @@
     //
     this.clusterSize = clusterSize;
 
-    if (!belowRunningTaskLimit(tts, true)) {
-      return -1;
-    }
-    
     if (!shouldRunOnTaskTracker(taskTrackerName)) {
       return -1;
     }
@@ -2172,13 +2157,9 @@
       return -1;
     }
     TaskInProgress tip = null;
-
+    
     // Update the last-known clusterSize
     this.clusterSize = clusterSize;
-    
-    if (!belowRunningTaskLimit(tts, false)) {
-      return -1;
-    }
 
     if (!shouldRunOnTaskTracker(taskTrackerName)) {
       return -1;
@@ -2403,42 +2384,6 @@
     }
     return true;
   }
-  
-  /**
-   * Check whether we are below the running task limits (per node and cluster
-   * wide) for a given type of task on a given task tracker.
-   * 
-   * @param tts task tracker to check on
-   * @param map true if looking at map tasks, false for reduce tasks
-   * @return true if we are below both the cluster-wide and the per-node 
-   *         running task limit for the given type of task
-   */
-  private boolean belowRunningTaskLimit(TaskTrackerStatus tts, boolean map) {
-    int runningTasks = map ? runningMapTasks : runningReduceTasks;
-    int clusterLimit = map ? runningMapLimit : runningReduceLimit;
-    int perNodeLimit = map ? maxMapsPerNode  : maxReducesPerNode;
-    
-    // Check cluster-wide limit
-    if (clusterLimit != -1 && runningTasks >= clusterLimit) {
-      return false;
-    }
-    
-    // Check per-node limit
-    if (perNodeLimit != -1) {
-      int runningTasksOnNode = 0;
-      for (TaskStatus ts: tts.getTaskReports()) {
-        if (ts.getTaskID().getJobID().equals(jobId) && ts.getIsMap() == map &&
-            ts.getRunState().equals(TaskStatus.State.RUNNING)) {
-          runningTasksOnNode++;
-        }
-      }
-      if (runningTasksOnNode >= perNodeLimit) {
-        return false;
-      }
-    }
-    
-    return true;
-  }
 
   /**
    * A taskid assigned to this JobInProgress has reported in successfully.

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=792700&r1=792699&r2=792700&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
Thu Jul  9 21:16:07 2009
@@ -85,10 +85,6 @@
       this.profile = new JobProfile(jc.getUser(), jobid, 
           jobFile.toString(), null, jc.getJobName(),
           jc.getQueueName());
-      this.maxMapsPerNode = jc.getMaxMapsPerNode();
-      this.maxReducesPerNode = jc.getMaxReducesPerNode();
-      this.runningMapLimit = jc.getRunningMapLimit();
-      this.runningReduceLimit = jc.getRunningReduceLimit();
     }
 
     @Override

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRunningTaskLimits.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRunningTaskLimits.java?rev=792700&r1=792699&r2=792700&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRunningTaskLimits.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRunningTaskLimits.java
Thu Jul  9 21:16:07 2009
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
-import org.apache.hadoop.mapred.UtilsForTests.RandomInputFormat;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
-
-/**
- * Test the running task limits - mapred.max.maps.per.node,
- * mapred.max.reduces.per.node, mapred.max.running.maps and
- * mapred.max.running.reduces.
- */
-public class TestRunningTaskLimits extends TestCase {
-  private static final Log LOG = 
-    LogFactory.getLog(TestRunningTaskLimits.class);
-  private static final Path TEST_DIR = 
-    new Path(System.getProperty("test.build.data", "/tmp"), 
-             "test-running-task-limits");
-  
-  /**
-   * This test creates a cluster with 1 tasktracker with 3 map and 3 reduce 
-   * slots. We then submit a job with a limit of 2 maps and 1 reduce per
-   * node, and check that these limits are obeyed in launching tasks.
-   */
-  public void testPerNodeLimits() throws Exception {
-    LOG.info("Running testPerNodeLimits");
-    FileSystem fs = FileSystem.get(new Configuration());
-    fs.delete(TEST_DIR, true); // cleanup test dir
-    
-    // Create a cluster with 1 tasktracker with 3 map slots and 3 reduce slots
-    JobConf conf = new JobConf();
-    conf.setInt("mapred.tasktracker.map.tasks.maximum", 3);
-    conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 3);
-    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
-    
-    // Create a job with limits of 3 maps/node and 2 reduces/node 
-    JobConf jobConf = createWaitJobConf(mr, "job1", 20, 20);
-    jobConf.setMaxMapsPerNode(2);
-    jobConf.setMaxReducesPerNode(1);
-    
-    // Submit the job
-    RunningJob rJob = (new JobClient(jobConf)).submitJob(jobConf);
-    
-    // Wait 20 seconds for it to start up
-    UtilsForTests.waitFor(20000);
-    
-    // Check the number of running tasks
-    JobTracker jobTracker = mr.getJobTrackerRunner().getJobTracker();
-    JobInProgress jip = jobTracker.getJob(rJob.getID());
-    assertEquals(2, jip.runningMaps());
-    assertEquals(1, jip.runningReduces());
-    
-    rJob.killJob();
-    mr.shutdown();
-  }
-  
-  /**
-   * This test creates a cluster with 2 tasktrackers with 3 map and 3 reduce 
-   * slots each. We then submit a job with a limit of 5 maps and 3 reduces
-   * cluster-wide, and check that these limits are obeyed in launching tasks.
-   */
-  public void testClusterWideLimits() throws Exception {
-    LOG.info("Running testClusterWideLimits");
-    FileSystem fs = FileSystem.get(new Configuration());
-    fs.delete(TEST_DIR, true); // cleanup test dir
-    
-    // Create a cluster with 2 tasktrackers with 3 map and reduce slots each
-    JobConf conf = new JobConf();
-    conf.setInt("mapred.tasktracker.map.tasks.maximum", 3);
-    conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 3);
-    MiniMRCluster mr = new MiniMRCluster(2, "file:///", 1, null, null, conf);
-    
-    // Create a job with limits of 10 maps and 5 reduces on the entire cluster 
-    JobConf jobConf = createWaitJobConf(mr, "job1", 20, 20);
-    jobConf.setRunningMapLimit(5);
-    jobConf.setRunningReduceLimit(3);
-    
-    // Submit the job
-    RunningJob rJob = (new JobClient(jobConf)).submitJob(jobConf);
-    
-    // Wait 20 seconds for it to start up
-    UtilsForTests.waitFor(20000);
-    
-    // Check the number of running tasks
-    JobTracker jobTracker = mr.getJobTrackerRunner().getJobTracker();
-    JobInProgress jip = jobTracker.getJob(rJob.getID());
-    assertEquals(5, jip.runningMaps());
-    assertEquals(3, jip.runningReduces());
-    
-    rJob.killJob();
-    mr.shutdown();
-  }
-  
-  /**
-   * This test creates a cluster with 2 tasktrackers with 3 map and 3 reduce 
-   * slots each. We then submit a job with a limit of 5 maps and 3 reduces
-   * cluster-wide, and 2 maps and 2 reduces per node. We should end up with
-   * 4 maps and 3 reduces running: the maps hit the per-node limit first,
-   * while the reduces hit the cluster-wide limit.
-   */
-  public void testClusterWideAndPerNodeLimits() throws Exception {
-    LOG.info("Running testClusterWideAndPerNodeLimits");
-    FileSystem fs = FileSystem.get(new Configuration());
-    fs.delete(TEST_DIR, true); // cleanup test dir
-    
-    // Create a cluster with 2 tasktrackers with 3 map and reduce slots each
-    JobConf conf = new JobConf();
-    conf.setInt("mapred.tasktracker.map.tasks.maximum", 3);
-    conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 3);
-    MiniMRCluster mr = new MiniMRCluster(2, "file:///", 1, null, null, conf);
-    
-    // Create a job with limits of 10 maps and 5 reduces on the entire cluster 
-    JobConf jobConf = createWaitJobConf(mr, "job1", 20, 20);
-    jobConf.setRunningMapLimit(5);
-    jobConf.setRunningReduceLimit(3);
-    jobConf.setMaxMapsPerNode(2);
-    jobConf.setMaxReducesPerNode(2);
-    
-    // Submit the job
-    RunningJob rJob = (new JobClient(jobConf)).submitJob(jobConf);
-    
-    // Wait 20 seconds for it to start up
-    UtilsForTests.waitFor(20000);
-    
-    // Check the number of running tasks
-    JobTracker jobTracker = mr.getJobTrackerRunner().getJobTracker();
-    JobInProgress jip = jobTracker.getJob(rJob.getID());
-    assertEquals(4, jip.runningMaps());
-    assertEquals(3, jip.runningReduces());
-    
-    rJob.killJob();
-    mr.shutdown();
-  }
-  
-  /**
-   * Create a JobConf for a job using the WaitingMapper and IdentityReducer,
-   * which will sleep until a signal file is created. In this test we never
-   * create the signal file so the job just occupies slots for the duration
-   * of the test as they are assigned to it. 
-   */
-  JobConf createWaitJobConf(MiniMRCluster mr, String jobName,
-      int numMaps, int numRed)
-  throws IOException {
-    JobConf jobConf = mr.createJobConf();
-    Path inDir = new Path(TEST_DIR, "input");
-    Path outDir = new Path(TEST_DIR, "output-" + jobName);
-    String signalFile = new Path(TEST_DIR, "signal").toString();
-    jobConf.setJobName(jobName);
-    jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
-    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
-    FileInputFormat.setInputPaths(jobConf, inDir);
-    FileOutputFormat.setOutputPath(jobConf, outDir);
-    jobConf.setMapperClass(UtilsForTests.WaitingMapper.class);
-    jobConf.setReducerClass(IdentityReducer.class);
-    jobConf.setOutputKeyClass(BytesWritable.class);
-    jobConf.setOutputValueClass(BytesWritable.class);
-    jobConf.setInputFormat(RandomInputFormat.class);
-    jobConf.setNumMapTasks(numMaps);
-    jobConf.setNumReduceTasks(numRed);
-    jobConf.setJar("build/test/mapred/testjar/testjob.jar");
-    jobConf.set(UtilsForTests.getTaskSignalParameter(true), signalFile);
-    jobConf.set(UtilsForTests.getTaskSignalParameter(false), signalFile);
-    // Disable reduce slow start to begin reduces ASAP
-    jobConf.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f);
-    return jobConf;
-  }
-}



Mime
View raw message