hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject hadoop git commit: MAPREDUCE-6518. Set SO_KEEPALIVE on shuffle connections. Contributed by Chang Li (cherry picked from commit ee9b80acf84718b952cbb82380cd0a4dbe22dc79)
Date Wed, 21 Oct 2015 21:47:16 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 7d3d0d605 -> 0dc69afdc


MAPREDUCE-6518. Set SO_KEEPALIVE on shuffle connections. Contributed by Chang Li
(cherry picked from commit ee9b80acf84718b952cbb82380cd0a4dbe22dc79)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0dc69afd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0dc69afd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0dc69afd

Branch: refs/heads/branch-2.7
Commit: 0dc69afdc7b89706d8318dad55a068a7ff122cee
Parents: 7d3d0d6
Author: Jason Lowe <jlowe@apache.org>
Authored: Wed Oct 21 21:43:42 2015 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Wed Oct 21 21:46:59 2015 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 +
 .../apache/hadoop/mapred/ShuffleHandler.java    |  1 +
 .../hadoop/mapred/TestShuffleHandler.java       | 59 ++++++++++++++++++++
 3 files changed, 63 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0dc69afd/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index f027037..42c64fa 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -45,6 +45,9 @@ Release 2.7.2 - UNRELEASED
     TaskAttemptImpl#sendJHStartEventForAssignedFailTask (Bibin A Chundatt via
     jlowe)
 
+    MAPREDUCE-6518. Set SO_KEEPALIVE on shuffle connections (Chang Li via
+    jlowe)
+
 Release 2.7.1 - 2015-07-06 
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0dc69afd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index bfb4c5e..e5fa57a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -493,6 +493,7 @@ public class ShuffleHandler extends AuxiliaryService {
     } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
+    bootstrap.setOption("child.keepAlive", true);
     bootstrap.setPipelineFactory(pipelineFact);
     port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
     Channel ch = bootstrap.bind(new InetSocketAddress(port));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0dc69afd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
index 5cb6050..ff8920f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
@@ -80,6 +80,8 @@ import org.apache.hadoop.yarn.server.records.Version;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.socket.SocketChannel;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.AbstractChannel;
 import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
@@ -140,6 +142,27 @@ public class TestShuffleHandler {
     }
   }
 
+  private static class MockShuffleHandler2 extends org.apache.hadoop.mapred.ShuffleHandler {
+    boolean socketKeepAlive = false;
+
+    @Override
+    protected Shuffle getShuffle(final Configuration conf) {
+      return new Shuffle(conf) {
+        @Override
+        protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+            HttpRequest request, HttpResponse response, URL requestUri)
+            throws IOException {
+          SocketChannel channel = (SocketChannel)(ctx.getChannel());
+          socketKeepAlive = channel.getConfig().isKeepAlive();
+        }
+      };
+    }
+
+    protected boolean isSocketKeepAlive() {
+      return socketKeepAlive;
+    }
+  }
+
   /**
    * Test the validation of ShuffleHandler's meta-data's serialization and
    * de-serialization.
@@ -423,6 +446,42 @@ public class TestShuffleHandler {
     input.close();
   }
 
+  @Test(timeout = 10000)
+  public void testSocketKeepAlive() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true);
+    // try setting to -ve keep alive timeout.
+    conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100);
+    HttpURLConnection conn = null;
+    MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2();
+    try {
+      shuffleHandler.init(conf);
+      shuffleHandler.start();
+
+      String shuffleBaseURL = "http://127.0.0.1:"
+              + shuffleHandler.getConfig().get(
+                ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
+      URL url =
+          new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&"
+              + "map=attempt_12345_1_m_1_0");
+      conn = (HttpURLConnection) url.openConnection();
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      conn.connect();
+      conn.getInputStream();
+      Assert.assertTrue("socket should be set KEEP_ALIVE",
+          shuffleHandler.isSocketKeepAlive());
+    } finally {
+      if (conn != null) {
+        conn.disconnect();
+      }
+      shuffleHandler.stop();
+    }
+  }
+
   /**
    * simulate a reducer that sends an invalid shuffle-header - sometimes a wrong
    * header_name and sometimes a wrong version


Mime
View raw message