activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r386507 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/BrokerService.java network/ConnectionFilter.java network/DiscoveryNetworkConnector.java network/NetworkConnector.java
Date Fri, 17 Mar 2006 02:24:30 GMT
Author: chirino
Date: Thu Mar 16 18:24:29 2006
New Revision: 386507

URL: http://svn.apache.org/viewcvs?rev=386507&view=rev
Log:
Fix for http://jira.activemq.org/jira/browse/AMQ-639

The NetworkConnector was not removing the old bridge from the map of bridges that it was maintaining,
so when the remote broker came back up, the bridge was not restarted.

Also add a new ConnectionFilter interface which allows the broker to avoid establishing loopback
connections.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=386507&r1=386506&r2=386507&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Thu Mar 16 18:24:29 2006
@@ -47,6 +47,7 @@
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.network.ConnectionFilter;
 import org.apache.activemq.network.DiscoveryNetworkConnector;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.network.jms.JmsConnector;
@@ -224,6 +225,24 @@
         map.put("network", "true");
         uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
         connector.setLocalUri(uri);
+        
+        // Set a connection filter so that the connector does not establish loop back connections.
+        connector.setConnectionFilter(new ConnectionFilter() {
+            public boolean connectTo(URI location) {
+                List transportConnectors = getTransportConnectors();
+                for (Iterator iter = transportConnectors.iterator(); iter.hasNext();) {
+                    try {
+                        TransportConnector tc = (TransportConnector) iter.next();
+                        if( location.equals(tc.getConnectUri()) ) {
+                            return false;
+                        }
+                    } catch (Throwable e) {
+                    }
+                }
+                return true;
+            }
+        });
+        
         networkConnectors.add(connector);
         if (isUseJmx()) {
             registerNetworkConnectorMBean(connector);

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java?rev=386507&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java
Thu Mar 16 18:24:29 2006
@@ -0,0 +1,16 @@
+package org.apache.activemq.network;
+
+import java.net.URI;
+
+/**
+ * Abstraction that allows you to control which brokers a NetworkConnector connects bridges to.
+ * 
+ * @version $Revision$
+ */
+public interface ConnectionFilter {
+    /**
+     * @param location
+     * @return true if the network connector should establish a connection to the specified location.
+     */
+    boolean connectTo(URI location);
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=386507&r1=386506&r2=386507&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
Thu Mar 16 18:24:29 2006
@@ -18,6 +18,7 @@
 
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.DiscoveryEvent;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
@@ -44,7 +45,7 @@
 
     private DiscoveryAgent discoveryAgent;
     private ConcurrentHashMap bridges = new ConcurrentHashMap();
-
+    
     public DiscoveryNetworkConnector() {
     }
 
@@ -69,8 +70,11 @@
                 return;
             }
 
-            // Has it allready been added?
-            if (bridges.containsKey(uri) || localURI.equals(uri))
+            // Should we try to connect to that URI?
+            if (    bridges.containsKey(uri) 
+                    || localURI.equals(uri) 
+                    || (connectionFilter!=null && !connectionFilter.connectTo(uri))
+                    )
                 return;
 
             URI connectUri = uri;
@@ -131,7 +135,7 @@
                 return;
             }
 
-            Bridge bridge = (Bridge) bridges.get(uri);
+            Bridge bridge = (Bridge) bridges.remove(uri);
             if (bridge == null)
                 return;
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?rev=386507&r1=386506&r2=386507&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
Thu Mar 16 18:24:29 2006
@@ -18,6 +18,7 @@
 
 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
@@ -50,6 +51,9 @@
     private boolean decreaseNetworkConsumerPriority;
     private int networkTTL = 1;
     private String name = "bridge";
+    private int prefetchSize = 1000;
+    private boolean dispatchAsync = true;
+    protected ConnectionFilter connectionFilter;
 
     public NetworkConnector() {
     }
@@ -234,6 +238,8 @@
         result.setLocalBrokerName(getBrokerName());
         result.setName(getBrokerName());
         result.setNetworkTTL(getNetworkTTL());
+        result.setPrefetchSize(prefetchSize);
+        result.setDispatchAsync(dispatchAsync);
         result.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
 
         List destsList = getDynamicallyIncludedDestinations();
@@ -271,5 +277,29 @@
 
     protected Transport createLocalTransport() throws Exception {
         return TransportFactory.connect(localURI);
+    }
+
+    public boolean isDispatchAsync() {
+        return dispatchAsync;
+    }
+
+    public void setDispatchAsync(boolean dispatchAsync) {
+        this.dispatchAsync = dispatchAsync;
+    }
+
+    public int getPrefetchSize() {
+        return prefetchSize;
+    }
+
+    public void setPrefetchSize(int prefetchSize) {
+        this.prefetchSize = prefetchSize;
+    }
+
+    public ConnectionFilter getConnectionFilter() {
+        return connectionFilter;
+    }
+
+    public void setConnectionFilter(ConnectionFilter connectionFilter) {
+        this.connectionFilter = connectionFilter;
     }
 }



Mime
View raw message