activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject activemq-artemis git commit: [ARTEMIS-963] Prevent ClassCastException in ActiveMQChannelHandler
Date Wed, 15 Feb 2017 01:12:00 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/1.x e47b5d695 -> be3702096


[ARTEMIS-963] Prevent ClassCastException in ActiveMQChannelHandler

When HTTP Upgrade is enabled, update Netty's pipeline only after the
HTTP Upgrade handshake is successful *and* the trailing
EMPTY_LAST_CONTENT is received.
Otherwise, this EMPTY_LAST_CONTENT is handled by
ActiveMQChannelHandler, which is only expected to handle ByteBuf.

JIRA: https://issues.apache.org/jira/browse/ARTEMIS-963
(cherry picked from commit 992dc2bc1b613dcb788ae2b4d59dc098e3325610)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/be370209
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/be370209
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/be370209

Branch: refs/heads/1.x
Commit: be370209641312c270176d7ac552d3f0e54ab555
Parents: e47b5d6
Author: Jeff Mesnil <jmesnil@gmail.com>
Authored: Tue Feb 14 15:25:39 2017 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Feb 14 20:11:53 2017 -0500

----------------------------------------------------------------------
 .../remoting/impl/netty/NettyConnector.java     | 59 ++++++++++++--------
 1 file changed, 37 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be370209/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index f785292..38fb326 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -80,6 +80,7 @@ import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpResponseDecoder;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.LastHttpContent;
 import io.netty.handler.codec.http.cookie.ClientCookieEncoder;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.AttributeKey;
@@ -745,36 +746,50 @@ public class NettyConnector extends AbstractConnector {
          this.httpClientCodec = httpClientCodec;
       }
 
+      /**
+       * HTTP upgrade response will be decoded by Netty as 2 objects:
+       * - 1 HttpObject corresponding to the 101 SWITCHING PROTOCOL headers
+       * - 1 EMPTY_LAST_CONTENT
+       *
+       * The HTTP upgrade is successful when the 101 SWITCHING PROTOCOL has been received
(handshakeComplete = true)
+       * but the latch is counted down only when the following EMPTY_LAST_CONTENT is also received.
+       * Otherwise this ChannelHandler would be removed too soon and the ActiveMQChannelHandler
would handle the
+       * EMPTY_LAST_CONTENT (while it is expecting only ByteBuf).
+       */
       @Override
       public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception
{
          if (logger.isDebugEnabled()) {
             logger.debug("Received msg=" + msg);
          }
-         try {
-            if (msg instanceof HttpResponse) {
-               HttpResponse response = (HttpResponse) msg;
-               if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code()
&& response.headers().get(HttpHeaders.Names.UPGRADE).equals(ACTIVEMQ_REMOTING)) {
-                  String accept = response.headers().get(SEC_ACTIVEMQ_REMOTING_ACCEPT);
-                  String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get());
-
-                  if (expectedResponse.equals(accept)) {
-                     // remove the http handlers and flag the activemq channel handler as
active
-                     pipeline.remove(httpClientCodec);
-                     pipeline.remove(this);
-                     handshakeComplete = true;
-                     ActiveMQChannelHandler channelHandler = pipeline.get(ActiveMQChannelHandler.class);
-                     channelHandler.active = true;
-                     return;
-                  }
+         if (msg instanceof HttpResponse) {
+            HttpResponse response = (HttpResponse) msg;
+            if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code()
&& response.headers().get(HttpHeaders.Names.UPGRADE).equals(ACTIVEMQ_REMOTING)) {
+               String accept = response.headers().get(SEC_ACTIVEMQ_REMOTING_ACCEPT);
+               String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get());
+
+               if (expectedResponse.equals(accept)) {
+                  // HTTP upgrade is successful but let's wait to receive the EMPTY_LAST_CONTENT
to count down the latch
+                  handshakeComplete = true;
+               } else {
+                  // HTTP upgrade failed
+                  ActiveMQClientLogger.LOGGER.httpHandshakeFailed(msg);
+                  ctx.close();
+                  latch.countDown();
                }
+               return;
             }
-         } finally {
-            if (!handshakeComplete) {
-               ActiveMQClientLogger.LOGGER.httpHandshakeFailed(msg);
-               ctx.close();
-            }
-            latch.countDown();
+         } else if (msg == LastHttpContent.EMPTY_LAST_CONTENT && handshakeComplete)
{
+            // remove the http handlers and flag the activemq channel handler as active
+            pipeline.remove(httpClientCodec);
+            pipeline.remove(this);
+            ActiveMQChannelHandler channelHandler = pipeline.get(ActiveMQChannelHandler.class);
+            channelHandler.active = true;
+         }
+         if (!handshakeComplete) {
+            ActiveMQClientLogger.LOGGER.httpHandshakeFailed(msg);
+            ctx.close();
          }
+         latch.countDown();
       }
 
       @Override


Mime
View raw message