hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r788653 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobTracker.java src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java
Date Fri, 26 Jun 2009 11:06:16 GMT
Author: sharad
Date: Fri Jun 26 11:06:15 2009
New Revision: 788653

URL: http://svn.apache.org/viewvc?rev=788653&view=rev
Log:
MAPREDUCE-502. Allow jobtracker to be configured with zero completed jobs in memory. Contributed
by Amar Kamat.

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=788653&r1=788652&r2=788653&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jun 26 11:06:15 2009
@@ -30,6 +30,9 @@
     MAPREDUCE-463. Makes job setup and cleanup tasks as optional.
     (Amareshwari Sriramadasu via sharad)
 
+    MAPREDUCE-502. Allow jobtracker to be configured with zero completed jobs
+    in memory. (Amar Kamat via sharad)
+
   BUG FIXES
     HADOOP-4687. MapReduce is split from Hadoop Core. It is a subproject under 
     Hadoop (Owen O'Malley)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=788653&r1=788652&r2=788653&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Jun 26 11:06:15 2009
@@ -163,7 +163,7 @@
     * The minimum time (in ms) that a job's information has to remain
     * in the JobTracker's memory before it is retired.
     */
-  static final int MIN_TIME_BEFORE_RETIRE = 60000;
+  final int MIN_TIME_BEFORE_RETIRE;
 
 
   private int nextJobId = 1;
@@ -187,10 +187,15 @@
   public static JobTracker startTracker(JobConf conf
                                         ) throws IOException,
                                                  InterruptedException {
+    return startTracker(conf, new Clock());
+  }
+
+  static JobTracker startTracker(JobConf conf, Clock clock) 
+  throws IOException, InterruptedException {
     JobTracker result = null;
     while (true) {
       try {
-        result = new JobTracker(conf, new Clock());
+        result = new JobTracker(conf, clock);
         result.taskScheduler.setTaskTrackerManager(result);
         break;
       } catch (VersionMismatch e) {
@@ -404,6 +409,7 @@
   // Used to remove old finished Jobs that have been around for too long
   ///////////////////////////////////////////////////////
   class RetireJobs implements Runnable {
+    int runCount = 0;
     public RetireJobs() {
     }
 
@@ -414,6 +420,7 @@
      */
     public void run() {
       while (true) {
+        ++runCount;
         try {
           Thread.sleep(retireJobCheckInterval);
           List<JobInProgress> retiredJobs = new ArrayList<JobInProgress>();
@@ -1602,6 +1609,8 @@
       conf.getLong("mapred.tasktracker.expiry.interval", 10 * 60 * 1000);
    retireJobInterval = conf.getLong("mapred.jobtracker.retirejob.interval", 24 * 60 * 60 * 1000);
     retireJobCheckInterval = conf.getLong("mapred.jobtracker.retirejob.check", 60 * 1000);
+    // min time before retire
+    MIN_TIME_BEFORE_RETIRE = conf.getInt("mapred.jobtracker.retirejob.interval.min", 60000);
    MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100);
     MAX_BLACKLISTS_PER_TRACKER = 
         conf.getInt("mapred.max.tracker.blacklists", 4);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java?rev=788653&r1=788652&r2=788653&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java Fri Jun 26 11:06:15 2009
@@ -57,6 +57,7 @@
   private int numTrackerToExclude;
     
   private JobConf job;
+  private Clock clock;
   
   /**
    * An inner class that runs a job tracker.
@@ -66,11 +67,17 @@
     private volatile boolean isActive = true;
     
     JobConf jc = null;
+    Clock clock = null;
         
     public JobTrackerRunner(JobConf conf) {
       jc = conf;
     }
 
+    public JobTrackerRunner(JobConf conf, Clock clock) {
+      jc = conf;
+      this.clock = clock;
+    }
+
     public boolean isUp() {
       return (tracker != null);
     }
@@ -101,7 +108,11 @@
         jc.set("mapred.local.dir",f.getAbsolutePath());
         jc.setClass("topology.node.switch.mapping.impl", 
             StaticMapping.class, DNSToSwitchMapping.class);
-        tracker = JobTracker.startTracker(jc);
+        if (clock == null) {
+          tracker = JobTracker.startTracker(jc);
+        } else {
+          tracker = JobTracker.startTracker(jc, clock);
+        }
         tracker.offerService();
       } catch (Throwable e) {
         LOG.error("Job tracker crashed", e);
@@ -422,6 +433,14 @@
       int numTaskTrackers, String namenode, 
       int numDir, String[] racks, String[] hosts, UnixUserGroupInformation ugi,
       JobConf conf, int numTrackerToExclude) throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
+         racks, hosts, ugi, conf, numTrackerToExclude, new Clock());
+  }
+
+   public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode,
+      int numDir, String[] racks, String[] hosts, UnixUserGroupInformation ugi,
+      JobConf conf, int numTrackerToExclude, Clock clock) throws IOException {
     if (racks != null && racks.length < numTaskTrackers) {
       LOG.error("Invalid number of racks specified. It should be at least " +
           "equal to the number of tasktrackers");
@@ -457,6 +476,7 @@
     this.ugi = ugi;
     this.conf = conf; // this is the conf the mr starts with
     this.numTrackerToExclude = numTrackerToExclude;
+    this.clock = clock;
 
     // start the jobtracker
     startJobTracker();
@@ -554,7 +574,7 @@
    */
   public void startJobTracker() {
     //  Create the JobTracker
-    jobTracker = new JobTrackerRunner(conf);
+    jobTracker = new JobTrackerRunner(conf, clock);
     jobTrackerThread = new Thread(jobTracker);
         
     jobTrackerThread.start();

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java?rev=788653&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java Fri Jun 26 11:06:15 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Tests that completed jobs are retired from JobTracker memory as configured.
+ */
+public class TestJobRetire extends TestCase {
+  static final Path testDir = 
+    new Path(System.getProperty("test.build.data","/tmp"), 
+             "job-expiry-testing");
+  private static final Log LOG = LogFactory.getLog(TestJobRetire.class);
+
+  /** Tests that a completed job is kept in memory for at least
+   *  mapred.jobtracker.retirejob.interval.min ms, even if retirejob.interval is smaller.
+   */
+  public void testMinIntervalBeforeRetire() throws Exception {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    int min = 10000;
+    int max = 5000;
+    try {
+      FakeClock clock = new FakeClock();
+      JobConf conf = new JobConf();
+      dfs = new MiniDFSCluster(conf, 1, true, null);
+      FileSystem fileSys = dfs.getFileSystem();
+      String namenode = fileSys.getUri().toString();
+
+      conf.setLong("mapred.jobtracker.retirejob.check", 1000); // 1 sec
+      conf.setInt("mapred.jobtracker.retirejob.interval.min", min); //10 secs
+      conf.setInt("mapred.jobtracker.retirejob.interval", max); // 5 secs
+      conf.setBoolean("mapred.job.tracker.persist.jobstatus.active", false);
+      mr = new MiniMRCluster(0, 0, 1, namenode, 1, null, null, null, conf, 0, 
+                             clock);
+      JobConf jobConf = mr.createJobConf();
+      JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+      Path inDir = new Path(testDir, "input");
+      Path outDir = new Path(testDir, "output");
+      RunningJob rj = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
+      rj.waitForCompletion();
+      JobID id = rj.getID();
+      JobClient jc = new JobClient(jobConf);
+
+      // the job must have completed successfully before retirement is checked
+      assertTrue(rj.isSuccessful());
+
+      // snapshot the retire thread's loop counter before advancing the clock
+      int snapshot = jobtracker.retireJobs.runCount;
+      clock.advance(max + 1); // past retirejob.interval but still below the min
+
+      // wait until the retire thread starts another loop iteration
+      while (jobtracker.retireJobs.runCount == snapshot) {
+        // NOTE(review): runCount is a non-volatile int and this loop has no timeout
+        UtilsForTests.waitFor(1000);
+      }
+      // the job is younger than the min interval, so it must not be retired yet
+      assertNotNull(jc.getJob(id));
+
+      // snapshot the loop counter again
+      snapshot = jobtracker.retireJobs.runCount;
+      clock.advance(min - max); // total advance is now min + 1, past the min
+
+      while (jobtracker.retireJobs.runCount == snapshot) {
+        // wait for the thread to run
+        UtilsForTests.waitFor(1000);
+      }
+
+      // the job should no longer be retrievable (it has been retired)
+      assertNull(jc.getJob(id));
+    } finally {
+      if (mr != null) { mr.shutdown();}
+      if (dfs != null) { dfs.shutdown();}
+    }
+  }
+
+  /** Tests that a completed job is retired once
+   *  mapred.jobtracker.retirejob.interval ms have elapsed (here larger than the min).
+   */
+  public void testJobRetire() throws Exception {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    int min = 10000;
+    int max = 20000;
+    try {
+      FakeClock clock = new FakeClock();
+      JobConf conf = new JobConf();
+      dfs = new MiniDFSCluster(conf, 1, true, null);
+      FileSystem fileSys = dfs.getFileSystem();
+      String namenode = fileSys.getUri().toString();
+
+      conf.setLong("mapred.jobtracker.retirejob.check", 1000); // 1 sec
+      conf.setInt("mapred.jobtracker.retirejob.interval.min", min); // 10 secs
+      conf.setInt("mapred.jobtracker.retirejob.interval", max); // 20 secs
+      conf.setBoolean("mapred.job.tracker.persist.jobstatus.active", false);
+      mr = new MiniMRCluster(0, 0, 1, namenode, 1, null, null, null, conf, 0, 
+                             clock);
+      JobConf jobConf = mr.createJobConf();
+      JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+      
+      Path inDir = new Path(testDir, "input1");
+      Path outDir = new Path(testDir, "output1");
+      RunningJob rj = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
+      rj.waitForCompletion();
+      JobID id = rj.getID();
+      JobClient jc = new JobClient(jobConf);
+
+      // the job must have completed successfully before retirement is checked
+      assertTrue(rj.isSuccessful());
+
+      // snapshot the retire thread's loop counter before advancing the clock
+      int snapshot = jobtracker.retireJobs.runCount;
+      clock.advance(max + 1); // past retirejob.interval (which exceeds the min)
+
+      while (jobtracker.retireJobs.runCount == snapshot) {
+        // wait for the thread to run
+        UtilsForTests.waitFor(1000);
+      }
+
+      // the job should no longer be retrievable (it has been retired)
+      assertNull(jc.getJob(id));
+    } finally {
+      if (mr != null) { mr.shutdown();}
+      if (dfs != null) { dfs.shutdown();}
+    }
+  }
+}



Mime
View raw message