Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8D094200C47 for ; Thu, 30 Mar 2017 13:12:31 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8B6F8160B8B; Thu, 30 Mar 2017 11:12:31 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id AA4CB160B78 for ; Thu, 30 Mar 2017 13:12:30 +0200 (CEST) Received: (qmail 183 invoked by uid 500); 30 Mar 2017 11:12:29 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 174 invoked by uid 99); 30 Mar 2017 11:12:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Mar 2017 11:12:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AF09ADFE59; Thu, 30 Mar 2017 11:12:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gtully@apache.org To: commits@activemq.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: [AMQ-6640] either broker dispatch to bridge or bridge dispatch to broker needs to be async - dispatchAsync network option is the trigger for the vm transport to be sync in line with the current defaults. original BacklogNetworkCrossT Date: Thu, 30 Mar 2017 11:12:29 +0000 (UTC) archived-at: Thu, 30 Mar 2017 11:12:31 -0000 Repository: activemq Updated Branches: refs/heads/master 5ac9657c1 -> 4ef1fc74c [AMQ-6640] either broker dispatch to bridge or bridge dispatch to broker needs to be async - dispatchAsync network option is the trigger for the vm transport to be sync in line with the current defaults. original BacklogNetworkCrossTalkTest scenario exposed this. upshot is dispatchAsync=false is not compatible with duplicate subscription suppression which is fair. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4ef1fc74 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4ef1fc74 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4ef1fc74 Branch: refs/heads/master Commit: 4ef1fc74cf8068b5c41cc1f4d1c97967658a465b Parents: 5ac9657 Author: gtully Authored: Thu Mar 30 12:12:13 2017 +0100 Committer: gtully Committed: Thu Mar 30 12:12:13 2017 +0100 ---------------------------------------------------------------------- .../java/org/apache/activemq/broker/TransportConnection.java | 2 +- .../java/org/apache/activemq/network/NetworkBridgeFactory.java | 5 +++-- .../main/java/org/apache/activemq/network/NetworkConnector.java | 2 +- .../apache/activemq/usecases/BacklogNetworkCrossTalkTest.java | 4 ++-- 4 files changed, 7 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/4ef1fc74/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 69d29bc..a444d27 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -1428,7 +1428,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } setDuplexNetworkConnectorId(duplexNetworkConnectorId); } - Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker.getVmConnectorURI()); + Transport localTransport = NetworkBridgeFactory.createLocalTransport(config, broker.getVmConnectorURI()); Transport remoteBridgeTransport = transport; if (! (remoteBridgeTransport instanceof ResponseCorrelator)) { // the vm transport case is already wrapped http://git-wip-us.apache.org/repos/asf/activemq/blob/4ef1fc74/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java index 32711a4..32a7853 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java @@ -58,8 +58,9 @@ public final class NetworkBridgeFactory { return result; } - public static Transport createLocalTransport(URI uri) throws Exception { - return createLocalTransport(uri, false); + public static Transport createLocalTransport(NetworkBridgeConfiguration configuration, URI uri) throws Exception { + // one end of the localbroker<->bridge transport needs to be async to allow concurrent forwards and acks + return createLocalTransport(uri, !configuration.isDispatchAsync()); } public static Transport createLocalAsyncTransport(URI uri) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq/blob/4ef1fc74/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java index f943b82..62192d3 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java @@ -140,7 +140,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem } protected Transport createLocalTransport() throws Exception { - return NetworkBridgeFactory.createLocalTransport(localURI); + return NetworkBridgeFactory.createLocalTransport(this, localURI); } public static ActiveMQDestination[] getDurableTopicDestinations(final Set durableDestinations) { http://git-wip-us.apache.org/repos/asf/activemq/blob/4ef1fc74/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/BacklogNetworkCrossTalkTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/BacklogNetworkCrossTalkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/BacklogNetworkCrossTalkTest.java index 6538af7..d1a6693 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/BacklogNetworkCrossTalkTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/BacklogNetworkCrossTalkTest.java @@ -52,7 +52,7 @@ public class BacklogNetworkCrossTalkTest extends JmsMultipleBrokersTestSupport { waitForBridgeFormation(); - final int numMessages = 2000; + final int numMessages = 1000; // Create queue ActiveMQDestination destA = createDestination("AAA", false); sendMessages("A", destA, numMessages); @@ -88,7 +88,7 @@ public class BacklogNetworkCrossTalkTest extends JmsMultipleBrokersTestSupport { @Override public void setUp() throws Exception { - messageSize = 500; + messageSize = 5000; super.setMaxTestTime(10*60*1000); super.setAutoFail(true); super.setUp();