tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-1102. Abstract out connection management logic in shuffle code. Contributed by Rajesh Balamohan.
Date Tue, 13 May 2014 22:27:28 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master ab3c4c346 -> c247a3bfc


TEZ-1102. Abstract out connection management logic in shuffle code.
Contributed by Rajesh Balamohan.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c247a3bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c247a3bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c247a3bf

Branch: refs/heads/master
Commit: c247a3bfcc242eae6609b384dc748458f11db5f8
Parents: ab3c4c3
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue May 13 15:26:59 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue May 13 15:26:59 2014 -0700

----------------------------------------------------------------------
 .../library/common/shuffle/impl/Fetcher.java    | 345 ++----------------
 .../shuffle/impl/ShuffleInputEventHandler.java  |  17 +-
 .../runtime/library/shuffle/common/Fetcher.java | 198 ++--------
 .../library/shuffle/common/HttpConnection.java  | 359 +++++++++++++++++++
 .../library/shuffle/common/ShuffleUtils.java    | 112 ++++--
 .../impl/ShuffleInputEventHandlerImpl.java      |  18 +-
 .../shuffle/common/impl/ShuffleManager.java     |  17 +-
 7 files changed, 535 insertions(+), 531 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c247a3bf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index 950c3cc..4f4858c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -17,56 +17,40 @@
  */
 package org.apache.tez.runtime.library.common.shuffle.impl;
 
-import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
 import java.net.URL;
-import java.security.GeneralSecurityException;
 import java.util.Arrays;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 
 import javax.crypto.SecretKey;
-import javax.net.ssl.HttpsURLConnection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
 import org.apache.tez.runtime.library.common.shuffle.impl.MapOutput.Type;
-import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
+import org.apache.tez.runtime.library.shuffle.common.HttpConnection;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParams;
 
 import com.google.common.annotations.VisibleForTesting;
 
 class Fetcher extends Thread {
   
   private static final Log LOG = LogFactory.getLog(Fetcher.class);
-  
-  /** Basic/unit connection timeout (in milliseconds) */
-  private final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;
-
   private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
                                     CONNECTION, WRONG_REDUCE}
   
-  private final static String URL_CONNECTION_ERROR_STREAM_BUFFER_ENABLED =
-      "sun.net.http.errorstream.enableBuffering";
-  private final static String URL_CONNECTION_MAX_CONNECTIONS = "http.maxConnections";
   private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
   private final TezCounter connectionErrs;
   private final TezCounter ioErrs;
@@ -83,13 +67,8 @@ class Fetcher extends Thread {
   private static int nextId = 0;
   private int currentPartition = -1;
   
-  private final int connectionTimeout;
-  private final int readTimeout;
-  private final int bufferSize;
-  
   // Decompression of map-outputs
   private final CompressionCodec codec;
-  private final Decompressor decompressor;
   private final SecretKey jobTokenSecret;
 
   private volatile boolean stopped = false;
@@ -102,11 +81,12 @@ class Fetcher extends Thread {
   
   private LinkedHashSet<InputAttemptIdentifier> remaining;
 
-  private boolean keepAlive = false;
-
   volatile HttpURLConnection connection;
   volatile DataInputStream input;
 
+  HttpConnection httpConnection;
+  HttpConnectionParams httpConnectionParams;
+  
   public Fetcher(Configuration job, 
       ShuffleScheduler scheduler, MergeManager merger,
       ShuffleClientMetrics metrics,
@@ -135,38 +115,14 @@ class Fetcher extends Thread {
 
     this.ifileReadAhead = ifileReadAhead;
     this.ifileReadAheadLength = ifileReadAheadLength;
+    this.httpConnectionParams = ShuffleUtils.constructHttpShuffleConnectionParams(job);
     
     if (codec != null) {
       this.codec = codec;
-      this.decompressor = CodecPool.getDecompressor(codec);
     } else {
       this.codec = null;
-      this.decompressor = null;
     }
 
-    this.keepAlive =
-        job.getBoolean(TezJobConfig.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED,
-          TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED);
-    int keepAliveMaxConnections =
-        job.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS,
-          TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS);
-
-    if (keepAlive) {
-      System.setProperty(URL_CONNECTION_ERROR_STREAM_BUFFER_ENABLED, "true");
-      System.setProperty(URL_CONNECTION_MAX_CONNECTIONS, String.valueOf(keepAliveMaxConnections));
-      LOG.info("Set keepAlive max connections: " + keepAliveMaxConnections);
-    }
-
-    this.connectionTimeout = 
-        job.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
-            TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT);
-    this.readTimeout = 
-        job.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 
-            TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
-    
-    this.bufferSize = job.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE, 
-            TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
-
     this.logIdentifier = "fetcher [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "] #" + id;
     setName(logIdentifier);
     setDaemon(true);
@@ -202,10 +158,10 @@ class Fetcher extends Thread {
           // Shuffle
           copyFromHost(host);
         } finally {
-          cleanupCurrentConnection(!keepAlive);
+          cleanupCurrentConnection(false);
           if (host != null) {
             scheduler.freeHost(host);
-            metrics.threadFree();            
+            metrics.threadFree();
           }
         }
       }
@@ -237,13 +193,8 @@ class Fetcher extends Thread {
     // shutdown request to block
     synchronized (cleanupLock) {
       try {
-        if (input != null) {
-          LOG.info("Closing input on " + logIdentifier);
-          input.close();
-        }
-        if (connection != null && disconnect) {
-          LOG.info("Closing connection on " + logIdentifier);
-          connection.disconnect();
+        if (httpConnection != null) {
+          httpConnection.cleanup(disconnect);
         }
       } catch (IOException e) {
         if (LOG.isDebugEnabled()) {
@@ -255,21 +206,6 @@ class Fetcher extends Thread {
     }
   }
 
-  @VisibleForTesting
-  protected HttpURLConnection openConnection(URL url) throws IOException {
-    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-    if (sslShuffle) {
-      HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
-      try {
-        httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
-      } catch (GeneralSecurityException ex) {
-        throw new IOException(ex);
-      }
-      httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
-    }
-    return conn;
-  }
-  
   /**
    * The crux of the matter...
    * 
@@ -300,24 +236,14 @@ class Fetcher extends Thread {
     boolean connectSucceeded = false;
     
     try {
-      URL url = getMapOutputURL(host, srcAttempts);
-      connection = openConnection(url);
-
-      // generate hash of the url
-      String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
-      String encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
-
-      // put url hash into http header
-      connection.addRequestProperty(
-          SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
-      // set the read timeout
-      connection.setReadTimeout(readTimeout);
-      // put shuffle version into http header
-      connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-      connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
-      connect(connectionTimeout);
+      URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), srcAttempts,
+        httpConnectionParams.getKeepAlive());
+      httpConnection = new HttpConnection(url, httpConnectionParams,
+        logIdentifier, jobTokenSecret);
+      if (sslShuffle) {
+        httpConnection.setSSLFactory(sslFactory);
+      }
+      connectSucceeded = httpConnection.connect();
       
       if (stopped) {
         LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
@@ -325,32 +251,8 @@ class Fetcher extends Thread {
         putBackRemainingMapOutputs(host);
         return;
       }
-      connectSucceeded = true;
-      input = new DataInputStream(new BufferedInputStream(connection.getInputStream(), bufferSize));
-
-      // Validate response code
-      int rc = connection.getResponseCode();
-      if (rc != HttpURLConnection.HTTP_OK) {
-        throw new IOException(
-            "Got invalid response code " + rc + " from " + url +
-            ": " + connection.getResponseMessage());
-      }
-      // get the shuffle version
-      if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
-          connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
-          || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
-              connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
-        throw new IOException("Incompatible shuffle response version");
-      }
-      // get the replyHash which is HMac of the encHash we sent to the server
-      String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
-      if(replyHash==null) {
-        throw new IOException("security validation of TT Map output failed");
-      }
-      LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
-      // verify that replyHash is HMac of encHash
-      SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
-      LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
+      input = httpConnection.getInputStream();
+      httpConnection.validate();
     } catch (IOException ie) {
       if (stopped) {
         LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
@@ -358,11 +260,6 @@ class Fetcher extends Thread {
         putBackRemainingMapOutputs(host);
         return;
       }
-      if (keepAlive && connectSucceeded) {
-        //Read the response body in case of error. This helps in connection reuse.
-        //Refer: http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
-        readErrorStream(connection.getErrorStream());
-      }
       ioErrs.increment(1);
       if (!connectSucceeded) {
         LOG.warn("Failed to connect to " + host + " with " + remaining.size() + " inputs", ie);
@@ -375,7 +272,6 @@ class Fetcher extends Thread {
       // At this point, either the connection failed, or the initial header verification failed.
       // The error does not relate to any specific Input. Report all of them as failed.
       // This ends up indirectly penalizing the host (multiple failures reported on the single host)
-
       for(InputAttemptIdentifier left: remaining) {
         // Need to be handling temporary glitches .. 
         // Report read error to the AM to trigger source failure heuristics
@@ -407,13 +303,9 @@ class Fetcher extends Thread {
         for(InputAttemptIdentifier left: failedTasks) {
           scheduler.copyFailed(left, host, true, false);
         }
-        //Being defensive: cleanup the error stream in case of keepAlive
-        if (keepAlive && connection != null) {
-          readErrorStream(connection.getErrorStream());
-        }
       }
 
-      cleanupCurrentConnection(!keepAlive); //do not disconnect if keepAlive is on
+      cleanupCurrentConnection(false);
 
       // Sanity check
       if (failedTasks == null && !remaining.isEmpty()) {
@@ -425,25 +317,6 @@ class Fetcher extends Thread {
     }
   }
 
-  /**
-   * Cleanup the error stream if any, for keepAlive connections
-   * 
-   * @param errorStream
-   */
-  private void readErrorStream(InputStream errorStream) {
-    if (errorStream == null) {
-      return;
-    }
-    try {
-      DataOutputBuffer errorBuffer = new DataOutputBuffer();
-      IOUtils.copyBytes(errorStream, errorBuffer, 4096);
-      IOUtils.closeStream(errorBuffer);
-      IOUtils.closeStream(errorStream);
-    } catch(IOException ioe) {
-      //ignore
-    }
-  }
-
   private void putBackRemainingMapOutputs(MapHost host) {
     // Cycle through remaining MapOutputs
     boolean isFirst = true;
@@ -495,7 +368,6 @@ class Fetcher extends Thread {
         return new InputAttemptIdentifier[] {getNextRemainingAttempt()};
       }
 
- 
       // Do some basic sanity verification
       if (!verifySanity(compressedLength, decompressedLength, forReduce,
           remaining, srcAttemptId)) {
@@ -522,23 +394,26 @@ class Fetcher extends Thread {
         return EMPTY_ATTEMPT_ID_ARRAY;
       }
       
+      // Check if we can shuffle *now* ...
       if (mapOutput.getType() == Type.WAIT) {
         // TODO Review: Does this cause a tight loop ?
         LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
-        //Not an error but wait to process data.  
+        //Not an error but wait to process data.
         return EMPTY_ATTEMPT_ID_ARRAY;
-      }
-
+      } 
+      
       // Go!
       LOG.info("fetcher#" + id + " about to shuffle output of map " + 
                mapOutput.getAttemptIdentifier() + " decomp: " +
                decompressedLength + " len: " + compressedLength + " to " +
                mapOutput.getType());
       if (mapOutput.getType() == Type.MEMORY) {
-        shuffleToMemory(host, mapOutput, input, 
-                        (int) decompressedLength, (int) compressedLength);
+        ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input,
+          (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead,
+          ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString());
       } else {
-        shuffleToDisk(host, mapOutput, input, compressedLength);
+        ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(),
+          input, compressedLength, LOG, mapOutput.getAttemptIdentifier().toString());
       }
       
       // Inform the shuffle scheduler
@@ -629,161 +504,5 @@ class Fetcher extends Thread {
       return null;
     }
   }
-
-  /**
-   * Create the map-output-url. This will contain all the map ids
-   * separated by commas
-   * @param host
-   * @param maps
-   * @return
-   * @throws MalformedURLException
-   */
-  private URL getMapOutputURL(MapHost host, List<InputAttemptIdentifier> srcAttempts
-                              )  throws MalformedURLException {
-    // Get the base url
-    StringBuffer url = new StringBuffer(host.getBaseUrl());
-    
-    boolean first = true;
-    for (InputAttemptIdentifier mapId : srcAttempts) {
-      if (!first) {
-        url.append(",");
-      }
-      url.append(mapId.getPathComponent());
-      first = false;
-    }
-    //It is possible to override keep-alive setting in cluster by adding keepAlive in url.
-    //Refer MAPREDUCE-5787 to enable/disable keep-alive in the cluster.
-    if (keepAlive) {
-      url.append("&keepAlive=true");
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("MapOutput URL for " + host + " -> " + url.toString());
-    }
-    return new URL(url.toString());
-  }
-  
-  /** 
-   * The connection establishment is attempted multiple times and is given up 
-   * only on the last failure. Instead of connecting with a timeout of 
-   * X, we try connecting with a timeout of x < X but multiple times. 
-   */
-  private void connect(int connectionTimeout) throws IOException {
-    int unit = 0;
-    if (connectionTimeout < 0) {
-      throw new IOException("Invalid timeout "
-                            + "[timeout = " + connectionTimeout + " ms]");
-    } else if (connectionTimeout > 0) {
-      unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
-    }
-    // set the connect timeout to the unit-connect-timeout
-    connection.setConnectTimeout(unit);
-    while (true) {
-      try {
-        connection.connect();
-        break;
-      } catch (IOException ioe) {
-        
-        // Don't attempt another connect if already stopped.
-        if (stopped) {
-          LOG.info("Fetcher already shutdown. Not attempting to connect again. Last exception was: ["
-              + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
-          return;
-        }
-
-        // update the total remaining connect-timeout
-        connectionTimeout -= unit;
-
-        // throw an exception if we have waited for timeout amount of time
-        // note that the updated value if timeout is used here
-        if (connectionTimeout == 0) {
-          throw ioe;
-        }
-
-        // reset the connect timeout for the last try
-        if (connectionTimeout < unit) {
-          unit = connectionTimeout;
-          // reset the connect time out for the final connect
-          connection.setConnectTimeout(unit);
-        }
-      }
-    }
-  }
-
-  private void shuffleToMemory(MapHost host, MapOutput mapOutput, 
-                               InputStream localInput, 
-                               int decompressedLength, 
-                               int compressedLength) throws IOException {    
-    IFileInputStream checksumIn = 
-      new IFileInputStream(localInput, compressedLength, ifileReadAhead, ifileReadAheadLength);
-
-    localInput = checksumIn;       
-  
-    // Are map-outputs compressed?
-    if (codec != null) {
-      decompressor.reset();
-      localInput = codec.createInputStream(localInput, decompressor);
-    }
-  
-    // Copy map-output into an in-memory buffer
-    byte[] shuffleData = mapOutput.getMemory();
-    
-    try {
-      IOUtils.readFully(localInput, shuffleData, 0, shuffleData.length);
-      metrics.inputBytes(shuffleData.length);
-      LOG.info("Read " + shuffleData.length + " bytes from map-output for " +
-               mapOutput.getAttemptIdentifier());
-    } catch (IOException ioe) {      
-      // Close the streams
-      IOUtils.cleanup(LOG, localInput);
-
-      // Re-throw
-      throw ioe;
-    }
-
-  }
-  
-  private void shuffleToDisk(MapHost host, MapOutput mapOutput, 
-                             InputStream localInput, 
-                             long compressedLength) 
-  throws IOException {
-    // Copy data to local-disk
-    OutputStream output = mapOutput.getDisk();
-    long bytesLeft = compressedLength;
-    try {
-      final int BYTES_TO_READ = 64 * 1024;
-      byte[] buf = new byte[BYTES_TO_READ];
-      while (bytesLeft > 0) {
-        int n = localInput.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
-        if (n < 0) {
-          throw new IOException("read past end of stream reading " + 
-                                mapOutput.getAttemptIdentifier());
-        }
-        output.write(buf, 0, n);
-        bytesLeft -= n;
-        metrics.inputBytes(n);
-      }
-
-      LOG.info("Read " + (compressedLength - bytesLeft) + 
-               " bytes from map-output for " +
-               mapOutput.getAttemptIdentifier());
-
-      output.close();
-    } catch (IOException ioe) {
-      // Close the streams
-      IOUtils.cleanup(LOG, localInput, output);
-
-      // Re-throw
-      throw ioe;
-    }
-
-    // Sanity check
-    if (bytesLeft != 0) {
-      throw new IOException("Incomplete map output received for " +
-                            mapOutput.getAttemptIdentifier() + " from " +
-                            host.getHostIdentifier() + " (" + 
-                            bytesLeft + " bytes missing of " + 
-                            compressedLength + ")"
-      );
-    }
-  }
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c247a3bf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index e0a4160..a045fc4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -32,6 +32,7 @@ import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -112,20 +113,10 @@ public class ShuffleInputEventHandler {
 
   // TODO NEWTEZ Handle encrypted shuffle
   private URI getBaseURI(String host, int port, int partitionId) {
-    StringBuilder sb = new StringBuilder("http://");
-    sb.append(host);
-    sb.append(":");
-    sb.append(String.valueOf(port));
-    sb.append("/");
-    
-    sb.append("mapOutput?job=");
-    // Required to use the existing ShuffleHandler
-    sb.append(inputContext.getApplicationId().toString().replace("application", "job"));
-    
-    sb.append("&reduce=");
-    sb.append(partitionId);
-    sb.append("&map=");
+    StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port,
+      partitionId, inputContext.getApplicationId().toString());
     URI u = URI.create(sb.toString());
     return u;
   }
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c247a3bf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
index 9ecf590..00fc55a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
@@ -20,8 +20,6 @@ package org.apache.tez.runtime.library.shuffle.common;
 
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -42,9 +40,9 @@ import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
 import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleHeader;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
+import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParams;
 
 import com.google.common.base.Preconditions;
 
@@ -56,13 +54,10 @@ public class Fetcher implements Callable<FetchResult> {
 
   private static final Log LOG = LogFactory.getLog(Fetcher.class);
 
-  private static final int UNIT_CONNECT_TIMEOUT = 60 * 1000;
   private static final AtomicInteger fetcherIdGen = new AtomicInteger(0);
 
   // Configurable fields.
   private CompressionCodec codec;
-  private int connectionTimeout;
-  private int readTimeout;
 
   private boolean ifileReadAhead = TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT;
   private int ifileReadAheadLength = TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT;
@@ -94,12 +89,12 @@ public class Fetcher implements Callable<FetchResult> {
   private LinkedHashSet<InputAttemptIdentifier> remaining;
 
   private URL url;
-  private volatile HttpURLConnection connection;
   private volatile DataInputStream input;
-  private String encHash;
-  private String msgToEncode;
+  
+  private HttpConnection httpConnection;
+  private HttpConnectionParams httpConnectionParams;
 
-  private Fetcher(FetcherCallback fetcherCallback,
+  private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
       FetchedInputAllocator inputManager, ApplicationId appId, SecretKey shuffleSecret,
       String srcNameTrimmed) {
     this.fetcherCallback = fetcherCallback;
@@ -107,10 +102,11 @@ public class Fetcher implements Callable<FetchResult> {
     this.shuffleSecret = shuffleSecret;
     this.appId = appId;
     this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
+    this.httpConnectionParams = params;
 
     this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
     this.logIdentifier = "fetcher [" + srcNameTrimmed +"] " + fetcherIdentifier;
-    
+
     // TODO NEWTEZ Ideally, move this out from here into a static initializer block.
     // Re-enable when ssl shuffle support is needed.
 //    synchronized (Fetcher.class) {
@@ -145,7 +141,13 @@ public class Fetcher implements Callable<FetchResult> {
     remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
 
     try {
-      connectToShuffleHandler(host, port, partition, srcAttempts);
+      StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
+        port, partition, appId.toString());
+      this.url = ShuffleUtils.constructInputURL(baseURI.toString(), srcAttempts,
+        httpConnectionParams.getKeepAlive());
+
+      httpConnection = new HttpConnection(url, httpConnectionParams, logIdentifier, shuffleSecret);
+      httpConnection.connect();
     } catch (IOException e) {
       // ioErrs.increment(1);
       // If connect did not succeed, just mark all the maps as failed,
@@ -168,8 +170,9 @@ public class Fetcher implements Callable<FetchResult> {
     }
 
     try {
-      input = new DataInputStream(connection.getInputStream());
-      validateConnectionResponse(msgToEncode, encHash);
+      input = httpConnection.getInputStream();
+      httpConnection.validate();
+      //validateConnectionResponse(msgToEncode, encHash);
     } catch (IOException e) {
       // ioErrs.increment(1);
       // If we got a read error at this stage, it implies there was a problem
@@ -240,13 +243,8 @@ public class Fetcher implements Callable<FetchResult> {
     // shutdown request to block
     synchronized (isShutDown) {
       try {
-        if (input != null) {
-          LOG.info("Closing input on " + logIdentifier);
-          input.close();
-        }
-        if (connection != null) {
-          LOG.info("Closing connection on " + logIdentifier);
-          connection.disconnect();
+        if (httpConnection != null) {
+          httpConnection.cleanup(false);
         }
       } catch (IOException e) {
         LOG.info("Exception while shutting down fetcher on " + logIdentifier + " : "
@@ -322,12 +320,14 @@ public class Fetcher implements Callable<FetchResult> {
           + fetchedInput.getType());
 
       if (fetchedInput.getType() == Type.MEMORY) {
-        ShuffleUtils.shuffleToMemory((MemoryFetchedInput) fetchedInput,
-            input, (int) decompressedLength, (int) compressedLength, codec,
-            ifileReadAhead, ifileReadAheadLength, LOG);
+        ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(),
+          input, (int) decompressedLength, (int) compressedLength, codec,
+          ifileReadAhead, ifileReadAheadLength, LOG,
+          fetchedInput.getInputAttemptIdentifier().toString());
       } else {
-        ShuffleUtils.shuffleToDisk((DiskFetchedInput) fetchedInput, input,
-            compressedLength, LOG);
+        ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
+          (host +":" +port), input, compressedLength, LOG,
+          fetchedInput.getInputAttemptIdentifier().toString());
       }
 
       // Inform the shuffle scheduler
@@ -417,135 +417,6 @@ public class Fetcher implements Callable<FetchResult> {
     }
   }
 
-  private void connectToShuffleHandler(String host, int port,
-      int partition, List<InputAttemptIdentifier> inputs) throws IOException {
-    try {
-      this.url = constructInputURL(host, port, partition, inputs);
-      this.connection = openConnection(url);
-
-      // generate hash of the url
-      this.msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
-      this.encHash = SecureShuffleUtils.hashFromString(msgToEncode,
-          shuffleSecret);
-
-      // put url hash into http header
-      connection.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
-          encHash);
-      // set the read timeout
-      connection.setReadTimeout(readTimeout);
-      // put shuffle version into http header
-      connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-      connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
-
-      connect(connectionTimeout);
-    } catch (IOException e) {
-      LOG.warn("Failed to connect to " + host + " with " + srcAttempts.size()
-          + " inputs", e);
-      throw e;
-    }
-  }
-
-  private void validateConnectionResponse(String msgToEncode, String encHash) throws IOException {
-    int rc = connection.getResponseCode();
-    if (rc != HttpURLConnection.HTTP_OK) {
-      throw new IOException("Got invalid response code " + rc + " from " + url
-          + ": " + connection.getResponseMessage());
-    }
-
-    if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(connection
-        .getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
-        || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(connection
-            .getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
-      throw new IOException("Incompatible shuffle response version");
-    }
-
-    // get the replyHash which is HMac of the encHash we sent to the server
-    String replyHash = connection
-        .getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
-    if (replyHash == null) {
-      throw new IOException("security validation of TT Map output failed");
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("url=" + msgToEncode + ";encHash=" + encHash + ";replyHash="
-          + replyHash);
-    }
-    // verify that replyHash is HMac of encHash
-    SecureShuffleUtils.verifyReply(replyHash, encHash, shuffleSecret);
-    LOG.info("for url=" + msgToEncode + " sent hash and receievd reply");
-  }
-
-  protected HttpURLConnection openConnection(URL url) throws IOException {
-    HttpURLConnection connection = (HttpURLConnection) url.openConnection();
-    return connection;
-  }
-
-  /**
-   * The connection establishment is attempted multiple times and is given up
-   * only on the last failure. Instead of connecting with a timeout of X, we try
-   * connecting with a timeout of x < X but multiple times.
-   */
-  private void connect(int connectionTimeout) throws IOException {
-    int unit = 0;
-    if (connectionTimeout < 0) {
-      throw new IOException("Invalid timeout " + "[timeout = "
-          + connectionTimeout + " ms]");
-    } else if (connectionTimeout > 0) {
-      unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
-    }
-    // set the connect timeout to the unit-connect-timeout
-    connection.setConnectTimeout(unit);
-    while (true) {
-      try {
-        connection.connect();
-        break;
-      } catch (IOException ioe) {
-
-        // Check if already shutdown and abort subsequent connect attempts
-        if (isShutDown.get()) {
-          LOG.info("Fetcher already shutdown. Not attempting to connect again. Last exception was: ["
-              + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
-          return;
-        }
-        // update the total remaining connect-timeout
-        connectionTimeout -= unit;
-
-        // throw an exception if we have waited for timeout amount of time
-        // note that the updated value if timeout is used here
-        if (connectionTimeout == 0) {
-          throw ioe;
-        }
-
-        // reset the connect timeout for the last try
-        if (connectionTimeout < unit) {
-          unit = connectionTimeout;
-          // reset the connect time out for the final connect
-          connection.setConnectTimeout(unit);
-        }
-      }
-    }
-  }
-
-  private URL constructInputURL(String host, int port, int partition,
-      List<InputAttemptIdentifier> inputs) throws MalformedURLException {
-    StringBuilder url = ShuffleUtils.constructBaseURIForShuffleHandler(host,
-        port, partition, appId);
-    boolean first = true;
-    for (InputAttemptIdentifier input : inputs) {
-      if (first) {
-        first = false;
-        url.append(input.getPathComponent());
-      } else {
-        url.append(",").append(input.getPathComponent());
-      }
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("InputFetch URL for: " + host + " : " + url.toString());
-    }
-    return new URL(url.toString());
-  }
-
   /**
    * Builder for the construction of Fetchers
    */
@@ -553,25 +424,23 @@ public class Fetcher implements Callable<FetchResult> {
     private Fetcher fetcher;
     private boolean workAssigned = false;
 
-    public FetcherBuilder(FetcherCallback fetcherCallback,
+    public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnectionParams params,
         FetchedInputAllocator inputManager, ApplicationId appId,
         SecretKey shuffleSecret, String srcNameTrimmed) {
-      this.fetcher = new Fetcher(fetcherCallback, inputManager, appId,
+      this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
           shuffleSecret, srcNameTrimmed);
     }
 
-    public FetcherBuilder setCompressionParameters(CompressionCodec codec) {
-      fetcher.codec = codec;
+    public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) {
+      fetcher.httpConnectionParams = httpParams;
       return this;
     }
 
-    public FetcherBuilder setConnectionParameters(int connectionTimeout,
-        int readTimeout) {
-      fetcher.connectionTimeout = connectionTimeout;
-      fetcher.readTimeout = readTimeout;
+    public FetcherBuilder setCompressionParameters(CompressionCodec codec) {
+      fetcher.codec = codec;
       return this;
     }
-    
+
     public FetcherBuilder setIFileParams(boolean readAhead, int readAheadBytes) {
       fetcher.ifileReadAhead = readAhead;
       fetcher.ifileReadAheadLength = readAheadBytes;
@@ -614,3 +483,4 @@ public class Fetcher implements Callable<FetchResult> {
     return true;
   }
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c247a3bf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/HttpConnection.java
new file mode 100644
index 0000000..c50ce27
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/HttpConnection.java
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.security.GeneralSecurityException;
+
+import javax.crypto.SecretKey;
+import javax.net.ssl.HttpsURLConnection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleHeader;
+
+/**
+ * HttpConnection which can be used for Unordered / Ordered shuffle.
+ */
+public class HttpConnection {
+
+  private static final Log LOG = LogFactory.getLog(HttpConnection.class);
+
+  /** Basic/unit connection timeout (in milliseconds) */
+  private final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;
+
+  private URL url;
+  private final String logIdentifier;
+
+  private SSLFactory sslFactory;
+  private HttpURLConnection connection;
+  private DataInputStream input;
+
+  private boolean connectionSucceeed;
+  private volatile boolean cleanup;
+
+  private final SecretKey jobTokenSecret;
+  private String encHash;
+  private String msgToEncode;
+
+  private final HttpConnectionParams httpConnParams;
+
+  /**
+   * Creates a connection wrapper for the given URL; no network connection is opened here.
+   *
+   * @param url URL to fetch from (logged at debug level)
+   * @param connParams timeouts, buffer size and keep-alive settings for the connection
+   * @param logIdentifier identifier included in this connection's log messages
+   * @param jobTokenSecret secret used to hash the URL and to verify the reply hash
+   * @throws IOException declared, though nothing in the constructor body throws it
+   */
+  public HttpConnection(URL url, HttpConnectionParams connParams,
+      String logIdentifier, SecretKey jobTokenSecret) throws IOException {
+    this.logIdentifier = logIdentifier;
+    this.jobTokenSecret = jobTokenSecret;
+    this.httpConnParams = connParams;
+    this.url = url;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("MapOutput URL :" + url.toString());
+    }
+  }
+
+  /**
+   * Sets the SSL factory; when non-null, setupConnection() configures the connection with it.
+   *
+   * @param factory source of the SSL socket factory and hostname verifier
+   */
+  public void setSSLFactory(SSLFactory factory) {
+    this.sslFactory = factory;
+  }
+
+  private void setupConnection() throws IOException {
+    connection = (HttpURLConnection) url.openConnection();
+    if (sslFactory != null) { // NOTE(review): casts below assume an https URL - confirm
+      try {
+        ((HttpsURLConnection) connection).setSSLSocketFactory(sslFactory
+          .createSSLSocketFactory());
+        ((HttpsURLConnection) connection).setHostnameVerifier(sslFactory
+          .getHostnameVerifier());
+      } catch (GeneralSecurityException ex) {
+        throw new IOException(ex);
+      }
+    }
+    // generate hash of the url; validate() verifies the server's reply against encHash
+    msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
+    encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
+
+    // put url hash into http header
+    connection.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
+      encHash);
+    // set the read timeout
+    connection.setReadTimeout(httpConnParams.readTimeout);
+    // put shuffle name and version into http headers; validate() expects them echoed back
+    connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+      ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+      ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+  }
+
+  /**
+   * Connects to the source using the connection timeout from the connection params.
+   *
+   * @return the result of {@link #connect(int)}
+   * @throws IOException if {@link #connect(int)} throws
+   */
+  public boolean connect() throws IOException {
+    return connect(httpConnParams.connectionTimeout);
+  }
+
+  /**
+   * Connects to the source, retrying in slices of at most UNIT_CONNECT_TIMEOUT ms.
+   *
+   * @param connectionTimeout total budget in ms; 0 = one attempt with no connect timeout
+   * @return true once connected; false if cleanup() was called while retrying
+   * @throws IOException on a negative timeout, or the last failure once the budget is spent
+   */
+  public boolean connect(int connectionTimeout) throws IOException {
+    if (connection == null) {
+      setupConnection();
+    }
+    int unit = 0;
+    if (connectionTimeout < 0) {
+      throw new IOException("Invalid timeout " + "[timeout = "
+          + connectionTimeout + " ms]");
+    } else if (connectionTimeout > 0) {
+      unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
+    }
+    // set the connect timeout to the unit-connect-timeout
+    connection.setConnectTimeout(unit);
+    while (true) {
+      try {
+        connection.connect();
+        connectionSucceeed = true;
+        break;
+      } catch (IOException ioe) {
+        // Stop retrying only when cleanup() was requested (this check used to be inverted).
+        if (cleanup) {
+          LOG.info("Cleanup is set to true. Not attempting to"
+              + " connect again. Last exception was: ["
+              + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
+          return false;
+        }
+        // update the total remaining connect-timeout
+        connectionTimeout -= unit;
+        // throw an exception if we have waited for timeout amount of time
+        // note that the updated value of timeout is used here
+        if (connectionTimeout == 0) {
+          throw ioe;
+        }
+        // reset the connect timeout for the last try
+        if (connectionTimeout < unit) {
+          unit = connectionTimeout;
+          // reset the connect time out for the final connect
+          connection.setConnectTimeout(unit);
+        }
+      }
+    }
+    return true;
+  }
+
+  /** Checks the response code, shuffle name/version headers and reply hash; throws on mismatch. */
+  public void validate() throws IOException {
+    int rc = connection.getResponseCode();
+    if (rc != HttpURLConnection.HTTP_OK) {
+      throw new IOException("Got invalid response code " + rc + " from " + url
+          + ": " + connection.getResponseMessage());
+    }
+    // the response must carry the same shuffle header name and version this client sent
+    String name = connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME);
+    String version = connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION);
+    if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(name)
+        || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(version)) {
+      throw new IOException("Incompatible shuffle response version");
+    }
+    // get the replyHash which is HMac of the encHash we sent to the server
+    String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
+    if (replyHash == null) {
+      throw new IOException("security validation of TT Map output failed");
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("url=" + msgToEncode + ";encHash=" + encHash + ";replyHash=" + replyHash);
+    }
+    // verify that replyHash is HMac of encHash
+    SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
+    LOG.info("for url=" + msgToEncode + " sent hash and receievd reply");
+  }
+
+  /**
+   * Opens the connection's input stream, buffered with the configured buffer size.
+   *
+   * @return the stream, or null when no connect attempt has succeeded yet
+   * @throws IOException if the underlying connection cannot provide its stream
+   */
+  public DataInputStream getInputStream() throws IOException {
+    // Assign the field, not a shadowing local, so cleanup() can close this stream.
+    if (connectionSucceeed) {
+      input =
+          new DataInputStream(new BufferedInputStream(
+            connection.getInputStream(), httpConnParams.bufferSize));
+    }
+    return input;
+  }
+
+  /**
+   * Sets the cleanup flag, closes the input field if set, and disconnects when asked to.
+   *
+   * @param disconnect
+   *          Close the connection if this is true; otherwise respect keepalive
+   * @throws IOException declared, but IOExceptions raised in the body are caught and logged
+   */
+  public void cleanup(boolean disconnect) throws IOException {
+    cleanup = true;
+    try {
+      if (input != null) {
+        LOG.info("Closing input on " + logIdentifier);
+        input.close();
+      }
+      if (httpConnParams.keepAlive && connectionSucceeed) {
+        // Drain any error stream for keepAlive connections; refer:
+        // http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
+        readErrorStream(connection.getErrorStream());
+      }
+      if (connection != null && (disconnect || !httpConnParams.keepAlive)) {
+        LOG.info("Closing connection on " + logIdentifier);
+        connection.disconnect();
+      }
+    } catch (IOException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Exception while shutting down fetcher " + logIdentifier, e);
+      } else {
+        LOG.info("Exception while shutting down fetcher " + logIdentifier
+            + ": " + e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Copies the error stream, if any, into a scratch buffer and closes both.
+   *
+   * @param errorStream stream to drain; null means there is nothing to do
+   */
+  private void readErrorStream(InputStream errorStream) {
+    if (errorStream == null) {
+      return;
+    }
+    try {
+      DataOutputBuffer errorBuffer = new DataOutputBuffer();
+      IOUtils.copyBytes(errorStream, errorBuffer, 4096);
+      IOUtils.closeStream(errorBuffer);
+      IOUtils.closeStream(errorStream);
+    } catch (IOException ioe) {
+      // best-effort drain: a failure while reading the error stream is deliberately ignored
+    }
+  }
+
+  public static class HttpConnectionParams {
+    private boolean keepAlive;
+    private int keepAliveMaxConnections;
+    private int connectionTimeout;
+    private int readTimeout;
+    private int bufferSize;
+    private boolean sslShuffle;
+    private SSLFactory sslFactory;
+
+    public boolean getKeepAlive() {
+      return keepAlive;
+    }
+
+    public int getKeepAliveMaxConnections() {
+      return keepAliveMaxConnections;
+    }
+
+    public int getConnectionTimeout() {
+      return connectionTimeout;
+    }
+
+    public int getReadTimeout() {
+      return readTimeout;
+    }
+
+    public void setReadTimeout(int readTimeout) {
+      this.readTimeout = readTimeout;
+    }
+
+    public int getBufferSize() {
+      return bufferSize;
+    }
+
+    public boolean isSSLShuffleEnabled() {
+      return sslShuffle;
+    }
+
+    public SSLFactory getSSLFactory() {
+      return sslFactory;
+    }
+  }
+
+  /** Fluent builder for {@link HttpConnectionParams}; each setter returns this builder. */
+  public static class HttpConnectionParamsBuilder {
+    private final HttpConnectionParams params = new HttpConnectionParams();
+
+    /** Sets whether keep-alive is used and the max number of keep-alive connections. */
+    public HttpConnectionParamsBuilder setKeepAlive(boolean keepAlive,
+        int keepAliveMaxConnections) {
+      params.keepAlive = keepAlive;
+      params.keepAliveMaxConnections = keepAliveMaxConnections;
+      return this;
+    }
+
+    /** Sets the connect and read timeouts, both in milliseconds. */
+    public HttpConnectionParamsBuilder setTimeout(int connectionTimeout, int readTimeout) {
+      params.connectionTimeout = connectionTimeout;
+      params.readTimeout = readTimeout;
+      return this;
+    }
+
+    /** Records whether SSL shuffle is enabled (previously always true) and its factory. */
+    public HttpConnectionParamsBuilder setSSL(boolean sslEnabled, SSLFactory sslFactory) {
+      params.sslShuffle = sslEnabled;
+      params.sslFactory = sslFactory;
+      return this;
+    }
+
+    /** Sets the size of the buffer wrapped around the connection's input stream. */
+    public HttpConnectionParamsBuilder setBufferSize(int bufferSize) {
+      params.bufferSize = bufferSize;
+      return this;
+    }
+
+    /** Returns the params instance populated by the setters (the same object on every call). */
+    public HttpConnectionParams build() {
+      return params;
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c247a3bf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
index 0c7cb2e..51471cf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
@@ -21,11 +21,16 @@ package org.apache.tez.runtime.library.shuffle.common;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.nio.ByteBuffer;
+import java.util.List;
 
 import javax.crypto.SecretKey;
 
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
@@ -33,13 +38,17 @@ import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
+import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParams;
+import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParamsBuilder;
 
 public class ShuffleUtils {
 
+  private static final Log LOG = LogFactory.getLog(ShuffleUtils.class);
   public static String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle";
 
   public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta)
@@ -71,12 +80,12 @@ public class ShuffleUtils {
       in.close();
     }
   }
-  
+
   @SuppressWarnings("resource")
-  public static void shuffleToMemory(MemoryFetchedInput fetchedInput,
+  public static void shuffleToMemory(byte[] shuffleData,
       InputStream input, int decompressedLength, int compressedLength,
       CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength,
-      Log LOG) throws IOException {
+      Log LOG, String identifier) throws IOException {
     IFileInputStream checksumIn = new IFileInputStream(input, compressedLength,
         ifileReadAhead, ifileReadAheadLength);
 
@@ -88,14 +97,12 @@ public class ShuffleUtils {
       decompressor.reset();
       input = codec.createInputStream(input, decompressor);
     }
-    // Copy map-output into an in-memory buffer
-    byte[] shuffleData = fetchedInput.getBytes();
 
     try {
       IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
       // metrics.inputBytes(shuffleData.length);
       LOG.info("Read " + shuffleData.length + " bytes from input for "
-          + fetchedInput.getInputAttemptIdentifier());
+          + identifier);
     } catch (IOException ioe) {
       // Close the streams
       IOUtils.cleanup(LOG, input);
@@ -104,11 +111,10 @@ public class ShuffleUtils {
     }
   }
   
-  public static void shuffleToDisk(DiskFetchedInput fetchedInput,
-      InputStream input, long compressedLength, Log LOG)
+  public static void shuffleToDisk(OutputStream output, String hostIdentifier,
+      InputStream input, long compressedLength, Log LOG, String identifier)
       throws IOException {
     // Copy data to local-disk
-    OutputStream output = fetchedInput.getOutputStream();
     long bytesLeft = compressedLength;
     try {
       final int BYTES_TO_READ = 64 * 1024;
@@ -117,7 +123,7 @@ public class ShuffleUtils {
         int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
         if (n < 0) {
           throw new IOException("read past end of stream reading "
-              + fetchedInput.getInputAttemptIdentifier());
+              + identifier);
         }
         output.write(buf, 0, n);
         bytesLeft -= n;
@@ -125,37 +131,99 @@ public class ShuffleUtils {
       }
 
       LOG.info("Read " + (compressedLength - bytesLeft)
-          + " bytes from input for " + fetchedInput.getInputAttemptIdentifier());
+          + " bytes from input for " + identifier);
 
       output.close();
     } catch (IOException ioe) {
       // Close the streams
       IOUtils.cleanup(LOG, input, output);
-
       // Re-throw
       throw ioe;
     }
 
     // Sanity check
     if (bytesLeft != 0) {
-      throw new IOException("Incomplete input received for "
-          + fetchedInput.getInputAttemptIdentifier() + " ("
-          + bytesLeft + " bytes missing of " + compressedLength + ")");
+      throw new IOException("Incomplete map output received for " +
+          identifier + " from " +
+          hostIdentifier + " (" + 
+          bytesLeft + " bytes missing of " + 
+          compressedLength + ")");
     }
   }
-  
+
   // TODO NEWTEZ handle ssl shuffle
-  public static StringBuilder constructBaseURIForShuffleHandler(String host, int port, int partition, ApplicationId appId) {
+  public static StringBuilder constructBaseURIForShuffleHandler(String host,
+      int port, int partition, String appId) {
+    return constructBaseURIForShuffleHandler(host + ":" + String.valueOf(port),
+      partition, appId);
+  }
+  
+  public static StringBuilder constructBaseURIForShuffleHandler(String hostIdentifier,
+      int partition, String appId) {
     StringBuilder sb = new StringBuilder("http://");
-    sb.append(host);
-    sb.append(":");
-    sb.append(String.valueOf(port));
+    sb.append(hostIdentifier);
     sb.append("/");
     sb.append("mapOutput?job=");
-    sb.append(appId.toString().replace("application", "job"));
+    sb.append(appId.replace("application", "job"));
     sb.append("&reduce=");
     sb.append(String.valueOf(partition));
     sb.append("&map=");
     return sb;
   }
+
+  /**
+   * Builds the fetch URL: baseURI + comma-separated input path components [+ keepAlive flag].
+   */
+  public static URL constructInputURL(String baseURI,
+      List<InputAttemptIdentifier> inputs, boolean keepAlive) throws MalformedURLException {
+    StringBuilder spec = new StringBuilder(baseURI);
+    String separator = "";
+    for (InputAttemptIdentifier attempt : inputs) {
+      // no separator before the first component, "," before each later one
+      spec.append(separator).append(attempt.getPathComponent());
+      separator = ",";
+    }
+    //It is possible to override keep-alive setting in cluster by adding keepAlive in url.
+    //Refer MAPREDUCE-5787 to enable/disable keep-alive in the cluster.
+    if (keepAlive) {
+      spec.append("&keepAlive=true");
+    }
+    return new URL(spec.toString());
+  }
+
+  public static HttpConnectionParams constructHttpShuffleConnectionParams(
+      Configuration conf) {
+    HttpConnectionParamsBuilder builder = new HttpConnectionParamsBuilder();
+
+    int connectionTimeout =
+        conf.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
+          TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT);
+
+    int readTimeout =
+        conf.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
+          TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
+
+    int bufferSize =
+        conf.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE,
+          TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
+
+    boolean keepAlive =
+        conf.getBoolean(TezJobConfig.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED,
+          TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED);
+    int keepAliveMaxConnections =
+        conf.getInt(
+          TezJobConfig.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS,
+          TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS);
+    if (keepAlive) {
+      System.setProperty("sun.net.http.errorstream.enableBuffering", "true");
+      System.setProperty("http.maxConnections",
+        String.valueOf(keepAliveMaxConnections));
+      LOG.info("Set keepAlive max connections: " + keepAliveMaxConnections);
+    }
+
+    return builder.setTimeout(connectionTimeout, readTimeout)
+      .setBufferSize(bufferSize)
+      .setKeepAlive(keepAlive, keepAliveMaxConnections).build();
+  }
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c247a3bf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
index ac5da74..da4cb71 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
@@ -94,6 +94,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
       throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
     }
     int srcIndex = dme.getSourceIndex();
+    String hostIdentifier = shufflePayload.getHost() + ":" + shufflePayload.getPort();
     LOG.info("Processing DataMovementEvent with srcIndex: "
         + srcIndex + ", targetIndex: " + dme.getTargetIndex()
         + ", attemptNum: " + dme.getVersion() + ", payload: "
@@ -118,7 +119,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
         DataProto dataProto = shufflePayload.getData();
         FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(),
             dataProto.getCompressedLength(), srcAttemptIdentifier);
-        moveDataToFetchedInput(dataProto, fetchedInput);
+        moveDataToFetchedInput(dataProto, fetchedInput, hostIdentifier);
         shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
       } else {
         shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(),
@@ -128,16 +129,18 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
   }
   
   private void moveDataToFetchedInput(DataProto dataProto,
-      FetchedInput fetchedInput) throws IOException {
+      FetchedInput fetchedInput, String hostIdentifier) throws IOException {
     switch (fetchedInput.getType()) {
     case DISK:
-      ShuffleUtils.shuffleToDisk((DiskFetchedInput) fetchedInput, dataProto
-          .getData().newInput(), dataProto.getCompressedLength(), LOG);
+      ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
+        hostIdentifier, dataProto.getData().newInput(), dataProto.getCompressedLength(), LOG,
+          fetchedInput.getInputAttemptIdentifier().toString());
       break;
     case MEMORY:
-      ShuffleUtils.shuffleToMemory((MemoryFetchedInput) fetchedInput,
-          dataProto.getData().newInput(), dataProto.getRawLength(),
-          dataProto.getCompressedLength(), codec, ifileReadAhead, ifileReadAheadLength, LOG);
+      ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(),
+        dataProto.getData().newInput(), dataProto.getRawLength(), dataProto.getCompressedLength(),
+        codec, ifileReadAhead, ifileReadAheadLength, LOG,
+        fetchedInput.getInputAttemptIdentifier().toString());
       break;
     case WAIT:
     default:
@@ -163,3 +166,4 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
     return sb.toString();
   }
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c247a3bf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
index 9cce872..d81a4f5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -112,8 +112,6 @@ public class ShuffleManager implements FetcherCallback {
   
   // Parameters required by Fetchers
   private final SecretKey shuffleSecret;
-  private final int connectionTimeout;
-  private final int readTimeout;
   private final CompressionCodec codec;
   
   private final int ifileBufferSize;
@@ -132,6 +130,7 @@ public class ShuffleManager implements FetcherCallback {
   private final TezCounter bytesShuffledToMemCounter;
   
   private volatile Throwable shuffleError;
+  private final Configuration conf;
   
   // TODO More counters - FetchErrors, speed?
   
@@ -175,7 +174,7 @@ public class ShuffleManager implements FetcherCallback {
         new ThreadFactoryBuilder().setDaemon(true)
             .setNameFormat("Fetcher [" + srcNameTrimmed + "] #%d").build());
     this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
-    
+    this.conf = conf;
     
     ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
         .setDaemon(true).setNameFormat("ShuffleRunner [" + srcNameTrimmed + "]").build());
@@ -188,13 +187,6 @@ public class ShuffleManager implements FetcherCallback {
         .getJobTokenSecretFromTokenBytes(inputContext
             .getServiceConsumerMetaData(TezConfiguration.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
     
-    this.connectionTimeout = conf.getInt(
-        TezJobConfig.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
-        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT);
-    this.readTimeout = conf.getInt(
-        TezJobConfig.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
-        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
-    
     LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + ", compressionCodec="
         + (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers="
         + numFetchers + ", ifileBufferSize=" + ifileBufferSize + ", ifileReadAheadEnabled="
@@ -290,9 +282,9 @@ public class ShuffleManager implements FetcherCallback {
   }
   
   private Fetcher constructFetcherForHost(InputHost inputHost) {
-    FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this, inputManager,
+    FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this,
+      ShuffleUtils.constructHttpShuffleConnectionParams(conf), inputManager,
         inputContext.getApplicationId(), shuffleSecret, srcNameTrimmed);
-    fetcherBuilder.setConnectionParameters(connectionTimeout, readTimeout);
     if (codec != null) {
       fetcherBuilder.setCompressionParameters(codec);
     }
@@ -709,3 +701,4 @@ public class ShuffleManager implements FetcherCallback {
     }
   }
 }
+


Mime
View raw message