tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-1114. Fix encrypted shuffle. Contributed by Rajesh Balamohan.
Date Mon, 19 May 2014 22:29:30 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master e5a825b68 -> 62bace35c


TEZ-1114. Fix encrypted shuffle. Contributed by Rajesh Balamohan.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/62bace35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/62bace35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/62bace35

Branch: refs/heads/master
Commit: 62bace35c2938eafd840547cebe2670c37787da1
Parents: e5a825b
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon May 19 15:28:55 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon May 19 15:28:55 2014 -0700

----------------------------------------------------------------------
 .../library/common/shuffle/impl/Fetcher.java    |  32 +----
 .../library/common/shuffle/impl/Shuffle.java    |  23 ++-
 .../shuffle/impl/ShuffleInputEventHandler.java  |   8 +-
 .../runtime/library/shuffle/common/Fetcher.java |  28 +---
 .../library/shuffle/common/HttpConnection.java  |  61 +++++---
 .../library/shuffle/common/ShuffleUtils.java    |  22 ++-
 .../shuffle/common/impl/ShuffleManager.java     |  19 ++-
 .../org/apache/tez/test/TestSecureShuffle.java  | 141 +++++++++++++++++++
 8 files changed, 239 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/62bace35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index 4f4858c..5d5f58e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -30,10 +30,7 @@ import javax.crypto.SecretKey;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.api.TezInputContext;
@@ -75,10 +72,6 @@ class Fetcher extends Thread {
   
   private final boolean ifileReadAhead;
   private final int ifileReadAheadLength;
-
-  private static boolean sslShuffle;
-  private static SSLFactory sslFactory;
-  
   private LinkedHashSet<InputAttemptIdentifier> remaining;
 
   volatile HttpURLConnection connection;
@@ -87,7 +80,7 @@ class Fetcher extends Thread {
   HttpConnection httpConnection;
   HttpConnectionParams httpConnectionParams;
   
-  public Fetcher(Configuration job, 
+  public Fetcher(HttpConnectionParams httpConnectionParams,
       ShuffleScheduler scheduler, MergeManager merger,
       ShuffleClientMetrics metrics,
       Shuffle shuffle, SecretKey jobTokenSecret,
@@ -115,8 +108,7 @@ class Fetcher extends Thread {
 
     this.ifileReadAhead = ifileReadAhead;
     this.ifileReadAheadLength = ifileReadAheadLength;
-    this.httpConnectionParams = ShuffleUtils.constructHttpShuffleConnectionParams(job);
-    
+    this.httpConnectionParams = httpConnectionParams;
     if (codec != null) {
       this.codec = codec;
     } else {
@@ -126,20 +118,6 @@ class Fetcher extends Thread {
     this.logIdentifier = "fetcher [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName())
+ "] #" + id;
     setName(logIdentifier);
     setDaemon(true);
-
-    synchronized (Fetcher.class) {
-      sslShuffle = job.getBoolean(TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
-          TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
-      if (sslShuffle && sslFactory == null) {
-        sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, job);
-        try {
-          sslFactory.init();
-        } catch (Exception ex) {
-          sslFactory.destroy();
-          throw new RuntimeException(ex);
-        }
-      }
-    }
   }  
 
   public void run() {
@@ -181,9 +159,6 @@ class Fetcher extends Thread {
     } catch (InterruptedException ie) {
       LOG.warn("Got interrupt while joining " + getName(), ie);
     }
-    if (sslFactory != null) {
-      sslFactory.destroy();
-    }
   }
 
   private Object cleanupLock = new Object();
@@ -240,9 +215,6 @@ class Fetcher extends Thread {
         httpConnectionParams.getKeepAlive());
       httpConnection = new HttpConnection(url, httpConnectionParams,
         logIdentifier, jobTokenSecret);
-      if (sslShuffle) {
-        httpConnection.setSSLFactory(sslFactory);
-      }
       connectSucceeded = httpConnection.connect();
       
       if (stopped) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/62bace35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index b055f16..667303a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -52,6 +52,8 @@ import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.combine.Combiner;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.runtime.library.exceptions.InputAlreadyClosedException;
+import org.apache.tez.runtime.library.shuffle.common.HttpConnection;
+import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParams;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 
 import com.google.common.base.Preconditions;
@@ -99,6 +101,7 @@ public class Shuffle implements ExceptionReporter {
   private final String srcNameTrimmed;
   
   private final List<Fetcher> fetchers;
+  private final HttpConnectionParams httpConnectionParams;
   
   private AtomicBoolean isShutDown = new AtomicBoolean(false);
   private AtomicBoolean fetchersClosed = new AtomicBoolean(false);
@@ -109,7 +112,8 @@ public class Shuffle implements ExceptionReporter {
       long initialMemoryAvailable) throws IOException {
     this.inputContext = inputContext;
     this.conf = conf;
-
+    this.httpConnectionParams =
+        ShuffleUtils.constructHttpShuffleConnectionParams(conf);
     this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
         inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
         this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
@@ -168,6 +172,8 @@ public class Shuffle implements ExceptionReporter {
         + (codec == null ? "None" : codec.getClass().getName()) + 
         "ifileReadAhead: " + ifileReadAhead);
 
+    boolean sslShuffle = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
+      TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
     scheduler = new ShuffleScheduler(
           this.inputContext,
           this.conf,
@@ -180,8 +186,9 @@ public class Shuffle implements ExceptionReporter {
           bytesShuffedToDisk,
           bytesShuffedToMem);
     eventHandler= new ShuffleInputEventHandler(
-          inputContext,
-          scheduler);
+      inputContext,
+      scheduler,
+      sslShuffle);
     merger = new MergeManager(
           this.conf,
           localFS,
@@ -302,8 +309,9 @@ public class Shuffle implements ExceptionReporter {
 
       synchronized (this) {
         for (int i = 0; i < numFetchers; ++i) {
-          Fetcher fetcher = new Fetcher(conf, scheduler, merger, metrics, Shuffle.this, jobTokenSecret,
-              ifileReadAhead, ifileReadAheadLength, codec, inputContext);
+          Fetcher fetcher = new Fetcher(httpConnectionParams, scheduler, merger,
+            metrics, Shuffle.this, jobTokenSecret, ifileReadAhead, ifileReadAheadLength,
+            codec, inputContext);
           fetchers.add(fetcher);
           fetcher.start();
         }
@@ -325,7 +333,6 @@ public class Shuffle implements ExceptionReporter {
       // stop the scheduler
       cleanupShuffleScheduler(false);
 
-
       // Finish the on-going merges...
       TezRawKeyValueIterator kvIter = null;
       try {
@@ -369,6 +376,10 @@ public class Shuffle implements ExceptionReporter {
         }
       }
       fetchers.clear();
+      //All threads are shutdown.  It is safe to shutdown SSL factory
+      if (httpConnectionParams.isSSLShuffleEnabled()) {
+        HttpConnection.cleanupSSLFactory();
+      }
       // throw only the first exception while attempting to shutdown.
       if (ie != null) {
         throw ie;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/62bace35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index a045fc4..e37bbe3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -45,11 +45,13 @@ public class ShuffleInputEventHandler {
   private final TezInputContext inputContext;
 
   private int maxMapRuntime = 0;
-  
+  private final boolean sslShuffle;
+
   public ShuffleInputEventHandler(TezInputContext inputContext,
-      ShuffleScheduler scheduler) {
+      ShuffleScheduler scheduler, boolean sslShuffle) {
     this.inputContext = inputContext;
     this.scheduler = scheduler;
+    this.sslShuffle = sslShuffle;
   }
 
   public void handleEvents(List<Event> events) {
@@ -114,7 +116,7 @@ public class ShuffleInputEventHandler {
   // TODO NEWTEZ Handle encrypted shuffle
   private URI getBaseURI(String host, int port, int partitionId) {
     StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port,
-      partitionId, inputContext.getApplicationId().toString());
+      partitionId, inputContext.getApplicationId().toString(), sslShuffle);
     URI u = URI.create(sb.toString());
     return u;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/62bace35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
index 00fc55a..c0ac632 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
@@ -72,12 +72,8 @@ public class Fetcher implements Callable<FetchResult> {
   
   private final AtomicBoolean isShutDown = new AtomicBoolean(false);
 
-  private static boolean sslShuffle = false;
-  private static SSLFactory sslFactory;
-  private static boolean sslFactoryInited;
-
   private final int fetcherIdentifier;
-  
+
   // Parameters to track work.
   private List<InputAttemptIdentifier> srcAttempts;
   private String host;
@@ -106,26 +102,6 @@ public class Fetcher implements Callable<FetchResult> {
 
     this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
     this.logIdentifier = "fetcher [" + srcNameTrimmed +"] " + fetcherIdentifier;
-
-    // TODO NEWTEZ Ideally, move this out from here into a static initializer block.
-    // Re-enable when ssl shuffle support is needed.
-//    synchronized (Fetcher.class) {
-//      if (!sslFactoryInited) {
-//        sslFactoryInited = true;
-//        sslShuffle = conf.getBoolean(
-//            TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
-//            TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
-//        if (sslShuffle) {
-//          sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
-//          try {
-//            sslFactory.init();
-//          } catch (Exception ex) {
-//            sslFactory.destroy();
-//            throw new RuntimeException(ex);
-//          }
-//        }
-//      }
-//    }
   }
 
   @Override
@@ -142,7 +118,7 @@ public class Fetcher implements Callable<FetchResult> {
 
     try {
       StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
-        port, partition, appId.toString());
+        port, partition, appId.toString(), httpConnectionParams.isSSLShuffleEnabled());
       this.url = ShuffleUtils.constructInputURL(baseURI.toString(), srcAttempts,
         httpConnectionParams.getKeepAlive());
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/62bace35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/HttpConnection.java
index c50ce27..5b56d5b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/HttpConnection.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/HttpConnection.java
@@ -31,6 +31,7 @@ import javax.net.ssl.HttpsURLConnection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.ssl.SSLFactory;
@@ -50,7 +51,9 @@ public class HttpConnection {
   private URL url;
   private final String logIdentifier;
 
-  private SSLFactory sslFactory;
+  //Shared by many threads
+  private static SSLFactory sslFactory;
+
   private HttpURLConnection connection;
   private DataInputStream input;
 
@@ -83,15 +86,6 @@ public class HttpConnection {
     }
   }
 
-  /**
-   * Set ssl factory
-   * 
-   * @param factory
-   */
-  public void setSSLFactory(SSLFactory factory) {
-    this.sslFactory = factory;
-  }
-
   private void setupConnection() throws IOException {
     connection = (HttpURLConnection) url.openConnection();
     if (sslFactory != null) {
@@ -225,6 +219,16 @@ public class HttpConnection {
   }
 
   /**
+   * Cleanup ssl factory. Should be called after all threads are shutdown.
+   */
+  public synchronized static void cleanupSSLFactory() {
+    if (sslFactory != null) {
+      sslFactory.destroy();
+      sslFactory = null;
+    }
+  }
+
+  /**
    * Cleanup the connection.
    * 
    * @param disconnect
@@ -283,7 +287,6 @@ public class HttpConnection {
     private int readTimeout;
     private int bufferSize;
     private boolean sslShuffle;
-    private SSLFactory sslFactory;
 
     public boolean getKeepAlive() {
       return keepAlive;
@@ -313,8 +316,15 @@ public class HttpConnection {
       return sslShuffle;
     }
 
-    public SSLFactory getSSLFactory() {
-      return sslFactory;
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("keepAlive=").append(keepAlive).append(", ");
+      sb.append("keepAliveMaxConnections=").append(keepAliveMaxConnections).append(", ");
+      sb.append("connectionTimeout=").append(connectionTimeout).append(", ");
+      sb.append("readTimeout=").append(readTimeout).append(", ");
+      sb.append("bufferSize=").append(bufferSize).append(", ");
+      sb.append("sslShuffle=").append(sslShuffle);
+      return sb.toString();
     }
   }
 
@@ -339,10 +349,27 @@ public class HttpConnection {
       return this;
     }
 
-    public HttpConnectionParamsBuilder setSSL(boolean sslEnabled,
-        SSLFactory sslFactory) {
-      params.sslShuffle = true;
-      params.sslFactory = sslFactory;
+    public synchronized HttpConnectionParamsBuilder setSSL(boolean sslEnabled,
+        Configuration conf) {
+      params.sslShuffle = sslEnabled;
+      if(sslEnabled) {
+        //Create sslFactory if it is null or if it was destroyed earlier
+        if (sslFactory == null || sslFactory.getKeystoresFactory()
+            .getTrustManagers() == null) {
+          LOG.info("Initializing SSL factory in HttpConnection");
+          sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
+          try {
+            sslFactory.init();
+          } catch (Exception ex) {
+            sslFactory.destroy();
+            sslFactory = null;
+            throw new RuntimeException(ex);
+          }
+        }
+      } else {
+        //Defensive: In case SSL was initied earlier, clean it up.
+        cleanupSSLFactory();
+      }
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/62bace35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
index 51471cf..ebf0e62 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.security.JobTokenIdentifier;
@@ -153,14 +154,15 @@ public class ShuffleUtils {
 
   // TODO NEWTEZ handle ssl shuffle
   public static StringBuilder constructBaseURIForShuffleHandler(String host,
-      int port, int partition, String appId) {
+      int port, int partition, String appId, boolean sslShuffle) {
     return constructBaseURIForShuffleHandler(host + ":" + String.valueOf(port),
-      partition, appId);
+      partition, appId, sslShuffle);
   }
   
   public static StringBuilder constructBaseURIForShuffleHandler(String hostIdentifier,
-      int partition, String appId) {
-    StringBuilder sb = new StringBuilder("http://");
+      int partition, String appId, boolean sslShuffle) {
+    final String http_protocol = (sslShuffle) ? "https://" : "http://";
+    StringBuilder sb = new StringBuilder(http_protocol);
     sb.append(hostIdentifier);
     sb.append("/");
     sb.append("mapOutput?job=");
@@ -221,9 +223,15 @@ public class ShuffleUtils {
       LOG.info("Set keepAlive max connections: " + keepAliveMaxConnections);
     }
 
-    return builder.setTimeout(connectionTimeout, readTimeout)
-      .setBufferSize(bufferSize)
-      .setKeepAlive(keepAlive, keepAliveMaxConnections).build();
+    builder.setTimeout(connectionTimeout, readTimeout)
+        .setBufferSize(bufferSize)
+        .setKeepAlive(keepAlive, keepAliveMaxConnections);
+
+    boolean sslShuffle = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
+      TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
+    builder.setSSL(sslShuffle, conf);
+
+    return builder.build();
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/62bace35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
index d81a4f5..b24e67e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -60,6 +60,8 @@ import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
 import org.apache.tez.runtime.library.shuffle.common.Fetcher;
 import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
+import org.apache.tez.runtime.library.shuffle.common.HttpConnection;
+import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParams;
 import org.apache.tez.runtime.library.shuffle.common.InputHost;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
@@ -130,7 +132,7 @@ public class ShuffleManager implements FetcherCallback {
   private final TezCounter bytesShuffledToMemCounter;
   
   private volatile Throwable shuffleError;
-  private final Configuration conf;
+  private final HttpConnectionParams httpConnectionParams;
   
   // TODO More counters - FetchErrors, speed?
   
@@ -174,7 +176,6 @@ public class ShuffleManager implements FetcherCallback {
         new ThreadFactoryBuilder().setDaemon(true)
             .setNameFormat("Fetcher [" + srcNameTrimmed + "] #%d").build());
     this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
-    this.conf = conf;
     
     ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
         .setDaemon(true).setNameFormat("ShuffleRunner [" + srcNameTrimmed + "]").build());
@@ -186,11 +187,13 @@ public class ShuffleManager implements FetcherCallback {
     this.shuffleSecret = ShuffleUtils
         .getJobTokenSecretFromTokenBytes(inputContext
             .getServiceConsumerMetaData(TezConfiguration.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
-    
+    httpConnectionParams =
+        ShuffleUtils.constructHttpShuffleConnectionParams(conf);
     LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + ", compressionCodec="
         + (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers="
         + numFetchers + ", ifileBufferSize=" + ifileBufferSize + ", ifileReadAheadEnabled="
-        + ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength);
+        + ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength +", "
+        + httpConnectionParams.toString());
   }
 
   public void run() throws IOException {
@@ -283,8 +286,8 @@ public class ShuffleManager implements FetcherCallback {
   
   private Fetcher constructFetcherForHost(InputHost inputHost) {
     FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this,
-      ShuffleUtils.constructHttpShuffleConnectionParams(conf), inputManager,
-        inputContext.getApplicationId(), shuffleSecret, srcNameTrimmed);
+      httpConnectionParams, inputManager, inputContext.getApplicationId(),
+      shuffleSecret, srcNameTrimmed);
     if (codec != null) {
       fetcherBuilder.setCompressionParameters(codec);
     }
@@ -522,6 +525,10 @@ public class ShuffleManager implements FetcherCallback {
         this.fetcherExecutor.shutdownNow(); // Interrupts all running fetchers.
       }
     }
+    //All threads are shutdown.  It is safe to shutdown SSL factory
+    if (httpConnectionParams.isSSLShuffleEnabled()) {
+      HttpConnection.cleanupSSLFactory();
+    }
   }
 
   private void registerCompletedInput(FetchedInput fetchedInput) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/62bace35/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java b/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java
new file mode 100644
index 0000000..7921c83
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.mapreduce.examples.OrderedWordCount;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test to verify secure-shuffle (SSL mode) in Tez
+ */
+public class TestSecureShuffle {
+
+  private static MiniDFSCluster miniDFSCluster;
+  private static MiniTezCluster miniTezCluster;
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem fs;
+  private static Path inputLoc = new Path("/tmp/sample.txt");
+  private static Path outputLoc = new Path("/tmp/outPath");
+  private static File keysStoresDir = new File("target/keystores");
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    // Enable SSL debugging
+    System.setProperty("javax.net.debug", "all");
+    conf = new Configuration();
+    setupKeyStores();
+
+    miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true).build();
+    fs = miniDFSCluster.getFileSystem();
+    conf.set("fs.defaultFS", fs.getUri().toString());
+
+    // 15 seconds should be good enough in local machine
+    conf.setInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 15 * 1000);
+    conf.setInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 15 * 1000);
+
+    miniTezCluster =
+        new MiniTezCluster(TestSecureShuffle.class.getName(), 3, 3, 1);
+
+    miniTezCluster.init(conf);
+    miniTezCluster.start();
+    createSampleFile(inputLoc);
+  }
+
+  /**
+   * Verify whether shuffle works on SSL mode.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 60000)
+  public void testSecureShuffle() throws Exception {
+    miniTezCluster.getConfig().setBoolean(
+      TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL, true);
+
+    OrderedWordCount wordCount = new OrderedWordCount();
+    wordCount.setConf(new Configuration(miniTezCluster.getConfig()));
+
+    String[] args = new String[] { inputLoc.toString(), outputLoc.toString() };
+    assertEquals(0, wordCount.run(args));
+
+    // cleanup output
+    fs.delete(outputLoc, true);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (miniTezCluster != null) {
+      miniTezCluster.stop();
+    }
+    if (miniDFSCluster != null) {
+      miniDFSCluster.shutdown();
+    }
+  }
+
+  /**
+   * Create sample file for wordcount program
+   *
+   * @param inputLoc
+   * @throws IOException
+   */
+  private static void createSampleFile(Path inputLoc) throws IOException {
+    fs.deleteOnExit(inputLoc);
+    FSDataOutputStream out = fs.create(inputLoc);
+    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+    for (int i = 0; i < 50; i++) {
+      writer.write("Hello World");
+      writer.write("Some other line");
+      writer.newLine();
+    }
+    writer.close();
+  }
+
+  /**
+   * Create relevant keystores for test cluster
+   *
+   * @throws Exception
+   */
+  private static void setupKeyStores() throws Exception {
+    keysStoresDir.mkdirs();
+    String sslConfsDir =
+        KeyStoreTestUtil.getClasspathDir(TestSecureShuffle.class);
+
+    KeyStoreTestUtil.setupSSLConfig(keysStoresDir.getAbsolutePath(),
+      sslConfsDir, conf, true);
+    conf.setBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, true);
+  }
+}


Mime
View raw message