hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1185881 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/mai...
Date Tue, 18 Oct 2011 21:38:09 GMT
Author: acmurthy
Date: Tue Oct 18 21:38:08 2011
New Revision: 1185881

URL: http://svn.apache.org/viewvc?rev=1185881&view=rev
Log:
Merge -c 1185880 from trunk to branch-0.23 to complete fix for MAPREDUCE-2762.

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
      - copied unchanged from r1185880, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1185881&r1=1185880&r2=1185881&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Oct 18 21:38:08 2011
@@ -1618,6 +1618,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3197. TestMRClientService failing on building clean checkout of 
     branch 0.23 (mahadev)
 
+    MAPREDUCE-2762. Cleanup MR staging directory on completion. (mahadev via
+    acmurthy) 
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1185881&r1=1185880&r2=1185881&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Oct 18 21:38:08 2011
@@ -31,12 +31,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.LocalContainerLauncher;
 import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -228,6 +230,97 @@ public class MRAppMaster extends Composi
     super.init(conf);
   } // end of init()
 
+  
+  protected boolean keepJobFiles(JobConf conf) {
+    return (conf.getKeepTaskFilesPattern() != null || conf
+        .getKeepFailedTaskFiles());
+  }
+  
+  /**
+   * Create the default file System for this job.
+   * @param conf the conf object
+   * @return the default filesystem for this job
+   * @throws IOException
+   */
+  protected FileSystem getFileSystem(Configuration conf) throws IOException {
+    return FileSystem.get(conf);
+  }
+  
+  /**
+   * clean up staging directories for the job.
+   * @throws IOException
+   */
+  public void cleanupStagingDir() throws IOException {
+    /* make sure we clean the staging files */
+    String jobTempDir = null;
+    FileSystem fs = getFileSystem(getConfig());
+    try {
+      if (!keepJobFiles(new JobConf(getConfig()))) {
+        jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR);
+        if (jobTempDir == null) {
+          LOG.warn("Job Staging directory is null");
+          return;
+        }
+        Path jobTempDirPath = new Path(jobTempDir);
+        LOG.info("Deleting staging directory " + fs.getDefaultUri(getConfig()) +
+            " " + jobTempDir);
+        fs.delete(jobTempDirPath, true);
+      }
+    } catch(IOException io) {
+      LOG.error("Failed to cleanup staging dir " + jobTempDir, io);
+    }
+  }
+  
+  /**
+   * Exit call. Just in a function call to enable testing.
+   */
+  protected void sysexit() {
+    System.exit(0);
+  }
+  
+  private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
+    @Override
+    public void handle(JobFinishEvent event) {
+      // job has finished
+      // this is the only job, so shut down the Appmaster
+      // note in a workflow scenario, this may lead to creation of a new
+      // job (FIXME?)
+
+      // TODO:currently just wait for some time so clients can know the
+      // final states. Will be removed once RM come on.
+      try {
+        Thread.sleep(5000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      LOG.info("Calling stop for all the services");
+      try {
+        stop();
+      } catch (Throwable t) {
+        LOG.warn("Graceful stop failed ", t);
+      }
+      try {
+        cleanupStagingDir();
+      } catch(IOException io) {
+        LOG.warn("Failed to delete staging dir");
+      }
+      //TODO: this is required because rpc server does not shut down
+      // in spite of calling server.stop().
+      //Bring the process down by force.
+      //Not needed after HADOOP-7140
+      LOG.info("Exiting MR AppMaster..GoodBye!");
+      sysexit();
+    }
+  }
+  
+  /**
+   * create an event handler that handles the job finish event.
+   * @return the job finish event handler.
+   */
+  protected EventHandler<JobFinishEvent> createJobFinishEventHandler() {
+    return new JobFinishEventHandler();
+  }
+
   /** Create and initialize (but don't start) a single job. */
   protected Job createJob(Configuration conf) {
 
@@ -238,36 +331,7 @@ public class MRAppMaster extends Composi
     ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
 
     dispatcher.register(JobFinishEvent.Type.class,
-        new EventHandler<JobFinishEvent>() {
-          @Override
-          public void handle(JobFinishEvent event) {
-            // job has finished
-            // this is the only job, so shut down the Appmaster
-            // note in a workflow scenario, this may lead to creation of a new
-            // job (FIXME?)
-
-            // TODO:currently just wait for some time so clients can know the
-            // final states. Will be removed once RM come on.
-            try {
-              Thread.sleep(5000);
-            } catch (InterruptedException e) {
-              e.printStackTrace();
-            }
-            LOG.info("Calling stop for all the services");
-            try {
-              stop();
-            } catch (Throwable t) {
-              LOG.warn("Graceful stop failed ", t);
-            }
-            //TODO: this is required because rpc server does not shut down
-            // in spite of calling server.stop().
-            //Bring the process down by force.
-            //Not needed after HADOOP-7140
-            LOG.info("Exiting MR AppMaster..GoodBye!");
-            System.exit(0);
-          }
-        });
-
+        createJobFinishEventHandler());     
     return newJob;
   } // end createJob()
 
@@ -553,6 +617,7 @@ public class MRAppMaster extends Composi
 
     ///////////////////// Create the job itself.
     job = createJob(getConfig());
+    
     // End of creating the job.
 
     // metrics system init is really init & start.

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1185881&r1=1185880&r2=1185881&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Oct 18 21:38:08 2011
@@ -339,7 +339,6 @@ public class JobImpl implements org.apac
                   JobEventType.JOB_DIAGNOSTIC_UPDATE,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                   JobEventType.INTERNAL_ERROR))
-
           // create the topology tables
           .installTopology();
  
@@ -724,6 +723,16 @@ public class JobImpl implements org.apac
     this.getEventHandler().handle(new JobHistoryEvent(this.jobId, jfe));    
   }
   
+  /**
+   * Create the default file System for this job.
+   * @param conf the conf object
+   * @return the default filesystem for this job
+   * @throws IOException
+   */
+  protected FileSystem getFileSystem(Configuration conf) throws IOException {
+    return FileSystem.get(conf);
+  }
+  
   static JobState checkJobCompleteSuccess(JobImpl job) {
     // check for Job success
     if (job.completedTaskCount == job.getTasks().size()) {
@@ -733,7 +742,6 @@ public class JobImpl implements org.apac
       } catch (IOException e) {
         LOG.warn("Could not do commit for Job", e);
       }
-      
       job.logJobHistoryFinishedEvent();
       return job.finished(JobState.SUCCEEDED);
     }
@@ -816,7 +824,7 @@ public class JobImpl implements org.apac
       job.metrics.preparingJob(job);
       try {
         setup(job);
-        job.fs = FileSystem.get(job.conf);
+        job.fs = job.getFileSystem(job.conf);
 
         //log to job history
         JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
@@ -848,13 +856,14 @@ public class JobImpl implements org.apac
           LOG.info("Using mapred newApiCommitter.");
         }
         
-        LOG.info("OutputCommitter set in config " + job.conf.get("mapred.output.committer.class"));
+        LOG.info("OutputCommitter set in config " + job.conf.get(
+            "mapred.output.committer.class"));
         
         if (newApiCommitter) {
           job.jobContext = new JobContextImpl(job.conf,
               job.oldJobId);
-          org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = RecordFactoryProvider
-              .getRecordFactory(null)
+          org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
+          = RecordFactoryProvider.getRecordFactory(null)
               .newRecordInstance(
                   org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId.class);
           attemptID.setTaskId(RecordFactoryProvider.getRecordFactory(null)
@@ -884,14 +893,17 @@ public class JobImpl implements org.apac
           inputLength += taskSplitMetaInfo[i].getInputDataLength();
         }
 
-//FIXME:  need new memory criterion for uber-decision (oops, too late here; until AM-resizing supported, must depend on job client to pass fat-slot needs)
+        //FIXME:  need new memory criterion for uber-decision (oops, too late here; 
+        // until AM-resizing supported, must depend on job client to pass fat-slot needs)
         // these are no longer "system" settings, necessarily; user may override
         int sysMaxMaps = job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
         int sysMaxReduces =
             job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
         long sysMaxBytes = job.conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
-            job.conf.getLong("dfs.block.size", 64*1024*1024));  //FIXME: this is wrong; get FS from [File?]InputFormat and default block size from that
-        //long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot(); // FIXME [could use default AM-container memory size...]
+            job.conf.getLong("dfs.block.size", 64*1024*1024));  //FIXME: this is 
+        // wrong; get FS from [File?]InputFormat and default block size from that
+        //long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot(); 
+        // FIXME [could use default AM-container memory size...]
 
         boolean uberEnabled =
             job.conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
@@ -900,8 +912,8 @@ public class JobImpl implements org.apac
         boolean smallInput = (inputLength <= sysMaxBytes);
         boolean smallMemory = true;  //FIXME (see above)
             // ignoring overhead due to UberTask and statics as negligible here:
-//  FIXME   && (Math.max(memoryPerMap, memoryPerReduce) <= sysMemSizeForUberSlot
-//              || sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT)
+        //  FIXME   && (Math.max(memoryPerMap, memoryPerReduce) <= sysMemSizeForUberSlot
+        //              || sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT)
         boolean notChainJob = !isChainJob(job.conf);
 
         // User has overall veto power over uberization, or user can modify
@@ -935,7 +947,9 @@ public class JobImpl implements org.apac
           job.conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
 
           // disable speculation:  makes no sense to speculate an entire job
-//        canSpeculateMaps = canSpeculateReduces = false; // [TODO: in old version, ultimately was from conf.getMapSpeculativeExecution(), conf.getReduceSpeculativeExecution()]
+          //canSpeculateMaps = canSpeculateReduces = false; // [TODO: in old 
+          //version, ultimately was from conf.getMapSpeculativeExecution(), 
+          //conf.getReduceSpeculativeExecution()]
         } else {
           StringBuilder msg = new StringBuilder();
           msg.append("Not uberizing ").append(job.jobId).append(" because:");

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1185881&r1=1185880&r2=1185881&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Tue Oct 18 21:38:08 2011
@@ -1119,7 +1119,7 @@ abstract public class Task implements Wr
     // delete the staging area for the job
     JobConf conf = new JobConf(jobContext.getConfiguration());
     if (!keepTaskFiles(conf)) {
-      String jobTempDir = conf.get("mapreduce.job.dir");
+      String jobTempDir = conf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
       Path jobTempDirPath = new Path(jobTempDir);
       FileSystem fs = jobTempDirPath.getFileSystem(conf);
       fs.delete(jobTempDirPath, true);

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=1185881&r1=1185880&r2=1185881&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java Tue Oct 18 21:38:08 2011
@@ -341,7 +341,7 @@ class JobSubmitter {
     Path submitJobDir = new Path(jobStagingArea, jobId.toString());
     JobStatus status = null;
     try {
-      conf.set("mapreduce.job.dir", submitJobDir.toString());
+      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
       LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
           + " as the submit dir");
       // get delegation token for the dir

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1185881&r1=1185880&r2=1185881&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Oct 18 21:38:08 2011
@@ -238,6 +238,8 @@ public interface MRJobConfig {
   public static final String REDUCE_JAVA_OPTS = "mapreduce.reduce.java.opts";
 
   public static final String REDUCE_ULIMIT = "mapreduce.reduce.ulimit";
+  
+  public static final String MAPREDUCE_JOB_DIR = "mapreduce.job.dir";
 
   public static final String REDUCE_MAX_ATTEMPTS = "mapreduce.reduce.maxattempts";
 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1185881&r1=1185880&r2=1185881&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Tue Oct 18 21:38:08 2011
@@ -31,7 +31,6 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;



Mime
View raw message