hadoop-common-commits mailing list archives

From: d...@apache.org
Subject: svn commit: r773829 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/
Date: Tue, 12 May 2009 09:01:45 GMT
Author: ddas
Date: Tue May 12 09:01:45 2009
New Revision: 773829

URL: http://svn.apache.org/viewvc?rev=773829&view=rev
Log:
HADOOP-5737. Fixes a problem in the way the JobTracker used to talk to other
daemons, such as the NameNode, to get a job's files. Also adds APIs to the
JobTracker for obtaining FileSystem objects as per the JobTracker's
configuration. Contributed by Amar Kamat.
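
In outline, the change replaces per-call filesystem resolution with a cached
handle. A minimal before/after sketch of the pattern the diff removes and the
one it introduces (identifiers as in the hunks below):

    // Before: each caller resolved a FileSystem from a JobConf at the point
    // of use, going back to the JobConf (and hence the NameNode) every time.
    FileSystem fs = path.getFileSystem(conf);
    fs.delete(path, true);

    // After: callers reuse a FileSystem handle obtained from the JobTracker.
    FileSystem fs = jobtracker.getFileSystem(path);
    fs.delete(path, true);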

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=773829&r1=773828&r2=773829&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue May 12 09:01:45 2009
@@ -552,6 +552,11 @@
     HADOOP-5675. Do not launch a job if DistCp has no work to do. (Tsz Wo
     (Nicholas), SZE via cdouglas)
 
+    HADOOP-5737. Fixes a problem in the way the JobTracker used to talk to
+    other daemons like the NameNode to get the job's files. Also adds APIs
+    in the JobTracker to get the FileSystem objects as per the JobTracker's
+    configuration. (Amar Kamat via ddas) 
+
 Release 0.20.1 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java?rev=773829&r1=773828&r2=773829&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java Tue May 12 09:01:45 2009
@@ -39,7 +39,7 @@
    * paths(directories/files) in a separate thread. This constructor creates a
    * clean-up thread and also starts it as a daemon. Callers can instantiate one
    * CleanupQueue per JVM and can use it for deleting paths. Use
-   * {@link CleanupQueue#addToQueue(JobConf, Path...)} to add paths for
+   * {@link CleanupQueue#addToQueue(FileSystem, Path...)} to add paths for
    * deletion.
    */
   public CleanupQueue() {
@@ -50,22 +50,22 @@
     }
   }
   
-  public void addToQueue(JobConf conf, Path...paths) {
-    cleanupThread.addToQueue(conf,paths);
+  public void addToQueue(FileSystem fs, Path...paths) {
+    cleanupThread.addToQueue(fs, paths);
   }
 
   private static class PathCleanupThread extends Thread {
 
-    static class PathAndConf {
-      JobConf conf;
+    static class PathAndFS {
+      FileSystem fs;
       Path path;
-      PathAndConf(JobConf conf, Path path) {
-        this.conf = conf;
+      PathAndFS(FileSystem fs, Path path) {
+        this.fs = fs;
         this.path = path;
       }
     }
     // cleanup queue which deletes files/directories of the paths queued up.
-    private LinkedBlockingQueue<PathAndConf> queue = new LinkedBlockingQueue<PathAndConf>();
+    private LinkedBlockingQueue<PathAndFS> queue = new LinkedBlockingQueue<PathAndFS>();
 
     public PathCleanupThread() {
       setName("Directory/File cleanup thread");
@@ -73,28 +73,27 @@
       start();
     }
 
-    public void addToQueue(JobConf conf,Path... paths) {
+    public void addToQueue(FileSystem fs, Path... paths) {
       for (Path p : paths) {
         try {
-          queue.put(new PathAndConf(conf,p));
+          queue.put(new PathAndFS(fs, p));
         } catch (InterruptedException ie) {}
       }
     }
 
     public void run() {
       LOG.debug(getName() + " started.");
-      PathAndConf pathAndConf = null;
+      PathAndFS pathAndFS = null;
       while (true) {
         try {
-          pathAndConf = queue.take();
+          pathAndFS = queue.take();
           // delete the path.
-          FileSystem fs = pathAndConf.path.getFileSystem(pathAndConf.conf);
-          fs.delete(pathAndConf.path, true);
-          LOG.debug("DELETED " + pathAndConf.path);
+          pathAndFS.fs.delete(pathAndFS.path, true);
+          LOG.debug("DELETED " + pathAndFS.path);
         } catch (InterruptedException t) {
           return;
         } catch (Exception e) {
-          LOG.warn("Error deleting path" + pathAndConf.path);
+          LOG.warn("Error deleting path" + pathAndFS.path);
         } 
       }
     }
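
With this change CleanupQueue no longer resolves a FileSystem itself; callers
hand it one. A minimal usage sketch (the Configuration and path here are
placeholders, not from the patch):

    FileSystem fs = FileSystem.get(new Configuration()); // caller's own handle
    Path tempDir = new Path("/tmp/example-job-dir");     // hypothetical path
    CleanupQueue cleanupQueue = new CleanupQueue();      // starts the daemon thread
    cleanupQueue.addToQueue(fs, tempDir);                // deleted asynchronously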

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=773829&r1=773828&r2=773829&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Tue May 12 09:01:45 2009
@@ -102,7 +102,7 @@
     FsPermission.createImmutable((short) 0750); // rwxr-x---
   final static FsPermission HISTORY_FILE_PERMISSION =
     FsPermission.createImmutable((short) 0740); // rwxr-----
-  private static JobConf jtConf;
+  private static FileSystem JT_FS; // jobtracker's filesystem
   /**
    * Record types are identifiers for each line of log in history files. 
    * A record type appears as the first token in a single line of log. 
@@ -141,9 +141,25 @@
    * @param jobTrackerStartTime jobtracker's start time
    * @return true if intialized properly
    *         false otherwise
+   * @deprecated Use {@link #init(JobConf, String, long, FileSystem)} instead.
    */
   public static boolean init(JobConf conf, String hostname, 
                               long jobTrackerStartTime){
+    return init(conf, hostname, jobTrackerStartTime, null);
+  }
+
+  /**
+   * Initialize JobHistory files. 
+   * @param conf Jobconf of the job tracker.
+   * @param hostname jobtracker's hostname
+   * @param jobTrackerStartTime jobtracker's start time
+   * @param fs JobTracker's filesystem
+   * @return true if intialized properly
+   *         false otherwise
+   */
+  public static boolean init(JobConf conf, String hostname,
+                             long jobTrackerStartTime,
+                             FileSystem fs){
     try {
       LOG_DIR = conf.get("hadoop.job.history.location" ,
         "file:///" + new File(
@@ -153,9 +169,13 @@
                                     String.valueOf(jobTrackerStartTime) + "_";
       jobtrackerHostname = hostname;
       Path logDir = new Path(LOG_DIR);
-      FileSystem fs = logDir.getFileSystem(conf);
-      if (!fs.exists(logDir)){
-        if (!fs.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) {
+      if (fs == null) {
+        JT_FS = logDir.getFileSystem(conf);
+      } else {
+        JT_FS = fs;
+      }
+      if (!JT_FS.exists(logDir)){
+        if (!JT_FS.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) {
           throw new IOException("Mkdirs failed to create " + logDir.toString());
         }
       }
@@ -165,7 +185,6 @@
       jobHistoryBlockSize = 
         conf.getLong("mapred.jobtracker.job.history.block.size", 
                      3 * 1024 * 1024);
-      jtConf = conf;
     } catch(IOException e) {
         LOG.error("Failed to initialize JobHistory log file", e); 
         disableHistory = true;
@@ -635,7 +654,6 @@
       String user = getUserName(jobConf);
       String jobName = trimJobName(getJobName(jobConf));
       
-      FileSystem fs = new Path(LOG_DIR).getFileSystem(jobConf);
       if (LOG_DIR == null) {
         return null;
       }
@@ -664,7 +682,7 @@
         }
       };
       
-      FileStatus[] statuses = fs.listStatus(new Path(LOG_DIR), filter);
+      FileStatus[] statuses = JT_FS.listStatus(new Path(LOG_DIR), filter);
       String filename = null;
       if (statuses.length == 0) {
         LOG.info("Nothing to recover for job " + id);
@@ -696,9 +714,8 @@
     throws IOException {
       Path logPath = JobHistory.JobInfo.getJobHistoryLogLocation(fileName);
       if (logPath != null) {
-        FileSystem fs = logPath.getFileSystem(conf);
         LOG.info("Deleting job history file " + logPath.getName());
-        fs.delete(logPath, false);
+        JT_FS.delete(logPath, false);
       }
       // do the same for the user file too
       logPath = JobHistory.JobInfo.getJobHistoryLogLocationForUser(fileName, 
@@ -726,25 +743,24 @@
                                                           Path logFilePath) 
     throws IOException {
       Path ret;
-      FileSystem fs = logFilePath.getFileSystem(conf);
       String logFileName = logFilePath.getName();
       String tmpFilename = getSecondaryJobHistoryFile(logFileName);
       Path logDir = logFilePath.getParent();
       Path tmpFilePath = new Path(logDir, tmpFilename);
-      if (fs.exists(logFilePath)) {
+      if (JT_FS.exists(logFilePath)) {
         LOG.info(logFileName + " exists!");
-        if (fs.exists(tmpFilePath)) {
+        if (JT_FS.exists(tmpFilePath)) {
           LOG.info("Deleting " + tmpFilename 
                    + "  and using " + logFileName + " for recovery.");
-          fs.delete(tmpFilePath, false);
+          JT_FS.delete(tmpFilePath, false);
         }
         ret = tmpFilePath;
       } else {
         LOG.info(logFileName + " doesnt exist! Using " 
                  + tmpFilename + " for recovery.");
-        if (fs.exists(tmpFilePath)) {
+        if (JT_FS.exists(tmpFilePath)) {
           LOG.info("Renaming " + tmpFilename + " to " + logFileName);
-          fs.rename(tmpFilePath, logFilePath);
+          JT_FS.rename(tmpFilePath, logFilePath);
           ret = tmpFilePath;
         } else {
           ret = logFilePath;
@@ -754,7 +770,7 @@
       // do the same for the user files too
       logFilePath = getJobHistoryLogLocationForUser(logFileName, conf);
       if (logFilePath != null) {
-        fs = logFilePath.getFileSystem(conf);
+        FileSystem fs = logFilePath.getFileSystem(conf);
         logDir = logFilePath.getParent();
         tmpFilePath = new Path(logDir, tmpFilename);
         if (fs.exists(logFilePath)) {
@@ -797,13 +813,12 @@
       Path tmpLogPath = 
         JobHistory.JobInfo.getJobHistoryLogLocation(tmpLogFileName);
       if (masterLogPath != null) {
-        FileSystem fs = masterLogPath.getFileSystem(conf);
 
         // rename the tmp file to the master file. Note that this should be 
         // done only when the file is closed and handles are released.
-        if(fs.exists(tmpLogPath)) {
+        if(JT_FS.exists(tmpLogPath)) {
           LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
-          fs.rename(tmpLogPath, masterLogPath);
+          JT_FS.rename(tmpLogPath, masterLogPath);
         }
       }
       
@@ -878,22 +893,20 @@
           FSDataOutputStream out = null;
           PrintWriter writer = null;
 
-          if (LOG_DIR != null) {
-            // create output stream for logging in hadoop.job.history.location
-            fs = new Path(LOG_DIR).getFileSystem(jobConf);
-            
+          if (LOG_DIR != null) {           
+          // create output stream for logging in hadoop.job.history.location 
             if (restarted) {
               logFile = recoverJobHistoryFile(jobConf, logFile);
               logFileName = logFile.getName();
             }
             
             int defaultBufferSize = 
-              fs.getConf().getInt("io.file.buffer.size", 4096);
-            out = fs.create(logFile, 
+              JT_FS.getConf().getInt("io.file.buffer.size", 4096);
+            out = JT_FS.create(logFile, 
                             new FsPermission(HISTORY_FILE_PERMISSION),
                             true, 
                             defaultBufferSize, 
-                            fs.getDefaultReplication(), 
+                            JT_FS.getDefaultReplication(), 
                             jobHistoryBlockSize, null);
             writer = new PrintWriter(out);
             writers.add(writer);
@@ -968,22 +981,20 @@
       FSDataOutputStream jobFileOut = null;
       try {
         if (LOG_DIR != null) {
-          fs = new Path(LOG_DIR).getFileSystem(jobConf);
           int defaultBufferSize = 
-              fs.getConf().getInt("io.file.buffer.size", 4096);
-          if (!fs.exists(jobFilePath)) {
-            jobFileOut = fs.create(jobFilePath, 
+              JT_FS.getConf().getInt("io.file.buffer.size", 4096);
+          if (!JT_FS.exists(jobFilePath)) {
+            jobFileOut = JT_FS.create(jobFilePath, 
                                    new FsPermission(HISTORY_FILE_PERMISSION),
                                    true, 
                                    defaultBufferSize, 
-                                   fs.getDefaultReplication(), 
-                                   fs.getDefaultBlockSize(), null);
+                                   JT_FS.getDefaultReplication(), 
+                                   JT_FS.getDefaultBlockSize(), null);
             jobConf.writeXml(jobFileOut);
             jobFileOut.close();
           }
         } 
         if (userLogDir != null) {
-          fs = new Path(userLogDir).getFileSystem(jobConf);
           jobFileOut = fs.create(userJobFilePath);
           jobConf.writeXml(jobFileOut);
         }
@@ -1775,13 +1786,12 @@
       isRunning = true; 
       try {
         Path logDir = new Path(LOG_DIR);
-        FileSystem fs = logDir.getFileSystem(jtConf);
-        FileStatus[] historyFiles = fs.listStatus(logDir);
+        FileStatus[] historyFiles = JT_FS.listStatus(logDir);
         // delete if older than 30 days
         if (historyFiles != null) {
           for (FileStatus f : historyFiles) {
             if (now - f.getModificationTime() > THIRTY_DAYS_IN_MS) {
-              fs.delete(f.getPath(), true); 
+              JT_FS.delete(f.getPath(), true); 
               LOG.info("Deleting old history file : " + f.getPath());
             }
           }
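
JobHistory now keeps the JobTracker's filesystem in the static JT_FS field:
the old three-argument init is deprecated and delegates with a null
FileSystem, in which case JT_FS is resolved from the history log directory.
A sketch of the two call paths (hostname and start time are placeholders,
and the call site in JobTracker is not part of this diff, so the second line
is illustrative):

    // Deprecated: JT_FS resolved from hadoop.job.history.location
    JobHistory.init(conf, "jt-host", startTime);

    // New overload: the caller supplies the JobTracker's FileSystem directly
    JobHistory.init(conf, "jt-host", startTime, jobTracker.getFileSystem());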

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=773829&r1=773828&r2=773829&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Tue May 12 09:01:45 2009
@@ -173,6 +173,7 @@
   private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();
 
   private LocalFileSystem localFs;
+  private FileSystem fs;
   private JobID jobId;
   private boolean hasSpeculativeMaps;
   private boolean hasSpeculativeReduces;
@@ -228,7 +229,7 @@
     this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
     this.startTime = System.currentTimeMillis();
     status.setStartTime(startTime);
-    this.localFs = FileSystem.getLocal(default_conf);
+    this.localFs = jobtracker.getLocalFileSystem();
 
     JobConf default_job_conf = new JobConf(default_conf);
     this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR 
@@ -236,7 +237,7 @@
     this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
                                                       +"/"+ jobid + ".jar");
     Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
-    FileSystem fs = jobDir.getFileSystem(default_conf);
+    fs = jobtracker.getFileSystem(jobDir);
     jobFile = new Path(jobDir, "job.xml");
     fs.copyToLocalFile(jobFile, localJobFile);
     conf = new JobConf(localJobFile);
@@ -397,8 +398,6 @@
     //
     String jobFile = profile.getJobFile();
 
-    Path sysDir = new Path(this.jobtracker.getSystemDir());
-    FileSystem fs = sysDir.getFileSystem(conf);
     DataInputStream splitFile =
       fs.open(new Path(conf.get("mapred.job.split.file")));
     JobClient.RawSplit[] splits;
@@ -2499,7 +2498,7 @@
       // Delete temp dfs dirs created if any, like in case of 
       // speculative exn of reduces.  
       Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
-      new CleanupQueue().addToQueue(conf,tempDir); 
+      new CleanupQueue().addToQueue(jobtracker.getFileSystem(tempDir), tempDir); 
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
     }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=773829&r1=773828&r2=773829&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue May 12 09:01:45 2009
@@ -52,6 +52,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
@@ -1792,6 +1793,29 @@
            : 0;
   }
 
+  /**
+   * Get JobTracker's FileSystem. This is the filesystem for mapred.system.dir.
+   */
+  FileSystem getFileSystem() {
+    return fs;
+  }
+
+  /**
+   * Get the FileSystem for the given path. This can be used to resolve
+   * filesystem for job history, local job files or mapred.system.dir path.
+   */
+  FileSystem getFileSystem(Path path) throws IOException {
+    return path.getFileSystem(conf);
+  }
+
+  /**
+   * Get JobTracker's LocalFileSystem handle. This is used by jobs for 
+   * localizing job files to the local disk.
+   */
+  LocalFileSystem getLocalFileSystem() throws IOException {
+    return FileSystem.getLocal(conf);
+  }
+
  public static Class<? extends JobTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
     return conf.getClass("mapred.jobtracker.instrumentation",
         JobTrackerMetricsInst.class, JobTrackerInstrumentation.class);
@@ -2995,7 +3019,7 @@
     
     String queue = job.getProfile().getQueueName();
     if(!(queueManager.getQueues().contains(queue))) {      
-      new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
+      new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
       throw new IOException("Queue \"" + queue + "\" does not exist");        
     }
 
@@ -3005,7 +3029,7 @@
     } catch (IOException ioe) {
        LOG.warn("Access denied for user " + job.getJobConf().getUser() 
                 + ". Ignoring job " + jobId, ioe);
-      new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
+      new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
       throw ioe;
     }
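
Taken together, the new package-private accessors give the rest of the mapred
code one place to obtain filesystem handles. A hypothetical caller (sketch
only; cleanupJobDir is not a method added by this patch) might combine them
with the reworked CleanupQueue like this:

    // Resolve the filesystem for a job's system directory via the
    // JobTracker's configuration, then enqueue it for asynchronous deletion.
    static void cleanupJobDir(JobTracker jt, Path jobDir) throws IOException {
      FileSystem fs = jt.getFileSystem(jobDir);
      new CleanupQueue().addToQueue(fs, jobDir);
    }

This mirrors the call sites changed in JobInProgress and JobTracker above,
where new CleanupQueue().addToQueue(fs, ...) replaces the JobConf-based
variant.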
 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=773829&r1=773828&r2=773829&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue May 12 09:01:45 2009
@@ -188,6 +188,7 @@
   private static final String JOBCACHE = "jobcache";
   private static final String OUTPUT = "output";
   private JobConf fConf;
+  private FileSystem localFs;
   private int maxCurrentMapTasks;
   private int maxCurrentReduceTasks;
   private int failures;
@@ -471,6 +472,7 @@
    * close().
    */
   synchronized void initialize() throws IOException {
+    localFs = FileSystem.getLocal(fConf);
     // use configured nameserver & interface to get local hostname
     if (fConf.get("slave.host.name") != null) {
       this.localHostname = fConf.get("slave.host.name");
@@ -1445,7 +1447,7 @@
         // Delete the job directory for this  
         // task if the job is done/failed
         if (!rjob.keepJobFiles){
-          directoryCleanupThread.addToQueue(fConf, getLocalFiles(fConf, 
+          directoryCleanupThread.addToQueue(localFs, getLocalFiles(fConf, 
             getLocalJobDir(rjob.getJobID().toString())));
         }
         // Remove this job 
@@ -2514,19 +2516,19 @@
             //might be using the dir. The JVM running the tasks would clean
             //the workdir per a task in the task process itself.
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              directoryCleanupThread.addToQueue(defaultJobConf,
+              directoryCleanupThread.addToQueue(localFs,
                   getLocalFiles(defaultJobConf,
                   taskDir));
             }  
             
             else {
-              directoryCleanupThread.addToQueue(defaultJobConf,
+              directoryCleanupThread.addToQueue(localFs,
                   getLocalFiles(defaultJobConf,
                 taskDir+"/job.xml"));
             }
           } else {
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              directoryCleanupThread.addToQueue(defaultJobConf,
+              directoryCleanupThread.addToQueue(localFs,
                   getLocalFiles(defaultJobConf,
                   taskDir+"/work"));
             }  
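
TaskTracker applies the same pattern locally: initialize() now caches
FileSystem.getLocal(fConf) in the new localFs field, and each
directoryCleanupThread.addToQueue call passes that cached handle instead of a
JobConf. Condensed view of the flow (sketch, not the full class):

    // In initialize(): resolve the local filesystem once.
    localFs = FileSystem.getLocal(fConf);

    // Later, cleanup enqueues local paths against the cached handle.
    directoryCleanupThread.addToQueue(localFs,
        getLocalFiles(defaultJobConf, taskDir));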


