hadoop-common-commits mailing list archives

From: yhema...@apache.org
Subject: svn commit: r732231 - in /hadoop/core/trunk: ./ src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date: Wed, 07 Jan 2009 05:51:04 GMT
Author: yhemanth
Date: Tue Jan  6 21:51:03 2009
New Revision: 732231

URL: http://svn.apache.org/viewvc?rev=732231&view=rev
Log:
HADOOP-4830. Add end-to-end test cases for testing queue capacities. Contributed by Vinod Kumar Vavilapalli.

Added:
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestQueueCapacities.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ControlledMapReduceJob.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestControlledMapReduceJob.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/capacity-scheduler/ivy.xml
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=732231&r1=732230&r2=732231&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Jan  6 21:51:03 2009
@@ -318,6 +318,9 @@
     HADOOP-4948. Add parameters java5.home and forrest.home to the ant commands
     in test-patch.sh.  (Giridharan Kesavan via szetszwo)
 
+    HADOOP-4830. Add end-to-end test cases for testing queue capacities.
+    (Vinod Kumar Vavilapalli via yhemanth)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to do provide locations for splits

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/ivy.xml?rev=732231&r1=732230&r2=732231&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/ivy.xml (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/ivy.xml Tue Jan  6 21:51:03 2009
@@ -36,5 +36,21 @@
       name="log4j"
       rev="${log4j.version}"
       conf="common->master"/>
-    </dependencies>
+    <dependency org="org.mortbay.jetty"
+      name="jetty-util"
+      rev="${jetty-util.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty"
+      rev="${jetty.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="servlet-api-2.5"
+      rev="${servlet-api-2.5.version}"
+      conf="common->master"/> 
+    <dependency org="commons-httpclient"
+      name="commons-httpclient"
+      rev="${commons-httpclient.version}"
+      conf="common->master"/> 
+  </dependencies>
 </ivy-module>

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=732231&r1=732230&r2=732231&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Tue Jan  6 21:51:03 2009
@@ -286,7 +286,7 @@
     initializeDefaults();
   }
   
-  private static final String toFullPropertyName(String queue, 
+  static final String toFullPropertyName(String queue, 
                                                   String property) {
       return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
   }

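Not part of the diff above, but to make the visibility change concrete: widening toFullPropertyName from private to package-private lets test code in org.apache.hadoop.mapred compose capacity-scheduler configuration keys. A minimal sketch of that usage (the same pattern TestQueueCapacities below uses):

    // Build the full property name for a queue-level setting. The prefix is
    // "mapred.capacity-scheduler.queue.", so this yields
    // "mapred.capacity-scheduler.queue.default.guaranteed-capacity".
    Properties schedulerProps = new Properties();
    schedulerProps.put(
        CapacitySchedulerConf.toFullPropertyName("default", "guaranteed-capacity"),
        "100");
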
Added: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java?rev=732231&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java Tue Jan  6 21:51:03 2009
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Enumeration;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+/**
+ * A test-cluster based on {@link MiniMRCluster} that is started with
+ * CapacityTaskScheduler. It provides knobs to configure both the cluster as
+ * well as the scheduler. Any test that intends to test capacity-scheduler
+ * should extend this.
+ * 
+ */
+public class ClusterWithCapacityScheduler extends TestCase {
+
+  static final Log LOG = LogFactory.getLog(ClusterWithCapacityScheduler.class);
+  private MiniMRCluster mrCluster;
+  private MiniDFSCluster dfsCluster;
+
+  private JobConf jobConf;
+
+  static final String MY_SCHEDULER_CONF_PATH_PROPERTY = "my.resource.path";
+
+  protected void startCluster()
+      throws IOException {
+    startCluster(null, null);
+  }
+
+  /**
+   * Start the cluster with two TaskTrackers and two DataNodes and configure the
+   * cluster with clusterProperties and the scheduler with schedulerProperties.
+   * Uses default configuration whenever user provided properties are missing
+   * (null/empty)
+   * 
+   * @param clusterProperties
+   * @param schedulerProperties
+   * @throws IOException
+   */
+  protected void startCluster(Properties clusterProperties,
+      Properties schedulerProperties)
+      throws IOException {
+    startCluster(2, 2, clusterProperties, schedulerProperties);
+  }
+
+  /**
+   * Start the cluster with numTaskTrackers TaskTrackers and numDataNodes
+   * DataNodes and configure the cluster with clusterProperties and the
+   * scheduler with schedulerProperties. Uses default configuration whenever
+   * user provided properties are missing (null/empty)
+   * 
+   * @param numTaskTrackers
+   * @param numDataNodes
+   * @param clusterProperties
+   * @param schedulerProperties
+   * @throws IOException
+   */
+  protected void startCluster(int numTaskTrackers, int numDataNodes,
+      Properties clusterProperties, Properties schedulerProperties)
+      throws IOException {
+    Thread.currentThread().setContextClassLoader(
+        new ClusterWithCapacityScheduler.MyClassLoader());
+    JobConf clusterConf = new JobConf();
+    if (clusterProperties != null) {
+      for (Enumeration<?> e = clusterProperties.propertyNames(); e
+          .hasMoreElements();) {
+        String key = (String) e.nextElement();
+        clusterConf.set(key, (String) clusterProperties.get(key));
+      }
+    }
+    dfsCluster = new MiniDFSCluster(clusterConf, numDataNodes, true, null);
+
+    if (schedulerProperties != null) {
+      setUpSchedulerConfigFile(schedulerProperties);
+    }
+
+    clusterConf.set("mapred.jobtracker.taskScheduler",
+        CapacityTaskScheduler.class.getName());
+    mrCluster =
+        new MiniMRCluster(numTaskTrackers, dfsCluster.getFileSystem().getUri()
+            .toString(), 1, null, null, clusterConf);
+
+    this.jobConf = mrCluster.createJobConf(clusterConf);
+  }
+
+  private void setUpSchedulerConfigFile(Properties schedulerConfProps)
+      throws IOException {
+    Configuration config = new Configuration(false);
+
+    LocalFileSystem fs = new LocalFileSystem();
+    fs.setConf(config);
+    // The above call doesn't set the configuration for the underlying
+    // RawFileSystem. Explicitly doing it.
+    fs.getRawFileSystem().setConf(config);
+
+    String myResourcePath = System.getProperty("test.build.data");
+    Path schedulerConfigFilePath =
+        new Path(myResourcePath, CapacitySchedulerConf.SCHEDULER_CONF_FILE);
+    OutputStream out = fs.create(schedulerConfigFilePath);
+
+    for (Enumeration<?> e = schedulerConfProps.propertyNames(); e
+        .hasMoreElements();) {
+      String key = (String) e.nextElement();
+      LOG.debug("Adding " + key + schedulerConfProps.getProperty(key));
+      config.set(key, schedulerConfProps.getProperty(key));
+    }
+
+    config.writeXml(out);
+    out.close();
+
+    LOG.info("setting resource path where capacity-scheduler's config file "
+        + "is placed to " + myResourcePath);
+    System.setProperty(MY_SCHEDULER_CONF_PATH_PROPERTY, myResourcePath);
+  }
+
+  private void cleanUpSchedulerConfigFile() throws IOException {
+    Configuration config = new Configuration(false);
+
+    LocalFileSystem fs = new LocalFileSystem();
+    fs.setConf(config);
+    fs.getRawFileSystem().setConf(config);
+
+    String myResourcePath = System.getProperty("test.build.data");
+    Path schedulerConfigFilePath =
+        new Path(myResourcePath, CapacitySchedulerConf.SCHEDULER_CONF_FILE);
+    fs.delete(schedulerConfigFilePath, false);
+  }
+
+  protected JobConf getJobConf() {
+    return this.jobConf;
+  }
+
+  protected JobTracker getJobTracker() {
+    return this.mrCluster.getJobTrackerRunner().getJobTracker();
+  }
+
+  @Override
+  protected void tearDown()
+      throws Exception {
+    cleanUpSchedulerConfigFile();
+    
+    if (mrCluster != null) {
+      mrCluster.shutdown();
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
+  }
+
+  /**
+   * Wait till all the slots in the cluster are occupied by tasks of the type
+   * specified by isMap.
+   * 
+   * <p>
+   * 
+   * <b>Also, it is assumed that the tasks won't finish any time soon, like in
+   * the case of tasks of {@link ControlledMapReduceJob}</b>.
+   * 
+   * @param isMap
+   */
+  protected void waitTillAllSlotsAreOccupied(boolean isMap)
+      throws InterruptedException {
+    JobTracker jt = this.mrCluster.getJobTrackerRunner().getJobTracker();
+    ClusterStatus clusterStatus = jt.getClusterStatus();
+    int currentTasks =
+        (isMap ? clusterStatus.getMapTasks() : clusterStatus.getReduceTasks());
+    int maxTasks =
+        (isMap ? clusterStatus.getMaxMapTasks() : clusterStatus
+            .getMaxReduceTasks());
+    while (currentTasks != maxTasks) {
+      Thread.sleep(1000);
+      clusterStatus = jt.getClusterStatus();
+      currentTasks =
+          (isMap ? clusterStatus.getMapTasks() : clusterStatus
+              .getReduceTasks());
+      maxTasks =
+          (isMap ? clusterStatus.getMaxMapTasks() : clusterStatus
+              .getMaxReduceTasks());
+      LOG.info("Waiting till cluster reaches steady state. currentTasks : "
+          + currentTasks + " total cluster capacity : " + maxTasks);
+    }
+  }
+
+  static class MyClassLoader extends ClassLoader {
+    @Override
+    public URL getResource(String name) {
+      if (!name.equals(CapacitySchedulerConf.SCHEDULER_CONF_FILE)) {
+        return super.getResource(name);
+      }
+      return findResource(name);
+    }
+
+    @Override
+    protected URL findResource(String name) {
+      try {
+        String resourcePath =
+            System
+                .getProperty(ClusterWithCapacityScheduler.MY_SCHEDULER_CONF_PATH_PROPERTY);
+        // Check the resourcePath directory
+        File file = new File(resourcePath, name);
+        if (file.exists()) {
+          return new URL("file://" + file.getAbsolutePath());
+        }
+      } catch (MalformedURLException mue) {
+        LOG.warn("exception : " + mue);
+      }
+      return super.findResource(name);
+    }
+  }
+}
\ No newline at end of file

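A hypothetical subclass (not part of this commit) sketching the intended usage of ClusterWithCapacityScheduler: pass cluster and scheduler Properties to startCluster(), drive jobs through getJobConf() and getJobTracker(), and rely on tearDown() to stop the mini clusters. TestQueueCapacities below is the real example.

    package org.apache.hadoop.mapred;

    import java.util.Properties;

    // Hypothetical test extending the new base class.
    public class MyCapacitySchedulerTest extends ClusterWithCapacityScheduler {
      public void testDefaultQueue() throws Exception {
        Properties schedulerProps = new Properties();
        schedulerProps.put(
            "mapred.capacity-scheduler.queue.default.guaranteed-capacity", "100");
        Properties clusterProps = new Properties();
        clusterProps.put("mapred.tasktracker.map.tasks.maximum", "2");
        // Starts 2 TaskTrackers and 2 DataNodes by default.
        startCluster(clusterProps, schedulerProps);
        JobConf conf = getJobConf();   // configured to talk to the mini cluster
        // ... submit jobs with conf and assert on getJobTracker() state ...
      }
      // tearDown() in the base class shuts down the MR and DFS clusters.
    }
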
Added: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestQueueCapacities.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestQueueCapacities.java?rev=732231&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestQueueCapacities.java (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestQueueCapacities.java Tue Jan  6 21:51:03 2009
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.Properties;
+import org.apache.hadoop.mapred.ControlledMapReduceJob.ControlledMapReduceJobRunner;
+
+/**
+ * End-to-end tests based on MiniMRCluster that verify queue capacities are
+ * honored. These automate the queue-capacity tests: jobs are submitted to
+ * different queues simultaneously and the tests ensure that capacities are honored.
+ */
+public class TestQueueCapacities extends ClusterWithCapacityScheduler {
+
+  /**
+   * Test single queue.
+   * 
+   * <p>
+   * 
+   * Submit a job with more M/R tasks than total capacity. Full queue capacity
+   * should be utilized and remaining M/R tasks should wait for slots to be
+   * available.
+   * 
+   * @throws Exception
+   */
+  public void testSingleQueue()
+      throws Exception {
+
+    Properties schedulerProps = new Properties();
+    schedulerProps.put(
+        "mapred.capacity-scheduler.queue.default.guaranteed-capacity", "100");
+    Properties clusterProps = new Properties();
+    clusterProps
+        .put("mapred.tasktracker.map.tasks.maximum", String.valueOf(3));
+    clusterProps.put("mapred.tasktracker.reduce.tasks.maximum", String
+        .valueOf(3));
+    // cluster capacity 12 maps, 12 reduces
+    startCluster(4, 2, clusterProps, schedulerProps);
+
+    ControlledMapReduceJobRunner jobRunner =
+        ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(
+            getJobConf(), 16, 16);
+    jobRunner.start();
+    ControlledMapReduceJob controlledJob = jobRunner.getJob();
+    JobID myJobID = jobRunner.getJobID();
+    JobInProgress myJob = getJobTracker().getJob(myJobID);
+
+    ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 12);
+
+    // Wait till the cluster reaches steady state. This confirms that the rest
+    // of the tasks are not running and waiting for slots
+    // to be freed.
+    waitTillAllSlotsAreOccupied(true);
+
+    LOG.info("Trying to finish 2 maps");
+    controlledJob.finishNTasks(true, 2);
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 2);
+    assertTrue("Number of maps finished", myJob.finishedMaps() == 2);
+    ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 12);
+    waitTillAllSlotsAreOccupied(true);
+
+    LOG.info("Trying to finish 2 more maps");
+    controlledJob.finishNTasks(true, 2);
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 4);
+    assertTrue("Number of maps finished", myJob.finishedMaps() == 4);
+    ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 12);
+    waitTillAllSlotsAreOccupied(true);
+
+    LOG.info("Trying to finish the last 12 maps");
+    controlledJob.finishNTasks(true, 12);
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 16);
+    assertTrue("Number of maps finished", myJob.finishedMaps() == 16);
+    ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 0);
+    ControlledMapReduceJob.haveAllTasksFinished(myJob, true);
+
+    ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, false, 12);
+    waitTillAllSlotsAreOccupied(false);
+
+    LOG.info("Trying to finish 4 reduces");
+    controlledJob.finishNTasks(false, 4);
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, false, 4);
+    assertTrue("Number of reduces finished", myJob.finishedReduces() == 4);
+    ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, false, 12);
+    waitTillAllSlotsAreOccupied(false);
+
+    LOG.info("Trying to finish the last 12 reduces");
+    controlledJob.finishNTasks(false, 12);
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, false, 16);
+    assertTrue("Number of reduces finished", myJob.finishedReduces() == 16);
+    ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, false, 0);
+    ControlledMapReduceJob.haveAllTasksFinished(myJob, false);
+
+    jobRunner.join();
+  }
+
+  /**
+   * Test single queue with multiple jobs.
+   * 
+   * @throws Exception
+   */
+  public void testSingleQueueMultipleJobs()
+      throws Exception {
+
+    Properties schedulerProps = new Properties();
+    schedulerProps.put(
+        "mapred.capacity-scheduler.queue.default.guaranteed-capacity", "100");
+    Properties clusterProps = new Properties();
+    clusterProps
+        .put("mapred.tasktracker.map.tasks.maximum", String.valueOf(3));
+    clusterProps.put("mapred.tasktracker.reduce.tasks.maximum", String
+        .valueOf(0));
+    // cluster capacity 12 maps, 0 reduces
+    startCluster(4, 2, clusterProps, schedulerProps);
+
+    singleQMultipleJobs1();
+    singleQMultipleJobs2();
+  }
+
+  /**
+   * Test multiple queues.
+   * 
+   * These tests use four queues (default, Q2, Q3 and Q4) with guaranteed
+   * capacities of 10, 20, 30 and 40 respectively, a user limit of 100%,
+   * priorities not respected, and one user per queue. Reclaim time is 5 minutes.
+   * 
+   * @throws Exception
+   */
+  public void testMultipleQueues()
+      throws Exception {
+    Properties schedulerProps = new Properties();
+    String[] queues = new String[] { "default", "Q2", "Q3", "Q4" };
+    int GC = 0;
+    for (String q : queues) {
+      GC += 10;
+      schedulerProps.put(CapacitySchedulerConf.toFullPropertyName(q,
+          "guaranteed-capacity"), String.valueOf(GC)); // TODO: use strings
+      schedulerProps.put(CapacitySchedulerConf.toFullPropertyName(q,
+          "minimum-user-limit-percent"), String.valueOf(100));
+      schedulerProps.put(CapacitySchedulerConf.toFullPropertyName(q,
+          "reclaim-time-limit"), String.valueOf(300));
+    }
+
+    Properties clusterProps = new Properties();
+    clusterProps
+        .put("mapred.tasktracker.map.tasks.maximum", String.valueOf(2));
+    clusterProps.put("mapred.tasktracker.reduce.tasks.maximum", String
+        .valueOf(2));
+    clusterProps.put("mapred.queue.names", queues[0] + "," + queues[1] + ","
+        + queues[2] + "," + queues[3]);
+
+    // cluster capacity 10 maps, 10 reduces and 4 queues with capacities 1, 2,
+    // 3, 4 respectively.
+    startCluster(5, 2, clusterProps, schedulerProps);
+
+    multipleQsWithOneQBeyondCapacity(queues);
+    multipleQueuesWithinCapacities(queues);
+  }
+
+  /**
+   * Submit a job with more M/R tasks than the total queue capacity and then
+   * submit another job. The first job utilizes all the slots. When the second
+   * job is submitted, its tasks wait for slots to become available. As the
+   * tasks of the first job finish and it has no more pending tasks, the tasks
+   * of the second job start running on the freed-up slots.
+   * 
+   * @throws Exception
+   */
+  private void singleQMultipleJobs1()
+      throws Exception {
+
+    ControlledMapReduceJobRunner jobRunner1 =
+        ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(
+            getJobConf(), 16, 0);
+    ControlledMapReduceJobRunner jobRunner2 =
+        ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(
+            getJobConf(), 12, 0);
+    jobRunner1.start();
+    ControlledMapReduceJob controlledJob1 = jobRunner1.getJob();
+    JobID jobID1 = jobRunner1.getJobID();
+    JobInProgress jip1 = getJobTracker().getJob(jobID1);
+
+    ControlledMapReduceJob.waitTillNTasksStartRunning(jip1, true, 12);
+
+    // Confirm that the rest of the tasks are not running and waiting for slots
+    // to be freed.
+    waitTillAllSlotsAreOccupied(true);
+
+    // Now start the second job.
+    jobRunner2.start();
+    JobID jobID2 = jobRunner2.getJobID();
+    ControlledMapReduceJob controlledJob2 = jobRunner2.getJob();
+    JobInProgress jip2 = getJobTracker().getJob(jobID2);
+
+    LOG.info("Trying to finish 2 map");
+    controlledJob1.finishNTasks(true, 2);
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(jip1, true, 2);
+    assertTrue("Number of maps finished", jip1.finishedMaps() == 2);
+    ControlledMapReduceJob.waitTillNTasksStartRunning(jip1, true, 12);
+    waitTillAllSlotsAreOccupied(true);
+
+    LOG.info("Trying to finish 2 more maps");
+    controlledJob1.finishNTasks(true, 2);
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(jip1, true, 4);
+    assertTrue("Number of maps finished", jip1.finishedMaps() == 4);
+    ControlledMapReduceJob.waitTillNTasksStartRunning(jip1, true, 12);
+    waitTillAllSlotsAreOccupied(true);
+
+    // All tasks of Job1 started running/finished. Now job2 should start
+    LOG.info("Trying to finish 2 more maps");
+    controlledJob1.finishNTasks(true, 2);
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(jip1, true, 6);
+    assertTrue("Number of maps finished", jip1.finishedMaps() == 6);
+    ControlledMapReduceJob.waitTillNTasksStartRunning(jip1, true, 10);
+    ControlledMapReduceJob.waitTillNTasksStartRunning(jip2, true, 2);
+    waitTillAllSlotsAreOccupied(true);
+    ControlledMapReduceJob.assertNumTasksRunning(jip1, true, 10);
+    ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 2);
+
+    LOG.info("Trying to finish 10 more maps and hence job1");
+    controlledJob1.finishNTasks(true, 10);
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(jip1, true, 16);
+    assertTrue("Number of maps finished", jip1.finishedMaps() == 16);
+    ControlledMapReduceJob.waitTillNTasksStartRunning(jip2, true, 12);
+    controlledJob1.finishJob();
+    waitTillAllSlotsAreOccupied(true);
+    ControlledMapReduceJob.assertNumTasksRunning(jip1, true, 0);
+    ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 12);
+
+    // Finish job2 also
+    controlledJob2.finishJob();
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(jip2, true, 12);
+    ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 0);
+
+    jobRunner1.join();
+    jobRunner2.join();
+  }
+
+  /**
+   * Submit a job with fewer M/R tasks than the total capacity and another job
+   * with more M/R tasks than the remaining capacity. The first job should get
+   * the slots it needs, the other job should occupy the remaining slots, and
+   * its leftover tasks should wait for slots to become free.
+   * 
+   * @throws Exception
+   */
+  private void singleQMultipleJobs2()
+      throws Exception {
+
+    ControlledMapReduceJobRunner jobRunner1 =
+        ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(
+            getJobConf(), 8, 0);
+    ControlledMapReduceJobRunner jobRunner2 =
+        ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(
+            getJobConf(), 12, 0);
+    jobRunner1.start();
+    ControlledMapReduceJob controlledJob1 = jobRunner1.getJob();
+    JobID jobID1 = jobRunner1.getJobID();
+    JobInProgress jip1 = getJobTracker().getJob(jobID1);
+
+    ControlledMapReduceJob.waitTillNTasksStartRunning(jip1, true, 8);
+    ControlledMapReduceJob.assertNumTasksRunning(jip1, true, 8);
+
+    // Now start the second job.
+    jobRunner2.start();
+    JobID jobID2 = jobRunner2.getJobID();
+    ControlledMapReduceJob controlledJob2 = jobRunner2.getJob();
+    JobInProgress jip2 = getJobTracker().getJob(jobID2);
+
+    ControlledMapReduceJob.waitTillNTasksStartRunning(jip2, true, 4);
+    waitTillAllSlotsAreOccupied(true);
+    ControlledMapReduceJob.assertNumTasksRunning(jip1, true, 8);
+    // The rest of the tasks of job2 should wait.
+    ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 4);
+
+    LOG.info("Trying to finish 2 maps of job1");
+    controlledJob1.finishNTasks(true, 2);
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(jip1, true, 2);
+    assertTrue("Number of maps finished", jip1.finishedMaps() == 2);
+    ControlledMapReduceJob.waitTillNTasksStartRunning(jip1, true, 6);
+    ControlledMapReduceJob.waitTillNTasksStartRunning(jip2, true, 6);
+    waitTillAllSlotsAreOccupied(true);
+    ControlledMapReduceJob.assertNumTasksRunning(jip1, true, 6);
+    ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 6);
+
+    LOG.info("Trying to finish 6 more maps of job1");
+    controlledJob1.finishNTasks(true, 6);
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(jip1, true, 8);
+    assertTrue("Number of maps finished", jip1.finishedMaps() == 8);
+    ControlledMapReduceJob.waitTillNTasksStartRunning(jip2, true, 12);
+    waitTillAllSlotsAreOccupied(true);
+    ControlledMapReduceJob.assertNumTasksRunning(jip1, true, 0);
+    ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 12);
+
+    // Finish job2 also
+    controlledJob2.finishJob();
+
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(jip2, true, 12);
+    ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 0);
+
+    jobRunner1.join();
+    jobRunner2.join();
+  }
+
+  /**
+   * Test to verify that a queue's tasks can run beyond its capacity. In the
+   * default queue, user U1 starts a job J1 with more M/R tasks than the total
+   * slots in the cluster. The M/R tasks of J1 should start running on all the
+   * nodes (100% utilization).
+   * 
+   * @throws Exception
+   */
+  private void multipleQsWithOneQBeyondCapacity(String[] queues)
+      throws Exception {
+
+    JobConf conf = getJobConf();
+    conf.setQueueName(queues[0]);
+    conf.setUser("U1");
+    ControlledMapReduceJobRunner jobRunner =
+        ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(conf, 15,
+            0);
+    jobRunner.start();
+    ControlledMapReduceJob controlledJob = jobRunner.getJob();
+    JobID myJobID = jobRunner.getJobID();
+    JobInProgress myJob = getJobTracker().getJob(myJobID);
+
+    ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 10);
+
+    // Confirm that the rest of the tasks are not running and waiting for slots
+    // to be freed.
+    waitTillAllSlotsAreOccupied(true);
+    ControlledMapReduceJob.assertNumTasksRunning(myJob, true, 10);
+
+    LOG.info("Trying to finish 3 maps");
+    controlledJob.finishNTasks(true, 3);
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 3);
+    assertTrue("Number of maps finished", myJob.finishedMaps() == 3);
+    ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 10);
+    waitTillAllSlotsAreOccupied(true);
+    ControlledMapReduceJob.assertNumTasksRunning(myJob, true, 10);
+
+    LOG.info("Trying to finish 2 more maps");
+    controlledJob.finishNTasks(true, 2);
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 5);
+    assertTrue("Number of maps finished", myJob.finishedMaps() == 5);
+    ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 10);
+    waitTillAllSlotsAreOccupied(true);
+    ControlledMapReduceJob.assertNumTasksRunning(myJob, true, 10);
+
+    // Finish job
+    controlledJob.finishJob();
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 15);
+    ControlledMapReduceJob.assertNumTasksRunning(myJob, true, 0);
+    jobRunner.join();
+  }
+
+  /**
+   * Test to verify queue capacities across multiple queues. Jobs are submitted
+   * to different queues, each staying within its queue's capacity, and the test
+   * verifies that all the jobs are running. This exercises code paths related
+   * to job initialization, scheduling across multiple queues, etc.
+   * 
+   * <p>
+   * 
+   * One user per queue. Four jobs are submitted to the four queues such that
+   * they exactly fill up the queues. No queue should be beyond capacity. All
+   * jobs should be running.
+   * 
+   * @throws Exception
+   */
+  private void multipleQueuesWithinCapacities(String[] queues)
+      throws Exception {
+    String[] users = new String[] { "U1", "U2", "U3", "U4" };
+    ControlledMapReduceJobRunner[] jobRunners =
+        new ControlledMapReduceJobRunner[4];
+    ControlledMapReduceJob[] controlledJobs = new ControlledMapReduceJob[4];
+    JobInProgress[] jips = new JobInProgress[4];
+
+    // Initialize all the jobs
+    // Start all the jobs in parallel
+    JobConf conf = getJobConf();
+    int numTasks = 1;
+    for (int i = 0; i < 4; i++) {
+      conf.setQueueName(queues[i]);
+      conf.setUser(users[i]);
+      jobRunners[i] =
+          ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(
+              getJobConf(), numTasks, numTasks);
+      jobRunners[i].start();
+      controlledJobs[i] = jobRunners[i].getJob();
+      JobID jobID = jobRunners[i].getJobID();
+      jips[i] = getJobTracker().getJob(jobID);
+      // Wait till all the jobs start running all of their tasks
+      ControlledMapReduceJob.waitTillNTasksStartRunning(jips[i], true,
+          numTasks);
+      ControlledMapReduceJob.waitTillNTasksStartRunning(jips[i], false,
+          numTasks);
+      numTasks += 1;
+    }
+
+    // Ensure steady state behavior
+    waitTillAllSlotsAreOccupied(true);
+    waitTillAllSlotsAreOccupied(false);
+    numTasks = 1;
+    for (int i = 0; i < 4; i++) {
+      ControlledMapReduceJob.assertNumTasksRunning(jips[i], true, numTasks);
+      ControlledMapReduceJob.assertNumTasksRunning(jips[i], false, numTasks);
+      numTasks += 1;
+    }
+
+    // Finish the jobs and join them
+    numTasks = 1;
+    for (int i = 0; i < 4; i++) {
+      controlledJobs[i].finishJob();
+      ControlledMapReduceJob
+          .waitTillNTotalTasksFinish(jips[i], true, numTasks);
+      ControlledMapReduceJob.assertNumTasksRunning(jips[i], true, 0);
+      ControlledMapReduceJob.waitTillNTotalTasksFinish(jips[i], false,
+          numTasks);
+      ControlledMapReduceJob.assertNumTasksRunning(jips[i], false, 0);
+      jobRunners[i].join();
+      numTasks += 1;
+    }
+  }
+}
\ No newline at end of file

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java?rev=732231&r1=732230&r2=732231&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java Tue Jan  6 21:51:03 2009
@@ -154,6 +154,10 @@
     return dfsCluster.getFileSystem();
   }
 
+  protected MiniMRCluster getMRCluster() {
+    return mrCluster;
+  }
+
   /**
    * Returns the path to the root directory for the testcase.
    *

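The new getMRCluster() accessor lets subclasses of ClusterMapReduceTestCase reach the JobTracker directly. A short sketch of the lookup pattern (the same one TestControlledMapReduceJob uses below), assuming jobID refers to a job the test has already submitted:

    // Inside a ClusterMapReduceTestCase subclass: fetch the in-progress job
    // for a submitted JobID via the newly exposed MiniMRCluster.
    JobTracker jt = getMRCluster().getJobTrackerRunner().getJobTracker();
    JobInProgress jip = jt.getJob(jobID);
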
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ControlledMapReduceJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ControlledMapReduceJob.java?rev=732231&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ControlledMapReduceJob.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ControlledMapReduceJob.java Tue Jan  6 21:51:03 2009
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A controlled Map/Reduce job. The tasks are controlled by the presence of
+ * specially named files in the directory signalFileDir on the file-system
+ * that the job is configured to work with. Tasks get scheduled by the
+ * scheduler, occupy slots on the TaskTrackers and keep running till the
+ * user gives a signal via files whose names are of the form MAPS_[0-9]* and
+ * REDUCES_[0-9]*. For example, whenever the map tasks see that a file named
+ * MAPS_5 is created in the signalFileDir, all the maps whose task IDs are
+ * below 5 finish. At any time, there should be only one MAPS_[0-9]* file and
+ * only one REDUCES_[0-9]* file in the signalFileDir. In the beginning, MAPS_0
+ * and REDUCES_0 files are present, and further signals are given by renaming
+ * these files.
+ * 
+ */
+class ControlledMapReduceJob extends Configured implements Tool,
+    Mapper<NullWritable, NullWritable, IntWritable, NullWritable>,
+    Reducer<IntWritable, NullWritable, NullWritable, NullWritable>,
+    Partitioner<IntWritable, NullWritable>,
+    InputFormat<NullWritable, NullWritable> {
+
+  static final Log LOG = LogFactory.getLog(ControlledMapReduceJob.class);
+
+  private FileSystem fs = null;
+  private int taskNumber;
+
+  private static ArrayList<Path> signalFileDirCache = new ArrayList<Path>();
+
+  private Path signalFileDir;
+  {
+    Random random = new Random();
+    signalFileDir = new Path("signalFileDir-" + random.nextLong());
+    while (signalFileDirCache.contains(signalFileDir)) {
+      signalFileDir = new Path("signalFileDir-" + random.nextLong());
+    }
+    signalFileDirCache.add(signalFileDir);
+  }
+
+  private long mapsFinished = 0;
+  private long reducesFinished = 0;
+
+  private RunningJob rJob = null;
+
+  private int numMappers;
+  private int numReducers;
+
+  private final String MAP_SIGFILE_PREFIX = "MAPS_";
+  private final String REDUCE_SIGFILE_PREFIX = "REDUCES_";
+
+  private void initialize()
+      throws IOException {
+    fs = FileSystem.get(getConf());
+    fs.mkdirs(signalFileDir);
+    writeFile(new Path(signalFileDir, MAP_SIGFILE_PREFIX + mapsFinished));
+    writeFile(new Path(signalFileDir, REDUCE_SIGFILE_PREFIX + reducesFinished));
+  }
+
+  /**
+   * Finish N number of maps/reduces.
+   * 
+   * @param isMap
+   * @param noOfTasksToFinish
+   * @throws IOException
+   */
+  public void finishNTasks(boolean isMap, int noOfTasksToFinish)
+      throws IOException {
+    if (noOfTasksToFinish < 0) {
+      throw new IOException(
+          "Negative values for noOfTasksToFinish not acceptable");
+    }
+
+    if (noOfTasksToFinish == 0) {
+      return;
+    }
+
+    LOG.info("Going to finish off " + noOfTasksToFinish);
+    String PREFIX = isMap ? MAP_SIGFILE_PREFIX : REDUCE_SIGFILE_PREFIX;
+    long tasksFinished = isMap ? mapsFinished : reducesFinished;
+    Path oldSignalFile =
+        new Path(signalFileDir, PREFIX + String.valueOf(tasksFinished));
+    Path newSignalFile =
+        new Path(signalFileDir, PREFIX
+            + String.valueOf(tasksFinished + noOfTasksToFinish));
+    fs.rename(oldSignalFile, newSignalFile);
+    if (isMap) {
+      mapsFinished += noOfTasksToFinish;
+    } else {
+      reducesFinished += noOfTasksToFinish;
+    }
+    LOG.info("Successfully sent signal to finish off " + noOfTasksToFinish);
+  }
+
+  /**
+   * Finish all tasks of the type determined by isMap.
+   * 
+   * @param isMap
+   * @throws IOException
+   */
+  public void finishAllTasks(boolean isMap)
+      throws IOException {
+    finishNTasks(isMap, (isMap ? numMappers : numReducers));
+  }
+
+  /**
+   * Finish the job
+   * 
+   * @throws IOException
+   */
+  public void finishJob()
+      throws IOException {
+    finishAllTasks(true);
+    finishAllTasks(false);
+  }
+
+  /**
+   * Wait till noOfTasksToBeRunning tasks of the type specified by isMap have
+   * started running. This currently uses a JobInProgress object directly,
+   * querying its API to determine the number of running tasks.
+   * 
+   * <p>
+   * 
+   * TODO: It should eventually use a JobID and then get the information from
+   * the JT to check the number of running tasks.
+   * 
+   * @param jip
+   * @param isMap
+   * @param noOfTasksToBeRunning
+   */
+  static void waitTillNTasksStartRunning(JobInProgress jip, boolean isMap,
+      int noOfTasksToBeRunning)
+      throws InterruptedException {
+    int numTasks = 0;
+    while (numTasks != noOfTasksToBeRunning) {
+      Thread.sleep(1000);
+      numTasks = isMap ? jip.runningMaps() : jip.runningReduces();
+      LOG.info("Waiting till " + noOfTasksToBeRunning
+          + (isMap ? " map" : " reduce") + " tasks of the job "
+          + jip.getJobID() + " start running. " + numTasks
+          + " tasks already started running.");
+    }
+  }
+
+  /**
+   * Make sure that the number of tasks of type specified by isMap running in
+   * the given job is the same as noOfTasksToBeRunning
+   * 
+   * <p>
+   * 
+   * TODO: It should eventually use a JobID and then get the information from
+   * the JT to check the number of running tasks.
+   * 
+   * @param jip
+   * @param isMap
+   * @param noOfTasksToBeRunning
+   */
+  static void assertNumTasksRunning(JobInProgress jip, boolean isMap,
+      int noOfTasksToBeRunning)
+      throws Exception {
+    if ((isMap ? jip.runningMaps() : jip.runningReduces()) != noOfTasksToBeRunning) {
+      throw new Exception("Number of tasks running is not "
+          + noOfTasksToBeRunning);
+    }
+  }
+
+  /**
+   * Wait till noOfTasksToFinish tasks of the type specified by isMap have
+   * finished. This currently uses a JobInProgress object directly, querying
+   * its API to determine the number of finished tasks.
+   * 
+   * <p>
+   * 
+   * TODO: It should eventually use a JobID and then get the information from
+   * the JT to check the number of finished tasks.
+   * 
+   * @param jip
+   * @param isMap
+   * @param noOfTasksToFinish
+   * @throws InterruptedException
+   */
+  static void waitTillNTotalTasksFinish(JobInProgress jip, boolean isMap,
+      int noOfTasksToFinish)
+      throws InterruptedException {
+    int noOfTasksAlreadyFinished = 0;
+    while (noOfTasksAlreadyFinished < noOfTasksToFinish) {
+      Thread.sleep(1000);
+      noOfTasksAlreadyFinished =
+          (isMap ? jip.finishedMaps() : jip.finishedReduces());
+      LOG.info("Waiting till " + noOfTasksToFinish
+          + (isMap ? " map" : " reduce") + " tasks of the job "
+          + jip.getJobID() + " finish. " + noOfTasksAlreadyFinished
+          + " tasks already got finished.");
+    }
+  }
+
+  /**
+   * Have all the tasks of type specified by isMap finished in this job?
+   * 
+   * @param jip
+   * @param isMap
+   * @return true if finished, false otherwise
+   */
+  static boolean haveAllTasksFinished(JobInProgress jip, boolean isMap) {
+    return ((isMap ? jip.runningMaps() : jip.runningReduces()) == 0);
+  }
+
+  private void writeFile(Path name)
+      throws IOException {
+    Configuration conf = new Configuration(false);
+    SequenceFile.Writer writer =
+        SequenceFile.createWriter(fs, conf, name, BytesWritable.class,
+            BytesWritable.class, CompressionType.NONE);
+    writer.append(new BytesWritable(), new BytesWritable());
+    writer.close();
+  }
+
+  @Override
+  public void configure(JobConf conf) {
+    try {
+      signalFileDir = new Path(conf.get("signal.dir.path"));
+      numReducers = conf.getNumReduceTasks();
+      fs = FileSystem.get(conf);
+      String taskAttemptId = conf.get("mapred.task.id");
+      if (taskAttemptId != null) {
+        TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptId);
+        taskNumber = taskAttemptID.getTaskID().getId();
+      }
+    } catch (IOException ioe) {
+      LOG.warn("Caught exception " + ioe);
+    }
+  }
+
+  private FileStatus[] listSignalFiles(FileSystem fileSys, final boolean isMap)
+      throws IOException {
+    return fileSys.globStatus(new Path(signalFileDir.toString() + "/*"),
+        new PathFilter() {
+          @Override
+          public boolean accept(Path path) {
+            if (isMap && path.getName().startsWith(MAP_SIGFILE_PREFIX)) {
+              LOG.debug("Found signal file : " + path.getName());
+              return true;
+            } else if (!isMap
+                && path.getName().startsWith(REDUCE_SIGFILE_PREFIX)) {
+              LOG.debug("Found signal file : " + path.getName());
+              return true;
+            }
+            LOG.info("Didn't find any relevant signal files.");
+            return false;
+          }
+        });
+  }
+
+  @Override
+  public void map(NullWritable key, NullWritable value,
+      OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
+      throws IOException {
+    LOG.info(taskNumber + " has started.");
+    FileStatus[] files = listSignalFiles(fs, true);
+    String[] sigFileComps = files[0].getPath().getName().split("_");
+    String signalType = sigFileComps[0];
+    int noOfTasks = Integer.parseInt(sigFileComps[1]);
+
+    while (!signalType.equals("MAPS") || taskNumber + 1 > noOfTasks) {
+      LOG.info("Signal type found : " + signalType
+          + " .Number of tasks to be finished by this signal : " + noOfTasks
+          + " . My id : " + taskNumber);
+      LOG.info(taskNumber + " is still alive.");
+      try {
+        reporter.progress();
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+        LOG.info(taskNumber + " is still alive.");
+        break;
+      }
+      files = listSignalFiles(fs, true);
+      sigFileComps = files[0].getPath().getName().split("_");
+      signalType = sigFileComps[0];
+      noOfTasks = Integer.parseInt(sigFileComps[1]);
+    }
+    LOG.info("Signal type found : " + signalType
+        + " .Number of tasks to be finished by this signal : " + noOfTasks
+        + " . My id : " + taskNumber);
+    // Emit one record per reducer (keys 0..numReducers-1) so that every
+    // reducer gets a key to process.
+    for (int i = 0; i < numReducers; i++) {
+      output.collect(new IntWritable(i), NullWritable.get());
+    }
+
+    LOG.info(taskNumber + " is finished.");
+  }
+
+  @Override
+  public void reduce(IntWritable key, Iterator<NullWritable> values,
+      OutputCollector<NullWritable, NullWritable> output, Reporter reporter)
+      throws IOException {
+    LOG.info(taskNumber + " has started.");
+    FileStatus[] files = listSignalFiles(fs, false);
+    String[] sigFileComps = files[0].getPath().getName().split("_");
+    String signalType = sigFileComps[0];
+    int noOfTasks = Integer.parseInt(sigFileComps[1]);
+
+    while (!signalType.equals("REDUCES") || taskNumber + 1 > noOfTasks) {
+      LOG.info("Signal type found : " + signalType
+          + " .Number of tasks to be finished by this signal : " + noOfTasks
+          + " . My id : " + taskNumber);
+      LOG.info(taskNumber + " is still alive.");
+      try {
+        reporter.progress();
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+        LOG.info(taskNumber + " is still alive.");
+        break;
+      }
+      files = listSignalFiles(fs, false);
+      sigFileComps = files[0].getPath().getName().split("_");
+      signalType = sigFileComps[0];
+      noOfTasks = Integer.parseInt(sigFileComps[1]);
+    }
+    LOG.info("Signal type found : " + signalType
+        + " .Number of tasks to be finished by this signal : " + noOfTasks
+        + " . My id : " + taskNumber);
+    LOG.info(taskNumber + " is finished.");
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    // nothing
+  }
+
+  public JobID getJobId() {
+    if (rJob == null) {
+      return null;
+    }
+    return rJob.getID();
+  }
+
+  public int run(int numMapper, int numReducer)
+      throws IOException {
+    JobConf conf =
+        getControlledMapReduceJobConf(getConf(), numMapper, numReducer);
+    JobClient client = new JobClient(conf);
+    rJob = client.submitJob(conf);
+    while (!rJob.isComplete()) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+        break;
+      }
+    }
+    if (rJob.isSuccessful()) {
+      return 0;
+    }
+    return 1;
+  }
+
+  private JobConf getControlledMapReduceJobConf(Configuration clusterConf,
+      int numMapper, int numReducer)
+      throws IOException {
+    setConf(clusterConf);
+    initialize();
+    JobConf conf = new JobConf(getConf(), ControlledMapReduceJob.class);
+    conf.setJobName("ControlledJob");
+    conf.set("signal.dir.path", signalFileDir.toString());
+    conf.setNumMapTasks(numMapper);
+    conf.setNumReduceTasks(numReducer);
+    conf.setMapperClass(ControlledMapReduceJob.class);
+    conf.setMapOutputKeyClass(IntWritable.class);
+    conf.setMapOutputValueClass(NullWritable.class);
+    conf.setReducerClass(ControlledMapReduceJob.class);
+    conf.setOutputKeyClass(NullWritable.class);
+    conf.setOutputValueClass(NullWritable.class);
+    conf.setInputFormat(ControlledMapReduceJob.class);
+    FileInputFormat.addInputPath(conf, new Path("ignored"));
+    conf.setOutputFormat(NullOutputFormat.class);
+
+    // Set the following for reduce tasks to be able to be started running
+    // immediately along with maps.
+    conf.set("mapred.reduce.slowstart.completed.maps", String.valueOf(0));
+
+    return conf;
+  }
+
+  @Override
+  public int run(String[] args)
+      throws Exception {
+    numMappers = Integer.parseInt(args[0]);
+    numReducers = Integer.parseInt(args[1]);
+    return run(numMappers, numReducers);
+  }
+
+  @Override
+  public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
+    return k.get() % numPartitions;
+  }
+
+  @Override
+  public RecordReader<NullWritable, NullWritable> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter) {
+    LOG.debug("Inside RecordReader.getRecordReader");
+    return new RecordReader<NullWritable, NullWritable>() {
+      private int pos = 0;
+
+      public void close() {
+        // nothing
+      }
+
+      public NullWritable createKey() {
+        return NullWritable.get();
+      }
+
+      public NullWritable createValue() {
+        return NullWritable.get();
+      }
+
+      public long getPos() {
+        return pos;
+      }
+
+      public float getProgress() {
+        return pos * 100;
+      }
+
+      public boolean next(NullWritable key, NullWritable value) {
+        if (pos++ == 0) {
+          LOG.debug("Returning the next record");
+          return true;
+        }
+        LOG.debug("No more records. Returning none.");
+        return false;
+      }
+    };
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) {
+    LOG.debug("Inside InputSplit.getSplits");
+    InputSplit[] ret = new InputSplit[numSplits];
+    for (int i = 0; i < numSplits; ++i) {
+      ret[i] = new EmptySplit();
+    }
+    return ret;
+  }
+
+  public static class EmptySplit implements InputSplit {
+    public void write(DataOutput out)
+        throws IOException {
+    }
+
+    public void readFields(DataInput in)
+        throws IOException {
+    }
+
+    public long getLength() {
+      return 0L;
+    }
+
+    public String[] getLocations() {
+      return new String[0];
+    }
+  }
+
+  static class ControlledMapReduceJobRunner extends Thread {
+    private JobConf conf;
+    private ControlledMapReduceJob job;
+    private JobID jobID;
+
+    private int numMappers;
+    private int numReducers;
+
+    public ControlledMapReduceJobRunner() {
+      this(new JobConf(), 5, 5);
+    }
+
+    public ControlledMapReduceJobRunner(JobConf cnf, int numMap, int numRed) {
+      this.conf = cnf;
+      this.numMappers = numMap;
+      this.numReducers = numRed;
+    }
+
+    public ControlledMapReduceJob getJob() {
+      while (job == null) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+          LOG.info(ControlledMapReduceJobRunner.class.getName()
+              + " is interrupted.");
+          break;
+        }
+      }
+      return job;
+    }
+
+    public JobID getJobID()
+        throws IOException {
+      ControlledMapReduceJob job = getJob();
+      JobID id = job.getJobId();
+      while (id == null) {
+        id = job.getJobId();
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+          LOG.info(ControlledMapReduceJobRunner.class.getName()
+              + " is interrupted.");
+          break;
+        }
+      }
+      return id;
+    }
+
+    @Override
+    public void run() {
+      if (job != null) {
+        LOG.warn("Job is already running.");
+        return;
+      }
+      try {
+        job = new ControlledMapReduceJob();
+        int ret =
+            ToolRunner.run(this.conf, job, new String[] {
+                String.valueOf(numMappers), String.valueOf(numReducers) });
+        LOG.info("Return value for the job : " + ret);
+      } catch (Exception e) {
+        LOG.warn("Caught exception : " + StringUtils.stringifyException(e));
+      }
+    }
+
+    static ControlledMapReduceJobRunner getControlledMapReduceJobRunner(
+        JobConf conf, int numMappers, int numReducers) {
+      return new ControlledMapReduceJobRunner(conf, numMappers, numReducers);
+    }
+  }
+}
\ No newline at end of file

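To make the signalling protocol concrete, here is a small driver-side sketch with hypothetical task counts, assuming conf is a JobConf pointing at a running mini cluster. Each finishNTasks() call renames the current MAPS_k signal file to MAPS_(k+n), and map tasks whose IDs fall below the new count exit their wait loop:

    // Hypothetical driver: run a controlled job with 4 maps and 0 reduces,
    // releasing the maps in two batches of 2.
    ControlledMapReduceJobRunner runner =
        ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(conf, 4, 0);
    runner.start();                       // submits the job in a separate thread
    ControlledMapReduceJob job = runner.getJob();
    JobID id = runner.getJobID();         // blocks until the job is submitted
    job.finishNTasks(true, 2);            // renames MAPS_0 -> MAPS_2
    job.finishNTasks(true, 2);            // renames MAPS_2 -> MAPS_4
    runner.join();                        // returns once the job is complete
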
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestControlledMapReduceJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestControlledMapReduceJob.java?rev=732231&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestControlledMapReduceJob.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestControlledMapReduceJob.java Tue Jan  6 21:51:03 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.ControlledMapReduceJob.ControlledMapReduceJobRunner;
+
+/**
+ * Test to verify the controlled behavior of a ControlledMapReduceJob.
+ * 
+ */
+public class TestControlledMapReduceJob extends ClusterMapReduceTestCase {
+  static final Log LOG = LogFactory.getLog(TestControlledMapReduceJob.class);
+
+  /**
+   * Starts a job with 7 maps and 6 reduces. Then controls the finishing of
+   * tasks: signals tasks to finish in batches and verifies their completion.
+   * 
+   * @throws Exception
+   */
+  public void testControlledMapReduceJob()
+      throws Exception {
+
+    Properties props = new Properties();
+    props.setProperty("mapred.tasktracker.map.tasks.maximum", "2");
+    props.setProperty("mapred.tasktracker.reduce.tasks.maximum", "2");
+    startCluster(true, props);
+    LOG.info("Started the cluster");
+
+    ControlledMapReduceJobRunner jobRunner =
+        ControlledMapReduceJobRunner
+            .getControlledMapReduceJobRunner(createJobConf(), 7, 6);
+    jobRunner.start();
+    ControlledMapReduceJob controlledJob = jobRunner.getJob();
+    JobInProgress jip =
+        getMRCluster().getJobTrackerRunner().getJobTracker().getJob(
+            jobRunner.getJobID());
+
+    ControlledMapReduceJob.waitTillNTasksStartRunning(jip, true, 4);
+    LOG.info("Finishing 3 maps");
+    controlledJob.finishNTasks(true, 3);
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(jip, true, 3);
+
+    ControlledMapReduceJob.waitTillNTasksStartRunning(jip, true, 4);
+    LOG.info("Finishing 4 more maps");
+    controlledJob.finishNTasks(true, 4);
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(jip, true, 7);
+
+    ControlledMapReduceJob.waitTillNTasksStartRunning(jip, false, 4);
+    LOG.info("Finishing 2 reduces");
+    controlledJob.finishNTasks(false, 2);
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(jip, false, 2);
+
+    ControlledMapReduceJob.waitTillNTasksStartRunning(jip, false, 4);
+    LOG.info("Finishing 4 more reduces");
+    controlledJob.finishNTasks(false, 4);
+    ControlledMapReduceJob.waitTillNTotalTasksFinish(jip, false, 6);
+
+    jobRunner.join();
+  }
+}


