hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077107 - in /hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred: CleanupQueue.java JobInProgress.java JobTracker.java TaskTracker.java
Date Fri, 04 Mar 2011 03:41:15 GMT
Author: omalley
Date: Fri Mar  4 03:41:15 2011
New Revision: 1077107

URL: http://svn.apache.org/viewvc?rev=1077107&view=rev
Log:
commit 22f13bff8c3eb13e7f306a6e97c981b2fd4780e6
Author: Hemanth Yamijala <yhemanth@yahoo-inc.com>
Date:   Tue Jan 12 22:14:37 2010 +0530

    HADOOP:5737 from https://issues.apache.org/jira/secure/attachment/12430029/HADOOP-5737-y20.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    HADOOP-5737. Fixes a problem in the way the JobTracker used to talk to
    +    other daemons like the NameNode to get the job's files. Also adds APIs
    +    in the JobTracker to get the FileSystem objects as per the JobTracker's
    +    configuration. (Amar Kamat via ddas)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java?rev=1077107&r1=1077106&r2=1077107&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
Fri Mar  4 03:41:15 2011
@@ -39,7 +39,7 @@ class CleanupQueue {
    * paths(directories/files) in a separate thread. This constructor creates a
    * clean-up thread and also starts it as a daemon. Callers can instantiate one
    * CleanupQueue per JVM and can use it for deleting paths. Use
-   * {@link CleanupQueue#addToQueue(JobConf, Path...)} to add paths for
+   * {@link CleanupQueue#addToQueue(FileSystem, Path...)} to add paths for
    * deletion.
    */
   public CleanupQueue() {
@@ -50,22 +50,22 @@ class CleanupQueue {
     }
   }
   
-  public void addToQueue(JobConf conf, Path...paths) {
-    cleanupThread.addToQueue(conf,paths);
+  public void addToQueue(FileSystem fs, Path...paths) {
+    cleanupThread.addToQueue(fs, paths);
   }
 
   private static class PathCleanupThread extends Thread {
 
-    static class PathAndConf {
-      JobConf conf;
+    static class PathAndFS {
+      FileSystem fs;
       Path path;
-      PathAndConf(JobConf conf, Path path) {
-        this.conf = conf;
+      PathAndFS(FileSystem fs, Path path) {
+        this.fs = fs;
         this.path = path;
       }
     }
     // cleanup queue which deletes files/directories of the paths queued up.
-    private LinkedBlockingQueue<PathAndConf> queue = new LinkedBlockingQueue<PathAndConf>();
+    private LinkedBlockingQueue<PathAndFS> queue = new LinkedBlockingQueue<PathAndFS>();
 
     public PathCleanupThread() {
       setName("Directory/File cleanup thread");
@@ -73,28 +73,27 @@ class CleanupQueue {
       start();
     }
 
-    public void addToQueue(JobConf conf,Path... paths) {
+    public void addToQueue(FileSystem fs, Path... paths) {
       for (Path p : paths) {
         try {
-          queue.put(new PathAndConf(conf,p));
+          queue.put(new PathAndFS(fs, p));
         } catch (InterruptedException ie) {}
       }
     }
 
     public void run() {
       LOG.debug(getName() + " started.");
-      PathAndConf pathAndConf = null;
+      PathAndFS pathAndFS = null;
       while (true) {
         try {
-          pathAndConf = queue.take();
+          pathAndFS = queue.take();
           // delete the path.
-          FileSystem fs = pathAndConf.path.getFileSystem(pathAndConf.conf);
-          fs.delete(pathAndConf.path, true);
-          LOG.debug("DELETED " + pathAndConf.path);
+          pathAndFS.fs.delete(pathAndFS.path, true);
+          LOG.debug("DELETED " + pathAndFS.path);
         } catch (InterruptedException t) {
           return;
         } catch (Exception e) {
-          LOG.warn("Error deleting path" + pathAndConf.path);
+          LOG.warn("Error deleting path" + pathAndFS.path);
         } 
       }
     }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077107&r1=1077106&r2=1077107&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Fri Mar  4 03:41:15 2011
@@ -193,6 +193,7 @@ class JobInProgress {
   private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();
 
   private LocalFileSystem localFs;
+  private FileSystem fs;
   private JobID jobId;
   private boolean hasSpeculativeMaps;
   private boolean hasSpeculativeReduces;
@@ -300,7 +301,7 @@ class JobInProgress {
     this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
     this.startTime = System.currentTimeMillis();
     status.setStartTime(startTime);
-    this.localFs = FileSystem.getLocal(default_conf);
+    this.localFs = jobtracker.getLocalFileSystem();
 
     JobConf default_job_conf = new JobConf(default_conf);
     this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR 
@@ -314,7 +315,7 @@ class JobInProgress {
     LOG.info("User : " +  this.user);
 
     Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
-    FileSystem fs = jobDir.getFileSystem(default_conf);
+    fs = jobtracker.getFileSystem(jobDir);
     jobFile = new Path(jobDir, "job.xml");
     fs.copyToLocalFile(jobFile, localJobFile);
     conf = new JobConf(localJobFile);
@@ -499,8 +500,6 @@ class JobInProgress {
     // log the job priority
     setPriority(this.priority);
     
-    Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
-    FileSystem fs = jobDir.getFileSystem(conf);
     //
     // generate security keys needed by Tasks
     //
@@ -2856,7 +2855,7 @@ class JobInProgress {
       // Delete temp dfs dirs created if any, like in case of 
       // speculative exn of reduces.  
       Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
-      new CleanupQueue().addToQueue(conf,tempDir); 
+      new CleanupQueue().addToQueue(jobtracker.getFileSystem(tempDir), tempDir); 
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
     }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077107&r1=1077106&r2=1077107&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Fri Mar  4 03:41:15 2011
@@ -63,6 +63,7 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -2194,6 +2195,29 @@ public class JobTracker implements MRCon
            : 0;
   }
 
+  /**
+   * Get JobTracker's FileSystem. This is the filesystem for mapred.system.dir.
+   */
+  FileSystem getFileSystem() {
+    return fs;
+  }
+
+  /**
+   * Get the FileSystem for the given path. This can be used to resolve
+   * filesystem for job history, local job files or mapred.system.dir path.
+   */
+  FileSystem getFileSystem(Path path) throws IOException {
+    return path.getFileSystem(conf);
+  }
+
+  /**
+   * Get JobTracker's LocalFileSystem handle. This is used by jobs for 
+   * localizing job files to the local disk.
+   */
+  LocalFileSystem getLocalFileSystem() throws IOException {
+    return FileSystem.getLocal(conf);
+  }
+
   public static Class<? extends JobTrackerInstrumentation> getInstrumentationClass(Configuration
conf) {
     return conf.getClass("mapred.jobtracker.instrumentation",
         JobTrackerMetricsInst.class, JobTrackerInstrumentation.class);
@@ -3544,7 +3568,7 @@ public class JobTracker implements MRCon
     
     String queue = job.getProfile().getQueueName();
     if(!(queueManager.getQueues().contains(queue))) {      
-      new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
+      new CleanupQueue().addToQueue(fs,getSystemDirectoryForJob(jobId));
       job.fail();
       if (userFileForJob != null) {
         userFileForJob.delete();
@@ -3562,7 +3586,7 @@ public class JobTracker implements MRCon
       if (userFileForJob != null) {
         userFileForJob.delete();
       }
-      new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
+      new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
       throw ioe;
     }
 
@@ -3571,7 +3595,7 @@ public class JobTracker implements MRCon
     try {
       checkMemoryRequirements(job);
     } catch (IOException ioe) {
-      new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
+      new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
       throw ioe;
     }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077107&r1=1077106&r2=1077107&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Fri Mar  4 03:41:15 2011
@@ -179,7 +179,7 @@ public class TaskTracker 
   
   // The filesystem where job files are stored
   FileSystem systemFS = null;
-  
+  private FileSystem localFs = null;
   private final HttpServer server;
     
   volatile boolean shuttingDown = false;
@@ -500,6 +500,7 @@ public class TaskTracker 
   synchronized void initialize() throws IOException {
     // use configured nameserver & interface to get local hostname
     this.fConf = new JobConf(originalConf);
+    localFs = FileSystem.getLocal(fConf);
     if (fConf.get("slave.host.name") != null) {
       this.localHostname = fConf.get("slave.host.name");
     }
@@ -1511,7 +1512,7 @@ public class TaskTracker 
         // Delete the job directory for this  
         // task if the job is done/failed
         if (!rjob.keepJobFiles){
-          directoryCleanupThread.addToQueue(fConf, getLocalFiles(fConf, 
+          directoryCleanupThread.addToQueue(localFs, getLocalFiles(fConf, 
             getLocalJobDir(rjob.getJobID().toString())));
         }
         // Remove this job 
@@ -2629,19 +2630,19 @@ public class TaskTracker 
             //might be using the dir. The JVM running the tasks would clean
             //the workdir per a task in the task process itself.
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              directoryCleanupThread.addToQueue(defaultJobConf,
+              directoryCleanupThread.addToQueue(localFs,
                   getLocalFiles(defaultJobConf,
                   taskDir));
             }  
             
             else {
-              directoryCleanupThread.addToQueue(defaultJobConf,
+              directoryCleanupThread.addToQueue(localFs,
                   getLocalFiles(defaultJobConf,
                 taskDir+"/job.xml"));
             }
           } else {
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              directoryCleanupThread.addToQueue(defaultJobConf,
+              directoryCleanupThread.addToQueue(localFs,
                   getLocalFiles(defaultJobConf,
                   taskDir+"/work"));
             }  



Mime
View raw message