activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r378967 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
Date Sun, 19 Feb 2006 22:13:55 GMT
Author: rajdavies
Date: Sun Feb 19 14:13:54 2006
New Revision: 378967

URL: http://svn.apache.org/viewcvs?rev=378967&view=rev
Log:
remove await latch call if closing a loop back network connector - this
should 'fix' - hanging SpringTest.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=378967&r1=378966&r2=378967&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
Sun Feb 19 14:13:54 2006
@@ -42,6 +42,7 @@
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.activemq.util.LongSequenceGenerator;
@@ -93,6 +94,7 @@
     protected CountDownLatch startedLatch = new CountDownLatch(2);
     protected Object brokerInfoMutex = new Object();
     protected boolean decreaseNetworkConsumerPriority;
+    protected boolean shutDown;
     protected int networkTTL = 1;
     
     
@@ -113,7 +115,7 @@
                 serviceLocalException(error);
             }
         });
-        remoteBroker.setTransportListener(new DefaultTransportListener(){
+        remoteBroker.setTransportListener(new TransportListener(){
             public void onCommand(Command command){
                 serviceRemoteCommand(command);
             }
@@ -121,6 +123,20 @@
             public void onException(IOException error){
                 serviceRemoteException(error);
             }
+
+            public void transportInterupted(){
+                //clear any subscriptions - to try and prevent the bridge from stalling the
broker
+                log.warn("Outbound transport to " + remoteBrokerName +  " interrupted ...");
+                clearDownSubscriptions();
+                
+            }
+
+            public void transportResumed(){
+                //restart and static subscriptions - the consumer advisories will be replayed
+                log.info("Outbound transport to " + remoteBrokerName + " resumed");
+                setupStaticDestinations();
+                
+            }
         });
         localBroker.start();
         remoteBroker.start();
@@ -195,25 +211,32 @@
     }
     
    
+    public void stop() throws Exception{
+        shutDown = true;
+        doStop();
+    }
 
     /**
      * stop the bridge
      * @throws Exception 
      */
-    public void stop() throws Exception{
+    protected void doStop() throws Exception{
+        log.debug(" stopping "+localBrokerName+ " bridge to " + remoteBrokerName + " is disposed
already ? "+disposed);
         if(!disposed){
             try{
                 disposed=true;
                 localBridgeStarted.set(false);
                 remoteBridgeStarted.set(false);
-                if(localConnectionInfo!=null){
-                    localBroker.request(localConnectionInfo.createRemoveCommand());
-                    remoteBroker.request(remoteConnectionInfo.createRemoveCommand());
+                if(!shutDown){
+                   remoteBroker.oneway(new ShutdownInfo());
+                    if(localConnectionInfo!=null){
+                        localBroker.oneway(localConnectionInfo.createRemoveCommand());
+                        remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
+                    }
+                    localBroker.oneway(new ShutdownInfo());
                 }
                 localBroker.setTransportListener(null);
                 remoteBroker.setTransportListener(null);
-                remoteBroker.oneway(new ShutdownInfo());
-                localBroker.oneway(new ShutdownInfo());
             }catch(IOException e){
                 log.debug("Caught exception stopping",e);
             }finally{
@@ -223,6 +246,7 @@
                 ss.throwFirstException();
             }
         }
+        log.debug(localBrokerName+ " bridge to " + remoteBrokerName + " stopped");
     }
 
     protected void serviceRemoteException(Exception error){
@@ -251,7 +275,7 @@
                         if(localBrokerId!=null){
                             if(localBrokerId.equals(remoteBrokerId)){
                                 log.info("Disconnecting loop back connection.");
-                                waitStarted();
+                                //waitStarted();
                                 ServiceSupport.dispose(this);
                             }
                         }
@@ -345,7 +369,6 @@
         if(message.getOriginalTransactionId()==null)
             message.setOriginalTransactionId(message.getTransactionId());
         message.setTransactionId(null);
-        message.setRecievedByDFBridge(true);
         message.evictMarshlledForm();
         return message;
     }
@@ -393,8 +416,10 @@
                     }
                 }else if(command.isShutdownInfo()){
                     log.info(localBrokerName+" Shutting down");
-                    disposed = true;
-                    stop();
+                    shutDown = true;
+                    doStop();
+                   
+                    
                 }else{
                     switch(command.getDataStructureType()){
                     case WireFormatInfo.DATA_STRUCTURE_TYPE:
@@ -567,6 +592,21 @@
     public void setNetworkTTL(int networkTTL){
         this.networkTTL=networkTTL;
     }
+    
+    /**
+     * @return Returns the shutDown.
+     */
+    public boolean isShutDown(){
+        return shutDown;
+    }
+
+    /**
+     * @param shutDown The shutDown to set.
+     */
+    public void setShutDown(boolean shutDown){
+        this.shutDown=shutDown;
+    }
+
 
 
     private boolean contains(BrokerId[] brokerPath,BrokerId brokerId){
@@ -616,16 +656,19 @@
     
     /**
      * Subscriptions for these desitnations are always created
-     * @throws IOException 
      *
      */
-    protected void setupStaticDestinations() throws IOException{
+    protected void setupStaticDestinations(){
         ActiveMQDestination[] dests = staticallyIncludedDestinations;
         if (dests != null){
             for(int i=0;i<dests.length;i++){
                 ActiveMQDestination dest=dests[i];
                 DemandSubscription sub = createDemandSubscription(dest);
-                addSubscription(sub);
+                try{
+                    addSubscription(sub);
+                }catch(IOException e){
+                   log.error("Failed to add static destination " + dest,e);
+                }
                 if(log.isTraceEnabled())
                     log.trace("Forwarding messages for static destination: " + dest);
             } 
@@ -633,6 +676,10 @@
     }
     
     protected DemandSubscription createDemandSubscription(ConsumerInfo info){
+     return doCreateDemandSubscription(info);
+    }
+    
+    protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info){
         DemandSubscription result=new DemandSubscription(info);
         result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
                         .getNextSequenceId()));
@@ -711,7 +758,11 @@
         if(message.isAdvisory()&&message.getDataStructure()!=null
                         &&message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){
             ConsumerInfo info=(ConsumerInfo) message.getDataStructure();
-            if(info.isNetworkSubscription()){
+            hops = info.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
+            if(hops >= networkTTL){
+                if (log.isTraceEnabled()){
+                    log.trace("ConsumerInfo advisory restricted to " + networkTTL + " network
hops ignoring: " + message);
+                }
                 return false;
             }
         }
@@ -721,7 +772,12 @@
     protected void waitStarted() throws InterruptedException {
         startedLatch.await();
     }
+    
+    protected void clearDownSubscriptions(){
+        
+    }
 
+  
     
 
     



Mime
View raw message