From commits-return-51078-archive-asf-public=cust-asf.ponee.io@activemq.apache.org Tue Apr 17 16:58:42 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 237EA18076D for ; Tue, 17 Apr 2018 16:58:41 +0200 (CEST) Received: (qmail 42188 invoked by uid 500); 17 Apr 2018 14:58:41 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 42175 invoked by uid 99); 17 Apr 2018 14:58:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Apr 2018 14:58:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BB19CF5E28; Tue, 17 Apr 2018 14:58:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Tue, 17 Apr 2018 14:58:42 -0000 Message-Id: <5e78c3b5d3b5459f8cfb51bc0fd37692@git.apache.org> In-Reply-To: <369578dae9e64f02879d2877caa1b957@git.apache.org> References: <369578dae9e64f02879d2877caa1b957@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] activemq-artemis git commit: ARTEMIS-1776 Blocked Bridge is not resuming after reconnect ARTEMIS-1776 Blocked Bridge is not resuming after reconnect This is still part of ARTEMIS-1776 fix, which still part of the same release as we are on now. Hence I'm not opening a new JIRA for this one. Cherry-picked from e5bce13316f7e81bb15a12592622df2ea2632a35 Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3c546092 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3c546092 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3c546092 Branch: refs/heads/1.x Commit: 3c546092788c1d6cf7f16d5a2b0938de00f2410a Parents: 2c73bd0 Author: Clebert Suconic Authored: Fri Apr 6 10:00:43 2018 -0400 Committer: JiriOndrusek Committed: Wed Apr 11 08:28:19 2018 +0200 ---------------------------------------------------------------------- .../core/server/cluster/impl/BridgeImpl.java | 9 ++ .../integration/cluster/bridge/BridgeTest.java | 148 +++++++++++++++++++ 2 files changed, 157 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c546092/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index dbbb4ea..46c978a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -221,6 +221,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled this.server = server; } + /** For tests mainly */ + public boolean isBlockedOnFlowControl() { + return blockedOnFlowControl; + } + public static final byte[] getDuplicateBytes(final UUID nodeUUID, final long messageID) { byte[] bytes = new byte[24]; @@ -922,6 +927,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } } + // need to reset blockedOnFlowControl after creating a new producer + // otherwise in case the bridge was blocked before a previous failure + // this would never resume + blockedOnFlowControl = false; producer = session.createProducer(); session.addFailureListener(BridgeImpl.this); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c546092/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java index 03ceb1a..9c1ca65 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java @@ -73,10 +73,13 @@ import org.apache.activemq.artemis.core.server.cluster.Transformer; import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.LinkedListIterator; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.ReusableLatch; @@ -249,6 +252,151 @@ public class BridgeTest extends ActiveMQTestBase { System.out.println(timeTaken + "ms"); } + @Test + public void testBlockedBridgeAndReconnect() throws Exception { + long time = System.currentTimeMillis(); + Map server0Params = new HashMap<>(); + server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params); + + Map server1Params = new HashMap<>(); + addTargetParameters(server1Params); + server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params); + server1.getAddressSettingsRepository().clear(); + server1.getAddressSettingsRepository().addMatch("#", new AddressSettings().setMaxSizeBytes(10124 * 10).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK)); + + server0.getAddressSettingsRepository().clear(); + server0.getAddressSettingsRepository().addMatch("#", new AddressSettings().setMaxSizeBytes(Long.MAX_VALUE).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK)); + + final String testAddress = "testAddress"; + final String queueName0 = "queue0"; + final String forwardAddress = "forwardAddress"; + final String queueName1 = "queue1"; + + TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params); + + TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params); + + HashMap connectors = new HashMap<>(); + connectors.put(server1tc.getName(), server1tc); + server0.getConfiguration().setConnectorConfigurations(connectors); + + final int messageSize = 1024; + + final int numMessages = 1000; + + ArrayList connectorConfig = new ArrayList<>(); + connectorConfig.add(server1tc.getName()); + BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(100).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(true).setConfirmationWindowSize(numMessages * messageSize / 2).setStaticConnectors(connectorConfig).setProducerWindowSize(1024); + + List bridgeConfigs = new ArrayList<>(); + bridgeConfigs.add(bridgeConfiguration); + server0.getConfiguration().setBridgeConfigurations(bridgeConfigs); + + CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0); + List queueConfigs0 = new ArrayList<>(); + queueConfigs0.add(queueConfig0); + server0.getConfiguration().setQueueConfigurations(queueConfigs0); + + CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1); + List queueConfigs1 = new ArrayList<>(); + queueConfigs1.add(queueConfig1); + server1.getConfiguration().setQueueConfigurations(queueConfigs1); + + server1.start(); + server0.start(); + locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc)); + ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc)); + + ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc)); + + ClientSession session0 = sf0.createSession(false, true, 0); + ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress)); + + ClientSession session1 = sf1.createSession(true, true, 0); + ClientConsumer consumer1 = session1.createConsumer(queueName1); + + + session1.start(); + + final byte[] bytes = new byte[messageSize]; + + final SimpleString propKey = new SimpleString("testkey"); + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = session0.createMessage(true); + + message.putIntProperty(propKey, i); + + message.getBodyBuffer().writeBytes(bytes); + + producer0.send(message); + + if (i % 100 == 0) { + session0.commit(); + } + } + session0.commit(); + + for (int i = 0; i < numMessages / 2; i++) { + ClientMessage message = consumer1.receive(5000); + + Assert.assertNotNull(message); + + Assert.assertEquals(i, message.getObjectProperty(propKey)); + + message.acknowledge(); + } + session1.commit(); + + BridgeImpl bridge = (BridgeImpl)server0.getClusterManager().getBridges().get("bridge1"); + + // stop in the middle. wait the bridge to block + Wait.waitFor(() -> bridge.isBlockedOnFlowControl(), 5000, 100); + + session1.close(); + sf1.close(); + + // now restart the server.. the bridge should be reconnecting now + server1.stop(); + server1.start(); + + sf1 = addSessionFactory(locator.createSessionFactory(server1tc)); + session1 = sf1.createSession(true, true, 0); + consumer1 = session1.createConsumer(queueName1); + session1.start(); + + // consume the rest of the messages + for (int i = numMessages / 2; i < numMessages; i++) { + ClientMessage message = consumer1.receive(5000); + + Assert.assertNotNull(message); + + Assert.assertEquals(i, message.getObjectProperty(propKey)); + + message.acknowledge(); + } + + + Wait.waitFor(() -> server0.locateQueue(SimpleString.toSimpleString("queue0")).getMessageCount() == 0, 5000, 100); + + Assert.assertNull(consumer1.receiveImmediate()); + + session0.close(); + + session1.close(); + + sf0.close(); + + sf1.close(); + + closeFields(); + if (server0.getConfiguration().isPersistenceEnabled()) { + assertEquals(0, loadQueues(server0).size()); + } + long timeTaken = System.currentTimeMillis() - time; + System.out.println(timeTaken + "ms"); + } + public void internaltestSimpleBridge(final boolean largeMessage, final boolean useFiles) throws Exception { Map server0Params = new HashMap<>(); server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params);