hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1407718 - in /hadoop/common/branches/branch-1.1: CHANGES.txt src/mapred/org/apache/hadoop/mapred/TaskTracker.java src/test/org/apache/hadoop/mapred/TestTaskTrackerActionCleanup.java
Date Sat, 10 Nov 2012 02:36:28 GMT
Author: vinodkv
Date: Sat Nov 10 02:36:26 2012
New Revision: 1407718

URL: http://svn.apache.org/viewvc?rev=1407718&view=rev
Log:
MAPREDUCE-4749. Fixed a bug in TaskTracker because of which kill-actions get delayed progressively.
Contributed by Arpit Gupta.
svn merge --ignore-ancestry -c 1407717 ../branch-1

Added:
    hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestTaskTrackerActionCleanup.java
      - copied unchanged from r1407717, hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestTaskTrackerActionCleanup.java
Modified:
    hadoop/common/branches/branch-1.1/CHANGES.txt
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/common/branches/branch-1.1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/CHANGES.txt?rev=1407718&r1=1407717&r2=1407718&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.1/CHANGES.txt Sat Nov 10 02:36:26 2012
@@ -39,6 +39,9 @@ Release 1.1.1 - Unreleased
     MAPREDUCE-4782. NLineInputFormat skips first line of last InputSplit
     (Mark Fuhs via bobby)
 
+    MAPREDUCE-4749. Fixed a bug in TaskTracker because of which kill-actions get
+    delayed progressively. (Arpit Gupta via vinodkv)
+
 Release 1.1.0 - 2012.09.28
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1407718&r1=1407717&r2=1407718&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Sat Nov 10 02:36:26 2012
@@ -44,6 +44,7 @@ import java.util.TreeMap;
 import java.util.Vector;
 import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
@@ -435,50 +436,105 @@ public class TaskTracker implements MRCo
   }
   
   /**
-   * A list of tips that should be cleaned up.
+   * A list of clean-up actions
    */
-  private BlockingQueue<TaskTrackerAction> tasksToCleanup = 
-    new LinkedBlockingQueue<TaskTrackerAction>();
-    
+  // No ConcurrentHashSet :(
+  ConcurrentHashMap<String, String> allCleanupActions = 
+      new ConcurrentHashMap<String, String>();
+  BlockingQueue<TaskTrackerAction> activeCleanupActions = 
+      new LinkedBlockingQueue<TaskTrackerAction>();
+  List<TaskTrackerAction> inactiveCleanupActions = 
+      new ArrayList<TaskTrackerAction>();
+
+  /*
+   * Add action to clean up queue. Check to make sure action is not already
+   * present before adding it to the queue.
+   */
+  void addActionToCleanup(TaskTrackerAction action) throws InterruptedException {
+
+    String actionId = getIdForCleanUpAction(action);
+
+    // add the action to the queue only if its not added in the first place
+    String previousActionId = allCleanupActions.putIfAbsent(actionId, actionId);
+    if (previousActionId != null) {
+      return;
+    } else {
+      activeCleanupActions.put(action);
+    }
+  }
+
+  /*
+   * Get the id for a given clean up action. It is expected to be either a
+   * KillJobAction or KillTaskAction
+   */
+  private String getIdForCleanUpAction(TaskTrackerAction action) {
+    String actionId = null;
+    // get the id of the action
+    if (action instanceof KillJobAction) {
+      actionId = ((KillJobAction) action).getJobID().toString();
+    } else if (action instanceof KillTaskAction) {
+      actionId = ((KillTaskAction) action).getTaskID().toString();
+    }
+    // Assuming actionId is not null as all clean-up actions are either KillJob
+    // or KillTask
+    return actionId;
+  }
+
   /**
    * A daemon-thread that pulls tips off the list of things to cleanup.
    */
-  private Thread taskCleanupThread = 
-    new Thread(new Runnable() {
-        public void run() {
-          while (true) {
-            try {
-              TaskTrackerAction action = tasksToCleanup.take();
-              if( !checkJobStatusAndWait(action) ) {
-                //If job is still localizing, put it back into the queue and
-                //pick another one in the next iteration
-                StringBuffer sb = new StringBuffer("Cleanup for job ");
-                if (action instanceof KillJobAction) {
-                  sb.append(((KillJobAction)action).getJobID());
-                } else if (action instanceof KillTaskAction) {
-                  sb.append(((KillTaskAction)action).getTaskID().getJobID() +
-                    ", task " + ((KillTaskAction)action).getTaskID());
-                }
-                sb.append(" was postponed after waiting for 5 seconds.");
-                LOG.info(sb);
-                tasksToCleanup.put(action);
-                continue;
-              }
-              if (action instanceof KillJobAction) {
-                purgeJob((KillJobAction) action);
-              } else if (action instanceof KillTaskAction) {
-                processKillTaskAction((KillTaskAction) action);
-              } else {
-                LOG.error("Non-delete action given to cleanup thread: "
-                          + action);
-              }
-            } catch (Throwable except) {
-              LOG.warn(StringUtils.stringifyException(except));
-            }
-          }
+  private Thread taskCleanupThread = new Thread(new Runnable() {
+    public void run() {
+      while (true) {
+        try {
+          taskCleanUp();
+        } catch (Throwable except) {
+          LOG.warn(StringUtils.stringifyException(except));
         }
-      }, "taskCleanup");
+      }
+    }
+  }, "taskCleanup");
+
+  void taskCleanUp() throws InterruptedException, IOException {
+    // process all the localizing tasks and move to the clean up queue
+    // if the job is not localizing.
+    Iterator<TaskTrackerAction> itr = inactiveCleanupActions.iterator();
+    while (itr.hasNext()) {
+      TaskTrackerAction action = itr.next();
+      if (!isJobLocalizing(action)) {
+        activeCleanupActions.put(action);
+        itr.remove();
+      }
+    }
+
+    // if the tasks to clean is empty better sleep for 2 seconds and
+    // re process
+    if (activeCleanupActions.isEmpty()) {
+      Thread.sleep(2000);
+      return;
+    }
 
+    TaskTrackerAction action = activeCleanupActions.take();
+    String actionId = getIdForCleanUpAction(action);
+    if (isJobLocalizing(action)) {
+      // If job is still localizing, put it back into the queue and
+      // pick another one in the next iteration
+      LOG.info("Cleanup for id " + actionId + " skipped as its localizing.");
+      inactiveCleanupActions.add(action);
+      return;
+    } else {
+      // remove the action from the hash as its being processed.
+      allCleanupActions.remove(actionId);
+    }
+    if (action instanceof KillJobAction) {
+      purgeJob((KillJobAction) action);
+    } else if (action instanceof KillTaskAction) {
+      processKillTaskAction((KillTaskAction) action);
+    } else {
+      LOG.error("Non-delete action given to cleanup thread: " + action);
+    }
+  }
+  
   void processKillTaskAction(KillTaskAction killAction) throws IOException {
     TaskInProgress tip;
     synchronized (TaskTracker.this) {
@@ -489,21 +545,19 @@ public class TaskTracker implements MRCo
   }
 
   /**
-   * Wait until the job has completed localizing if it was doing so.
+   * Check if the job for the given action is localizing or not.
    * @param action The command received from the JobTracker
-   * @return true if the wait was successful and the task is not localizing
-   * anymore. false if after 5 seconds, the job was still localizing
-   * @throws InterruptedException
+   * @return true if job is localizing and false otherwise.
    */
-  private boolean checkJobStatusAndWait(TaskTrackerAction action)
-  throws InterruptedException {
+  private boolean isJobLocalizing(TaskTrackerAction action)
+  {
     JobID jobId = null;
     if (action instanceof KillJobAction) {
       jobId = ((KillJobAction)action).getJobID();
     } else if (action instanceof KillTaskAction) {
       jobId = ((KillTaskAction)action).getTaskID().getJobID();
     } else {
-      return true;
+      return false;
     }
     RunningJob rjob = null;
     synchronized (runningJobs) {
@@ -511,11 +565,10 @@ public class TaskTracker implements MRCo
     }
     if (rjob != null) {
       synchronized (rjob) {
-        rjob.wait(5000);
-        return !rjob.localizing;
+        return rjob.localizing;
       }
     }
-    return true;
+    return false;
   }
 
   public TaskController getTaskController() {
@@ -1797,7 +1850,7 @@ public class TaskTracker implements MRCo
                 commitResponses.add(commitAction.getTaskID());
               }
             } else {
-              tasksToCleanup.put(action);
+              addActionToCleanup(action);
             }
           }
         }
@@ -3819,7 +3872,7 @@ public class TaskTracker implements MRCo
    * @return has this task tracker finished and cleaned up all of its tasks?
    */
   public synchronized boolean isIdleAndClean() {
-    return tasks.isEmpty() && tasksToCleanup.isEmpty();
+    return tasks.isEmpty() && allCleanupActions.isEmpty();
   }
 
   /**



Mime
View raw message