tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [02/50] [abbrv] tez git commit: TEZ-2450. support async http clients in ordered & unordered inputs (rbalamohan)
Date Mon, 01 Jun 2015 23:36:40 GMT
TEZ-2450. support async http clients in ordered & unordered inputs (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9dabf947
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9dabf947
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9dabf947

Branch: refs/heads/TEZ-2003
Commit: 9dabf94767480750f31d8f3e24d17a89bc036331
Parents: 7be325e
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Wed May 27 05:32:08 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Wed May 27 05:32:08 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 pom.xml                                         |   5 +
 tez-runtime-library/findbugs-exclude.xml        |  12 +
 tez-runtime-library/pom.xml                     |   4 +
 .../org/apache/tez/http/BaseHttpConnection.java |  63 +++
 .../org/apache/tez/http/HttpConnection.java     | 318 ++++++++++++++
 .../apache/tez/http/HttpConnectionParams.java   |  82 ++++
 .../java/org/apache/tez/http/SSLFactory.java    | 238 +++++++++++
 .../http/async/netty/AsyncHttpConnection.java   | 231 ++++++++++
 .../netty/TezBodyDeferringAsyncHandler.java     | 256 +++++++++++
 .../library/api/TezRuntimeConfiguration.java    |   4 +
 .../runtime/library/common/shuffle/Fetcher.java |  34 +-
 .../library/common/shuffle/HttpConnection.java  | 428 -------------------
 .../library/common/shuffle/ShuffleUtils.java    | 117 +++--
 .../common/shuffle/impl/ShuffleManager.java     |  12 +-
 .../orderedgrouped/FetcherOrderedGrouped.java   |  23 +-
 .../orderedgrouped/ShuffleScheduler.java        |  11 +-
 .../library/input/OrderedGroupedKVInput.java    |   1 +
 .../runtime/library/input/UnorderedKVInput.java |   1 +
 .../org/apache/tez/http/TestHttpConnection.java | 202 +++++++++
 .../library/common/shuffle/TestFetcher.java     |  12 +-
 .../shuffle/orderedgrouped/TestFetcher.java     |  71 ++-
 .../apache/tez/test/TestPipelinedShuffle.java   |  13 +-
 .../org/apache/tez/test/TestSecureShuffle.java  |  21 +-
 24 files changed, 1636 insertions(+), 524 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a14e9da..5f5dd48 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-2450. support async http clients in ordered & unordered inputs.
   TEZ-2454. Change FetcherOrderedGroup to work as Callables instead of blocking threads.
   TEZ-2466. tez-history-parser breaks hadoop 2.2 compatability.
   TEZ-2463. Update site for 0.7.0 release

http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 44592fa..2922cab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -182,6 +182,11 @@
         <version>1.7.5</version>
       </dependency>
       <dependency>
+	      <groupId>com.ning</groupId>
+	      <artifactId>async-http-client</artifactId>
+	      <version>1.8.16</version>
+      </dependency>
+      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
         <version>1.7.5</version>

http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml
index 489e243..919e1e3 100644
--- a/tez-runtime-library/findbugs-exclude.xml
+++ b/tez-runtime-library/findbugs-exclude.xml
@@ -73,6 +73,18 @@
     <Bug pattern="UWF_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD"/>
   </Match>
 
+  <Match>
+    <Class name="org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped"/>
+    <Method name="setupConnection" params="org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost, java.util.List" returns="boolean"/>
+    <Bug pattern="BC_VACUOUS_INSTANCEOF"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.runtime.library.common.shuffle.Fetcher"/>
+    <Method name="setupConnection" params="java.util.List" returns="org.apache.tez.runtime.library.common.shuffle.Fetcher$HostFetchResult"/>
+    <Bug pattern="BC_VACUOUS_INSTANCEOF"/>
+  </Match>
+
   <!-- TODO This needs more looking into -->
   <Match>
     <Class name="org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter"/>

http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/pom.xml b/tez-runtime-library/pom.xml
index 03e0ec3..4433a02 100644
--- a/tez-runtime-library/pom.xml
+++ b/tez-runtime-library/pom.xml
@@ -26,6 +26,10 @@
 
   <dependencies>
     <dependency>
+      <groupId>com.ning</groupId>
+      <artifactId>async-http-client</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.tez</groupId>
       <artifactId>tez-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/http/BaseHttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/BaseHttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/BaseHttpConnection.java
new file mode 100644
index 0000000..dd642ae
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/BaseHttpConnection.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.http;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+/**
+ * Common abstraction over the HTTP connection implementations: it declares the operations for
+ * connecting, validating the established connection, obtaining the response stream and
+ * cleaning up.
+ */
+public abstract class BaseHttpConnection {
+
+  /**
+   * Connect timeout, in milliseconds, used for a single connection attempt (60 seconds).
+   */
+  protected static final int UNIT_CONNECT_TIMEOUT = 60 * 1000;
+
+  /**
+   * Opens the connection to the target URL.
+   *
+   * @return whether the connection was established
+   * @throws IOException          on connection errors
+   * @throws InterruptedException if interrupted while connecting
+   */
+  public abstract boolean connect() throws IOException, InterruptedException;
+
+  /**
+   * Checks the connection that {@link #connect()} established.
+   *
+   * @throws IOException if the connection is not acceptable
+   */
+  public abstract void validate() throws IOException;
+
+  /**
+   * Returns the stream carrying the response body.
+   *
+   * @return the response body as a {@link DataInputStream}
+   * @throws IOException          on read/setup errors
+   * @throws InterruptedException if interrupted while waiting for the stream
+   */
+  public abstract DataInputStream getInputStream() throws IOException, InterruptedException;
+
+  /**
+   * Releases resources held by this connection.
+   *
+   * @param disconnect whether the underlying connection should be closed
+   * @throws IOException on cleanup errors
+   */
+  public abstract void cleanup(boolean disconnect) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
new file mode 100644
index 0000000..4732354
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.http;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link BaseHttpConnection} backed by a {@link HttpURLConnection}.
+ * <p/>
+ * Each request carries the URL hash and the shuffle name/version headers. {@link #connect()}
+ * retries in steps of at most {@code UNIT_CONNECT_TIMEOUT} ms until the configured connection
+ * timeout is used up, and {@link #validate()} checks the response code, the shuffle headers and
+ * the reply hash returned by the server.
+ */
+public class HttpConnection extends BaseHttpConnection {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HttpConnection.class);
+
+  private final URL url;
+  private final String logIdentifier;
+
+  @VisibleForTesting
+  protected volatile HttpURLConnection connection;
+  private volatile DataInputStream input;
+  // true once connection.connect() has returned without throwing
+  private volatile boolean connectionSucceeed;
+  // set by cleanup(); makes the retry loop in connect() give up after the next failed attempt
+  private volatile boolean cleanup;
+
+  private final JobTokenSecretManager jobTokenSecretMgr;
+  // hash sent to the server; the reply header must be an HMAC of it
+  private String encHash;
+  private String msgToEncode;
+
+  private final HttpConnectionParams httpConnParams;
+  private final Stopwatch stopWatch;
+
+  /**
+   * Creates the connection wrapper; no network activity happens until {@link #connect()}.
+   *
+   * @param url                   URL to fetch from
+   * @param connParams            timeouts, buffer size, keep-alive and SSL settings
+   * @param logIdentifier         identifier included in log messages
+   * @param jobTokenSecretManager secret manager used to compute and verify the URL hash
+   * @throws IOException declared for compatibility; this constructor does not throw it itself
+   */
+  public HttpConnection(URL url, HttpConnectionParams connParams,
+      String logIdentifier, JobTokenSecretManager jobTokenSecretManager) throws IOException {
+    this.logIdentifier = logIdentifier;
+    this.jobTokenSecretMgr = jobTokenSecretManager;
+    this.httpConnParams = connParams;
+    this.url = url;
+    this.stopWatch = new Stopwatch();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("MapOutput URL :" + url.toString());
+    }
+  }
+
+  /**
+   * Builds the message to hash from the URL and hashes it with the job token secret.
+   *
+   * @throws IOException propagated from {@link SecureShuffleUtils}
+   */
+  @VisibleForTesting
+  public void computeEncHash() throws IOException {
+    // generate hash of the url
+    msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
+    encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecretMgr);
+  }
+
+  /**
+   * Opens (without connecting) the underlying connection, applies the SSL factory when SSL
+   * shuffle is enabled, and sets the URL-hash header, read timeout and shuffle headers.
+   */
+  private void setupConnection() throws IOException {
+    connection = (HttpURLConnection) url.openConnection();
+    if (httpConnParams.isSslShuffle()) {
+      //Configure for SSL
+      SSLFactory sslFactory = httpConnParams.getSslFactory();
+      Preconditions.checkArgument(sslFactory != null, "SSLFactory can not be null");
+      sslFactory.configure(connection);
+    }
+
+    computeEncHash();
+
+    // put url hash into http header
+    connection.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+    // set the read timeout
+    connection.setReadTimeout(httpConnParams.getReadTimeout());
+    // put shuffle version into http header
+    connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+  }
+
+  /**
+   * Connect to source using the connection timeout from the connection params.
+   *
+   * @return true if connection was successful
+   * false if connection was previously cleaned up
+   * @throws IOException upon connection failure
+   */
+  @Override
+  public boolean connect() throws IOException {
+    return connect(httpConnParams.getConnectionTimeout());
+  }
+
+  /**
+   * Connect to source with specific timeout, retrying failed attempts until the timeout is
+   * used up.
+   *
+   * @param connectionTimeout total time budget in ms; 0 means a single attempt without timeout
+   * @return true if connection was successful
+   * false if connection was previously cleaned up
+   * @throws IOException upon connection failure, negative timeout, or interruption while
+   *                     waiting between attempts (the interrupt status is restored)
+   */
+  private boolean connect(int connectionTimeout) throws IOException {
+    stopWatch.reset().start();
+    if (connection == null) {
+      setupConnection();
+    }
+    int unit = 0;
+    if (connectionTimeout < 0) {
+      throw new IOException("Invalid timeout " + "[timeout = " + connectionTimeout + " ms]");
+    } else if (connectionTimeout > 0) {
+      unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
+    }
+    // set the connect timeout to the unit-connect-timeout (0 = infinite for HttpURLConnection)
+    connection.setConnectTimeout(unit);
+    int connectionFailures = 0;
+    while (true) {
+      long connectStartTime = System.currentTimeMillis();
+      try {
+        connection.connect();
+        connectionSucceeed = true;
+        break;
+      } catch (IOException ioe) {
+        // Don't attempt another connect if already cleanedup.
+        connectionFailures++;
+        if (cleanup) {
+          LOG.info("Cleanup is set to true. Not attempting to"
+              + " connect again. Last exception was: ["
+              + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
+          return false;
+        }
+        // update the total remaining connect-timeout
+        connectionTimeout -= unit;
+        // throw an exception if we have waited for timeout amount of time
+        // note that the updated value if timeout is used here
+        if (connectionTimeout <= 0) {
+          throw new IOException(
+              "Failed to connect to " + url + ", #connectionFailures=" + connectionFailures, ioe);
+        }
+        long elapsed = System.currentTimeMillis() - connectStartTime;
+        // the attempt failed faster than the unit timeout: wait out the rest of the unit
+        if (elapsed < unit) {
+          try {
+            long sleepTime = unit - elapsed;
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Sleeping for " + sleepTime + " while establishing connection to " + url +
+                  ", since connectAttempt returned in " + elapsed + " ms");
+            }
+            Thread.sleep(sleepTime);
+          } catch (InterruptedException e) {
+            // restore the interrupt so callers that only see the IOException can still react
+            Thread.currentThread().interrupt();
+            throw new IOException(
+                "Connection establishment sleep interrupted, #connectionFailures=" +
+                    connectionFailures, e);
+          }
+        }
+
+        // reset the connect timeout for the last try
+        if (connectionTimeout < unit) {
+          unit = connectionTimeout;
+          // reset the connect time out for the final connect
+          connection.setConnectTimeout(unit);
+        }
+
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Time taken to connect to " + url.toString() +
+          " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms; connectionFailures="
+          + connectionFailures);
+    }
+    return true;
+  }
+
+  /**
+   * Validates the response: HTTP 200, matching shuffle name/version headers, and a reply hash
+   * that verifies against the hash sent with the request.
+   *
+   * @throws IOException if any of the checks fail
+   */
+  @Override
+  public void validate() throws IOException {
+    stopWatch.reset().start();
+    int rc = connection.getResponseCode();
+    if (rc != HttpURLConnection.HTTP_OK) {
+      throw new IOException("Got invalid response code " + rc + " from " + url
+          + ": " + connection.getResponseMessage());
+    }
+
+    // get the shuffle version
+    if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(connection
+        .getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+        || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(connection
+        .getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
+      throw new IOException("Incompatible shuffle response version");
+    }
+
+    // get the replyHash which is HMac of the encHash we sent to the server
+    String replyHash =
+        connection
+            .getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
+    if (replyHash == null) {
+      throw new IOException("security validation of TT Map output failed");
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("url=" + msgToEncode + ";encHash=" + encHash + ";replyHash="
+          + replyHash);
+    }
+
+    // verify that replyHash is HMac of encHash
+    SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr);
+    //Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM host
+    // (keep the message text unchanged, including its spelling, so the analyzer still matches it)
+    LOG.info("for url=" + url +
+        " sent hash and receievd reply " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
+  }
+
+  /**
+   * Get the inputstream from the connection
+   *
+   * @return a buffered stream over the response body if the connection succeeded; otherwise
+   * the previously created stream, which is null if none was created
+   * @throws IOException
+   */
+  @Override
+  public DataInputStream getInputStream() throws IOException {
+    stopWatch.reset().start();
+    if (connectionSucceeed) {
+      input = new DataInputStream(new BufferedInputStream(
+              connection.getInputStream(), httpConnParams.getBufferSize()));
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Time taken to getInputStream (connect) " + url +
+          " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
+    }
+    return input;
+  }
+
+  /**
+   * Cleanup the connection. Safe to call more than once.
+   *
+   * @param disconnect Close the connection if this is true; otherwise respect keepalive
+   * @throws IOException
+   */
+  @Override
+  public void cleanup(boolean disconnect) throws IOException {
+    cleanup = true;
+    stopWatch.reset().start();
+    try {
+      if (input != null) {
+        LOG.info("Closing input on " + logIdentifier);
+        input.close();
+        input = null;
+      }
+      // Snapshot the field: a previous cleanup() may already have disconnected and nulled it,
+      // which used to cause an NPE in the keep-alive branch on a repeated call.
+      HttpURLConnection conn = connection;
+      if (conn != null && httpConnParams.isKeepAlive() && connectionSucceeed) {
+        // Refer:
+        // http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
+        readErrorStream(conn.getErrorStream());
+      }
+      if (conn != null && (disconnect || !httpConnParams.isKeepAlive())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Closing connection on " + logIdentifier);
+        }
+        conn.disconnect();
+        connection = null;
+      }
+    } catch (IOException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Exception while shutting down fetcher " + logIdentifier, e);
+      } else {
+        LOG.info("Exception while shutting down fetcher " + logIdentifier
+            + ": " + e.getMessage());
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Time taken to cleanup connection to " + url +
+          " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
+    }
+  }
+
+  /**
+   * Drains and closes the error stream, if any, for keepAlive connections. Best effort: read
+   * failures are ignored, but both streams are always closed.
+   *
+   * @param errorStream the connection's error stream; may be null
+   */
+  private void readErrorStream(InputStream errorStream) {
+    if (errorStream == null) {
+      return;
+    }
+    DataOutputBuffer errorBuffer = new DataOutputBuffer();
+    try {
+      IOUtils.copyBytes(errorStream, errorBuffer, 4096);
+    } catch (IOException ioe) {
+      // ignore: the body is only drained so the connection can be reused
+    } finally {
+      IOUtils.closeStream(errorBuffer);
+      IOUtils.closeStream(errorStream);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnectionParams.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnectionParams.java b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnectionParams.java
new file mode 100644
index 0000000..aac4bb3
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnectionParams.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.http;
+
+/**
+ * Settings used when opening HTTP connections: keep-alive behaviour, timeouts, buffer size and
+ * SSL configuration. All fields are final and assigned once in the constructor.
+ */
+public class HttpConnectionParams {
+  private final boolean keepAlive;
+  private final int keepAliveMaxConnections;
+  private final int connectionTimeout;
+  private final int readTimeout;
+  private final int bufferSize;
+
+  private final boolean sslShuffle;
+  private final SSLFactory sslFactory;
+
+  /**
+   * @param keepAlive               whether connections are kept alive after use
+   * @param keepAliveMaxConnections keep-alive max connections setting
+   * @param connectionTimeout       total connection timeout, in milliseconds
+   * @param readTimeout             read timeout, in milliseconds
+   * @param bufferSize              buffer size for the response stream
+   * @param sslShuffle              whether connections use SSL
+   * @param sslFactory              SSL factory; must be non-null when {@code sslShuffle} is true
+   */
+  public HttpConnectionParams(boolean keepAlive, int keepAliveMaxConnections, int
+      connectionTimeout, int readTimeout, int bufferSize, boolean sslShuffle, SSLFactory
+      sslFactory) {
+    this.keepAlive = keepAlive;
+    this.keepAliveMaxConnections = keepAliveMaxConnections;
+    this.connectionTimeout = connectionTimeout;
+    this.readTimeout = readTimeout;
+    this.bufferSize = bufferSize;
+    this.sslShuffle = sslShuffle;
+    this.sslFactory = sslFactory;
+  }
+
+  /** @return buffer size for the response stream */
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
+  /** @return total connection timeout, in milliseconds */
+  public int getConnectionTimeout() {
+    return connectionTimeout;
+  }
+
+  /** @return whether connections are kept alive after use */
+  public boolean isKeepAlive() {
+    return keepAlive;
+  }
+
+  /** @return keep-alive max connections setting */
+  public int getKeepAliveMaxConnections() {
+    return keepAliveMaxConnections;
+  }
+
+  /** @return read timeout, in milliseconds */
+  public int getReadTimeout() {
+    return readTimeout;
+  }
+
+  /** @return whether connections use SSL */
+  public boolean isSslShuffle() {
+    return sslShuffle;
+  }
+
+  /** @return the SSL factory; may be null when SSL is not used */
+  public SSLFactory getSslFactory() {
+    return sslFactory;
+  }
+
+  /**
+   * Describes every scalar setting. The SSL factory itself is not included.
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("keepAlive=").append(keepAlive).append(", ");
+    sb.append("keepAliveMaxConnections=").append(keepAliveMaxConnections).append(", ");
+    sb.append("connectionTimeout=").append(connectionTimeout).append(", ");
+    sb.append("readTimeout=").append(readTimeout).append(", ");
+    sb.append("bufferSize=").append(bufferSize).append(", ");
+    // previously bufferSize was appended twice and sslShuffle was never reported
+    sb.append("sslShuffle=").append(sslShuffle);
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/http/SSLFactory.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/SSLFactory.java b/tez-runtime-library/src/main/java/org/apache/tez/http/SSLFactory.java
new file mode 100644
index 0000000..f23739b
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/SSLFactory.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.http;
+
+import com.ning.http.client.AsyncHttpClientConfig;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory;
+import org.apache.hadoop.security.ssl.KeyStoresFactory;
+import org.apache.hadoop.security.ssl.SSLFactory.Mode;
+import org.apache.hadoop.security.ssl.SSLHostnameVerifier;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.security.GeneralSecurityException;
+import java.util.Locale;
+
+import static org.apache.hadoop.security.ssl.SSLFactory.DEFAULT_SSL_ENABLED_PROTOCOLS;
+import static org.apache.hadoop.security.ssl.SSLFactory.DEFAULT_SSL_REQUIRE_CLIENT_CERT;
+import static org.apache.hadoop.security.ssl.SSLFactory.KEYSTORES_FACTORY_CLASS_KEY;
+import static org.apache.hadoop.security.ssl.SSLFactory.SSL_CLIENT_CONF_KEY;
+import static org.apache.hadoop.security.ssl.SSLFactory.SSL_ENABLED_PROTOCOLS;
+import static org.apache.hadoop.security.ssl.SSLFactory.SSL_HOSTNAME_VERIFIER_KEY;
+import static org.apache.hadoop.security.ssl.SSLFactory.SSL_REQUIRE_CLIENT_CERT_KEY;
+import static org.apache.hadoop.security.ssl.SSLFactory.SSL_SERVER_CONF_KEY;
+
+/**
+ * Builds an {@link SSLContext} from Hadoop SSL configuration and applies it, together with a
+ * {@link HostnameVerifier}, to outgoing connections.
+ * <p/>
+ * Key and trust managers come from the {@link KeyStoresFactory} configured under
+ * {@code KEYSTORES_FACTORY_CLASS_KEY} ({@link FileBasedKeyStoresFactory} by default). That
+ * factory is given the SSL resource selected by the mode ({@code ssl-client.xml} /
+ * {@code ssl-server.xml} unless overridden by {@code SSL_CLIENT_CONF_KEY} /
+ * {@code SSL_SERVER_CONF_KEY}).
+ * <p/>
+ * {@link #init()} must be called before the factory is used. The socket factory and hostname
+ * verifier are only available in {@link Mode#CLIENT} mode.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class SSLFactory implements ConnectionConfigurator {
+
+  private final Configuration conf;
+  private final Mode mode;
+  private final boolean requireClientCert;
+  // created by init(); null until then
+  private SSLContext context;
+  // created by init(); null until then
+  private HostnameVerifier hostnameVerifier;
+  private final KeyStoresFactory keystoresFactory;
+
+  private final String[] enabledProtocols;
+
+  /**
+   * Creates an SSLFactory.
+   *
+   * @param mode SSLFactory mode, client or server.
+   * @param conf Hadoop configuration from where the SSLFactory configuration
+   *             will be read.
+   * @throws IllegalArgumentException if {@code mode} is null
+   */
+  public SSLFactory(Mode mode, Configuration conf) {
+    this.conf = conf;
+    if (mode == null) {
+      throw new IllegalArgumentException("mode cannot be NULL");
+    }
+    this.mode = mode;
+    requireClientCert = conf.getBoolean(SSL_REQUIRE_CLIENT_CERT_KEY,
+        DEFAULT_SSL_REQUIRE_CLIENT_CERT);
+    Configuration sslConf = readSSLConfiguration(mode);
+
+    Class<? extends KeyStoresFactory> klass
+        = conf.getClass(KEYSTORES_FACTORY_CLASS_KEY,
+        FileBasedKeyStoresFactory.class, KeyStoresFactory.class);
+    keystoresFactory = ReflectionUtils.newInstance(klass, sslConf);
+
+    enabledProtocols = conf.getStrings(SSL_ENABLED_PROTOCOLS, DEFAULT_SSL_ENABLED_PROTOCOLS);
+  }
+
+  /**
+   * Builds the SSL-specific configuration: the client-cert flag plus the mode's SSL resource.
+   * Reads {@code requireClientCert}, so it must run after that field is assigned.
+   */
+  private Configuration readSSLConfiguration(Mode mode) {
+    Configuration sslConf = new Configuration(false);
+    sslConf.setBoolean(SSL_REQUIRE_CLIENT_CERT_KEY, requireClientCert);
+    String sslConfResource;
+    if (mode == Mode.CLIENT) {
+      sslConfResource = conf.get(SSL_CLIENT_CONF_KEY, "ssl-client.xml");
+    } else {
+      sslConfResource = conf.get(SSL_SERVER_CONF_KEY, "ssl-server.xml");
+    }
+    sslConf.addResource(sslConfResource);
+    return sslConf;
+  }
+
+  /**
+   * Initializes the factory.
+   *
+   * @throws GeneralSecurityException thrown if an SSL initialization error
+   *                                  happened.
+   * @throws IOException              thrown if an IO error happened while reading the SSL
+   *                                  configuration.
+   */
+  public void init() throws GeneralSecurityException, IOException {
+    keystoresFactory.init(mode);
+    context = SSLContext.getInstance("TLS");
+    context.init(keystoresFactory.getKeyManagers(),
+        keystoresFactory.getTrustManagers(), null);
+    // NOTE(review): SSLContext#getDefaultSSLParameters returns a copy, so this call has no
+    // effect on sockets created from this context; enabledProtocols is not actually applied.
+    context.getDefaultSSLParameters().setProtocols(enabledProtocols);
+    hostnameVerifier = getHostnameVerifier(conf);
+  }
+
+  private HostnameVerifier getHostnameVerifier(Configuration conf)
+      throws GeneralSecurityException, IOException {
+    // Locale.ENGLISH: with the default locale (e.g. Turkish) "default" would not upper-case
+    // to "DEFAULT" and a valid setting would be rejected.
+    return getHostnameVerifier(conf.get(SSL_HOSTNAME_VERIFIER_KEY, "DEFAULT").
+        trim().toUpperCase(Locale.ENGLISH));
+  }
+
+  /**
+   * Maps a verifier name to the matching {@link SSLHostnameVerifier}.
+   *
+   * @param verifier one of DEFAULT, DEFAULT_AND_LOCALHOST, STRICT, STRICT_IE6, ALLOW_ALL
+   * @return the hostname verifier
+   * @throws GeneralSecurityException if the name is not recognized
+   * @throws IOException              declared for compatibility; not thrown here
+   */
+  public static HostnameVerifier getHostnameVerifier(String verifier)
+      throws GeneralSecurityException, IOException {
+    HostnameVerifier hostnameVerifier;
+    if (verifier.equals("DEFAULT")) {
+      hostnameVerifier = SSLHostnameVerifier.DEFAULT;
+    } else if (verifier.equals("DEFAULT_AND_LOCALHOST")) {
+      hostnameVerifier = SSLHostnameVerifier.DEFAULT_AND_LOCALHOST;
+    } else if (verifier.equals("STRICT")) {
+      hostnameVerifier = SSLHostnameVerifier.STRICT;
+    } else if (verifier.equals("STRICT_IE6")) {
+      hostnameVerifier = SSLHostnameVerifier.STRICT_IE6;
+    } else if (verifier.equals("ALLOW_ALL")) {
+      hostnameVerifier = SSLHostnameVerifier.ALLOW_ALL;
+    } else {
+      throw new GeneralSecurityException("Invalid hostname verifier: " +
+          verifier);
+    }
+    return hostnameVerifier;
+  }
+
+  /**
+   * Releases any resources being used.
+   */
+  public void destroy() {
+    keystoresFactory.destroy();
+  }
+
+  /**
+   * Returns the SSLFactory KeyStoresFactory instance.
+   *
+   * @return the SSLFactory KeyStoresFactory instance.
+   */
+  public KeyStoresFactory getKeystoresFactory() {
+    return keystoresFactory;
+  }
+
+
+  /**
+   * Returns a configured SSLSocketFactory.
+   *
+   * @return the configured SSLSocketFactory.
+   * @throws GeneralSecurityException thrown if the SSLSocketFactory could not
+   *                                  be initialized.
+   * @throws IOException              thrown if an IO error occurred while loading
+   *                                  the server keystore.
+   * @throws IllegalStateException    if the factory is not in CLIENT mode.
+   */
+  public SSLSocketFactory createSSLSocketFactory() throws GeneralSecurityException, IOException {
+    if (mode != Mode.CLIENT) {
+      throw new IllegalStateException("Factory is not in CLIENT mode");
+    }
+    return context.getSocketFactory();
+  }
+
+  /**
+   * Returns the hostname verifier it should be used in HttpsURLConnections.
+   *
+   * @return the hostname verifier.
+   * @throws IllegalStateException if the factory is not in CLIENT mode.
+   */
+  public HostnameVerifier getHostnameVerifier() {
+    if (mode != Mode.CLIENT) {
+      throw new IllegalStateException("Factory is not in CLIENT mode");
+    }
+    return hostnameVerifier;
+  }
+
+
+
+  /**
+   * If the given {@link HttpURLConnection} is an {@link HttpsURLConnection}
+   * configures the connection with the {@link SSLSocketFactory} and
+   * {@link HostnameVerifier} of this SSLFactory, otherwise does nothing.
+   *
+   * @param conn the {@link HttpURLConnection} instance to configure.
+   * @return the configured {@link HttpURLConnection} instance.
+   * @throws IOException if an IO error occurred.
+   */
+  @Override
+  public HttpURLConnection configure(HttpURLConnection conn) throws IOException {
+    if (conn instanceof HttpsURLConnection) {
+      HttpsURLConnection sslConn = (HttpsURLConnection) conn;
+      try {
+        sslConn.setSSLSocketFactory(createSSLSocketFactory());
+      } catch (GeneralSecurityException ex) {
+        throw new IOException(ex);
+      }
+      sslConn.setHostnameVerifier(getHostnameVerifier());
+      conn = sslConn;
+    }
+    return conn;
+  }
+
+  /**
+   * Set ssl context for {@link com.ning.http.client.AsyncHttpClientConfig.Builder}
+   * (no-op when the builder is null).
+   *
+   * @param asyncNingBuilder {@link com.ning.http.client.AsyncHttpClientConfig.Builder} instance to
+   *                configure.
+   * @throws IOException if an IO error occurred.
+   * @throws IllegalStateException if the factory is not in CLIENT mode.
+   */
+  public void configure(AsyncHttpClientConfig.Builder asyncNingBuilder) throws IOException {
+    if (asyncNingBuilder != null) {
+      asyncNingBuilder.setSSLContext(context);
+      asyncNingBuilder.setHostnameVerifier(getHostnameVerifier());
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java
new file mode 100644
index 0000000..f46939d
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.http.async.netty;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.ning.http.client.AsyncHttpClient;
+import com.ning.http.client.AsyncHttpClientConfig;
+import com.ning.http.client.ListenableFuture;
+import com.ning.http.client.Request;
+import com.ning.http.client.RequestBuilder;
+import com.ning.http.client.Response;
+import org.apache.commons.io.IOUtils;
+import org.apache.tez.http.BaseHttpConnection;
+import org.apache.tez.http.HttpConnectionParams;
+import org.apache.tez.http.SSLFactory;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+/**
+ * {@link BaseHttpConnection} backed by a single JVM-wide {@link AsyncHttpClient}.
+ * The response body is streamed through a {@link PipedOutputStream} / {@link PipedInputStream}
+ * pair: the {@link TezBodyDeferringAsyncHandler} writes body chunks into the pipe as they arrive,
+ * and fetchers read them through {@link #getInputStream()}.
+ */
+public class AsyncHttpConnection extends BaseHttpConnection {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncHttpConnection.class);
+
+  private final JobTokenSecretManager jobTokenSecretMgr;
+  private String encHash;
+  private String msgToEncode;
+
+  private final HttpConnectionParams httpConnParams;
+  private final Stopwatch stopWatch;
+  private final URL url;
+
+  // Shared by every connection in the JVM; created lazily by initClient().
+  private static volatile AsyncHttpClient httpAsyncClient;
+
+  private final TezBodyDeferringAsyncHandler handler;
+  private final PipedOutputStream pos; //handler would write to this as and when it receives chunks
+  private final PipedInputStream pis; //connected to pos, which can be used by fetchers
+
+  private Response response;
+  private ListenableFuture<Response> responseFuture;
+  private TezBodyDeferringAsyncHandler.BodyDeferringInputStream dis;
+
+  /**
+   * Creates the shared {@link AsyncHttpClient} if it does not exist yet. Only the parameters of
+   * the call that actually performs the initialization are applied; later calls are no-ops.
+   *
+   * @param httpConnParams keep-alive, timeout and SSL settings for the client
+   * @throws IOException if configuring SSL on the client builder fails
+   */
+  private void initClient(HttpConnectionParams httpConnParams) throws IOException {
+    // Fast path without locking; the field is volatile.
+    if (httpAsyncClient != null) {
+      return;
+    }
+    synchronized (AsyncHttpConnection.class) {
+      // Re-check under the lock: another thread may have initialized the client meanwhile.
+      if (httpAsyncClient != null) {
+        return;
+      }
+      LOG.info("Initializing AsyncClient (TezBodyDeferringAsyncHandler)");
+      AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder();
+      if (httpConnParams.isSslShuffle()) {
+        //Configure SSL
+        SSLFactory sslFactory = httpConnParams.getSslFactory();
+        Preconditions.checkArgument(sslFactory != null, "SSLFactory can not be null");
+        sslFactory.configure(builder);
+      }
+
+      /*
+       * TODO : following settings need fine tuning.
+       * Change following config to accept common thread pool later.
+       * Change max connections based on the total inputs (ordered & unordered). Need to tune
+       * setMaxConnections & addRequestFilter.
+       */
+      builder
+          .setAllowPoolingConnection(httpConnParams.isKeepAlive())
+          .setAllowSslConnectionPool(httpConnParams.isKeepAlive())
+          .setCompressionEnabled(false)
+          //.setExecutorService(applicationThreadPool)
+          //.addRequestFilter(new ThrottleRequestFilter())
+          .setMaximumConnectionsPerHost(1)
+          .setConnectionTimeoutInMs(httpConnParams.getConnectionTimeout())
+          .setRequestTimeoutInMs(httpConnParams.getReadTimeout())
+          .setUseRawUrl(true);
+      // Build the config exactly once and hand it straight to the client.
+      httpAsyncClient = new AsyncHttpClient(builder.build());
+    }
+  }
+
+  /**
+   * Creates a connection for {@code url}. The shared async client is initialized on first use.
+   *
+   * @param url                   shuffle URL to fetch from
+   * @param connParams            connection parameters (buffer size, SSL, keep-alive, timeouts)
+   * @param logIdentifier         caller identifier; not used by this constructor
+   * @param jobTokenSecretManager used to compute the URL hash header and verify the reply hash
+   * @throws IOException if the client or the pipe cannot be set up
+   */
+  public AsyncHttpConnection(URL url, HttpConnectionParams connParams,
+      String logIdentifier, JobTokenSecretManager jobTokenSecretManager) throws IOException {
+    this.jobTokenSecretMgr = jobTokenSecretManager;
+    this.httpConnParams = connParams;
+    this.url = url;
+    this.stopWatch = new Stopwatch();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("MapOutput URL :" + url.toString());
+    }
+
+    initClient(httpConnParams);
+    pos = new PipedOutputStream();
+    pis = new PipedInputStream(pos, httpConnParams.getBufferSize());
+    handler = new TezBodyDeferringAsyncHandler(pos, url, UNIT_CONNECT_TIMEOUT);
+  }
+
+  /**
+   * Computes the message derived from {@link #url} and its hash using the job token secret.
+   *
+   * @throws IOException if building the message or hashing it fails
+   */
+  @VisibleForTesting
+  public void computeEncHash() throws IOException {
+    // generate hash of the url
+    msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
+    encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecretMgr);
+  }
+
+  /**
+   * Connect to source
+   *
+   * @return true if connection was successful
+   * false if connection was previously cleaned up
+   * @throws IOException upon connection failure, missing headers (timeout) or a non-200 status
+   * @throws InterruptedException if interrupted while waiting for the response headers
+   */
+  public boolean connect() throws IOException, InterruptedException {
+    // Time the request/reply handshake; the elapsed time is reported by validate().
+    stopWatch.reset().start();
+    computeEncHash();
+
+    RequestBuilder rb = new RequestBuilder();
+    rb.setHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+    rb.setHeader(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    rb.setHeader(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    Request request = rb.setUrl(url.toString()).build();
+
+    //for debugging
+    LOG.debug("Request url={}, encHash={}", url, encHash);
+
+    // Submit the request; status, headers and body parts are delivered to 'handler'.
+    responseFuture = httpAsyncClient.executeRequest(request, handler);
+
+    // The bounded pipe buffer throttles the handler (producer) against the fetcher (consumer).
+    dis = new TezBodyDeferringAsyncHandler.BodyDeferringInputStream(responseFuture, handler, pis);
+
+    // Blocks until headers arrive; the handler returns null if they do not arrive in time.
+    response = dis.getAsapResponse();
+    if (response == null) {
+      throw new IOException("Response is null");
+    }
+
+    //verify the response
+    int rc = response.getStatusCode();
+    if (rc != HttpURLConnection.HTTP_OK) {
+      LOG.debug("Request url={}", response.getUri());
+      throw new IOException("Got invalid response code " + rc + " from "
+          + url + ": " + response.getStatusText());
+    }
+    return true;
+  }
+
+  /**
+   * Checks the shuffle name/version headers of the response and verifies that the server's
+   * reply hash matches the hash sent by {@link #connect()}.
+   *
+   * @throws IOException if the headers are incompatible, the reply hash is missing, or
+   *                     {@code SecureShuffleUtils.verifyReply} rejects it
+   */
+  public void validate() throws IOException {
+    // get the shuffle version
+    if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME
+        .equals(response.getHeader(ShuffleHeader.HTTP_HEADER_NAME))
+        || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION
+        .equals(response.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) {
+      throw new IOException("Incompatible shuffle response version");
+    }
+
+    // get the replyHash which is HMac of the encHash we sent to the server
+    String replyHash = response.getHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
+    if (replyHash == null) {
+      throw new IOException("security validation of TT Map output failed");
+    }
+    LOG.debug("url={};encHash={};replyHash={}", msgToEncode, encHash, replyHash);
+
+    // verify that replyHash is HMac of encHash
+    SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr);
+    //Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM host
+    //(keep the message text stable for that parser).
+    LOG.info("for url={} sent hash and receievd reply {} ms", url, stopWatch.elapsedMillis());
+  }
+
+  /**
+   * Get the inputstream from the connection
+   *
+   * @return DataInputStream over the streamed response body
+   * @throws IOException
+   * @throws IllegalStateException if {@link #connect()} has not produced a response
+   */
+  public DataInputStream getInputStream() throws IOException, InterruptedException {
+    Preconditions.checkState(response != null, "Response can not be null");
+    return new DataInputStream(dis);
+  }
+
+  /**
+   * Closes and discards the JVM-wide async client so the next connection creates a new one.
+   * Safe to call when no client exists.
+   */
+  @VisibleForTesting
+  public void close() {
+    synchronized (AsyncHttpConnection.class) {
+      if (httpAsyncClient != null) {
+        httpAsyncClient.close();
+        httpAsyncClient = null;
+      }
+    }
+  }
+
+  /**
+   * Cleanup the connection.
+   *
+   * @param disconnect ignored; connection management is left to the async client
+   * @throws IOException if joining the async request fails
+   */
+  public void cleanup(boolean disconnect) throws IOException {
+    // Netty internally has its own connection management and takes care of it.
+    try {
+      if (response != null) {
+        // Closes the read side and joins the async request. Skipped when no headers were
+        // received, since joining the request could then block.
+        dis.close();
+      }
+    } finally {
+      // Always release both pipe ends, even if joining the request failed.
+      IOUtils.closeQuietly(pos);
+      IOUtils.closeQuietly(pis);
+      response = null;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/TezBodyDeferringAsyncHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/TezBodyDeferringAsyncHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/TezBodyDeferringAsyncHandler.java
new file mode 100644
index 0000000..8e83eac
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/TezBodyDeferringAsyncHandler.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.http.async.netty;
+
+import com.ning.http.client.AsyncHandler;
+import com.ning.http.client.HttpResponseBodyPart;
+import com.ning.http.client.HttpResponseHeaders;
+import com.ning.http.client.HttpResponseStatus;
+import com.ning.http.client.Response;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Same as {@link com.ning.http.client.BodyDeferringAsyncHandler} with additional checks to handle
+ * errors in getResponse(). Based on testing, at very high load
+ * {@link com.ning.http.client.BodyDeferringAsyncHandler} can hang in getResponse() because it
+ * waits indefinitely for headers to arrive. This class avoids that by waiting only for the
+ * configured header timeout.
+ */
+@InterfaceAudience.Private
+class TezBodyDeferringAsyncHandler implements AsyncHandler<Response> {
+  private static final Logger LOG = LoggerFactory.getLogger(TezBodyDeferringAsyncHandler.class);
+
+  private final Response.ResponseBuilder responseBuilder = new Response.ResponseBuilder();
+  // Counted down once headers are available, or on error/completion, to release getResponse().
+  private final CountDownLatch headersArrived = new CountDownLatch(1);
+  private final OutputStream output;
+
+  private volatile boolean responseSet;
+  private volatile boolean statusReceived;
+  private volatile Response response;
+  private volatile Throwable throwable;
+
+  // Single permit: held while onThrowable() counts down the latch and while
+  // getResponse()/onCompleted() inspect 'throwable'.
+  private final Semaphore semaphore = new Semaphore(1);
+
+  private final URL url;
+  private final int headerReceiveTimeout;
+
+  /**
+   * @param os      stream that receives the response body parts
+   * @param url     request URL (used for logging)
+   * @param timeout maximum time in milliseconds getResponse() waits for headers
+   */
+  TezBodyDeferringAsyncHandler(final OutputStream os, final URL url, final int timeout) {
+    this.output = os;
+    this.responseSet = false;
+    this.url = url;
+    this.headerReceiveTimeout = timeout;
+  }
+
+  @Override
+  public void onThrowable(Throwable t) {
+    this.throwable = t;
+    // Counting down to handle error cases too.
+    // In "premature exceptions" cases, the onBodyPartReceived() and onCompleted()
+    // methods will never be invoked, leaving the caller of getResponse() blocked forever.
+    boolean acquired = false;
+    try {
+      semaphore.acquire();
+      acquired = true;
+    } catch (InterruptedException e) {
+      // Waiters are still released below; preserve the interrupt status.
+      Thread.currentThread().interrupt();
+    } finally {
+      LOG.error("Error in asyncHandler ", t);
+      headersArrived.countDown();
+      // Only return a permit that was actually taken; an unconditional release after an
+      // interrupted acquire would leave the semaphore with two permits.
+      if (acquired) {
+        semaphore.release();
+      }
+    }
+    try {
+      closeOut();
+    } catch (IOException e) {
+      // Best effort: the original failure is already recorded in 'throwable'.
+      LOG.debug("Failed to close output after error", e);
+    }
+  }
+
+  @Override
+  public AsyncHandler.STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
+    responseBuilder.reset();
+    responseBuilder.accumulate(responseStatus);
+    statusReceived = true;
+    return AsyncHandler.STATE.CONTINUE;
+  }
+
+  @Override
+  public AsyncHandler.STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {
+    responseBuilder.accumulate(headers);
+    return AsyncHandler.STATE.CONTINUE;
+  }
+
+  @Override
+  public AsyncHandler.STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
+    // body arrived, flush headers
+    if (!responseSet) {
+      response = responseBuilder.build();
+      responseSet = true;
+      headersArrived.countDown();
+    }
+    bodyPart.writeTo(output);
+    return AsyncHandler.STATE.CONTINUE;
+  }
+
+  /** Flushes and then closes the body output stream (close runs even if flush fails). */
+  protected void closeOut() throws IOException {
+    try {
+      output.flush();
+    } finally {
+      output.close();
+    }
+  }
+
+  @Override
+  public Response onCompleted() throws IOException {
+    if (!responseSet) {
+      response = responseBuilder.build();
+      responseSet = true;
+    }
+    // Counting down to handle error cases too.
+    // In "normal" cases, latch is already at 0 here
+    // But in other cases, for example when because of some error
+    // onBodyPartReceived() is never called, the caller
+    // of getResponse() would remain blocked infinitely.
+    // By contract, onCompleted() is always invoked, even in case of errors
+    headersArrived.countDown();
+    closeOut();
+    try {
+      semaphore.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return null;
+    }
+    try {
+      if (throwable != null) {
+        throw new IOException(throwable.getMessage(), throwable);
+      }
+      // sending out current response
+      return responseBuilder.build();
+    } finally {
+      semaphore.release();
+    }
+  }
+
+  /**
+   * Unlike {@code Future<Response>.get()}, this method blocks only until the headers arrive.
+   * This is useful for large transfers: headers can be examined as soon as possible, while the
+   * body is streamed to its final destination, avoiding unneeded bandwidth consumption. The
+   * returned response contains the very first response from the server (status code and
+   * headers), but it might be incomplete if a broken server sends trailing headers. In that case
+   * the "usual" {@code Future<Response>.get()} method returns the complete headers, while
+   * repeated invocations of getResponse() keep returning the first, possibly incomplete, cached
+   * one. Note: the response returned by this method contains everything <em>except</em> the
+   * response body itself, so invoking any method like Response.getResponseBodyXXX() will result
+   * in an error. Also note that this method returns <code>null</code> if the headers do not
+   * arrive within the configured timeout.
+   *
+   * @return a {@link Response}, or {@code null} on header timeout
+   * @throws InterruptedException if interrupted while waiting
+   * @throws IOException if the request failed with an error
+   */
+  public Response getResponse() throws InterruptedException, IOException {
+    /*
+     * Based on testing, it is possible that it is in connected state, but the headers are not
+     * received. Instead of waiting forever, close after timeout for next retry.
+     */
+    boolean result = headersArrived.await(headerReceiveTimeout, TimeUnit.MILLISECONDS);
+    if (!result) {
+      LOG.error("Breaking after timeout={}, url={}, responseSet={} statusReceived={}",
+          headerReceiveTimeout, url, responseSet, statusReceived);
+      return null;
+    }
+    // Acquire outside the try so an interrupted acquire never triggers a release.
+    semaphore.acquire();
+    try {
+      if (throwable != null) {
+        throw new IOException(throwable.getMessage(), throwable);
+      }
+      return response;
+    } finally {
+      semaphore.release();
+    }
+  }
+
+  /**
+   * A simple helper class that is used to perform automatic "join" for async
+   * download and the error checking of the Future of the request.
+   */
+  static class BodyDeferringInputStream extends FilterInputStream {
+    private final Future<Response> future;
+    private final TezBodyDeferringAsyncHandler bdah;
+
+    public BodyDeferringInputStream(final Future<Response> future,
+        final TezBodyDeferringAsyncHandler bdah, final InputStream in) {
+      super(in);
+      this.future = future;
+      this.bdah = bdah;
+    }
+
+    /**
+     * Closes the input stream, and "joins" (waits for complete execution, together with any
+     * exception thrown by) the async request.
+     */
+    @Override
+    public void close() throws IOException {
+      // close
+      super.close();
+      // "join" async request
+      try {
+        getLastResponse();
+      } catch (InterruptedException e) {
+        // Preserve the interrupt for the caller before converting to IOException.
+        Thread.currentThread().interrupt();
+        throw new IOException(e.getMessage(), e);
+      } catch (Exception e) {
+        throw new IOException(e.getMessage(), e);
+      }
+    }
+
+    /**
+     * Delegates to {@link TezBodyDeferringAsyncHandler#getResponse()}. Blocks only until the
+     * headers arrive. Might return <code>null</code>. See
+     * {@link TezBodyDeferringAsyncHandler#getResponse()} method for details.
+     *
+     * @return a {@link Response}
+     * @throws InterruptedException
+     */
+    public Response getAsapResponse() throws InterruptedException, IOException {
+      return bdah.getResponse();
+    }
+
+    /**
+     * Delegates to {@code Future<Response>#get()}. Blocks until the complete response
+     * arrives.
+     *
+     * @return a {@link Response}
+     * @throws InterruptedException
+     * @throws java.util.concurrent.ExecutionException
+     */
+    public Response getLastResponse() throws InterruptedException, ExecutionException {
+      return future.get();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 3d9a701..fc94347 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -210,6 +210,9 @@ public class TezRuntimeConfiguration {
   public final static int TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE_DEFAULT =
       8 * 1024;
 
+  /**
+   * Whether shuffle fetches should use the asynchronous HTTP client instead of the default
+   * HTTP connection. Defaults to {@link #TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP_DEFAULT}
+   * ({@code false}).
+   */
+  public static final String TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP = TEZ_RUNTIME_PREFIX +
+      "shuffle.use.async.http";
+  public static final boolean TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP_DEFAULT = false;
 
   public static final String TEZ_RUNTIME_SHUFFLE_ENABLE_SSL = TEZ_RUNTIME_PREFIX +
       "shuffle.ssl.enable";
@@ -352,6 +355,7 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES);
     tezRuntimeKeys.add(TEZ_RUNTIME_PARTITIONER_CLASS);
     tezRuntimeKeys.add(TEZ_RUNTIME_COMBINER_CLASS);
+    tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP);
     tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
     tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT);
     tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE);

http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 61e0151..e7c98b7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -38,6 +38,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.tez.http.BaseHttpConnection;
+import org.apache.tez.http.HttpConnectionParams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.commons.lang.StringUtils;
@@ -59,7 +61,6 @@ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
 
 import com.google.common.base.Preconditions;
 
@@ -108,7 +109,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
   private URL url;
   private volatile DataInputStream input;
   
-  private HttpConnection httpConnection;
+  BaseHttpConnection httpConnection;
   private HttpConnectionParams httpConnectionParams;
 
   private final boolean localDiskFetchEnabled;
@@ -121,6 +122,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
   // Initiative value is 0, which means it hasn't retried yet.
   private long retryStartTime = 0;
 
+  private final boolean asyncHttp;
+
   private final boolean isDebugEnabled = LOG.isDebugEnabled();
 
   private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
@@ -132,7 +135,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       boolean localDiskFetchEnabled,
       boolean sharedFetchEnabled,
       String localHostname,
-      int shufflePort) {
+      int shufflePort, boolean asyncHttp) {
+    this.asyncHttp = asyncHttp;
     this.fetcherCallback = fetcherCallback;
     this.inputManager = inputManager;
     this.jobTokenSecretMgr = jobTokenSecretManager;
@@ -402,13 +406,17 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
   private HostFetchResult setupConnection(List<InputAttemptIdentifier> attempts) {
     try {
       StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
-          port, partition, appId.toString(), httpConnectionParams.isSSLShuffleEnabled());
+          port, partition, appId.toString(), httpConnectionParams.isSslShuffle());
       this.url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts,
-          httpConnectionParams.getKeepAlive());
+          httpConnectionParams.isKeepAlive());
 
-      httpConnection = new HttpConnection(url, httpConnectionParams, logIdentifier, jobTokenSecretMgr);
+      httpConnection = ShuffleUtils.getHttpConnection(asyncHttp, url, httpConnectionParams,
+          logIdentifier, jobTokenSecretMgr);
       httpConnection.connect();
-    } catch (IOException e) {
+    } catch (IOException | InterruptedException e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
       // ioErrs.increment(1);
       // If connect did not succeed, just mark all the maps as failed,
       // indirectly penalizing the host
@@ -449,6 +457,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
         return new HostFetchResult(new FetchResult(host, port, partition, remaining),
             new InputAttemptIdentifier[] { firstAttempt }, false);
       }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); //reset status
+      return null;
     }
     return null;
   }
@@ -903,10 +914,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     public FetcherBuilder(FetcherCallback fetcherCallback,
         HttpConnectionParams params, FetchedInputAllocator inputManager,
         ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
-        Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort) {
+        Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort,
+        boolean asyncHttp) {
       this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
           jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
-          false, localHostname, shufflePort);
+          false, localHostname, shufflePort, asyncHttp);
     }
 
     public FetcherBuilder(FetcherCallback fetcherCallback,
@@ -915,10 +927,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
         Configuration conf, RawLocalFileSystem localFs,
         LocalDirAllocator localDirAllocator, Path lockPath,
         boolean localDiskFetchEnabled, boolean sharedFetchEnabled,
-        String localHostname, int shufflePort) {
+        String localHostname, int shufflePort, boolean asyncHttp) {
       this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
           jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
-          lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort);
+          lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp);
     }
 
     public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) {

http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
deleted file mode 100644
index 7827f0a..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.common.shuffle;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.security.GeneralSecurityException;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.HttpsURLConnection;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
-import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
-
-import com.google.common.base.Stopwatch;
-
-/**
- * HttpConnection which can be used for Unordered / Ordered shuffle.
- */
-public class HttpConnection {
-
-  private static final Logger LOG = LoggerFactory.getLogger(HttpConnection.class);
-
-  /** Basic/unit connection timeout (in milliseconds) */
-  private final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;
-
-  private URL url;
-  private final String logIdentifier;
-
-  //Shared by many threads
-  private static SSLFactory sslFactory;
-
-  @VisibleForTesting
-  protected volatile HttpURLConnection connection;
-  private volatile DataInputStream input;
-
-  private volatile boolean connectionSucceeed;
-  private volatile boolean cleanup;
-
-  private final JobTokenSecretManager jobTokenSecretMgr;
-  private String encHash;
-  private String msgToEncode;
-
-  private final HttpConnectionParams httpConnParams;
-  private final Stopwatch stopWatch;
-
-  /**
-   * HttpConnection
-   * 
-   * @param url
-   * @param connParams
-   * @param logIdentifier
-   * @param jobTokenSecretManager
-   * @throws IOException
-   */
-  public HttpConnection(URL url, HttpConnectionParams connParams,
-      String logIdentifier, JobTokenSecretManager jobTokenSecretManager) throws IOException {
-    this.logIdentifier = logIdentifier;
-    this.jobTokenSecretMgr = jobTokenSecretManager;
-    this.httpConnParams = connParams;
-    this.url = url;
-    this.stopWatch = new Stopwatch();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("MapOutput URL :" + url.toString());
-    }
-  }
-
-  private void setupConnection() throws IOException {
-    connection = (HttpURLConnection) url.openConnection();
-    if (sslFactory != null && httpConnParams.sslShuffle) {
-      try {
-        ((HttpsURLConnection) connection).setSSLSocketFactory(sslFactory
-          .createSSLSocketFactory());
-        ((HttpsURLConnection) connection).setHostnameVerifier(sslFactory
-          .getHostnameVerifier());
-      } catch (GeneralSecurityException ex) {
-        throw new IOException(ex);
-      }
-    }
-    // generate hash of the url
-    msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
-    encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecretMgr);
-
-    // put url hash into http header
-    connection.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
-      encHash);
-    // set the read timeout
-    connection.setReadTimeout(httpConnParams.readTimeout);
-    // put shuffle version into http header
-    connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
-      ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-    connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
-      ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
-  }
-
-  /**
-   * Connect to source
-   * 
-   * @return true if connection was successful
-   *         false if connection was previously cleaned up
-   * @throws IOException upon connection failure
-   */
-  public boolean connect() throws IOException {
-    return connect(httpConnParams.connectionTimeout);
-  }
-
-  /**
-   * Connect to source with specific timeout
-   * 
-   * @param connectionTimeout
-   * @return true if connection was successful
-   *         false if connection was previously cleaned up
-   * @throws IOException upon connection failure
-   */
-  public boolean connect(int connectionTimeout) throws IOException {
-    stopWatch.reset().start();
-    if (connection == null) {
-      setupConnection();
-    }
-    int unit = 0;
-    if (connectionTimeout < 0) {
-      throw new IOException("Invalid timeout " + "[timeout = "
-          + connectionTimeout + " ms]");
-    } else if (connectionTimeout > 0) {
-      unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
-    }
-    // set the connect timeout to the unit-connect-timeout
-    connection.setConnectTimeout(unit);
-    int connectionFailures = 0;
-    while (true) {
-      long connectStartTime = System.currentTimeMillis();
-      try {
-        connection.connect();
-        connectionSucceeed = true;
-        break;
-      } catch (IOException ioe) {
-        // Don't attempt another connect if already cleanedup.
-        connectionFailures++;
-        if (cleanup) {
-          LOG.info("Cleanup is set to true. Not attempting to"
-              + " connect again. Last exception was: ["
-              + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
-          return false;
-        }
-        // update the total remaining connect-timeout
-        connectionTimeout -= unit;
-        // throw an exception if we have waited for timeout amount of time
-        // note that the updated value if timeout is used here
-        if (connectionTimeout <= 0) {
-          throw new IOException(
-              "Failed to connect to " + url + ", #connectionFailures=" + connectionFailures, ioe);
-        }
-        long elapsed = System.currentTimeMillis() - connectStartTime;
-        if (elapsed < unit) {
-          try {
-            long sleepTime = unit - elapsed;
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Sleeping for " + sleepTime + " while establishing connection to " + url +
-                  ", since connectAttempt returned in " + elapsed + " ms");
-            }
-            Thread.sleep(sleepTime);
-          } catch (InterruptedException e) {
-            throw new IOException(
-                "Connection establishment sleep interrupted, #connectionFailures=" +
-                    connectionFailures, e);
-          }
-        }
-
-        // reset the connect timeout for the last try
-        if (connectionTimeout < unit) {
-          unit = connectionTimeout;
-          // reset the connect time out for the final connect
-          connection.setConnectTimeout(unit);
-        }
-
-      }
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Time taken to connect to " + url.toString() +
-        " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms; connectionFailures="+ connectionFailures);
-    }
-    return true;
-  }
-
-  public void validate() throws IOException {
-    stopWatch.reset().start();
-    int rc = connection.getResponseCode();
-    if (rc != HttpURLConnection.HTTP_OK) {
-      throw new IOException("Got invalid response code " + rc + " from " + url
-          + ": " + connection.getResponseMessage());
-    }
-    // get the shuffle version
-    if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(connection
-      .getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
-        || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(connection
-          .getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
-      throw new IOException("Incompatible shuffle response version");
-    }
-    // get the replyHash which is HMac of the encHash we sent to the server
-    String replyHash =
-        connection
-          .getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
-    if (replyHash == null) {
-      throw new IOException("security validation of TT Map output failed");
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("url=" + msgToEncode + ";encHash=" + encHash + ";replyHash="
-          + replyHash);
-    }
-    // verify that replyHash is HMac of encHash
-    SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr);
-    //Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM host
-    LOG.info("for url=" + url +
-      " sent hash and receievd reply " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
-  }
-
-  /**
-   * Get the inputstream from the connection
-   * 
-   * @return DataInputStream
-   * @throws IOException
-   */
-  public DataInputStream getInputStream() throws IOException {
-    stopWatch.reset().start();
-    if (connectionSucceeed) {
-      input =
-          new DataInputStream(new BufferedInputStream(
-            connection.getInputStream(), httpConnParams.bufferSize));
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Time taken to getInputStream (connect) " + url +
-        " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
-    }
-    return input;
-  }
-
-  /**
-   * Cleanup the connection.
-   * 
-   * @param disconnect
-   *          Close the connection if this is true; otherwise respect keepalive
-   * @throws IOException
-   */
-  public void cleanup(boolean disconnect) throws IOException {
-    cleanup = true;
-    stopWatch.reset().start();
-    try {
-      if (input != null) {
-        LOG.info("Closing input on " + logIdentifier);
-        input.close();
-        input = null;
-      }
-      if (httpConnParams.keepAlive && connectionSucceeed) {
-        // Refer:
-        // http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
-        readErrorStream(connection.getErrorStream());
-      }
-      if (connection != null && (disconnect || !httpConnParams.keepAlive)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Closing connection on " + logIdentifier);
-        }
-        connection.disconnect();
-        connection = null;
-      }
-    } catch (IOException e) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Exception while shutting down fetcher " + logIdentifier, e);
-      } else {
-        LOG.info("Exception while shutting down fetcher " + logIdentifier
-            + ": " + e.getMessage());
-      }
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Time taken to cleanup connection to " + url +
-        " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
-    }
-  }
-
-  /**
-   * Cleanup the error stream if any, for keepAlive connections
-   * 
-   * @param errorStream
-   */
-  private void readErrorStream(InputStream errorStream) {
-    if (errorStream == null) {
-      return;
-    }
-    try {
-      DataOutputBuffer errorBuffer = new DataOutputBuffer();
-      IOUtils.copyBytes(errorStream, errorBuffer, 4096);
-      IOUtils.closeStream(errorBuffer);
-      IOUtils.closeStream(errorStream);
-    } catch (IOException ioe) {
-      // ignore
-    }
-  }
-
-  public static class HttpConnectionParams {
-    private boolean keepAlive;
-    private int keepAliveMaxConnections;
-    private int connectionTimeout;
-    private int readTimeout;
-    private int bufferSize;
-    private boolean sslShuffle;
-
-    public boolean getKeepAlive() {
-      return keepAlive;
-    }
-
-    public int getKeepAliveMaxConnections() {
-      return keepAliveMaxConnections;
-    }
-
-    public int getConnectionTimeout() {
-      return connectionTimeout;
-    }
-
-    public int getReadTimeout() {
-      return readTimeout;
-    }
-
-    public void setReadTimeout(int readTimeout) {
-      this.readTimeout = readTimeout;
-    }
-
-    public int getBufferSize() {
-      return bufferSize;
-    }
-
-    public boolean isSSLShuffleEnabled() {
-      return sslShuffle;
-    }
-
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("keepAlive=").append(keepAlive).append(", ");
-      sb.append("keepAliveMaxConnections=").append(keepAliveMaxConnections).append(", ");
-      sb.append("connectionTimeout=").append(connectionTimeout).append(", ");
-      sb.append("readTimeout=").append(readTimeout).append(", ");
-      sb.append("bufferSize=").append(bufferSize).append(", ");
-      sb.append("sslShuffle=").append(sslShuffle);
-      return sb.toString();
-    }
-  }
-
-  public static class HttpConnectionParamsBuilder {
-    private HttpConnectionParams params;
-
-    public HttpConnectionParamsBuilder() {
-      params = new HttpConnectionParams();
-    }
-
-    public HttpConnectionParamsBuilder setKeepAlive(boolean keepAlive,
-        int keepAliveMaxConnections) {
-      params.keepAlive = keepAlive;
-      params.keepAliveMaxConnections = keepAliveMaxConnections;
-      return this;
-    }
-
-    public HttpConnectionParamsBuilder setTimeout(int connectionTimeout,
-        int readTimeout) {
-      params.connectionTimeout = connectionTimeout;
-      params.readTimeout = readTimeout;
-      return this;
-    }
-
-    public synchronized HttpConnectionParamsBuilder setSSL(boolean sslEnabled,
-        Configuration conf) {
-      synchronized (HttpConnectionParamsBuilder.class) {
-        params.sslShuffle = sslEnabled;
-        if (sslEnabled) {
-          //Create sslFactory if it is null or if it was destroyed earlier
-          if (sslFactory == null || sslFactory.getKeystoresFactory()
-              .getTrustManagers() == null) {
-            LOG.info("Initializing SSL factory in HttpConnection");
-            sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
-            try {
-              sslFactory.init();
-            } catch (Exception ex) {
-              sslFactory.destroy();
-              sslFactory = null;
-              throw new RuntimeException(ex);
-            }
-          }
-        }
-      }
-      return this;
-    }
-
-    public HttpConnectionParamsBuilder setBufferSize(int bufferSize) {
-      params.bufferSize = bufferSize;
-      return this;
-    }
-
-    public HttpConnectionParams build() {
-      return params;
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 46489ed..8b6e847 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -33,13 +33,16 @@ import javax.crypto.SecretKey;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.http.BaseHttpConnection;
+import org.apache.tez.http.HttpConnection;
+import org.apache.tez.http.HttpConnectionParams;
+import org.apache.tez.http.SSLFactory;
+import org.apache.tez.http.async.netty.AsyncHttpConnection;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.security.token.Token;
 import org.apache.tez.common.TezCommonUtils;
@@ -54,8 +57,6 @@ import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParamsBuilder;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
@@ -66,6 +67,9 @@ public class ShuffleUtils {
   private static final Logger LOG = LoggerFactory.getLogger(ShuffleUtils.class);
   public static final String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle";
 
+  //Shared by multiple threads
+  private static volatile SSLFactory sslFactory;
+
   static final ThreadLocal<DecimalFormat> MBPS_FORMAT =
       new ThreadLocal<DecimalFormat>() {
         @Override
@@ -213,45 +217,15 @@ public class ShuffleUtils {
     return new URL(url.toString());
   }
 
-  public static HttpConnectionParams constructHttpShuffleConnectionParams(
-      Configuration conf) {
-    HttpConnectionParamsBuilder builder = new HttpConnectionParamsBuilder();
-
-    int connectionTimeout =
-        conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
-          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT_DEFAULT);
-
-    int readTimeout =
-        conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
-          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT_DEFAULT);
-
-    int bufferSize =
-        conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE,
-          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE_DEFAULT);
-
-    boolean keepAlive =
-        conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED,
-          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED_DEFAULT);
-    int keepAliveMaxConnections =
-        conf.getInt(
-          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS,
-          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS_DEFAULT);
-    if (keepAlive) {
-      System.setProperty("sun.net.http.errorstream.enableBuffering", "true");
-      System.setProperty("http.maxConnections",
-        String.valueOf(keepAliveMaxConnections));
-      LOG.info("Set keepAlive max connections: " + keepAliveMaxConnections);
+  public static BaseHttpConnection getHttpConnection(boolean asyncHttp, URL url,
+      HttpConnectionParams params, String logIdentifier, JobTokenSecretManager jobTokenSecretManager)
+      throws IOException {
+    if (asyncHttp) {
+      //TODO: support other async packages? httpclient-async?
+      return new AsyncHttpConnection(url, params, logIdentifier, jobTokenSecretManager);
+    } else {
+      return new HttpConnection(url, params, logIdentifier, jobTokenSecretManager);
     }
-
-    builder.setTimeout(connectionTimeout, readTimeout)
-        .setBufferSize(bufferSize)
-        .setKeepAlive(keepAlive, keepAliveMaxConnections);
-
-    boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
-      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
-    builder.setSSL(sslShuffle, conf);
-
-    return builder.build();
   }
 
   public static String stringify(DataMovementEventPayloadProto dmProto) {
@@ -473,5 +447,62 @@ public class ShuffleUtils {
             ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
             MBPS_FORMAT.get().format(rate) + " MB/s");
   }
+
+  /**
+   * Build {@link org.apache.tez.http.HttpConnectionParams} from configuration. If SSL shuffle is
+   * enabled, the shared client {@link SSLFactory} is created (or re-created if destroyed) first.
+   *
+   * @param conf configuration to read the shuffle HTTP settings from
+   * @return HttpConnectionParams
+   * @throws RuntimeException if SSL shuffle is enabled and the SSL factory fails to initialize
+   */
+  public static HttpConnectionParams getHttpConnectionParams(Configuration conf) {
+    int connectionTimeout = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT_DEFAULT);
+
+    int readTimeout = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT_DEFAULT);
+
+    int bufferSize = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE,
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE_DEFAULT);
+
+    boolean keepAlive = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED,
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED_DEFAULT);
+
+    int keepAliveMaxConnections = conf.getInt(
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS,
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS_DEFAULT);
+
+    if (keepAlive) {
+      System.setProperty("sun.net.http.errorstream.enableBuffering", "true");
+      System.setProperty("http.maxConnections", String.valueOf(keepAliveMaxConnections));
+    }
+
+    boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
+
+    // Read the shared volatile field once, so the instance checked is the instance returned.
+    SSLFactory factory = sslFactory;
+    if (sslShuffle && (factory == null || factory.getKeystoresFactory().getTrustManagers() == null)) {
+      synchronized (HttpConnectionParams.class) {
+        // Re-check under the lock: create if null, or re-create if it was destroyed earlier
+        factory = sslFactory;
+        if (factory == null || factory.getKeystoresFactory().getTrustManagers() == null) {
+          factory = new SSLFactory(org.apache.hadoop.security.ssl.SSLFactory.Mode.CLIENT, conf);
+          try {
+            factory.init();
+          } catch (Exception ex) {
+            factory.destroy();
+            throw new RuntimeException(ex);
+          }
+          // Publish only after init() succeeds, so no thread ever sees an uninitialized factory
+          sslFactory = factory;
+        }
+      }
+    }
+
+    return new HttpConnectionParams(keepAlive, keepAliveMaxConnections, connectionTimeout,
+        readTimeout, bufferSize, sslShuffle, factory);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index f354920..b7c0742 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -26,7 +26,6 @@ import java.text.DecimalFormat;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -46,7 +45,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import javax.crypto.SecretKey;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
+import org.apache.tez.http.HttpConnectionParams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -55,7 +54,6 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
@@ -76,7 +74,6 @@ import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
 import org.apache.tez.runtime.library.common.shuffle.Fetcher;
 import org.apache.tez.runtime.library.common.shuffle.FetcherCallback;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
 import org.apache.tez.runtime.library.common.shuffle.InputHost;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type;
@@ -132,6 +129,7 @@ public class ShuffleManager implements FetcherCallback {
   private final Condition wakeLoop = lock.newCondition();
   
   private final int numFetchers;
+  private final boolean asyncHttp;
   
   // Parameters required by Fetchers
   private final JobTokenSecretManager jobTokenSecretMgr;
@@ -241,8 +239,8 @@ public class ShuffleManager implements FetcherCallback {
         .getJobTokenSecretFromTokenBytes(inputContext
             .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
     this.jobTokenSecretMgr = new JobTokenSecretManager(shuffleSecret);
-    httpConnectionParams =
-        ShuffleUtils.constructHttpShuffleConnectionParams(conf);
+    this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
+    httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
 
     this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
 
@@ -398,7 +396,7 @@ public class ShuffleManager implements FetcherCallback {
       httpConnectionParams, inputManager, inputContext.getApplicationId(),
         jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
         lockDisk, localDiskFetchEnabled, sharedFetchEnabled,
-        localhostName, shufflePort);
+        localhostName, shufflePort, asyncHttp);
 
     if (codec != null) {
       fetcherBuilder.setCompressionParameters(codec);


Mime
View raw message