hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r792528 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java
Date Thu, 09 Jul 2009 12:31:47 GMT
Author: yhemanth
Date: Thu Jul  9 12:31:46 2009
New Revision: 792528

URL: http://svn.apache.org/viewvc?rev=792528&view=rev
Log:
MAPREDUCE-734. Fix a ConcurrentModificationException in unreserving unused reservations for
a job when it completes. Contributed by Arun Murthy and Sreekanth Ramakrishnan.

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=792528&r1=792527&r2=792528&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jul  9 12:31:46 2009
@@ -164,3 +164,8 @@
 
     MAPREDUCE-685. Sqoop will fail with OutOfMemory on large tables
     using mysql. (Aaron Kimball via tomwhite)
+
+    MAPREDUCE-734. Fix a ConcurrentModificationException in unreserving
+    unused reservations for a job when it completes.
+    (Arun Murthy and Sreekanth Ramakrishnan via yhemanth)
+

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=792528&r1=792527&r2=792528&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Jul  9 12:31:46 2009
@@ -186,7 +186,7 @@
   private final int restartCount;
 
   private JobConf conf;
-  AtomicBoolean tasksInited = new AtomicBoolean(false);
+  protected AtomicBoolean tasksInited = new AtomicBoolean(false);
   private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();
 
   private LocalFileSystem localFs;
@@ -303,7 +303,8 @@
         "mapred.speculative.execution.speculativeCap",0.1f);
     this.slowNodeThreshold = conf.getFloat(
         "mapred.speculative.execution.slowNodeThreshold",1.0f);
-
+    this.jobSetupCleanupNeeded = conf.getBoolean(
+        "mapred.committer.job.setup.cleanup.needed", true);
   }
   
   /**
@@ -2622,7 +2623,6 @@
         this.status.setReduceProgress(1.0f);
       }
       this.finishTime = JobTracker.getClock().getTime();
-      cancelReservedSlots();
       LOG.info("Job " + this.status.getJobID() + 
                " has completed successfully.");
       JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime, 
@@ -2707,22 +2707,31 @@
         reduces[i].kill();
       }
       
-      // Clear out reserved tasktrackers
-      cancelReservedSlots();
       if (!jobSetupCleanupNeeded) {
         terminateJob(jobTerminationState);
       }
     }
   }
 
+  /**
+   * Cancel all reservations since the job is done
+   */
   private void cancelReservedSlots() {
-    for (TaskTracker tt : trackersReservedForMaps.keySet()) {
+    // Make a copy of the set of TaskTrackers to prevent a 
+    // ConcurrentModificationException ...
+    Set<TaskTracker> tm = 
+      new HashSet<TaskTracker>(trackersReservedForMaps.keySet());
+    for (TaskTracker tt : tm) {
       tt.unreserveSlots(TaskType.MAP, this);
     }
-    for (TaskTracker tt : trackersReservedForReduces.keySet()) {
+
+    Set<TaskTracker> tr = 
+      new HashSet<TaskTracker>(trackersReservedForReduces.keySet());
+    for (TaskTracker tt : tr) {
       tt.unreserveSlots(TaskType.REDUCE, this);
     }
   }
+  
   private void clearUncleanTasks() {
     TaskAttemptID taskid = null;
     TaskInProgress tip = null;
@@ -3030,6 +3039,8 @@
    * from the various tables.
    */
   synchronized void garbageCollect() {
+    //Cancel task tracker reservation
+    cancelReservedSlots();
     // Let the JobTracker know that a job is complete
     jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
     jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces());

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java?rev=792528&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java Thu Jul  9 12:31:46 2009
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+public class TestTrackerReservation extends TestCase {
+
+  static String[] trackers = new String[] { "tracker_tracker1:1000",
+      "tracker_tracker2:1000", "tracker_tracker3:1000" };
+  private static FakeJobTracker jobTracker;
+
+  private static class FakeJobTracker extends
+      org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker {
+    // Test double: the shared fake tracker with finalizeJob() turned into a no-op.
+    FakeJobTracker(JobConf conf, Clock clock, String[] tts) throws IOException,
+        InterruptedException {
+      super(conf, clock, tts);
+    }
+
+    @Override
+    synchronized void finalizeJob(JobInProgress job) {
+      // Deliberately empty: this fake tracker performs no job finalization.
+    }
+  }
+
+  private static class FakeJobInProgress extends
+      org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress {
+    TaskInProgress cleanup[] = new TaskInProgress[0]; // empty array
+    TaskInProgress setup[] = new TaskInProgress[0]; // empty array
+    // NOTE(review): the two arrays above presumably hide same-named parent fields - confirm.
+    FakeJobInProgress(JobConf jobConf, JobTracker tracker) throws IOException {
+      super(jobConf, tracker);
+    }
+
+    @Override
+    public synchronized void initTasks() throws IOException {
+      super.initTasks();
+      tasksInited.set(true); // flag the job as initialised once the parent has run
+    }
+    // Metrics clean-up is overridden with an empty body.
+    @Override
+    public void cleanUpMetrics() {
+    }
+  }
+
+  public static Test suite() { // wraps the test class in a TestSetup decorator
+    TestSetup setup = new TestSetup(new TestSuite(TestTrackerReservation.class)) {
+      protected void setUp() throws Exception {
+        JobConf conf = new JobConf();
+        conf.set("mapred.job.tracker", "localhost:0");
+        conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+        jobTracker = new FakeJobTracker(conf, new Clock(), trackers); // shared static tracker
+        for (String tracker : trackers) {
+          FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
+        }
+      }
+      // Nothing to tear down.
+      protected void tearDown() throws Exception {
+      }
+    };
+    return setup;
+  }
+
+  /**
+   * Tests that task tracker reservations are released when a job completes.
+   * <ol>
+   * <li>Run a cluster with 3 trackers.</li>
+   * <li>Submit a job which reserves all the slots in two
+   * trackers.</li>
+   * <li>Run the job on another tracker which has 
+   * no reservations</li>
+   * <li>Finish the job and observe the reservations are
+   * successfully canceled</li>
+   * </ol>
+   * 
+   * @throws Exception if any step of the scenario fails
+   */
+  public void testTaskTrackerReservation() throws Exception {
+    JobConf conf = new JobConf();
+    
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+    conf.setSpeculativeExecution(false);
+    // Turn off the job setup/cleanup requirement for this job.
+    conf.setBoolean(
+        "mapred.committer.job.setup.cleanup.needed", false);
+    
+    //Set task tracker objects for reservation.
+    TaskTracker tt1 = new TaskTracker(trackers[0]);
+    TaskTracker tt2 = new TaskTracker(trackers[1]);
+    TaskTracker tt3 = new TaskTracker(trackers[2]);
+    TaskTrackerStatus status1 = new TaskTrackerStatus(
+        trackers[0],JobInProgress.convertTrackerNameToHostName(
+            trackers[0]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+    TaskTrackerStatus status2 = new TaskTrackerStatus(
+        trackers[1],JobInProgress.convertTrackerNameToHostName(
+            trackers[1]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+    TaskTrackerStatus status3 = new TaskTrackerStatus(
+        trackers[2],JobInProgress.convertTrackerNameToHostName(
+            trackers[2]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+    tt1.setStatus(status1);
+    tt2.setStatus(status2);
+    tt3.setStatus(status3);
+    
+    FakeJobInProgress fjob = new FakeJobInProgress(conf, jobTracker);
+    fjob.initTasks();
+    // Reserve map and reduce slots for the job on the first and third trackers.
+    tt1.reserveSlots(TaskType.MAP, fjob, 2);
+    tt1.reserveSlots(TaskType.REDUCE, fjob, 2);
+    tt3.reserveSlots(TaskType.MAP, fjob, 2);
+    tt3.reserveSlots(TaskType.REDUCE, fjob, 2);
+    
+    assertEquals("Trackers not reserved for the job : maps", 
+        2, fjob.getNumReservedTaskTrackersForMaps());
+    assertEquals("Trackers not reserved for the job : reduces", 
+        2, fjob.getNumReservedTaskTrackersForReduces());
+    // Obtain the job's map and reduce attempts for the second tracker, then finish them.
+    TaskAttemptID mTid = fjob.findMapTask(trackers[1]);
+    TaskAttemptID rTid = fjob.findReduceTask(trackers[1]);
+
+    fjob.finishTask(mTid);
+    fjob.finishTask(rTid);
+    // Expected value goes first so a failure report reads correctly.
+    assertEquals("Job didnt complete successfully complete", 
+        JobStatus.SUCCEEDED, fjob.getStatus().getRunState());
+    
+    assertEquals("Reservation for the job not released: Maps", 
+        0, fjob.getNumReservedTaskTrackersForMaps());
+    assertEquals("Reservation for the job not released : Reduces", 
+        0, fjob.getNumReservedTaskTrackersForReduces());
+  }
+}



Mime
View raw message