tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject [2/2] git commit: TEZ-1596. Secure Shuffle utils is extremely expensive for fast queries (Gopal via Rajesh Balamohan) (Cherry picked from commit f801fa01d81ed0c0b7c1b39fbd551c5f86a8dda0)
Date Mon, 27 Oct 2014 23:15:51 GMT
TEZ-1596. Secure Shuffle utils is extremely expensive for fast queries (Gopal via Rajesh Balamohan)
(Cherry picked from commit f801fa01d81ed0c0b7c1b39fbd551c5f86a8dda0)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/11d35d9f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/11d35d9f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/11d35d9f

Branch: refs/heads/branch-0.5
Commit: 11d35d9fd72577931642abe387600bba30e90f3b
Parents: 5cc6ee6
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Tue Oct 28 04:44:33 2014 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Tue Oct 28 04:44:59 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../common/security/JobTokenSecretManager.java  | 40 +++++++++++++++++---
 .../common/security/SecureShuffleUtils.java     | 36 +++++++++---------
 .../runtime/library/common/shuffle/Fetcher.java | 19 +++++-----
 .../library/common/shuffle/HttpConnection.java  | 14 +++----
 .../common/shuffle/impl/ShuffleManager.java     |  8 ++--
 .../orderedgrouped/FetcherOrderedGrouped.java   | 11 +++---
 .../common/shuffle/orderedgrouped/Shuffle.java  |  5 ++-
 .../shuffle/orderedgrouped/TestFetcher.java     |  5 ++-
 9 files changed, 87 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/11d35d9f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e0561a3..a7e070b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -58,6 +58,7 @@ ALL CHANGES:
   TEZ-1590. Fetchers should not report failures after the Processor on the task completes.
   TEZ-1542. Fix a Local Mode crash on concurrentModificationException.
   TEZ-1638. Fix Missing type parametrization in runtime Input/Output configs.
+  TEZ-1596. Secure Shuffle utils is extremely expensive for fast queries.
 
 Release 0.5.1: 2014-10-02
 

http://git-wip-us.apache.org/repos/asf/tez/blob/11d35d9f/tez-api/src/main/java/org/apache/tez/common/security/JobTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/JobTokenSecretManager.java
b/tez-api/src/main/java/org/apache/tez/common/security/JobTokenSecretManager.java
index d793b82..785613e 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/JobTokenSecretManager.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/JobTokenSecretManager.java
@@ -18,9 +18,12 @@
 
 package org.apache.tez.common.security;
 
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
 import java.util.Map;
 import java.util.TreeMap;
 
+import javax.crypto.Mac;
 import javax.crypto.SecretKey;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -34,8 +37,10 @@ import org.apache.hadoop.security.token.Token;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class JobTokenSecretManager extends SecretManager<JobTokenIdentifier> {
+  private static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA1";
   private final SecretKey masterKey;
   private final Map<String, SecretKey> currentJobTokens;
+  private final Mac mac;
 
   /**
    * Convert the byte[] to a secret key
@@ -45,7 +50,7 @@ public class JobTokenSecretManager extends SecretManager<JobTokenIdentifier>
{
   public static SecretKey createSecretKey(byte[] key) {
     return SecretManager.createSecretKey(key);
   }
-  
+
   /**
    * Compute the HMAC hash of the message using the key
    * @param msg the message to hash
@@ -55,15 +60,39 @@ public class JobTokenSecretManager extends SecretManager<JobTokenIdentifier>
{
   public static byte[] computeHash(byte[] msg, SecretKey key) {
     return createPassword(msg, key);
   }
-  
+
+  /**
+   * Compute the HMAC hash of the message using the key
+   * @param msg the message to hash
+   * @return the computed hash
+   */
+  public byte[] computeHash(byte[] msg) {
+    synchronized(mac) {
+      return mac.doFinal(msg);
+    }
+  }
+
   /**
    * Default constructor
    */
   public JobTokenSecretManager() {
-    this.masterKey = generateSecret();
+    this(null);
+  }
+
+  public JobTokenSecretManager(SecretKey key) {
+    this.masterKey = (key == null) ? generateSecret() : key;
     this.currentJobTokens = new TreeMap<String, SecretKey>();
+    try {
+      mac = Mac.getInstance(DEFAULT_HMAC_ALGORITHM);
+      mac.init(masterKey);
+    } catch (NoSuchAlgorithmException nsa) {
+      throw new IllegalArgumentException("Can't find " + DEFAULT_HMAC_ALGORITHM + " algorithm.",
nsa);
+    } catch (InvalidKeyException ike) {
+      throw new IllegalArgumentException("Invalid key to HMAC computation", ike);
+    }
   }
-  
+
+
   /**
    * Create a new password/secret for the given job token identifier.
    * @param identifier the job token identifier
@@ -71,8 +100,7 @@ public class JobTokenSecretManager extends SecretManager<JobTokenIdentifier>
{
    */
   @Override
   public byte[] createPassword(JobTokenIdentifier identifier) {
-    byte[] result = createPassword(identifier.getBytes(), masterKey);
-    return result;
+    return computeHash(identifier.getBytes());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/11d35d9f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
index cf1fbba..b1424c0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
@@ -49,7 +49,7 @@ public class SecureShuffleUtils {
   public static String generateHash(byte[] msg, SecretKey key) {
     return new String(Base64.encodeBase64(generateByteHash(msg, key)));
   }
-  
+
   /**
    * calculate hash of msg
    * @param msg
@@ -58,41 +58,43 @@ public class SecureShuffleUtils {
   private static byte[] generateByteHash(byte[] msg, SecretKey key) {
     return JobTokenSecretManager.computeHash(msg, key);
   }
-  
+
   /**
    * verify that hash equals to HMacHash(msg)
-   * @param newHash
-   * @return true if is the same
+   * @param hash
+   * @param msg
+   * @param mgr JobTokenSecretManager
+   * @return true when hashes match; false otherwise
    */
-  private static boolean verifyHash(byte[] hash, byte[] msg, SecretKey key) {
-    byte[] msg_hash = generateByteHash(msg, key);
+  private static boolean verifyHash(byte[] hash, byte[] msg, JobTokenSecretManager mgr) {
+    byte[] msg_hash = mgr.computeHash(msg);
     return WritableComparator.compareBytes(msg_hash, 0, msg_hash.length, hash, 0, hash.length)
== 0;
   }
-  
+
   /**
    * Aux util to calculate hash of a String
    * @param enc_str
-   * @param key
+   * @param mgr JobTokenSecretManager
    * @return Base64 encodedHash
    * @throws IOException
    */
-  public static String hashFromString(String enc_str, SecretKey key) 
-  throws IOException {
-    return generateHash(enc_str.getBytes(), key); 
+  public static String hashFromString(String enc_str, JobTokenSecretManager mgr)
+      throws IOException {
+    return new String(Base64.encodeBase64(mgr.computeHash(enc_str.getBytes())));
   }
   
   /**
-   * verify that base64Hash is same as HMacHash(msg)  
+   * verify that base64Hash is same as HMacHash(msg)
    * @param base64Hash (Base64 encoded hash)
    * @param msg
    * @throws IOException if not the same
    */
-  public static void verifyReply(String base64Hash, String msg, SecretKey key)
-  throws IOException {
+  public static void verifyReply(String base64Hash, String msg, JobTokenSecretManager mgr)
+      throws IOException {
     byte[] hash = Base64.decodeBase64(base64Hash.getBytes());
-    
-    boolean res = verifyHash(hash, msg.getBytes(), key);
-    
+
+    boolean res = verifyHash(hash, msg.getBytes(), mgr);
+
     if(res != true) {
       throw new IOException("Verification of the hashReply failed");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/11d35d9f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 2dea4d3..6875966 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -38,8 +38,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import javax.crypto.SecretKey;
-
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.commons.logging.Log;
@@ -52,6 +50,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.Constants;
@@ -82,7 +81,7 @@ public class Fetcher implements Callable<FetchResult> {
   private boolean ifileReadAhead = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT;
   private int ifileReadAheadLength = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT;
   
-  private final SecretKey shuffleSecret;
+  private final JobTokenSecretManager jobTokenSecretMgr;
 
   private final FetcherCallback fetcherCallback;
   private final FetchedInputAllocator inputManager;
@@ -124,7 +123,7 @@ public class Fetcher implements Callable<FetchResult> {
 
   private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
       FetchedInputAllocator inputManager, ApplicationId appId,
-      SecretKey shuffleSecret, String srcNameTrimmed, Configuration conf,
+      JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf,
       RawLocalFileSystem localFs,
       LocalDirAllocator localDirAllocator,
       Path lockPath,
@@ -132,7 +131,7 @@ public class Fetcher implements Callable<FetchResult> {
       boolean sharedFetchEnabled) {
     this.fetcherCallback = fetcherCallback;
     this.inputManager = inputManager;
-    this.shuffleSecret = shuffleSecret;
+    this.jobTokenSecretMgr = jobTokenSecretManager;
     this.appId = appId;
     this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
     this.httpConnectionParams = params;
@@ -402,7 +401,7 @@ public class Fetcher implements Callable<FetchResult> {
       this.url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts,
           httpConnectionParams.getKeepAlive());
 
-      httpConnection = new HttpConnection(url, httpConnectionParams, logIdentifier, shuffleSecret);
+      httpConnection = new HttpConnection(url, httpConnectionParams, logIdentifier, jobTokenSecretMgr);
       httpConnection.connect();
     } catch (IOException e) {
       // ioErrs.increment(1);
@@ -898,21 +897,21 @@ public class Fetcher implements Callable<FetchResult> {
 
     public FetcherBuilder(FetcherCallback fetcherCallback,
         HttpConnectionParams params, FetchedInputAllocator inputManager,
-        ApplicationId appId, SecretKey shuffleSecret, String srcNameTrimmed,
+        ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
         Configuration conf, boolean localDiskFetchEnabled) {
       this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
-          shuffleSecret, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
+          jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
           false);
     }
 
     public FetcherBuilder(FetcherCallback fetcherCallback,
         HttpConnectionParams params, FetchedInputAllocator inputManager,
-        ApplicationId appId, SecretKey shuffleSecret, String srcNameTrimmed,
+        ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
         Configuration conf, RawLocalFileSystem localFs,
         LocalDirAllocator localDirAllocator, Path lockPath,
         boolean localDiskFetchEnabled, boolean sharedFetchEnabled) {
       this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
-          shuffleSecret, srcNameTrimmed, conf, localFs, localDirAllocator,
+          jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
           lockPath, localDiskFetchEnabled, sharedFetchEnabled);
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/11d35d9f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
index a9356e9..5407fe4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
@@ -27,7 +27,6 @@ import java.net.URL;
 import java.security.GeneralSecurityException;
 import java.util.concurrent.TimeUnit;
 
-import javax.crypto.SecretKey;
 import javax.net.ssl.HttpsURLConnection;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -37,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
 
@@ -65,7 +65,7 @@ public class HttpConnection {
   private boolean connectionSucceeed;
   private volatile boolean cleanup;
 
-  private final SecretKey jobTokenSecret;
+  private final JobTokenSecretManager jobTokenSecretMgr;
   private String encHash;
   private String msgToEncode;
 
@@ -78,13 +78,13 @@ public class HttpConnection {
    * @param url
    * @param connParams
    * @param logIdentifier
-   * @param jobTokenSecret
+   * @param jobTokenSecretManager
    * @throws IOException
    */
   public HttpConnection(URL url, HttpConnectionParams connParams,
-      String logIdentifier, SecretKey jobTokenSecret) throws IOException {
+      String logIdentifier, JobTokenSecretManager jobTokenSecretManager) throws IOException
{
     this.logIdentifier = logIdentifier;
-    this.jobTokenSecret = jobTokenSecret;
+    this.jobTokenSecretMgr = jobTokenSecretManager;
     this.httpConnParams = connParams;
     this.url = url;
     this.stopWatch = new Stopwatch();
@@ -107,7 +107,7 @@ public class HttpConnection {
     }
     // generate hash of the url
     msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
-    encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
+    encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecretMgr);
 
     // put url hash into http header
     connection.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
@@ -217,7 +217,7 @@ public class HttpConnection {
           + replyHash);
     }
     // verify that replyHash is HMac of encHash
-    SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
+    SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr);
     LOG.info("for url=" + url +
       " sent hash and receievd reply " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + "
ms");
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/11d35d9f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index bae2bd6..57276b2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -54,6 +54,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
@@ -126,7 +127,7 @@ public class ShuffleManager implements FetcherCallback {
   private final int numFetchers;
   
   // Parameters required by Fetchers
-  private final SecretKey shuffleSecret;
+  private final JobTokenSecretManager jobTokenSecretMgr;
   private final CompressionCodec codec;
   private final boolean localDiskFetchEnabled;
   private final boolean sharedFetchEnabled;
@@ -212,9 +213,10 @@ public class ShuffleManager implements FetcherCallback {
     this.startTime = System.currentTimeMillis();
     this.lastProgressTime = startTime;
     
-    this.shuffleSecret = ShuffleUtils
+    SecretKey shuffleSecret = ShuffleUtils
         .getJobTokenSecretFromTokenBytes(inputContext
             .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
+    this.jobTokenSecretMgr = new JobTokenSecretManager(shuffleSecret);
     httpConnectionParams =
         ShuffleUtils.constructHttpShuffleConnectionParams(conf);
 
@@ -343,7 +345,7 @@ public class ShuffleManager implements FetcherCallback {
 
     FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this,
       httpConnectionParams, inputManager, inputContext.getApplicationId(),
-        shuffleSecret, srcNameTrimmed, conf, localFs, localDirAllocator,
+        jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
         lockDisk, localDiskFetchEnabled, sharedFetchEnabled);
 
     if (codec != null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/11d35d9f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 911f4f4..2b5a863 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -29,8 +29,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
-import javax.crypto.SecretKey;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -42,6 +40,7 @@ import org.apache.tez.common.TezUtilsInternal;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
@@ -82,7 +81,7 @@ class FetcherOrderedGrouped extends Thread {
 
   // Decompression of map-outputs
   private final CompressionCodec codec;
-  private final SecretKey jobTokenSecret;
+  private final JobTokenSecretManager jobTokenSecretManager;
 
   @VisibleForTesting
   volatile boolean stopped = false;
@@ -105,7 +104,7 @@ class FetcherOrderedGrouped extends Thread {
   public FetcherOrderedGrouped(HttpConnectionParams httpConnectionParams,
                                ShuffleScheduler scheduler, MergeManager merger,
                                ShuffleClientMetrics metrics,
-                               Shuffle shuffle, SecretKey jobTokenSecret,
+                               Shuffle shuffle, JobTokenSecretManager jobTokenSecretMgr,
                                boolean ifileReadAhead, int ifileReadAheadLength,
                                CompressionCodec codec,
                                InputContext inputContext, Configuration conf,
@@ -116,7 +115,7 @@ class FetcherOrderedGrouped extends Thread {
     this.metrics = metrics;
     this.shuffle = shuffle;
     this.id = ++nextId;
-    this.jobTokenSecret = jobTokenSecret;
+    this.jobTokenSecretManager = jobTokenSecretMgr;
     ioErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.IO_ERROR.toString());
     wrongLengthErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
@@ -321,7 +320,7 @@ class FetcherOrderedGrouped extends Thread {
       URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), attempts,
           httpConnectionParams.getKeepAlive());
       httpConnection = new HttpConnection(url, httpConnectionParams,
-          logIdentifier, jobTokenSecret);
+          logIdentifier, jobTokenSecretManager);
       connectSucceeded = httpConnection.connect();
 
       if (stopped) {

http://git-wip-us.apache.org/repos/asf/tez/blob/11d35d9f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index e4641fc..9f59cb2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -44,6 +44,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.runtime.api.Event;
@@ -88,6 +89,7 @@ public class Shuffle implements ExceptionReporter {
   private final MergeManager merger;
 
   private final SecretKey jobTokenSecret;
+  private final JobTokenSecretManager jobTokenSecretMgr;
   private final CompressionCodec codec;
   private final boolean ifileReadAhead;
   private final int ifileReadAheadLength;
@@ -126,6 +128,7 @@ public class Shuffle implements ExceptionReporter {
     this.jobTokenSecret = ShuffleUtils
         .getJobTokenSecretFromTokenBytes(inputContext
             .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
+    this.jobTokenSecretMgr = new JobTokenSecretManager(jobTokenSecret);
     
     if (ConfigUtils.isIntermediateInputCompressed(conf)) {
       Class<? extends CompressionCodec> codecClass =
@@ -321,7 +324,7 @@ public class Shuffle implements ExceptionReporter {
         for (int i = 0; i < numFetchers; ++i) {
           FetcherOrderedGrouped
               fetcher = new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger,
-            metrics, Shuffle.this, jobTokenSecret, ifileReadAhead, ifileReadAheadLength,
+            metrics, Shuffle.this, jobTokenSecretMgr, ifileReadAhead, ifileReadAheadLength,
             codec, inputContext, conf, localDiskFetchEnabled);
           fetchers.add(fetcher);
           fetcher.start();

http://git-wip-us.apache.org/repos/asf/tez/blob/11d35d9f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index b3021dd..9915cbc 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -47,6 +47,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
@@ -182,9 +183,9 @@ public class TestFetcher {
   static class FakeHttpConnection extends HttpConnection {
 
     public FakeHttpConnection(URL url,
-        HttpConnectionParams connParams, String logIdentifier, SecretKey jobTokenSecret)
+        HttpConnectionParams connParams, String logIdentifier, JobTokenSecretManager jobTokenSecretMgr)
         throws IOException {
-      super(url, connParams, logIdentifier, jobTokenSecret);
+      super(url, connParams, logIdentifier, jobTokenSecretMgr);
       this.connection = mock(HttpURLConnection.class);
       when(connection.getResponseCode()).thenReturn(200);
       when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))


Mime
View raw message