Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 41096 invoked from network); 2 Feb 2006 07:47:43 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 2 Feb 2006 07:47:43 -0000 Received: (qmail 96886 invoked by uid 500); 2 Feb 2006 07:47:43 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 96874 invoked by uid 500); 2 Feb 2006 07:47:43 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 96865 invoked by uid 99); 2 Feb 2006 07:47:42 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Feb 2006 23:47:42 -0800 X-ASF-Spam-Status: No, hits=-8.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 01 Feb 2006 23:47:42 -0800 Received: (qmail 41037 invoked by uid 65534); 2 Feb 2006 07:47:21 -0000 Message-ID: <20060202074721.41036.qmail@minotaur.apache.org> Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r374292 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Date: Thu, 02 Feb 2006 07:47:21 -0000 To: activemq-commits@geronimo.apache.org From: gnodet@apache.org X-Mailer: svnmailer-1.0.5 X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: gnodet Date: Wed Feb 1 23:47:16 2006 New Revision: 374292 URL: http://svn.apache.org/viewcvs?rev=374292&view=rev Log: Fix threading problems in the DemandForwardingBridge.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=374292&r1=374291&r2=374292&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Wed Feb 1 23:47:16 2006 @@ -16,7 +16,6 @@ import java.io.IOException; import javax.jms.JMSException; import org.apache.activemq.advisory.AdvisorySupport; -import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -49,6 +48,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; +import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; /** * Forwards messages from the local broker to the remote broker based on demand. @@ -94,6 +94,7 @@ ConcurrentHashMap subscriptionMapByRemoteId=new ConcurrentHashMap(); protected final BrokerId localBrokerPath[]=new BrokerId[] { null }; protected final BrokerId remoteBrokerPath[]=new BrokerId[] { null }; + private CountDownLatch startedLatch = new CountDownLatch(2); public DemandForwardingBridge(Transport localBroker,Transport remoteBroker){ this.localBroker=localBroker; @@ -162,6 +163,7 @@ localBroker.oneway(localSessionInfo); log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName +") has been established."); + startedLatch.countDown(); } } @@ -186,6 +188,7 @@ +destinationFilter)); demandConsumerInfo.setPrefetchSize(prefetchSize); remoteBroker.oneway(demandConsumerInfo); + startedLatch.countDown(); } } @@ -214,7 +217,7 @@ } } - protected void serviceRemoteException(IOException error){ + protected void serviceRemoteException(Exception error){ log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error); ServiceSupport.dispose(this); } @@ -223,6 +226,7 @@ if(!disposed){ try{ if(command.isMessageDispatch()){ + waitStarted(); MessageDispatch md=(MessageDispatch) command; serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure()); demandConsumerDispatched++; @@ -239,6 +243,7 @@ if(localBrokerId!=null){ if(localBrokerId.equals(remoteBrokerId)){ log.info("Disconnecting loop back connection."); + waitStarted(); ServiceSupport.dispose(this); }else{ triggerLocalStartBridge(); @@ -253,7 +258,7 @@ log.warn("Unexpected remote command: "+command); } } - }catch(IOException e){ + }catch(Exception e){ serviceRemoteException(e); } } @@ -343,6 +348,7 @@ final boolean trace=log.isTraceEnabled(); try{ if(command.isMessageDispatch()){ + waitStarted(); MessageDispatch md=(MessageDispatch) command; Message message=md.getMessage(); DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId()); @@ -381,6 +387,7 @@ if(remoteBrokerId!=null){ if(remoteBrokerId.equals(localBrokerId)){ log.info("Disconnecting loop back connection."); + waitStarted(); ServiceSupport.dispose(this); } } @@ -459,5 +466,9 @@ System.arraycopy(brokerPath,0,rc,0,brokerPath.length); System.arraycopy(pathsToAppend,0,rc,brokerPath.length,pathsToAppend.length); return rc; + } + + private void waitStarted() throws InterruptedException { + startedLatch.await(); } }