hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject hive git commit: HIVE-16044: LLAP: Shuffle Handler keep-alive connections are closed from the server side (Rajesh Balamohan, reviwed by Gopal Vijayaraghavan)
Date Wed, 12 Apr 2017 07:42:24 GMT
Repository: hive
Updated Branches:
  refs/heads/master a38ae17fe -> bd313cf36


HIVE-16044: LLAP: Shuffle Handler keep-alive connections are closed from the server side (Rajesh
Balamohan, reviwed by Gopal Vijayaraghavan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bd313cf3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bd313cf3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bd313cf3

Branch: refs/heads/master
Commit: bd313cf365ca9bdc3321f6b20dcb80296022c390
Parents: a38ae17
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Wed Apr 12 13:11:59 2017 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Wed Apr 12 13:11:59 2017 +0530

----------------------------------------------------------------------
 .../llap/shufflehandler/ShuffleHandler.java     | 37 +++++++++++++++++---
 1 file changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bd313cf3/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
index dce0c56..085c977 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
@@ -74,7 +74,6 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
@@ -88,6 +87,7 @@ import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -109,9 +109,15 @@ import org.jboss.netty.handler.codec.http.HttpResponse;
 import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+import org.jboss.netty.handler.timeout.IdleState;
+import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
+import org.jboss.netty.handler.timeout.IdleStateEvent;
+import org.jboss.netty.handler.timeout.IdleStateHandler;
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.jboss.netty.handler.stream.ChunkedWriteHandler;
 import org.jboss.netty.util.CharsetUtil;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timer;
 
 public class ShuffleHandler implements AttemptRegistrationListener {
 
@@ -208,6 +214,7 @@ public class ShuffleHandler implements AttemptRegistrationListener {
   final boolean connectionKeepAliveEnabled;
   final int connectionKeepAliveTimeOut;
   final int mapOutputMetaInfoCacheSize;
+  Timer timer;
   private final LocalDirAllocator lDirAlloc =
       new LocalDirAllocator(SHUFFLE_HANDLER_LOCAL_DIRS);
   private final Shuffle shuffle;
@@ -311,8 +318,10 @@ public class ShuffleHandler implements AttemptRegistrationListener {
 
   public void start() throws Exception {
     ServerBootstrap bootstrap = new ServerBootstrap(selector);
+    // Timer is shared across entire factory and must be released separately
+    timer = new HashedWheelTimer();
     try {
-      pipelineFact = new HttpPipelineFactory(conf);
+      pipelineFact = new HttpPipelineFactory(conf, timer);
     } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
@@ -476,6 +485,10 @@ public class ShuffleHandler implements AttemptRegistrationListener {
     if (pipelineFact != null) {
       pipelineFact.destroy();
     }
+    if (timer != null) {
+      // Release this shared timer resource
+      timer.stop();
+    }
     if (dirWatcher != null) {
       dirWatcher.stop();
     }
@@ -506,12 +519,22 @@ public class ShuffleHandler implements AttemptRegistrationListener {
     userRsrc.remove(appIdString);
   }
 
+  private static class TimeoutHandler extends IdleStateAwareChannelHandler {
+    @Override
+    public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
+      if (e.getState() == IdleState.WRITER_IDLE) {
+        e.getChannel().close();
+      }
+    }
+  }
+
   class HttpPipelineFactory implements ChannelPipelineFactory {
 
     final Shuffle SHUFFLE;
     private SSLFactory sslFactory;
+    private final ChannelHandler idleStateHandler;
 
-    public HttpPipelineFactory(Configuration conf) throws Exception {
+    public HttpPipelineFactory(Configuration conf, Timer timer) throws Exception {
       SHUFFLE = getShuffle(conf);
       // TODO Setup SSL Shuffle
 //      if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
@@ -520,6 +543,7 @@ public class ShuffleHandler implements AttemptRegistrationListener {
 //        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
 //        sslFactory.init();
 //      }
+      this.idleStateHandler = new IdleStateHandler(timer, 0, connectionKeepAliveTimeOut,
0);
     }
 
     public void destroy() {
@@ -539,6 +563,8 @@ public class ShuffleHandler implements AttemptRegistrationListener {
       pipeline.addLast("encoder", new HttpResponseEncoder());
       pipeline.addLast("chunking", new ChunkedWriteHandler());
       pipeline.addLast("shuffle", SHUFFLE);
+      pipeline.addLast("idle", idleStateHandler);
+      pipeline.addLast("timeout", new TimeoutHandler());
       return pipeline;
       // TODO factor security manager into pipeline
       // TODO factor out encode/decode to permit binary shuffle
@@ -752,7 +778,10 @@ public class ShuffleHandler implements AttemptRegistrationListener {
           return;
         }
       }
-      lastMap.addListener(ChannelFutureListener.CLOSE);
+      // If Keep alive is enabled, do not close the connection.
+      if (!keepAliveParam && !connectionKeepAliveEnabled) {
+        lastMap.addListener(ChannelFutureListener.CLOSE);
+      }
     }
 
     private String getErrorMessage(Throwable t) {


Mime
View raw message