hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r904969 - in /hadoop/mapreduce/trunk: ./ src/contrib/mumak/src/java/org/apache/hadoop/mapred/ src/contrib/mumak/src/test/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/...
Date Sun, 31 Jan 2010 03:49:07 GMT
Author: ddas
Date: Sun Jan 31 03:49:06 2010
New Revision: 904969

URL: http://svn.apache.org/viewvc?rev=904969&view=rev
Log:
MAPREDUCE-1432. Adds hooks in the jobtracker and tasktracker for loading the tokens in the
user's ugi. This is required for the copying of files from the hdfs. Contributed by Devaraj
Das.

Removed:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenStorage.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenStorage.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=904969&r1=904968&r2=904969&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sun Jan 31 03:49:06 2010
@@ -42,6 +42,10 @@
     implements offline erasure-coding.  It can be used to reduce the 
     total storage requirements of HDFS.  (dhruba)
 
+    MAPREDUCE-1432. Adds hooks in the jobtracker and tasktracker 
+    for loading the tokens in the user's ugi. This is required
+    for the copying of files from the hdfs. (ddas)
+
   IMPROVEMENTS
 
     MAPREDUCE-1198. Alternatively schedule different types of tasks in

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java?rev=904969&r1=904968&r2=904969&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
Sun Jan 31 03:49:06 2010
@@ -35,7 +35,7 @@
 import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
 import org.apache.hadoop.mapred.SimulatorJobInProgress;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
 
 /**
  * {@link SimulatorJobTracker} extends {@link JobTracker}. It implements the

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java?rev=904969&r1=904968&r2=904969&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
Sun Jan 31 03:49:06 2010
@@ -35,7 +35,7 @@
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.QueueAclsInfo;
 import org.apache.hadoop.mapreduce.QueueInfo;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=904969&r1=904968&r2=904969&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Sun Jan 31 03:49:06
2010
@@ -32,7 +32,8 @@
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -72,8 +73,10 @@
         firstTaskid.getTaskType() == TaskType.MAP,jvmIdInt);
     
     //load token cache storage
-    String jobTokenFile = System.getenv().get("JOB_TOKEN_FILE");
-    TokenStorage ts = TokenCache.loadTaskTokenStorage(jobTokenFile, defaultConf);
+    String jobTokenFile = 
+      System.getenv().get(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+    TokenStorage ts = 
+      TokenCache.loadTaskTokenStorage(jobTokenFile, defaultConf);
     LOG.debug("loading token. # keys =" +ts.numberOfSecretKeys() + 
         "; from file=" + jobTokenFile);
     
@@ -155,9 +158,10 @@
         JobConf job = new JobConf(task.getJobFile());
         
         // set job shuffle token
-        Token<? extends TokenIdentifier> jt = ts.getJobToken();
+        Token<? extends TokenIdentifier> jt = TokenCache.getJobToken(ts);
         // set the jobTokenFile into task
-        task.setJobTokenSecret(JobTokenSecretManager.createSecretKey(jt.getPassword()));
+        task.setJobTokenSecret(JobTokenSecretManager.
+                               createSecretKey(jt.getPassword()));
         
         // setup the child's Configs.LOCAL_DIR. The child is now sandboxed and
         // can only see files down and under attemtdir only.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=904969&r1=904968&r2=904969&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sun Jan 31
03:49:06 2010
@@ -66,7 +66,7 @@
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.mapreduce.split.JobSplit;
@@ -79,6 +79,7 @@
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 
@@ -377,16 +378,22 @@
     this.startTime = System.currentTimeMillis();
     
     this.localFs = jobtracker.getLocalFileSystem();
-
+    this.tokenStorage = ts;
     // use the user supplied token to add user credentials to the conf
     jobSubmitDir = jobInfo.getJobSubmitDir();
     user = jobInfo.getUser().toString();
+
     UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
-      fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
-        public FileSystem run() throws IOException {
-          return jobSubmitDir.getFileSystem(default_conf);
+    if (ts != null) {
+      for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
+        ugi.addToken(token);
+      }
+    }
+
+    fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+      public FileSystem run() throws IOException {
+        return jobSubmitDir.getFileSystem(default_conf);
       }});
-    
     this.localJobFile = 
       default_conf.getLocalPath(JobTracker.SUBDIR + "/" + this.jobId + ".xml");
     
@@ -449,8 +456,6 @@
         JobContext.SPECULATIVECAP,0.1f);
     this.slowNodeThreshold = conf.getFloat(
         JobContext.SPECULATIVE_SLOWNODE_THRESHOLD,1.0f);
-    this.tokenStorage = ts;
-
   }
 
   /**
@@ -3570,7 +3575,7 @@
     if(tokenStorage == null)
       tokenStorage = new TokenStorage();
     
-    tokenStorage.setJobToken(token);
+    TokenCache.setJobToken(token, tokenStorage);
     
     // write TokenStorage out
     tokenStorage.write(os);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=904969&r1=904968&r2=904969&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Sun Jan 31 03:49:06
2010
@@ -76,7 +76,7 @@
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=904969&r1=904968&r2=904969&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Sun Jan 31
03:49:06 2010
@@ -49,7 +49,7 @@
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.State;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=904969&r1=904968&r2=904969&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Sun Jan 31 03:49:06
2010
@@ -175,7 +175,7 @@
       // We don't create any symlinks yet, so presence/absence of workDir
       // actually on the file system doesn't matter.
       UserGroupInformation ugi = 
-        UserGroupInformation.createRemoteUser(conf.getUser());
+        tracker.getRunningJob(t.getJobID()).getUGI();
       ugi.doAs(new PrivilegedExceptionAction<Void>() {
         public Void run() throws IOException {
           taskDistributedCacheManager = 
@@ -516,9 +516,9 @@
     env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
     
     // put jobTokenFile name into env
-    String jobTokenFile = conf.get(TokenCache.JOB_TOKEN_FILENAME);
+    String jobTokenFile = conf.get(TokenCache.JOB_TOKENS_FILENAME);
     LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
-    env.put("JOB_TOKEN_FILE", jobTokenFile);
+    env.put(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, jobTokenFile);
     
     // for the child of task jvm, set hadoop.root.logger
     env.put("HADOOP_ROOT_LOGGER", "INFO,TLA");

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=904969&r1=904968&r2=904969&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Sun Jan 31 03:49:06
2010
@@ -79,7 +79,7 @@
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@@ -102,6 +102,7 @@
 import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
@@ -440,6 +441,10 @@
   JobTokenSecretManager getJobTokenSecretManager() {
     return jobTokenSecretManager;
   }
+  
+  RunningJob getRunningJob(JobID jobId) {
+    return runningJobs.get(jobId);
+  }
 
   Localizer getLocalizer() {
     return localizer;
@@ -933,8 +938,8 @@
 
     synchronized (rjob) {
       if (!rjob.localized) {
-
-        JobConf localJobConf = localizeJobFiles(t);
+       
+        JobConf localJobConf = localizeJobFiles(t, rjob);
 
         // Now initialize the job via task-controller so as to set
         // ownership/permissions of jars, job-work-dir. Note that initializeJob
@@ -949,20 +954,17 @@
         rjob.jobConf = localJobConf;
         rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
                              localJobConf.getKeepFailedTaskFiles());
-        TokenStorage ts = TokenCache.loadTokens(
-            rjob.jobConf.get(TokenCache.JOB_TOKEN_FILENAME), rjob.jobConf);
-        Token<JobTokenIdentifier> jt = (Token<JobTokenIdentifier>)ts.getJobToken();

-        getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
         rjob.localized = true;
       }
     }
     launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
   }
 
-  private FileSystem getFS(final Path filePath, String user, 
+  private FileSystem getFS(final Path filePath, JobID jobId, 
       final Configuration conf) throws IOException, InterruptedException {
-    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
-    FileSystem userFs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+    RunningJob rJob = runningJobs.get(jobId);
+    FileSystem userFs = 
+      rJob.ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
         public FileSystem run() throws IOException {
           return filePath.getFileSystem(conf);
       }});
@@ -985,7 +987,8 @@
    *         job as a starting point.
    * @throws IOException
    */
-  JobConf localizeJobFiles(Task t)
+  @SuppressWarnings("unchecked")
+  JobConf localizeJobFiles(Task t, RunningJob rjob)
       throws IOException, InterruptedException {
     JobID jobId = t.getJobID();
     String userName = t.getUser();
@@ -993,7 +996,19 @@
     // Initialize the job directories
     FileSystem localFs = FileSystem.getLocal(fConf);
     getLocalizer().initializeJobDirs(userName, jobId);
+    // save local copy of JobToken file
+    String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
+    rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
 
+    TokenStorage ts = TokenCache.loadTokens(localJobTokenFile, fConf);
+    Token<JobTokenIdentifier> jt = 
+      (Token<JobTokenIdentifier>)TokenCache.getJobToken(ts);
+    if (jt != null) { //could be null in the case of some unit tests
+      getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
+    }
+    for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
+      rjob.ugi.addToken(token);
+    }
     // Download the job.xml for this job from the system FS
     Path localJobFile =
         localizeJobConfFile(new Path(t.getJobFile()), userName, jobId);
@@ -1002,6 +1017,12 @@
     //WE WILL TRUST THE USERNAME THAT WE GOT FROM THE JOBTRACKER
     //AS PART OF THE TASK OBJECT
     localJobConf.setUser(userName);
+    
+    // set the location of the token file into jobConf to transfer 
+    // the name to TaskRunner
+    localJobConf.set(TokenCache.JOB_TOKENS_FILENAME,
+        localJobTokenFile.toString());
+    
 
     // create the 'job-work' directory: job-specific shared directory for use as
     // scratch space by all tasks of the same job running on this TaskTracker. 
@@ -1016,8 +1037,7 @@
     localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
     // Download the job.jar for this job from the system FS
     localizeJobJarFile(userName, jobId, localFs, localJobConf);
-    // save local copy of JobToken file
-    localizeJobTokenFile(userName, jobId, localJobConf);
+    
     return localJobConf;
   }
 
@@ -1032,7 +1052,7 @@
   private Path localizeJobConfFile(Path jobFile, String user, JobID jobId)
       throws IOException, InterruptedException {
     final JobConf conf = new JobConf(getJobConf());
-    FileSystem userFs = getFS(jobFile, user, conf);
+    FileSystem userFs = getFS(jobFile, jobId, conf);
     // Get sizes of JobFile
     // sizes are -1 if they are not present.
     FileStatus status = null;
@@ -1071,7 +1091,7 @@
     long jarFileSize = -1;
     if (jarFile != null) {
       Path jarFilePath = new Path(jarFile);
-      FileSystem fs = getFS(jarFilePath, user, localJobConf);
+      FileSystem fs = getFS(jarFilePath, jobId, localJobConf);
       try {
         status = fs.getFileStatus(jarFilePath);
         jarFileSize = status.getLen();
@@ -3151,6 +3171,7 @@
     volatile Set<TaskInProgress> tasks;
     boolean localized;
     boolean keepJobFiles;
+    UserGroupInformation ugi;
     FetchStatus f;
     RunningJob(JobID jobid) {
       this.jobid = jobid;
@@ -3162,6 +3183,10 @@
     JobID getJobID() {
       return jobid;
     }
+    
+    UserGroupInformation getUGI() {
+      return ugi;
+    }
       
     void setFetchStatus(FetchStatus f) {
       this.f = f;
@@ -3784,7 +3809,7 @@
      * @return the local file system path of the downloaded file.
      * @throws IOException
      */
-    private void localizeJobTokenFile(String user, JobID jobId, JobConf jobConf)
+    private String localizeJobTokenFile(String user, JobID jobId)
         throws IOException {
       // check if the tokenJob file is there..
       Path skPath = new Path(systemDirectory, 
@@ -3798,14 +3823,13 @@
       Path localJobTokenFile =
           lDirAlloc.getLocalPathForWrite(getLocalJobTokenFile(user, 
               jobId.toString()), jobTokenSize, fConf);
-    
+      String localJobTokenFileStr = localJobTokenFile.toUri().getPath();
       LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() + 
-          " to " + localJobTokenFile.toUri().getPath());
+          " to " + localJobTokenFileStr);
       
       // Download job_token
       systemFS.copyToLocalFile(skPath, localJobTokenFile);      
-      // set it into jobConf to transfer the name to TaskRunner
-      jobConf.set(TokenCache.JOB_TOKEN_FILENAME,localJobTokenFile.toString());
+      return localJobTokenFileStr;
     }
 
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=904969&r1=904968&r2=904969&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java Sun Jan
31 03:49:06 2010
@@ -485,7 +485,7 @@
           mapper.readValue(new File(localFileName), Map.class);
 
         for(Map.Entry<String, String> ent: nm.entrySet()) {
-          TokenCache.setSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
+          TokenCache.addSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
         }
       } catch (JsonMappingException e) {
         json_error = true;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=904969&r1=904968&r2=904969&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
Sun Jan 31 03:49:06 2010
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 
-import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
@@ -34,7 +33,7 @@
 import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
 import org.apache.hadoop.mapreduce.server.jobtracker.State;
 
 /** 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=904969&r1=904968&r2=904969&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Sun
Jan 31 03:49:06 2010
@@ -37,30 +37,21 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.TokenStorage;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 
 
 /**
- * this class keeps static references to TokenStorage object
- * also it provides auxiliary methods for setting and getting secret keys  
+ * This class provides user facing APIs for transferring secrets from
+ * the job client to the tasks.
+ * The secrets can be stored just before submission of jobs and read during
+ * the task execution.  
  */
 @InterfaceStability.Evolving
 public class TokenCache {
   
   private static final Log LOG = LogFactory.getLog(TokenCache.class);
-  
-  /**
-   * file name used on HDFS for generated job token
-   */
-  public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
-
-  /**
-   * conf setting for job tokens cache file name
-   */
-  public static final String JOB_TOKEN_FILENAME = "mapreduce.job.jobTokenFile";
-  
-  
 
   private static TokenStorage tokenStorage;
   
@@ -80,7 +71,7 @@
    * @param alias
    * @param key
    */
-  public static void setSecretKey(Text alias, byte[] key) {
+  public static void addSecretKey(Text alias, byte[] key) {
     getTokenStorage().addSecretKey(alias, key);
   }
   
@@ -89,17 +80,7 @@
    */
   public static void addDelegationToken(
       String namenode, Token<? extends TokenIdentifier> t) {
-    getTokenStorage().setToken(new Text(namenode), t);
-  }
-
-  /**
-   * 
-   * @param namenode
-   * @return delegation token
-   */
-  @SuppressWarnings("unchecked")
-  public static Token<DelegationTokenIdentifier> getDelegationToken(String namenode)
{
-    return (Token<DelegationTokenIdentifier>)getTokenStorage().getToken(new Text(namenode));
+    getTokenStorage().addToken(new Text(namenode), t);
   }
 
   /**
@@ -109,7 +90,70 @@
   public static Collection<Token<? extends TokenIdentifier>> getAllTokens() {
     return getTokenStorage().getAllTokens();
   }
+  /**
+   * Convenience method to obtain delegation tokens from namenodes 
+   * corresponding to the paths passed.
+   * @param ps array of paths
+   * @param conf configuration
+   * @throws IOException
+   */
+  public static void obtainTokensForNamenodes(Path [] ps, Configuration conf) 
+  throws IOException {
+    // get jobtracker principal id (for the renewer)
+    Text jtCreds = new Text(conf.get(JobContext.JOB_JOBTRACKER_ID, ""));
+   
+    for(Path p: ps) {
+      FileSystem fs = FileSystem.get(p.toUri(), conf);
+      if(fs instanceof DistributedFileSystem) {
+        DistributedFileSystem dfs = (DistributedFileSystem)fs;
+        URI uri = fs.getUri();
+        String fs_addr = buildDTServiceName(uri);
+        
+        // see if we already have the token
+        Token<DelegationTokenIdentifier> token = 
+          TokenCache.getDelegationToken(fs_addr); 
+        if(token != null) {
+          LOG.debug("DT for " + token.getService()  + " is already present");
+          continue;
+        }
+        // get the token
+        token = dfs.getDelegationToken(jtCreds);
+        if(token==null) 
+          throw new IOException("Token from " + fs_addr + " is null");
+
+        token.setService(new Text(fs_addr));
+        TokenCache.addDelegationToken(fs_addr, token);
+        LOG.info("getting dt for " + p.toString() + ";uri="+ fs_addr + 
+            ";t.service="+token.getService());
+      }
+    }
+  }
   
+  /**
+   * file name used on HDFS for generated job token
+   */
+  @InterfaceAudience.Private
+  public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
+
+  /**
+   * conf setting for job tokens cache file name
+   */
+  @InterfaceAudience.Private
+  public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
+  private static final Text JOB_TOKEN = new Text("ShuffleAndJobToken");
+  
+  /**
+   * 
+   * @param namenode
+   * @return delegation token
+   */
+  @SuppressWarnings("unchecked")
+  @InterfaceAudience.Private
+  public static Token<DelegationTokenIdentifier> 
+  getDelegationToken(String namenode) {
+    return (Token<DelegationTokenIdentifier>)getTokenStorage().
+            getToken(new Text(namenode));
+  }
 
   /**
    * @return TokenStore object
@@ -173,6 +217,23 @@
     in.close();
     return ts;
   }
+  /**
+   * store job token
+   * @param t
+   */
+  @InterfaceAudience.Private
+  public static void setJobToken(Token<? extends TokenIdentifier> t, 
+      TokenStorage ts) {
+    ts.addToken(JOB_TOKEN, t);
+  }
+  /**
+   * 
+   * @return job token
+   */
+  @InterfaceAudience.Private
+  public static Token<? extends TokenIdentifier> getJobToken(TokenStorage ts) {
+    return ts.getToken(JOB_TOKEN);
+  }
   
   static String buildDTServiceName(URI uri) {
     int port = uri.getPort();
@@ -184,36 +245,4 @@
     sb.append(NetUtils.normalizeHostName(uri.getHost())).append(":").append(port);
     return sb.toString();
   }
-  
-  public static void obtainTokensForNamenodes(Path [] ps, Configuration conf) 
-  throws IOException {
-    // get jobtracker principal id (for the renewer)
-    Text jtCreds = new Text(conf.get(JobContext.JOB_JOBTRACKER_ID, ""));
-   
-    for(Path p: ps) {
-      FileSystem fs = FileSystem.get(p.toUri(), conf);
-      if(fs instanceof DistributedFileSystem) {
-        DistributedFileSystem dfs = (DistributedFileSystem)fs;
-        URI uri = fs.getUri();
-        String fs_addr = buildDTServiceName(uri);
-        
-        // see if we already have the token
-        Token<DelegationTokenIdentifier> token = 
-          TokenCache.getDelegationToken(fs_addr); 
-        if(token != null) {
-          LOG.debug("DT for " + token.getService()  + " is already present");
-          continue;
-        }
-        // get the token
-        token = dfs.getDelegationToken(jtCreds);
-        if(token==null) 
-          throw new IOException("Token from " + fs_addr + " is null");
-
-        token.setService(new Text(fs_addr));
-        TokenCache.addDelegationToken(fs_addr, token);
-        LOG.info("getting dt for " + p.toString() + ";uri="+ fs_addr + 
-            ";t.service="+token.getService());
-      }
-    }
-  }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=904969&r1=904968&r2=904969&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
Sun Jan 31 03:49:06 2010
@@ -144,6 +144,11 @@
     // Set up the task to be localized
     String jtIdentifier = "200907202331";
     jobId = new JobID(jtIdentifier, 1);
+    
+    TaskTracker.RunningJob rjob = new TaskTracker.RunningJob(jobId);
+    rjob.ugi = UserGroupInformation.getCurrentUser();
+    tracker.runningJobs.put(jobId, rjob);
+    
     taskId =
         new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 0);
     task =
@@ -404,7 +409,8 @@
     tracker.getLocalizer().initializeUserDirs(task.getUser());
 
     // /////////// The main method being tested
-    localizedJobConf = tracker.localizeJobFiles(task);
+    localizedJobConf = tracker.localizeJobFiles(task, 
+        new TaskTracker.RunningJob(task.getJobID()));
     // ///////////
 
     // Now initialize the job via task-controller so as to set
@@ -498,7 +504,8 @@
       return;
     }
     tracker.getLocalizer().initializeUserDirs(task.getUser());
-    localizedJobConf = tracker.localizeJobFiles(task);
+    localizedJobConf = tracker.localizeJobFiles(task, 
+        new TaskTracker.RunningJob(task.getJobID()));
 
     // Now initialize the job via task-controller so as to set
     // ownership/permissions of jars, job-work-dir
@@ -704,7 +711,8 @@
       throws Exception {
     // Localize job and localize task.
     tracker.getLocalizer().initializeUserDirs(task.getUser());
-    localizedJobConf = tracker.localizeJobFiles(task);
+    localizedJobConf = tracker.localizeJobFiles(task, 
+        new TaskTracker.RunningJob(task.getJobID()));
     if (jvmReuse) {
       localizedJobConf.setNumTasksToExecutePerJvm(2);
     }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=904969&r1=904968&r2=904969&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
Sun Jan 31 03:49:06 2010
@@ -46,6 +46,7 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.security.TokenStorage;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
@@ -76,7 +77,7 @@
       
       System.out.println("inside MAP: ts==NULL?=" + (ts==null) + 
           "; #keys = " + (ts==null? 0:ts.numberOfSecretKeys()) + 
-          ";jobToken = " +  (ts==null? "n/a":ts.getJobToken()) +
+          ";jobToken = " +  (ts==null? "n/a":TokenCache.getJobToken(ts)) +
           "; alias1 key=" + new String(key1) + 
           "; dts size= " + dts_size);
     
@@ -249,7 +250,7 @@
     
     p1 = fs.makeQualified(p1);
     // do not qualify p2
-    
+    TokenCache.setTokenStorage(new TokenStorage());
     TokenCache.obtainTokensForNamenodes(new Path [] {p1, p2}, jConf);
     
     // this token is keyed by hostname:port key.



Mime
View raw message