hadoop-mapreduce-commits mailing list archives

From: d...@apache.org
Subject: svn commit: r904609 - in /hadoop/mapreduce/trunk: ./ src/contrib/mumak/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/filecache/ src/java/org/apache/hadoop/mapreduce/lib/input/ src/java/or...
Date: Fri, 29 Jan 2010 19:50:55 GMT
Author: ddas
Date: Fri Jan 29 19:50:54 2010
New Revision: 904609

URL: http://svn.apache.org/viewvc?rev=904609&view=rev
Log:
MAPREDUCE-1383. Automates fetching of delegation tokens in File*Formats,
the Distributed Cache, and DistCp. Also provides a config property,
mapreduce.job.hdfs-servers, that jobs can populate with a comma-separated
list of namenodes; the job client automatically fetches delegation tokens
from those namenodes. Contributed by Boris Shkolnik.
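
For illustration only (not part of this patch), a job that reads or writes
across several HDFS clusters could populate the new property as below; the
class name and namenode addresses are hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CrossClusterJobSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Comma-separated list of namenodes to fetch delegation tokens from.
        conf.set("mapreduce.job.hdfs-servers",
            "hdfs://nn1.example.com:8020,hdfs://nn2.example.com:8020");
        Job job = new Job(conf, "cross-cluster-job");
        // At submit time the job client fetches a delegation token from each
        // listed namenode (see populateTokenCache() in JobSubmitter below).
        job.submit();
      }
    }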

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/mumak/ivy.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenStorage.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jan 29 19:50:54 2010
@@ -12,6 +12,12 @@
 
   NEW FEATURES
 
+    MAPREDUCE-1383. Automates fetching of delegation tokens in File*Formats
+    Distributed Cache and Distcp. Also, provides a config 
+    mapreduce.job.hdfs-servers that the jobs can populate with a comma
+    separated list of namenodes. The job client automatically fetches
+    delegation tokens from those namenodes. (Boris Shkolnik via ddas)    
+
     MAPREDUCE-698. Per-pool task limits for the fair scheduler.
     (Kevin Peterson via matei)
 

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/ivy.xml?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/ivy.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/ivy.xml Fri Jan 29 19:50:54 2010
@@ -43,6 +43,10 @@
                 rev="${hadoop-core.version}" conf="common->default"/>
     <dependency org="org.apache.hadoop" name="hadoop-core-test" 
                 rev="${hadoop-core.version}" conf="common->default"/>
+    <dependency org="org.apache.hadoop" name="hadoop-hdfs" 
+                rev="${hadoop-hdfs.version}" conf="common->default"/>
+    <dependency org="org.apache.hadoop" name="hadoop-hdfs-test"
+                rev="${hadoop-hdfs.version}" conf="test->default"/>
     <dependency org="commons-logging"
       name="commons-logging"
       rev="${commons-logging.version}"

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Fri Jan 29 19:50:54 2010
@@ -33,12 +33,12 @@
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.TokenStorage;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.LogManager;
@@ -73,8 +73,7 @@
     
     //load token cache storage
     String jobTokenFile = System.getenv().get("JOB_TOKEN_FILE");
-    defaultConf.set(JobContext.JOB_TOKEN_FILE, jobTokenFile);
-    TokenStorage ts = TokenCache.loadTaskTokenStorage(defaultConf);
+    TokenStorage ts = TokenCache.loadTaskTokenStorage(jobTokenFile, defaultConf);
     LOG.debug("loading token. # keys =" +ts.numberOfSecretKeys() + 
         "; from file=" + jobTokenFile);
     
@@ -156,7 +155,7 @@
         JobConf job = new JobConf(task.getJobFile());
         
         // set job shuffle token
-        Token<JobTokenIdentifier> jt = (Token<JobTokenIdentifier>)ts.getJobToken();
+        Token<? extends TokenIdentifier> jt = ts.getJobToken();
         // set the jobTokenFile into task
         task.setJobTokenSecret(JobTokenSecretManager.createSecretKey(jt.getPassword()));
         

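The net effect in the task JVM, in a minimal sketch abridged from the hunks
above (not a standalone program): the token file name now reaches the task
through the JOB_TOKEN_FILE environment variable set by TaskRunner (below)
rather than through the job configuration, and the job token is handled as a
generic Token<? extends TokenIdentifier>.

    // Abridged from the patched Child.main(); imports as in Child.java.
    String jobTokenFile = System.getenv().get("JOB_TOKEN_FILE");
    TokenStorage ts = TokenCache.loadTaskTokenStorage(jobTokenFile, defaultConf);
    Token<? extends TokenIdentifier> jt = ts.getJobToken();
    task.setJobTokenSecret(JobTokenSecretManager.createSecretKey(jt.getPassword()));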
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java Fri Jan 29 19:50:54 2010
@@ -36,6 +36,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
@@ -154,6 +155,9 @@
       throw new IOException("No input paths specified in job");
     }
 
+    // get tokens for all the required FileSystems..
+    TokenCache.obtainTokensForNamenodes(dirs, job);
+    
     List<FileStatus> result = new ArrayList<FileStatus>();
     List<IOException> errors = new ArrayList<IOException>();
     

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java Fri Jan 29 19:50:54 2010
@@ -24,6 +24,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 
@@ -111,6 +112,10 @@
       // normalize the output directory
       outDir = fs.makeQualified(outDir);
       setOutputPath(job, outDir);
+      
+      // get delegation token for the outDir's file system
+      TokenCache.obtainTokensForNamenodes(new Path[] {outDir}, job);
+      
       // check its existence
       if (fs.exists(outDir)) {
         throw new FileAlreadyExistsException("Output directory " + outDir + 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Jan 29 19:50:54 2010
@@ -65,7 +65,7 @@
 import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
-import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.TokenStorage;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
@@ -3555,7 +3555,7 @@
    */
   private void generateAndStoreTokens() throws IOException{
     Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
-    Path keysFile = new Path(jobDir, SecureShuffleUtils.JOB_TOKEN_FILENAME);
+    Path keysFile = new Path(jobDir, TokenCache.JOB_TOKEN_HDFS_FILE);
     // we need to create this file using the jobtracker's filesystem
     FSDataOutputStream os = jobtracker.getFileSystem().create(keysFile);
     

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Fri Jan 29 19:50:54 2010
@@ -38,6 +38,7 @@
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
@@ -515,7 +516,7 @@
     env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
     
     // put jobTokenFile name into env
-    String jobTokenFile = conf.get(JobContext.JOB_TOKEN_FILE);
+    String jobTokenFile = conf.get(TokenCache.JOB_TOKEN_FILENAME);
     LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
     env.put("JOB_TOKEN_FILE", jobTokenFile);
     

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Jan 29 19:50:54 2010
@@ -921,6 +921,7 @@
                               new LocalDirAllocator(MRConfig.LOCAL_DIR);
 
   // intialize the job directory
+  @SuppressWarnings("unchecked")
   private void localizeJob(TaskInProgress tip
                            ) throws IOException, InterruptedException {
     Task t = tip.getTask();
@@ -948,7 +949,8 @@
         rjob.jobConf = localJobConf;
         rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
                              localJobConf.getKeepFailedTaskFiles());
-        TokenStorage ts = TokenCache.loadTokens(rjob.jobConf);
+        TokenStorage ts = TokenCache.loadTokens(
+            rjob.jobConf.get(TokenCache.JOB_TOKEN_FILENAME), rjob.jobConf);
         Token<JobTokenIdentifier> jt = (Token<JobTokenIdentifier>)ts.getJobToken();

         getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
         rjob.localized = true;
@@ -3786,7 +3788,7 @@
         throws IOException {
       // check if the tokenJob file is there..
       Path skPath = new Path(systemDirectory, 
-          jobId.toString()+"/"+SecureShuffleUtils.JOB_TOKEN_FILENAME);
+          jobId.toString()+"/"+TokenCache.JOB_TOKEN_HDFS_FILE);
       
       FileStatus status = null;
       long jobTokenSize = -1;
@@ -3803,7 +3805,7 @@
       // Download job_token
       systemFS.copyToLocalFile(skPath, localJobTokenFile);      
       // set it into jobConf to transfer the name to TaskRunner
-      jobConf.set(JobContext.JOB_TOKEN_FILE,localJobTokenFile.toString());
+      jobConf.set(TokenCache.JOB_TOKEN_FILENAME,localJobTokenFile.toString());
     }
 
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Fri Jan 29 19:50:54 2010
@@ -229,7 +229,9 @@
     "mapreduce.reduce.merge.memtomem.threshold";
   public static final String REDUCE_MEMTOMEM_ENABLED = 
     "mapreduce.reduce.merge.memtomem.enabled";
-  public static final String JOB_TOKEN_FILE = "mapreduce.job.jobTokenFile";
+  public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";
+  public static final String JOB_JOBTRACKER_ID = "mapreduce.job.kerberos.jtprinicipal";
+  
 
   /**
    * Return the configuration for the job.

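Both new keys are consumed at submit time: JOB_NAMENODES by
JobSubmitter.populateTokenCache() (below), and JOB_JOBTRACKER_ID by
TokenCache.obtainTokensForNamenodes() as the renewer for the fetched tokens.
A hypothetical configuration, for illustration only:

    conf.set(JobContext.JOB_NAMENODES, "hdfs://nn1.example.com:8020");
    // JobTracker Kerberos principal; this value is hypothetical.
    conf.set(JobContext.JOB_JOBTRACKER_ID, "mapred/jt.example.com@EXAMPLE.COM");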
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java Fri Jan 29 19:50:54 2010
@@ -36,14 +36,16 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 
 class JobSubmitter {
@@ -231,6 +233,8 @@
     TrackerDistributedCacheManager.determineTimestamps(conf);
     //  set the public/private visibility of the archives and files
     TrackerDistributedCacheManager.determineCacheVisibilities(conf);
+    // get DelegationToken for each cached file
+    TrackerDistributedCacheManager.getDelegationTokens(conf);
   }
   
   private URI getPathURI(Path destPath, String fragment) 
@@ -316,29 +320,14 @@
       conf.set("mapreduce.job.dir", submitJobDir.toString());
       LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
           + " as the submit dir");
+      // get delegation token for the dir
+      TokenCache.obtainTokensForNamenodes(new Path [] {submitJobDir}, conf);
+      
       copyAndConfigureFiles(job, submitJobDir);
       Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
 
       checkSpecs(job);
       
-      // create TokenStorage object with user secretKeys
-      String tokensFileName = conf.get("tokenCacheFile");
-      TokenStorage tokenStorage = null;
-      if(tokensFileName != null) {
-        LOG.info("loading secret keys from " + tokensFileName);
-        String localFileName = new Path(tokensFileName).toUri().getPath();
-        tokenStorage = new TokenStorage();
-        // read JSON
-        ObjectMapper mapper = new ObjectMapper();
-        Map<String, String> nm = 
-          mapper.readValue(new File(localFileName), Map.class);
-        
-        for(Map.Entry<String, String> ent: nm.entrySet()) {
-          LOG.debug("adding secret key alias="+ent.getKey());
-          tokenStorage.addSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
-        }
-      }
-
       // Create the splits for the job
       LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
       int maps = writeSplits(job, submitJobDir);
@@ -347,10 +336,13 @@
 
       // Write job file to submit dir
       writeConf(conf, submitJobFile);
+      
       //
       // Now, actually submit the job (using the submit name)
       //
-      status = submitClient.submitJob(jobId, submitJobDir.toString(), tokenStorage);
+      populateTokenCache(conf);
+      status = submitClient.submitJob(
+          jobId, submitJobDir.toString(), TokenCache.getTokenStorage());
       if (status != null) {
         return status;
       } else {
@@ -475,4 +467,44 @@
       }
     }
   }
+  
+  // get secret keys and tokens and store them into TokenCache
+  @SuppressWarnings("unchecked")
+  private void populateTokenCache(Configuration conf) throws IOException{
+    // create TokenStorage object with user secretKeys
+    String tokensFileName = conf.get("tokenCacheFile");
+    if(tokensFileName != null) {
+      LOG.info("loading user's secret keys from " + tokensFileName);
+      String localFileName = new Path(tokensFileName).toUri().getPath();
+
+      boolean json_error = false;
+      try {
+        // read JSON
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> nm = 
+          mapper.readValue(new File(localFileName), Map.class);
+
+        for(Map.Entry<String, String> ent: nm.entrySet()) {
+          TokenCache.setSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
+        }
+      } catch (JsonMappingException e) {
+        json_error = true;
+      } catch (JsonParseException e) {
+        json_error = true;
+      }
+      if(json_error)
+        LOG.warn("couldn't parse Token Cache JSON file with user secret keys");
+    }
+    
+    // add the delegation tokens from configuration
+    String [] nameNodes = conf.getStrings(JobContext.JOB_NAMENODES);
+    LOG.info("adding the following namenodes' delegation tokens:" + Arrays.toString(nameNodes));
+    if(nameNodes != null) {
+      Path [] ps = new Path[nameNodes.length];
+      for(int i=0; i< nameNodes.length; i++) {
+        ps[i] = new Path(nameNodes[i]);
+      }
+      TokenCache.obtainTokensForNamenodes(ps, conf);
+    }
+  }
 }

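For reference, populateTokenCache() expects the file named by the
tokenCacheFile property to be a flat JSON map of alias to secret key; a
hypothetical example of such a file:

    {
      "alias1": "secret-key-one",
      "alias2": "secret-key-two"
    }

Each entry is stored in the static TokenCache via setSecretKey(), and
malformed JSON now logs a warning instead of failing job submission.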
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Fri Jan 29 19:50:54 2010
@@ -34,6 +34,7 @@
 import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapred.TaskController.DistributedCacheFileContext;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
 import org.apache.hadoop.fs.FileStatus;
@@ -709,6 +710,35 @@
       setFileTimestamps(job, fileTimestamps.toString());
     }
   }
+  
+  /**
+   * For each archive or cache file - get the corresponding delegation token
+   * @param job
+   * @throws IOException
+   */
+  public static void getDelegationTokens(Configuration job) throws IOException {
+    URI[] tarchives = DistributedCache.getCacheArchives(job);
+    URI[] tfiles = DistributedCache.getCacheFiles(job);
+    
+    int size = (tarchives!=null? tarchives.length : 0) + (tfiles!=null ? tfiles.length :0);
+    Path[] ps = new Path[size];
+    
+    int i = 0;
+    if (tarchives != null) {
+      for (i=0; i < tarchives.length; i++) {
+        ps[i] = new Path(tarchives[i].toString());
+      }
+    }
+    
+    if (tfiles != null) {
+      for(int j=0; j< tfiles.length; j++) {
+        ps[i+j] = new Path(tfiles[j].toString());
+      }
+    }
+    
+    TokenCache.obtainTokensForNamenodes(ps, job);
+  }
+  
   /**
    * Determines the visibilities of the distributed cache files and 
    * archives. The visibility of a cache path is "public" if the leaf component

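A sketch of how the new hook is exercised (the cache URIs are hypothetical):
JobSubmitter calls getDelegationTokens() from copyAndConfigureFiles(), so
cache entries that live on another namenode get their delegation tokens
fetched automatically.

    // Fragment; imports as in TrackerDistributedCacheManager.java above.
    Configuration conf = new Configuration();
    DistributedCache.addCacheFile(
        URI.create("hdfs://nn2.example.com:8020/libs/lookup.dat"), conf);
    DistributedCache.addCacheArchive(
        URI.create("hdfs://nn2.example.com:8020/libs/dict.zip"), conf);
    // Merges archive and file URIs into one Path[] and defers to
    // TokenCache.obtainTokensForNamenodes() for the actual fetch.
    TrackerDistributedCacheManager.getDelegationTokens(conf);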
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Fri Jan 29 19:50:54 2010
@@ -35,6 +35,7 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -196,6 +197,9 @@
     if (dirs.length == 0) {
       throw new IOException("No input paths specified in job");
     }
+    
+    // get tokens for all the required FileSystems..
+    TokenCache.obtainTokensForNamenodes(dirs, job.getConfiguration());
 
     List<IOException> errors = new ArrayList<IOException>();
     

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java Fri Jan 29 19:50:54 2010
@@ -35,6 +35,7 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 
 /** A base class for {@link OutputFormat}s that read from {@link FileSystem}s.*/
 public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
@@ -127,6 +128,10 @@
     if (outDir == null) {
       throw new InvalidJobConfException("Output directory not set.");
     }
+
+    // get delegation token for outDir's file system
+    TokenCache.obtainTokensForNamenodes(new Path[] {outDir}, job.getConfiguration());
+    
     if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
       throw new FileAlreadyExistsException("Output directory " + outDir + 
                                            " already exists");

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java Fri Jan 29 19:50:54 2010
@@ -40,10 +40,6 @@
 public class SecureShuffleUtils {
   public static final String HTTP_HEADER_URL_HASH = "UrlHash";
   public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
-  /**
-   * file name used on HDFS for generated job token
-   */
-  public static final String JOB_TOKEN_FILENAME = "jobToken";
   
   /**
    * Base64 encoded hash of msg

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Jan 29 19:50:54 2010
@@ -19,20 +19,26 @@
 package org.apache.hadoop.mapreduce.security;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.security.token.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceAudience;
 
 
 /**
@@ -44,6 +50,18 @@
   
   private static final Log LOG = LogFactory.getLog(TokenCache.class);
   
+  /**
+   * file name used on HDFS for generated job token
+   */
+  public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
+
+  /**
+   * conf setting for job tokens cache file name
+   */
+  public static final String JOB_TOKEN_FILENAME = "mapreduce.job.jobTokenFile";
+  
+  
+
   private static TokenStorage tokenStorage;
   
   /**
@@ -73,7 +91,17 @@
       String namenode, Token<? extends TokenIdentifier> t) {
     getTokenStorage().setToken(new Text(namenode), t);
   }
-  
+
+  /**
+   * 
+   * @param namenode
+   * @return delegation token
+   */
+  @SuppressWarnings("unchecked")
+  public static Token<DelegationTokenIdentifier> getDelegationToken(String namenode) {
+    return (Token<DelegationTokenIdentifier>)getTokenStorage().getToken(new Text(namenode));
+  }
+
   /**
    * auxiliary method 
    * @return all the available tokens
@@ -113,12 +141,12 @@
    * @throws IOException
    */
   @InterfaceAudience.Private
-  public static TokenStorage loadTaskTokenStorage(JobConf conf)
+  public static TokenStorage loadTaskTokenStorage(String fileName, JobConf conf)
   throws IOException {
     if(tokenStorage != null)
       return tokenStorage;
     
-    tokenStorage = loadTokens(conf);
+    tokenStorage = loadTokens(fileName, conf);
     
     return tokenStorage;
   }
@@ -129,9 +157,8 @@
    * @throws IOException
    */
   @InterfaceAudience.Private
-  public static TokenStorage loadTokens(JobConf conf) 
+  public static TokenStorage loadTokens(String jobTokenFile, JobConf conf) 
   throws IOException {
-    String jobTokenFile = conf.get(JobContext.JOB_TOKEN_FILE);
     Path localJobTokenFile = new Path (jobTokenFile);
     FileSystem localFS = FileSystem.getLocal(conf);
     FSDataInputStream in = localFS.open(localJobTokenFile);
@@ -139,9 +166,54 @@
     TokenStorage ts = new TokenStorage();
     ts.readFields(in);
 
-    LOG.info("Task: Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath() 
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Task: Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath() 
         +"; num of sec keys  = " + ts.numberOfSecretKeys());
+    }
     in.close();
     return ts;
   }
+  
+  static String buildDTServiceName(URI uri) {
+    int port = uri.getPort();
+    if(port == -1) 
+      port = NameNode.DEFAULT_PORT;
+    
+    // build the service name string "ip:port"
+    StringBuffer sb = new StringBuffer();
+    sb.append(NetUtils.normalizeHostName(uri.getHost())).append(":").append(port);
+    return sb.toString();
+  }
+  
+  public static void obtainTokensForNamenodes(Path [] ps, Configuration conf) 
+  throws IOException {
+    // get jobtracker principal id (for the renewer)
+    Text jtCreds = new Text(conf.get(JobContext.JOB_JOBTRACKER_ID, ""));
+   
+    for(Path p: ps) {
+      FileSystem fs = FileSystem.get(p.toUri(), conf);
+      if(fs instanceof DistributedFileSystem) {
+        DistributedFileSystem dfs = (DistributedFileSystem)fs;
+        URI uri = fs.getUri();
+        String fs_addr = buildDTServiceName(uri);
+        
+        // see if we already have the token
+        Token<DelegationTokenIdentifier> token = 
+          TokenCache.getDelegationToken(fs_addr); 
+        if(token != null) {
+          LOG.debug("DT for " + token.getService()  + " is already present");
+          continue;
+        }
+        // get the token
+        token = dfs.getDelegationToken(jtCreds);
+        if(token==null) 
+          throw new IOException("Token from " + fs_addr + " is null");
+
+        token.setService(new Text(fs_addr));
+        TokenCache.addDelegationToken(fs_addr, token);
+        LOG.info("getting dt for " + p.toString() + ";uri="+ fs_addr + 
+            ";t.service="+token.getService());
+      }
+    }
+  }
 }

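To summarize the new API in a sketch (paths hypothetical):
obtainTokensForNamenodes() fetches one delegation token per distinct
DistributedFileSystem, keyed by the "ip:port" service name that
buildDTServiceName() derives from the filesystem URI, falling back to
NameNode.DEFAULT_PORT (8020) when the URI carries no port.

    // Fragment; imports as in TokenCache.java above.
    Path in  = new Path("hdfs://nn1.example.com:8020/user/alice/input");
    Path out = new Path("hdfs://nn1.example.com/user/alice/output"); // no port: defaults to 8020
    TokenCache.obtainTokensForNamenodes(new Path[] { in, out }, conf);
    // Both paths normalize to the same "ip:8020" key, so only one token is
    // fetched; the second path hits the "already present" branch above.
    String service = TokenCache.buildDTServiceName(in.toUri()); // package-private
    Token<DelegationTokenIdentifier> t = TokenCache.getDelegationToken(service);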
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenStorage.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenStorage.java?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenStorage.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenStorage.java Fri Jan 29 19:50:54 2010
@@ -41,7 +41,7 @@
 @InterfaceAudience.Private
 public class TokenStorage implements Writable {
 
-  private static final Text SHUFFLE_JOB_TOKEN = new Text("ShuffleJobToken");
+  private static final Text JOB_TOKEN = new Text("ShuffleJobToken");
 
   private  Map<Text, byte[]> secretKeysMap = new HashMap<Text, byte[]>();
   private  Map<Text, Token<? extends TokenIdentifier>> tokenMap = 
@@ -75,7 +75,7 @@
    */
   @InterfaceAudience.Private
   public void setJobToken(Token<? extends TokenIdentifier> t) {
-    setToken(SHUFFLE_JOB_TOKEN, t);
+    setToken(JOB_TOKEN, t);
   }
   /**
    * 
@@ -83,7 +83,7 @@
    */
   @InterfaceAudience.Private
   public Token<? extends TokenIdentifier> getJobToken() {
-    return getToken(SHUFFLE_JOB_TOKEN);
+    return getToken(JOB_TOKEN);
   }
   
   /**

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Jan 29 19:50:54 2010
@@ -35,7 +35,7 @@
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -241,7 +241,7 @@
     if(!dir.exists())
       assertTrue("faild to create dir="+dir.getAbsolutePath(), dir.mkdirs());
     
-    File jobTokenFile = new File(dir, SecureShuffleUtils.JOB_TOKEN_FILENAME);
+    File jobTokenFile = new File(dir, TokenCache.JOB_TOKEN_HDFS_FILE);
     FileOutputStream fos = new FileOutputStream(jobTokenFile);
     java.io.DataOutputStream out = new java.io.DataOutputStream(fos);
     Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java Fri Jan 29 19:50:54 2010
@@ -19,11 +19,14 @@
 
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
 import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -32,14 +35,20 @@
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.security.token.DelegationTokenIdentifier;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.AfterClass;
@@ -60,14 +69,28 @@
       // get token storage and a key
       TokenStorage ts = TokenCache.getTokenStorage();
       byte[] key1 = TokenCache.getSecretKey(new Text("alias1"));
+      Collection<Token<? extends TokenIdentifier>> dts = TokenCache.getAllTokens();
+      int dts_size = 0;
+      if(dts != null)
+        dts_size = dts.size();
       
       System.out.println("inside MAP: ts==NULL?=" + (ts==null) + 
           "; #keys = " + (ts==null? 0:ts.numberOfSecretKeys()) + 
           ";jobToken = " +  (ts==null? "n/a":ts.getJobToken()) +
-          "; alias1 key=" + new String(key1));
+          "; alias1 key=" + new String(key1) + 
+          "; dts size= " + dts_size);
     
-      if(key1 == null || ts == null || ts.numberOfSecretKeys() != NUM_OF_KEYS) {
+      for(Token<? extends TokenIdentifier> t : dts) {
+        System.out.println(t.getKind() + "=" + StringUtils.byteToHexString(t.getPassword()));
+      }
+      
+      if(dts.size() != 2) { // one job token and one delegation token
         throw new RuntimeException("tokens are not available"); // fail the test
+      }
+      
+      
+      if(key1 == null || ts == null || ts.numberOfSecretKeys() != NUM_OF_KEYS) {
+        throw new RuntimeException("secret keys are not available"); // fail the test
       } 
       super.map(key, value, context);
     }
@@ -170,6 +193,12 @@
     // make sure JT starts
     jConf = mrCluster.createJobConf();
     
+    // provide namenodes names for the job to get the delegation tokens for
+    String nnUri = dfsCluster.getURI().toString();
+    jConf.set(JobContext.JOB_NAMENODES, nnUri + "," + nnUri);
+    // job tracker principla id..
+    jConf.set(JobContext.JOB_JOBTRACKER_ID, "jt_id");
+    
     // using argument to pass the file name
     String[] args = {
        "-tokenCacheFile", tokenFileName.toString(), 
@@ -210,4 +239,38 @@
     }
     assertEquals("local job res is not 0", res, 0);
   }
-}
\ No newline at end of file
+  
+  @Test
+  public void testGetTokensForNamenodes() throws IOException {
+    FileSystem fs = dfsCluster.getFileSystem();
+    
+    Path p1 = new Path("file1");
+    Path p2 = new Path("file2");
+    
+    p1 = fs.makeQualified(p1);
+    // do not qualify p2
+    
+    TokenCache.obtainTokensForNamenodes(new Path [] {p1, p2}, jConf);
+    
+    // this token is keyed by hostname:port key.
+    String fs_addr = TokenCache.buildDTServiceName(p1.toUri());
+    Token<DelegationTokenIdentifier> nnt = TokenCache.getDelegationToken(fs_addr);
+    System.out.println("dt for " + p1 + "(" + fs_addr + ")" + " = " +  nnt);
+    assertNotNull("Token for nn is null", nnt);
+    
+    // verify the size
+    Collection<Token<? extends TokenIdentifier>> tns = TokenCache.getAllTokens();
+    assertEquals("number of tokens is not 1", 1, tns.size());
+    
+    boolean found = false;
+    for(Token<? extends TokenIdentifier> t: tns) {
+      System.out.println("kind="+t.getKind() + ";servic=" + t.getService() + ";str=" + t.toString());
+      
+      if(t.getKind().equals(new Text("HDFS_DELEGATION_TOKEN")) &&
+          t.getService().equals(new Text(fs_addr))) {
+        found = true;
+      }
+      assertTrue("didn't find token for " + p1 ,found);
+    }
+  }
+}

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java?rev=904609&r1=904608&r2=904609&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java Fri Jan 29 19:50:54 2010
@@ -70,6 +70,7 @@
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -739,7 +740,14 @@
   private static void checkSrcPath(Configuration conf, List<Path> srcPaths
       ) throws IOException {
     List<IOException> rslt = new ArrayList<IOException>();
-    List<Path> unglobbed = new LinkedList<Path>(); 
+    List<Path> unglobbed = new LinkedList<Path>();
+    
+    // get tokens for all the required FileSystems..
+    Path[] ps = new Path[srcPaths.size()];
+    ps = srcPaths.toArray(ps);
+    TokenCache.obtainTokensForNamenodes(ps, conf);
+    
+    
     for (Path p : srcPaths) {
       FileSystem fs = p.getFileSystem(conf);
       FileStatus[] inputs = fs.globStatus(p);
@@ -1218,6 +1226,11 @@
     long maxBytesPerMap = conf.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP);
 
     FileSystem dstfs = args.dst.getFileSystem(conf);
+    
+    // get tokens for all the required FileSystems..
+    TokenCache.obtainTokensForNamenodes(new Path[] {args.dst}, conf);
+    
+    
     boolean dstExists = dstfs.exists(args.dst);
     boolean dstIsDir = false;
     if (dstExists) {

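With these hooks in place, a secure cross-cluster copy such as the following
(namenode addresses hypothetical) needs no manual token setup: DistCp now
obtains delegation tokens for every source and destination filesystem before
the copy job is submitted.

    hadoop distcp hdfs://nn1.example.com:8020/src/data hdfs://nn2.example.com:8020/dst/data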

