hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r912471 - in /hadoop/mapreduce/trunk: ./ src/java/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/security/token/ src/test/mapred/org/apache/hadoop/mapreduce/security/ src/test/...
Date Mon, 22 Feb 2010 03:09:19 GMT
Author: ddas
Date: Mon Feb 22 03:09:19 2010
New Revision: 912471

URL: http://svn.apache.org/viewvc?rev=912471&view=rev
Log:
MAPREDUCE-1430. JobTracker automatically renews delegation tokens for jobs. Contributed by Boris Shkolnik.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/mapred-default.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=912471&r1=912470&r2=912471&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Feb 22 03:09:19 2010
@@ -60,6 +60,9 @@
     MAPREDUCE-1307. Introduces the Job level ACLs feature. 
     (Vinod Kumar Vavilapalli via ddas)
 
+    MAPREDUCE-1430. JobTracker automatically renews delegation tokens for jobs.
+    (Boris Shkolnik via ddas)
+
   IMPROVEMENTS
 
     MAPREDUCE-1198. Alternatively schedule different types of tasks in

Modified: hadoop/mapreduce/trunk/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/mapred-default.xml?rev=912471&r1=912470&r2=912471&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/trunk/src/java/mapred-default.xml Mon Feb 22 03:09:19 2010
@@ -1021,6 +1021,14 @@
 </property>
 
 <property>
+<name>mapreduce.job.complete.cancel.delegation.tokens</name>
+  <value>true</value>
+  <description> if false - do not unregister/cancel delegation tokens from 
+    renewal, because same tokens may be used by spawned jobs
+  </description>
+</property>
+
+<property>
   <name>mapreduce.tasktracker.taskcontroller</name>
   <value>org.apache.hadoop.mapred.DefaultTaskController</value>
   <description>TaskController which is used to launch and manage task execution 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=912471&r1=912470&r2=912471&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Feb 22 03:09:19 2010
@@ -68,6 +68,7 @@
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.mapreduce.split.JobSplit;
@@ -461,7 +462,10 @@
     this.speculativeCap = conf.getFloat(
         JobContext.SPECULATIVECAP,0.1f);
     this.slowNodeThreshold = conf.getFloat(
-        JobContext.SPECULATIVE_SLOWNODE_THRESHOLD,1.0f);
+        JobContext.SPECULATIVE_SLOWNODE_THRESHOLD,1.0f); 
+    // register job's tokens for renewal
+    DelegationTokenRenewal.registerDelegationTokensForRenewal(
+        jobInfo.getJobID(), ts, this.conf);
   }
 
   /**
@@ -3264,38 +3268,45 @@
    * from all tables.  Be sure to remove all of this job's tasks
    * from the various tables.
    */
-  synchronized void garbageCollect() {
-    // Cancel task tracker reservation
-    cancelReservedSlots();
-    
-    // Let the JobTracker know that a job is complete
-    jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
-    jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces());
-    jobtracker.storeCompletedJob(this);
-    jobtracker.finalizeJob(this);
-      
-    try {
-      // Definitely remove the local-disk copy of the job file
-      if (localJobFile != null) {
-        localFs.delete(localJobFile, true);
-        localJobFile = null;
-      }
-
-      Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
-      new CleanupQueue().addToQueue(new PathDeletionContext(
-          jobtracker.getFileSystem(), tempDir.toUri().getPath())); 
-    } catch (IOException e) {
-      LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
-    }
-    
-    cleanUpMetrics();
-    // free up the memory used by the data structures
-    this.nonRunningMapCache = null;
-    this.runningMapCache = null;
-    this.nonRunningReduces = null;
-    this.runningReduces = null;
+   void garbageCollect() {
+     synchronized(this) {
+       // Cancel task tracker reservation
+       cancelReservedSlots();
+
+
+       // Let the JobTracker know that a job is complete
+       jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
+       jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces());
+       jobtracker.storeCompletedJob(this);
+       jobtracker.finalizeJob(this);
+
+       try {
+         // Definitely remove the local-disk copy of the job file
+         if (localJobFile != null) {
+           localFs.delete(localJobFile, true);
+           localJobFile = null;
+         }
+
+         Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
+         new CleanupQueue().addToQueue(new PathDeletionContext(
+             jobtracker.getFileSystem(), tempDir.toUri().getPath())); 
+       } catch (IOException e) {
+         LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
+       }
 
-  }
+       cleanUpMetrics();
+       // free up the memory used by the data structures
+       this.nonRunningMapCache = null;
+       this.runningMapCache = null;
+       this.nonRunningReduces = null;
+       this.runningReduces = null;
+
+     }
+     // remove jobs delegation tokens
+     if(conf.getBoolean(JobContext.JOB_CANCEL_DELEGATION_TOKEN, true)) {
+       DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jobId);
+     } // else don't remove it.May be used by spawned tasks
+   }
 
   /**
    * Return the TaskInProgress that matches the tipid.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java?rev=912471&r1=912470&r2=912471&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java Mon Feb 22 03:09:19 2010
@@ -895,6 +895,15 @@
       throw new IOException(attr + " is incompatible with " + msg + " mode.");
     }    
   }
+  
+  /**
+   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
+   * tokens upon job completion. Defaults to true.
+   */
+  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
+    ensureState(JobState.DEFINE);
+    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
+  }
 
   /**
    * Default to the new APIs unless they are explicitly set or the old mapper or

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=912471&r1=912470&r2=912471&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Mon Feb 22 03:09:19 2010
@@ -229,7 +229,8 @@
     "mapreduce.reduce.merge.memtomem.enabled";
   public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";
   public static final String JOB_JOBTRACKER_ID = "mapreduce.job.kerberos.jtprinicipal";
-
+  public static final String JOB_CANCEL_DELEGATION_TOKEN = 
+    "mapreduce.job.complete.cancel.delegation.tokens";
   public static final String JOB_ACL_VIEW_JOB =
       "mapreduce.job.acl-view-job";
   public static final String JOB_ACL_MODIFY_JOB =

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java?rev=912471&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java Mon Feb 22 03:09:19 2010
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security.token;
+
+import java.net.URI;
+import java.security.AccessControlException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+
+
+@InterfaceAudience.Private
+public class DelegationTokenRenewal {
+  private static final Log LOG = LogFactory.getLog(DelegationTokenRenewal.class);
+  public static final String SCHEME = "hdfs";
+  
+  /**
+   * class that is used for keeping tracks of DT to renew
+   *
+   */
+  private static class DelegationTokenToRenew {
+    public final Token<DelegationTokenIdentifier> token;
+    public final JobID jobId;
+    public final Configuration conf;
+    public long expirationDate;
+    public TimerTask timerTask;
+    
+    public DelegationTokenToRenew(
+        JobID jId, Token<DelegationTokenIdentifier> t, 
+        Configuration newConf, long newExpirationDate) {
+      token = t;
+      jobId = jId;
+      conf = newConf;
+      expirationDate = newExpirationDate;
+      timerTask = null;
+      if(token==null || jobId==null || conf==null) {
+        throw new IllegalArgumentException("invalid params for Renew Token" +
+            ";t="+token+";j="+jobId+";c="+conf);
+      }
+    }
+    public void setTimerTask(TimerTask tTask) {
+      timerTask = tTask;
+    }
+    public String toString() {
+      return token + ";exp="+expirationDate;
+    }
+  }
+  
+  // global single timer (daemon)
+  private static Timer renewalTimer = new Timer(true);
+  
+  //managing the list of tokens using Map
+  // jobId=>List<tokens>
+  private static Map<JobID, List<DelegationTokenToRenew>> delegationTokens =
+    Collections.synchronizedMap(new HashMap<JobID, 
+                                       List<DelegationTokenToRenew>>());
+  //adding token
+  private static void addTokenToMap(DelegationTokenToRenew t) {
+    // see if a list already exists
+    JobID jobId = t.jobId;
+    List<DelegationTokenToRenew> l = delegationTokens.get(jobId);
+    if(l==null) {
+      l = new ArrayList<DelegationTokenToRenew>();
+      delegationTokens.put(jobId, l);
+    }
+    l.add(t);
+  }
+  
+  // kind of tokens we currently renew
+  private static final Text kindHdfs = 
+    DelegationTokenIdentifier.HDFS_DELEGATION_KIND;
+  
+  @SuppressWarnings("unchecked")
+  public static synchronized void registerDelegationTokensForRenewal(
+      JobID jobId, TokenStorage ts, Configuration conf) {
+    if(ts==null)
+      return; //nothing to add
+    
+    Collection <Token<? extends TokenIdentifier>> tokens = ts.getAllTokens();
+    long now = System.currentTimeMillis();
+    
+    for(Token<? extends TokenIdentifier> t : tokens) {
+      // currently we only check for HDFS delegation tokens
+      // later we can add more different types.
+      if(! t.getKind().equals(kindHdfs)) {
+        continue; 
+      }
+      Token<DelegationTokenIdentifier> dt = 
+        (Token<DelegationTokenIdentifier>)t;
+      
+      // first renew happens immediately
+      DelegationTokenToRenew dtr = 
+        new DelegationTokenToRenew(jobId, dt, conf, now); 
+
+      addTokenToMap(dtr);
+      
+      setTimerForTokenRenewal(dtr, true);
+      LOG.info("registering token for renewal for service =" + dt.getService()+
+          " and jobID = " + jobId);
+    }
+  }
+  
+  private static long renewDelegationToken(DelegationTokenToRenew dttr) 
+  throws Exception {
+    long newExpirationDate=System.currentTimeMillis()+3600*1000;
+    Token<DelegationTokenIdentifier> token = dttr.token;
+    Configuration conf = dttr.conf;
+    
+    if(token.getKind().equals(kindHdfs)) {
+      try {
+        DistributedFileSystem dfs = getDFSForToken(token, conf);
+        newExpirationDate = dfs.renewDelegationToken(token);
+      } catch (InvalidToken ite) {
+        LOG.warn("token canceled - not scheduling for renew");
+        removeFailedDelegationToken(dttr);
+        throw new Exception("failed to renew token", ite);
+      } catch (AccessControlException ace) {
+        LOG.warn("token canceled - not scheduling for renew");
+        removeFailedDelegationToken(dttr);
+        throw new Exception("failed to renew token", ace);
+      } catch (Exception ioe) {
+        LOG.warn("failed to renew token:"+token, ioe);
+        // returns default expiration date
+      }
+    } else {
+      throw new Exception("unknown token type to renew+"+token.getKind());
+    }
+    return newExpirationDate;
+  }
+
+  
+  /**
+   * Task - to renew a token
+   *
+   */
+  private static class RenewalTimerTask extends TimerTask {
+    private DelegationTokenToRenew dttr;
+    
+    RenewalTimerTask(DelegationTokenToRenew t) {  dttr = t;  }
+    
+    @Override
+    public void run() {
+      Token<DelegationTokenIdentifier> token = dttr.token;
+      long newExpirationDate=0;
+      try {
+        newExpirationDate = renewDelegationToken(dttr);
+      } catch (Exception e) {
+        return; // message logged in renewDT method
+      }
+      if (LOG.isDebugEnabled())
+        LOG.debug("renewing for:"+token.getService()+";newED=" + 
+            newExpirationDate);
+      
+      // new expiration date
+      dttr.expirationDate = newExpirationDate;
+      setTimerForTokenRenewal(dttr, false);// set the next one
+    }
+  }
+  
+  private static DistributedFileSystem getDFSForToken(
+      Token<DelegationTokenIdentifier> token, Configuration conf) 
+  throws Exception {
+    DistributedFileSystem dfs = null;
+    try {
+      URI uri = new URI (SCHEME + "://" + token.getService().toString());
+      dfs =  (DistributedFileSystem) FileSystem.get(uri, conf);
+    } catch (Exception e) {
+      LOG.warn("Failed to create a dfs to renew for:" + token.getService(), e);
+      throw e;
+    } 
+    return dfs;
+  }
+  
+  /**
+   * find the soonest expiring token and set it for renew
+   */
+  private static void setTimerForTokenRenewal(
+      DelegationTokenToRenew token, boolean firstTime) {
+      
+    // calculate timer time
+    long now = System.currentTimeMillis();
+    long renewIn;
+    if(firstTime) {
+      renewIn = now;
+    } else {
+      long expiresIn = (token.expirationDate - now); 
+      renewIn = now + expiresIn - expiresIn/10; // little before expiration
+    }
+    
+    try {
+      // need to create new timer every time
+      TimerTask tTask = new RenewalTimerTask(token);
+      token.setTimerTask(tTask); // keep reference to the timer
+
+      renewalTimer.schedule(token.timerTask, new Date(renewIn));
+    } catch (Exception e) {
+      LOG.warn("failed to schedule a task, token will not renew more", e);
+    }
+  }
+
+  /**
+   * removing all tokens renewals
+   */
+  static public void close() {
+    renewalTimer.cancel();
+    delegationTokens.clear();
+  }
+  
+  // cancel a token
+  private static void cancelToken(DelegationTokenToRenew t) {
+    Token<DelegationTokenIdentifier> token = t.token;
+    Configuration conf = t.conf;
+    
+    if(token.getKind().equals(kindHdfs)) {
+      try {
+        DistributedFileSystem dfs = getDFSForToken(token, conf);
+        if (LOG.isDebugEnabled())
+          LOG.debug("canceling token " + token.getService() + " for dfs=" +
+              dfs);
+        dfs.cancelDelegationToken(token);
+      } catch (Exception e) {
+        LOG.warn("Failed to cancel " + token, e);
+      }
+    }
+  }
+  
+  /**
+   * removing failed DT
+   * @param jobId
+   */
+  private static void removeFailedDelegationToken(DelegationTokenToRenew t) {
+    JobID jobId = t.jobId;
+    List<DelegationTokenToRenew> l = delegationTokens.get(jobId);
+    if(l==null) return;
+
+    Iterator<DelegationTokenToRenew> it = l.iterator();
+    while(it.hasNext()) {
+      DelegationTokenToRenew dttr = it.next();
+      if(dttr == t) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("removing failed delegation token for jobid=" + jobId + 
+            ";t=" + dttr.token.getService());
+
+        // cancel the timer
+        if(dttr.timerTask!=null)
+          dttr.timerTask.cancel();
+
+        // no need to cancel the token - it is invalid
+        it.remove();
+        break; //should be only one
+      }
+    }
+  }
+  
+  /**
+   * removing DT for completed jobs
+   * @param jobId
+   */
+  public static void removeDelegationTokenRenewalForJob(JobID jobId) {
+    List<DelegationTokenToRenew> l = delegationTokens.remove(jobId);
+    if(l==null) return;
+
+    Iterator<DelegationTokenToRenew> it = l.iterator();
+    while(it.hasNext()) {
+      DelegationTokenToRenew dttr = it.next();
+      if (LOG.isDebugEnabled())
+        LOG.debug("removing delegation token for jobid=" + jobId + 
+          ";t=" + dttr.token.getService());
+
+      // cancel the timer
+      if(dttr.timerTask!=null)
+        dttr.timerTask.cancel();
+
+      // cancel the token
+      cancelToken(dttr);
+
+      it.remove();
+    }
+  }
+}

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=912471&r1=912470&r2=912471&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java Mon Feb 22 03:09:19 2010
@@ -267,7 +267,7 @@
     for(Token<? extends TokenIdentifier> t: tns) {
       System.out.println("kind="+t.getKind() + ";servic=" + t.getService() + ";str=" + t.toString());
       
-      if(t.getKind().equals(new Text("HDFS_DELEGATION_TOKEN")) &&
+      if(t.getKind().equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND) &&
           t.getService().equals(new Text(fs_addr))) {
         found = true;
       }

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java?rev=912471&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java Mon Feb 22 03:09:19 2010
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security.token;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * unit test - 
+ * tests addition/deletion/cancelation of renewals of delegation tokens
+ *
+ */
+public class TestDelegationTokenRenewal {
+  private static final Log LOG = 
+      LogFactory.getLog(TestDelegationTokenRenewal.class);
+
+  private static Configuration conf;
+ 
+  @BeforeClass
+  public static void setUp() throws Exception {
+    conf = new Configuration();
+    
+    // create a fake FileSystem (MyFS) and assosiate it
+    // with "hdfs" schema.
+    URI uri = new URI(DelegationTokenRenewal.SCHEME+"://localhost:0");
+    System.out.println("scheme is : " + uri.getScheme());
+    conf.setClass("fs." + uri.getScheme() + ".impl", MyFS.class, DistributedFileSystem.class);
+    FileSystem.setDefaultUri(conf, uri);
+    System.out.println("filesystem uri = " + FileSystem.getDefaultUri(conf).toString());
+  }
+  
+  /**
+   * add some extra functionality for testing
+   * 1. toString();
+   * 2. cancel() and isCanceled()
+   */
+  private static class MyToken extends Token<DelegationTokenIdentifier> {
+    public String status = "GOOD";
+    public static final String CANCELED = "CANCELED";
+
+    public MyToken(DelegationTokenIdentifier dtId1,
+        DelegationTokenSecretManager sm) {
+      super(dtId1, sm);
+      status = "GOOD";
+    }
+    
+    public boolean isCanceled() {return status.equals(CANCELED);}
+    public void cancelToken() {this.status=CANCELED;}
+    public String toString() {
+      StringBuilder sb = new StringBuilder(1024);
+      
+      sb.append("id=");
+      String id = StringUtils.byteToHexString(this.getIdentifier());
+      int idLen = id.length();
+      sb.append(id.substring(idLen-6));
+      sb.append(";k=");
+      sb.append(this.getKind());
+      sb.append(";s=");
+      sb.append(this.getService());
+      return sb.toString();
+    }
+  }
+
+  /**
+   * fake FileSystem 
+   * overwrites three methods
+   * 1. getDelegationToken() - generates a token
+   * 2. renewDelegataionToken - counts number of calls, and remembers 
+   * most recently renewed token.
+   * 3. cancelToken -cancels token (subsequent renew will cause IllegalToken 
+   * exception
+   */
+  static class MyFS extends DistributedFileSystem {
+    int counter=0;
+    MyToken token;
+    MyToken tokenToRenewIn2Sec;
+    
+    public MyFS() {}
+    public void close() {}
+    @Override
+    public void initialize(URI uri, Configuration conf) throws IOException {}
+    
+    @Override
+    public long renewDelegationToken(Token<DelegationTokenIdentifier> t)
+    throws InvalidToken, IOException {
+      MyToken token = (MyToken)t;
+      if(token.isCanceled()) {
+        throw new InvalidToken("token has been canceled");
+      }
+      counter ++;
+      this.token = (MyToken)token;
+      System.out.println("Called MYDFS.renewdelegationtoken " + token);
+      if(tokenToRenewIn2Sec == token) { 
+        // this token first renewal in 2 seconds
+        System.out.println("RENEW in 2 seconds");
+        tokenToRenewIn2Sec=null;
+        return 2*1000 + System.currentTimeMillis();
+      } else {
+        return 86400*1000 + System.currentTimeMillis();
+      }
+    }
+    @Override 
+    public MyToken getDelegationToken(Text renewer)
+    throws IOException {
+      System.out.println("Called MYDFS.getdelegationtoken");
+      return createTokens(renewer);
+    }
+    @Override
+    public void cancelDelegationToken(Token<DelegationTokenIdentifier> t)
+    throws IOException {
+      MyToken token = (MyToken)t;
+      token.cancelToken();
+    }
+
+    public void setTokenToRenewIn2Sec(MyToken t) {tokenToRenewIn2Sec=t;}
+    public int getCounter() {return counter; }
+    public MyToken getToken() {return token;}
+  }
+  
+  /**
+   * auxilary - create token
+   * @param renewer
+   * @return
+   * @throws IOException
+   */
+  static MyToken createTokens(Text renewer) 
+    throws IOException {
+    Text user1= new Text("user1");
+    
+    DelegationTokenSecretManager sm = new DelegationTokenSecretManager(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
+        3600000);
+    sm.startThreads();
+    
+    DelegationTokenIdentifier dtId1 = 
+      new DelegationTokenIdentifier(user1, renewer, user1);
+    
+    MyToken token1 = new MyToken(dtId1, sm);
+    
+   
+    token1.setService(new Text("localhost:0"));
+    return token1;
+  }
+  
+  
+  /**
+   * Basic idea of the test:
+   * 1. create tokens.
+   * 2. Mark one of them to be renewed in 2 seconds (istead of
+   * 24 hourse)
+   * 3. register them for renewal
+   * 4. sleep for 3 seconds
+   * 5. count number of renewals (should 3 initial ones + one extra)
+   * 6. register another token for 2 seconds 
+   * 7. cancel it immediately
+   * 8. Sleep and check that the 2 seconds renew didn't happen 
+   * (totally 5 reneals)
+   * 9. check cancelation
+   * @throws IOException
+   * @throws URISyntaxException
+   */
+  @Test
+  public void testDTRenewal () throws IOException, URISyntaxException {
+    MyFS dfs = (MyFS)FileSystem.get(conf);
+    System.out.println("dfs="+(Object)dfs);
+    // Test 1. - add three tokens - make sure exactly one get's renewed
+    
+    // get the delegation tokens
+    MyToken token1, token2, token3;
+    token1 = dfs.getDelegationToken(new Text("user1"));
+    token2 = dfs.getDelegationToken(new Text("user2"));
+    token3 = dfs.getDelegationToken(new Text("user3"));
+
+    //to cause this one to be set for renew in 2 secs
+    dfs.setTokenToRenewIn2Sec(token1); 
+    System.out.println("token="+token1+" should be renewed for 2 secs");
+    
+    // two distinct Namenodes
+    String nn1 = DelegationTokenRenewal.SCHEME + "://host1:0";
+    String nn2 = DelegationTokenRenewal.SCHEME + "://host2:0";
+    String nn3 = DelegationTokenRenewal.SCHEME + "://host3:0";
+    
+    TokenStorage ts = new TokenStorage();
+    
+    // register the token for renewal
+    ts.addToken(new Text(nn1), token1);
+    ts.addToken(new Text(nn2), token2);
+    ts.addToken(new Text(nn3), token3);
+    
+    // register the tokens for renewal
+    DelegationTokenRenewal.registerDelegationTokensForRenewal(
+        new JobID("job1", 1), ts, conf);
+    // first 3 initial renewals + 1 real
+    int numberOfExpectedRenewals = 3+1; 
+    
+    int attempts = 4;
+    while(attempts-- > 0) {
+      try {
+        Thread.sleep(3*1000); // sleep 3 seconds, so it has time to renew
+      } catch (InterruptedException e) {}
+      
+      // since we cannot guarantee timely execution - let's give few chances
+      if(dfs.getCounter()==numberOfExpectedRenewals)
+        break;
+    }
+    
+    System.out.println("Counter = " + dfs.getCounter() + ";t="+
+        dfs.getToken());
+    assertEquals("renew wasn't called as many times as expected(4):",
+        numberOfExpectedRenewals, dfs.getCounter());
+    assertEquals("most recently renewed token mismatch", dfs.getToken(), 
+        token1);
+    
+    // Test 2. 
+    // add another token ( that expires in 2 secs). Then remove it, before
+    // time is up.
+    // Wait for 3 secs , and make sure no renews were called
+    ts = new TokenStorage();
+    MyToken token4 = dfs.getDelegationToken(new Text("user4"));
+    
+    //to cause this one to be set for renew in 2 secs
+    dfs.setTokenToRenewIn2Sec(token4); 
+    System.out.println("token="+token4+" should be renewed for 2 secs");
+    
+    String nn4 = DelegationTokenRenewal.SCHEME + "://host4:0";
+    ts.addToken(new Text(nn4), token4);
+    
+
+    JobID jid2 = new JobID("job2",1);
+    DelegationTokenRenewal.registerDelegationTokensForRenewal(jid2, ts, conf);
+    DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jid2);
+    numberOfExpectedRenewals++; // one more initial renewal
+    attempts = 4;
+    while(attempts-- > 0) {
+      try {
+        Thread.sleep(3*1000); // sleep 3 seconds, so it has time to renew
+      } catch (InterruptedException e) {}
+      // since we cannot guarantee timely execution - let's give few chances
+      if(dfs.getCounter()==numberOfExpectedRenewals)
+        break;
+    }
+    System.out.println("Counter = " + dfs.getCounter() + ";t="+dfs.getToken());
+    
+    // counter and the token should stil be the old ones
+    assertEquals("renew wasn't called as many times as expected",
+        numberOfExpectedRenewals, dfs.getCounter());
+    
+    // also renewing of the cancelled token should fail
+    boolean exception=false;
+    try {
+      dfs.renewDelegationToken(token4);
+    } catch (InvalidToken ite) {
+      //expected
+      exception = true;
+    }
+    assertTrue("Renew of canceled token didn't fail", exception);
+  }
+}



Mime
View raw message