hadoop-mapreduce-commits mailing list archives

From: d...@apache.org
Subject: svn commit: r901866 - in /hadoop/mapreduce/trunk: ./ src/contrib/mumak/src/java/org/apache/hadoop/mapred/ src/contrib/mumak/src/test/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/...
Date: Thu, 21 Jan 2010 21:32:44 GMT
Author: ddas
Date: Thu Jan 21 21:32:43 2010
New Revision: 901866

URL: http://svn.apache.org/viewvc?rev=901866&view=rev
Log:
MAPREDUCE-1338. Introduces the notion of a token cache, through which tokens and secrets can be sent by the job client to the JobTracker. Contributed by Boris Shkolnik.
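
The change works end to end: the job client packs secret keys into a TokenStorage, ships it through the new submitJob(jobId, jobSubmitDir, ts) signature, the JobTracker persists it next to the generated job token, and tasks read everything back through TokenCache. A minimal sketch of the task-side lookup, assuming a hypothetical alias "my.secret" that the client registered:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.security.TokenCache;

    public class SecretLookupSketch {
      public static byte[] lookup() {
        // Returns null if no TokenStorage was shipped with this job.
        return TokenCache.getSecretKey(new Text("my.secret"));
      }
    }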

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenStorage.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenStorage.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=901866&r1=901865&r2=901866&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jan 21 21:32:43 2010
@@ -22,6 +22,10 @@
     MAPREDUCE-744. Introduces the notion of a public distributed cache.
     (Devaraj Das)
 
+    MAPREDUCE-1338. Introduces the notion of a token cache, through
+    which tokens and secrets can be sent by the job client to the JobTracker.
+    (Boris Shkolnik via ddas)
+
   IMPROVEMENTS
 
     MAPREDUCE-1198. Alternatively schedule different types of tasks in

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java?rev=901866&r1=901865&r2=901866&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java Thu Jan 21 21:32:43 2010
@@ -349,6 +349,6 @@
     }
     
     SimulatorJobCache.put(org.apache.hadoop.mapred.JobID.downgrade(jobId), job);
-    return jobTracker.submitJob(jobId, "dummy-path");
+    return jobTracker.submitJob(jobId, "dummy-path", null);
   }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java?rev=901866&r1=901865&r2=901866&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java Thu Jan 21 21:32:43 2010
@@ -35,10 +35,11 @@
 import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
 import org.apache.hadoop.mapred.SimulatorJobInProgress;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
 
 /**
  * {@link SimulatorJobTracker} extends {@link JobTracker}. It implements the
- * {@link JobSubmissionProtocol} and the {@link InterTrackerProtocol} protocols.
+ * {@link InterTrackerProtocol} protocol.
  */
 @SuppressWarnings("deprecation")
 public class SimulatorJobTracker extends JobTracker {
@@ -173,7 +174,8 @@
   }
 
   @Override
-  public synchronized JobStatus submitJob(JobID jobId, String jobSubmitDir) 
+  public synchronized JobStatus submitJob(
+      JobID jobId, String jobSubmitDir, TokenStorage ts) 
   throws IOException {
     boolean loggingEnabled = LOG.isDebugEnabled();
     if (loggingEnabled) {

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java?rev=901866&r1=901865&r2=901866&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java Thu Jan 21 21:32:43 2010
@@ -35,6 +35,7 @@
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.QueueAclsInfo;
 import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
@@ -77,7 +78,8 @@
   }
 
   @Override
-  public JobStatus submitJob(JobID jobId, String jobSubmitDir) throws IOException {
+  public JobStatus submitJob(
+      JobID jobId, String jobSubmitDir, TokenStorage ts) throws IOException {
     JobStatus status = new JobStatus(jobId, 0.0f, 0.0f, 0.0f, 0.0f,
         JobStatus.State.RUNNING, JobPriority.NORMAL, "", "", "", "");
     return status;

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java?rev=901866&r1=901865&r2=901866&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java Thu Jan 21 21:32:43 2010
@@ -79,7 +79,7 @@
       FakeJobs job = new FakeJobs("job1", 0, numMaps, numReduces);
 
       SimulatorJobCache.put(org.apache.hadoop.mapred.JobID.downgrade(jobId), job);
-      jobTracker.submitJob(jobId, "dummy-path");
+      jobTracker.submitJob(jobId, "dummy-path", null);
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=901866&r1=901865&r2=901866&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Thu Jan 21 21:32:43 2010
@@ -26,23 +26,22 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapred.JvmTask;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
 import org.apache.hadoop.security.token.Token;
-import org.apache.log4j.LogManager;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
 
 /** 
  * The main() for child processes. 
@@ -72,11 +71,12 @@
     JVMId jvmId = new JVMId(firstTaskid.getJobID(),
         firstTaskid.getTaskType() == TaskType.MAP,jvmIdInt);
     
-    // file name is passed thru env
+    //load token cache storage
     String jobTokenFile = System.getenv().get("JOB_TOKEN_FILE");
-    FileSystem localFs = FileSystem.getLocal(defaultConf);
-    Token<JobTokenIdentifier> jt = loadJobToken(jobTokenFile, localFs);
-    LOG.debug("Child: got jobTokenfile=" + jobTokenFile);
+    defaultConf.set(JobContext.JOB_TOKEN_FILE, jobTokenFile);
+    TokenStorage ts = TokenCache.loadTaskTokenStorage(defaultConf);
+    LOG.debug("loading token. # keys =" +ts.numberOfSecretKeys() + 
+        "; from file=" + jobTokenFile);
     
     TaskUmbilicalProtocol umbilical =
       (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
@@ -155,6 +155,8 @@
         TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
         JobConf job = new JobConf(task.getJobFile());
         
+        // set job shuffle token
+        Token<JobTokenIdentifier> jt = (Token<JobTokenIdentifier>)ts.getJobToken();
         // set the jobTokenFile into task
         task.setJobTokenSecret(JobTokenSecretManager.createSecretKey(jt.getPassword()));
         
@@ -226,22 +228,4 @@
       LogManager.shutdown();
     }
   }
-  
-  /**
-   * load job token from a file
-   * @param jobTokenFile
-   * @param conf
-   * @throws IOException
-   */
-  private static Token<JobTokenIdentifier> loadJobToken(String jobTokenFile, FileSystem localFS) 
-  throws IOException {
-    Path localJobTokenFile = new Path (jobTokenFile);
-    FSDataInputStream in = localFS.open(localJobTokenFile);
-    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
-    jt.readFields(in);
-        
-    LOG.debug("Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath());
-    in.close();
-    return jt;
-  }
 }
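
Condensing the Child.java change above: the task no longer reads a bare Token<JobTokenIdentifier> from the file named by the JOB_TOKEN_FILE environment variable; it loads the whole TokenStorage and pulls the shuffle token out of it. A sketch of that sequence, assuming the environment variable is set as in the diff:

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.security.TokenCache;
    import org.apache.hadoop.mapreduce.security.TokenStorage;
    import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
    import org.apache.hadoop.security.token.Token;

    public class ChildTokenLoadSketch {
      @SuppressWarnings("unchecked")
      static Token<JobTokenIdentifier> loadShuffleToken() throws IOException {
        JobConf conf = new JobConf();
        // The TaskTracker passes the token file location through the environment.
        conf.set(JobContext.JOB_TOKEN_FILE, System.getenv("JOB_TOKEN_FILE"));
        TokenStorage ts = TokenCache.loadTaskTokenStorage(conf);
        return (Token<JobTokenIdentifier>) ts.getJobToken();
      }
    }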

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=901866&r1=901865&r2=901866&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Jan 21 21:32:43 2010
@@ -37,9 +37,9 @@
 import java.util.Vector;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -64,8 +64,9 @@
 import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
@@ -76,10 +77,9 @@
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.fs.FSDataOutputStream;
 
 /*************************************************************
  * JobInProgress maintains all the info for keeping
@@ -146,6 +146,8 @@
   JobPriority priority = JobPriority.NORMAL;
   protected JobTracker jobtracker;
   
+  protected TokenStorage tokenStorage;
+  
   JobHistory jobHistory;
 
   // NetworkTopology Node to the set of TIPs
@@ -347,6 +349,7 @@
     if (tracker != null) { // Some mock tests have null tracker
       this.jobHistory = tracker.getJobHistory();
     }
+    this.tokenStorage = null;
   }
   
   JobInProgress() {
@@ -360,7 +363,7 @@
    */
   public JobInProgress(JobTracker jobtracker, 
                        JobConf default_conf, int rCount,
-                       JobInfo jobInfo) throws IOException {
+                       JobInfo jobInfo, TokenStorage ts) throws IOException {
     this.restartCount = rCount;
     this.jobId = JobID.downgrade(jobInfo.getJobID());
     String url = "http://" + jobtracker.getJobTrackerMachine() + ":" 
@@ -442,6 +445,7 @@
         JobContext.SPECULATIVECAP,0.1f);
     this.slowNodeThreshold = conf.getFloat(
         JobContext.SPECULATIVE_SLOWNODE_THRESHOLD,1.0f);
+    this.tokenStorage = ts;
 
   }
 
@@ -594,7 +598,7 @@
     //
     // generate security keys needed by Tasks
     //
-    generateJobToken();
+    generateAndStoreTokens();
     
     //
     // read input splits and create a map per a split
@@ -3538,20 +3542,30 @@
    * generate job token and save it into the file
    * @throws IOException
    */
-  private void generateJobToken() throws IOException{
+  private void generateAndStoreTokens() throws IOException{
     Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
     Path keysFile = new Path(jobDir, SecureShuffleUtils.JOB_TOKEN_FILENAME);
     // we need to create this file using the jobtracker's filesystem
     FSDataOutputStream os = jobtracker.getFileSystem().create(keysFile);
+    
     //create JobToken file and write token to it
     JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId
         .toString()));
     Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(identifier,
         jobtracker.getJobTokenSecretManager());
     token.setService(identifier.getJobId());
-    token.write(os);
+    
+    // add this token to the tokenStorage
+    if(tokenStorage == null)
+      tokenStorage = new TokenStorage();
+    
+    tokenStorage.setJobToken(token);
+    
+    // write TokenStorage out
+    tokenStorage.write(os);
     os.close();
-    LOG.debug("jobToken generated and stored in "+ keysFile.toUri().getPath());
+    LOG.info("jobToken generated and stored with users keys in "
+        + keysFile.toUri().getPath());
   }
 
 }
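
The JobTracker-side counterpart: generateAndStoreTokens() folds the freshly minted job token into whatever TokenStorage the client submitted (or an empty one if none arrived) and writes the whole container to the job's token file, where the old code wrote only the bare token. A sketch of that merge, with the jobtracker plumbing factored out into parameters:

    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.security.TokenStorage;
    import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
    import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
    import org.apache.hadoop.security.token.Token;

    public class TokenMergeSketch {
      static void mergeAndWrite(String jobId, TokenStorage submitted,
          JobTokenSecretManager mgr, DataOutput out) throws IOException {
        JobTokenIdentifier id = new JobTokenIdentifier(new Text(jobId));
        Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(id, mgr);
        token.setService(id.getJobId());
        // Reuse the client's storage so its secret keys travel with the job token.
        TokenStorage ts = (submitted == null) ? new TokenStorage() : submitted;
        ts.setJobToken(token);
        ts.write(out);
      }
    }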

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=901866&r1=901865&r2=901866&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Jan 21 21:32:43 2010
@@ -22,11 +22,9 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.io.UnsupportedEncodingException;
 import java.io.Writer;
 import java.net.BindException;
 import java.net.InetSocketAddress;
-import java.net.URLEncoder;
 import java.net.UnknownHostException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -77,10 +75,14 @@
 import org.apache.hadoop.mapreduce.QueueInfo;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.util.ConfigUtil;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
@@ -102,9 +104,6 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
-import org.apache.hadoop.mapreduce.util.ConfigUtil;
 
 /*******************************************************
  * JobTracker is the central location for submitting and 
@@ -1137,7 +1136,7 @@
               token.getUser().toString(), 
               new String[]{UnixUserGroupInformation.DEFAULT_GROUP});
           submitJob(token.getJobID(), restartCount, 
-              ugi, token.getJobSubmitDir().toString(), true);
+              ugi, token.getJobSubmitDir().toString(), true, null);
           recovered++;
         } catch (Exception e) {
           LOG.warn("Could not recover job " + jobId, e);
@@ -2911,8 +2910,9 @@
    * the JobTracker alone.
    */
   public synchronized org.apache.hadoop.mapreduce.JobStatus submitJob(
-    org.apache.hadoop.mapreduce.JobID jobId, String jobSubmitDir) throws IOException {
-    return submitJob(JobID.downgrade(jobId), jobSubmitDir);
+    org.apache.hadoop.mapreduce.JobID jobId,String jobSubmitDir, TokenStorage ts) 
+  throws IOException {  
+    return submitJob(JobID.downgrade(jobId), jobSubmitDir, ts);
   }
   
   /**
@@ -2923,14 +2923,16 @@
    * of the JobTracker.  But JobInProgress adds info that's useful for
    * the JobTracker alone.
    * @deprecated Use 
-   * {@link #submitJob(org.apache.hadoop.mapreduce.JobID, String)} instead
+   * {@link #submitJob(org.apache.hadoop.mapreduce.JobID, String, TokenStorage)}
+   *  instead
    */
   @Deprecated
-  public synchronized JobStatus submitJob(JobID jobId, String jobSubmitDir) 
+  public synchronized JobStatus submitJob(
+      JobID jobId, String jobSubmitDir, TokenStorage ts) 
   throws IOException {
     return submitJob(jobId, 0, 
         UserGroupInformation.getCurrentUGI(), 
-        jobSubmitDir, false);
+        jobSubmitDir, false, ts);
   }
 
   /**
@@ -2938,7 +2940,7 @@
    */
   private synchronized JobStatus submitJob(org.apache.hadoop.mapreduce.JobID jobID, 
       int restartCount, UserGroupInformation ugi, String jobSubmitDir, 
-      boolean recovered) throws IOException {
+      boolean recovered, TokenStorage ts) throws IOException {
     JobID jobId = JobID.downgrade(jobID);
     if(jobs.containsKey(jobId)) {
       //job already running, don't start twice
@@ -2950,7 +2952,8 @@
     //Text.
     JobInfo jobInfo = new JobInfo(jobId, new Text(ugi.getUserName()), 
         new Path(jobSubmitDir));
-    JobInProgress job = new JobInProgress(this, this.conf, restartCount, jobInfo);
+    JobInProgress job = 
+      new JobInProgress(this, this.conf, restartCount, jobInfo, ts);
     
     String queue = job.getProfile().getQueueName();
     if(!(queueManager.getLeafQueueNames().contains(queue))) {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=901866&r1=901865&r2=901866&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Jan 21 21:32:43 2010
@@ -29,21 +29,22 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.QueueInfo;
 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
-import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.State;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -60,7 +61,7 @@
   private int map_tasks = 0;
   private int reduce_tasks = 0;
   final Random rand = new Random();
-
+  
   private JobTrackerInstrumentation myMetrics = null;
 
   private static final String jobDir =  "localRunner/";
@@ -415,8 +416,9 @@
   }
 
   public org.apache.hadoop.mapreduce.JobStatus submitJob(
-      org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir) 
+      org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir, TokenStorage ts) 
       throws IOException {
+    TokenCache.setTokenStorage(ts);
     return new Job(JobID.downgrade(jobid), jobSubmitDir).status;
   }
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=901866&r1=901865&r2=901866&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Jan 21 21:32:43 2010
@@ -76,6 +76,8 @@
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
@@ -916,10 +918,8 @@
         rjob.jobConf = localJobConf;
         rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
                              localJobConf.getKeepFailedTaskFiles());
-        FSDataInputStream in = localFs.open(new Path(
-            rjob.jobConf.get(JobContext.JOB_TOKEN_FILE)));
-        Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
-        jt.readFields(in); 
+        TokenStorage ts = TokenCache.loadTokens(rjob.jobConf);
+        Token<JobTokenIdentifier> jt = (Token<JobTokenIdentifier>)ts.getJobToken(); 
         getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
         rjob.localized = true;
       }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=901866&r1=901865&r2=901866&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java Thu Jan 21 21:32:43 2010
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapreduce;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URI;
@@ -25,6 +26,7 @@
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,8 +40,11 @@
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
 import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.codehaus.jackson.map.ObjectMapper;
 
 class JobSubmitter {
   protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);
@@ -296,6 +301,7 @@
    * @throws InterruptedException
    * @throws IOException
    */
+  @SuppressWarnings("unchecked")
   JobStatus submitJobInternal(Job job, Cluster cluster) 
   throws ClassNotFoundException, InterruptedException, IOException {
     /*
@@ -319,6 +325,24 @@
       Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
 
       checkSpecs(job);
+      
+      // create TokenStorage object with user secretKeys
+      String tokensFileName = conf.get("tokenCacheFile");
+      TokenStorage tokenStorage = null;
+      if(tokensFileName != null) {
+        LOG.info("loading secret keys from " + tokensFileName);
+        String localFileName = new Path(tokensFileName).toUri().getPath();
+        tokenStorage = new TokenStorage();
+        // read JSON
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> nm = 
+          mapper.readValue(new File(localFileName), Map.class);
+        
+        for(Map.Entry<String, String> ent: nm.entrySet()) {
+          LOG.debug("adding secret key alias="+ent.getKey());
+          tokenStorage.addSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
+        }
+      }
 
       // Create the splits for the job
       LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
@@ -331,7 +355,7 @@
       //
       // Now, actually submit the job (using the submit name)
       //
-      status = submitClient.submitJob(jobId, submitJobDir.toString());
+      status = submitClient.submitJob(jobId, submitJobDir.toString(), tokenStorage);
       if (status != null) {
         return status;
       } else {
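
The tokenCacheFile consumed above is a plain JSON object mapping alias strings to secret strings; note that JobSubmitter stores each value via String.getBytes(), so the literal bytes of the JSON string (base64 text included) become the secret key, with no decoding step. A sketch of producing such a file with the same Jackson API the code uses (file name and aliases are hypothetical):

    import java.io.File;
    import java.util.HashMap;
    import java.util.Map;
    import org.codehaus.jackson.map.ObjectMapper;

    public class TokenCacheFileSketch {
      public static void main(String[] args) throws Exception {
        Map<String, String> secrets = new HashMap<String, String>();
        secrets.put("alias1", "c2VjcmV0LWJ5dGVz");  // e.g. base64-encoded key material
        secrets.put("alias2", "bW9yZS1rZXktYnl0ZXM=");
        new ObjectMapper().writeValue(new File("tokenFile.json"), secrets);
      }
    }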

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=901866&r1=901865&r2=901866&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Thu Jan 21 21:32:43 2010
@@ -32,6 +32,7 @@
 import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
 import org.apache.hadoop.mapreduce.server.jobtracker.State;
 
 /** 
@@ -88,9 +89,10 @@
    *             to ClusterMetrics as part of MAPREDUCE-1048.
    * Version 30: Job submission files are uploaded to a staging area under
    *             user home dir. JobTracker reads the required files from the
-   *             staging area using user credentials passed via the rpc.          
+   *             staging area using user credentials passed via the rpc.
+   * Version 31: Added TokenStorage to submitJob          
    */
-  public static final long versionID = 30L;
+  public static final long versionID = 31L;
 
   /**
    * Allocate a name for the job.
@@ -103,7 +105,7 @@
    * Submit a Job for execution.  Returns the latest profile for
    * that job.
    */
-  public JobStatus submitJob(JobID jobId, String jobSubmitDir) 
+  public JobStatus submitJob(JobID jobId, String jobSubmitDir, TokenStorage ts) 
   throws IOException, InterruptedException;
 
   /**

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=901866&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Thu Jan 21 21:32:43 2010
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+
+/**
+ * This class keeps a static reference to a TokenStorage object and
+ * provides auxiliary methods for setting and getting secret keys.
+ */
+@InterfaceStability.Evolving
+public class TokenCache {
+  
+  private static final Log LOG = LogFactory.getLog(TokenCache.class);
+  
+  private static TokenStorage tokenStorage;
+  
+  /**
+   * auxiliary method to get a user's secret key
+   * @param alias
+   * @return secret key from the storage
+   */
+  public static byte[] getSecretKey(Text alias) {
+    if(tokenStorage == null)
+      return null;
+    return tokenStorage.getSecretKey(alias);
+  }
+  
+  /**
+   * auxiliary method to store a user's secret key
+   * @param alias
+   * @param key
+   */
+  public static void setSecretKey(Text alias, byte[] key) {
+    getTokenStorage().addSecretKey(alias, key);
+  }
+  
+  /**
+   * auxiliary method to add a delegation token
+   */
+  public static void addDelegationToken(
+      String namenode, Token<? extends TokenIdentifier> t) {
+    getTokenStorage().setToken(new Text(namenode), t);
+  }
+  
+  /**
+   * auxiliary method 
+   * @return all the available tokens
+   */
+  public static Collection<Token<? extends TokenIdentifier>> getAllTokens() {
+    return getTokenStorage().getAllTokens();
+  }
+  
+
+  /**
+   * @return TokenStore object
+   */
+  @InterfaceAudience.Private
+  public static TokenStorage getTokenStorage() {
+    if(tokenStorage==null)
+      tokenStorage = new TokenStorage();
+    
+    return tokenStorage;
+  }
+  
+  /**
+   * sets TokenStorage
+   * @param ts
+   */
+  @InterfaceAudience.Private
+  public static void setTokenStorage(TokenStorage ts) {
+    if(tokenStorage != null)
+      LOG.warn("Overwriting existing token storage with # keys=" + 
+          tokenStorage.numberOfSecretKeys());
+    tokenStorage = ts;
+  }
+  
+  /**
+   * loads the token storage and caches it
+   * @param conf
+   * @return Loaded TokenStorage object
+   * @throws IOException
+   */
+  @InterfaceAudience.Private
+  public static TokenStorage loadTaskTokenStorage(JobConf conf)
+  throws IOException {
+    if(tokenStorage != null)
+      return tokenStorage;
+    
+    tokenStorage = loadTokens(conf);
+    
+    return tokenStorage;
+  }
+  
+  /**
+   * loads the token storage from the job token file
+   * @param conf
+   * @throws IOException
+   */
+  @InterfaceAudience.Private
+  public static TokenStorage loadTokens(JobConf conf) 
+  throws IOException {
+    String jobTokenFile = conf.get(JobContext.JOB_TOKEN_FILE);
+    Path localJobTokenFile = new Path (jobTokenFile);
+    FileSystem localFS = FileSystem.getLocal(conf);
+    FSDataInputStream in = localFS.open(localJobTokenFile);
+    
+    TokenStorage ts = new TokenStorage();
+    ts.readFields(in);
+
+    LOG.info("Task: Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath() 
+        +"; num of sec keys  = " + ts.numberOfSecretKeys());
+    in.close();
+    return ts;
+  }
+}
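
Taken together, the static methods above give client and task code a single place to stash and retrieve per-job secrets. A minimal usage sketch (the alias and key bytes are made up):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.security.TokenCache;

    public class TokenCacheUsageSketch {
      public static void main(String[] args) {
        TokenCache.setSecretKey(new Text("alias1"), "s3cret".getBytes());
        byte[] key = TokenCache.getSecretKey(new Text("alias1"));
        System.out.println("key length = " + key.length);
      }
    }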

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenStorage.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenStorage.java?rev=901866&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenStorage.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenStorage.java Thu Jan 21 21:32:43 2010
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Get/set and store/load security keys.
+ * Each key's value is a byte[].
+ * Serialized to/from DataInput/DataOutput.
+ *
+ */
+@InterfaceAudience.Private
+public class TokenStorage implements Writable {
+
+  private static final Text SHUFFLE_JOB_TOKEN = new Text("ShuffleJobToken");
+
+  private  Map<Text, byte[]> secretKeysMap = new HashMap<Text, byte[]>();
+  private  Map<Text, Token<? extends TokenIdentifier>> tokenMap = 
+    new HashMap<Text, Token<? extends TokenIdentifier>>(); 
+
+  /**
+   * returns the key value for the alias
+   * @param alias
+   * @return key for this alias
+   */
+  byte[] getSecretKey(Text alias) {
+    return secretKeysMap.get(alias);
+  }
+  
+  /**
+   * returns the key value for the alias
+   * @param alias
+   * @return token for this alias
+   */
+  Token<? extends TokenIdentifier> getToken(Text alias) {
+    return tokenMap.get(alias);
+  }
+  
+  void setToken(Text alias, Token<? extends TokenIdentifier> t) {
+    tokenMap.put(alias, t);
+  }
+  
+  /**
+   * store job token
+   * @param t
+   */
+  @InterfaceAudience.Private
+  public void setJobToken(Token<? extends TokenIdentifier> t) {
+    setToken(SHUFFLE_JOB_TOKEN, t);
+  }
+  /**
+   * 
+   * @return job token
+   */
+  @InterfaceAudience.Private
+  public Token<? extends TokenIdentifier> getJobToken() {
+    return getToken(SHUFFLE_JOB_TOKEN);
+  }
+  
+  /**
+   * 
+   * @return all the tokens in the storage
+   */
+  Collection<Token<? extends TokenIdentifier>> getAllTokens() {
+    return tokenMap.values();
+  }
+  
+
+  
+  /**
+   * 
+   * @return number of keys
+   */
+  public int numberOfSecretKeys() {
+    return secretKeysMap.size();
+  }
+  
+  
+  /**
+   * set the key for an alias
+   * @param alias
+   * @param key
+   */
+  public void addSecretKey(Text alias, byte[] key) {
+    secretKeysMap.put(alias, key);
+  }
+ 
+  /**
+   * stores all the keys to DataOutput
+   * @param out
+   * @throws IOException
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // write out tokens first
+    System.out.println("about to write out: token = " + tokenMap.size() + 
+        "; sec = " + secretKeysMap.size());
+    WritableUtils.writeVInt(out, tokenMap.size());
+    for(Map.Entry<Text, Token<? extends TokenIdentifier>> e: tokenMap.entrySet()) {
+      e.getKey().write(out);
+      e.getValue().write(out);
+    }
+    
+    // now write out secret keys
+    WritableUtils.writeVInt(out, secretKeysMap.size());
+    for(Map.Entry<Text, byte[]> e : secretKeysMap.entrySet()) {
+      e.getKey().write(out);
+      WritableUtils.writeCompressedByteArray(out, e.getValue());  
+    }
+  }
+  
+  /**
+   * loads all the keys
+   * @param in
+   * @throws IOException
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    secretKeysMap.clear();
+    tokenMap.clear();
+    
+    int size = WritableUtils.readVInt(in);
+    for(int i=0; i<size; i++) {
+      Text alias = new Text();
+      alias.readFields(in);
+      Token<? extends TokenIdentifier> t = new Token<TokenIdentifier>();
+      t.readFields(in);
+      tokenMap.put(alias, t);
+    }
+    
+    size = WritableUtils.readVInt(in);
+    for(int i=0; i<size; i++) {
+      Text alias = new Text();
+      alias.readFields(in);
+      byte[] key = WritableUtils.readCompressedByteArray(in);
+      secretKeysMap.put(alias, key);
+    }
+  }
+}
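
The Writable layout above is: a vint count of tokens, then (alias, token) pairs, then a vint count of secret keys, then (alias, compressed byte[]) pairs. A round-trip sketch under that format (the file name is made up; TestTokenStorage below exercises the same path against real job tokens):

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.security.TokenStorage;

    public class TokenStorageRoundTrip {
      public static void main(String[] args) throws IOException {
        TokenStorage ts = new TokenStorage();
        ts.addSecretKey(new Text("alias1"), "k3y".getBytes());

        DataOutputStream out = new DataOutputStream(new FileOutputStream("ts.bin"));
        ts.write(out);
        out.close();

        TokenStorage back = new TokenStorage();
        DataInputStream in = new DataInputStream(new FileInputStream("ts.bin"));
        back.readFields(in);
        in.close();

        System.out.println("keys restored = " + back.numberOfSecretKeys());
      }
    }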

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java?rev=901866&r1=901865&r2=901866&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java Thu Jan 21 21:32:43 2010
@@ -79,7 +79,7 @@
     jobSubmitDir = jobSubmitDir.makeQualified(fs);
     uploadJobFiles(JobID.downgrade(id), splits, jobSubmitDir, job);
     
-    jobSubmitClient.submitJob(id, jobSubmitDir.toString());
+    jobSubmitClient.submitJob(id, jobSubmitDir.toString(), null);
     
     JobClient jc = new JobClient(job);
     return jc.getJob(JobID.downgrade(id));

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=901866&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java Thu Jan 21 21:32:43 2010
@@ -0,0 +1,213 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.util.ToolRunner;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestTokenCache {
+  private static final int NUM_OF_KEYS = 10;
+
+  // my sleep class - adds check for tokenCache
+  static class MySleepMapper extends SleepJob.SleepMapper {
+    /**
+     * attempts to access tokenCache as from client
+     */
+    @Override
+    public void map(IntWritable key, IntWritable value, Context context)
+    throws IOException, InterruptedException {
+      // get token storage and a key
+      TokenStorage ts = TokenCache.getTokenStorage();
+      byte[] key1 = TokenCache.getSecretKey(new Text("alias1"));
+      
+      System.out.println("inside MAP: ts==NULL?=" + (ts==null) + 
+          "; #keys = " + (ts==null? 0:ts.numberOfSecretKeys()) + 
+          ";jobToken = " +  (ts==null? "n/a":ts.getJobToken()) +
+          "; alias1 key=" + new String(key1));
+    
+      if(key1 == null || ts == null || ts.numberOfSecretKeys() != NUM_OF_KEYS) {
+        throw new RuntimeException("tokens are not available"); // fail the test
+      } 
+      super.map(key, value, context);
+    }
+  }
+  
+  class MySleepJob extends SleepJob {
+    @Override
+    public Job createJob(int numMapper, int numReducer, 
+        long mapSleepTime, int mapSleepCount, 
+        long reduceSleepTime, int reduceSleepCount) 
+    throws IOException {
+      Job job =  super.createJob(numMapper, numReducer,
+           mapSleepTime, mapSleepCount, 
+          reduceSleepTime, reduceSleepCount);
+      
+      job.setMapperClass(MySleepMapper.class);
+      return job;
+    }
+  }
+  
+  private static MiniMRCluster mrCluster;
+  private static MiniDFSCluster dfsCluster;
+  private static final Path TEST_DIR = 
+    new Path(System.getProperty("test.build.data","/tmp"), "sleepTest");
+  private static final Path tokenFileName = new Path(TEST_DIR, "tokenFile.json");
+  private static int numSlaves = 1;
+  private static JobConf jConf;
+  private static ObjectMapper mapper = new ObjectMapper();
+  
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = new Configuration();
+    dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
+    jConf = new JobConf(conf);
+    mrCluster = new MiniMRCluster(0, 0, numSlaves, 
+        dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null, 
+        jConf);
+    
+    createTokenFileJson();
+    verifySecretKeysInJSONFile();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if(mrCluster != null)
+      mrCluster.shutdown();
+    mrCluster = null;
+    if(dfsCluster != null)
+      dfsCluster.shutdown();
+    dfsCluster = null;
+  }
+
+  // create JSON file and put some keys into it
+  private static void createTokenFileJson() throws IOException {
+    Map<String, String> map = new HashMap<String, String>();
+    
+    try {
+      KeyGenerator kg = KeyGenerator.getInstance("HmacSHA1");
+      for(int i=0; i<NUM_OF_KEYS; i++) {
+        SecretKeySpec key = (SecretKeySpec) kg.generateKey();
+        byte [] enc_key = key.getEncoded();
+        map.put("alias"+i, new String(Base64.encodeBase64(enc_key)));
+
+      }
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException(e);
+    }
+    
+    System.out.println("writing secret keys into " + tokenFileName);
+    try {
+      File p  = new File(tokenFileName.getParent().toString());
+      p.mkdirs();
+      // convert to JSON and save to the file
+      mapper.writeValue(new File(tokenFileName.toString()), map);
+
+    } catch (Exception e) {
+      System.out.println("failed with :" + e.getLocalizedMessage());
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  private static void verifySecretKeysInJSONFile() throws IOException {
+    Map<String, String> map;
+    map = mapper.readValue(new File(tokenFileName.toString()), Map.class);
+    assertEquals("didn't read JSON correctly", map.size(), NUM_OF_KEYS);
+    
+    System.out.println("file " + tokenFileName + " verified; size="+ map.size());
+  }
+  
+  /**
+   * run a distributed job and verify that TokenCache is available
+   * @throws IOException
+   */
+  @Test
+  public void testTokenCache() throws IOException {
+    
+    System.out.println("running dist job");
+    
+    // make sure JT starts
+    jConf = mrCluster.createJobConf();
+    
+    // using argument to pass the file name
+    String[] args = {
+       "-tokenCacheFile", tokenFileName.toString(), 
+        "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
+        };
+     
+    int res = -1;
+    try {
+      res = ToolRunner.run(jConf, new MySleepJob(), args);
+    } catch (Exception e) {
+      System.out.println("Job failed with" + e.getLocalizedMessage());
+      e.printStackTrace(System.out);
+      fail("Job failed");
+    }
+    assertEquals("dist job res is not 0", res, 0);
+  }
+  
+  /**
+   * run a local job and verify that TokenCache is available
+   * @throws NoSuchAlgorithmException
+   * @throws IOException
+   */
+  @Test
+  public void testLocalJobTokenCache() throws NoSuchAlgorithmException, IOException {
+    
+    System.out.println("running local job");
+    // this is local job
+    String[] args = {"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"}; 
+    jConf.set("tokenCacheFile", tokenFileName.toString());
+    
+    int res = -1;
+    try {
+      res = ToolRunner.run(jConf, new MySleepJob(), args);
+    } catch (Exception e) {
+      System.out.println("Job failed with" + e.getLocalizedMessage());
+      e.printStackTrace(System.out);
+      fail("local Job failed");
+    }
+    assertEquals("local job res is not 0", res, 0);
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenStorage.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenStorage.java?rev=901866&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenStorage.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenStorage.java Thu Jan 21 21:32:43 2010
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.mapreduce.security;
+
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.security.Key;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.KeyGenerator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.record.Utils;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestTokenStorage {
+  private static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA1";
+  private static final File tmpDir =
+    new File(System.getProperty("test.build.data", "/tmp"), "mapred");  
+    
+  @Before
+  public void setUp() {
+    tmpDir.mkdir();
+  }
+
+  @Test 
+  public void testReadWriteStorage() throws IOException, NoSuchAlgorithmException{
+    // create tokenStorage Object
+    TokenStorage ts = new TokenStorage();
+    
+    // create a token
+    JobTokenSecretManager jtSecretManager = new JobTokenSecretManager();
+    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text("fakeJobId"));
+    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(identifier,
+        jtSecretManager);
+    // store it
+    ts.setJobToken(jt);
+    
+    // create keys and put it in
+    final KeyGenerator kg = KeyGenerator.getInstance(DEFAULT_HMAC_ALGORITHM);
+    String alias = "alias";
+    Map<Text, byte[]> m = new HashMap<Text, byte[]>(10);
+    for(int i=0; i<10; i++) {
+      Key key = kg.generateKey();
+      m.put(new Text(alias+i), key.getEncoded());
+      ts.addSecretKey(new Text(alias+i), key.getEncoded());
+    }
+   
+    // create file to store
+    File tmpFileName = new File(tmpDir, "tokenStorageTest");
+    DataOutputStream dos = new DataOutputStream(new FileOutputStream(tmpFileName));
+    ts.write(dos);
+    dos.close();
+    
+    // open and read it back
+    DataInputStream dis = new DataInputStream(new FileInputStream(tmpFileName));    
+    ts = new TokenStorage();
+    ts.readFields(dis);
+    dis.close();
+    
+    // get the token and compare the passwords
+    byte[] tp1 = ts.getJobToken().getPassword();
+    byte[] tp2 = jt.getPassword();
+    int comp = Utils.compareBytes(tp1, 0, tp1.length, tp2, 0, tp2.length);
+    assertTrue("shuffleToken doesn't match", comp==0);
+    
+    // compare secret keys
+    int mapLen = m.size();
+    assertEquals("wrong number of keys in the Storage", mapLen, ts.numberOfSecretKeys());
+    for(Text a : m.keySet()) {
+      byte [] kTS = ts.getSecretKey(a);
+      byte [] kLocal = m.get(a);
+      assertTrue("keys don't match for " + a, 
+          Utils.compareBytes(kTS, 0, kTS.length, kLocal, 0, kLocal.length)==0);
+    }  
+    
+    assertEquals("All tokens should return collection of size 1", 
+        ts.getAllTokens().size(), 1);
+  }
+ }


