activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gno...@apache.org
Subject svn commit: r374292 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
Date Thu, 02 Feb 2006 07:47:21 GMT
Author: gnodet
Date: Wed Feb  1 23:47:16 2006
New Revision: 374292

URL: http://svn.apache.org/viewcvs?rev=374292&view=rev
Log:
Fix threading problems in the DemandForwardingBridge.java

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=374292&r1=374291&r2=374292&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
Wed Feb  1 23:47:16 2006
@@ -16,7 +16,6 @@
 import java.io.IOException;
 import javax.jms.JMSException;
 import org.apache.activemq.advisory.AdvisorySupport;
-import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -49,6 +48,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 /**
  * Forwards messages from the local broker to the remote broker based on demand.
@@ -94,6 +94,7 @@
     ConcurrentHashMap subscriptionMapByRemoteId=new ConcurrentHashMap();
     protected final BrokerId localBrokerPath[]=new BrokerId[] { null };
     protected final BrokerId remoteBrokerPath[]=new BrokerId[] { null };
+    private CountDownLatch startedLatch = new CountDownLatch(2);
 
     public DemandForwardingBridge(Transport localBroker,Transport remoteBroker){
         this.localBroker=localBroker;
@@ -162,6 +163,7 @@
             localBroker.oneway(localSessionInfo);
             log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName
                             +") has been established.");
+            startedLatch.countDown();
         }
     }
 
@@ -186,6 +188,7 @@
                             +destinationFilter));
             demandConsumerInfo.setPrefetchSize(prefetchSize);
             remoteBroker.oneway(demandConsumerInfo);
+            startedLatch.countDown();
         }
     }
 
@@ -214,7 +217,7 @@
         }
     }
 
-    protected void serviceRemoteException(IOException error){
+    protected void serviceRemoteException(Exception error){
         log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown:
"+error.getMessage(),error);
         ServiceSupport.dispose(this);
     }
@@ -223,6 +226,7 @@
         if(!disposed){
             try{
                 if(command.isMessageDispatch()){
+                    waitStarted();
                     MessageDispatch md=(MessageDispatch) command;
                     serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
                     demandConsumerDispatched++;
@@ -239,6 +243,7 @@
                         if(localBrokerId!=null){
                             if(localBrokerId.equals(remoteBrokerId)){
                                 log.info("Disconnecting loop back connection.");
+                                waitStarted();
                                 ServiceSupport.dispose(this);
                             }else{
                                 triggerLocalStartBridge();
@@ -253,7 +258,7 @@
                         log.warn("Unexpected remote command: "+command);
                     }
                 }
-            }catch(IOException e){
+            }catch(Exception e){
                 serviceRemoteException(e);
             }
         }
@@ -343,6 +348,7 @@
             final boolean trace=log.isTraceEnabled();
             try{
                 if(command.isMessageDispatch()){
+                    waitStarted();
                     MessageDispatch md=(MessageDispatch) command;
                     Message message=md.getMessage();
                     DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId());
@@ -381,6 +387,7 @@
                         if(remoteBrokerId!=null){
                             if(remoteBrokerId.equals(localBrokerId)){
                                 log.info("Disconnecting loop back connection.");
+                                waitStarted();
                                 ServiceSupport.dispose(this);
                             }
                         }
@@ -459,5 +466,9 @@
         System.arraycopy(brokerPath,0,rc,0,brokerPath.length);
         System.arraycopy(pathsToAppend,0,rc,brokerPath.length,pathsToAppend.length);
         return rc;
+    }
+    
+    private void waitStarted() throws InterruptedException {
+        startedLatch.await();
     }
 }



Mime
View raw message