activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1311237 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker: TransportConnection.java TransportConnector.java region/RegionBroker.java
Date Mon, 09 Apr 2012 14:07:03 GMT
Author: dejanb
Date: Mon Apr  9 14:07:03 2012
New Revision: 1311237

URL: http://svn.apache.org/viewvc?rev=1311237&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3706 - use round-robin instead of random shuffling for cluster rebalancing

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1311237&r1=1311236&r2=1311237&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Mon Apr  9 14:07:03 2012
@@ -712,6 +712,9 @@ public class TransportConnection impleme
             // send ConnectionCommand
             ConnectionControl command = this.connector.getConnectionControl();
             command.setFaultTolerant(broker.isFaultTolerantConfiguration());
+            if (info.isFailoverReconnect()) {
+                command.setRebalanceConnection(false);
+            }
             dispatchAsync(command);
         }
         return null;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=1311237&r1=1311236&r2=1311237&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Mon Apr  9 14:07:03 2012
@@ -16,19 +16,6 @@
  */
 package org.apache.activemq.broker;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Random;
-import java.util.StringTokenizer;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.regex.Pattern;
-
-import javax.management.ObjectName;
-
 import org.apache.activemq.broker.jmx.ManagedTransportConnector;
 import org.apache.activemq.broker.jmx.ManagementContext;
 import org.apache.activemq.broker.region.ConnectorStatistics;
@@ -48,6 +35,16 @@ import org.apache.activemq.util.ServiceS
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.StringTokenizer;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.regex.Pattern;
+
 /**
  * @org.apache.xbean.XBean
  *
@@ -78,7 +75,7 @@ public class TransportConnector implemen
     private String updateClusterFilter;
     private boolean auditNetworkProducers = false;
 
-    Random rnd = new Random(System.currentTimeMillis());
+    LinkedList<String> peerBrokers = new LinkedList<String>();
 
     public TransportConnector() {
     }
@@ -410,23 +407,17 @@ public class TransportConnector implemen
         String separator = "";
 
         if (isUpdateClusterClients()) {
-            ArrayList<String> uris = new ArrayList<String>();
-            uris.add(brokerService.getDefaultSocketURIString());
-            for (BrokerInfo info: broker.getPeerBrokerInfos()) {
-                if (isMatchesClusterFilter(info.getBrokerName())) {
-                    if (info.getBrokerURL() != null) {
-                        uris.add(info.getBrokerURL());
-                    }
+            synchronized (peerBrokers) {
+                for (String uri : getPeerBrokers()) {
+                    connectedBrokers += separator + uri;
+                    separator = ",";
                 }
-            }
-            if (rebalance) {
-                Collections.shuffle(uris, rnd);
-            }
-            for (String uri: uris) {
-                connectedBrokers += separator + uri;
-                separator = ",";
-            }
 
+                if (rebalance) {
+                    String shuffle = getPeerBrokers().removeFirst();
+                    getPeerBrokers().addLast(shuffle);
+                }
+            }
         }
         ConnectionControl control = new ConnectionControl();
         control.setConnectedBrokers(connectedBrokers);
@@ -434,8 +425,32 @@ public class TransportConnector implemen
         return control;
 
     }
+    
+    public void addPeerBroker(BrokerInfo info) {
+        if (isMatchesClusterFilter(info.getBrokerName())) {
+            synchronized (peerBrokers) {
+                getPeerBrokers().addLast(info.getBrokerURL());
+            }
+        }
+    }
+    
+    public void removePeerBroker(BrokerInfo info) {
+        synchronized (peerBrokers) {
+            getPeerBrokers().remove(info.getBrokerURL());
+        }
+    }
+
+    public LinkedList<String> getPeerBrokers() {
+        synchronized (peerBrokers) {
+            if (peerBrokers.isEmpty()) {
+                peerBrokers.add(brokerService.getDefaultSocketURIString());
+            }
+            return peerBrokers;
+        }
+    }
 
     public void updateClientClusterInfo() {
+
         if (isRebalanceClusterClients() || isUpdateClusterClients()) {
             ConnectionControl control = getConnectionControl();
             for (Connection c : this.connections) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1311237&r1=1311236&r2=1311237&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Mon Apr  9 14:07:03 2012
@@ -16,20 +16,6 @@
  */
 package org.apache.activemq.broker.region;
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.jms.InvalidClientIDException;
-import javax.jms.JMSException;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.Connection;
@@ -37,27 +23,10 @@ import org.apache.activemq.broker.Connec
 import org.apache.activemq.broker.ConsumerBrokerExchange;
 import org.apache.activemq.broker.EmptyBroker;
 import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.broker.TransportConnection;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerControl;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.*;
 import org.apache.activemq.state.ConnectionState;
 import org.apache.activemq.store.kahadb.plist.PListStore;
 import org.apache.activemq.thread.Scheduler;
@@ -71,6 +40,21 @@ import org.apache.activemq.util.ServiceS
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.InvalidClientIDException;
+import javax.jms.JMSException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 /**
  * Routes Broker operations to the correct messaging regions for processing.
  *
@@ -649,7 +633,7 @@ public class RegionBroker extends EmptyB
         if (LOG.isDebugEnabled()) {
             LOG.debug(getBrokerName() + " addBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
         }
-        addBrokerInClusterUpdate();
+        addBrokerInClusterUpdate(info);
     }
 
     @Override
@@ -662,7 +646,7 @@ public class RegionBroker extends EmptyB
             if (LOG.isDebugEnabled()) {
                 LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
             }
-            removeBrokerInClusterUpdate();
+            removeBrokerInClusterUpdate(info);
         }
     }
 
@@ -913,19 +897,21 @@ public class RegionBroker extends EmptyB
         }
     }
 
-    protected void addBrokerInClusterUpdate() {
+    protected void addBrokerInClusterUpdate(BrokerInfo info) {
         List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
         for (TransportConnector connector : connectors) {
             if (connector.isUpdateClusterClients()) {
+                connector.addPeerBroker(info);
                 connector.updateClientClusterInfo();
             }
         }
     }
 
-    protected void removeBrokerInClusterUpdate() {
+    protected void removeBrokerInClusterUpdate(BrokerInfo info) {
         List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
         for (TransportConnector connector : connectors) {
             if (connector.isUpdateClusterClients() && connector.isUpdateClusterClientsOnRemove()) {
+                connector.removePeerBroker(info);
                 connector.updateClientClusterInfo();
             }
         }



Mime
View raw message