tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject tez git commit: TEZ-3633. Implement keep-alive timeout in tez shuffle handler (jeagles)
Date Tue, 30 May 2017 19:22:34 GMT
Repository: tez
Updated Branches:
  refs/heads/master 59e65aa0e -> fc4cddf7c


TEZ-3633. Implement keep-alive timeout in tez shuffle handler (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fc4cddf7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fc4cddf7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fc4cddf7

Branch: refs/heads/master
Commit: fc4cddf7c43d8644c4eba4949f51e2fda8a67f38
Parents: 59e65aa
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Tue May 30 14:22:24 2017 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Tue May 30 14:22:24 2017 -0500

----------------------------------------------------------------------
 .../apache/tez/auxservices/ShuffleHandler.java  | 72 +++++++++++++++++---
 .../tez/auxservices/TestShuffleHandler.java     | 28 ++++++++
 2 files changed, 90 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fc4cddf7/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index a829485..abce716 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -106,6 +106,7 @@ import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -131,8 +132,14 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.jboss.netty.handler.codec.http.QueryStringDecoder;
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+import org.jboss.netty.handler.timeout.IdleState;
+import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
+import org.jboss.netty.handler.timeout.IdleStateEvent;
+import org.jboss.netty.handler.timeout.IdleStateHandler;
 import org.jboss.netty.util.CharsetUtil;
+import org.jboss.netty.util.HashedWheelTimer;
 import org.jboss.netty.util.ThreadNameDeterminer;
+import org.jboss.netty.util.Timer;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -240,7 +247,7 @@ public class ShuffleHandler extends AuxiliaryService {
   public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true;
   public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED =
       false;
-
+  private static final String TIMEOUT_HANDLER = "timeout";
   /* the maximum number of files a single GET request can
    open simultaneously during shuffle
    */
@@ -251,8 +258,9 @@ public class ShuffleHandler extends AuxiliaryService {
   public static final int DEFAULT_SHUFFLE_LISTEN_QUEUE_SIZE = 128;
 
   boolean connectionKeepAliveEnabled = false;
-  String connectionKeepAliveTimeOut;
-  int mapOutputMetaInfoCacheSize;
+  private int connectionKeepAliveTimeOut;
+  private int mapOutputMetaInfoCacheSize;
+  private Timer timer;
 
   @Metrics(about="Shuffle output metrics", context="mapred", name="tez")
   static class ShuffleMetrics implements ChannelFutureListener {
@@ -295,7 +303,15 @@ public class ShuffleHandler extends AuxiliaryService {
       int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
       if (waitCount == 0) {
         metrics.operationComplete(future);
-        future.getChannel().close();
+        // Let the idle timer handler close keep-alive connections
+        if (reduceContext.getKeepAlive()) {
+          ChannelPipeline pipeline = future.getChannel().getPipeline();
+          TimeoutHandler timeoutHandler =
+              (TimeoutHandler) pipeline.get(TIMEOUT_HANDLER);
+          timeoutHandler.setEnabledTimeout(true);
+        } else {
+          future.getChannel().close();
+        }
       } else {
         pipelineFact.getSHUFFLE().sendMap(reduceContext);
       }
@@ -317,11 +333,12 @@ public class ShuffleHandler extends AuxiliaryService {
     private Map<String, Shuffle.MapOutputInfo> infoMap;
     private String jobId;
     private String dagId;
+    private final boolean keepAlive;
 
     public ReduceContext(List<String> mapIds, Range reduceRange,
                          ChannelHandlerContext context, String usr,
                          Map<String, Shuffle.MapOutputInfo> mapOutputInfoMap,
-                         String jobId, String dagId) {
+                         String jobId, String dagId, boolean keepAlive) {
 
       this.mapIds = mapIds;
       this.reduceRange = reduceRange;
@@ -343,6 +360,7 @@ public class ShuffleHandler extends AuxiliaryService {
       this.user = usr;
       this.infoMap = mapOutputInfoMap;
       this.jobId = jobId;
+      this.keepAlive = keepAlive;
     }
 
     public Range getReduceRange() {
@@ -376,6 +394,10 @@ public class ShuffleHandler extends AuxiliaryService {
     public AtomicInteger getMapsToWait() {
       return mapsToWait;
     }
+
+    public boolean getKeepAlive() {
+      return keepAlive;
+    }
   }
 
   ShuffleHandler(MetricsSystem ms) {
@@ -518,8 +540,10 @@ public class ShuffleHandler extends AuxiliaryService {
     secretManager = new JobTokenSecretManager();
     recoverState(conf);
     ServerBootstrap bootstrap = new ServerBootstrap(selector);
+    // Timer is shared across entire factory and must be released separately
+    timer = new HashedWheelTimer();
     try {
-      pipelineFact = new HttpPipelineFactory(conf);
+      pipelineFact = new HttpPipelineFactory(conf, timer);
     } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
@@ -541,7 +565,7 @@ public class ShuffleHandler extends AuxiliaryService {
     connectionKeepAliveEnabled =
         conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED,
           DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED);
-    connectionKeepAliveTimeOut = "timeout=" +
+    connectionKeepAliveTimeOut =
         Math.max(1, conf.getInt(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT,
           DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT));
     mapOutputMetaInfoCacheSize =
@@ -559,6 +583,10 @@ public class ShuffleHandler extends AuxiliaryService {
     if (pipelineFact != null) {
       pipelineFact.destroy();
     }
+    if (timer != null) {
+      // Release this shared timer resource
+      timer.stop();
+    }
     if (stateDb != null) {
       stateDb.close();
     }
@@ -765,12 +793,29 @@ public class ShuffleHandler extends AuxiliaryService {
     }
   }
 
+  static class TimeoutHandler extends IdleStateAwareChannelHandler {
+
+    private boolean enabledTimeout;
+
+    void setEnabledTimeout(boolean enabledTimeout) {
+      this.enabledTimeout = enabledTimeout;
+    }
+
+    @Override
+    public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
+      if (e.getState() == IdleState.WRITER_IDLE && enabledTimeout) {
+        e.getChannel().close();
+      }
+    }
+  }
+
   class HttpPipelineFactory implements ChannelPipelineFactory {
 
     final Shuffle SHUFFLE;
     private SSLFactory sslFactory;
+    private final ChannelHandler idleStateHandler;
 
-    public HttpPipelineFactory(Configuration conf) throws Exception {
+    public HttpPipelineFactory(Configuration conf, Timer timer) throws Exception {
       SHUFFLE = getShuffle(conf);
       if (conf.getBoolean(SHUFFLE_SSL_ENABLED_KEY,
                           SHUFFLE_SSL_ENABLED_DEFAULT)) {
@@ -778,6 +823,7 @@ public class ShuffleHandler extends AuxiliaryService {
         sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
         sslFactory.init();
       }
+      this.idleStateHandler = new IdleStateHandler(timer, 0, connectionKeepAliveTimeOut, 0);
     }
 
     public Shuffle getSHUFFLE() {
@@ -801,6 +847,8 @@ public class ShuffleHandler extends AuxiliaryService {
       pipeline.addLast("encoder", new HttpResponseEncoder());
       pipeline.addLast("chunking", new ChunkedWriteHandler());
       pipeline.addLast("shuffle", SHUFFLE);
+      pipeline.addLast("idle", idleStateHandler);
+      pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
       return pipeline;
       // TODO factor security manager into pipeline
       // TODO factor out encode/decode to permit binary shuffle
@@ -1024,6 +1072,9 @@ public class ShuffleHandler extends AuxiliaryService {
       Map<String, MapOutputInfo> mapOutputInfoMap =
           new HashMap<String, MapOutputInfo>();
       Channel ch = evt.getChannel();
+      ChannelPipeline pipeline = ch.getPipeline();
+      TimeoutHandler timeoutHandler = (TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
+      timeoutHandler.setEnabledTimeout(false);
       String user = userRsrc.get(jobId);
 
       try {
@@ -1038,8 +1089,9 @@ public class ShuffleHandler extends AuxiliaryService {
       }
       ch.write(response);
       //Initialize one ReduceContext object per messageReceived call
+      boolean keepAlive = keepAliveParam || connectionKeepAliveEnabled;
       ReduceContext reduceContext = new ReduceContext(mapIds, reduceRange, ctx,
-          user, mapOutputInfoMap, jobId, dagId);
+          user, mapOutputInfoMap, jobId, dagId, keepAlive);
       for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
         ChannelFuture nextMap = sendMap(reduceContext);
         if(nextMap == null) {
@@ -1226,7 +1278,7 @@ public class ShuffleHandler extends AuxiliaryService {
       if (connectionKeepAliveEnabled || keepAliveParam) {
         response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(contentLength));
         response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
-        response.setHeader(HttpHeaders.Values.KEEP_ALIVE, connectionKeepAliveTimeOut);
+        response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout=" + connectionKeepAliveTimeOut);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Content Length in shuffle : " + contentLength);
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/fc4cddf7/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index a790b9a..b9fd0d2 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -36,6 +36,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.HttpURLConnection;
+import java.net.SocketAddress;
 import java.net.SocketException;
 import java.net.URL;
 import java.nio.ByteBuffer;
@@ -77,6 +78,7 @@ import org.apache.hadoop.yarn.server.records.Version;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.socket.SocketChannel;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.AbstractChannel;
@@ -316,6 +318,16 @@ public class TestShuffleHandler {
         failures.size() == 0);
   }
 
+  static class LastSocketAddress {
+    SocketAddress lastAddress;
+    void setAddress(SocketAddress lastAddress) {
+      this.lastAddress = lastAddress;
+    }
+    SocketAddress getSocketAddres() {
+      return lastAddress;
+    }
+  }
+
   @Test(timeout = 10000)
   public void testKeepAlive() throws Exception {
     final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
@@ -324,6 +336,8 @@ public class TestShuffleHandler {
     conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true);
     // try setting to -ve keep alive timeout.
     conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100);
+    final LastSocketAddress lastSocketAddress = new LastSocketAddress();
+
     ShuffleHandler shuffleHandler = new ShuffleHandler() {
       @Override
       protected Shuffle getShuffle(final Configuration conf) {
@@ -373,6 +387,7 @@ public class TestShuffleHandler {
           protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
                                                 Channel ch, String user, String mapId, Range reduceRange,
                                                 MapOutputInfo info) throws IOException {
+            lastSocketAddress.setAddress(ch.getRemoteAddress());
             HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
 
             // send a shuffle header and a lot of data down the channel
@@ -432,6 +447,9 @@ public class TestShuffleHandler {
     Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
     ShuffleHeader header = new ShuffleHeader();
     header.readFields(input);
+    byte[] buffer = new byte[1024];
+    while (input.read(buffer) != -1) {}
+    SocketAddress firstAddress = lastSocketAddress.getSocketAddres();
     input.close();
 
     // For keepAlive via URL
@@ -453,6 +471,11 @@ public class TestShuffleHandler {
     header = new ShuffleHeader();
     header.readFields(input);
     input.close();
+    SocketAddress secondAddress = lastSocketAddress.getSocketAddres();
+    Assert.assertNotNull("Initial shuffle address should not be null", firstAddress);
+    Assert.assertNotNull("Keep-Alive shuffle address should not be null", secondAddress);
+    Assert.assertEquals("Initial shuffle address and keep-alive shuffle "
+        + "address should be the same", firstAddress, secondAddress);
   }
 
   @Test
@@ -1147,14 +1170,19 @@ public class TestShuffleHandler {
         mock(ChannelHandlerContext.class);
     final MessageEvent mockEvt = mock(MessageEvent.class);
     final Channel mockCh = mock(AbstractChannel.class);
+    final ChannelPipeline mockPipeline = Mockito.mock(ChannelPipeline.class);
 
     // Mock HttpRequest and ChannelFuture
     final HttpRequest mockHttpRequest = createMockHttpRequest();
     final ChannelFuture mockFuture = createMockChannelFuture(mockCh,
         listenerList);
+    final ShuffleHandler.TimeoutHandler timerHandler =
+        new ShuffleHandler.TimeoutHandler();
 
     // Mock Netty Channel Context and Channel behavior
     Mockito.doReturn(mockCh).when(mockCtx).getChannel();
+    Mockito.when(mockCh.getPipeline()).thenReturn(mockPipeline);
+    Mockito.when(mockPipeline.get(Mockito.any(String.class))).thenReturn(timerHandler);
     when(mockCtx.getChannel()).thenReturn(mockCh);
     Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class));
     when(mockCh.write(Object.class)).thenReturn(mockFuture);


Mime
View raw message