activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r921821 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
Date Thu, 11 Mar 2010 12:17:10 GMT
Author: rajdavies
Date: Thu Mar 11 12:17:10 2010
New Revision: 921821

URL: http://svn.apache.org/viewvc?rev=921821&view=rev
Log:
Add a cluster update filter

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=921821&r1=921820&r2=921821&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Thu Mar 11 12:17:10 2010
@@ -16,11 +16,14 @@
  */
 package org.apache.activemq.broker;
 
+import static org.apache.activemq.thread.DefaultThreadPools.getDefaultTaskRunnerFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Iterator;
+import java.util.StringTokenizer;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.regex.Pattern;
 import javax.management.ObjectName;
 import org.apache.activemq.broker.jmx.ManagedTransportConnector;
 import org.apache.activemq.broker.jmx.ManagementContext;
@@ -28,7 +31,6 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionControl;
 import org.apache.activemq.security.MessageAuthorizationPolicy;
-import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
@@ -41,23 +43,13 @@ import org.apache.activemq.util.ServiceS
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-
-import static org.apache.activemq.thread.DefaultThreadPools.*;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Iterator;
-import java.util.concurrent.CopyOnWriteArrayList;
-import javax.management.ObjectName;
-
 /**
  * @org.apache.xbean.XBean
  * @version $Revision: 1.6 $
  */
 public class TransportConnector implements Connector, BrokerServiceAware {
 
-    private static final Log LOG = LogFactory.getLog(TransportConnector.class);
+    final Log LOG = LogFactory.getLog(TransportConnector.class);
 
     protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
     protected TransportStatusDetector statusDector;
@@ -75,9 +67,10 @@ public class TransportConnector implemen
     private boolean disableAsyncDispatch;
     private boolean enableStatusMonitor = false;
     private Broker broker;
-    private boolean updateClusterClients=false;
+    private boolean updateClusterClients = false;
     private boolean rebalanceClusterClients;
-    
+    private String updateClusterFilter;
+
     public TransportConnector() {
     }
 
@@ -93,7 +86,6 @@ public class TransportConnector implemen
 
     }
 
-
     /**
      * @return Returns the connections.
      */
@@ -105,7 +97,8 @@ public class TransportConnector implemen
      * Factory method to create a JMX managed version of this transport
      * connector
      */
-    public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException {
+    public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName)
+            throws IOException, URISyntaxException {
         ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
         rc.setBrokerInfo(getBrokerInfo());
         rc.setConnectUri(getConnectUri());
@@ -130,15 +123,16 @@ public class TransportConnector implemen
     public void setBrokerInfo(BrokerInfo brokerInfo) {
         this.brokerInfo = brokerInfo;
     }
-    
+
     /**
      * 
-     * @deprecated use the {@link #setBrokerService(BrokerService)} method instead.
+     * @deprecated use the {@link #setBrokerService(BrokerService)} method
+     *             instead.
      */
     @Deprecated
     public void setBrokerName(String name) {
-        if (this.brokerInfo==null) {
-            this.brokerInfo=new BrokerInfo();
+        if (this.brokerInfo == null) {
+            this.brokerInfo = new BrokerInfo();
         }
         this.brokerInfo.setBrokerName(name);
     }
@@ -204,7 +198,6 @@ public class TransportConnector implemen
     }
 
     public void start() throws Exception {
-        TransportServer server = getServer();  
         broker = brokerService.getBroker();
         brokerInfo.setBrokerName(broker.getBrokerName());
         brokerInfo.setBrokerId(broker.getBrokerId());
@@ -214,7 +207,7 @@ public class TransportConnector implemen
         server.setAcceptListener(new TransportAcceptListener() {
             public void onAccept(final Transport transport) {
                 try {
-                    getDefaultTaskRunnerFactory().execute(new Runnable(){
+                    getDefaultTaskRunnerFactory().execute(new Runnable() {
                         public void run() {
                             try {
                                 Connection connection = createConnection(transport);
@@ -237,13 +230,14 @@ public class TransportConnector implemen
             }
 
             private void onAcceptError(Exception error, String remoteHost) {
-                LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": " + error);
+                LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": "
+                        + error);
                 LOG.debug("Reason: " + error, error);
             }
         });
         server.setBrokerInfo(brokerInfo);
         server.start();
-        
+
         DiscoveryAgent da = getDiscoveryAgent();
         if (da != null) {
             da.registerService(getPublishableConnectString());
@@ -258,15 +252,16 @@ public class TransportConnector implemen
     }
 
     private String getPublishableConnectString() throws Exception {
-        URI connectUri = getConnectUri();
-        String publishableConnectString = connectUri.toString();
-        // strip off server side query parameters which may not be compatible to clients
-        if (connectUri.getRawQuery() != null) {
-            publishableConnectString = 
-                publishableConnectString.substring(0, publishableConnectString.indexOf(connectUri.getRawQuery()) -1); 
+        URI theConnectURI = getConnectUri();
+        String publishableConnectString = theConnectURI.toString();
+        // strip off server side query parameters which may not be compatible to
+        // clients
+        if (theConnectURI.getRawQuery() != null) {
+            publishableConnectString = publishableConnectString.substring(0, publishableConnectString
+                    .indexOf(theConnectURI.getRawQuery()) - 1);
         }
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + connectUri);
+            LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + theConnectURI);
         }
         return publishableConnectString;
     }
@@ -295,7 +290,8 @@ public class TransportConnector implemen
     // Implementation methods
     // -------------------------------------------------------------------------
     protected Connection createConnection(Transport transport) throws IOException {
-        TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null : taskRunnerFactory);
+        TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
+                : taskRunnerFactory);
         boolean statEnabled = this.getStatistics().isEnabled();
         answer.getStatistics().setEnabled(statEnabled);
         answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
@@ -307,9 +303,10 @@ public class TransportConnector implemen
             throw new IllegalArgumentException("You must specify either a server or uri property");
         }
         if (brokerService == null) {
-            throw new IllegalArgumentException("You must specify the brokerService property. Maybe this connector should be added to a broker?");
+            throw new IllegalArgumentException(
+                    "You must specify the brokerService property. Maybe this connector should be added to a broker?");
         }
-      	return TransportFactory.bind(brokerService, uri);
+        return TransportFactory.bind(brokerService, uri);
     }
 
     public DiscoveryAgent getDiscoveryAgent() throws IOException {
@@ -381,44 +378,70 @@ public class TransportConnector implemen
         }
         return rc;
     }
-    
+
     protected ConnectionControl getConnectionControl() {
         boolean rebalance = isRebalanceClusterClients();
-            String connectedBrokers = "";
-            String self = "";
-            if (brokerService.getDefaultSocketURI() != null) {
-                self += brokerService.getDefaultSocketURI().toString();
-                self += ",";
-            }
-            if (rebalance == false) {
-                connectedBrokers += self;
-            }
-            if (this.broker.getPeerBrokerInfos() != null) {
+        String connectedBrokers = "";
+        String self = "";
+        if (brokerService.getDefaultSocketURI() != null) {
+            self += brokerService.getDefaultSocketURI().toString();
+            self += ",";
+        }
+        if (rebalance == false) {
+            connectedBrokers += self;
+        }
+        if (this.broker.getPeerBrokerInfos() != null) {
             for (BrokerInfo info : this.broker.getPeerBrokerInfos()) {
-                connectedBrokers += info.getBrokerURL();
-                connectedBrokers += ",";
-            }
-            }
-            if (rebalance) {
-                connectedBrokers += self;
+                if (isMatchesClusterFilter(info.getBrokerName())) {
+                    connectedBrokers += info.getBrokerURL();
+                    connectedBrokers += ",";
+                }
             }
+        }
+        if (rebalance) {
+            connectedBrokers += self;
+        }
+
+        ConnectionControl control = new ConnectionControl();
+        control.setConnectedBrokers(connectedBrokers);
+        control.setRebalanceConnection(rebalance);
+        return control;
 
-            ConnectionControl control = new ConnectionControl();
-            control.setConnectedBrokers(connectedBrokers);
-            control.setRebalanceConnection(rebalance);
-            return control;
-        
     }
-    
+
     public void updateClientClusterInfo() {
         if (isRebalanceClusterClients() || isUpdateClusterClients()) {
             ConnectionControl control = getConnectionControl();
-            for (Connection c: this.connections) {
+            for (Connection c : this.connections) {
                 c.updateClient(control);
             }
         }
     }
 
+    private boolean isMatchesClusterFilter(String brokerName) {
+        boolean result = true;
+        String filter = getUpdateClusterFilter();
+        if (filter != null) {
+            filter = filter.trim();
+            if (filter.length() > 0) {
+                StringTokenizer tokenizer = new StringTokenizer(filter, ",");
+                while (result && tokenizer.hasMoreTokens()) {
+                    String token = tokenizer.nextToken();
+                    result = isMatchesClusterFilter(brokerName, token);
+                }
+            }
+        }
+        return result;
+    }
+
+    private boolean isMatchesClusterFilter(String brokerName, String match) {
+        boolean result = true;
+        if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) {
+            result = Pattern.matches(match, brokerName);
+        }
+        return result;
+    }
+
     public boolean isDisableAsyncDispatch() {
         return disableAsyncDispatch;
     }
@@ -435,7 +458,8 @@ public class TransportConnector implemen
     }
 
     /**
-     * @param enableStatusMonitor the enableStatusMonitor to set
+     * @param enableStatusMonitor
+     *            the enableStatusMonitor to set
      */
     public void setEnableStatusMonitor(boolean enableStatusMonitor) {
         this.enableStatusMonitor = enableStatusMonitor;
@@ -452,9 +476,9 @@ public class TransportConnector implemen
         return broker;
     }
 
-	public BrokerService getBrokerService() {
-		return brokerService;
-	}
+    public BrokerService getBrokerService() {
+        return brokerService;
+    }
 
     /**
      * @return the updateClusterClients
@@ -464,7 +488,8 @@ public class TransportConnector implemen
     }
 
     /**
-     * @param updateClusterClients the updateClusterClients to set
+     * @param updateClusterClients
+     *            the updateClusterClients to set
      */
     public void setUpdateClusterClients(boolean updateClusterClients) {
         this.updateClusterClients = updateClusterClients;
@@ -478,9 +503,25 @@ public class TransportConnector implemen
     }
 
     /**
-     * @param rebalanceClusterClients the rebalanceClusterClients to set
+     * @param rebalanceClusterClients
+     *            the rebalanceClusterClients to set
      */
     public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
         this.rebalanceClusterClients = rebalanceClusterClients;
     }
+
+    /**
+     * @return the updateClusterFilter
+     */
+    public String getUpdateClusterFilter() {
+        return this.updateClusterFilter;
+    }
+
+    /**
+     * @param updateClusterFilter
+     *            the updateClusterFilter to set
+     */
+    public void setUpdateClusterFilter(String updateClusterFilter) {
+        this.updateClusterFilter = updateClusterFilter;
+    }
 }



Mime
View raw message