activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r373863 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq/network/ main/java/org/apache/...
Date Tue, 31 Jan 2006 16:35:25 GMT
Author: rajdavies
Date: Tue Jan 31 08:35:13 2006
New Revision: 373863

URL: http://svn.apache.org/viewcvs?rev=373863&view=rev
Log:
Fixes for networks and the invalid Brokers caper

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ShutdownInfo.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=373863&r1=373862&r2=373863&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Tue Jan 31 08:35:13 2006
@@ -47,6 +47,7 @@
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.vm.VMTransportFactory;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.JMXSupport;
 import org.apache.activemq.util.ServiceStopper;
@@ -190,7 +191,7 @@
      * @throws Exception
      */
     public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception{
-        NetworkConnector connector=new NetworkConnector();
+        NetworkConnector connector=new NetworkConnector(this);
         // add the broker name to the parameters if not set
         connector.setUri(discoveryAddress);
         return addNetworkConnector(connector);
@@ -219,7 +220,6 @@
         map.put("network", "true");
         uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
         connector.setLocalUri(uri);
-        connector.setBrokerName(getBrokerName());
         networkConnectors.add(connector);
         if (isUseJmx()) {
             registerNetworkConnectorMBean(connector);
@@ -356,6 +356,8 @@
         }
         log.info("ActiveMQ Message Broker (" + getBrokerName() + ") is shutting down");
         BrokerRegistry.getInstance().unbind(getBrokerName());
+        //remove any VMTransports connected
+        VMTransportFactory.stopped(getBrokerName());
 
         removeShutdownHook();
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=373863&r1=373862&r2=373863&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue Jan 31 08:35:13 2006
@@ -147,7 +147,7 @@
         }
         synchronized (clientIdSet ) {
             if (clientIdSet.containsKey(clientId)) {
-                throw new InvalidClientIDException("Client: " + clientId + " already connected");
+                throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected");
             }
             else {
                 clientIdSet.put(clientId, info);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java?rev=373863&r1=373862&r2=373863&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java Tue Jan 31 08:35:13 2006
@@ -64,6 +64,10 @@
     public boolean isMessageDispatchNotification(){
         return false;
     }
+    
+    public boolean isShutdownInfo(){
+        return false;
+    }
 
     /**
      * @openwire:property version=1

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java?rev=373863&r1=373862&r2=373863&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java Tue Jan 31 08:35:13 2006
@@ -43,6 +43,7 @@
     boolean isMessage();
     boolean isMessageAck();
     boolean isMessageDispatchNotification();
+    boolean isShutdownInfo();
     
     Response visit( CommandVisitor visitor) throws Throwable;
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java?rev=373863&r1=373862&r2=373863&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java Tue Jan 31 08:35:13 2006
@@ -79,5 +79,9 @@
     public boolean isMessageDispatchNotification(){
         return false;
     }
+    
+    public boolean isShutdownInfo(){
+        return false;
+    }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ShutdownInfo.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ShutdownInfo.java?rev=373863&r1=373862&r2=373863&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ShutdownInfo.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ShutdownInfo.java Tue Jan 31 08:35:13 2006
@@ -34,6 +34,10 @@
     public Response visit(CommandVisitor visitor) throws Throwable {
         return visitor.processShutdown( this );
     }
+    
+    public boolean isShutdownInfo(){
+        return true;
+    }
 
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java?rev=373863&r1=373862&r2=373863&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java Tue Jan 31 08:35:13 2006
@@ -172,5 +172,9 @@
     public boolean isMessageDispatchNotification(){
         return false;
     }
+    
+    public boolean isShutdownInfo(){
+        return false;
+    }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=373863&r1=373862&r2=373863&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Tue Jan 31 08:35:13 2006
@@ -1,26 +1,22 @@
 /**
- *
+ * 
  * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.activemq.network;
 
 import java.io.IOException;
-
 import javax.jms.JMSException;
-
 import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -53,393 +49,415 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 /**
- * Forwards messages from the local broker to the remote broker based on 
- * demand.
+ * Forwards messages from the local broker to the remote broker based on demand.
  * 
  * @org.xbean.XBean
  * 
  * @version $Revision$
  */
-public class DemandForwardingBridge implements Bridge {
-
-    static final private Log log = LogFactory.getLog(DemandForwardingBridge.class);
-    
+public class DemandForwardingBridge implements Bridge{
+    static final private Log log=LogFactory.getLog(DemandForwardingBridge.class);
     private final Transport localBroker;
     private final Transport remoteBroker;
-    
-    IdGenerator idGenerator = new IdGenerator();
-    LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
-    
-    ConnectionInfo connectionInfo;
-    SessionInfo sessionInfo;
-    ProducerInfo producerInfo;
-    
-    private String clientId;
+    private IdGenerator idGenerator=new IdGenerator();
+    private LongSequenceGenerator consumerIdGenerator=new LongSequenceGenerator();
+    private ConnectionInfo localConnectionInfo;
+    private ConnectionInfo remoteConnectionInfo;
+    private SessionInfo localSessionInfo;
+    private ProducerInfo producerInfo;
+    private String localBrokerName;
+    private String remoteBrokerName;
+    private String localClientId;
     private int prefetchSize=1000;
     private boolean dispatchAsync;
-    private String destinationFilter = ">";
-    
+    private String destinationFilter=">";
     private ConsumerInfo demandConsumerInfo;
     private int demandConsumerDispatched;
-    
+    private AtomicBoolean localBridgeStarted=new AtomicBoolean(false);
+    private AtomicBoolean remoteBridgeStarted=new AtomicBoolean(false);
+    private boolean disposed=false;
     BrokerId localBrokerId;
     BrokerId remoteBrokerId;
-    
-    private static class DemandSubscription {
+    private static class DemandSubscription{
         ConsumerInfo remoteInfo;
         ConsumerInfo localInfo;
         int dispatched;
-        
-        public DemandSubscription(ConsumerInfo info) {
-            remoteInfo = info;
-            localInfo = info.copy();
+
+        public DemandSubscription(ConsumerInfo info){
+            remoteInfo=info;
+            localInfo=info.copy();
         }
     }
-    
-    ConcurrentHashMap subscriptionMapByLocalId = new ConcurrentHashMap();
-    ConcurrentHashMap subscriptionMapByRemoteId = new ConcurrentHashMap();
-    
-    protected final BrokerId localBrokerPath[] = new BrokerId[] {null};
-    protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
-    
-    public DemandForwardingBridge(Transport localBroker, Transport remoteBroker) {
-        this.localBroker = localBroker;
-        this.remoteBroker = remoteBroker;
+    ConcurrentHashMap subscriptionMapByLocalId=new ConcurrentHashMap();
+    ConcurrentHashMap subscriptionMapByRemoteId=new ConcurrentHashMap();
+    protected final BrokerId localBrokerPath[]=new BrokerId[] { null };
+    protected final BrokerId remoteBrokerPath[]=new BrokerId[] { null };
+
+    public DemandForwardingBridge(Transport localBroker,Transport remoteBroker){
+        this.localBroker=localBroker;
+        this.remoteBroker=remoteBroker;
     }
 
-    public void start() throws Exception {
-        log.info("Starting a network connection between " + localBroker + " and " + remoteBroker + " has been established.");
-
+    public void start() throws Exception{
+        log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+" has been established.");
         localBroker.setTransportListener(new TransportListener(){
-            public void onCommand(Command command) {
+            public void onCommand(Command command){
                 serviceLocalCommand(command);
             }
-            public void onException(IOException error) {
+
+            public void onException(IOException error){
                 serviceLocalException(error);
             }
         });
-        
         remoteBroker.setTransportListener(new TransportListener(){
-            public void onCommand(Command command) {
+            public void onCommand(Command command){
                 serviceRemoteCommand(command);
             }
-            public void onException(IOException error) {
+
+            public void onException(IOException error){
                 serviceRemoteException(error);
             }
         });
-        
         localBroker.start();
         remoteBroker.start();
-        
+        triggerRemoteStartBridge();
     }
 
-    protected void triggerStartBridge() throws IOException {
-        Thread thead = new Thread() {
-            public void run() {
-                try {
-                    startBridge();
+    protected void triggerLocalStartBridge() throws IOException{
+        Thread thead=new Thread(){
+            public void run(){
+                try{
+                    startLocalBridge();
+                }catch(IOException e){
+                    log.error("Failed to start network bridge: "+e,e);
                 }
-                catch (IOException e) {
-                    log.error("Failed to start network bridge: " + e, e);
+            }
+        };
+        thead.start();
+    }
+
+    protected void triggerRemoteStartBridge() throws IOException{
+        Thread thead=new Thread(){
+            public void run(){
+                try{
+                    startRemoteBridge();
+                }catch(IOException e){
+                    log.error("Failed to start network bridge: "+e,e);
                 }
             }
         };
         thead.start();
     }
-    
-    protected void startBridge() throws IOException {
-        BrokerInfo brokerInfo = new BrokerInfo();
-        remoteBroker.oneway(brokerInfo);
-        connectionInfo = new ConnectionInfo();
-        connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
-        connectionInfo.setClientId(clientId);
-        localBroker.oneway(connectionInfo);
-        remoteBroker.oneway(connectionInfo);
-
-        sessionInfo=new SessionInfo(connectionInfo, 1);
-        localBroker.oneway(sessionInfo);
-        remoteBroker.oneway(sessionInfo);
-        
-        producerInfo = new ProducerInfo(sessionInfo, 1);
-        producerInfo.setResponseRequired(false);
-        remoteBroker.oneway(producerInfo);
-
-        // Listen to consumer advisory messages on the remote broker to determine demand.
-        demandConsumerInfo = new ConsumerInfo(sessionInfo, 1);
-        demandConsumerInfo.setDispatchAsync(dispatchAsync);
-        demandConsumerInfo.setDestination(new ActiveMQTopic(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX+destinationFilter));
-        demandConsumerInfo.setPrefetchSize(prefetchSize);
-        remoteBroker.oneway(demandConsumerInfo);
-        
-        log.info("Network connection between " + localBroker + " and " + remoteBroker + " has been established.");
-    }
-    
-    public void stop() throws Exception{        
-        try {
-            if( connectionInfo!=null ) {
-                localBroker.request(connectionInfo.createRemoveCommand());
-                remoteBroker.request(connectionInfo.createRemoveCommand());
-            }
-            localBroker.setTransportListener(null);
-            remoteBroker.setTransportListener(null);
-            remoteBroker.oneway(new ShutdownInfo());
-            localBroker.oneway(new ShutdownInfo());
-        }catch(IOException e){
-          log.debug("Caught exception stopping",e);
-        } finally {
-            ServiceStopper ss = new ServiceStopper();
-            ss.stop(localBroker);
-            ss.stop(remoteBroker);
-            ss.throwFirstException();
+
+    protected void startLocalBridge() throws IOException{
+        if(localBridgeStarted.compareAndSet(false,true)){
+            localConnectionInfo=new ConnectionInfo();
+            localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
+            localClientId="NC_"+remoteBrokerName+"_inbound";
+            localConnectionInfo.setClientId(localClientId);
+            localBroker.oneway(localConnectionInfo);
+            localSessionInfo=new SessionInfo(localConnectionInfo,1);
+            localBroker.oneway(localSessionInfo);
+            log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName
+                            +") has been established.");
         }
     }
-    
-    protected void serviceRemoteException(IOException error) {
-        log.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: "+error.getMessage(), error);
-        ServiceSupport.dispose(this);
+
+    protected void startRemoteBridge() throws IOException{
+        if(remoteBridgeStarted.compareAndSet(false,true)){
+            BrokerInfo brokerInfo=new BrokerInfo();
+            brokerInfo.setBrokerName(localBrokerName);
+            remoteBroker.oneway(brokerInfo);
+            remoteConnectionInfo=new ConnectionInfo();
+            remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
+            remoteConnectionInfo.setClientId("NC_"+localBrokerName+"_outbound");
+            remoteBroker.oneway(remoteConnectionInfo);
+            SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1);
+            remoteBroker.oneway(remoteSessionInfo);
+            producerInfo=new ProducerInfo(remoteSessionInfo,1);
+            producerInfo.setResponseRequired(false);
+            remoteBroker.oneway(producerInfo);
+            // Listen to consumer advisory messages on the remote broker to determine demand.
+            demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1);
+            demandConsumerInfo.setDispatchAsync(dispatchAsync);
+            demandConsumerInfo.setDestination(new ActiveMQTopic(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX
+                            +destinationFilter));
+            demandConsumerInfo.setPrefetchSize(prefetchSize);
+            remoteBroker.oneway(demandConsumerInfo);
+        }
     }
-    
-    protected void serviceRemoteCommand(Command command) {
-        try {
-            if( command.isMessageDispatch() ) {
-                MessageDispatch md = (MessageDispatch) command;
-                serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
-                demandConsumerDispatched++;
-                if( demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize()*.75) ) {
-                    remoteBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched));
-                    demandConsumerDispatched=0;
+
+    public void stop() throws Exception{
+        if(!disposed){
+            try{
+                disposed=true;
+                localBridgeStarted.set(false);
+                remoteBridgeStarted.set(false);
+                if(localConnectionInfo!=null){
+                    localBroker.request(localConnectionInfo.createRemoveCommand());
+                    remoteBroker.request(remoteConnectionInfo.createRemoveCommand());
                 }
-            } else if ( command.isBrokerInfo() ) {
-                synchronized( this ) {
-                    remoteBrokerId = ((BrokerInfo)command).getBrokerId();
-                    remoteBrokerPath[0] = remoteBrokerId;
-                    if( localBrokerId !=null) {
-                        if( localBrokerId.equals(remoteBrokerId) ) {
-                            log.info("Disconnecting loop back connection.");
-                            ServiceSupport.dispose(this);
-                        } else {
-                            triggerStartBridge();                            
+                localBroker.setTransportListener(null);
+                remoteBroker.setTransportListener(null);
+                remoteBroker.oneway(new ShutdownInfo());
+                localBroker.oneway(new ShutdownInfo());
+            }catch(IOException e){
+                log.debug("Caught exception stopping",e);
+            }finally{
+                ServiceStopper ss=new ServiceStopper();
+                ss.stop(localBroker);
+                ss.stop(remoteBroker);
+                ss.throwFirstException();
+            }
+        }
+    }
+
+    protected void serviceRemoteException(IOException error){
+        log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
+        ServiceSupport.dispose(this);
+    }
+
+    protected void serviceRemoteCommand(Command command){
+        if(!disposed){
+            try{
+                if(command.isMessageDispatch()){
+                    MessageDispatch md=(MessageDispatch) command;
+                    serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
+                    demandConsumerDispatched++;
+                    if(demandConsumerDispatched>(demandConsumerInfo.getPrefetchSize()*.75)){
+                        remoteBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,demandConsumerDispatched));
+                        demandConsumerDispatched=0;
+                    }
+                }else if(command.isBrokerInfo()){
+                    synchronized(this){
+                        BrokerInfo remoteBrokerInfo=(BrokerInfo) command;
+                        remoteBrokerId=remoteBrokerInfo.getBrokerId();
+                        remoteBrokerPath[0]=remoteBrokerId;
+                        remoteBrokerName=remoteBrokerInfo.getBrokerName();
+                        if(localBrokerId!=null){
+                            if(localBrokerId.equals(remoteBrokerId)){
+                                log.info("Disconnecting loop back connection.");
+                                ServiceSupport.dispose(this);
+                            }else{
+                                triggerLocalStartBridge();
+                            }
                         }
                     }
-                }
-            } else {
-                switch ( command.getDataStructureType() ) {
+                }else{
+                    switch(command.getDataStructureType()){
                     case WireFormatInfo.DATA_STRUCTURE_TYPE:
-                    break;
+                        break;
                     default:
                         log.warn("Unexpected remote command: "+command);
+                    }
                 }
+            }catch(IOException e){
+                serviceRemoteException(e);
             }
-        } catch (IOException e) {
-            serviceRemoteException(e);
         }
     }
 
-    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
-        if( data.getClass() == ConsumerInfo.class ) {
-                       
+    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException{
+        if(data.getClass()==ConsumerInfo.class){
             // Create a new local subscription
-            ConsumerInfo info = (ConsumerInfo) data;
-            BrokerId[] path = info.getBrokerPath();
-
-            if( (path!=null && path.length>0) || info.isNetworkSubscription() ) {
-                // Ignore:  We only support directly connected brokers for now.
+            ConsumerInfo info=(ConsumerInfo) data;
+            BrokerId[] path=info.getBrokerPath();
+            if((path!=null&&path.length>0)||info.isNetworkSubscription()){
+                // Ignore: We only support directly connected brokers for now.
                 return;
             }
-            if( contains(info.getBrokerPath(), localBrokerPath[0]) ) {
+            if(contains(info.getBrokerPath(),localBrokerPath[0])){
                 // Ignore this consumer as it's a consumer we locally sent to the broker.
                 return;
             }
-            
-            if( log.isTraceEnabled() )
-                log.trace("Forwarding sub on " + localBroker + " from " + remoteBroker + " on  "+info);
-
-            
+            if(log.isTraceEnabled())
+                log.trace("Forwarding sub on "+localBroker+" from "+remoteBrokerName+" :  "+info);
             // Update the packet to show where it came from.
-            info = info.copy();
-            info.setBrokerPath( appendToBrokerPath(info.getBrokerPath(), remoteBrokerPath) );
-                
-            DemandSubscription sub  = new DemandSubscription(info);
-            sub.localInfo.setConsumerId( new ConsumerId(sessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()) );
+            info=info.copy();
+            info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),remoteBrokerPath));
+            DemandSubscription sub=new DemandSubscription(info);
+            sub.localInfo.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
+                            .getNextSequenceId()));
             sub.localInfo.setDispatchAsync(dispatchAsync);
             sub.localInfo.setPrefetchSize(prefetchSize);
-            byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
-            if( priority > Byte.MIN_VALUE && info.getBrokerPath()!=null && info.getBrokerPath().length>1 ) {
+            byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
+            if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
                 // The longer the path to the consumer, the less it's consumer priority.
-                priority -= info.getBrokerPath().length+1;
+                priority-=info.getBrokerPath().length+1;
             }
             sub.localInfo.setPriority(priority);
-            subscriptionMapByLocalId.put(sub.localInfo.getConsumerId(), sub);
-            subscriptionMapByRemoteId.put(sub.remoteInfo.getConsumerId(), sub);
+            subscriptionMapByLocalId.put(sub.localInfo.getConsumerId(),sub);
+            subscriptionMapByRemoteId.put(sub.remoteInfo.getConsumerId(),sub);
             sub.localInfo.setBrokerPath(info.getBrokerPath());
             sub.localInfo.setNetworkSubscription(true);
             // This works for now since we use a VM connection to the local broker.
             // may need to change if we ever subscribe to a remote broker.
             sub.localInfo.setAdditionalPredicate(new BooleanExpression(){
-                public boolean matches(MessageEvaluationContext message) throws JMSException {
-                    try {
+                public boolean matches(MessageEvaluationContext message) throws JMSException{
+                    try{
                         return matchesForwardingFilter(message.getMessage());
-                    } catch (IOException e) {
+                    }catch(IOException e){
                         throw JMSExceptionSupport.create(e);
                     }
                 }
-                public Object evaluate(MessageEvaluationContext message) throws JMSException {
-                    return matches(message) ? Boolean.TRUE : Boolean.FALSE;
+
+                public Object evaluate(MessageEvaluationContext message) throws JMSException{
+                    return matches(message)?Boolean.TRUE:Boolean.FALSE;
                 }
             });
-                        
-            localBroker.oneway(sub.localInfo);            
+            localBroker.oneway(sub.localInfo);
         }
-        if( data.getClass() == RemoveInfo.class ) {
-            ConsumerId id = (ConsumerId) ((RemoveInfo)data).getObjectId();
-            DemandSubscription sub = (DemandSubscription)subscriptionMapByRemoteId.remove(id);
-            if( sub !=null ) {
+        if(data.getClass()==RemoveInfo.class){
+            ConsumerId id=(ConsumerId) ((RemoveInfo) data).getObjectId();
+            DemandSubscription sub=(DemandSubscription) subscriptionMapByRemoteId.remove(id);
+            if(sub!=null){
                 subscriptionMapByLocalId.remove(sub.localInfo.getConsumerId());
                 localBroker.oneway(sub.localInfo.createRemoveCommand());
             }
         }
     }
 
-    protected void serviceLocalException(Throwable error) {
-        log.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: "+error.getMessage(), error);
+    protected void serviceLocalException(Throwable error){
+        log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
         ServiceSupport.dispose(this);
     }
 
-    boolean matchesForwardingFilter(Message message) {
-        if( message.isRecievedByDFBridge() || contains(message.getBrokerPath(), remoteBrokerPath[0]) )
+    boolean matchesForwardingFilter(Message message){
+        if(message.isRecievedByDFBridge()||contains(message.getBrokerPath(),remoteBrokerPath[0]))
             return false;
-
         // Don't propagate advisory messages about network subscriptions
-        if( message.isAdvisory() 
-                && message.getDataStructure()!=null
-                && message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO) {
-            
+        if(message.isAdvisory()&&message.getDataStructure()!=null
+                        &&message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){
             ConsumerInfo info=(ConsumerInfo) message.getDataStructure();
-            if(info.isNetworkSubscription()) {
+            if(info.isNetworkSubscription()){
                 return false;
             }
         }
         return true;
     }
-    
-    protected void serviceLocalCommand(Command command) {
-        final boolean trace = log.isTraceEnabled();
-        try {
-            if( command.isMessageDispatch() ) {
-                MessageDispatch md = (MessageDispatch) command;
-                Message message = md.getMessage();
-                DemandSubscription sub = (DemandSubscription)subscriptionMapByLocalId.get(md.getConsumerId());
-                if( sub!=null ) {
-                   
-                    message = message.copy();
-                    
-                    // Update the packet to show where it came from.
-                    message.setBrokerPath( appendToBrokerPath(message.getBrokerPath(), localBrokerPath) );
-
-                    message.setProducerId(producerInfo.getProducerId());
-                    message.setDestination( md.getDestination() );
-                    
-                    if( message.getOriginalTransactionId()==null )
-                        message.setOriginalTransactionId(message.getTransactionId());
-                    message.setTransactionId(null);
-                    message.setRecievedByDFBridge(true);
-                    message.evictMarshlledForm();
-
-                    if( trace )
-                        log.trace("bridging " + localBroker + " -> " + remoteBroker + ": "+message);
-                    if (!message.isPersistent() || !sub.remoteInfo.isDurable()){
-                    remoteBroker.oneway( message );
-                    }else{
-                        Response response = remoteBroker.request(message);
-                        if (response.isException()) {
-                            ExceptionResponse er = (ExceptionResponse) response;
-                            serviceLocalException(er.getException());
-                            
+
+    protected void serviceLocalCommand(Command command){
+        if(!disposed){
+            final boolean trace=log.isTraceEnabled();
+            try{
+                if(command.isMessageDispatch()){
+                    MessageDispatch md=(MessageDispatch) command;
+                    Message message=md.getMessage();
+                    DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId());
+                    if(sub!=null){
+                        message=message.copy();
+                        // Update the packet to show where it came from.
+                        message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(),localBrokerPath));
+                        message.setProducerId(producerInfo.getProducerId());
+                        message.setDestination(md.getDestination());
+                        if(message.getOriginalTransactionId()==null)
+                            message.setOriginalTransactionId(message.getTransactionId());
+                        message.setTransactionId(null);
+                        message.setRecievedByDFBridge(true);
+                        message.evictMarshlledForm();
+                        if(trace)
+                            log.trace("bridging "+localBrokerName+" -> "+remoteBrokerName+": "+message);
+                        if(!message.isPersistent()||!sub.remoteInfo.isDurable()){
+                            remoteBroker.oneway(message);
+                        }else{
+                            Response response=remoteBroker.request(message);
+                            if(response.isException()){
+                                ExceptionResponse er=(ExceptionResponse) response;
+                                serviceLocalException(er.getException());
+                            }
+                        }
+                        sub.dispatched++;
+                        if(sub.dispatched>(sub.localInfo.getPrefetchSize()*.75)){
+                            localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,sub.dispatched));
+                            sub.dispatched=0;
                         }
                     }
-                    sub.dispatched++;
-                    if( sub.dispatched > (sub.localInfo.getPrefetchSize()*.75) ) {
-                        localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, sub.dispatched));
-                        sub.dispatched=0;
-                    } 
-                                                     
-                }
-            } else if ( command.isBrokerInfo() ) {
-                synchronized( this ) {
-                    localBrokerId = ((BrokerInfo)command).getBrokerId();
-                    localBrokerPath[0] = localBrokerId;
-                    if( remoteBrokerId !=null  ) {
-                        if( remoteBrokerId.equals(localBrokerId) ) {
-                            log.info("Disconnecting loop back connection.");
-                            ServiceSupport.dispose(this);
-                        } else {
-                            triggerStartBridge();                            
+                }else if(command.isBrokerInfo()){
+                    synchronized(this){
+                        localBrokerId=((BrokerInfo) command).getBrokerId();
+                        localBrokerPath[0]=localBrokerId;
+                        if(remoteBrokerId!=null){
+                            if(remoteBrokerId.equals(localBrokerId)){
+                                log.info("Disconnecting loop back connection.");
+                                ServiceSupport.dispose(this);
+                            }
                         }
                     }
+                }else if(command.isShutdownInfo()){
+                    log.info(localBrokerName+" Shutting down");
+                    disposed = true;
+                    stop();
+                }else{
+                    switch(command.getDataStructureType()){
+                    case WireFormatInfo.DATA_STRUCTURE_TYPE:
+                        break;
+                    default:
+                        log.warn("Unexpected local command: "+command);
+                    }
                 }
-            } else {
-                switch ( command.getDataStructureType() ) {
-                case WireFormatInfo.DATA_STRUCTURE_TYPE:
-                break;
-                default:
-                    log.warn("Unexpected local command: "+command);
-                }
+            }catch(Exception e){
+                serviceLocalException(e);
             }
-        } catch (Exception e) {
-            serviceLocalException(e);
         }
     }
 
-    public String getClientId() {
-        return clientId;
+    public int getPrefetchSize(){
+        return prefetchSize;
     }
 
-    public void setClientId(String clientId) {
-        this.clientId = clientId;
+    public void setPrefetchSize(int prefetchSize){
+        this.prefetchSize=prefetchSize;
     }
 
-    public int getPrefetchSize() {
-        return prefetchSize;
+    public boolean isDispatchAsync(){
+        return dispatchAsync;
     }
 
-    public void setPrefetchSize(int prefetchSize) {
-        this.prefetchSize = prefetchSize;
+    public void setDispatchAsync(boolean dispatchAsync){
+        this.dispatchAsync=dispatchAsync;
     }
 
-    public boolean isDispatchAsync() {
-        return dispatchAsync;
+    public String getDestinationFilter(){
+        return destinationFilter;
     }
 
-    public void setDispatchAsync(boolean dispatchAsync) {
-        this.dispatchAsync = dispatchAsync;
+    public void setDestinationFilter(String destinationFilter){
+        this.destinationFilter=destinationFilter;
     }
 
-    public String getDestinationFilter() {
-        return destinationFilter;
+    /**
+     * @return Returns the localBrokerName.
+     */
+    public String getLocalBrokerName(){
+        return localBrokerName;
     }
-    public void setDestinationFilter(String destinationFilter) {
-        this.destinationFilter = destinationFilter;
+
+    /**
+     * @param localBrokerName
+     *            The localBrokerName to set.
+     */
+    public void setLocalBrokerName(String localBrokerName){
+        this.localBrokerName=localBrokerName;
     }
-    
-    private boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
-        if( brokerPath!=null ) {
-            for (int i = 0; i < brokerPath.length; i++) {
-                if( brokerId.equals(brokerPath[i]) )
+
+    private boolean contains(BrokerId[] brokerPath,BrokerId brokerId){
+        if(brokerPath!=null){
+            for(int i=0;i<brokerPath.length;i++){
+                if(brokerId.equals(brokerPath[i]))
                     return true;
             }
         }
         return false;
     }
-    private BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId pathsToAppend[]) {
-        if( brokerPath == null || brokerPath.length==0 )
+
+    private BrokerId[] appendToBrokerPath(BrokerId[] brokerPath,BrokerId pathsToAppend[]){
+        if(brokerPath==null||brokerPath.length==0)
             return pathsToAppend;
-        
-        BrokerId rc[] = new BrokerId[brokerPath.length+pathsToAppend.length];
+        BrokerId rc[]=new BrokerId[brokerPath.length+pathsToAppend.length];
         System.arraycopy(brokerPath,0,rc,0,brokerPath.length);
         System.arraycopy(pathsToAppend,0,rc,brokerPath.length,pathsToAppend.length);
         return rc;
     }
-
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?rev=373863&r1=373862&r2=373863&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java Tue Jan 31 08:35:13 2006
@@ -21,6 +21,7 @@
 import java.net.URISyntaxException;
 
 import org.apache.activemq.Service;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.DiscoveryEvent;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
@@ -41,17 +42,19 @@
 public class NetworkConnector implements Service, DiscoveryListener {
 
     private static final Log log = LogFactory.getLog(NetworkConnector.class);
+    private BrokerService brokerService;
     private DiscoveryAgent discoveryAgent;
     private URI localURI;
 
     private ConcurrentHashMap bridges = new ConcurrentHashMap();
-    private String brokerName;
     boolean failover=true;
     
-    public NetworkConnector() {
+    public NetworkConnector(BrokerService service) {
+        this.brokerService = service;
     }
 
-    public NetworkConnector(URI localURI, DiscoveryAgent discoveryAgent) throws IOException {
+    public NetworkConnector(BrokerService service,URI localURI, DiscoveryAgent discoveryAgent) throws IOException {
+        this.brokerService = service;
         this.localURI = localURI;
         setDiscoveryAgent(discoveryAgent);
     }
@@ -161,7 +164,7 @@
         this.discoveryAgent = discoveryAgent;
         if (discoveryAgent != null) {
             this.discoveryAgent.setDiscoveryListener(this);
-            this.discoveryAgent.setBrokerName(brokerName);
+            this.discoveryAgent.setBrokerName(brokerService.getBrokerName());
         }
     }
 
@@ -180,7 +183,7 @@
     // Implementation methods
     // -------------------------------------------------------------------------
     protected Bridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
-        return new DemandForwardingBridge(localTransport, remoteTransport) {
+        DemandForwardingBridge result =  new DemandForwardingBridge(localTransport, remoteTransport) {
             protected void serviceRemoteException(IOException error) {
                 super.serviceRemoteException(error);
                 try {
@@ -190,14 +193,11 @@
                 }
             }
         };
+        result.setLocalBrokerName(brokerService.getBrokerName());
+        return result;
     }
 
-    public void setBrokerName(String brokerName) {
-        this.brokerName = brokerName;
-        if( discoveryAgent!=null ) {
-            discoveryAgent.setBrokerName(brokerName);
-        }
-    }
+    
 
     public boolean isFailover() {
         return failover;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java?rev=373863&r1=373862&r2=373863&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java Tue Jan 31 08:35:13 2006
@@ -1,18 +1,15 @@
 /**
- *
+ * 
  * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.activemq.transport.vm;
 
@@ -21,7 +18,6 @@
 import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerRegistry;
 import org.apache.activemq.broker.BrokerService;
@@ -36,153 +32,156 @@
 import org.apache.activemq.util.ServiceSupport;
 import org.apache.activemq.util.URISupport;
 import org.apache.activemq.util.URISupport.CompositeData;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 
-public class VMTransportFactory extends TransportFactory {
-
-    final public static ConcurrentHashMap brokers = new ConcurrentHashMap();
-    final public static ConcurrentHashMap connectors = new ConcurrentHashMap();
-    final public static ConcurrentHashMap servers = new ConcurrentHashMap();
 
+public class VMTransportFactory extends TransportFactory{
+    private static final Log log = LogFactory.getLog(VMTransportFactory.class);
+    final public static ConcurrentHashMap brokers=new ConcurrentHashMap();
+    final public static ConcurrentHashMap connectors=new ConcurrentHashMap();
+    final public static ConcurrentHashMap servers=new ConcurrentHashMap();
     BrokerFactoryHandler brokerFactoryHandler;
-    
-    public Transport doConnect(URI location) throws Exception {
+
+    public Transport doConnect(URI location) throws Exception{
         return VMTransportServer.configure(doCompositeConnect(location));
     }
 
-    public Transport doCompositeConnect(URI location) throws Exception {
+    public Transport doCompositeConnect(URI location) throws Exception{
         URI brokerURI;
         String host;
         Map options;
-
-        CompositeData data = URISupport.parseComposite(location);
-        if( data.getComponents().length==1 && "broker".equals(data.getComponents()[0].getScheme()) ) {
-            brokerURI = data.getComponents()[0];
-            
-            CompositeData brokerData = URISupport.parseComposite(brokerURI);
-            host = (String)brokerData.getParameters().get("brokerName");
-            if( host == null )
-                host = "localhost";
-            if( brokerData.getPath()!=null )
-                host = data.getPath();
-            
-            options = data.getParameters();
-            location = new URI("vm://"+host);
-        } else {
+        CompositeData data=URISupport.parseComposite(location);
+        if(data.getComponents().length==1&&"broker".equals(data.getComponents()[0].getScheme())){
+            brokerURI=data.getComponents()[0];
+            CompositeData brokerData=URISupport.parseComposite(brokerURI);
+            host=(String) brokerData.getParameters().get("brokerName");
+            if(host==null)
+                host="localhost";
+            if(brokerData.getPath()!=null)
+                host=data.getPath();
+            options=data.getParameters();
+            location=new URI("vm://"+host);
+        }else{
             // If using the less complex vm://localhost?broker.persistent=true form
-            try {
-                host =  location.getHost();
-                options = URISupport.parseParamters(location);
-                String config = (String) options.remove("brokerConfig");
-                if( config != null ) {
-                    brokerURI = new URI(config);
-                } else {
-                    Map brokerOptions = IntrospectionSupport.extractProperties(options, "broker.");
-                    brokerURI = new URI("broker://()/"+host+"?"+URISupport.createQueryString(brokerOptions));
+            try{
+                host=location.getHost();
+                options=URISupport.parseParamters(location);
+                String config=(String) options.remove("brokerConfig");
+                if(config!=null){
+                    brokerURI=new URI(config);
+                }else{
+                    Map brokerOptions=IntrospectionSupport.extractProperties(options,"broker.");
+                    brokerURI=new URI("broker://()/"+host+"?"+URISupport.createQueryString(brokerOptions));
                 }
-            } catch (URISyntaxException e1) {
+            }catch(URISyntaxException e1){
                 throw IOExceptionSupport.create(e1);
             }
-            
-            location = new URI("vm://"+host);
+            location=new URI("vm://"+host);
         }
-        
-        VMTransportServer server = (VMTransportServer) servers.get(host);   
-        //validate the broker is still active
-        if( !validateBroker(host) || server == null ) {
-            BrokerService broker = BrokerRegistry.getInstance().lookup(host);
-            if (broker == null) {
-                try {
-                    if( brokerFactoryHandler !=null ) {
-                        broker = brokerFactoryHandler.createBroker(brokerURI);
-                    } else {
-                        broker = BrokerFactory.createBroker(brokerURI);
+        VMTransportServer server=(VMTransportServer) servers.get(host);
+        // validate the broker is still active
+        if(!validateBroker(host)||server==null){
+            BrokerService broker=BrokerRegistry.getInstance().lookup(host);
+            if(broker==null){
+                try{
+                    if(brokerFactoryHandler!=null){
+                        broker=brokerFactoryHandler.createBroker(brokerURI);
+                    }else{
+                        broker=BrokerFactory.createBroker(brokerURI);
                     }
                     broker.start();
-                }
-                catch (URISyntaxException e) {
+                }catch(URISyntaxException e){
                     throw IOExceptionSupport.create(e);
                 }
-                brokers.put(host, broker);
+                brokers.put(host,broker);
             }
-            server = (VMTransportServer) servers.get(host);
-            if (server == null) {
-                server = (VMTransportServer) bind(location, true);
-                TransportConnector connector = new TransportConnector(broker.getBroker(), server);
+            server=(VMTransportServer) servers.get(host);
+            if(server==null){
+                server=(VMTransportServer) bind(location,true);
+                TransportConnector connector=new TransportConnector(broker.getBroker(),server);
                 connector.start();
-                connectors.put(host, connector);
+                connectors.put(host,connector);
             }
-        }else {
-            
-        }
-
-        VMTransport vmtransport = server.connect();
-        IntrospectionSupport.setProperties(vmtransport, options);
-
-        Transport transport = vmtransport;
-        if (vmtransport.isMarshal()) {
-            HashMap optionsCopy = new HashMap(options);
-            transport = new MarshallingTransportFilter(transport, createWireFormat(options), createWireFormat(optionsCopy));
+        }else{}
+        VMTransport vmtransport=server.connect();
+        IntrospectionSupport.setProperties(vmtransport,options);
+        Transport transport=vmtransport;
+        if(vmtransport.isMarshal()){
+            HashMap optionsCopy=new HashMap(options);
+            transport=new MarshallingTransportFilter(transport,createWireFormat(options),createWireFormat(optionsCopy));
         }
-
-        if( !options.isEmpty() ) {
+        if(!options.isEmpty()){
             throw new IllegalArgumentException("Invalid connect parameters: "+options);
         }
-        
         return transport;
     }
 
-    public TransportServer doBind(String brokerId,URI location) throws IOException {
-        return bind(location, false);
+    public TransportServer doBind(String brokerId,URI location) throws IOException{
+        return bind(location,false);
     }
 
     /**
      * @param location
-     * @return
+     * @return the TransportServer
      * @throws IOException
      */
-    private TransportServer bind(URI location, boolean dispose) throws IOException {
-        String host = location.getHost();
-        VMTransportServer server = new VMTransportServer(location, dispose);
-        Object currentBoundValue = servers.get(host);
-        if (currentBoundValue != null) {
-            throw new IOException("VMTransportServer already bound at: " + location);
+    private TransportServer bind(URI location,boolean dispose) throws IOException{
+        String host=location.getHost();
+        log.info("binding to broker: " + host);
+        VMTransportServer server=new VMTransportServer(location,dispose);
+        Object currentBoundValue=servers.get(host);
+        if(currentBoundValue!=null){
+            throw new IOException("VMTransportServer already bound at: "+location);
         }
-        servers.put(host, server);
+        servers.put(host,server);
         return server;
     }
 
-    public static void stopped(VMTransportServer server) {
-        String host = server.getBindURI().getHost();
+    public static void stopped(VMTransportServer server){
+        String host=server.getBindURI().getHost();
+        log.info("Shutting down VM connectors for broker: "  +host);
+        servers.remove(host);
+        TransportConnector connector=(TransportConnector) connectors.remove(host);
+        if(connector!=null){
+            ServiceSupport.dispose(connector);
+            BrokerService broker=(BrokerService) brokers.remove(host);
+            if(broker!=null){
+                ServiceSupport.dispose(broker);
+            }
+        }
+    }
+
+    public static void stopped(String host){
+        log.info("Shutting down VM connectors for broker: "  +host);
         servers.remove(host);
-        TransportConnector connector = (TransportConnector) connectors.remove(host);
-        if (connector != null) {
+        TransportConnector connector=(TransportConnector) connectors.remove(host);
+        if(connector!=null){
             ServiceSupport.dispose(connector);
-            BrokerService broker = (BrokerService) brokers.remove(host);
-            if (broker != null) {
+            BrokerService broker=(BrokerService) brokers.remove(host);
+            if(broker!=null){
                 ServiceSupport.dispose(broker);
             }
         }
     }
 
-    public BrokerFactoryHandler getBrokerFactoryHandler() {
+    public BrokerFactoryHandler getBrokerFactoryHandler(){
         return brokerFactoryHandler;
     }
 
-    public void setBrokerFactoryHandler(BrokerFactoryHandler brokerFactoryHandler) {
-        this.brokerFactoryHandler = brokerFactoryHandler;
+    public void setBrokerFactoryHandler(BrokerFactoryHandler brokerFactoryHandler){
+        this.brokerFactoryHandler=brokerFactoryHandler;
     }
 
-    
     private boolean validateBroker(String host){
         boolean result=true;
         if(brokers.containsKey(host)||servers.containsKey(host)||connectors.containsKey(host)){
-            //check the broker is still in the BrokerRegistry
+            // check the broker is still in the BrokerRegistry
             TransportConnector connector=(TransportConnector) connectors.get(host);
             if(BrokerRegistry.getInstance().lookup(host)==null||(connector!=null&&connector.getBroker().isStopped())){
                 result=false;
-                //clean-up
+                // clean-up
                 brokers.remove(host);
                 servers.remove(host);
                 if(connector!=null){

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java?rev=373863&r1=373862&r2=373863&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java Tue Jan 31 08:35:13 2006
@@ -127,7 +127,7 @@
     protected void setUp() throws Exception {
         super.setUp();
         bridge = new DemandForwardingBridge(createTransport(), createRemoteTransport());
-        bridge.setClientId("local-remote-bridge");
+        bridge.setLocalBrokerName("local");
         bridge.setDispatchAsync(false);
         bridge.start();
         



Mime
View raw message