ARTEMIS-1680 - Synchronize message load balancing type between brokers
This guarantees that the message load balancing type is updated on both an address and its linked addresses.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/13e07115
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/13e07115
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/13e07115
Branch: refs/heads/master
Commit: 13e071158da465aad0f2608076f078143d9ce83c
Parents: 8dfa345
Author: raul.valdoleiros <raul.valdoleiros@ceiia.com>
Authored: Tue Feb 13 15:55:46 2018 +0000
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Feb 19 11:33:09 2018 -0500
----------------------------------------------------------------------
.../artemis/core/postoffice/AddressManager.java | 3 +
.../artemis/core/postoffice/PostOffice.java | 3 +
.../core/postoffice/impl/PostOfficeImpl.java | 6 ++
.../postoffice/impl/SimpleAddressManager.java | 6 ++
.../postoffice/impl/WildcardAddressManager.java | 19 ++++
.../cluster/impl/ClusterConnectionImpl.java | 14 +--
.../mqtt/imported/MqttClusterWildcardTest.java | 93 ++++++++++++++++++++
.../core/server/impl/fakes/FakePostOffice.java | 6 ++
8 files changed, 145 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13e07115/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
index 858754d..c8c0428 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
@@ -22,6 +22,7 @@ import java.util.Set;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -76,4 +77,6 @@ public interface AddressManager {
AddressInfo getAddressInfo(SimpleString address);
+ void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType
messageLoadBalancingType) throws Exception;
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13e07115/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index b78883f..5d081a3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -150,4 +151,6 @@ public interface PostOffice extends ActiveMQComponent {
boolean isAddressBound(SimpleString address) throws Exception;
Set<SimpleString> getAddresses();
+
+ void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType
messageLoadBalancingType) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13e07115/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 73d6953..3f9356c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -69,6 +69,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@@ -1006,6 +1007,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener,
Binding
}
@Override
+ public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType
messageLoadBalancingType) throws Exception {
+ addressManager.updateMessageLoadBalancingTypeForAddress(address, messageLoadBalancingType);
+ }
+
+ @Override
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws
Exception {
return addressManager.getMatchingQueue(address, routingType);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13e07115/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
index 054b536..aa94de2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.CompositeAddress;
@@ -346,4 +347,9 @@ public class SimpleAddressManager implements AddressManager {
public AddressInfo getAddressInfo(SimpleString addressName) {
return addressInfoMap.get(addressName);
}
+
+ @Override
+ public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType
messageLoadBalancingType) throws Exception {
+ getBindingsForRoutingAddress(address).setMessageLoadBalancingType(messageLoadBalancingType);
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13e07115/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java
index eb242f3..2180e0b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.postoffice.Address;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -107,6 +108,24 @@ public class WildcardAddressManager extends SimpleAddressManager {
return exists;
}
+ @Override
+ public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType
messageLoadBalancingType) throws Exception {
+ Address add = addAndUpdateAddressMap(address);
+ Bindings bindingsForRoutingAddress = super.getBindingsForRoutingAddress(address);
+ if (bindingsForRoutingAddress != null) {
+ bindingsForRoutingAddress.setMessageLoadBalancingType(messageLoadBalancingType);
+ }
+ if (add.containsWildCard()) {
+ for (Address destAdd : add.getLinkedAddresses()) {
+ getBindingsForRoutingAddress(destAdd.getAddress()).setMessageLoadBalancingType(messageLoadBalancingType);
+ }
+ } else {
+ for (Address destAdd : add.getLinkedAddresses()) {
+ super.getBindingsForRoutingAddress(destAdd.getAddress()).setMessageLoadBalancingType(messageLoadBalancingType);
+ }
+ }
+ }
+
/**
* If the address is a wild card then the binding will be removed from the actual mappings
for any linked address.
* otherwise it will be removed as normal.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13e07115/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 454ba6f..70923be 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -48,7 +48,6 @@ import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
@@ -1233,9 +1232,13 @@ public final class ClusterConnectionImpl implements ClusterConnection,
AfterConn
} catch (Exception ignore) {
}
- Bindings theBindings = postOffice.getBindingsForAddress(queueAddress);
-
- theBindings.setMessageLoadBalancingType(messageLoadBalancingType);
+ try {
+ postOffice.updateMessageLoadBalancingTypeForAddress(queueAddress, messageLoadBalancingType);
+ } catch (Exception e) {
+ if (logger.isTraceEnabled()) {
+ logger.trace(e.getLocalizedMessage(), e);
+ }
+ }
}
@@ -1256,7 +1259,8 @@ public final class ClusterConnectionImpl implements ClusterConnection,
AfterConn
RemoteQueueBinding binding = bindings.remove(clusterName);
if (binding == null) {
- throw new IllegalStateException("Cannot find binding for queue " + clusterName);
+ logger.warn("Cannot remove binding, because cannot find binding for queue " +
clusterName);
+ return;
}
postOffice.removeBinding(binding.getUniqueName(), null, false);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13e07115/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java
index 5485f57..105e7d7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java
@@ -112,6 +112,99 @@ public class MqttClusterWildcardTest extends ClusterTestBase {
}
}
+ @Test
+ public void wildcardsWithBroker1Disconnected() throws Exception {
+ BlockingConnection connection1 = null;
+ BlockingConnection connection2 = null;
+ BlockingConnection connection3 = null;
+ final String TOPIC = "test/+/some/#";
+ try {
+
+ WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
+ wildcardConfiguration.setAnyWords('#');
+ wildcardConfiguration.setDelimiter('/');
+ wildcardConfiguration.setRoutingEnabled(true);
+ wildcardConfiguration.setSingleWord('+');
+
+ setupServer(0, false, isNetty());
+ servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
+
+
+ setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(),
0, 1);
+
+ startServers(0);
+
+
+ connection1 = retrieveMQTTConnection("tcp://localhost:61616");
+
+
+ // Subscribe to topics
+ Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)};
+ connection1.subscribe(topics);
+
+ waitForBindings(0, TOPIC, 1, 1, true);
+ waitForBindings(0, TOPIC, 0, 0, false);
+
+ // Publish Messages
+ String payload1 = "This is message 1";
+ String payload2 = "This is message 2";
+ String payload3 = "This is message 3";
+
+ connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
+ connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
+ connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
+
+ Message message1 = connection1.receive(5, TimeUnit.SECONDS);
+
+ setupServer(1, false, isNetty());
+ servers[1].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
+
+ setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(),
1, 0);
+ startServers(1);
+
+ connection2 = retrieveMQTTConnection("tcp://localhost:61617");
+ connection3 = retrieveMQTTConnection("tcp://localhost:61617");
+ connection2.subscribe(topics);
+ connection3.subscribe(new Topic[]{new Topic("teste/1/some/1", QoS.AT_MOST_ONCE)});
+
+ waitForBindings(1, TOPIC, 1, 1, false);
+ waitForBindings(1, TOPIC, 1, 1, true);
+
+ connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
+ connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
+ connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
+
+
+ Message message2 = connection1.receive(5, TimeUnit.SECONDS);
+ Message message3 = connection1.receive(5, TimeUnit.SECONDS);
+ Message message4 = connection2.receive(5, TimeUnit.SECONDS);
+ Message message5 = connection2.receive(5, TimeUnit.SECONDS);
+ Message message6 = connection2.receive(5, TimeUnit.SECONDS);
+
+ assertEquals(payload1, new String(message1.getPayload()));
+ assertEquals(payload2, new String(message2.getPayload()));
+ assertEquals(payload3, new String(message3.getPayload()));
+ assertEquals(payload1, new String(message4.getPayload()));
+ assertEquals(payload2, new String(message5.getPayload()));
+ assertEquals(payload3, new String(message6.getPayload()));
+
+ } finally {
+ String[] topics = new String[]{TOPIC};
+ if (connection1 != null) {
+ connection1.unsubscribe(topics);
+ connection1.disconnect();
+ }
+ if (connection2 != null) {
+ connection2.unsubscribe(topics);
+ connection2.disconnect();
+ }
+ if (connection3 != null) {
+ connection3.unsubscribe(new String[]{"teste/1/some/1"});
+ connection3.disconnect();
+ }
+ }
+ }
+
private static BlockingConnection retrieveMQTTConnection(String host) throws Exception
{
MQTT mqtt = new MQTT();
mqtt.setHost(host);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13e07115/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 96c7451..e455e41 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -69,6 +70,11 @@ public class FakePostOffice implements PostOffice {
}
@Override
+ public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType
messageLoadBalancingType) throws Exception {
+
+ }
+
+ @Override
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) {
return null;
|