hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1195146 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org...
Date Sun, 30 Oct 2011 11:33:34 GMT
Author: vinodkv
Date: Sun Oct 30 11:33:34 2011
New Revision: 1195146

URL: http://svn.apache.org/viewvc?rev=1195146&view=rev
Log:
MAPREDUCE-3274. Fixed a race condition in MRAppMaster that was causing a task-scheduling deadlock.
Contributed by Robert Joseph Evans.
svn merge -c r1195145 --ignore-ancestry ../../trunk/

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/
      - copied from r1195145, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
      - copied unchanged from r1195145, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1195146&r1=1195145&r2=1195146&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Sun Oct 30 11:33:34 2011
@@ -1818,6 +1818,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3256. Added authorization checks for the protocol between
     NodeManager and ApplicationMaster. (vinodkv via acmurthy) 
 
+    MAPREDUCE-3274. Fixed a race condition in MRAppMaster that was causing a
+    task-scheduling deadlock. (Robert Joseph Evans via vinodkv)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1195146&r1=1195145&r2=1195146&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Sun Oct 30 11:33:34 2011
@@ -23,8 +23,10 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -69,12 +71,14 @@ public class TaskAttemptListenerImpl ext
 
   private AppContext context;
   private Server server;
-  private TaskHeartbeatHandler taskHeartbeatHandler;
+  protected TaskHeartbeatHandler taskHeartbeatHandler;
   private InetSocketAddress address;
-  private Map<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToAttemptMap = 
+  private Map<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToActiveAttemptMap = 
     Collections.synchronizedMap(new HashMap<WrappedJvmID, 
         org.apache.hadoop.mapred.Task>());
   private JobTokenSecretManager jobTokenSecretManager = null;
+  private Set<WrappedJvmID> pendingJvms =
+    Collections.synchronizedSet(new HashSet<WrappedJvmID>());
   
   public TaskAttemptListenerImpl(AppContext context,
       JobTokenSecretManager jobTokenSecretManager) {
@@ -395,35 +399,55 @@ public class TaskAttemptListenerImpl ext
 
     JVMId jvmId = context.jvmId;
     LOG.info("JVM with ID : " + jvmId + " asked for a task");
-
-    // TODO: Is it an authorised container to get a task? Otherwise return null.
-
-    // TODO: Is the request for task-launch still valid?
+    
+    JvmTask jvmTask = null;
+    // TODO: Is it an authorized container to get a task? Otherwise return null.
 
     // TODO: Child.java's firstTaskID isn't really firstTaskID. Ask for update
     // to jobId and task-type.
 
     WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap,
         jvmId.getId());
-    org.apache.hadoop.mapred.Task task = jvmIDToAttemptMap.get(wJvmID);
-    if (task != null) { //there may be lag in the attempt getting added here
-      LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
-      JvmTask jvmTask = new JvmTask(task, false);
-      
-      //remove the task as it is no more needed and free up the memory
-      jvmIDToAttemptMap.remove(wJvmID);
-      
-      return jvmTask;
+    synchronized(this) {
+      if(pendingJvms.contains(wJvmID)) {
+        org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap.get(wJvmID);
+        if (task != null) { //there may be lag in the attempt getting added here
+         LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
+          jvmTask = new JvmTask(task, false);
+
+          //remove the task as it is no more needed and free up the memory
+          //Also we have already told the JVM to process a task, so it is no
+          //longer pending, and further request should ask it to exit.
+          pendingJvms.remove(wJvmID);
+          jvmIDToActiveAttemptMap.remove(wJvmID);
+        }
+      } else {
+        LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed.");
+        jvmTask = new JvmTask(null, true);
+      }
     }
-    return null;
+    return jvmTask;
+  }
+  
+  @Override
+  public synchronized void registerPendingTask(WrappedJvmID jvmID) {
+    //Save this JVM away as one that has not been handled yet
+    pendingJvms.add(jvmID);
   }
 
   @Override
-  public void register(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
+  public void registerLaunchedTask(
+      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
       org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {
-    //create the mapping so that it is easy to look up
-    //when it comes back to ask for Task.
-    jvmIDToAttemptMap.put(jvmID, task);
+    synchronized(this) {
+      //create the mapping so that it is easy to look up
+      //when it comes back to ask for Task.
+      jvmIDToActiveAttemptMap.put(jvmID, task);
+      //This should not need to happen here, but just to be on the safe side
+      if(!pendingJvms.add(jvmID)) {
+        LOG.warn(jvmID+" launched without first being registered");
+      }
+    }
     //register this attempt
     taskHeartbeatHandler.register(attemptID);
   }
@@ -432,8 +456,9 @@ public class TaskAttemptListenerImpl ext
   public void unregister(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
       WrappedJvmID jvmID) {
     //remove the mapping if not already removed
-    jvmIDToAttemptMap.remove(jvmID);
-
+    jvmIDToActiveAttemptMap.remove(jvmID);
+    //remove the pending if not already removed
+    pendingJvms.remove(jvmID);
     //unregister this attempt
     taskHeartbeatHandler.unregister(attemptID);
   }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java?rev=1195146&r1=1195145&r2=1195146&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java Sun Oct 30 11:33:34 2011
@@ -24,12 +24,35 @@ import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 
+/**
+ * This class listens for changes to the state of a Task.
+ */
 public interface TaskAttemptListener {
 
   InetSocketAddress getAddress();
 
-  void register(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID);
+  /**
+   * register a JVM with the listener.  This should be called as soon as a 
+   * JVM ID is assigned to a task attempt, before it has been launched.
+   * @param jvmID The ID of the JVM .
+   */
+  void registerPendingTask(WrappedJvmID jvmID);
+  
+  /**
+   * Register the task and task attempt with the JVM.  This should be called
+   * when the JVM has been launched.
+   * @param attemptID the id of the attempt for this JVM.
+   * @param task the task itself for this JVM.
+   * @param jvmID the id of the JVM handling the task.
+   */
+  void registerLaunchedTask(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID);
 
+  /**
+   * Unregister the JVM and the attempt associated with it.  This should be 
+   * called when the attempt/JVM has finished executing and is being cleaned up.
+   * @param attemptID the ID of the attempt.
+   * @param jvmID the ID of the JVM for that attempt.
+   */
   void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID);
 
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1195146&r1=1195145&r2=1195146&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Sun Oct 30 11:33:34 2011
@@ -1012,6 +1012,7 @@ public abstract class TaskAttemptImpl im
       taskAttempt.jvmID = new WrappedJvmID(
           taskAttempt.remoteTask.getTaskID().getJobID(), 
           taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
+      taskAttempt.taskAttemptListener.registerPendingTask(taskAttempt.jvmID);
       
       //launch the container
       //create the container object to be launched for a given Task attempt
@@ -1106,7 +1107,7 @@ public abstract class TaskAttemptImpl im
 
       // register it to TaskAttemptListener so that it start listening
       // for it
-      taskAttempt.taskAttemptListener.register(
+      taskAttempt.taskAttemptListener.registerLaunchedTask(
           taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID);
       //TODO Resolve to host / IP in case of a local address.
       InetSocketAddress nodeHttpInetAddr =

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1195146&r1=1195145&r2=1195146&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Sun Oct 30 11:33:34 2011
@@ -294,11 +294,14 @@ public class MRApp extends MRAppMaster {
         return null;
       }
       @Override
-      public void register(TaskAttemptId attemptID, 
+      public void registerLaunchedTask(TaskAttemptId attemptID, 
           org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {}
       @Override
       public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) {
       }
+      @Override
+      public void registerPendingTask(WrappedJvmID jvmID) {
+      }
     };
   }
 



Mime
View raw message