activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: AMQ-6673 Add some fixes and improvements to the AMQP test client
Date Thu, 11 May 2017 19:01:28 GMT
Repository: activemq
Updated Branches:
  refs/heads/master fddbac2b8 -> 154ff81ee


AMQ-6673 Add some fixes and improvements to the AMQP test client

Adds some thread safety fixes and Netty usage fixes to the transport as
well as adding a traceBytes option to trace the bytes sent / received
during testing.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/154ff81e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/154ff81e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/154ff81e

Branch: refs/heads/master
Commit: 154ff81eeef5cb24b280ba91afa6a880afda7e9e
Parents: fddbac2
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu May 11 15:01:16 2017 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Thu May 11 15:01:16 2017 -0400

----------------------------------------------------------------------
 .../client/transport/NettyTcpTransport.java     | 29 ++++++++++++++------
 .../client/transport/NettyTransportOptions.java | 19 +++++++++++++
 .../amqp/client/transport/NettyWSTransport.java |  2 +-
 3 files changed, 41 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/154ff81e/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
index 0980d5e..eb54cda 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
@@ -42,6 +42,7 @@ import io.netty.channel.FixedRecvByteBufAllocator;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.logging.LoggingHandler;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
@@ -53,7 +54,6 @@ public class NettyTcpTransport implements NettyTransport {
 
     private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
 
-    private static final int QUIET_PERIOD = 20;
     private static final int SHUTDOWN_TIMEOUT = 100;
 
     protected Bootstrap bootstrap;
@@ -66,7 +66,7 @@ public class NettyTcpTransport implements NettyTransport {
     private final AtomicBoolean connected = new AtomicBoolean();
     private final AtomicBoolean closed = new AtomicBoolean();
     private final CountDownLatch connectLatch = new CountDownLatch(1);
-    private IOException failureCause;
+    private volatile IOException failureCause;
 
     /**
      * Create a new transport instance
@@ -163,7 +163,10 @@ public class NettyTcpTransport implements NettyTransport {
                 channel = null;
             }
             if (group != null) {
-                group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+                Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+                if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+                    LOG.trace("Channel group shutdown failed to complete in allotted time");
+                }
                 group = null;
             }
 
@@ -196,11 +199,17 @@ public class NettyTcpTransport implements NettyTransport {
     public void close() throws IOException {
         if (closed.compareAndSet(false, true)) {
             connected.set(false);
-            if (channel != null) {
-                channel.close().syncUninterruptibly();
-            }
-            if (group != null) {
-                group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+            try {
+                if (channel != null) {
+                    channel.close().syncUninterruptibly();
+                }
+            } finally {
+                if (group != null) {
+                    Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+                    if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+                        LOG.trace("Channel group shutdown failed to complete in allotted time");
+                    }
+                }
             }
         }
     }
@@ -371,6 +380,10 @@ public class NettyTcpTransport implements NettyTransport {
             channel.pipeline().addLast(sslHandler);
         }
 
+        if (getTransportOptions().isTraceBytes()) {
+            channel.pipeline().addLast("logger", new LoggingHandler(getClass()));
+        }
+
         addAdditionalHandlers(channel.pipeline());
 
         channel.pipeline().addLast(createChannelHandler());

http://git-wip-us.apache.org/repos/asf/activemq/blob/154ff81e/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
index add924c..c0fe41f 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
@@ -30,6 +30,7 @@ public class NettyTransportOptions implements Cloneable {
     public static final int DEFAULT_SO_TIMEOUT = -1;
     public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
     public static final int DEFAULT_TCP_PORT = 5672;
+    public static final boolean DEFAULT_TRACE_BYTES = false;
 
     public static final NettyTransportOptions INSTANCE = new NettyTransportOptions();
 
@@ -42,6 +43,7 @@ public class NettyTransportOptions implements Cloneable {
     private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE;
     private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
     private int defaultTcpPort = DEFAULT_TCP_PORT;
+    private boolean traceBytes = DEFAULT_TRACE_BYTES;
 
     /**
      * @return the currently set send buffer size in bytes.
@@ -163,6 +165,23 @@ public class NettyTransportOptions implements Cloneable {
         this.defaultTcpPort = defaultTcpPort;
     }
 
+    /**
+     * @return true if the transport should enable byte tracing
+     */
+    public boolean isTraceBytes() {
+        return traceBytes;
+    }
+
+    /**
+     * Determines if the transport should add a logger for bytes in / out
+     *
+     * @param traceBytes
+     *      should the transport log the bytes in and out.
+     */
+    public void setTraceBytes(boolean traceBytes) {
+        this.traceBytes = traceBytes;
+    }
+
     public boolean isSSL() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/154ff81e/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
index c2fcfe7..9693a98 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
@@ -141,7 +141,7 @@ public class NettyWSTransport extends NettyTcpTransport {
             if (message instanceof FullHttpResponse) {
                 FullHttpResponse response = (FullHttpResponse) message;
                 throw new IllegalStateException(
-                    "Unexpected FullHttpResponse (getStatus=" + response.getStatus() +
+                    "Unexpected FullHttpResponse (getStatus=" + response.status() +
                     ", content=" + response.content().toString(StandardCharsets.UTF_8) + ')');
             }
 


Mime
View raw message