activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [3/4] activemq-artemis git commit: ARTEMIS-1776 Blocked Bridge is not resuming after reconnect
Date Tue, 17 Apr 2018 14:58:42 GMT
ARTEMIS-1776 Blocked Bridge is not resuming after reconnect

This is still part of ARTEMIS-1776 fix, which still part of the same release as we are on
now.
Hence I'm not opening a new JIRA for this one.

Cherry-picked from e5bce13316f7e81bb15a12592622df2ea2632a35


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3c546092
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3c546092
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3c546092

Branch: refs/heads/1.x
Commit: 3c546092788c1d6cf7f16d5a2b0938de00f2410a
Parents: 2c73bd0
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Fri Apr 6 10:00:43 2018 -0400
Committer: JiriOndrusek <jondruse@redhat.com>
Committed: Wed Apr 11 08:28:19 2018 +0200

----------------------------------------------------------------------
 .../core/server/cluster/impl/BridgeImpl.java    |   9 ++
 .../integration/cluster/bridge/BridgeTest.java  | 148 +++++++++++++++++++
 2 files changed, 157 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c546092/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index dbbb4ea..46c978a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -221,6 +221,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       this.server = server;
    }
 
+   /** For tests mainly */
+   public boolean isBlockedOnFlowControl() {
+      return blockedOnFlowControl;
+   }
+
    public static final byte[] getDuplicateBytes(final UUID nodeUUID, final long messageID)
{
       byte[] bytes = new byte[24];
 
@@ -922,6 +927,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
                }
             }
 
+            // need to reset blockedOnFlowControl after creating a new producer
+            // otherwise in case the bridge was blocked before a previous failure
+            // this would never resume
+            blockedOnFlowControl = false;
             producer = session.createProducer();
             session.addFailureListener(BridgeImpl.this);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c546092/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index 03ceb1a..9c1ca65 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -73,10 +73,13 @@ import org.apache.activemq.artemis.core.server.cluster.Transformer;
 import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.LinkedListIterator;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.ReusableLatch;
@@ -249,6 +252,151 @@ public class BridgeTest extends ActiveMQTestBase {
       System.out.println(timeTaken + "ms");
    }
 
+   @Test
+   public void testBlockedBridgeAndReconnect() throws Exception {
+      long time = System.currentTimeMillis();
+      Map<String, Object> server0Params = new HashMap<>();
+      server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+
+      Map<String, Object> server1Params = new HashMap<>();
+      addTargetParameters(server1Params);
+      server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+      server1.getAddressSettingsRepository().clear();
+      server1.getAddressSettingsRepository().addMatch("#", new AddressSettings().setMaxSizeBytes(10124
* 10).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
+
+      server0.getAddressSettingsRepository().clear();
+      server0.getAddressSettingsRepository().addMatch("#", new AddressSettings().setMaxSizeBytes(Long.MAX_VALUE).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
+
+      final String testAddress = "testAddress";
+      final String queueName0 = "queue0";
+      final String forwardAddress = "forwardAddress";
+      final String queueName1 = "queue1";
+
+      TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+
+      TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+
+      HashMap<String, TransportConfiguration> connectors = new HashMap<>();
+      connectors.put(server1tc.getName(), server1tc);
+      server0.getConfiguration().setConnectorConfigurations(connectors);
+
+      final int messageSize = 1024;
+
+      final int numMessages = 1000;
+
+      ArrayList<String> connectorConfig = new ArrayList<>();
+      connectorConfig.add(server1tc.getName());
+      BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(100).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(true).setConfirmationWindowSize(numMessages
* messageSize / 2).setStaticConnectors(connectorConfig).setProducerWindowSize(1024);
+
+      List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
+      bridgeConfigs.add(bridgeConfiguration);
+      server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+      CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
+      List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
+      queueConfigs0.add(queueConfig0);
+      server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+      CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1);
+      List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
+      queueConfigs1.add(queueConfig1);
+      server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+      server1.start();
+      server0.start();
+      locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
+      ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
+
+      ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
+
+      ClientSession session0 = sf0.createSession(false, true, 0);
+      ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+      ClientSession session1 = sf1.createSession(true, true, 0);
+      ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+
+      session1.start();
+
+      final byte[] bytes = new byte[messageSize];
+
+      final SimpleString propKey = new SimpleString("testkey");
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = session0.createMessage(true);
+
+         message.putIntProperty(propKey, i);
+
+         message.getBodyBuffer().writeBytes(bytes);
+
+         producer0.send(message);
+
+         if (i % 100 == 0) {
+            session0.commit();
+         }
+      }
+      session0.commit();
+
+      for (int i = 0; i < numMessages / 2; i++) {
+         ClientMessage message = consumer1.receive(5000);
+
+         Assert.assertNotNull(message);
+
+         Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+         message.acknowledge();
+      }
+      session1.commit();
+
+      BridgeImpl bridge = (BridgeImpl)server0.getClusterManager().getBridges().get("bridge1");
+
+      // stop in the middle. wait the bridge to block
+      Wait.waitFor(() -> bridge.isBlockedOnFlowControl(),  5000, 100);
+
+      session1.close();
+      sf1.close();
+
+      // now restart the server.. the bridge should be reconnecting now
+      server1.stop();
+      server1.start();
+
+      sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
+      session1 = sf1.createSession(true, true, 0);
+      consumer1 = session1.createConsumer(queueName1);
+      session1.start();
+
+      // consume the rest of the messages
+      for (int i = numMessages / 2; i < numMessages; i++) {
+         ClientMessage message = consumer1.receive(5000);
+
+         Assert.assertNotNull(message);
+
+         Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+         message.acknowledge();
+      }
+
+
+      Wait.waitFor(() -> server0.locateQueue(SimpleString.toSimpleString("queue0")).getMessageCount()
== 0,  5000, 100);
+
+      Assert.assertNull(consumer1.receiveImmediate());
+
+      session0.close();
+
+      session1.close();
+
+      sf0.close();
+
+      sf1.close();
+
+      closeFields();
+      if (server0.getConfiguration().isPersistenceEnabled()) {
+         assertEquals(0, loadQueues(server0).size());
+      }
+      long timeTaken = System.currentTimeMillis() - time;
+      System.out.println(timeTaken + "ms");
+   }
+
    public void internaltestSimpleBridge(final boolean largeMessage, final boolean useFiles)
throws Exception {
       Map<String, Object> server0Params = new HashMap<>();
       server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params);


Mime
View raw message