activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [3/6] activemq-artemis git commit: ARTEMIS-19 allow disabling of message load-balancing
Date Wed, 03 Jun 2015 23:30:19 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/colocated-failover/src/main/resources/activemq/server0/broker.xml
----------------------------------------------------------------------
diff --git a/examples/jms/colocated-failover/src/main/resources/activemq/server0/broker.xml b/examples/jms/colocated-failover/src/main/resources/activemq/server0/broker.xml
index 5b3221d..363ae96 100644
--- a/examples/jms/colocated-failover/src/main/resources/activemq/server0/broker.xml
+++ b/examples/jms/colocated-failover/src/main/resources/activemq/server0/broker.xml
@@ -72,7 +72,7 @@ under the License.
             <connector-ref>netty-connector</connector-ref>
             <retry-interval>500</retry-interval>
             <use-duplicate-detection>true</use-duplicate-detection>
-            <forward-when-no-consumers>true</forward-when-no-consumers>
+            <message-load-balancing>STRICT</message-load-balancing>
             <max-hops>1</max-hops>
             <discovery-group-ref discovery-group-name="my-discovery-group"/>
          </cluster-connection>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/colocated-failover/src/main/resources/activemq/server1/broker.xml
----------------------------------------------------------------------
diff --git a/examples/jms/colocated-failover/src/main/resources/activemq/server1/broker.xml b/examples/jms/colocated-failover/src/main/resources/activemq/server1/broker.xml
index 84f38f1..464fdee 100644
--- a/examples/jms/colocated-failover/src/main/resources/activemq/server1/broker.xml
+++ b/examples/jms/colocated-failover/src/main/resources/activemq/server1/broker.xml
@@ -72,7 +72,7 @@ under the License.
             <connector-ref>netty-connector</connector-ref>
             <retry-interval>500</retry-interval>
             <use-duplicate-detection>true</use-duplicate-detection>
-            <forward-when-no-consumers>true</forward-when-no-consumers>
+            <message-load-balancing>STRICT</message-load-balancing>
             <max-hops>1</max-hops>
             <discovery-group-ref discovery-group-name="my-discovery-group"/>
          </cluster-connection>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server0/broker.xml
----------------------------------------------------------------------
diff --git a/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server0/broker.xml b/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server0/broker.xml
index 8c38aea..79dd692 100644
--- a/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server0/broker.xml
+++ b/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server0/broker.xml
@@ -81,7 +81,7 @@ under the License.
             <connector-ref>netty-connector</connector-ref>
             <retry-interval>500</retry-interval>
             <use-duplicate-detection>true</use-duplicate-detection>
-            <forward-when-no-consumers>true</forward-when-no-consumers>
+            <message-load-balancing>STRICT</message-load-balancing>
             <max-hops>1</max-hops>
             <static-connectors>
                <connector-ref>server1-connector</connector-ref>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server1/broker.xml
----------------------------------------------------------------------
diff --git a/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server1/broker.xml b/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server1/broker.xml
index ca0f811..7b135ac 100644
--- a/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server1/broker.xml
+++ b/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server1/broker.xml
@@ -81,7 +81,7 @@ under the License.
             <connector-ref>netty-connector</connector-ref>
             <retry-interval>500</retry-interval>
             <use-duplicate-detection>true</use-duplicate-detection>
-            <forward-when-no-consumers>true</forward-when-no-consumers>
+            <message-load-balancing>STRICT</message-load-balancing>
             <max-hops>1</max-hops>
             <static-connectors>
                <connector-ref>server0-connector</connector-ref>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/queue-message-redistribution/src/main/resources/activemq/server0/broker.xml
----------------------------------------------------------------------
diff --git a/examples/jms/queue-message-redistribution/src/main/resources/activemq/server0/broker.xml b/examples/jms/queue-message-redistribution/src/main/resources/activemq/server0/broker.xml
index e745c71..2d771b0 100644
--- a/examples/jms/queue-message-redistribution/src/main/resources/activemq/server0/broker.xml
+++ b/examples/jms/queue-message-redistribution/src/main/resources/activemq/server0/broker.xml
@@ -72,7 +72,7 @@ under the License.
             <connector-ref>netty-connector</connector-ref>
             <retry-interval>500</retry-interval>
             <use-duplicate-detection>true</use-duplicate-detection>
-            <forward-when-no-consumers>false</forward-when-no-consumers>
+            <message-load-balancing>ON_DEMAND</message-load-balancing>
             <max-hops>1</max-hops>
             <discovery-group-ref discovery-group-name="my-discovery-group"/>
          </cluster-connection>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/queue-message-redistribution/src/main/resources/activemq/server1/broker.xml
----------------------------------------------------------------------
diff --git a/examples/jms/queue-message-redistribution/src/main/resources/activemq/server1/broker.xml b/examples/jms/queue-message-redistribution/src/main/resources/activemq/server1/broker.xml
index 7bb693f..870be49 100644
--- a/examples/jms/queue-message-redistribution/src/main/resources/activemq/server1/broker.xml
+++ b/examples/jms/queue-message-redistribution/src/main/resources/activemq/server1/broker.xml
@@ -73,7 +73,7 @@ under the License.
             <connector-ref>netty-connector</connector-ref>
             <retry-interval>500</retry-interval>
             <use-duplicate-detection>true</use-duplicate-detection>
-            <forward-when-no-consumers>false</forward-when-no-consumers>
+            <message-load-balancing>ON_DEMAND</message-load-balancing>
             <max-hops>1</max-hops>
             <discovery-group-ref discovery-group-name="my-discovery-group"/>
          </cluster-connection>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/scale-down/src/main/resources/activemq/server0/broker.xml
----------------------------------------------------------------------
diff --git a/examples/jms/scale-down/src/main/resources/activemq/server0/broker.xml b/examples/jms/scale-down/src/main/resources/activemq/server0/broker.xml
index 4a38a74..33de64f 100644
--- a/examples/jms/scale-down/src/main/resources/activemq/server0/broker.xml
+++ b/examples/jms/scale-down/src/main/resources/activemq/server0/broker.xml
@@ -73,7 +73,7 @@ under the License.
             <retry-interval>500</retry-interval>
             <reconnect-attempts>5</reconnect-attempts>
             <use-duplicate-detection>true</use-duplicate-detection>
-            <forward-when-no-consumers>true</forward-when-no-consumers>
+            <message-load-balancing>STRICT</message-load-balancing>
             <max-hops>1</max-hops>
             <discovery-group-ref discovery-group-name="my-discovery-group"/>
          </cluster-connection>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/scale-down/src/main/resources/activemq/server1/broker.xml
----------------------------------------------------------------------
diff --git a/examples/jms/scale-down/src/main/resources/activemq/server1/broker.xml b/examples/jms/scale-down/src/main/resources/activemq/server1/broker.xml
index 57fa72d..7602ef9 100644
--- a/examples/jms/scale-down/src/main/resources/activemq/server1/broker.xml
+++ b/examples/jms/scale-down/src/main/resources/activemq/server1/broker.xml
@@ -74,7 +74,7 @@ under the License.
             <retry-interval>500</retry-interval>
             <reconnect-attempts>5</reconnect-attempts>
             <use-duplicate-detection>true</use-duplicate-detection>
-            <forward-when-no-consumers>true</forward-when-no-consumers>
+            <message-load-balancing>STRICT</message-load-balancing>
             <max-hops>1</max-hops>
             <discovery-group-ref discovery-group-name="my-discovery-group"/>
          </cluster-connection>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/symmetric-cluster/readme.html
----------------------------------------------------------------------
diff --git a/examples/jms/symmetric-cluster/readme.html b/examples/jms/symmetric-cluster/readme.html
index 116735c..8be9587 100644
--- a/examples/jms/symmetric-cluster/readme.html
+++ b/examples/jms/symmetric-cluster/readme.html
@@ -46,7 +46,7 @@ under the License.
      <p>Using UDP discovery makes configuration simpler since we don't have to know what nodes are
      available at any one time.</p>
      <p>Here's the relevant snippet from the server configuration, which tells the server to form a cluster
-     with the other nodes:</p>     
+     with the other nodes:</p>
      <pre class="prettyprint">
      <code>
    &lt;cluster-connection name="my-cluster"&gt;
@@ -54,19 +54,19 @@ under the License.
       &lt;connector-ref>netty-connector&lt;/connector-ref>
 	   &lt;retry-interval&gt;500&lt;/retry-interval&gt;
 	   &lt;use-duplicate-detection&gt;true&lt;/use-duplicate-detection&gt;
-	   &lt;forward-when-no-consumers&gt;true&lt;/forward-when-no-consumers&gt;
+	   &lt;message-load-balancing&gt;STRICT&lt;/message-load-balancing&gt;
 	   &lt;max-hops&gt;1&lt;/max-hops&gt;
 	   &lt;discovery-group-ref discovery-group-name="my-discovery-group"/&gt;
    &lt;/cluster-connection&gt;
    </code>
-     </pre>    
+     </pre>
      <p>In this example we create a symmetric cluster of six live nodes, and we also pair each live node
      with it's own backup node. (A backup node is not strictly necessary for a symmetric cluster).</p>
     <p>In this example will we will demonstrate this by deploying a JMS topic and Queue on all nodes of the cluster
      , sending messages to the queue and topic from different nodes, and verifying messages are received correctly
      by consumers on different nodes.</p>
     <p>For more information on configuring ActiveMQ Artemis clustering in general, please see the clustering
-     section of the user manual.</p>      
+     section of the user manual.</p>
      <h2>Example step-by-step</h2>
      <p><i>To run the example, simply type <code>mvn verify -Pexample</code> from this directory</i></p>
 
@@ -77,12 +77,12 @@ under the License.
          specific server to do that, and that server might not be available at the time. By creating the
          connection factory directly we avoid having to worry about a JNDI look-up.
          In an app server environment you could use HA-JNDI to lookup from the clustered JNDI servers without
-         having to know about a specific one.        
+         having to know about a specific one.
         </li>
-        
+
         <pre class="prettyprint">
            <code>
-   ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactoryWithHA("231.7.7.7", 9876); 
+   ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactoryWithHA("231.7.7.7", 9876);
    </code>
         </pre>
 
@@ -90,8 +90,8 @@ under the License.
         <pre class="prettyprint">
            <code>
    Queue queue = new ActiveMQQueue("exampleQueue");
-         
-   Topic topic = ActiveMQJMSClient.createActiveMQTopic("exampleTopic");           
+
+   Topic topic = ActiveMQJMSClient.createActiveMQTopic("exampleTopic");
            </code>
         </pre>
 
@@ -165,7 +165,7 @@ under the License.
    MessageConsumer consumer0 = session0.createConsumer(queue);
           </code>
         </pre>
-        
+
         <li>We create an anonymous message producer on server 2.</li>
         <pre class="prettyprint">
           <code>
@@ -177,15 +177,15 @@ under the License.
         <pre class="prettyprint">
            <code>
    final int numMessages = 500;
-                  
+
    for (int i = 0; i < numMessages; i++)
    {
       TextMessage message1 = session2.createTextMessage("Topic message 1");
-   
+
       producer2.send(topic, message1);
-      
+
       TextMessage message2 = session2.createTextMessage("Queue message 1");
-      
+
       producer2.send(queue, message2);
    }
            </code>
@@ -262,7 +262,7 @@ under the License.
       {
          connection1.close();
       }
-      
+
       if (connection2 != null)
       {
          connection2.close();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/symmetric-cluster/src/main/resources/activemq/server0/broker.xml
----------------------------------------------------------------------
diff --git a/examples/jms/symmetric-cluster/src/main/resources/activemq/server0/broker.xml b/examples/jms/symmetric-cluster/src/main/resources/activemq/server0/broker.xml
index 312d7a2..27b7489 100644
--- a/examples/jms/symmetric-cluster/src/main/resources/activemq/server0/broker.xml
+++ b/examples/jms/symmetric-cluster/src/main/resources/activemq/server0/broker.xml
@@ -76,7 +76,7 @@ under the License.
             <connector-ref>netty-connector</connector-ref>
             <retry-interval>500</retry-interval>
             <use-duplicate-detection>true</use-duplicate-detection>
-            <forward-when-no-consumers>false</forward-when-no-consumers>
+            <message-load-balancing>ON_DEMAND</message-load-balancing>
             <max-hops>1</max-hops>
             <discovery-group-ref discovery-group-name="my-discovery-group"/>
          </cluster-connection>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/symmetric-cluster/src/main/resources/activemq/server1/broker.xml
----------------------------------------------------------------------
diff --git a/examples/jms/symmetric-cluster/src/main/resources/activemq/server1/broker.xml b/examples/jms/symmetric-cluster/src/main/resources/activemq/server1/broker.xml
index f7c2569..16a7931 100644
--- a/examples/jms/symmetric-cluster/src/main/resources/activemq/server1/broker.xml
+++ b/examples/jms/symmetric-cluster/src/main/resources/activemq/server1/broker.xml
@@ -74,7 +74,7 @@ under the License.
             <connector-ref>netty-connector</connector-ref>
             <retry-interval>500</retry-interval>
             <use-duplicate-detection>true</use-duplicate-detection>
-            <forward-when-no-consumers>false</forward-when-no-consumers>
+            <message-load-balancing>ON_DEMAND</message-load-balancing>
             <max-hops>1</max-hops>
             <discovery-group-ref discovery-group-name="my-discovery-group"/>
          </cluster-connection>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/symmetric-cluster/src/main/resources/activemq/server2/broker.xml
----------------------------------------------------------------------
diff --git a/examples/jms/symmetric-cluster/src/main/resources/activemq/server2/broker.xml b/examples/jms/symmetric-cluster/src/main/resources/activemq/server2/broker.xml
index b87a19e..30f33a0 100644
--- a/examples/jms/symmetric-cluster/src/main/resources/activemq/server2/broker.xml
+++ b/examples/jms/symmetric-cluster/src/main/resources/activemq/server2/broker.xml
@@ -74,7 +74,7 @@ under the License.
             <connector-ref>netty-connector</connector-ref>
             <retry-interval>500</retry-interval>
             <use-duplicate-detection>true</use-duplicate-detection>
-            <forward-when-no-consumers>false</forward-when-no-consumers>
+            <message-load-balancing>ON_DEMAND</message-load-balancing>
             <max-hops>1</max-hops>
             <discovery-group-ref discovery-group-name="my-discovery-group"/>
          </cluster-connection>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/symmetric-cluster/src/main/resources/activemq/server3/broker.xml
----------------------------------------------------------------------
diff --git a/examples/jms/symmetric-cluster/src/main/resources/activemq/server3/broker.xml b/examples/jms/symmetric-cluster/src/main/resources/activemq/server3/broker.xml
index 0374876..afda21b 100644
--- a/examples/jms/symmetric-cluster/src/main/resources/activemq/server3/broker.xml
+++ b/examples/jms/symmetric-cluster/src/main/resources/activemq/server3/broker.xml
@@ -74,7 +74,7 @@ under the License.
             <connector-ref>netty-connector</connector-ref>
             <retry-interval>500</retry-interval>
             <use-duplicate-detection>true</use-duplicate-detection>
-            <forward-when-no-consumers>false</forward-when-no-consumers>
+            <message-load-balancing>ON_DEMAND</message-load-balancing>
             <max-hops>1</max-hops>
             <discovery-group-ref discovery-group-name="my-discovery-group"/>
          </cluster-connection>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/symmetric-cluster/src/main/resources/activemq/server4/broker.xml
----------------------------------------------------------------------
diff --git a/examples/jms/symmetric-cluster/src/main/resources/activemq/server4/broker.xml b/examples/jms/symmetric-cluster/src/main/resources/activemq/server4/broker.xml
index 25b099c..321d57b 100644
--- a/examples/jms/symmetric-cluster/src/main/resources/activemq/server4/broker.xml
+++ b/examples/jms/symmetric-cluster/src/main/resources/activemq/server4/broker.xml
@@ -73,7 +73,7 @@ under the License.
             <connector-ref>netty-connector</connector-ref>
             <retry-interval>500</retry-interval>
             <use-duplicate-detection>true</use-duplicate-detection>
-            <forward-when-no-consumers>false</forward-when-no-consumers>
+            <message-load-balancing>ON_DEMAND</message-load-balancing>
             <max-hops>1</max-hops>
             <discovery-group-ref discovery-group-name="my-discovery-group"/>
          </cluster-connection>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/symmetric-cluster/src/main/resources/activemq/server5/broker.xml
----------------------------------------------------------------------
diff --git a/examples/jms/symmetric-cluster/src/main/resources/activemq/server5/broker.xml b/examples/jms/symmetric-cluster/src/main/resources/activemq/server5/broker.xml
index 8efb4aa..abbe9ad 100644
--- a/examples/jms/symmetric-cluster/src/main/resources/activemq/server5/broker.xml
+++ b/examples/jms/symmetric-cluster/src/main/resources/activemq/server5/broker.xml
@@ -73,7 +73,7 @@ under the License.
             <connector-ref>netty-connector</connector-ref>
             <retry-interval>500</retry-interval>
             <use-duplicate-detection>true</use-duplicate-detection>
-            <forward-when-no-consumers>false</forward-when-no-consumers>
+            <message-load-balancing>ON_DEMAND</message-load-balancing>
             <max-hops>1</max-hops>
             <discovery-group-ref discovery-group-name="my-discovery-group"/>
          </cluster-connection>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredGroupingTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredGroupingTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredGroupingTest.java
index 0572255..732c5ed 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredGroupingTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredGroupingTest.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
 import org.apache.activemq.artemis.core.server.group.impl.Response;
 import org.apache.activemq.artemis.core.server.management.Notification;
@@ -66,9 +67,9 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(0, isFileStorage(), isNetty());
       setupServer(1, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, 0, 500, isNetty(), 0, 1);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1);
 
-      setupClusterConnection("cluster1", "queues", false, 1,  0, 500, isNetty(), 1, 0);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1,  0, 500, isNetty(), 1, 0);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@@ -141,11 +142,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1,  0, 500, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1,  0, 500, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1,  0, 500, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1,  0, 500, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1,  0, 500, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1,  0, 500, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@@ -226,11 +227,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1,  0, 500, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1,  0, 500, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1,  0, 500, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1,  0, 500, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1,  0, 500, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1,  0, 500, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@@ -312,13 +313,13 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(2, isFileStorage(), isNetty());
       setupServer(3, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1,  0, 500, isNetty(), 0, 1, 2, 3);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1,  0, 500, isNetty(), 0, 1, 2, 3);
 
-      setupClusterConnection("cluster1", "queues", false, 1,  0, 500, isNetty(), 1, 0, 2, 3);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1,  0, 500, isNetty(), 1, 0, 2, 3);
 
-      setupClusterConnection("cluster2", "queues", false, 1,  0, 500, isNetty(), 2, 0, 1, 3);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1,  0, 500, isNetty(), 2, 0, 1, 3);
 
-      setupClusterConnection("cluster3", "queues", false, 1,  0, 500, isNetty(), 3, 1, 2, 3);
+      setupClusterConnection("cluster3", "queues", MessageLoadBalancingType.ON_DEMAND, 1,  0, 500, isNetty(), 3, 1, 2, 3);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailoverTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailoverTest.java
index bf98b4b..5358bae 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailoverTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailoverTest.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
 import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
 import org.jboss.byteman.contrib.bmunit.BMRule;
 import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
@@ -60,9 +61,9 @@ public class ScaleDownFailoverTest extends ClusterTestBase
          scaleDownConfiguration3.setGroupName("bill");
       }
       staticServers = servers;
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
       scaleDownConfiguration.getConnectors().addAll(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
       scaleDownConfiguration2.getConnectors().addAll(servers[1].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
       scaleDownConfiguration3.getConnectors().addAll(servers[2].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailureTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailureTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailureTest.java
index 46ea3b1..5c64aa5 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailureTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailureTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.extras.byteman;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
 import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
 import org.jboss.byteman.contrib.bmunit.BMRule;
 import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
@@ -44,8 +45,8 @@ public class ScaleDownFailureTest extends ClusterTestBase
          ((LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration);
          ((LiveOnlyPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration);
       }
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
       startServers(0, 1);
       setupSessionFactory(0, isNetty());
       setupSessionFactory(1, isNetty());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
index 84541e6..cf352fd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
 import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
 import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
 import org.apache.activemq.artemis.core.server.cluster.ClusterController;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
 import org.junit.Before;
 import org.junit.Test;
@@ -48,8 +49,8 @@ public class ClusterControllerTest extends ClusterTestBase
 
       getServer(1).getConfiguration().setClusterPassword("something different");
 
-      setupClusterConnection("cluster0", "queues", false, 1, true, 0);
-      setupClusterConnection("cluster0", "queues", false, 1, true, 1);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 0);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 1);
 
       startServers(0);
       startServers(1);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java
index c89ac1b..421a2fb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 package org.apache.activemq.artemis.tests.integration.cluster.distribution;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.junit.Before;
 
 import org.junit.Test;
@@ -46,8 +47,8 @@ public class ClusterHeadersRemovedTest extends ClusterTestBase
    @Test
    public void testHeadersRemoved() throws Exception
    {
-      setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty(), false);
-      setupClusterConnection("clusterX", 1, -1, "queues", false, 1, isNetty(), false);
+      setupClusterConnection("cluster1", 0, 1, "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), false);
+      setupClusterConnection("clusterX", 1, -1, "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), false);
       startServers(1, 0);
 
       setupSessionFactory(0, isNetty());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
index 9b4017b..371dee4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
@@ -57,6 +57,7 @@ import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
 import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
@@ -1913,7 +1914,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase
                                          final int nodeFrom,
                                          final int nodeTo,
                                          final String address,
-                                         final boolean forwardWhenNoConsumers,
+                                         final MessageLoadBalancingType messageLoadBalancingType,
                                          final int maxHops,
                                          final boolean netty,
                                          final boolean allowDirectConnectionsOnly)
@@ -1944,7 +1945,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase
          .setAddress(address)
          .setConnectorName(name)
          .setRetryInterval(100)
-         .setForwardWhenNoConsumers(forwardWhenNoConsumers)
+         .setMessageLoadBalancingType(messageLoadBalancingType)
          .setMaxHops(maxHops)
          .setConfirmationWindowSize(1024)
          .setStaticConnectors(pairs)
@@ -1957,7 +1958,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase
                                          final int nodeFrom,
                                          final int nodeTo,
                                          final String address,
-                                         final boolean forwardWhenNoConsumers,
+                                         final MessageLoadBalancingType messageLoadBalancingType,
                                          final int maxHops,
                                          final int reconnectAttempts,
                                          final long retryInterval,
@@ -1991,7 +1992,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase
          .setConnectorName(name)
          .setReconnectAttempts(reconnectAttempts)
          .setRetryInterval(retryInterval)
-         .setForwardWhenNoConsumers(forwardWhenNoConsumers)
+         .setMessageLoadBalancingType(messageLoadBalancingType)
          .setMaxHops(maxHops)
          .setConfirmationWindowSize(1024)
          .setStaticConnectors(pairs)
@@ -2002,7 +2003,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase
 
    protected void setupClusterConnection(final String name,
                                          final String address,
-                                         final boolean forwardWhenNoConsumers,
+                                         final MessageLoadBalancingType messageLoadBalancingType,
                                          final int maxHops,
                                          final boolean netty,
                                          final int nodeFrom,
@@ -2027,7 +2028,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase
       }
       Configuration config = serverFrom.getConfiguration();
       ClusterConnectionConfiguration clusterConf =
-         createClusterConfig(name, address, forwardWhenNoConsumers,
+         createClusterConfig(name, address, messageLoadBalancingType,
                              maxHops,
                              connectorFrom,
                              pairs);
@@ -2037,7 +2038,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase
 
    protected void setupClusterConnection(final String name,
                                          final String address,
-                                         final boolean forwardWhenNoConsumers,
+                                         final MessageLoadBalancingType messageLoadBalancingType,
                                          final int maxHops,
                                          final int reconnectAttempts,
                                          final long retryInterval,
@@ -2072,7 +2073,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase
          .setReconnectAttempts(reconnectAttempts)
          .setCallTimeout(100)
          .setCallFailoverTimeout(100)
-         .setForwardWhenNoConsumers(forwardWhenNoConsumers)
+         .setMessageLoadBalancingType(messageLoadBalancingType)
          .setMaxHops(maxHops)
          .setConfirmationWindowSize(1024)
          .setStaticConnectors(pairs);
@@ -2081,7 +2082,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase
    }
 
    private ClusterConnectionConfiguration createClusterConfig(final String name, final String address,
-                                                              final boolean forwardWhenNoConsumers, final int maxHops,
+                                                              final MessageLoadBalancingType messageLoadBalancingType, final int maxHops,
                                                               TransportConfiguration connectorFrom, List<String> pairs)
    {
       return new ClusterConnectionConfiguration()
@@ -2089,7 +2090,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase
          .setAddress(address)
          .setConnectorName(connectorFrom.getName())
          .setRetryInterval(250)
-         .setForwardWhenNoConsumers(forwardWhenNoConsumers)
+         .setMessageLoadBalancingType(messageLoadBalancingType)
          .setMaxHops(maxHops)
          .setConfirmationWindowSize(1024)
          .setStaticConnectors(pairs);
@@ -2097,7 +2098,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase
 
    protected void setupClusterConnectionWithBackups(final String name,
                                                     final String address,
-                                                    final boolean forwardWhenNoConsumers,
+                                                    final MessageLoadBalancingType messageLoadBalancingType,
                                                     final int maxHops,
                                                     final boolean netty,
                                                     final int nodeFrom,
@@ -2127,7 +2128,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase
          .setAddress(address)
          .setConnectorName(name)
          .setRetryInterval(250)
-         .setForwardWhenNoConsumers(forwardWhenNoConsumers)
+         .setMessageLoadBalancingType(messageLoadBalancingType)
          .setMaxHops(maxHops)
          .setConfirmationWindowSize(1024)
          .setStaticConnectors(pairs);
@@ -2139,7 +2140,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase
                                                   final int node,
                                                   final String discoveryGroupName,
                                                   final String address,
-                                                  final boolean forwardWhenNoConsumers,
+                                                  final MessageLoadBalancingType messageLoadBalancingType,
                                                   final int maxHops,
                                                   final boolean netty)
    {
@@ -2159,7 +2160,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase
          .setConnectorName(name)
          .setRetryInterval(100)
          .setDuplicateDetection(true)
-         .setForwardWhenNoConsumers(forwardWhenNoConsumers)
+         .setMessageLoadBalancingType(messageLoadBalancingType)
          .setMaxHops(maxHops)
          .setConfirmationWindowSize(1024)
          .setDiscoveryGroupName(discoveryGroupName);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterWithBackupTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterWithBackupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterWithBackupTest.java
index 474a953..0e5c2a1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterWithBackupTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterWithBackupTest.java
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 package org.apache.activemq.artemis.tests.integration.cluster.distribution;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.junit.Before;
 
@@ -83,22 +84,22 @@ public class ClusterWithBackupTest extends ClusterTestBase
 
    protected void setupCluster() throws Exception
    {
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
    }
 
-   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception
    {
-      setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 3, 4, 5);
+      setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 3, 4, 5);
 
-      setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 4, 3, 5);
+      setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 4, 3, 5);
 
-      setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 5, 3, 4);
+      setupClusterConnection("cluster2", "queues", messageLoadBalancingType, 1, isNetty(), 5, 3, 4);
 
-      setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 4, 5);
+      setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 4, 5);
 
-      setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 1, 3, 5);
+      setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 3, 5);
 
-      setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 2, 3, 4);
+      setupClusterConnection("cluster2", "queues", messageLoadBalancingType, 1, isNetty(), 2, 3, 4);
    }
 
    protected void setupServers() throws Exception

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java
index 9efcae4..d4446a8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
 import org.apache.activemq.artemis.core.server.group.UnproposalListener;
 import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
@@ -59,11 +60,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, 2000, 1000, 100);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 2000, 1000, 100);
@@ -168,11 +169,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, -1, 2000, 500);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@@ -261,11 +262,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@@ -308,11 +309,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
 
       final int TIMEOUT_GROUPS = 5000;
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, TIMEOUT_GROUPS, -1, -1);
@@ -424,11 +425,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, 1000, 1000, 100);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 1000, 100, 100);
@@ -515,9 +516,9 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, 10000, 500, 750);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 10000, 500, 750);
@@ -823,11 +824,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
 
       int TIMEOUT = 50000;
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, TIMEOUT);
@@ -904,11 +905,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@@ -983,11 +984,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 0);
 
@@ -1140,11 +1141,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@@ -1187,11 +1188,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@@ -1238,11 +1239,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@@ -1285,11 +1286,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@@ -1335,11 +1336,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@@ -1381,11 +1382,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@@ -1451,11 +1452,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, 0, 500, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, 0, 500, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, 0, 500, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@@ -1514,11 +1515,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, 0, 500, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, 0, 500, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, 0, 500, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@@ -1577,11 +1578,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, 0, 500, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, 0, 500, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, 0, 500, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@@ -1635,11 +1636,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
@@ -1686,11 +1687,11 @@ public class ClusteredGroupingTest extends ClusterTestBase
       setupServer(1, isFileStorage(), isNetty());
       setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
index d6512f4..9dc4123 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.distribution;
 
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -129,20 +130,20 @@ public class ClusteredRequestResponseTest extends ClusterTestBase
 
    protected void setupCluster() throws Exception
    {
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
    }
 
-   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception
    {
-      setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 1, 2, 3, 4);
+      setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1, 2, 3, 4);
 
-      setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 1, 0, 2, 3, 4);
+      setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0, 2, 3, 4);
 
-      setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 2, 0, 1, 3, 4);
+      setupClusterConnection("cluster2", "queues", messageLoadBalancingType, 1, isNetty(), 2, 0, 1, 3, 4);
 
-      setupClusterConnection("cluster3", "queues", forwardWhenNoConsumers, 1, isNetty(), 3, 0, 1, 2, 4);
+      setupClusterConnection("cluster3", "queues", messageLoadBalancingType, 1, isNetty(), 3, 0, 1, 2, 4);
 
-      setupClusterConnection("cluster4", "queues", forwardWhenNoConsumers, 1, isNetty(), 4, 0, 1, 2, 3);
+      setupClusterConnection("cluster4", "queues", messageLoadBalancingType, 1, isNetty(), 4, 0, 1, 2, 3);
    }
 
    protected void setupServers() throws Exception

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java
new file mode 100644
index 0000000..3cd9e61
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.distribution;
+
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MessageLoadBalancingTest extends ClusterTestBase
+{
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      super.setUp();
+
+      start();
+   }
+
+   private void start() throws Exception
+   {
+      setupServers();
+
+      setRedistributionDelay(0);
+   }
+
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+   @Test
+   public void testMessageLoadBalancingOff() throws Exception
+   {
+      setupCluster(MessageLoadBalancingType.OFF);
+
+      startServers(0, 1);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(1, "queues.testaddress", "queue0", null, false);
+
+      addConsumer(1, 1, "queue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 0, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, false);
+      waitForBindings(1, "queues.testaddress", 1, 0, false);
+
+      addConsumer(0, 0, "queue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, false);
+      waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+
+      ClientMessage message = getConsumer(1).receive(1000);
+      Assert.assertNull(message);
+
+      for (int i = 0; i < 10; i++)
+      {
+         message = getConsumer(0).receive(5000);
+         Assert.assertNotNull("" + i, message);
+         message.acknowledge();
+      }
+
+      ClientMessage clientMessage = getConsumer(0).receiveImmediate();
+      Assert.assertNull(clientMessage);
+   }
+
+   protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception
+   {
+      setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1);
+
+      setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0);
+   }
+
+   protected void setRedistributionDelay(final long delay)
+   {
+      AddressSettings as = new AddressSettings().setRedistributionDelay(delay);
+
+      getServer(0).getAddressSettingsRepository().addMatch("queues.*", as);
+      getServer(1).getAddressSettingsRepository().addMatch("queues.*", as);
+   }
+
+   protected void setupServers() throws Exception
+   {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+   }
+
+   protected void stopServers() throws Exception
+   {
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+
+      closeAllServerLocatorsFactories();
+
+      stopServers(0, 1);
+
+      clearServer(0, 1);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
index ebdac36..d294e75 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.core.message.impl.MessageImpl;
 import org.apache.activemq.artemis.core.server.Bindable;
@@ -69,7 +70,7 @@ public class MessageRedistributionTest extends ClusterTestBase
    @Test
    public void testRedistributionWithMessageGroups() throws Exception
    {
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
 
       MessageRedistributionTest.log.info("Doing test");
 
@@ -174,7 +175,7 @@ public class MessageRedistributionTest extends ClusterTestBase
    @Test
    public void testRedistributionStopsWhenConsumerAdded() throws Exception
    {
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
 
       MessageRedistributionTest.log.info("Doing test");
 
@@ -212,7 +213,7 @@ public class MessageRedistributionTest extends ClusterTestBase
    @Test
    public void testRedistributionWhenConsumerIsClosed() throws Exception
    {
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
 
       MessageRedistributionTest.log.info("Doing test");
 
@@ -254,7 +255,7 @@ public class MessageRedistributionTest extends ClusterTestBase
    @Test
    public void testRedistributionWhenConsumerIsClosedDifferentQueues() throws Exception
    {
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
 
       startServers(0, 1, 2);
 
@@ -334,7 +335,7 @@ public class MessageRedistributionTest extends ClusterTestBase
    @Test
    public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws Exception
    {
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
 
       startServers(0, 1, 2);
 
@@ -371,7 +372,7 @@ public class MessageRedistributionTest extends ClusterTestBase
    public void testNoRedistributionWhenConsumerIsClosedForwardWhenNoConsumersTrue() throws Exception
    {
       // x
-      setupCluster(true);
+      setupCluster(MessageLoadBalancingType.STRICT);
 
       startServers(0, 1, 2);
 
@@ -423,7 +424,7 @@ public class MessageRedistributionTest extends ClusterTestBase
    @Test
    public void testNoRedistributionWhenConsumerIsClosedNoConsumersOnOtherNodes() throws Exception
    {
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
 
       startServers(0, 1, 2);
 
@@ -473,7 +474,7 @@ public class MessageRedistributionTest extends ClusterTestBase
    @Test
    public void testRedistributeWithScheduling() throws Exception
    {
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
 
       AddressSettings setting = new AddressSettings().setRedeliveryDelay(10000);
       servers[0].getAddressSettingsRepository().addMatch("queues.testaddress", setting);
@@ -583,7 +584,7 @@ public class MessageRedistributionTest extends ClusterTestBase
    @Test
    public void testRedistributionWhenConsumerIsClosedQueuesWithFilters() throws Exception
    {
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
 
       startServers(0, 1, 2);
 
@@ -624,7 +625,7 @@ public class MessageRedistributionTest extends ClusterTestBase
    @Test
    public void testRedistributionWhenConsumerIsClosedConsumersWithFilters() throws Exception
    {
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
 
       startServers(0, 1, 2);
 
@@ -665,7 +666,7 @@ public class MessageRedistributionTest extends ClusterTestBase
    @Test
    public void testRedistributionWhenRemoteConsumerIsAdded() throws Exception
    {
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
 
       startServers(0, 1, 2);
 
@@ -702,7 +703,7 @@ public class MessageRedistributionTest extends ClusterTestBase
    {
       for (int i = 0; i < 10; i++)
       {
-         setupCluster(false);
+         setupCluster(MessageLoadBalancingType.ON_DEMAND);
 
          startServers(0, 1, 2);
 
@@ -812,7 +813,7 @@ public class MessageRedistributionTest extends ClusterTestBase
       }
       for (int i = 0; i < 10; i++)
       {
-         setupCluster(false);
+         setupCluster(MessageLoadBalancingType.ON_DEMAND);
 
          startServers(0, 1);
 
@@ -877,7 +878,7 @@ public class MessageRedistributionTest extends ClusterTestBase
    @Test
    public void testRedistributionToQueuesWhereNotAllMessagesMatch() throws Exception
    {
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
 
       startServers(0, 1, 2);
 
@@ -919,7 +920,7 @@ public class MessageRedistributionTest extends ClusterTestBase
       final long delay = 1000;
       setRedistributionDelay(delay);
 
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
 
       startServers(0, 1, 2);
 
@@ -959,7 +960,7 @@ public class MessageRedistributionTest extends ClusterTestBase
       final long delay = 1000;
       setRedistributionDelay(delay);
 
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
 
       startServers(0, 1, 2);
 
@@ -999,7 +1000,7 @@ public class MessageRedistributionTest extends ClusterTestBase
    @Test
    public void testRedistributionNumberOfMessagesGreaterThanBatchSize() throws Exception
    {
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
 
       startServers(0, 1, 2);
 
@@ -1037,7 +1038,7 @@ public class MessageRedistributionTest extends ClusterTestBase
    @Test
    public void testRedistributionWhenNewNodeIsAddedWithConsumer() throws Exception
    {
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
 
       startServers(0);
 
@@ -1069,7 +1070,7 @@ public class MessageRedistributionTest extends ClusterTestBase
    @Test
    public void testRedistributionWithPagingOnTarget() throws Exception
    {
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
 
       AddressSettings as = new AddressSettings()
               .setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE)
@@ -1136,13 +1137,13 @@ public class MessageRedistributionTest extends ClusterTestBase
       session1.close();
    }
 
-   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception
    {
-      setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", messageLoadBalancingType, 1, isNetty(), 2, 0, 1);
    }
 
    protected void setRedistributionDelay(final long delay)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java
index ae6df0c..af15179 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 package org.apache.activemq.artemis.tests.integration.cluster.distribution;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.junit.Assert;
 import org.junit.Before;
@@ -53,22 +54,22 @@ public class MessageRedistributionWithDiscoveryTest extends ClusterTestBase
 
    protected void setupCluster() throws Exception
    {
-      setupCluster(false);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
    }
 
-   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception
    {
       for (int i = 0; i < 5; i++)
       {
-         setServer(forwardWhenNoConsumers, i);
+         setServer(messageLoadBalancingType, i);
       }
    }
 
    /**
-    * @param forwardWhenNoConsumers
+    * @param messageLoadBalancingType
     * @throws Exception
     */
-   protected void setServer(final boolean forwardWhenNoConsumers, int server) throws Exception
+   protected void setServer(final MessageLoadBalancingType messageLoadBalancingType, int server) throws Exception
    {
       setupLiveServerWithDiscovery(server,
                                    groupAddress,
@@ -83,7 +84,7 @@ public class MessageRedistributionWithDiscoveryTest extends ClusterTestBase
 
       servers[server].getAddressSettingsRepository().addMatch("#", setting);
 
-      setupDiscoveryClusterConnection("cluster" + server, server, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+      setupDiscoveryClusterConnection("cluster" + server, server, "dg1", "queues", messageLoadBalancingType, 1, isNetty());
    }
 
    @Test
@@ -145,7 +146,7 @@ public class MessageRedistributionWithDiscoveryTest extends ClusterTestBase
       servers[0].stop();
       servers[0] = null;
 
-      setServer(false, 0);
+      setServer(MessageLoadBalancingType.ON_DEMAND, 0);
 
       startServers(1, 2);
 


Mime
View raw message