hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077187 - in /hadoop/common/branches/branch-0.20-security-patches/src: hdfs/org/apache/hadoop/hdfs/security/token/delegation/ mapred/ mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/ mapred/org/apache/hadoop/mapreduce/s...
Date Fri, 04 Mar 2011 03:49:54 GMT
Author: omalley
Date: Fri Mar  4 03:49:54 2011
New Revision: 1077187

URL: http://svn.apache.org/viewvc?rev=1077187&view=rev
Log:
commit a1654131f5aca0bbb7058c36bb8c2806056d4c0c
Author: Boris Shkolnik <borya@yahoo-inc.com>
Date:   Sun Feb 21 22:10:38 2010 -0800

    MAPREDUCE:1430 from https://issues.apache.org/jira/secure/attachment/12436542/1430-dd4-BP20.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1430. JobTracker should be able to renew delegation tokens
    +    for the jobs(boryas)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java?rev=1077187&r1=1077186&r2=1077187&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java
Fri Mar  4 03:49:54 2011
@@ -28,7 +28,7 @@ import org.apache.hadoop.security.token.
 //@InterfaceAudience.Private
 public class DelegationTokenIdentifier 
     extends AbstractDelegationTokenIdentifier {
-  static final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
+  public static final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
 
   /**
    * Create an empty delegation token identifier for reading into.

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1077187&r1=1077186&r2=1077187&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri
Mar  4 03:49:54 2011
@@ -717,6 +717,14 @@
   </property>
 
   <property>
+    <name>mapreduce.job.complete.cancel.delegation.tokens</name>
+    <value>true</value>
+    <description>If false, do not unregister/cancel the job's delegation
+    tokens when the job completes, because the same tokens may be used by
+    spawned jobs.</description>
+  </property>
+
+  <property>
     <name>mapred.task.profile</name>
     <value>false</value>
     <description>To set whether the system should collect profiler

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077187&r1=1077186&r2=1077187&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Fri Mar  4 03:49:54 2011
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
@@ -38,32 +36,34 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.JobHistory.Values;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.TokenStorage;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.split.*;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 
 /*************************************************************
  * JobInProgress maintains all the info for keeping
@@ -394,6 +394,10 @@ class JobInProgress {
     this.nonRunningReduces = new LinkedList<TaskInProgress>();    
     this.runningReduces = new LinkedHashSet<TaskInProgress>();
     this.resourceEstimator = new ResourceEstimator(this);
+    
+    // register job's tokens for renewal
+    DelegationTokenRenewal.registerDelegationTokensForRenewal(
+         jobInfo.getJobID(), ts, this.conf);
   }
 
   /**
@@ -2881,37 +2885,43 @@ class JobInProgress {
    * from all tables.  Be sure to remove all of this job's tasks
    * from the various tables.
    */
-  synchronized void garbageCollect() {
-    // Cancel task tracker reservation
-    cancelReservedSlots();
-    
-    // Let the JobTracker know that a job is complete
-    jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
-    jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces());
-    jobtracker.storeCompletedJob(this);
-    jobtracker.finalizeJob(this);
-      
-    try {
-      // Definitely remove the local-disk copy of the job file
-      if (localJobFile != null) {
-        localFs.delete(localJobFile, true);
-        localJobFile = null;
-      }
-
-      Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
-      new CleanupQueue().addToQueue(new PathDeletionContext(
-          jobtracker.getFileSystem(), tempDir.toUri().getPath())); 
-    } catch (IOException e) {
-      LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
-    }
-    
-    cleanUpMetrics();
-    // free up the memory used by the data structures
-    this.nonRunningMapCache = null;
-    this.runningMapCache = null;
-    this.nonRunningReduces = null;
-    this.runningReduces = null;
+  void garbageCollect() {
+    synchronized(this) {
+      // Cancel task tracker reservation
+      cancelReservedSlots();
 
+      // Let the JobTracker know that a job is complete
+      jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
+      jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces());
+      jobtracker.storeCompletedJob(this);
+      jobtracker.finalizeJob(this);
+
+      try {
+        // Definitely remove the local-disk copy of the job file
+        if (localJobFile != null) {
+          localFs.delete(localJobFile, true);
+          localJobFile = null;
+        }
+
+        Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
+        new CleanupQueue().addToQueue(new PathDeletionContext(
+            jobtracker.getFileSystem(), tempDir.toUri().getPath())); 
+      } catch (IOException e) {
+        LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
+      }
+
+      cleanUpMetrics();
+      // free up the memory used by the data structures
+      this.nonRunningMapCache = null;
+      this.runningMapCache = null;
+      this.nonRunningReduces = null;
+      this.runningReduces = null;
+    }
+    
+    // remove the job's delegation tokens
+    if(conf.getBoolean(JobContext.JOB_CANCEL_DELEGATION_TOKEN, true)) {
+      DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jobId);
+    } // else don't remove them - they may be used by spawned tasks
   }
 
   /**

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=1077187&r1=1077186&r2=1077187&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java
Fri Mar  4 03:49:54 2011
@@ -387,6 +387,15 @@ public class Job extends JobContext {  
       throw new IOException(attr + " is incompatible with " + msg + " mode.");
     }    
   }
+  
+  /**
+   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
+   * tokens upon job completion. Defaults to true.
+   */
+  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
+    ensureState(JobState.DEFINE);
+    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
+  }
 
   /**
    * Default to the new APIs unless they are explicitly set or the old mapper or

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=1077187&r1=1077186&r2=1077187&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
Fri Mar  4 03:49:54 2011
@@ -56,6 +56,9 @@ public class JobContext {
   public static final String CACHE_ARCHIVES_VISIBILITIES = 
     "mapreduce.job.cache.archives.visibilities";
   
+  public static final String JOB_CANCEL_DELEGATION_TOKEN = 
+    "mapreduce.job.complete.cancel.delegation.tokens";
+  
   public JobContext(Configuration conf, JobID jobId) {
     this.conf = new org.apache.hadoop.mapred.JobConf(conf);
     this.jobId = jobId;

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java?rev=1077187&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
(added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
Fri Mar  4 03:49:54 2011
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security.token;
+
+import java.net.URI;
+import java.security.AccessControlException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+
+
+//@InterfaceAudience.Private
+public class DelegationTokenRenewal {
+  private static final Log LOG = LogFactory.getLog(DelegationTokenRenewal.class);
+  public static final String SCHEME = "hdfs";
+  
+  /**
+   * class that is used for keeping track of a delegation token (DT) to renew
+   *
+   */
+  private static class DelegationTokenToRenew {
+    public final Token<DelegationTokenIdentifier> token;
+    public final JobID jobId;
+    public final Configuration conf;
+    public long expirationDate;
+    public TimerTask timerTask;
+    
+    public DelegationTokenToRenew(
+        JobID jId, Token<DelegationTokenIdentifier> t, 
+        Configuration newConf, long newExpirationDate) {
+      token = t;
+      jobId = jId;
+      conf = newConf;
+      expirationDate = newExpirationDate;
+      timerTask = null;
+      if(token==null || jobId==null || conf==null) {
+        throw new IllegalArgumentException("invalid params for Renew Token" +
+            ";t="+token+";j="+jobId+";c="+conf);
+      }
+    }
+    public void setTimerTask(TimerTask tTask) {
+      timerTask = tTask;
+    }
+    public String toString() {
+      return token + ";exp="+expirationDate;
+    }
+  }
+  
+  // global single timer (daemon)
+  private static Timer renewalTimer = new Timer(true);
+  
+  //managing the list of tokens using Map
+  // jobId=>List<tokens>
+  private static Map<JobID, List<DelegationTokenToRenew>> delegationTokens =

+    Collections.synchronizedMap(new HashMap<JobID, 
+                                       List<DelegationTokenToRenew>>());
+  //adding token
+  private static void addTokenToMap(DelegationTokenToRenew t) {
+    // see if a list already exists
+    JobID jobId = t.jobId;
+    List<DelegationTokenToRenew> l = delegationTokens.get(jobId);
+    if(l==null) {
+      l = new ArrayList<DelegationTokenToRenew>();
+      delegationTokens.put(jobId, l);
+    }
+    l.add(t);
+  }
+  
+  // kind of tokens we currently renew
+  private static final Text kindHdfs = 
+    DelegationTokenIdentifier.HDFS_DELEGATION_KIND;
+  
+  @SuppressWarnings("unchecked")
+  public static synchronized void registerDelegationTokensForRenewal(
+      JobID jobId, TokenStorage ts, Configuration conf) {
+    if(ts==null)
+      return; //nothing to add
+    
+    Collection <Token<? extends TokenIdentifier>> tokens = ts.getAllTokens();
+    long now = System.currentTimeMillis();
+    
+    for(Token<? extends TokenIdentifier> t : tokens) {
+      // currently we only check for HDFS delegation tokens
+      // later we can add more different types.
+      if(! t.getKind().equals(kindHdfs)) {
+        continue; 
+      }
+      Token<DelegationTokenIdentifier> dt = 
+        (Token<DelegationTokenIdentifier>)t;
+      
+      // first renew happens immediately
+      DelegationTokenToRenew dtr = 
+        new DelegationTokenToRenew(jobId, dt, conf, now); 
+
+      addTokenToMap(dtr);
+      
+      setTimerForTokenRenewal(dtr, true);
+      LOG.info("registering token for renewal for service =" + dt.getService()+
+          " and jobID = " + jobId);
+    }
+  }
+  
+  private static long renewDelegationToken(DelegationTokenToRenew dttr) 
+  throws Exception {
+    long newExpirationDate=System.currentTimeMillis()+3600*1000;
+    Token<DelegationTokenIdentifier> token = dttr.token;
+    Configuration conf = dttr.conf;
+    
+    if(token.getKind().equals(kindHdfs)) {
+      try {
+        DistributedFileSystem dfs = getDFSForToken(token, conf);
+        newExpirationDate = dfs.renewDelegationToken(token);
+      } catch (InvalidToken ite) {
+        LOG.warn("token canceled - not scheduling for renew");
+        removeFailedDelegationToken(dttr);
+        throw new Exception("failed to renew token", ite);
+      } catch (AccessControlException ace) {
+        LOG.warn("token canceled - not scheduling for renew");
+        removeFailedDelegationToken(dttr);
+        throw new Exception("failed to renew token", ace);
+      } catch (Exception ioe) {
+        LOG.warn("failed to renew token:"+token, ioe);
+        // returns default expiration date
+      }
+    } else {
+      throw new Exception("unknown token type to renew+"+token.getKind());
+    }
+    return newExpirationDate;
+  }
+
+  
+  /**
+   * Task - to renew a token
+   *
+   */
+  private static class RenewalTimerTask extends TimerTask {
+    private DelegationTokenToRenew dttr;
+    
+    RenewalTimerTask(DelegationTokenToRenew t) {  dttr = t;  }
+    
+    @Override
+    public void run() {
+      Token<DelegationTokenIdentifier> token = dttr.token;
+      long newExpirationDate=0;
+      try {
+        newExpirationDate = renewDelegationToken(dttr);
+      } catch (Exception e) {
+        return; // message logged in renewDT method
+      }
+      if (LOG.isDebugEnabled())
+        LOG.debug("renewing for:"+token.getService()+";newED=" + 
+            newExpirationDate);
+      
+      // new expiration date
+      dttr.expirationDate = newExpirationDate;
+      setTimerForTokenRenewal(dttr, false);// set the next one
+    }
+  }
+  
+  private static DistributedFileSystem getDFSForToken(
+      Token<DelegationTokenIdentifier> token, Configuration conf) 
+  throws Exception {
+    DistributedFileSystem dfs = null;
+    try {
+      URI uri = new URI (SCHEME + "://" + token.getService().toString());
+      dfs =  (DistributedFileSystem) FileSystem.get(uri, conf);
+    } catch (Exception e) {
+      LOG.warn("Failed to create a dfs to renew for:" + token.getService(), e);
+      throw e;
+    } 
+    return dfs;
+  }
+  
+  /**
+   * schedule renewal of the given token (now, or shortly before it expires)
+   */
+  private static void setTimerForTokenRenewal(
+      DelegationTokenToRenew token, boolean firstTime) {
+      
+    // calculate timer time
+    long now = System.currentTimeMillis();
+    long renewIn;
+    if(firstTime) {
+      renewIn = now;
+    } else {
+      long expiresIn = (token.expirationDate - now); 
+      renewIn = now + expiresIn - expiresIn/10; // little before expiration
+    }
+    
+    try {
+      // need to create a new timer task every time
+      TimerTask tTask = new RenewalTimerTask(token);
+      token.setTimerTask(tTask); // keep reference to the timer task
+
+      renewalTimer.schedule(token.timerTask, new Date(renewIn));
+    } catch (Exception e) {
+      LOG.warn("failed to schedule a task, token will not renew more", e);
+    }
+  }
+
+  /**
+   * cancels the renewal timer and removes all registered tokens
+   */
+  static public void close() {
+    renewalTimer.cancel();
+    delegationTokens.clear();
+  }
+  
+  // cancel a token
+  private static void cancelToken(DelegationTokenToRenew t) {
+    Token<DelegationTokenIdentifier> token = t.token;
+    Configuration conf = t.conf;
+    
+    if(token.getKind().equals(kindHdfs)) {
+      try {
+        DistributedFileSystem dfs = getDFSForToken(token, conf);
+        if (LOG.isDebugEnabled())
+          LOG.debug("canceling token " + token.getService() + " for dfs=" +
+              dfs);
+        dfs.cancelDelegationToken(token);
+      } catch (Exception e) {
+        LOG.warn("Failed to cancel " + token, e);
+      }
+    }
+  }
+  
+  /**
+   * removing a DT whose renewal failed
+   * @param t the token to stop renewing
+   */
+  private static void removeFailedDelegationToken(DelegationTokenToRenew t) {
+    JobID jobId = t.jobId;
+    List<DelegationTokenToRenew> l = delegationTokens.get(jobId);
+    if(l==null) return;
+
+    Iterator<DelegationTokenToRenew> it = l.iterator();
+    while(it.hasNext()) {
+      DelegationTokenToRenew dttr = it.next();
+      if(dttr == t) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("removing failed delegation token for jobid=" + jobId + 
+            ";t=" + dttr.token.getService());
+
+        // cancel the timer
+        if(dttr.timerTask!=null)
+          dttr.timerTask.cancel();
+
+        // no need to cancel the token - it is invalid
+        it.remove();
+        break; //should be only one
+      }
+    }
+  }
+  
+  /**
+   * removing DT for completed jobs
+   * @param jobId
+   */
+  public static void removeDelegationTokenRenewalForJob(JobID jobId) {
+    List<DelegationTokenToRenew> l = delegationTokens.remove(jobId);
+    if(l==null) return;
+
+    Iterator<DelegationTokenToRenew> it = l.iterator();
+    while(it.hasNext()) {
+      DelegationTokenToRenew dttr = it.next();
+      if (LOG.isDebugEnabled())
+        LOG.debug("removing delegation token for jobid=" + jobId + 
+          ";t=" + dttr.token.getService());
+
+      // cancel the timer
+      if(dttr.timerTask!=null)
+        dttr.timerTask.cancel();
+
+      // cancel the token
+      cancelToken(dttr);
+
+      it.remove();
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=1077187&r1=1077186&r2=1077187&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
Fri Mar  4 03:49:54 2011
@@ -18,8 +18,7 @@
 package org.apache.hadoop.mapreduce.security;
 
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -59,7 +58,6 @@ import org.codehaus.jackson.map.ObjectMa
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import static org.junit.Assert.*;
 
 
 public class TestTokenCache {
@@ -275,7 +273,7 @@ public class TestTokenCache {
     for(Token<? extends TokenIdentifier> t: tns) {
       System.out.println("kind="+t.getKind() + ";servic=" + t.getService() + ";str=" + t.toString());
 
-      if(t.getKind().equals(new Text("HDFS_DELEGATION_TOKEN")) &&
+      if(t.getKind().equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND) &&
           t.getService().equals(new Text(fs_addr))) {
         found = true;
       }

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java?rev=1077187&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
(added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
Fri Mar  4 03:49:54 2011
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security.token;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * unit test - 
+ * tests addition/deletion/cancelation of renewals of delegation tokens
+ *
+ */
+public class TestDelegationTokenRenewal {
+  private static final Log LOG = 
+      LogFactory.getLog(TestDelegationTokenRenewal.class);
+
+  private static Configuration conf;
+ 
+  @BeforeClass
+  public static void setUp() throws Exception {
+    conf = new Configuration();
+    
+    // create a fake FileSystem (MyFS) and associate it
+    // with the "hdfs" scheme.
+    URI uri = new URI(DelegationTokenRenewal.SCHEME+"://localhost:0");
+    System.out.println("scheme is : " + uri.getScheme());
+    conf.setClass("fs." + uri.getScheme() + ".impl", MyFS.class, DistributedFileSystem.class);
+    FileSystem.setDefaultUri(conf, uri);
+    System.out.println("filesystem uri = " + FileSystem.getDefaultUri(conf).toString());
+  }
+  
+  /**
+   * add some extra functionality for testing
+   * 1. toString();
+   * 2. cancelToken() and isCanceled()
+   */
+  private static class MyToken extends Token<DelegationTokenIdentifier> {
+    public String status = "GOOD";
+    public static final String CANCELED = "CANCELED";
+
+    public MyToken(DelegationTokenIdentifier dtId1,
+        DelegationTokenSecretManager sm) {
+      super(dtId1, sm);
+      status = "GOOD";
+    }
+    
+    public boolean isCanceled() {return status.equals(CANCELED);}
+    public void cancelToken() {this.status=CANCELED;}
+    public String toString() {
+      StringBuilder sb = new StringBuilder(1024);
+      
+      sb.append("id=");
+      String id = StringUtils.byteToHexString(this.getIdentifier());
+      int idLen = id.length();
+      sb.append(id.substring(idLen-6));
+      sb.append(";k=");
+      sb.append(this.getKind());
+      sb.append(";s=");
+      sb.append(this.getService());
+      return sb.toString();
+    }
+  }
+
+  /**
+   * fake FileSystem 
+   * overrides three methods
+   * 1. getDelegationToken() - generates a token
+   * 2. renewDelegationToken() - counts number of calls, and remembers
+   * the most recently renewed token.
+   * 3. cancelDelegationToken() - cancels the token (a subsequent renew will
+   * cause an InvalidToken exception)
+   */
+  static class MyFS extends DistributedFileSystem {
+    int counter=0;
+    MyToken token;
+    MyToken tokenToRenewIn2Sec;
+    
+    public MyFS() {}
+    public void close() {}
+    @Override
+    public void initialize(URI uri, Configuration conf) throws IOException {}
+    
+    @Override
+    public long renewDelegationToken(Token<DelegationTokenIdentifier> t)
+    throws InvalidToken, IOException {
+      MyToken token = (MyToken)t;
+      if(token.isCanceled()) {
+        throw new InvalidToken("token has been canceled");
+      }
+      counter ++;
+      this.token = (MyToken)token;
+      System.out.println("Called MYDFS.renewdelegationtoken " + token);
+      if(tokenToRenewIn2Sec == token) { 
+        // this token first renewal in 2 seconds
+        System.out.println("RENEW in 2 seconds");
+        tokenToRenewIn2Sec=null;
+        return 2*1000 + System.currentTimeMillis();
+      } else {
+        return 86400*1000 + System.currentTimeMillis();
+      }
+    }
+    @Override 
+    public MyToken getDelegationToken(Text renewer)
+    throws IOException {
+      System.out.println("Called MYDFS.getdelegationtoken");
+      return createTokens(renewer);
+    }
+    @Override
+    public void cancelDelegationToken(Token<DelegationTokenIdentifier> t)
+    throws IOException {
+      MyToken token = (MyToken)t;
+      token.cancelToken();
+    }
+
+    public void setTokenToRenewIn2Sec(MyToken t) {tokenToRenewIn2Sec=t;}
+    public int getCounter() {return counter; }
+    public MyToken getToken() {return token;}
+  }
+  
+  /**
+   * auxiliary - create token
+   * @param renewer
+   * @return
+   * @throws IOException
+   */
+  static MyToken createTokens(Text renewer) 
+    throws IOException {
+    Text user1= new Text("user1");
+    
+    DelegationTokenSecretManager sm = new DelegationTokenSecretManager(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
+        3600000);
+    sm.startThreads();
+    
+    DelegationTokenIdentifier dtId1 = 
+      new DelegationTokenIdentifier(user1, renewer, user1);
+    
+    MyToken token1 = new MyToken(dtId1, sm);
+    
+   
+    token1.setService(new Text("localhost:0"));
+    return token1;
+  }
+  
+  
+  /**
+   * Basic idea of the test:
+   * 1. create tokens.
+   * 2. Mark one of them to be renewed in 2 seconds (instead of
+   * 24 hours)
+   * 3. register them for renewal
+   * 4. sleep for 3 seconds
+   * 5. count number of renewals (should be 3 initial ones + one extra)
+   * 6. register another token for 2 seconds 
+   * 7. cancel it immediately
+   * 8. Sleep and check that the 2 seconds renew didn't happen 
+   * (5 renewals in total)
+   * 9. check cancelation
+   * @throws IOException
+   * @throws URISyntaxException
+   */
+  @Test
+  public void testDTRenewal () throws IOException, URISyntaxException {
+    MyFS dfs = (MyFS)FileSystem.get(conf);
+    System.out.println("dfs="+(Object)dfs);
+    // Test 1. - add three tokens - make sure exactly one gets renewed
+    
+    // get the delegation tokens
+    MyToken token1, token2, token3;
+    token1 = dfs.getDelegationToken(new Text("user1"));
+    token2 = dfs.getDelegationToken(new Text("user2"));
+    token3 = dfs.getDelegationToken(new Text("user3"));
+
+    //to cause this one to be set for renew in 2 secs
+    dfs.setTokenToRenewIn2Sec(token1); 
+    System.out.println("token="+token1+" should be renewed for 2 secs");
+    
+    // three distinct Namenodes
+    String nn1 = DelegationTokenRenewal.SCHEME + "://host1:0";
+    String nn2 = DelegationTokenRenewal.SCHEME + "://host2:0";
+    String nn3 = DelegationTokenRenewal.SCHEME + "://host3:0";
+    
+    TokenStorage ts = new TokenStorage();
+    
+    // put the tokens into the storage
+    ts.addToken(new Text(nn1), token1);
+    ts.addToken(new Text(nn2), token2);
+    ts.addToken(new Text(nn3), token3);
+    
+    // register the tokens for renewal
+    DelegationTokenRenewal.registerDelegationTokensForRenewal(
+        new JobID("job1", 1), ts, conf);
+    // first 3 initial renewals + 1 real
+    int numberOfExpectedRenewals = 3+1; 
+    
+    waitForRenewals(dfs, numberOfExpectedRenewals);
+    
+    System.out.println("Counter = " + dfs.getCounter() + ";t="+
+        dfs.getToken());
+    assertEquals("renew wasn't called as many times as expected(4):",
+        numberOfExpectedRenewals, dfs.getCounter());
+    // expected value first, actual value second, so that a failure
+    // message reports them the right way round
+    assertEquals("most recently renewed token mismatch", token1,
+        dfs.getToken());
+    
+    // Test 2. 
+    // add another token ( that expires in 2 secs). Then remove it, before
+    // time is up.
+    // Wait for 3 secs , and make sure no renews were called
+    ts = new TokenStorage();
+    MyToken token4 = dfs.getDelegationToken(new Text("user4"));
+    
+    //to cause this one to be set for renew in 2 secs
+    dfs.setTokenToRenewIn2Sec(token4); 
+    System.out.println("token="+token4+" should be renewed for 2 secs");
+    
+    String nn4 = DelegationTokenRenewal.SCHEME + "://host4:0";
+    ts.addToken(new Text(nn4), token4);
+    
+
+    JobID jid2 = new JobID("job2",1);
+    DelegationTokenRenewal.registerDelegationTokensForRenewal(jid2, ts, conf);
+    DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jid2);
+    numberOfExpectedRenewals++; // one more initial renewal
+    waitForRenewals(dfs, numberOfExpectedRenewals);
+    System.out.println("Counter = " + dfs.getCounter() + ";t="+dfs.getToken());
+    
+    // counter and the token should still be the old ones
+    assertEquals("renew wasn't called as many times as expected",
+        numberOfExpectedRenewals, dfs.getCounter());
+    
+    // also renewing of the cancelled token should fail
+    boolean exception=false;
+    try {
+      dfs.renewDelegationToken(token4);
+    } catch (InvalidToken ite) {
+      //expected
+      exception = true;
+    }
+    assertTrue("Renew of canceled token didn't fail", exception);
+  }
+
+  /**
+   * Sleeps in steps of 3 seconds, at most 4 times, and returns as soon as
+   * the renewal counter of the given file system equals the expected value.
+   * The counter is only checked after each sleep. If the thread is
+   * interrupted, the interrupt flag is restored and the wait ends early.
+   * @param dfs file system whose renewal counter is polled
+   * @param expected counter value to wait for
+   */
+  private static void waitForRenewals(MyFS dfs, int expected) {
+    for(int attempts = 4; attempts > 0; attempts--) {
+      try {
+        Thread.sleep(3*1000); // sleep 3 seconds, so it has time to renew
+      } catch (InterruptedException e) {
+        // do not swallow the interrupt: restore the flag and stop waiting
+        Thread.currentThread().interrupt();
+        return;
+      }
+      // since we cannot guarantee timely execution - let's give few chances
+      if(dfs.getCounter()==expected) {
+        return;
+      }
+    }
+  }
+}



Mime
View raw message