Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 21723 invoked from network); 11 Jul 2006 04:45:32 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 11 Jul 2006 04:45:32 -0000 Received: (qmail 7059 invoked by uid 500); 11 Jul 2006 04:45:32 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 7019 invoked by uid 500); 11 Jul 2006 04:45:32 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 7009 invoked by uid 99); 11 Jul 2006 04:45:32 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Jul 2006 21:45:32 -0700 X-ASF-Spam-Status: No, hits=-8.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Jul 2006 21:45:30 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 892A51A981A; Mon, 10 Jul 2006 21:45:10 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r420722 - in /incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network: DemandForwardingBridge.java DemandForwardingBridgeSupport.java DiscoveryNetworkConnector.java Date: Tue, 11 Jul 2006 04:45:09 -0000 To: activemq-commits@geronimo.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20060711044510.892A51A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: chirino Date: Mon Jul 10 21:45:09 2006 New Revision: 420722 URL: http://svn.apache.org/viewvc?rev=420722&view=rev Log: Fixed Network Connection failure recovery. http://issues.apache.org/activemq/browse/AMQ-802 http://issues.apache.org/activemq/browse/AMQ-805 Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=420722&r1=420721&r2=420722&view=diff ============================================================================== --- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java (original) +++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Mon Jul 10 21:45:09 2006 @@ -53,9 +53,6 @@ ServiceSupport.dispose(this); } } - if (!disposed){ - triggerLocalStartBridge(); - } } } Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=420722&r1=420721&r2=420722&view=diff ============================================================================== --- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original) +++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Mon Jul 10 21:45:09 2006 @@ -70,14 +70,14 @@ protected static final Log log = LogFactory.getLog(DemandForwardingBridge.class); protected final Transport localBroker; protected final Transport remoteBroker; - protected IdGenerator idGenerator = new IdGenerator(); - protected LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); + protected final IdGenerator idGenerator = new IdGenerator(); + protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); protected ConnectionInfo localConnectionInfo; protected ConnectionInfo remoteConnectionInfo; protected SessionInfo localSessionInfo; protected ProducerInfo producerInfo; - protected String localBrokerName; - protected String remoteBrokerName; + protected String localBrokerName = "Unknown"; + protected String remoteBrokerName = "Unknown"; protected String localClientId; protected String userName; protected String password; @@ -87,22 +87,22 @@ protected String name = "bridge"; protected ConsumerInfo demandConsumerInfo; protected int demandConsumerDispatched; - protected AtomicBoolean localBridgeStarted = new AtomicBoolean(false); - protected AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false); + protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false); + protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false); protected boolean disposed = false; protected BrokerId localBrokerId; protected ActiveMQDestination[] excludedDestinations; protected ActiveMQDestination[] dynamicallyIncludedDestinations; protected ActiveMQDestination[] staticallyIncludedDestinations; protected ActiveMQDestination[] durableDestinations; - protected ConcurrentHashMap subscriptionMapByLocalId = new ConcurrentHashMap(); - protected ConcurrentHashMap subscriptionMapByRemoteId = new ConcurrentHashMap(); + protected final ConcurrentHashMap subscriptionMapByLocalId = new ConcurrentHashMap(); + protected final ConcurrentHashMap subscriptionMapByRemoteId = new ConcurrentHashMap(); protected final BrokerId localBrokerPath[] = new BrokerId[] { null }; protected CountDownLatch startedLatch = new CountDownLatch(2); protected boolean decreaseNetworkConsumerPriority; - protected boolean shutDown; protected int networkTTL = 1; protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false); + protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false); public DemandForwardingBridgeSupport(final Transport localBroker, final Transport remoteBroker) { @@ -111,7 +111,6 @@ } public void start() throws Exception { - log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+" has been established."); localBroker.setTransportListener(new DefaultTransportListener(){ public void onCommand(Command command){ serviceLocalCommand(command); @@ -130,16 +129,23 @@ serviceRemoteException(error); } - public synchronized void transportInterupted(){ + public void transportInterupted(){ //clear any subscriptions - to try and prevent the bridge from stalling the broker if( remoteInterupted.compareAndSet(false, true) ) { - log.warn("Outbound transport to " + remoteBrokerName + " interrupted ..."); - clearDownSubscriptions(); - try{ - localBroker.oneway(localConnectionInfo.createRemoveCommand()); - }catch(IOException e){ - log.warn("Caught exception from local start",e); - } + + log.debug("Outbound transport to " + remoteBrokerName + " interrupted."); + + if( localBridgeStarted.get() ) { + clearDownSubscriptions(); + synchronized( DemandForwardingBridgeSupport.this ) { + try{ + localBroker.oneway(localConnectionInfo.createRemoveCommand()); + }catch(IOException e){ + log.warn("Caught exception from local start",e); + } + } + } + localBridgeStarted.set(false); remoteBridgeStarted.set(false); startedLatch = new CountDownLatch(2); @@ -147,35 +153,33 @@ } - public synchronized void transportResumed(){ - + public void transportResumed(){ if( remoteInterupted.compareAndSet(true, false) ) { - - //restart and static subscriptions - the consumer advisories will be replayed - log.info("Outbound transport to " + remoteBrokerName + " resumed"); - -// try{ -// triggerLocalStartBridge(); -// }catch(IOException e){ -// log.warn("Caught exception from local start",e); -// } - - try{ - // clear out the previous connection as it may have missed some consumer advisories. - remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); - triggerRemoteStartBridge(); - }catch(IOException e){ - log.warn("Caught exception from remote start",e); - } - + + // We want to slow down false connects so that we don't get in a busy loop. + // False connects can occurr if you using SSH tunnels. + if( !lastConnectSucceeded.get() ) { + try { + log.debug("Previous connection was never fully established. Sleeping for second to avoid busy loop."); + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + lastConnectSucceeded.set(false); + + log.debug("Outbound transport to " + remoteBrokerName + " resumed"); } - } }); localBroker.start(); remoteBroker.start(); -// triggerLocalStartBridge(); - triggerRemoteStartBridge(); + + try{ + triggerRemoteStartBridge(); + }catch(IOException e){ + log.warn("Caught exception from remote start",e); + } } protected void triggerLocalStartBridge() throws IOException { @@ -184,7 +188,7 @@ try{ startLocalBridge(); }catch(Exception e){ - log.error("Failed to start network bridge: "+e,e); + serviceLocalException(e); } } }; @@ -197,7 +201,7 @@ try{ startRemoteBridge(); }catch(Exception e){ - log.error("Failed to start network bridge: "+e,e); + serviceRemoteException(e); } } }; @@ -206,121 +210,109 @@ protected void startLocalBridge() throws Exception { if(localBridgeStarted.compareAndSet(false,true)){ - - localConnectionInfo=new ConnectionInfo(); - localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); - localClientId="NC_"+remoteBrokerName+"_inbound"+name; - localConnectionInfo.setClientId(localClientId); - localConnectionInfo.setUserName(userName); - localConnectionInfo.setPassword(password); - localBroker.oneway(localConnectionInfo); - - localSessionInfo=new SessionInfo(localConnectionInfo,1); - localBroker.oneway(localSessionInfo); - - log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName - +") has been established."); - startedLatch.countDown(); - setupStaticDestinations(); + synchronized( this ) { + localConnectionInfo=new ConnectionInfo(); + localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); + localClientId="NC_"+remoteBrokerName+"_inbound"+name; + localConnectionInfo.setClientId(localClientId); + localConnectionInfo.setUserName(userName); + localConnectionInfo.setPassword(password); + localBroker.oneway(localConnectionInfo); + + localSessionInfo=new SessionInfo(localConnectionInfo,1); + localBroker.oneway(localSessionInfo); + + log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName + +") has been established."); + + startedLatch.countDown(); + setupStaticDestinations(); + } } } protected void startRemoteBridge() throws Exception { - if(remoteBridgeStarted.compareAndSet(false,true)){ - - remoteConnectionInfo=new ConnectionInfo(); - remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); - remoteConnectionInfo.setClientId("NC_"+localBrokerName+"_outbound"+name); - remoteConnectionInfo.setUserName(userName); - remoteConnectionInfo.setPassword(password); - remoteBroker.oneway(remoteConnectionInfo); - - BrokerInfo brokerInfo=new BrokerInfo(); - brokerInfo.setBrokerName(localBrokerName); - remoteBroker.oneway(brokerInfo); - - SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1); - remoteBroker.oneway(remoteSessionInfo); - - producerInfo=new ProducerInfo(remoteSessionInfo,1); - producerInfo.setResponseRequired(false); - remoteBroker.oneway(producerInfo); - - // Listen to consumer advisory messages on the remote broker to determine demand. - demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1); - demandConsumerInfo.setDispatchAsync(dispatchAsync); - demandConsumerInfo.setDestination(new ActiveMQTopic(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX - +destinationFilter)); - demandConsumerInfo.setPrefetchSize(prefetchSize); - remoteBroker.oneway(demandConsumerInfo); - //we want information about Destinations as well - ConsumerInfo destinationInfo = new ConsumerInfo(remoteSessionInfo,2); - destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC); - destinationInfo.setPrefetchSize(prefetchSize); - remoteBroker.oneway(destinationInfo); - startedLatch.countDown(); + if(remoteBridgeStarted.compareAndSet(false,true)) { + + synchronized (this) { + + if( remoteConnectionInfo!=null ) { + remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); + } + + remoteConnectionInfo=new ConnectionInfo(); + remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); + remoteConnectionInfo.setClientId("NC_"+localBrokerName+"_outbound"+name); + remoteConnectionInfo.setUserName(userName); + remoteConnectionInfo.setPassword(password); + remoteBroker.oneway(remoteConnectionInfo); + + BrokerInfo brokerInfo=new BrokerInfo(); + brokerInfo.setBrokerName(localBrokerName); + remoteBroker.oneway(brokerInfo); + + SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1); + remoteBroker.oneway(remoteSessionInfo); + + producerInfo=new ProducerInfo(remoteSessionInfo,1); + producerInfo.setResponseRequired(false); + remoteBroker.oneway(producerInfo); + + // Listen to consumer advisory messages on the remote broker to determine demand. + demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1); + demandConsumerInfo.setDispatchAsync(dispatchAsync); + demandConsumerInfo.setDestination(new ActiveMQTopic(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + +destinationFilter)); + demandConsumerInfo.setPrefetchSize(prefetchSize); + remoteBroker.oneway(demandConsumerInfo); + //we want information about Destinations as well + ConsumerInfo destinationInfo = new ConsumerInfo(remoteSessionInfo,2); + destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC); + destinationInfo.setPrefetchSize(prefetchSize); + remoteBroker.oneway(destinationInfo); + startedLatch.countDown(); + + if (!disposed){ + triggerLocalStartBridge(); + } + + } } } public void stop() throws Exception { - shutDown = true; - doStop(); - } - - /** - * stop the bridge - * @throws Exception - */ - protected void doStop() throws Exception { log.debug(" stopping "+localBrokerName+ " bridge to " + remoteBrokerName + " is disposed already ? "+disposed); - if(!disposed){ - try{ - disposed=true; - - remoteBridgeStarted.set(false); - if(!shutDown){ - remoteBroker.oneway(new ShutdownInfo()); - if(localConnectionInfo!=null){ - localBroker.oneway(localConnectionInfo.createRemoveCommand()); - remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); - } - localBroker.oneway(new ShutdownInfo()); - } - localBroker.setTransportListener(null); - remoteBroker.setTransportListener(null); - }catch(IOException e){ - log.debug("Caught exception stopping",e); - }finally{ - ServiceStopper ss=new ServiceStopper(); - ss.stop(localBroker); - ss.stop(remoteBroker); - ss.throwFirstException(); - } - } + if (!disposed) { + try { + disposed = true; + + remoteBridgeStarted.set(false); + + localBroker.oneway(new ShutdownInfo()); + remoteBroker.oneway(new ShutdownInfo()); + + } catch (IOException e) { + log.debug("Caught exception stopping", e); + } finally { + ServiceStopper ss = new ServiceStopper(); + ss.stop(localBroker); + ss.stop(remoteBroker); + ss.throwFirstException(); + } + } log.debug(localBrokerName+ " bridge to " + remoteBrokerName + " stopped"); } - protected void doStopLocal(){ - try{ - if(!shutDown){ - if(localConnectionInfo!=null){ - localBroker.oneway(localConnectionInfo.createRemoveCommand()); - } - localBroker.oneway(new ShutdownInfo()); - } - localBroker.setTransportListener(null); - }catch(IOException e){ - log.debug("Caught exception stopping",e); - }finally{ - ServiceStopper ss=new ServiceStopper(); - ss.stop(localBroker); - localBridgeStarted.set(false); - } - } - protected void serviceRemoteException(Throwable error) { - log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error); - ServiceSupport.dispose(this); + if( !disposed ) { + log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a remote error: "+error); + log.debug("The remote Exception was: "+error, error); + new Thread() { + public void run() { + ServiceSupport.dispose(DemandForwardingBridgeSupport.this); + } + }.start(); + } } protected void serviceRemoteCommand(Command command) { @@ -336,7 +328,10 @@ demandConsumerDispatched=0; } }else if(command.isBrokerInfo()){ - serviceRemoteBrokerInfo(command); + + lastConnectSucceeded.set(true); + serviceRemoteBrokerInfo(command); + }else if(command.getClass() == ConnectionError.class ) { ConnectionError ce = (ConnectionError) command; serviceRemoteException(ce.getException()); @@ -344,6 +339,7 @@ switch(command.getDataStructureType()){ case KeepAliveInfo.DATA_STRUCTURE_TYPE: case WireFormatInfo.DATA_STRUCTURE_TYPE: + case ShutdownInfo.DATA_STRUCTURE_TYPE: break; default: log.warn("Unexpected remote command: "+command); @@ -413,7 +409,10 @@ ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination(); tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId()); } + destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(),getRemoteBrokerPath())); + + log.debug("Replying destination control command: "+destInfo); localBroker.oneway(destInfo); } @@ -424,8 +423,15 @@ } protected void serviceLocalException(Throwable error) { - log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error); - ServiceSupport.dispose(this); + if( !disposed ) { + log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a local error: "+error); + log.debug("The local Exception was:"+error,error); + new Thread() { + public void run() { + ServiceSupport.dispose(DemandForwardingBridgeSupport.this); + } + }.start(); + } } protected void addSubscription(DemandSubscription sub) throws IOException { @@ -502,16 +508,6 @@ remoteBroker.asyncRequest(message, callback); } - // Ack on every message since we don't know if the broker is blocked due to memory - // usage and is waiting for an Ack to un-block him. - - // Acking a range is more efficient, but also more prone to locking up a server - // Perhaps doing something like the following should be policy based. -// int dispatched = sub.incrementDispatched(); -// if(dispatched>(sub.getLocalInfo().getPrefetchSize()*.75)){ -// localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,dispatched)); -// sub.setDispatched(0); -// } } }else if(command.isBrokerInfo()){ serviceLocalBrokerInfo(command); @@ -521,8 +517,7 @@ // the local transport is just shutting down temporarily until the remote side // is restored. if( !remoteInterupted.get() ) { - shutDown = true; - doStop(); + stop(); } }else if(command.getClass() == ConnectionError.class ) { ConnectionError ce = (ConnectionError) command; @@ -695,21 +690,7 @@ public void setNetworkTTL(int networkTTL) { this.networkTTL=networkTTL; } - - /** - * @return Returns the shutDown. - */ - public boolean isShutDown() { - return shutDown; - } - - /** - * @param shutDown The shutDown to set. - */ - public void setShutDown(boolean shutDown) { - this.shutDown=shutDown; - } - + public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { if(brokerPath!=null){ for(int i=0;i