activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [15/15] activemq-artemis git commit: ARTEMIS-1920 AMQP throw NPE if can't find a backup server
Date Sat, 09 Jun 2018 05:57:25 GMT
ARTEMIS-1920 AMQP throw NPE if can't find a backup server

(cherry picked from commit de0747a9a40f44c309287a54f324cc9a21f31b93)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8058a512
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8058a512
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8058a512

Branch: refs/heads/2.6.x
Commit: 8058a512e5fe97f1a139dc00a3c16d56ee92882a
Parents: 40b66d1
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Fri Jun 8 16:27:42 2018 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Sat Jun 9 01:56:49 2018 -0400

----------------------------------------------------------------------
 .../amqp/broker/AMQPConnectionCallback.java     |  4 +-
 .../core/postoffice/impl/BindingsImpl.java      |  2 +-
 .../ProtocolsMessageLoadBalancingTest.java      | 85 ++++++++++++++++++++
 3 files changed, 89 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8058a512/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index 05b4f4f..84fdd24 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -291,7 +291,9 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener
{
       ClusterConnection clusterConnection = clusterManager.getDefaultConnection(null);
       if (clusterConnection != null) {
          TopologyMemberImpl member = clusterConnection.getTopology().getMember(server.getNodeID().toString());
-         return member.toBackupURI();
+         if (member != null) {
+            return member.toBackupURI();
+         }
       }
       return null;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8058a512/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 478c700..c669eba 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -258,7 +258,7 @@ public final class BindingsImpl implements Bindings {
                if (entry.getValue() instanceof RemoteQueueBinding) {
                   RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
                   if (remoteQueueBinding.getRemoteQueueID() == id) {
-                     message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
+                     message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
                   }
                }
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8058a512/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
index 1c98157..8ed685c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
@@ -26,6 +26,7 @@ import javax.jms.Session;
 import java.util.Arrays;
 import java.util.Collection;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
@@ -37,7 +38,9 @@ import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImp
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.junit.Wait;
 import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.junit.Assert;
@@ -282,6 +285,88 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase
{
       connection.close();
    }
 
+   @Test
+   public void testRestartConnection() throws Exception {
+
+      startServers(MessageLoadBalancingType.STRICT);
+
+      System.out.println("connections " + servers[1].getRemotingService().getConnections().size());
+
+      Wait.assertEquals(3, () -> servers[1].getRemotingService().getConnections().size());
+      Wait.assertEquals(3, () -> servers[0].getRemotingService().getConnections().size());
+
+      RemotingConnection[] connectionsServer1 = servers[1].getRemotingService().getConnections().toArray(new
RemotingConnection[3]);
+      RemotingConnection[] connectionsServer0 = servers[0].getRemotingService().getConnections().toArray(new
RemotingConnection[3]);
+
+      ConnectionFactory[] factory = new ConnectionFactory[NUMBER_OF_SERVERS];
+      Connection[] connection = new Connection[NUMBER_OF_SERVERS];
+      Session[] session = new Session[NUMBER_OF_SERVERS];
+      MessageConsumer[] consumer = new MessageConsumer[NUMBER_OF_SERVERS];
+
+      // this will pre create consumers to make sure messages are distributed evenly without
redistribution
+      for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
+         factory[node] = getJmsConnectionFactory(node);
+         connection[node] = factory[node].createConnection();
+         session[node] = connection[node].createSession(false, Session.AUTO_ACKNOWLEDGE);
+         consumer[node] = session[node].createConsumer(session[node].createQueue(queueName.toString()));
+      }
+
+      waitForBindings(0, "queues.0", 1, 1, true);
+      waitForBindings(1, "queues.0", 1, 1, true);
+
+      waitForBindings(0, "queues.0", 1, 1, false);
+      waitForBindings(1, "queues.0", 1, 1, false);
+
+      for (RemotingConnection remotingConnection : servers[1].getRemotingService().getConnections())
{
+         remotingConnection.fail(new ActiveMQException("forcing failure"));
+      }
+      for (RemotingConnection remotingConnection : servers[1].getRemotingService().getConnections())
{
+         remotingConnection.fail(new ActiveMQException("forcing failure"));
+      }
+
+      // this is to allow reconnects
+      Thread.sleep(500);
+
+      // this will pre create consumers to make sure messages are distributed evenly without
redistribution
+      for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
+         try {
+            connection[node].close();
+         } catch (Throwable e) {
+            e.printStackTrace();
+         }
+         factory[node] = getJmsConnectionFactory(node);
+         connection[node] = factory[node].createConnection();
+         session[node] = connection[node].createSession(false, Session.AUTO_ACKNOWLEDGE);
+         consumer[node] = session[node].createConsumer(session[node].createQueue(queueName.toString()));
+      }
+
+      waitForBindings(0, "queues.0", 1, 1, true);
+      waitForBindings(1, "queues.0", 1, 1, true);
+
+      waitForBindings(0, "queues.0", 1, 1, false);
+      waitForBindings(1, "queues.0", 1, 1, false);
+
+      System.out.println("connections " + servers[1].getRemotingService().getConnections().size());
+
+      // sending Messages.. they should be load balanced
+      {
+         ConnectionFactory cf = getJmsConnectionFactory(0);
+         Connection cn = cf.createConnection();
+         Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            pd.send(sn.createTextMessage("hello " + i));
+         }
+
+         cn.close();
+      }
+
+      receiveMessages(connection[0], consumer[0], NUMBER_OF_MESSAGES / 2, true);
+      receiveMessages(connection[1], consumer[1], NUMBER_OF_MESSAGES / 2, true);
+
+   }
+
    private void receiveMessages(Connection connection,
                                 MessageConsumer messageConsumer,
                                 int messageCount,


Mime
View raw message