hadoop-common-commits mailing list archives

From acmur...@apache.org
Subject svn commit: r1471565 - in /hadoop/common/branches/branch-1.2: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobInProgress.java src/mapred/org/apache/hadoop/mapred/JobTracker.java src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
Date Wed, 24 Apr 2013 17:49:46 GMT
Author: acmurthy
Date: Wed Apr 24 17:49:46 2013
New Revision: 1471565

URL: http://svn.apache.org/r1471565
Log:
Merge -c 1471563 from branch-1 to branch-1.2 to fix MAPREDUCE-5169. Fixed JobTracker recovery
to ensure both the job-info and jobToken files are saved correctly, preventing a race condition
between job submission and initialization. Contributed by Arun C. Murthy.
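
The essence of the fix: the JobTracker now persists both the job-info file and the
jobToken file to the job's system directory before constructing the JobInProgress,
so a restart during job initialization always finds everything recovery needs. A
minimal sketch of the new ordering, inside the JobTracker submission path
(persistJobInfo is a hypothetical helper; the real code is in JobTracker.java below):

    // Sketch, not the actual patch: recovery-safe submission ordering.
    JobStatus submitJob(JobID jobId, JobInfo jobInfo, Credentials ts)
        throws IOException {
      // 1. Persist recovery state before the in-memory job exists.
      persistJobInfo(jobId, jobInfo);        // hypothetical: writes "job-info"
      generateAndStoreJobTokens(jobId, ts);  // writes the "jobToken" file
      // 2. Only now build the job; initTasks() no longer writes the token
      //    file itself, which removes the submit/init race.
      JobInProgress job = new JobInProgress(this, conf, jobInfo, 0, ts);
      return addJob(jobId, job);
    }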

Modified:
    hadoop/common/branches/branch-1.2/CHANGES.txt
    hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java

Modified: hadoop/common/branches/branch-1.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/CHANGES.txt?rev=1471565&r1=1471564&r2=1471565&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.2/CHANGES.txt Wed Apr 24 17:49:46 2013
@@ -581,6 +581,10 @@ Release 1.2.0 - 2013.04.16
     can override Mapper.run and Reducer.run to get the old (inconsistent)
     behaviour. (acmurthy) 
 
+    MAPREDUCE-5169. Fixed JobTracker recovery to ensure both job-info and
+    jobToken file are saved correctly to prevent race-condition between job
+    submission and initialization. (acmurthy)
+
 Release 1.1.2 - 2013.01.30
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1471565&r1=1471564&r2=1471565&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Wed Apr 24 17:49:46 2013
@@ -44,16 +44,13 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.Counters.CountersExceededException;
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
@@ -654,12 +651,44 @@ public class JobInProgress {
     this.numSlotsPerReduce = numSlotsPerReduce;
   }
 
+  static final String JOB_INIT_EXCEPTION = 
+      "mapreduce.job.init.throw.exception";
+  static final String JT_JOB_INIT_EXCEPTION_OVERRIDE = 
+      "mapreduce.jt.job.init.throw.exception.override";
+  
+  Object jobInitWaitLockForTests = new Object();
+  
+  void signalInitWaitLockForTests() {
+    synchronized (jobInitWaitLockForTests) {
+      jobInitWaitLockForTests.notify();
+    }
+  }
+  
+  void waitForInitWaitLockForTests() {
+    synchronized (jobInitWaitLockForTests) {
+      try {
+        LOG.info("About to wait for jobInitWaitLockForTests");
+        jobInitWaitLockForTests.wait();
+        LOG.info("Done waiting for jobInitWaitLockForTests");
+      } catch (InterruptedException ie) {
+        // Should never occur
+      }
+    }
+  }
+  
   /**
    * Construct the splits, etc.  This is invoked from an async
    * thread so that split-computation doesn't block anyone.
    */
   public synchronized void initTasks() 
   throws IOException, KillInterruptedException, UnknownHostException {
+    // Only for tests
+    if (!jobtracker.getConf().getBoolean(JT_JOB_INIT_EXCEPTION_OVERRIDE, false) 
+        &&
+        getJobConf().getBoolean(JOB_INIT_EXCEPTION, false)) {
+        waitForInitWaitLockForTests();
+    }
+    
     if (tasksInited || isComplete()) {
       return;
     }
@@ -690,11 +719,6 @@ public class JobInProgress {
     setPriority(this.priority);
     
     //
-    // generate security keys needed by Tasks
-    //
-    generateAndStoreTokens();
-    
-    //
     // read input splits and create a map per a split
     //
     TaskSplitMetaInfo[] splits = createSplits(jobId);
@@ -3579,31 +3603,6 @@ public class JobInProgress {
   }
 
   /**
-   * generate job token and save it into the file
-   * @throws IOException
-   */
-  private void generateAndStoreTokens() throws IOException {
-    Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
-    Path keysFile = new Path(jobDir, TokenCache.JOB_TOKEN_HDFS_FILE);
-    if (tokenStorage == null) {
-      tokenStorage = new Credentials();
-    }
-    //create JobToken file and write token to it
-    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId
-        .toString()));
-    Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(identifier,
-        jobtracker.getJobTokenSecretManager());
-    token.setService(identifier.getJobId());
-    
-    TokenCache.setJobToken(token, tokenStorage);
-        
-    // write TokenStorage out
-    tokenStorage.writeTokenStorageFile(keysFile, jobtracker.getConf());
-    LOG.info("jobToken generated and stored with users keys in "
-        + keysFile.toUri().getPath());
-  }
-
-  /**
    * Get the level of locality that a given task would have if launched on
    * a particular TaskTracker. Returns 0 if the task has data on that machine,
    * 1 if it has data on the same rack, etc (depending on number of levels in

Modified: hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1471565&r1=1471564&r2=1471565&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed Apr 24 17:49:46 2013
@@ -79,7 +79,9 @@ import org.apache.hadoop.mapred.QueueMan
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSecretManager;
@@ -205,7 +207,6 @@ public class JobTracker implements MRCon
   volatile State state = State.INITIALIZING;
   private static final int FS_ACCESS_RETRY_PERIOD = 1000;
   static final String JOB_INFO_FILE = "job-info";
-  static final String JOB_TOKEN_FILE = "jobToken";
   private DNSToSwitchMapping dnsToSwitchMapping;
   private NetworkTopology clusterMap;
   private int numTaskCacheLevels; // the max level to which we cache tasks
@@ -3579,10 +3580,26 @@ public class JobTracker implements MRCon
           new Path(jobSubmitDir));
     }
     
+    // Store the job-info in a file so that the job can be recovered
+    // later (if at all)
+    // Note: jobDir & jobInfo are owned by JT user since we are using
+    // his fs object
+    if (!recovered) {
+      Path jobDir = getSystemDirectoryForJob(jobId);
+      FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
+      FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
+      jobInfo.write(out);
+      out.close();
+    }
+
     // Create the JobInProgress, do not lock the JobTracker since
-    // we are about to copy job.xml from HDFS
+    // we are about to copy job.xml from HDFS and write jobToken file to HDFS
     JobInProgress job = null;
     try {
+      if (ts == null) {
+        ts = new Credentials();
+      }
+      generateAndStoreJobTokens(jobId, ts);
       job = new JobInProgress(this, this.conf, jobInfo, 0, ts);
     } catch (Exception e) {
       throw new IOException(e);
@@ -3618,16 +3635,6 @@ public class JobTracker implements MRCon
         throw ioe;
       }
 
-      if (!recovered) {
-        // Store the information in a file so that the job can be recovered
-        // later (if at all)
-        Path jobDir = getSystemDirectoryForJob(jobId);
-        FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
-        FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
-        jobInfo.write(out);
-        out.close();
-      }
-
       try {
         this.taskScheduler.checkJobSubmission(job);
       } catch (IOException ioe){
@@ -4346,12 +4353,13 @@ public class JobTracker implements MRCon
   
   //Get the job token file in system directory
   Path getSystemFileForJob(JobID id) {
-    return new Path(getSystemDirectoryForJob(id)+"/" + JOB_INFO_FILE);
+    return new Path(getSystemDirectoryForJob(id), JOB_INFO_FILE);
   }
 
   //Get the job token file in system directory
   Path getTokenFileForJob(JobID id) {
-    return new Path(getSystemDirectoryForJob(id)+"/" + JOB_TOKEN_FILE);
+    return new Path(
+        getSystemDirectoryForJob(id), TokenCache.JOB_TOKEN_HDFS_FILE);
   }
   
   /**
@@ -5184,4 +5192,47 @@ public class JobTracker implements MRCon
     }
   }
 
+  /**
+   * generate job token and save it into the file
+   * @throws IOException
+   * @throws InterruptedException 
+   */
+  private void 
+  generateAndStoreJobTokens(final JobID jobId, final Credentials tokenStorage) 
+      throws IOException {
+
+    // Write out jobToken as JT user
+    try {
+      getMROwner().doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws IOException {
+
+          Path jobDir = getSystemDirectoryForJob(jobId);
+          Path keysFile = new Path(jobDir, TokenCache.JOB_TOKEN_HDFS_FILE);
+          //create JobToken file and write token to it
+          JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId
+              .toString()));
+          Token<JobTokenIdentifier> token = 
+              new Token<JobTokenIdentifier>(
+                  identifier, getJobTokenSecretManager());
+          token.setService(identifier.getJobId());
+
+          TokenCache.setJobToken(token, tokenStorage);
+
+          // write TokenStorage out
+          tokenStorage.writeTokenStorageFile(keysFile, getConf());
+          LOG.info("jobToken generated and stored with users keys in "
+              + keysFile.toUri().getPath());
+          
+          return null;
+
+        }
+      });
+    } catch (InterruptedException ie) {
+      // Wrap and rethrow so callers see token-storage failure as an IOException
+      throw new IOException(ie);
+    }
+
+  }
+
 }
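
A note on the doAs wrapper in generateAndStoreJobTokens() above: the jobToken
file is now written with the JobTracker user's credentials (getMROwner()) rather
than the submitter's. The UserGroupInformation.doAs idiom it relies on looks
roughly like this (sketch only; assumes the usual imports of
UserGroupInformation, PrivilegedExceptionAction, FileSystem, and Path):

    // Sketch of the doAs idiom: perform an HDFS write as a specific user,
    // surfacing interruption as an IOException, as the patch does.
    void writeAsUser(final UserGroupInformation ugi, final FileSystem fs,
        final Path file) throws IOException {
      try {
        ugi.doAs(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws IOException {
            fs.create(file).close();  // executes with ugi's credentials
            return null;
          }
        });
      } catch (InterruptedException ie) {
        throw new IOException(ie);    // same handling as the patch
      }
    }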

Modified: hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1471565&r1=1471564&r2=1471565&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Wed Apr 24 17:49:46 2013
@@ -52,6 +52,7 @@ public class TestRecoveryManager {
   private static final Path TEST_DIR = 
     new Path(System.getProperty("test.build.data", "/tmp"), 
              "test-recovery-manager");
+  
   private FileSystem fs;
   private MiniDFSCluster dfs;
   private MiniMRCluster mr;
@@ -723,4 +724,91 @@ public class TestRecoveryManager {
     rJob1 = jc.getJob(rJob1.getID());
     Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
   }
+
+  @Test(timeout=120000)
+  public void testJobInitError() throws Exception {
+    LOG.info("Testing error during Job submission");
+    
+    final Path HDFS_TEST_DIR = new Path("/tmp");
+    
+    JobConf conf = new JobConf();
+    
+    dfs = new MiniDFSCluster(conf, 1, true, null);
+    fs = dfs.getFileSystem();
+
+    conf.set("mapreduce.jobtracker.staging.root.dir", "/user");
+    conf.set("mapred.system.dir", "/mapred");
+    
+    String mapredSysDir =  conf.get("mapred.system.dir");
+    mkdirWithPerms(fs, mapredSysDir, (short)0700);
+    fs.setOwner(new Path(mapredSysDir),
+        UserGroupInformation.getCurrentUser().getUserName(), "mrgroup");
+
+    mkdirWithPerms(fs, "/user", (short)0777);
+    mkdirWithPerms(fs, "/mapred", (short)0777);
+    mkdirWithPerms(fs, "/tmp", (short)0777);
+
+    mr = 
+        new MiniMRCluster(
+            1, dfs.getFileSystem().getUri().toString(), 1, null, null, conf);
+
+    String signalFile = new Path(HDFS_TEST_DIR, "signal").toString();
+
+    // make sure that the jobtracker is in recovery mode
+    mr.getJobTrackerConf()
+    .setBoolean("mapred.jobtracker.restart.recover", true);
+
+    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+    final JobConf job1 = mr.createJobConf();
+    UtilsForTests.configureWaitingJobConf(job1, new Path(HDFS_TEST_DIR, "input"),
+        new Path(HDFS_TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile,
+        signalFile);
+
+    UserGroupInformation ugi =
+        UserGroupInformation.createUserForTesting("bob", new String[]{"users"});
+    job1.setUser(ugi.getUserName());
+
+    JobClient jc = new JobClient(job1);
+    RunningJob rJob1 = ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
+      public RunningJob run() throws IOException {
+        // Job 1 init should fail
+        job1.setBoolean(JobInProgress.JOB_INIT_EXCEPTION, true);
+        JobClient jc = new JobClient(job1);
+        return jc.submitJob(job1);
+      }
+    });
+    LOG.info("Submitted first job " + rJob1.getID());
+
+    // kill the jobtracker
+    LOG.info("Stopping jobtracker");
+    mr.stopJobTracker();
+
+    // start the jobtracker, after turning off job-init exception
+    LOG.info("Starting jobtracker");
+    mr.getJobTrackerConf().setBoolean(
+        JobInProgress.JT_JOB_INIT_EXCEPTION_OVERRIDE, true);
+    mr.startJobTracker(false);
+
+    while (!mr.getJobTrackerRunner().isUp()) {
+      Thread.sleep(100);
+    }
+    jobtracker = mr.getJobTrackerRunner().getJobTracker();
+    Assert.assertNotNull(jobtracker);
+    
+    UtilsForTests.waitForJobTracker(jc);
+
+    // assert that job is recovered by the jobtracker
+    Assert.assertEquals("Resubmission failed ", 1, jobtracker.getAllJobs().length);
+    JobInProgress jip = jobtracker.getJob(rJob1.getID());
+
+    // Signaling Map task to complete
+    fs.create(new Path(HDFS_TEST_DIR, "signal"));
+    while (!jip.isComplete()) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
+      UtilsForTests.waitFor(100);
+    }
+    rJob1 = jc.getJob(rJob1.getID());
+    Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
+  }
 }


