activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r390369 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network: DemandForwardingBridgeSupport.java NetworkConnector.java
Date Fri, 31 Mar 2006 08:54:19 GMT
Author: rajdavies
Date: Fri Mar 31 00:54:04 2006
New Revision: 390369

URL: http://svn.apache.org/viewcvs?rev=390369&view=rev
Log:
patch for https://issues.apache.org/activemq/browse/AMQ-672 - supplied by Mike Gerdes

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=390369&r1=390368&r2=390369&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Fri Mar 31 00:54:04 2006
@@ -22,7 +22,6 @@
 
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQTempDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -32,7 +31,6 @@
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DataStructure;
-import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.KeepAliveInfo;
 import org.apache.activemq.command.Message;
@@ -57,7 +55,6 @@
 import org.apache.commons.logging.LogFactory;
 
 import java.io.IOException;
-import javax.jms.TemporaryTopic;
 
 /**
  * A useful base class for implementing demand forwarding bridges.
@@ -77,6 +74,8 @@
     protected String localBrokerName;
     protected String remoteBrokerName;
     protected String localClientId;
+    protected String userName;
+    protected String password;
     protected int prefetchSize = 1000;
     protected boolean dispatchAsync;
     protected String destinationFilter = ">";
@@ -176,6 +175,8 @@
             localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
             localClientId="NC_"+remoteBrokerName+"_inbound"+name;
             localConnectionInfo.setClientId(localClientId);
+            localConnectionInfo.setUserName(userName);
+            localConnectionInfo.setPassword(password);
             localBroker.oneway(localConnectionInfo);
 
             localSessionInfo=new SessionInfo(localConnectionInfo,1);
@@ -194,6 +195,8 @@
             remoteConnectionInfo=new ConnectionInfo();
             remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
             remoteConnectionInfo.setClientId("NC_"+localBrokerName+"_outbound"+name);
+            remoteConnectionInfo.setUserName(userName);
+            remoteConnectionInfo.setPassword(password);
             remoteBroker.oneway(remoteConnectionInfo);
 
             BrokerInfo brokerInfo=new BrokerInfo();
@@ -214,13 +217,7 @@
                             +destinationFilter));
             demandConsumerInfo.setPrefetchSize(prefetchSize);
             remoteBroker.oneway(demandConsumerInfo);
-            
-            //we want infomation about Destinations as well
-            ConsumerInfo destinationInfo  = new ConsumerInfo(remoteSessionInfo,2);
-            destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
-            destinationInfo.setPrefetchSize(prefetchSize);
-            remoteBroker.oneway(destinationInfo);
-            
+
             startedLatch.countDown();
         }
     }
@@ -331,32 +328,6 @@
                 if(log.isTraceEnabled())
                     log.trace("Ignoring sub " + info + " already subscribed to matching destination");
             }
-        }else if (data.getClass()==DestinationInfo.class){
-//          It's a destination info - we want to pass up
-            //infomation about temporary destinations 
-            DestinationInfo destInfo = (DestinationInfo) data;
-            BrokerId[] path=destInfo.getBrokerPath();
-            if((path!=null&&path.length>= networkTTL)){
-                if(log.isTraceEnabled())
-                    log.trace("Ignoring Subscription " + destInfo + " restricted to " + networkTTL
+ " network hops only");
-                return;
-            }
-            if(contains(destInfo.getBrokerPath(),localBrokerPath[0])){
-                // Ignore this consumer as it's a consumer we locally sent to the broker.
-                if(log.isTraceEnabled())
-                    log.trace("Ignoring sub " + destInfo + " already routed through this
broker once");
-                return;
-            }
-            
-            destInfo.setConnectionId(localConnectionInfo.getConnectionId());
-            if (destInfo.getDestination() instanceof ActiveMQTempDestination){
-                //re-set connection id so comes from here
-                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
-                tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
-            }
-            destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(),getRemoteBrokerPath()));
-            localBroker.oneway(destInfo);
-            
         }
         if(data.getClass()==RemoveInfo.class){
             ConsumerId id=(ConsumerId) ((RemoveInfo) data).getObjectId();
@@ -374,8 +345,7 @@
             localBroker.oneway(sub.getLocalInfo());
         }
     }
-    
-    
+
     protected void removeSubscription(DemandSubscription sub) throws IOException {
         if(sub!=null){
             subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
@@ -768,7 +738,21 @@
     protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException;
 
     protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException;
-    
-    protected abstract BrokerId[] getRemoteBrokerPath();
+
+	public String getPassword() {
+		return password;
+	}
+
+	public void setPassword(String password) {
+		this.password = password;
+	}
+
+	public String getUserName() {
+		return userName;
+	}
+
+	public void setUserName(String userName) {
+		this.userName = userName;
+	}
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?rev=390369&r1=390368&r2=390369&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
Fri Mar 31 00:54:04 2006
@@ -52,6 +52,8 @@
     private String name = "bridge";
     private int prefetchSize = 1000;
     private boolean dispatchAsync = true;
+    private String userName;
+    private String password;
     protected ConnectionFilter connectionFilter;
 
     public NetworkConnector() {
@@ -237,6 +239,8 @@
         result.setLocalBrokerName(getBrokerName());
         result.setName(getBrokerName());
         result.setNetworkTTL(getNetworkTTL());
+        result.setUserName(userName);
+        result.setPassword(password);
         result.setPrefetchSize(prefetchSize);
         result.setDispatchAsync(dispatchAsync);
         result.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
@@ -301,4 +305,20 @@
     public void setConnectionFilter(ConnectionFilter connectionFilter) {
         this.connectionFilter = connectionFilter;
     }
+
+	public String getPassword() {
+		return password;
+	}
+
+	public void setPassword(String password) {
+		this.password = password;
+	}
+
+	public String getUserName() {
+		return userName;
+	}
+
+	public void setUserName(String userName) {
+		this.userName = userName;
+	}
 }



Mime
View raw message