hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077153 - in /hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred: Child.java JobInProgress.java JobTracker.java
Date Fri, 04 Mar 2011 03:46:36 GMT
Author: omalley
Date: Fri Mar  4 03:46:35 2011
New Revision: 1077153

URL: http://svn.apache.org/viewvc?rev=1077153&view=rev
Log:
commit dd4956feca64b86eb358b512cfdbac7bfc0a7942
Author: Devaraj Das <ddas@yahoo-inc.com>
Date:   Sun Feb 7 00:22:29 2010 -0800

    MAPREDUCE-1457 from https://issues.apache.org/jira/secure/attachment/12435115/MAPREDUCE-1457-BPY20.patch.1
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1457. Fixes JobTracker to get the FileSystem object within getStagingAreaDir
    +    within a privileged block. Fixes Child.java to use the appropriate UGIs while getting
    +    the TaskUmbilicalProtocol proxy and while executing the task.
    +    Contributed by Jakob Homan. (ddas)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1077153&r1=1077152&r2=1077153&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java	(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java	Fri Mar  4 03:46:35 2011
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -58,10 +59,10 @@ class Child {
   public static void main(String[] args) throws Throwable {
     LOG.debug("Child starting");
 
-    JobConf defaultConf = new JobConf();
+    final JobConf defaultConf = new JobConf();
     String host = args[0];
     int port = Integer.parseInt(args[1]);
-    InetSocketAddress address = new InetSocketAddress(host, port);
+    final InetSocketAddress address = new InetSocketAddress(host, port);
     final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
     final int SLEEP_LONGER_COUNT = 5;
     int jvmIdInt = Integer.parseInt(args[3]);
@@ -81,11 +82,21 @@ class Child {
     UserGroupInformation current = UserGroupInformation.getCurrentUser();
     current.addToken(jt);
 
-    TaskUmbilicalProtocol umbilical =
-      (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
-          TaskUmbilicalProtocol.versionID,
-          address,
-          defaultConf);
+    UserGroupInformation taskOwner 
+     = UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
+    taskOwner.addToken(jt);
+    
+    final TaskUmbilicalProtocol umbilical = 
+      taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
+        @Override
+        public TaskUmbilicalProtocol run() throws Exception {
+          return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
+              TaskUmbilicalProtocol.versionID,
+              address,
+              defaultConf);
+        }
+    });
+    
     int numTasksToExecute = -1; //-1 signifies "no limit"
     int numTasksExecuted = 0;
     Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -127,6 +138,9 @@ class Child {
     JvmContext context = new JvmContext(jvmId, pid);
     int idleLoopCount = 0;
     Task task = null;
+    
+    UserGroupInformation childUGI = null;
+    
     try {
       while (true) {
         taskid = null;
@@ -156,7 +170,7 @@ class Child {
         //create the index file so that the log files 
         //are viewable immediately
         TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
-        JobConf job = new JobConf(task.getJobFile());
+        final JobConf job = new JobConf(task.getJobFile());
 
         // set the jobTokenFile into task
         task.setJobTokenSecret(JobTokenSecretManager.
@@ -181,11 +195,27 @@ class Child {
         JvmMetrics.init(task.getPhase().toString(), job.getSessionId());
         // use job-specified working directory
         FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
-        try {
-          task.run(job, umbilical);             // run the task
-        } finally {
-          TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
-        }
+        LOG.debug("Creating remote user to execute task: " + job.get("user.name"));
+        childUGI = UserGroupInformation.createRemoteUser(job.get("user.name"));
+        // Add tokens to new user so that it may execute its task correctly.
+        for(Token<?> token : UserGroupInformation.getCurrentUser().getTokens()) {
+          childUGI.addToken(token);
+        }
+        
+        // Create a final reference to the task for the doAs block
+        final Task taskFinal = task;
+        childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+          @Override
+          public Object run() throws Exception {
+            try {
+              taskFinal.run(job, umbilical);             // run the task
+            } finally {
+              TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+            }
+
+            return null;
+          }
+        });
         if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
           break;
         }
@@ -198,7 +228,18 @@ class Child {
       try {
         if (task != null) {
           // do cleanup for the task
-          task.taskCleanup(umbilical);
+          if(childUGI == null) {
+            task.taskCleanup(umbilical);
+          } else {
+            final Task taskFinal = task;
+            childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() throws Exception {
+                taskFinal.taskCleanup(umbilical);
+                return null;
+              }
+            });
+          }
         }
       } catch (Exception e) {
         LOG.info("Error cleaning up", e);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077153&r1=1077152&r2=1077153&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java	(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java	Fri Mar  4 03:46:35 2011
@@ -271,6 +271,8 @@ class JobInProgress {
   private Map<TaskTracker, FallowSlotInfo> trackersReservedForReduces = 
     new HashMap<TaskTracker, FallowSlotInfo>();
   private Path jobSubmitDir = null;
+
+  final private UserGroupInformation userUGI;
   
   /**
    * Create an almost empty JobInProgress, which can be used only for tests
@@ -284,6 +286,11 @@ class JobInProgress {
     this.anyCacheLevel = this.maxLevel+1;
     this.jobtracker = tracker;
     this.restartCount = 0;
+    try {
+      this.userUGI = UserGroupInformation.getCurrentUser();
+    } catch (IOException ie){
+      throw new RuntimeException(ie);
+    }
   }
   
   /**
@@ -320,14 +327,14 @@ class JobInProgress {
     // use the user supplied token to add user credentials to the conf
     jobSubmitDir = jobInfo.getJobSubmitDir();
     user = jobInfo.getUser().toString();
-    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+    userUGI = UserGroupInformation.createRemoteUser(user);
     if (ts != null) {
       for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
-        ugi.addToken(token);
+        userUGI.addToken(token);
       }
     }
 
-    fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+    fs = userUGI.doAs(new PrivilegedExceptionAction<FileSystem>() {
       public FileSystem run() throws IOException {
         return jobSubmitDir.getFileSystem(default_conf);
       }});
@@ -525,10 +532,21 @@ class JobInProgress {
     }
 
     LOG.info("Initializing " + jobId);
-
-    // log job info
-    JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile, 
-                                    this.startTime, hasRestarted());
+    final long startTimeFinal = this.startTime;
+    // log job info as the user running the job
+    try {
+    userUGI.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile, 
+            startTimeFinal, hasRestarted());
+        return null;
+      }
+    });
+    } catch(InterruptedException ie) {
+      throw new IOException(ie);
+    }
+    
     // log the job priority
     setPriority(this.priority);
     

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077153&r1=1077152&r2=1077153&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java	(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java	Fri Mar  4 03:46:35 2011
@@ -2031,8 +2031,15 @@ public class JobTracker implements MRCon
         tmpInfoPort == 0, conf);
     infoServer.setAttribute("job.tracker", this);
     // initialize history parameters.
-    boolean historyInitialized = JobHistory.init(this, conf, this.localMachine,
-                                                 this.startTime);
+    final JobTracker jtFinal = this;
+    boolean historyInitialized = 
+      mrOwner.doAs(new PrivilegedExceptionAction<Boolean>() {
+        @Override
+        public Boolean run() throws Exception {
+          return JobHistory.init(jtFinal, conf,jtFinal.localMachine, 
+              jtFinal.startTime);
+        }
+      });
     
     infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
     infoServer.start();
@@ -2147,16 +2154,16 @@ public class JobTracker implements MRCon
 
     // Initialize history DONE folder
     if (historyInitialized) {
-      JobHistory.initDone(conf, fs);
-      final String historyLogDir = 
-        JobHistory.getCompletedJobHistoryLocation().toString();
-      infoServer.setAttribute("historyLogDir", historyLogDir);
       FileSystem historyFS = mrOwner.doAs(new PrivilegedExceptionAction<FileSystem>() {
         public FileSystem run() throws IOException {
+          JobHistory.initDone(conf, fs);
+          final String historyLogDir = 
+            JobHistory.getCompletedJobHistoryLocation().toString();
+          infoServer.setAttribute("historyLogDir", historyLogDir);
+          
           return new Path(historyLogDir).getFileSystem(conf);
         }
       });
-      infoServer.setAttribute("historyLogDir", historyLogDir);
       infoServer.setAttribute("fileSys", historyFS);
     }
 
@@ -3570,13 +3577,22 @@ public class JobTracker implements MRCon
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getStagingAreaDir()
    */
   public String getStagingAreaDir() throws IOException {
-    Path stagingRootDir =
-      new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
-          "/tmp/hadoop/mapred/staging"));
-    FileSystem fs = stagingRootDir.getFileSystem(conf);
-    String user = UserGroupInformation.getCurrentUser().getShortUserName();
-    return fs.makeQualified(new Path(stagingRootDir,
-                                user+"/.staging")).toString();
+    try{
+      final String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      return mrOwner.doAs(new PrivilegedExceptionAction<String>() {
+        @Override
+        public String run() throws Exception {
+          Path stagingRootDir =
+            new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
+                "/tmp/hadoop/mapred/staging"));
+          FileSystem fs = stagingRootDir.getFileSystem(conf);
+          return fs.makeQualified(new Path(stagingRootDir,
+                                    user+"/.staging")).toString();
+        }
+      });
+    } catch(InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**



Mime
View raw message