activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-1301 Network failures recognition on backpressure while streaming large messages
Date Thu, 20 Jul 2017 09:34:02 GMT
ARTEMIS-1301 Network failures recognition on backpressure while streaming large messages


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fdbf4f45
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fdbf4f45
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fdbf4f45

Branch: refs/heads/master
Commit: fdbf4f450a37efdfa7e53b7846084b26293bdc7a
Parents: 00d880e
Author: Francesco Nigro <nigro.fra@gmail.com>
Authored: Tue Jul 18 13:31:35 2017 +0200
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Thu Jul 20 10:33:04 2017 +0100

----------------------------------------------------------------------
 .../protocol/core/CoreRemotingConnection.java   |  1 +
 .../core/impl/ActiveMQSessionContext.java       | 32 +++++++++++---------
 .../remoting/impl/netty/NettyConnection.java    | 13 ++++++--
 .../artemis/spi/core/remoting/Connection.java   |  1 +
 .../impl/netty/NettyConnectionTest.java         | 11 +++++++
 5 files changed, 41 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fdbf4f45/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
index 45d9229..1756153 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
@@ -117,6 +117,7 @@ public interface CoreRemotingConnection extends RemotingConnection {
     * @param size size we are trying to write
     * @param timeout
     * @return
+    * @throws IllegalStateException if the connection is closed
     */
    boolean blockUntilWritable(int size, long timeout);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fdbf4f45/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 71baaeb..b8eb22c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -808,21 +808,25 @@ public class ActiveMQSessionContext extends SessionContext {
       final CoreRemotingConnection connection = channel.getConnection();
       final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout());
       final long startFlowControl = System.nanoTime();
-      final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis);
-      if (!isWritable) {
-         final long endFlowControl = System.nanoTime();
-         final long elapsedFlowControl = endFlowControl - startFlowControl;
-         final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl);
-         ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage();
-         logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]");
-      }
-      if (requiresResponse) {
-         // When sending it blocking, only the last chunk will be blocking.
-         channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
-      } else {
-         channel.send(chunkPacket);
+      try {
+         final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis);
+         if (!isWritable) {
+            final long endFlowControl = System.nanoTime();
+            final long elapsedFlowControl = endFlowControl - startFlowControl;
+            final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl);
+            ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage();
+            logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]");
+         }
+         if (requiresResponse) {
+            // When sending it blocking, only the last chunk will be blocking.
+            channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
+         } else {
+            channel.send(chunkPacket);
+         }
+         return chunkPacket.getPacketSize();
+      } catch (Throwable e) {
+         throw new ActiveMQException(e.getMessage());
       }
-      return chunkPacket.getPacketSize();
    }
 
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fdbf4f45/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index 3ce40c9..384ca5e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -294,8 +294,15 @@ public class NettyConnection implements Connection {
       write(buffer, flush, batched, null);
    }
 
+   private void checkConnectionState() {
+      if (this.closed || !this.channel.isActive()) {
+         throw new IllegalStateException("Connection " + getID() + " closed or disconnected");
+      }
+   }
+
    @Override
    public final boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) {
+      checkConnectionState();
       final boolean isAllowedToBlock = isAllowedToBlock();
       if (!isAllowedToBlock) {
 
@@ -324,6 +331,8 @@ public class NettyConnection implements Connection {
          }
          boolean canWrite;
          while (!(canWrite = canWrite(requiredCapacity)) && System.nanoTime() < deadline) {
+            //periodically check the connection state
+            checkConnectionState();
             LockSupport.parkNanos(parkNanos);
          }
          return canWrite;
@@ -361,9 +370,7 @@ public class NettyConnection implements Connection {
       if (logger.isDebugEnabled()) {
          final int remainingBytes = this.writeBufferHighWaterMark - readableBytes;
          if (remainingBytes < 0) {
-            logger.debug("a write request is exceeding by " + (-remainingBytes) +
-                            " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark +
-                            " ] : consider to set it at least of " + readableBytes + " bytes");
+            logger.debug("a write request is exceeding by " + (-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + " ] : consider to set it at least of " + readableBytes + " bytes");
          }
       }
       //no need to lock because the Netty's channel is thread-safe

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fdbf4f45/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index 56d1bc3..63dbcfb 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -53,6 +53,7 @@ public interface Connection {
     * @param timeout          the maximum time to wait
     * @param timeUnit         the time unit of the timeout argument
     * @return {@code true} if the connection can enqueue {@code requiredCapacity} bytes, {@code false} otherwise
+    * @throws IllegalStateException if the connection is closed
     */
    default boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) {
       return true;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fdbf4f45/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
index 8d8e482..05ae1f6 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -73,6 +74,16 @@ public class NettyConnectionTest extends ActiveMQTestBase {
 
    }
 
+   @Test(expected = IllegalStateException.class)
+   public void throwsExceptionOnBlockUntilWritableIfClosed() {
+      EmbeddedChannel channel = createChannel();
+      NettyConnection conn = new NettyConnection(emptyMap, channel, new MyListener(), false, false);
+      conn.close();
+      //to make sure the channel is closed it needs to run the pending tasks
+      channel.runPendingTasks();
+      conn.blockUntilWritable(0, 0, TimeUnit.NANOSECONDS);
+   }
+
    private static EmbeddedChannel createChannel() {
       return new EmbeddedChannel(new ChannelInboundHandlerAdapter());
    }


Mime
View raw message