hadoop-common-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [hadoop] szilard-nemeth commented on a change in pull request #3259: HADOOP-15327. Upgrade MR ShuffleHandler to use Netty4
Date Fri, 06 Aug 2021 11:57:52 GMT

szilard-nemeth commented on a change in pull request #3259:
URL: https://github.com/apache/hadoop/pull/3259#discussion_r684177660



##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
##########
@@ -828,27 +893,44 @@ public void destroy() {
       }
     }
 
-    @Override
-    public ChannelPipeline getPipeline() throws Exception {
-      ChannelPipeline pipeline = Channels.pipeline();
+    @Override protected void initChannel(SocketChannel ch) throws Exception {
+      ChannelPipeline pipeline = ch.pipeline();
       if (sslFactory != null) {
         pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
       }
       pipeline.addLast("decoder", new HttpRequestDecoder());
-      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
-      pipeline.addLast("encoder", new HttpResponseEncoder());
+      pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
+      pipeline.addLast(ENCODER_HANDLER_NAME, new HttpResponseEncoder());
       pipeline.addLast("chunking", new ChunkedWriteHandler());
       pipeline.addLast("shuffle", SHUFFLE);
-      pipeline.addLast("idle", idleStateHandler);
-      pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
-      return pipeline;
+      addOutboundHandlersIfRequired(pipeline);
+      pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler(connectionKeepAliveTimeOut));
       // TODO factor security manager into pipeline
       // TODO factor out encode/decode to permit binary shuffle
       // TODO factor out decode of index to permit alt. models
     }
+
+    private void addOutboundHandlersIfRequired(ChannelPipeline pipeline) {
+      if (useOutboundExceptionHandler) {
+        //https://stackoverflow.com/questions/50612403/catch-all-exception-handling-for-outbound-channelhandler
+        pipeline.addLast("outboundExceptionHandler", new ChannelOutboundHandlerAdapter()
{
+          @Override
+          public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
+            promise.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            super.write(ctx, msg, promise);
+          }
+        });
+      }
+      if (useOutboundLogger) {
+        //Replace HttpResponseEncoder with LoggingHttpResponseEncoder
+        //Need to use the same name as before, otherwise we would have 2 encoders 
+        pipeline.replace(ENCODER_HANDLER_NAME, ENCODER_HANDLER_NAME, new LoggingHttpResponseEncoder(false));

Review comment:
       Makes sense, inlined the method and got rid of the pipeline.replace




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org


Mime
View raw message