hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r943358 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/security/token/ src/java/org/apache/hadoop/mapreduce/task/ src/test/mapred/org/apache/had...
Date Wed, 12 May 2010 02:59:55 GMT
Author: ddas
Date: Wed May 12 02:59:54 2010
New Revision: 943358

URL: http://svn.apache.org/viewvc?rev=943358&view=rev
Log:
MAPREDUCE-1532. Ensures that delegation tokens is obtained as the actual user when the proxy-user
is used for submitting jobs. Also refactors the DelegationTokenToRenew class. Contributed
by Devaraj Das.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=943358&r1=943357&r2=943358&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed May 12 02:59:54 2010
@@ -23,6 +23,10 @@ Trunk (unreleased changes)
 
     MAPREDUCE-1707. TaskRunner can get NPE in getting ugi from TaskTracker.
     (Vinod Kumar Vavilapalli)
+    
+    MAPREDUCE-1532. Ensures that delegation tokens is obtained as the
+    actual user when the proxy-user is used for submitting jobs. Also
+    refactors the DelegationTokenToRenew class. (ddas)
 
 Release 0.21.0 - Unreleased
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=943358&r1=943357&r2=943358&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed May 12
02:59:54 2010
@@ -407,8 +407,10 @@ public class JobInProgress {
       this.conf.setUser(user);
     }
     if (!conf.getUser().equals(user)) {
-      throw new IOException("The username obtained from the conf doesn't " +
-      		"match the username the user authenticated as");
+      String desc = "The username " + conf.getUser() + " obtained from the " +
+                     "conf doesn't match the username " + user + " the user " +
+                                     "authenticated as";
+      throw new IOException(desc);
     }
     this.priority = conf.getJobPriority();
     this.profile = new JobProfile(conf.getUser(), this.jobId, 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=943358&r1=943357&r2=943358&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed May 12 02:59:54
2010
@@ -79,6 +79,7 @@ import org.apache.hadoop.mapreduce.TaskT
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSecretManager;
@@ -1769,7 +1770,7 @@ public class JobTracker implements MRCon
     if (jobHistory != null) {
       jobHistory.shutDown();
     }
-    
+    DelegationTokenRenewal.close();
     LOG.info("stopped all jobtracker services");
     return;
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java?rev=943358&r1=943357&r2=943358&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java Wed May 12 02:59:54
2010
@@ -127,8 +127,16 @@ public class Cluster {
   public synchronized FileSystem getFileSystem() 
       throws IOException, InterruptedException {
     if (this.fs == null) {
-      final Path sysDir = new Path(client.getSystemDir());
-      this.fs = sysDir.getFileSystem(getConf());
+      try {
+        this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+          public FileSystem run() throws IOException, InterruptedException {
+            final Path sysDir = new Path(client.getSystemDir());
+            return sysDir.getFileSystem(getConf());
+          }
+        });
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
     }
     return fs;
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java?rev=943358&r1=943357&r2=943358&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java Wed May 12 02:59:54
2010
@@ -29,6 +29,7 @@ import java.io.OutputStreamWriter;
 import java.net.URL;
 import java.net.URLConnection;
 import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 
 import javax.security.auth.login.LoginException;
 
@@ -958,8 +959,14 @@ public class Job extends JobContextImpl 
                               ClassNotFoundException {
     ensureState(JobState.DEFINE);
     setUseNewAPI();
-    status = new JobSubmitter(cluster.getFileSystem(),
-      cluster.getClient()).submitJobInternal(this, cluster);
+    final JobSubmitter submitter = new JobSubmitter(cluster.getFileSystem(),
+        cluster.getClient());
+    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
+      public JobStatus run() throws IOException, InterruptedException, 
+      ClassNotFoundException {
+        return submitter.submitJobInternal(Job.this, cluster);
+      }
+    });
     state = JobState.RUNNING;
    }
   

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java?rev=943358&r1=943357&r2=943358&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
Wed May 12 02:59:54 2010
@@ -18,16 +18,15 @@
 
 package org.apache.hadoop.mapreduce.security.token;
 
+import java.io.IOException;
 import java.net.URI;
-import java.security.AccessControlException;
+import org.apache.hadoop.security.AccessControlException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 
@@ -81,6 +80,20 @@ public class DelegationTokenRenewal {
     public String toString() {
       return token + ";exp="+expirationDate;
     }
+    @Override
+    public boolean equals (Object obj) {
+      if (obj == this) {
+        return true;
+      } else if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      } else {
+        return token.equals(((DelegationTokenToRenew)obj).token);
+      }
+    }
+    @Override
+    public int hashCode() {
+      return token.hashCode();
+    }
   }
   
   // global single timer (daemon)
@@ -88,19 +101,14 @@ public class DelegationTokenRenewal {
   
   //managing the list of tokens using Map
   // jobId=>List<tokens>
-  private static Map<JobID, List<DelegationTokenToRenew>> delegationTokens =

-    Collections.synchronizedMap(new HashMap<JobID, 
-                                       List<DelegationTokenToRenew>>());
+  private static List<DelegationTokenToRenew> delegationTokens = 
+    Collections.synchronizedList(new ArrayList<DelegationTokenToRenew>());
   //adding token
-  private static void addTokenToMap(DelegationTokenToRenew t) {
-    // see if a list already exists
-    JobID jobId = t.jobId;
-    List<DelegationTokenToRenew> l = delegationTokens.get(jobId);
-    if(l==null) {
-      l = new ArrayList<DelegationTokenToRenew>();
-      delegationTokens.put(jobId, l);
-    }
-    l.add(t);
+  private static void addTokenToList(DelegationTokenToRenew t) {
+    //check to see if the token already exists in the list
+    if (delegationTokens.contains(t))
+      return;
+    delegationTokens.add(t);
   }
   
   // kind of tokens we currently renew
@@ -129,7 +137,7 @@ public class DelegationTokenRenewal {
       DelegationTokenToRenew dtr = 
         new DelegationTokenToRenew(jobId, dt, conf, now); 
 
-      addTokenToMap(dtr);
+      addTokenToList(dtr);
       
       setTimerForTokenRenewal(dtr, true);
       LOG.info("registering token for renewal for service =" + dt.getService()+
@@ -148,15 +156,14 @@ public class DelegationTokenRenewal {
         DistributedFileSystem dfs = getDFSForToken(token, conf);
         newExpirationDate = dfs.renewDelegationToken(token);
       } catch (InvalidToken ite) {
-        LOG.warn("token canceled - not scheduling for renew");
-        removeFailedDelegationToken(dttr);
-        throw new Exception("failed to renew token", ite);
-      } catch (AccessControlException ace) {
-        LOG.warn("token canceled - not scheduling for renew");
+        LOG.warn("invalid token - not scheduling for renew");
         removeFailedDelegationToken(dttr);
-        throw new Exception("failed to renew token", ace);
-      } catch (Exception ioe) {
+        throw new IOException("failed to renew token", ite);
+      } catch (AccessControlException ioe) {
         LOG.warn("failed to renew token:"+token, ioe);
+        removeFailedDelegationToken(dttr);
+      } catch (Exception e) {
+        LOG.warn("failed to renew token:"+token, e);
         // returns default expiration date
       }
     } else {
@@ -267,26 +274,13 @@ public class DelegationTokenRenewal {
    */
   private static void removeFailedDelegationToken(DelegationTokenToRenew t) {
     JobID jobId = t.jobId;
-    List<DelegationTokenToRenew> l = delegationTokens.get(jobId);
-    if(l==null) return;
-
-    Iterator<DelegationTokenToRenew> it = l.iterator();
-    while(it.hasNext()) {
-      DelegationTokenToRenew dttr = it.next();
-      if(dttr == t) {
-        if (LOG.isDebugEnabled())
-          LOG.debug("removing failed delegation token for jobid=" + jobId + 
-            ";t=" + dttr.token.getService());
-
-        // cancel the timer
-        if(dttr.timerTask!=null)
-          dttr.timerTask.cancel();
-
-        // no need to cancel the token - it is invalid
-        it.remove();
-        break; //should be only one
-      }
-    }
+    if (LOG.isDebugEnabled())
+      LOG.debug("removing failed delegation token for jobid=" + jobId + 
+          ";t=" + t.token.getService());
+    delegationTokens.remove(t);
+    // cancel the timer
+    if(t.timerTask!=null)
+      t.timerTask.cancel();
   }
   
   /**
@@ -294,24 +288,25 @@ public class DelegationTokenRenewal {
    * @param jobId
    */
   public static void removeDelegationTokenRenewalForJob(JobID jobId) {
-    List<DelegationTokenToRenew> l = delegationTokens.remove(jobId);
-    if(l==null) return;
+    synchronized (delegationTokens) {
+      Iterator<DelegationTokenToRenew> it = delegationTokens.iterator();
+      while(it.hasNext()) {
+        DelegationTokenToRenew dttr = it.next();
+        if (dttr.jobId.equals(jobId)) {
+          if (LOG.isDebugEnabled())
+            LOG.debug("removing delegation token for jobid=" + jobId + 
+                ";t=" + dttr.token.getService());
+
+          // cancel the timer
+          if(dttr.timerTask!=null)
+            dttr.timerTask.cancel();
 
-    Iterator<DelegationTokenToRenew> it = l.iterator();
-    while(it.hasNext()) {
-      DelegationTokenToRenew dttr = it.next();
-      if (LOG.isDebugEnabled())
-        LOG.debug("removing delegation token for jobid=" + jobId + 
-          ";t=" + dttr.token.getService());
-
-      // cancel the timer
-      if(dttr.timerTask!=null)
-        dttr.timerTask.cancel();
+          // cancel the token
+          cancelToken(dttr);
 
-      // cancel the token
-      cancelToken(dttr);
-
-      it.remove();
+          it.remove();
+        }
+      }
     }
   }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java?rev=943358&r1=943357&r2=943358&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java Wed
May 12 02:59:54 2010
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.filec
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * A read-only view of the job that is provided to the tasks while they
@@ -46,10 +47,19 @@ public class JobContextImpl implements J
 
   protected final org.apache.hadoop.mapred.JobConf conf;
   private final JobID jobId;
+  /**
+   * The UserGroupInformation object that has a reference to the current user
+   */
+  protected UserGroupInformation ugi;
   
   public JobContextImpl(Configuration conf, JobID jobId) {
     this.conf = new org.apache.hadoop.mapred.JobConf(conf);
     this.jobId = jobId;
+    try {
+      this.ugi = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   /**

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java?rev=943358&r1=943357&r2=943358&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
Wed May 12 02:59:54 2010
@@ -251,7 +251,7 @@ public class TestDelegationTokenRenewal 
     // first 3 initial renewals + 1 real
     int numberOfExpectedRenewals = 3+1; 
     
-    int attempts = 4;
+    int attempts = 10;
     while(attempts-- > 0) {
       try {
         Thread.sleep(3*1000); // sleep 3 seconds, so it has time to renew
@@ -287,16 +287,10 @@ public class TestDelegationTokenRenewal 
     JobID jid2 = new JobID("job2",1);
     DelegationTokenRenewal.registerDelegationTokensForRenewal(jid2, ts, conf);
     DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jid2);
-    numberOfExpectedRenewals++; // one more initial renewal
-    attempts = 4;
-    while(attempts-- > 0) {
-      try {
-        Thread.sleep(3*1000); // sleep 3 seconds, so it has time to renew
-      } catch (InterruptedException e) {}
-      // since we cannot guarantee timely execution - let's give few chances
-      if(dfs.getCounter()==numberOfExpectedRenewals)
-        break;
-    }
+    numberOfExpectedRenewals = dfs.getCounter(); // number of renewals so far
+    try {
+      Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
+    } catch (InterruptedException e) {}
     System.out.println("Counter = " + dfs.getCounter() + ";t="+dfs.getToken());
     
     // counter and the token should stil be the old ones



Mime
View raw message