Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 49429 invoked from network); 19 Feb 2006 22:14:17 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 19 Feb 2006 22:14:17 -0000 Received: (qmail 62859 invoked by uid 500); 19 Feb 2006 22:14:17 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 62840 invoked by uid 500); 19 Feb 2006 22:14:17 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 62831 invoked by uid 99); 19 Feb 2006 22:14:17 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 19 Feb 2006 14:14:17 -0800 X-ASF-Spam-Status: No, hits=-8.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.29) with SMTP; Sun, 19 Feb 2006 14:14:16 -0800 Received: (qmail 49357 invoked by uid 65534); 19 Feb 2006 22:13:55 -0000 Message-ID: <20060219221355.49356.qmail@minotaur.apache.org> Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r378967 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Date: Sun, 19 Feb 2006 22:13:55 -0000 To: activemq-commits@geronimo.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.6 X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: rajdavies Date: Sun Feb 19 14:13:54 2006 New Revision: 378967 URL: http://svn.apache.org/viewcvs?rev=378967&view=rev Log: remove await latch call if closing a loop back network connector - this should 'fix' - hanging SpringTest. Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=378967&r1=378966&r2=378967&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Sun Feb 19 14:13:54 2006 @@ -42,6 +42,7 @@ import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportListener; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.LongSequenceGenerator; @@ -93,6 +94,7 @@ protected CountDownLatch startedLatch = new CountDownLatch(2); protected Object brokerInfoMutex = new Object(); protected boolean decreaseNetworkConsumerPriority; + protected boolean shutDown; protected int networkTTL = 1; @@ -113,7 +115,7 @@ serviceLocalException(error); } }); - remoteBroker.setTransportListener(new DefaultTransportListener(){ + remoteBroker.setTransportListener(new TransportListener(){ public void onCommand(Command command){ serviceRemoteCommand(command); } @@ -121,6 +123,20 @@ public void onException(IOException error){ serviceRemoteException(error); } + + public void transportInterupted(){ + //clear any subscriptions - to try and prevent the bridge from stalling the broker + log.warn("Outbound transport to " + remoteBrokerName + " interrupted ..."); + clearDownSubscriptions(); + + } + + public void transportResumed(){ + //restart and static subscriptions - the consumer advisories will be replayed + log.info("Outbound transport to " + remoteBrokerName + " resumed"); + setupStaticDestinations(); + + } }); localBroker.start(); remoteBroker.start(); @@ -195,25 +211,32 @@ } + public void stop() throws Exception{ + shutDown = true; + doStop(); + } /** * stop the bridge * @throws Exception */ - public void stop() throws Exception{ + protected void doStop() throws Exception{ + log.debug(" stopping "+localBrokerName+ " bridge to " + remoteBrokerName + " is disposed already ? "+disposed); if(!disposed){ try{ disposed=true; localBridgeStarted.set(false); remoteBridgeStarted.set(false); - if(localConnectionInfo!=null){ - localBroker.request(localConnectionInfo.createRemoveCommand()); - remoteBroker.request(remoteConnectionInfo.createRemoveCommand()); + if(!shutDown){ + remoteBroker.oneway(new ShutdownInfo()); + if(localConnectionInfo!=null){ + localBroker.oneway(localConnectionInfo.createRemoveCommand()); + remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); + } + localBroker.oneway(new ShutdownInfo()); } localBroker.setTransportListener(null); remoteBroker.setTransportListener(null); - remoteBroker.oneway(new ShutdownInfo()); - localBroker.oneway(new ShutdownInfo()); }catch(IOException e){ log.debug("Caught exception stopping",e); }finally{ @@ -223,6 +246,7 @@ ss.throwFirstException(); } } + log.debug(localBrokerName+ " bridge to " + remoteBrokerName + " stopped"); } protected void serviceRemoteException(Exception error){ @@ -251,7 +275,7 @@ if(localBrokerId!=null){ if(localBrokerId.equals(remoteBrokerId)){ log.info("Disconnecting loop back connection."); - waitStarted(); + //waitStarted(); ServiceSupport.dispose(this); } } @@ -345,7 +369,6 @@ if(message.getOriginalTransactionId()==null) message.setOriginalTransactionId(message.getTransactionId()); message.setTransactionId(null); - message.setRecievedByDFBridge(true); message.evictMarshlledForm(); return message; } @@ -393,8 +416,10 @@ } }else if(command.isShutdownInfo()){ log.info(localBrokerName+" Shutting down"); - disposed = true; - stop(); + shutDown = true; + doStop(); + + }else{ switch(command.getDataStructureType()){ case WireFormatInfo.DATA_STRUCTURE_TYPE: @@ -567,6 +592,21 @@ public void setNetworkTTL(int networkTTL){ this.networkTTL=networkTTL; } + + /** + * @return Returns the shutDown. + */ + public boolean isShutDown(){ + return shutDown; + } + + /** + * @param shutDown The shutDown to set. + */ + public void setShutDown(boolean shutDown){ + this.shutDown=shutDown; + } + private boolean contains(BrokerId[] brokerPath,BrokerId brokerId){ @@ -616,16 +656,19 @@ /** * Subscriptions for these desitnations are always created - * @throws IOException * */ - protected void setupStaticDestinations() throws IOException{ + protected void setupStaticDestinations(){ ActiveMQDestination[] dests = staticallyIncludedDestinations; if (dests != null){ for(int i=0;i= networkTTL){ + if (log.isTraceEnabled()){ + log.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message); + } return false; } } @@ -721,7 +772,12 @@ protected void waitStarted() throws InterruptedException { startedLatch.await(); } + + protected void clearDownSubscriptions(){ + + } +