hadoop-mapreduce-commits mailing list archives

From d...@apache.org
Subject svn commit: r982353 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java src/java/org/apache/hadoop/mapred/JobTracker.java src/java/org/apache/hadoop/mapred/TaskTracker.java
Date Wed, 04 Aug 2010 18:35:24 GMT
Author: ddas
Date: Wed Aug  4 18:35:24 2010
New Revision: 982353

URL: http://svn.apache.org/viewvc?rev=982353&view=rev
Log:
MAPREDUCE-1900. The TaskTracker and JobTracker now close FileSystems that were opened on behalf of
users once those FileSystems are no longer required. Contributed by Kan Zhang and Devaraj Das.
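
For readers unfamiliar with the pattern: the change applies the same cleanup in JobInProgress (on the
JobTracker side) and in TaskTracker — do the per-user filesystem work under a UserGroupInformation
doAs(), then release the cached per-user FileSystem instances with FileSystem.closeAllForUGI() once
the daemon is done with them. The sketch below is illustrative only and is not code from this commit;
the localizeJobFiles method, submitDir, conf, and userName are hypothetical placeholders.

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    public class PerUserFileSystemCleanup {
      // Illustrative sketch only: submitDir, conf and userName are placeholders.
      static void localizeJobFiles(final Path submitDir, final Configuration conf,
          String userName) throws IOException, InterruptedException {
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(userName);
        try {
          // FileSystem instances obtained while running as this UGI are cached
          // in FileSystem's cache under that UGI.
          FileSystem userFs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
            public FileSystem run() throws IOException {
              return submitDir.getFileSystem(conf);
            }
          });
          // ... use userFs to copy job files, read the job conf, etc. ...
          userFs.exists(submitDir);
        } finally {
          // Without this, one cached FileSystem per submitting user would
          // accumulate in the daemon for the lifetime of the process.
          FileSystem.closeAllForUGI(ugi);
        }
      }
    }

In the committed JobInProgress change the UGI passed to closeAllForUGI is
UserGroupInformation.getCurrentUser(), since the constructor runs inside an RPC handler where the
current user is the authenticated submitter; the TaskTracker change instead closes against the job's
own UGI (rjob.getUGI()).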

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=982353&r1=982352&r2=982353&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Aug  4 18:35:24 2010
@@ -230,6 +230,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1958. The MapReduce part corresponding to the HADOOP-6873.
     (Boris Shkolnik & Owen O'Malley via ddas)
 
+    MAPREDUCE-1900. TaskTracker and JobTracker closes FileSystems, opened on
+    behalf of users that it no longer requires. (Kan Zhang and ddas via ddas)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=982353&r1=982352&r2=982353&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Aug  4 18:35:24 2010
@@ -392,105 +392,112 @@ public class JobInProgress {
                        JobInfo jobInfo,
                        Credentials ts
                       ) throws IOException, InterruptedException {
-    this.restartCount = rCount;
-    this.jobId = JobID.downgrade(jobInfo.getJobID());
-    String url = "http://" + jobtracker.getJobTrackerMachine() + ":" 
-        + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + this.jobId;
-    this.jobtracker = jobtracker;
-    this.jobHistory = jobtracker.getJobHistory();
-    this.startTime = System.currentTimeMillis();
-    
-    this.localFs = jobtracker.getLocalFileSystem();
-    this.tokenStorage = ts;
-    // use the user supplied token to add user credentials to the conf
-    jobSubmitDir = jobInfo.getJobSubmitDir();
-    user = jobInfo.getUser().toString();
-
-    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
-    if (ts != null) {
-      for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
-        ugi.addToken(token);
-      }
+    try {
+      this.restartCount = rCount;
+      this.jobId = JobID.downgrade(jobInfo.getJobID());
+      String url = "http://" + jobtracker.getJobTrackerMachine() + ":"
+          + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + this.jobId;
+      this.jobtracker = jobtracker;
+      this.jobHistory = jobtracker.getJobHistory();
+      this.startTime = System.currentTimeMillis();
+
+      this.localFs = jobtracker.getLocalFileSystem();
+      this.tokenStorage = ts;
+      // use the user supplied token to add user credentials to the conf
+      jobSubmitDir = jobInfo.getJobSubmitDir();
+      user = jobInfo.getUser().toString();
+
+      UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+      if (ts != null) {
+        for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
+          ugi.addToken(token);
+        }
+      }
+
+      fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+        public FileSystem run() throws IOException {
+          return jobSubmitDir.getFileSystem(default_conf);
+        }
+      });
+      this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR + "/"
+          + this.jobId + ".xml");
+
+      jobFile = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
+      fs.copyToLocalFile(jobFile, localJobFile);
+      conf = new JobConf(localJobFile);
+      if (conf.getUser() == null) {
+        this.conf.setUser(user);
+      }
+      if (!conf.getUser().equals(user)) {
+        String desc = "The username " + conf.getUser() + " obtained from the "
+            + "conf doesn't match the username " + user + " the user "
+            + "authenticated as";
+        AuditLogger.logFailure(user, Queue.QueueOperation.SUBMIT_JOB.name(),
+            conf.getUser(), jobId.toString(), desc);
+        throw new IOException(desc);
+      }
+      this.priority = conf.getJobPriority();
+      this.profile = new JobProfile(conf.getUser(), this.jobId, jobFile
+          .toString(), url, conf.getJobName(), conf.getQueueName());
+      this.status = new JobStatus(this.jobId, 0.0f, 0.0f, JobStatus.PREP,
+          profile.getUser(), profile.getJobName(), profile.getJobFile(),
+          profile.getURL().toString());
+      this.jobtracker.getInstrumentation().addPrepJob(conf, this.jobId);
+      status.setStartTime(startTime);
+      this.status.setJobPriority(this.priority);
+
+      this.numMapTasks = conf.getNumMapTasks();
+      this.numReduceTasks = conf.getNumReduceTasks();
+
+      this.memoryPerMap = conf.getMemoryForMapTask();
+      this.memoryPerReduce = conf.getMemoryForReduceTask();
+
+      this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
+          numMapTasks + numReduceTasks + 10);
+      JobContext jobContext = new JobContextImpl(conf, jobId);
+      this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
+
+      // Construct the jobACLs
+      status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
+
+      this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
+      this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
+
+      this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
+
+      hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+      hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+      this.maxLevel = jobtracker.getNumTaskCacheLevels();
+      this.anyCacheLevel = this.maxLevel + 1;
+      this.nonLocalMaps = new LinkedList<TaskInProgress>();
+      this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+      this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+      this.nonRunningReduces = new LinkedList<TaskInProgress>();
+      this.runningReduces = new LinkedHashSet<TaskInProgress>();
+      this.resourceEstimator = new ResourceEstimator(this);
+      this.submitHostName = conf.getJobSubmitHostName();
+      this.submitHostAddress = conf.getJobSubmitHostAddress();
+
+      this.nonLocalMaps = new LinkedList<TaskInProgress>();
+      this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+      this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+      this.nonRunningReduces = new LinkedList<TaskInProgress>();
+      this.runningReduces = new LinkedHashSet<TaskInProgress>();
+      this.slowTaskThreshold = Math.max(0.0f, conf.getFloat(
+          MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f));
+      this.speculativeCap = conf.getFloat(MRJobConfig.SPECULATIVECAP, 0.1f);
+      this.slowNodeThreshold = conf.getFloat(
+          MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD, 1.0f);
+      // register job's tokens for renewal
+      DelegationTokenRenewal.registerDelegationTokensForRenewal(jobInfo
+          .getJobID(), ts, jobtracker.getConf());
+    } finally {
+      // close all FileSystems that was created above for the current user
+      // At this point, this constructor is called in the context of an RPC, and
+      // hence the "current user" is actually referring to the kerberos
+      // authenticated user (if security is ON).
+      FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
     }
-
-    fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
-      public FileSystem run() throws IOException {
-        return jobSubmitDir.getFileSystem(default_conf);
-      }});
-    this.localJobFile = 
-      default_conf.getLocalPath(JobTracker.SUBDIR + "/" + this.jobId + ".xml");
-    
-    jobFile = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
-    fs.copyToLocalFile(jobFile, localJobFile);
-    conf = new JobConf(localJobFile);
-    if (conf.getUser() == null) {
-      this.conf.setUser(user);
-    }
-    if (!conf.getUser().equals(user)) {
-      String desc = "The username " + conf.getUser() + " obtained from the " +
-                     "conf doesn't match the username " + user + " the user " +
-                                     "authenticated as";
-      AuditLogger.logFailure(user, Queue.QueueOperation.SUBMIT_JOB.name(),
-                             conf.getUser(), jobId.toString(), desc);
-      throw new IOException(desc);
-    }
-    this.priority = conf.getJobPriority();
-    this.profile = new JobProfile(conf.getUser(), this.jobId, 
-                                  jobFile.toString(), url, 
-                                  conf.getJobName(), conf.getQueueName());
-    this.status = new JobStatus(this.jobId, 0.0f, 0.0f, JobStatus.PREP, 
-        profile.getUser(), profile.getJobName(), profile.getJobFile(), 
-        profile.getURL().toString());
-    this.jobtracker.getInstrumentation().addPrepJob(conf, this.jobId);
-    status.setStartTime(startTime);
-    this.status.setJobPriority(this.priority);
-
-    this.numMapTasks = conf.getNumMapTasks();
-    this.numReduceTasks = conf.getNumReduceTasks();
-
-    this.memoryPerMap = conf.getMemoryForMapTask();
-    this.memoryPerReduce = conf.getMemoryForReduceTask();
-
-    this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
-       (numMapTasks + numReduceTasks + 10);
-    JobContext jobContext = new JobContextImpl(conf, jobId);
-    this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
-
-    // Construct the jobACLs
-    status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
-
-    this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
-    this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
-
-    this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
-        
-    hasSpeculativeMaps = conf.getMapSpeculativeExecution();
-    hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
-    this.maxLevel = jobtracker.getNumTaskCacheLevels();
-    this.anyCacheLevel = this.maxLevel+1;
-    this.nonLocalMaps = new LinkedList<TaskInProgress>();
-    this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
-    this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
-    this.nonRunningReduces = new LinkedList<TaskInProgress>();    
-    this.runningReduces = new LinkedHashSet<TaskInProgress>();
-    this.resourceEstimator = new ResourceEstimator(this);
-    this.submitHostName = conf.getJobSubmitHostName();
-    this.submitHostAddress = conf.getJobSubmitHostAddress();
-    
-    this.nonLocalMaps = new LinkedList<TaskInProgress>();
-    this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
-    this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
-    this.nonRunningReduces = new LinkedList<TaskInProgress>();    
-    this.runningReduces = new LinkedHashSet<TaskInProgress>();
-    this.slowTaskThreshold = Math.max(0.0f,
-        conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f));
-    this.speculativeCap = conf.getFloat(
-        MRJobConfig.SPECULATIVECAP,0.1f);
-    this.slowNodeThreshold = conf.getFloat(
-        MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD,1.0f); 
-    // register job's tokens for renewal
-    DelegationTokenRenewal.registerDelegationTokensForRenewal(
-        jobInfo.getJobID(), ts, this.conf);
   }
     
   private void printCache (Map<Node, List<TaskInProgress>> cache) {
@@ -3702,7 +3709,7 @@ public class JobInProgress {
     TokenCache.setJobToken(token, tokenStorage);
     
     // write TokenStorage out
-    tokenStorage.writeTokenStorageFile(keysFile, conf);
+    tokenStorage.writeTokenStorageFile(keysFile, jobtracker.getConf());
     LOG.info("jobToken generated and stored with users keys in "
         + keysFile.toUri().getPath());
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=982353&r1=982352&r2=982353&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Aug  4 18:35:24 2010
@@ -1326,6 +1326,7 @@ public class JobTracker implements MRCon
   // Some jobs are stored in a local system directory.  We can delete
   // the files when we're done with the job.
   static final String SUBDIR = "jobTracker";
+  final LocalFileSystem localFs;
   FileSystem fs = null;
   Path systemDir = null;
   JobConf conf;
@@ -1356,6 +1357,7 @@ public class JobTracker implements MRCon
     myInstrumentation = new JobTrackerMetricsInst(this, new JobConf());
     mrOwner = null;
     secretManager = null;
+    localFs = null;
   }
 
   
@@ -1514,6 +1516,7 @@ public class JobTracker implements MRCon
     // ... ensure we have the correct info
     this.port = interTrackerServer.getListenerAddress().getPort();
     this.conf.set(JT_IPC_ADDRESS, (this.localMachine + ":" + this.port));
+    this.localFs = FileSystem.getLocal(conf);
     LOG.info("JobTracker up at: " + this.port);
     this.infoPort = this.infoServer.getPort();
     this.conf.set(JT_HTTP_ADDRESS, 
@@ -1679,7 +1682,7 @@ public class JobTracker implements MRCon
    * localizing job files to the local disk.
    */
   LocalFileSystem getLocalFileSystem() throws IOException {
-    return FileSystem.getLocal(conf);
+    return localFs;
   }
 
   TaskScheduler getScheduler() {
@@ -2940,7 +2943,13 @@ public class JobTracker implements MRCon
     return fs.getUri().toString();
   }
 
-
+  /**
+   * Returns a handle to the JobTracker's Configuration
+   */
+  public JobConf getConf() {
+    return conf;
+  }
+  
   public void reportTaskTrackerError(String taskTracker,
                                      String errorClass,
                                      String errorMessage) throws IOException {
@@ -4572,6 +4581,7 @@ public class JobTracker implements MRCon
     if (fs == null) {
       fs = FileSystem.get(conf);
     }
+    this.localFs = FileSystem.getLocal(conf);
     
     tasktrackerExpiryInterval = 
       conf.getLong("mapred.tasktracker.expiry.interval", 10 * 60 * 1000);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=982353&r1=982352&r2=982353&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Aug  4 18:35:24 2010
@@ -1914,6 +1914,13 @@ public class TaskTracker 
 
         // Remove this job 
         rjob.tasks.clear();
+        // Close all FileSystems for this job
+        try {
+          FileSystem.closeAllForUGI(rjob.getUGI());
+        } catch (IOException ie) {
+          LOG.warn("Ignoring exception " + StringUtils.stringifyException(ie) + 
+              " while closing FileSystem for " + rjob.getUGI());
+        }
       }
     }
 


