hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077780 - /hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Date Fri, 04 Mar 2011 04:54:40 GMT
Author: omalley
Date: Fri Mar  4 04:54:40 2011
New Revision: 1077780

URL: http://svn.apache.org/viewvc?rev=1077780&view=rev
Log:
commit c5df0eb7510a16e3cc6bf4b7a7d1081c59fa0e77
Author: Owen O'Malley <omalley@apache.org>
Date:   Tue Feb 1 09:42:55 2011 -0800

     Don't hold the rjob lock while localizing resources. (ddas
    via omalley)
    
    +++ b/YAHOO-CHANGES.txt
    +     Don't hold the rjob lock while localizing resources. (ddas
    +    via omalley)
    +
    +    . New cunters for FileInputFormat (BYTES_READ) and
    +    FileOutputFormat (BYTES_WRITTEN).
    +    New counter MAP_OUTPUT_MATERIALIZED_BYTES for compressed MapOutputSize.
    +    Related Bugs: 4241034, 3418543, 4217546 (sseth)

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077780&r1=1077779&r2=1077780&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Fri Mar  4 04:54:40 2011
@@ -344,6 +344,7 @@ public class TaskTracker implements MRCo
           while (true) {
             try {
               TaskTrackerAction action = tasksToCleanup.take();
+              checkJobStatusAndWait(action);
               if (action instanceof KillJobAction) {
                 purgeJob((KillJobAction) action);
               } else if (action instanceof KillTaskAction) {
@@ -367,6 +368,29 @@ public class TaskTracker implements MRCo
     LOG.info("Received KillTaskAction for task: " + killAction.getTaskID());
     purgeTask(tip, false);
   }
+  
+  private void checkJobStatusAndWait(TaskTrackerAction action) 
+  throws InterruptedException {
+    JobID jobId = null;
+    if (action instanceof KillJobAction) {
+      jobId = ((KillJobAction)action).getJobID();
+    } else if (action instanceof KillTaskAction) {
+      jobId = ((KillTaskAction)action).getTaskID().getJobID();
+    } else {
+      return;
+    }
+    RunningJob rjob = null;
+    synchronized (runningJobs) {
+      rjob = runningJobs.get(jobId);
+    }
+    if (rjob != null) {
+      synchronized (rjob) {
+        while (rjob.localizing) {
+          rjob.wait();
+        }
+      }
+    }
+  }
 
   public TaskController getTaskController() {
     return taskController;
@@ -951,8 +975,18 @@ public class TaskTracker implements MRCo
     JobID jobId = t.getJobID();
     RunningJob rjob = addTaskToJob(jobId, tip);
     InetSocketAddress ttAddr = getTaskTrackerReportAddress();
-
-    synchronized (rjob) {
+    try {
+      synchronized (rjob) {
+        if (!rjob.localized) {
+          while (rjob.localizing) {
+            rjob.wait();
+          }
+          if (!rjob.localized) {
+            //this thread is localizing the job
+            rjob.localizing = true;
+          }
+        }
+      }
       if (!rjob.localized) {
         Path localJobConfPath = initializeJob(t, rjob, ttAddr);
         JobConf localJobConf = new JobConf(localJobConfPath);
@@ -963,12 +997,21 @@ public class TaskTracker implements MRCo
         resetNumTasksPerJvm(localJobConf);
         //set the base jobconf path in rjob; all tasks will use
         //this as the base path when they run
-        rjob.localizedJobConf = localJobConfPath;
-        rjob.jobConf = localJobConf;  
-        rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
-                             localJobConf.getKeepFailedTaskFiles());
- 
-        rjob.localized = true;
+        synchronized (rjob) {
+          rjob.localizedJobConf = localJobConfPath;
+          rjob.jobConf = localJobConf;  
+          rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
+              localJobConf.getKeepFailedTaskFiles());
+
+          rjob.localized = true;
+        }
+      } 
+    } finally {
+      synchronized (rjob) {
+        if (rjob.localizing) {
+          rjob.localizing = false;
+          rjob.notifyAll();
+        }
       }
     }
     synchronized (runningJobs) {
@@ -1005,15 +1048,17 @@ public class TaskTracker implements MRCo
 
     // save local copy of JobToken file
     final String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
-    rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
+    synchronized (rjob) {
+      rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
 
-    Credentials ts = TokenCache.loadTokens(localJobTokenFile, conf);
-    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
-    if (jt != null) { //could be null in the case of some unit tests
-      getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
-    }
-    for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
-      rjob.ugi.addToken(token);
+      Credentials ts = TokenCache.loadTokens(localJobTokenFile, conf);
+      Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
+      if (jt != null) { //could be null in the case of some unit tests
+        getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
+      }
+      for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
+        rjob.ugi.addToken(token);
+      }
     }
 
     FileSystem userFs = getFS(jobFile, jobId, conf);
@@ -2336,7 +2381,9 @@ public class TaskTracker implements MRCo
         }
         setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
         this.runner.start();
-        this.taskStatus.setStartTime(System.currentTimeMillis());
+        long now = System.currentTimeMillis();
+        this.taskStatus.setStartTime(now);
+        this.lastProgressReport = now;
       } else {
         LOG.info("Not launching task: " + task.getTaskID() + 
             " since it's state is " + this.taskStatus.getRunState());
@@ -3197,7 +3244,11 @@ public class TaskTracker implements MRCo
     private Path localizedJobConf;
     // keep this for later use
     volatile Set<TaskInProgress> tasks;
+    //the 'localizing' and 'localized' fields have the following
+    //state transitions (first entry is for 'localizing')
+    //{false,false} -> {true,false} -> {false,true}
     volatile boolean localized;
+    boolean localizing;
     boolean keepJobFiles;
     UserGroupInformation ugi;
     FetchStatus f;
@@ -3206,6 +3257,7 @@ public class TaskTracker implements MRCo
     RunningJob(JobID jobid) {
       this.jobid = jobid;
       localized = false;
+      localizing = false;
       tasks = new HashSet<TaskInProgress>();
       keepJobFiles = false;
     }



Mime
View raw message