hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r752835 - in /hadoop/core/branches/branch-0.20: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date Thu, 12 Mar 2009 11:30:41 GMT
Author: yhemanth
Date: Thu Mar 12 11:30:34 2009
New Revision: 752835

URL: http://svn.apache.org/viewvc?rev=752835&view=rev
Log:
Merge -r 752833:752834 from trunk to branch 0.20 to fix HADOOP-5327.

Modified:
    hadoop/core/branches/branch-0.20/   (props changed)
    hadoop/core/branches/branch-0.20/CHANGES.txt   (contents, props changed)
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestQueueManager.java
    hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java

Propchange: hadoop/core/branches/branch-0.20/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 11:30:34 2009
@@ -1,2 +1,2 @@
 /hadoop/core/branches/branch-0.19:713112
-/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746902-746903,746944,746968,746970,747279,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609
+/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746902-746903,746944,746968,746970,747279,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=752835&r1=752834&r2=752835&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Thu Mar 12 11:30:34 2009
@@ -693,6 +693,10 @@
     HADOOP-5416. Correct the shell command "fs -test" forrest doc description.
     (Ravi Phulari via szetszwo) 
 
+    HADOOP-5327. Fixed job tracker to remove files from system directory on
+    ACL check failures and also check ACLs on restart.
+    (Amar Kamat via yhemanth)
+
 Release 0.19.2 - Unreleased
 
   BUG FIXES

Propchange: hadoop/core/branches/branch-0.20/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 11:30:34 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226
 /hadoop/core/branches/branch-0.19/CHANGES.txt:713112
-/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746944,746968,746970,747279,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609
+/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746944,746968,746970,747279,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=752835&r1=752834&r2=752835&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
(original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Thu Mar 12 11:30:34 2009
@@ -235,9 +235,9 @@
                                                       +"/"+jobid + ".xml");
     this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
                                                       +"/"+ jobid + ".jar");
-    Path sysDir = new Path(this.jobtracker.getSystemDir());
-    FileSystem fs = sysDir.getFileSystem(default_conf);
-    jobFile = new Path(sysDir, jobid + "/job.xml");
+    Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
+    FileSystem fs = jobDir.getFileSystem(default_conf);
+    jobFile = new Path(jobDir, "job.xml");
     fs.copyToLocalFile(jobFile, localJobFile);
     conf = new JobConf(localJobFile);
     this.priority = conf.getJobPriority();
@@ -2468,7 +2468,7 @@
       // so we remove that directory to cleanup
       // Delete temp dfs dirs created if any, like in case of 
       // speculative exn of reduces.  
-      Path tempDir = new Path(jobtracker.getSystemDir(), jobId.toString());
+      Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
       new CleanupQueue().addToQueue(conf,tempDir); 
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=752835&r1=752834&r2=752835&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu
Mar 12 11:30:34 2009
@@ -1098,31 +1098,45 @@
         try {
           // 1. Create the job object
           JobInProgress job = new JobInProgress(id, JobTracker.this, conf);
-          String logFileName;
-          Path jobHistoryFilePath;
 
-          // 2. Get the log file and the file path
-          logFileName = 
+          // 2. Check if the user has appropriate access
+          // Get the user group info for the job's owner
+          UserGroupInformation ugi =
+            UserGroupInformation.readFrom(job.getJobConf());
+          LOG.info("Submitting job " + id + " on behalf of user "
+                   + ugi.getUserName() + " in groups : "
+                   + StringUtils.arrayToString(ugi.getGroupNames()));
+
+          // check the access
+          try {
+            checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB, ugi);
+          } catch (Throwable t) {
+            LOG.warn("Access denied for user " + ugi.getUserName() 
+                     + " in groups : [" 
+                     + StringUtils.arrayToString(ugi.getGroupNames()) + "]");
+            throw t;
+          }
+
+          // 3. Get the log file and the file path
+          String logFileName = 
             JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
-          jobHistoryFilePath = 
+          Path jobHistoryFilePath = 
             JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
 
-          // 3. Recover the history file. This involved
+          // 4. Recover the history file. This involves
           //     - deleting file.recover if file exists
           //     - renaming file.recover to file if file doesnt exist
           // This makes sure that the (master) file exists
           JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(), 
                                                    jobHistoryFilePath);
           
-          // 4. Cache the history file name as it costs one dfs access
+          // 5. Cache the history file name as it costs one dfs access
           jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
 
-          // 5. Sumbit the job to the jobtracker
+          // 6. Submit the job to the jobtracker
           addJob(id, job);
         } catch (Throwable t) {
-          LOG.warn("Failed to recover job " + id + " history details." 
-                   + " Ignoring.", t);
-          // TODO : remove job details from the system directory
+          LOG.warn("Failed to recover job " + id + " Ignoring the job.", t);
           idIter.remove();
           continue;
         }
@@ -1157,8 +1171,8 @@
           JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(), 
                                         listener, fs);
         } catch (Throwable t) {
-          LOG.info("JobTracker failed to recover job " + pJob.getJobID() 
-                   + " from history. Ignoring it.", t);
+          LOG.info("Error reading history file of job " + pJob.getJobID() 
+                   + ". Ignoring the error and continuing.", t);
         }
 
         // 3. Close the listener
@@ -1179,7 +1193,7 @@
           }
         } catch (Throwable t) {
           LOG.warn("Failed to delete log file (" + logFileName + ") for job " 
-                   + id + ". Ignoring it.", t);
+                   + id + ". Continuing.", t);
         }
 
         if (pJob.isComplete()) {
@@ -2763,7 +2777,14 @@
     JobInProgress job = new JobInProgress(jobId, this, this.conf);
 
     // check for access
-    checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
+    try {
+      checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
+    } catch (IOException ioe) {
+       LOG.warn("Access denied for user " + job.getJobConf().getUser() 
+                + ". Ignoring job " + jobId, ioe);
+      new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
+      throw ioe;
+    }
 
    return addJob(jobId, job); 
   }
@@ -2794,15 +2815,18 @@
   }
 
   // Check whether the specified operation can be performed
-  // related to the job. If ownerAllowed is true, then an owner
-  // of the job can perform the operation irrespective of
-  // access control.
+  // related to the job.
   private void checkAccess(JobInProgress job, 
                                 QueueManager.QueueOperation oper) 
                                   throws IOException {
     // get the user group info
     UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
+    checkAccess(job, oper, ugi);
+  }
 
+  // use the passed ugi for checking the access
+  private void checkAccess(JobInProgress job, QueueManager.QueueOperation oper,
+                           UserGroupInformation ugi) throws IOException {
     // get the queue
     String queue = job.getProfile().getQueueName();
     if (!queueManager.hasAccess(queue, job, oper, ugi)) {
@@ -3148,6 +3172,11 @@
     return jobs.get(jobid);
   }
 
+  // Get the job directory in system directory
+  Path getSystemDirectoryForJob(JobID id) {
+    return new Path(getSystemDir(), id.toString());
+  }
+
   /**
    * Change the run-time priority of the given job.
    * @param jobId job id

Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestQueueManager.java?rev=752835&r1=752834&r2=752835&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestQueueManager.java
(original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestQueueManager.java
Thu Mar 12 11:30:34 2009
@@ -31,6 +31,7 @@
 import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -217,6 +218,16 @@
         assertTrue(ioe.getMessage().
             contains("cannot perform operation " +
             "SUBMIT_JOB on queue " + queue));
+        // check if the system directory gets cleaned up or not
+        JobTracker jobtracker = miniMRCluster.getJobTrackerRunner().getJobTracker();
+        Path sysDir = new Path(jobtracker.getSystemDir());
+        FileSystem fs = sysDir.getFileSystem(conf);
+        int size = fs.listStatus(sysDir).length;
+        while (size > 0) {
+          System.out.println("Waiting for the job files in sys directory to be cleaned up");
+          UtilsForTests.waitFor(100);
+          size = fs.listStatus(sysDir).length;
+        }
       }
     } finally {
       tearDownCluster();

Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=752835&r1=752834&r2=752835&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
(original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
Thu Mar 12 11:30:34 2009
@@ -29,6 +29,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * Test whether the {@link RecoveryManager} is able to tolerate job-recovery 
@@ -203,6 +204,25 @@
       UtilsForTests.waitFor(100);
     }
     
+    // now submit job3 with inappropriate acls
+    JobConf job3 = mr.createJobConf();
+    job3.set("hadoop.job.ugi","abc,users");
+
+    UtilsForTests.configureWaitingJobConf(job3, 
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output2"), 1, 0, 
+        "test-recovery-manager", signalFile, signalFile);
+    
+    // submit the job
+    RunningJob rJob3 = (new JobClient(job3)).submitJob(job3);
+    LOG.info("Submitted job " + rJob3.getID() + " with different user");
+    
+    jip = jobtracker.getJob(rJob3.getID());
+
+    while (!jip.inited()) {
+      LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
+      UtilsForTests.waitFor(100);
+    }
+
     // kill the jobtracker
     LOG.info("Stopping jobtracker");
     mr.stopJobTracker();
@@ -212,9 +232,15 @@
                                       true);
     mr.getJobTrackerConf().setInt("mapred.jobtracker.maxtasks.per.job", 25);
     
+    mr.getJobTrackerConf().setBoolean("mapred.acls.enabled" , true);
+    UserGroupInformation ugi = UserGroupInformation.readFrom(job1);
+    mr.getJobTrackerConf().set("mapred.queue.default.acl-submit-job", 
+                               ugi.getUserName());
+
     // start the jobtracker
     LOG.info("Starting jobtracker");
     mr.startJobTracker();
+    UtilsForTests.waitForJobTracker(jc);
     
     jobtracker = mr.getJobTrackerRunner().getJobTracker();
     
@@ -222,6 +248,17 @@
     assertEquals("Recovery manager failed to tolerate job failures",
                  2, jobtracker.getAllJobs().length);
     
+    // check if the job#1 has failed
+    JobStatus status = jobtracker.getJobStatus(rJob1.getID());
+    assertEquals("Faulty job not failed", 
+                 JobStatus.FAILED, status.getRunState());
+    
+    jip = jobtracker.getJob(rJob2.getID());
+    assertFalse("Job should be running", jip.isComplete());
+    
+    status = jobtracker.getJobStatus(rJob3.getID());
+    assertNull("Job should be missing", status);
+    
     mr.shutdown();
   }
 }



Mime
View raw message