pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] merlimat closed pull request #2464: Copy ByteBufPair buffers when using with SSL (#2401)
Date Fri, 31 Aug 2018 22:39:33 GMT
merlimat closed pull request #2464: Copy ByteBufPair buffers when using with SSL (#2401)
URL: https://github.com/apache/incubator-pulsar/pull/2464
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index 7858a11048..a3fb772f56 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -57,9 +57,11 @@ protected void initChannel(SocketChannel ch) throws Exception {
                     serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
                     serviceConfig.getTlsRequireTrustedClientCertOnConnect());
             ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
+            ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
+        } else {
+            ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
         }
 
-        ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
         ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize,
0, 4, 0, 4));
         ch.pipeline().addLast("handler", new ServerCnx(pulsar));
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index afba7d2723..3188b3562b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -100,9 +100,10 @@ public void initChannel(SocketChannel ch) throws Exception {
                                 conf.getTlsTrustCertsFilePath());
                     }
                     ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
+                    ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
+                } else {
+                    ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
                 }
-
-                ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
                 ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MaxMessageSize,
0, 4, 0, 4));
                 ch.pipeline().addLast("handler", clientCnxSupplier.get());
             }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
index 94e1fb7b13..b99270b1c1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
@@ -109,6 +109,7 @@ public ReferenceCounted touch(Object hint) {
     }
 
     public static final Encoder ENCODER = new Encoder();
+    public static final CopyingEncoder COPYING_ENCODER = new CopyingEncoder();
 
     @Sharable
     public static class Encoder extends ChannelOutboundHandlerAdapter {
@@ -132,4 +133,26 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise
promise)
         }
     }
 
-}
\ No newline at end of file
+    @Sharable
+    public static class CopyingEncoder extends ChannelOutboundHandlerAdapter {
+        @Override
+        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
+            if (msg instanceof ByteBufPair) {
+                ByteBufPair b = (ByteBufPair) msg;
+
+                // Some handlers in the pipeline will modify the bytebufs passed in to them
(i.e. SslHandler).
+                // For these handlers, we need to pass a copy of the buffers as the source
buffers may be cached
+                // for multiple requests.
+                try {
+                    ctx.write(b.getFirst().copy(), ctx.voidPromise());
+                    ctx.write(b.getSecond().copy(), promise);
+                } finally {
+                    ReferenceCountUtil.safeRelease(b);
+                }
+            } else {
+                ctx.write(msg, promise);
+            }
+        }
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message