Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 460C2200CCF for ; Mon, 24 Jul 2017 15:43:55 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 4458116393C; Mon, 24 Jul 2017 13:43:55 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 62087163933 for ; Mon, 24 Jul 2017 15:43:54 +0200 (CEST) Received: (qmail 7948 invoked by uid 500); 24 Jul 2017 13:43:53 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 7939 invoked by uid 99); 24 Jul 2017 13:43:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Jul 2017 13:43:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3AB64E0896; Mon, 24 Jul 2017 13:43:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Message-Id: <89f051a3414e460591eb9d38f17d7c80@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: activemq-artemis git commit: ARTEMIS-1301 Network failures recognition on backpressure while streaming large messages Date: Mon, 24 Jul 2017 13:43:53 +0000 (UTC) archived-at: Mon, 24 Jul 2017 13:43:55 -0000 Repository: activemq-artemis Updated Branches: refs/heads/1.x e2d9e1bb6 -> 44d3be5a7 ARTEMIS-1301 Network failures recognition on backpressure while streaming large messages (cherry picked from commit 32fe21d59591788f67cbb65d7cb7e641f74c44d1) Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/44d3be5a Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/44d3be5a Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/44d3be5a Branch: refs/heads/1.x Commit: 44d3be5a7ef66b1df4a436e6545cfc995a11f5ad Parents: e2d9e1b Author: Francesco Nigro Authored: Tue Jul 18 13:31:35 2017 +0200 Committer: Clebert Suconic Committed: Mon Jul 24 09:43:43 2017 -0400 ---------------------------------------------------------------------- .../protocol/core/CoreRemotingConnection.java | 1 + .../core/impl/ActiveMQSessionContext.java | 32 +++++++++++--------- .../remoting/impl/netty/NettyConnection.java | 13 ++++++-- .../artemis/spi/core/remoting/Connection.java | 1 + .../impl/netty/NettyConnectionTest.java | 11 +++++++ 5 files changed, 41 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44d3be5a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java index 45d9229..1756153 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java @@ -117,6 +117,7 @@ public interface CoreRemotingConnection extends RemotingConnection { * @param size size we are trying to write * @param timeout * @return + * @throws IllegalStateException if the connection is closed */ boolean blockUntilWritable(int size, long timeout); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44d3be5a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index b74f988..d2bcc96 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -764,21 +764,25 @@ public class ActiveMQSessionContext extends SessionContext { final CoreRemotingConnection connection = channel.getConnection(); final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout()); final long startFlowControl = System.nanoTime(); - final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis); - if (!isWritable) { - final long endFlowControl = System.nanoTime(); - final long elapsedFlowControl = endFlowControl - startFlowControl; - final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl); - ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage(); - logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]"); - } - if (requiresResponse) { - // When sending it blocking, only the last chunk will be blocking. - channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); - } else { - channel.send(chunkPacket); + try { + final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis); + if (!isWritable) { + final long endFlowControl = System.nanoTime(); + final long elapsedFlowControl = endFlowControl - startFlowControl; + final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl); + ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage(); + logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]"); + } + if (requiresResponse) { + // When sending it blocking, only the last chunk will be blocking. + channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); + } else { + channel.send(chunkPacket); + } + return chunkPacket.getPacketSize(); + } catch (Throwable e) { + throw new ActiveMQException(e.getMessage()); } - return chunkPacket.getPacketSize(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44d3be5a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 373c2f7..3c5d1d5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -291,8 +291,15 @@ public class NettyConnection implements Connection { write(buffer, flush, batched, null); } + private void checkConnectionState() { + if (this.closed || !this.channel.isActive()) { + throw new IllegalStateException("Connection " + getID() + " closed or disconnected"); + } + } + @Override public final boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) { + checkConnectionState(); final boolean isAllowedToBlock = isAllowedToBlock(); if (!isAllowedToBlock) { if (logger.isDebugEnabled()) { @@ -313,6 +320,8 @@ public class NettyConnection implements Connection { } boolean canWrite; while (!(canWrite = canWrite(requiredCapacity)) && System.nanoTime() < deadline) { + //periodically check the connection state + checkConnectionState(); LockSupport.parkNanos(parkNanos); } return canWrite; @@ -350,9 +359,7 @@ public class NettyConnection implements Connection { if (logger.isDebugEnabled()) { final int remainingBytes = this.writeBufferHighWaterMark - readableBytes; if (remainingBytes < 0) { - logger.debug("a write request is exceeding by " + (-remainingBytes) + - " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + - " ] : consider to set it at least of " + readableBytes + " bytes"); + logger.debug("a write request is exceeding by " + (-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + " ] : consider to set it at least of " + readableBytes + " bytes"); } } //no need to lock because the Netty's channel is thread-safe http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44d3be5a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java index 56d1bc3..63dbcfb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java @@ -53,6 +53,7 @@ public interface Connection { * @param timeout the maximum time to wait * @param timeUnit the time unit of the timeout argument * @return {@code true} if the connection can enqueue {@code requiredCapacity} bytes, {@code false} otherwise + * @throws IllegalStateException if the connection is closed */ default boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) { return true; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44d3be5a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java index f99b25a..54e40ed 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Map; +import java.util.concurrent.TimeUnit; import io.netty.channel.Channel; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -72,6 +73,16 @@ public class NettyConnectionTest extends ActiveMQTestBase { } + @Test(expected = IllegalStateException.class) + public void throwsExceptionOnBlockUntilWritableIfClosed() { + EmbeddedChannel channel = createChannel(); + NettyConnection conn = new NettyConnection(emptyMap, channel, new MyListener(), false, false); + conn.close(); + //to make sure the channel is closed it needs to run the pending tasks + channel.runPendingTasks(); + conn.blockUntilWritable(0, 0, TimeUnit.NANOSECONDS); + } + private static EmbeddedChannel createChannel() { return new EmbeddedChannel(new ChannelInboundHandlerAdapter()); }