hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r1131265 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java src/test/findbugsExcludeFile.xml
Date Fri, 03 Jun 2011 22:18:06 GMT
Author: ddas
Date: Fri Jun  3 22:18:06 2011
New Revision: 1131265

URL: http://svn.apache.org/viewvc?rev=1131265&view=rev
Log:
MAPREDUCE-2452. Makes the cancellation of delegation tokens happen in a separate thread. Contributed
by Devaraj Das.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
    hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1131265&r1=1131264&r2=1131265&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jun  3 22:18:06 2011
@@ -155,6 +155,9 @@ Trunk (unreleased changes)
     MAPREDUCE-2494. Order distributed cache deletions by LRU. (Robert Joseph
     Evans via cdouglas)
 
+    MAPREDUCE-2452. Makes the cancellation of delegation tokens happen in a 
+    separate thread. (ddas)
+
   OPTIMIZATIONS
 
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java?rev=1131265&r1=1131264&r2=1131265&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java Fri Jun  3 22:18:06 2011
@@ -30,6 +30,7 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,6 +50,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.util.StringUtils;
 
 
 @InterfaceAudience.Private
@@ -107,10 +109,87 @@ public class DelegationTokenRenewal {
   // global single timer (daemon)
   private static Timer renewalTimer = new Timer(true);
   
+  //delegation token canceler thread
+  private static DelegationTokenCancelThread dtCancelThread =
+    new DelegationTokenCancelThread();
+  static {
+    dtCancelThread.start();
+  }
+
+  
   //managing the list of tokens using Map
   // jobId=>List<tokens>
   private static Set<DelegationTokenToRenew> delegationTokens = 
     Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
+  
+  private static class DelegationTokenCancelThread extends Thread {
+    private static class TokenWithConf {
+      Token<DelegationTokenIdentifier> token;
+      Configuration conf;
+      TokenWithConf(Token<DelegationTokenIdentifier> token,  
+          Configuration conf) {
+        this.token = token;
+        this.conf = conf;
+      }
+    }
+    private LinkedBlockingQueue<TokenWithConf> queue =  
+      new LinkedBlockingQueue<TokenWithConf>();
+     
+    public DelegationTokenCancelThread() {
+      super("Delegation Token Canceler");
+      setDaemon(true);
+    }
+    public void cancelToken(Token<DelegationTokenIdentifier> token,  
+        Configuration conf) {
+      TokenWithConf tokenWithConf = new TokenWithConf(token, conf);
+      while (!queue.offer(tokenWithConf)) {
+        LOG.warn("Unable to add token " + token + " for cancellation. " +
+        		 "Will retry..");
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    public void run() {
+      while (true) {
+        TokenWithConf tokenWithConf = null;
+        try {
+          tokenWithConf = queue.take();
+          DistributedFileSystem dfs = null;
+          try {
+            // do it over rpc. For that we need DFS object
+            dfs = getDFSForToken(tokenWithConf.token, tokenWithConf.conf);
+          } catch (Exception e) {
+            LOG.info("couldn't get DFS to cancel. Will retry over HTTPS");
+            dfs = null;
+          }
+      
+          if(dfs != null) {
+            dfs.cancelDelegationToken(tokenWithConf.token);
+          } else {
+            cancelDelegationTokenOverHttps(tokenWithConf.token, 
+                                           tokenWithConf.conf);
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Canceling token " + tokenWithConf.token.getService() +  
+                " for dfs=" + dfs);
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed to cancel token " + tokenWithConf.token + " " +  
+              StringUtils.stringifyException(e));
+        } catch (InterruptedException ie) {
+          return;
+        } catch (Throwable t) {
+          LOG.warn("Got exception " + StringUtils.stringifyException(t) + 
+                   ". Exiting..");
+          System.exit(-1);
+        }
+      }
+    }
+  }
   //adding token
   private static void addTokenToList(DelegationTokenToRenew t) {
     delegationTokens.add(t);
@@ -337,24 +416,7 @@ public class DelegationTokenRenewal {
     Configuration conf = t.conf;
     
     if(token.getKind().equals(kindHdfs)) {
-      DistributedFileSystem dfs = null;
-      try {
-        // do it over rpc. For that we need DFS object
-        dfs = getDFSForToken(token, conf);
-      } catch (Exception e) {
-        LOG.info("couldn't get DFS to cancel. Will retry over HTTPS");
-        dfs = null;
-      }
-      
-      try {
-        if(dfs != null) {
-          dfs.cancelDelegationToken(token);
-        } else {
-          cancelDelegationTokenOverHttps(token,conf);
-        }
-      } catch (Exception e) {
-        LOG.warn("Failed to cancel " + token, e);
-      }
+      dtCancelThread.cancelToken(token, conf);
     }
   }
   

Modified: hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml?rev=1131265&r1=1131264&r2=1131265&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml Fri Jun  3 22:18:06 2011
@@ -388,4 +388,9 @@
        <Field name="started" />
        <Bug pattern="IS2_INCONSISTENT_SYNC" />
     </Match>
+    <Match>
+       <Class name="org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal$DelegationTokenCancelThread" />
+       <Method name="run" />
+       <Bug pattern="DM_EXIT" />
+    </Match>
  </FindBugsFilter>



Mime
View raw message