activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r471837 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/transport/ main/java/org/ap...
Date Mon, 06 Nov 2006 19:36:00 GMT
Author: chirino
Date: Mon Nov  6 11:35:59 2006
New Revision: 471837

URL: http://svn.apache.org/viewvc?view=rev&rev=471837
Log:
http://issues.apache.org/activemq/browse/AMQ-1026
Created a ReconnectTest that showed problems with the synchronization used when a client reconnects to a server via failover before the server detects the client failure.
- InactivityMonitor: Better synchronization so that an inactivity exception is only raised once.
- Connection: Added a serviceExceptionAsync() method and changed all methods that are dispatching to use it instead of serviceException(), to avoid a possible deadlock that can occur during connection shutdown.
- MockTransport: Finer-grained synchronization to avoid deadlocks.
- PrefetchSubscription: it is possible it will get duplicate acks on a failover reconnect


Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?view=diff&rev=471837&r1=471836&r2=471837
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java Mon Nov  6 11:35:59 2006
@@ -71,6 +71,9 @@
 import org.apache.commons.logging.LogFactory;
 
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 
 /**
@@ -93,23 +96,31 @@
     private boolean manageable;
 
     protected final ConcurrentHashMap localConnectionStates = new ConcurrentHashMap();
-	protected final Map brokerConnectionStates;
+    protected final Map brokerConnectionStates;
     
     private WireFormatInfo wireFormatInfo;    
-    protected boolean disposed=false;
+    protected final AtomicBoolean disposed=new AtomicBoolean(false);
     protected IOException transportException;
+    private CountDownLatch stopLatch = new CountDownLatch(1);
     
     static class ConnectionState extends org.apache.activemq.state.ConnectionState {
         private final ConnectionContext context;
+        AbstractConnection connection;
 
-        public ConnectionState(ConnectionInfo info, ConnectionContext context) {
+        public ConnectionState(ConnectionInfo info, ConnectionContext context, AbstractConnection connection) {
             super(info);
             this.context = context;
+            this.connection=connection;
         }
         
         public ConnectionContext getContext() {
             return context;
         }
+        
+        public AbstractConnection getConnection() {
+            return connection;
+        }
+        
     }
 
     
@@ -152,39 +163,68 @@
     }
 
     public void stop() throws Exception{
-        if(disposed)
-            return;
-        disposed=true;
-        
-        if( taskRunner!=null )
-            taskRunner.shutdown();
-        
-        //
-        // Remove all logical connection associated with this connection
-        // from the broker.
-        if(!broker.isStopped()){
-            ArrayList l=new ArrayList(localConnectionStates.keySet());
-            for(Iterator iter=l.iterator();iter.hasNext();){
-                ConnectionId connectionId=(ConnectionId) iter.next();
-                try{
-                    processRemoveConnection(connectionId);
-                }catch(Throwable ignore){}
-            }
-            if(brokerInfo!=null){
-                broker.removeBroker(this,brokerInfo);
-            }
+        if(disposed.compareAndSet(false, true)) {
+	        
+	        if( taskRunner!=null )
+	            taskRunner.shutdown();
+	        
+	        // Clear out the dispatch queue to release any memory that
+	        // is being held on to.
+	        dispatchQueue.clear();
+	        
+	        //
+	        // Remove all logical connection associated with this connection
+	        // from the broker.
+	        if(!broker.isStopped()){
+	            ArrayList l=new ArrayList(localConnectionStates.keySet());
+	            for(Iterator iter=l.iterator();iter.hasNext();){
+	                ConnectionId connectionId=(ConnectionId) iter.next();
+	                try{
+	                	log.debug("Cleaning up connection resources.");
+	                    processRemoveConnection(connectionId);
+	                }catch(Throwable ignore){
+	                	ignore.printStackTrace();
+	                }
+	            }
+	            if(brokerInfo!=null){
+	                broker.removeBroker(this,brokerInfo);
+	            }
+	        }
+			stopLatch.countDown();
         }
     }
     
     public void serviceTransportException(IOException e) {
-        if( !disposed ) {
-            transportException = e;	
+        if( !disposed.get() ) {
+            transportException = e; 
             if( transportLog.isDebugEnabled() )
                 transportLog.debug("Transport failed: "+e,e);
             ServiceSupport.dispose(this);
         }
     }
-        
+    
+    /**
+     * Calls the serviceException method in an async thread.  Since 
+     * handling a service exception closes a socket, we should not tie 
+     * up broker threads since client sockets may hang or cause deadlocks.
+     * 
+     * @param e
+     */
+	public void serviceExceptionAsync(final IOException e) {
+		new Thread("Async Exception Handler") {
+			public void run() {
+				serviceException(e);
+			}
+		}.start();
+	}
+
+	/**
+	 * Closes a clients connection due to a detected error.
+	 * 
+	 * Errors are ignored if: the client is closing or broker is closing.
+	 * Otherwise, the connection error transmitted to the client before stopping it's
+	 * transport.
+	 */
     public void serviceException(Throwable e) {
         // are we a transport exception such as not being able to dispatch
         // synchronously to a transport
@@ -195,9 +235,9 @@
         // Handle the case where the broker is stopped 
         // But the client is still connected.
         else if (e.getClass() == BrokerStoppedException.class ) {
-            if( !disposed ) {
+            if( !disposed.get() ) {
                 if( serviceLog.isDebugEnabled() )
-                	serviceLog.debug("Broker has been stopped.  Notifying client and closing his connection.");
+                    serviceLog.debug("Broker has been stopped.  Notifying client and closing his connection.");
                 
                 ConnectionError ce = new ConnectionError();
                 ce.setException(e);
@@ -205,21 +245,21 @@
                 
                 // Wait a little bit to try to get the output buffer to flush the exption notification to the client.
                 try {
-					Thread.sleep(500);
-				} catch (InterruptedException ie) {
-					Thread.currentThread().interrupt();
-				}
+                    Thread.sleep(500);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                }
                 
-				// Worst case is we just kill the connection before the notification gets to him.
+                // Worst case is we just kill the connection before the notification gets to him.
                 ServiceSupport.dispose(this);
             }
         }
         
-        else if( !disposed && !inServiceException ) {
+        else if( !disposed.get() && !inServiceException ) {
             inServiceException = true;
                 try {
                 if( serviceLog.isDebugEnabled() )
-                	serviceLog.debug("Async error occurred: "+e,e);
+                    serviceLog.debug("Async error occurred: "+e,e);
                 ConnectionError ce = new ConnectionError();
                 ce.setException(e);
                 dispatchAsync(ce);
@@ -238,8 +278,8 @@
             response = command.visit(this);
         } catch ( Throwable e ) {
             if( responseRequired ) {
-            	if( serviceLog.isDebugEnabled() && e.getClass()!=BrokerStoppedException.class )
-            		serviceLog.debug("Error occured while processing sync command: "+e,e);
+                if( serviceLog.isDebugEnabled() && e.getClass()!=BrokerStoppedException.class )
+                    serviceLog.debug("Error occured while processing sync command: "+e,e);
                 response = new ExceptionResponse(e);
             } else {
                 serviceException(e);
@@ -312,7 +352,7 @@
         
         // Avoid replaying dup commands
         if( cs.getTransactionState(info.getTransactionId())==null ) {
-        	cs.addTransactionState(info.getTransactionId());
+            cs.addTransactionState(info.getTransactionId());
             broker.beginTransaction(context, info.getTransactionId());
         }
         return null;
@@ -371,7 +411,7 @@
         }
         
         cs.removeTransactionState(info.getTransactionId());
-    	broker.commitTransaction(context, info.getTransactionId(), false);
+        broker.commitTransaction(context, info.getTransactionId(), false);
         return null;
     }
 
@@ -383,7 +423,7 @@
         }
         
         cs.removeTransactionState(info.getTransactionId());
-    	broker.rollbackTransaction(context, info.getTransactionId());
+        broker.rollbackTransaction(context, info.getTransactionId());
         return null;
     }
     
@@ -409,7 +449,7 @@
 
 
     public Response processMessage(Message messageSend) throws Exception {
-    	
+        
         ProducerId producerId = messageSend.getProducerId();
         ConnectionState state = lookupConnectionState(producerId);
         ConnectionContext context = state.getContext();
@@ -418,21 +458,21 @@
         // then, finde the associated producer state so we can do some dup detection.
         ProducerState producerState=null;        
         if( messageSend.getMessageId().getProducerId().equals( messageSend.getProducerId() ) ) {
-	        SessionState ss = state.getSessionState(producerId.getParentId());
-	        if( ss == null )
-	            throw new IllegalStateException("Cannot send from a session that had not been registered: "+producerId.getParentId());
-	        producerState = ss.getProducerState(producerId); 
+            SessionState ss = state.getSessionState(producerId.getParentId());
+            if( ss == null )
+                throw new IllegalStateException("Cannot send from a session that had not been registered: "+producerId.getParentId());
+            producerState = ss.getProducerState(producerId); 
         }
         
         if( producerState == null ) {
             broker.send(context, messageSend);
         } else {
-	        // Avoid Dups.
-	        long seq = messageSend.getMessageId().getProducerSequenceId();
-	        if( seq > producerState.getLastSequenceId() ) {
-	        	producerState.setLastSequenceId(seq);
-	            broker.send(context, messageSend);
-	        }
+            // Avoid Dups.
+            long seq = messageSend.getMessageId().getProducerSequenceId();
+            if( seq > producerState.getLastSequenceId() ) {
+                producerState.setLastSequenceId(seq);
+                broker.send(context, messageSend);
+            }
         }
         
         return null;
@@ -453,12 +493,12 @@
     }
 
     public Response processBrokerInfo(BrokerInfo info) {
-    	
-    	// We only expect to get one broker info command per connection
-    	if( this.brokerInfo!=null ) {
-    		log.warn("Unexpected extra broker info command received: "+info);
-    	}
-    	
+        
+        // We only expect to get one broker info command per connection
+        if( this.brokerInfo!=null ) {
+            log.warn("Unexpected extra broker info command received: "+info);
+        }
+        
         this.brokerInfo = info;
         broker.addBroker(this, info);
         return null;
@@ -494,12 +534,12 @@
 
         // Avoid replaying dup commands
         if( !ss.getProducerIds().contains(info.getProducerId()) ) {
-	        broker.addProducer(cs.getContext(), info);
-	        try {
-	            ss.addProducer(info);
-			} catch (IllegalStateException e) {
-				broker.removeProducer(cs.getContext(), info);
-			}
+            broker.addProducer(cs.getContext(), info);
+            try {
+                ss.addProducer(info);
+            } catch (IllegalStateException e) {
+                broker.removeProducer(cs.getContext(), info);
+            }
         }
         return null;
     }
@@ -531,12 +571,12 @@
 
         // Avoid replaying dup commands
         if( !ss.getConsumerIds().contains(info.getConsumerId()) ) {
-	        broker.addConsumer(cs.getContext(), info);
-	        try {
-				ss.addConsumer(info);
-			} catch (IllegalStateException e) {
-				broker.removeConsumer(cs.getContext(), info);
-			}
+            broker.addConsumer(cs.getContext(), info);
+            try {
+                ss.addConsumer(info);
+            } catch (IllegalStateException e) {
+                broker.removeConsumer(cs.getContext(), info);
+            }
         }
         
         return null;
@@ -565,12 +605,12 @@
         
         // Avoid replaying dup commands
         if( !cs.getSessionIds().contains(info.getSessionId()) ) {
-	    	broker.addSession(cs.getContext(), info);
-	        try {
-	            cs.addSession(info);
-			} catch (IllegalStateException e) {
-				broker.removeSession(cs.getContext(), info);
-			}
+            broker.addSession(cs.getContext(), info);
+            try {
+                cs.addSession(info);
+            } catch (IllegalStateException e) {
+                broker.removeSession(cs.getContext(), info);
+            }
         }
         return null;
     }
@@ -613,34 +653,51 @@
     }
     
     public Response processAddConnection(ConnectionInfo info) throws Exception {
+
     	ConnectionState state = (ConnectionState) brokerConnectionStates.get(info.getConnectionId());
-    	if( state == null ) {
-            // Setup the context.
-            ConnectionContext context = new ConnectionContext(info);
-            context.setConnection(this);
-            context.setBroker(broker);
-            context.setConnector(connector);
-            context.setTransactions(new ConcurrentHashMap());
-            context.setWireFormatInfo(wireFormatInfo);
-            this.manageable = info.isManageable();
-            context.incrementReference();
-	        
-	        state = new ConnectionState(info, context);
-	        brokerConnectionStates.put(info.getConnectionId(), state);
-	        localConnectionStates.put(info.getConnectionId(), state);	        
-	        
-	        broker.addConnection(context, info);
-	        if (info.isManageable() && broker.isFaultTolerantConfiguration()){
-	            //send ConnectionCommand
-	            ConnectionControl command = new ConnectionControl();
-	            command.setFaultTolerant(broker.isFaultTolerantConfiguration());
-	            dispatchAsync(command);
-	        }
-    	} else {
-    		// We are a concurrent connection... it must be client reconnect.
-    		localConnectionStates.put(info.getConnectionId(), state);
-    		state.getContext().incrementReference();
+    	
+    	if( state !=null ) {
+    		// ConnectionInfo replay??  Chances are that it's a client reconnecting,
+    		// and we have not detected that that old connection died.. Kill the old connection
+    		// to make sure our state is in sync with the client.
+    		if( this != state.getConnection() ) {
+    			log.debug("Killing previous stale connection: "+state.getConnection());
+    			state.getConnection().stop();
+    			if( !state.getConnection().stopLatch.await(15, TimeUnit.SECONDS) ) {
+    				throw new Exception("Previous connection could not be clean up.");
+    			}
+    		}
     	}
+    	
+		log.debug("Setting up new connection: "+this);
+
+    	
+        // Setup the context.
+        String clientId = info.getClientId();
+        ConnectionContext context = new ConnectionContext();
+        context.setConnection(this);
+        context.setBroker(broker);
+        context.setConnector(connector);
+        context.setTransactions(new ConcurrentHashMap());
+        context.setClientId(clientId);
+        context.setUserName(info.getUserName());
+        context.setConnectionId(info.getConnectionId());
+        context.setWireFormatInfo(wireFormatInfo);
+        context.incrementReference();
+        this.manageable = info.isManageable();
+        
+        state = new ConnectionState(info, context, this);
+        brokerConnectionStates.put(info.getConnectionId(), state);
+        localConnectionStates.put(info.getConnectionId(), state);           
+        
+        broker.addConnection(context, info);
+        if (info.isManageable() && broker.isFaultTolerantConfiguration()){
+            //send ConnectionCommand
+            ConnectionControl command = new ConnectionControl();
+            command.setFaultTolerant(broker.isFaultTolerantConfiguration());
+            dispatchAsync(command);
+        }
+
         return null;
     }
     
@@ -680,11 +737,11 @@
         }
         ConnectionState state = (ConnectionState) localConnectionStates.remove(id);
         if( state != null ) {
-        	// If we are the last reference, we should remove the state
-        	// from the broker.
-        	if( state.getContext().decrementReference() == 0 ){ 
-        		brokerConnectionStates.remove(id);
-        	}
+            // If we are the last reference, we should remove the state
+            // from the broker.
+            if( state.getContext().decrementReference() == 0 ){ 
+                brokerConnectionStates.remove(id);
+            }
         }
         return null;
     }
@@ -758,14 +815,14 @@
      * @return true if the Connection is connected
      */
     public boolean isConnected() {
-        return !disposed;
+        return !disposed.get();
     }
     
     /**
      * @return true if the Connection is active
      */
     public boolean isActive() {
-        return !disposed;
+        return !disposed.get();
     }
     
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java?view=diff&rev=471837&r1=471836&r2=471837
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java Mon Nov  6 11:35:59 2006
@@ -17,6 +17,8 @@
  */
 package org.apache.activemq.broker;
 
+import java.io.IOException;
+
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.region.ConnectionStatistics;
 import org.apache.activemq.command.Command;
@@ -101,5 +103,7 @@
      * @return the source address for this connection
      */
 	public String getRemoteAddress();
+
+	public void serviceExceptionAsync(IOException e);
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=471837&r1=471836&r2=471837
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Mon Nov  6 11:35:59 2006
@@ -29,6 +29,7 @@
 import org.apache.commons.logging.LogFactory;
 
 import java.io.IOException;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * @version $Revision: 1.8 $
@@ -46,7 +47,8 @@
     private boolean pendingStop;
     private long timeStamp = 0;
     private MasterBroker masterBroker; //used if this connection is used by a Slave
-
+    private AtomicBoolean stopped = new AtomicBoolean(false);
+    
     /**
      * @param connector
      * @param transport
@@ -93,34 +95,41 @@
         }
     }
 
-    public synchronized void stop() throws Exception {
+    public void stop() throws Exception {
         // If we're in the middle of starting
         // then go no further... for now.
-        pendingStop = true;
-        if (starting) {
-            log.debug("stop() called in the middle of start(). Delaying...");
-            return;
-        }
-
-        connector.onStopped(this);
-        try {
-            if (masterBroker != null) {
-                masterBroker.stop();
-            }
-
-            // If the transport has not failed yet,
-            // notify the peer that we are doing a normal shutdown.
-            if (transportException == null) {
-                transport.oneway(new ShutdownInfo());
-            }
-        }
-        catch (Exception ignore) {
-            //ignore.printStackTrace();
-        }
-
-        transport.stop();
-        active = false;
-        super.stop();
+        synchronized(this) { 
+	        pendingStop = true;
+	        if (starting) {
+	            log.debug("stop() called in the middle of start(). Delaying...");
+	            return;
+	        }
+        }
+
+    	
+    	if( stopped.compareAndSet(false, true) ) {
+
+    		log.debug("Stopping connection: "+transport.getRemoteAddress());
+	        connector.onStopped(this);
+	        try {
+	            if (masterBroker != null){
+	                masterBroker.stop();
+	            }
+	            
+	            // If the transport has not failed yet,
+	            // notify the peer that we are doing a normal shutdown.
+	            if( transportException == null ) {
+	            	transport.oneway(new ShutdownInfo());
+	            }
+	        } catch (Exception ignore) {
+	            //ignore.printStackTrace();
+	        }
+	
+	        transport.stop();
+	        active = false;
+	        super.stop();
+    		log.debug("Stopped connection: "+transport.getRemoteAddress());
+    	}
     }
 
 
@@ -269,7 +278,7 @@
             getStatistics().onCommand(command);
         }
         catch (IOException e) {
-            serviceException(e);
+            serviceExceptionAsync(e);
         }
         finally {
             setMarkedCandidate(false);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?view=diff&rev=471837&r1=471836&r2=471837
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Mon Nov  6 11:35:59 2006
@@ -242,8 +242,13 @@
                 topics.put(key,view);
             }
         }
-        registeredMBeans.add(key);
-        mbeanServer.registerMBean(view,key);
+        try {
+            mbeanServer.registerMBean(view,key);
+            registeredMBeans.add(key);
+        } catch (Throwable e) {
+            log.warn("Failed to register MBean: "+key);
+            log.debug("Failure reason: "+e,e);
+        }            
     }
 
     protected void unregisterDestination(ObjectName key) throws Exception{
@@ -252,7 +257,12 @@
         temporaryQueues.remove(key);
         temporaryTopics.remove(key);
         if(registeredMBeans.remove(key)){
-            mbeanServer.unregisterMBean(key);
+            try {
+        		mbeanServer.unregisterMBean(key);
+            } catch (Throwable e) {
+                log.warn("Failed to unregister MBean: "+key);
+                log.debug("Failure reason: "+e,e);
+            }            
         }
     }
 
@@ -279,7 +289,7 @@
                             registeredMBeans.remove(inactiveName);
                             mbeanServer.unregisterMBean(inactiveName);
                         }
-                    }catch(Exception e){
+                    }catch(Throwable e){
                         log.error("Unable to unregister inactive durable subscriber: "+subscriptionKey,e);
                     }
                 }else{
@@ -287,8 +297,15 @@
                 }
             }
         }
-        registeredMBeans.add(key);
-        mbeanServer.registerMBean(view,key);
+        
+        try {
+            mbeanServer.registerMBean(view,key);
+            registeredMBeans.add(key);
+        } catch (Throwable e) {
+            log.warn("Failed to register MBean: "+key);
+            log.debug("Failure reason: "+e,e);
+        }
+        
     }
 
     protected void unregisterSubscription(ObjectName key) throws Exception{
@@ -298,7 +315,12 @@
         temporaryQueueSubscribers.remove(key);
         temporaryTopicSubscribers.remove(key);
         if(registeredMBeans.remove(key)){
-            mbeanServer.unregisterMBean(key);
+            try {
+                mbeanServer.unregisterMBean(key);
+            } catch (Throwable e) {
+                log.warn("Failed to unregister MBean: "+key);
+                log.debug("Failure reason: "+e,e);
+            }
         }
         DurableSubscriptionView view=(DurableSubscriptionView) durableTopicSubscribers.remove(key);
         if(view!=null){
@@ -346,8 +368,15 @@
                             +","+"Type=Subscription,"+"active=false,"+"name="
                             +JMXSupport.encodeObjectNamePart(key.toString())+"");
             SubscriptionView view=new InactiveDurableSubscriptionView(this,key.getClientId(),info);
-            registeredMBeans.add(objectName);
-            mbeanServer.registerMBean(view,objectName);
+            
+            try {
+                mbeanServer.registerMBean(view,objectName);
+                registeredMBeans.add(objectName);
+            } catch (Throwable e) {
+                log.warn("Failed to register MBean: "+key);
+                log.debug("Failure reason: "+e,e);
+            }            
+            
             inactiveDurableTopicSubscribers.put(objectName,view);
             subscriptionKeys.put(key,objectName);
         }catch(Exception e){

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java?view=diff&rev=471837&r1=471836&r2=471837
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java Mon Nov  6 11:35:59 2006
@@ -46,16 +46,18 @@
     private final MBeanServer server;
     private final ObjectName connectorName;
     private ConnectionViewMBean mbean;
-    private ObjectName name;
-    private String connectionId;
+
+    private ObjectName byClientIdName;
+	private ObjectName byAddressName;
 
     public ManagedTransportConnection(TransportConnector connector, Transport transport, Broker broker, TaskRunnerFactory factory, MBeanServer server,
-            ObjectName connectorName, String connectionId) throws IOException {
+            ObjectName connectorName) throws IOException {
         super(connector, transport, broker, factory);
         this.server = server;
         this.connectorName = connectorName;
         this.mbean = new ConnectionView(this);
-        setConnectionId(connectionId);
+        byAddressName = createByAddressObjectName("address", transport.getRemoteAddress());
+        registerMBean(byAddressName);
     }
 
     public synchronized void stop() throws Exception {
@@ -63,60 +65,79 @@
             setPendingStop(true);
             return;
         }
-        unregisterMBean();
+    	synchronized(this) {
+	        unregisterMBean(byClientIdName);
+	        unregisterMBean(byAddressName);
+	        byClientIdName=null;
+	        byAddressName=null;
+    	}
         super.stop();
     }
 
-    public String getConnectionId() {
-        return connectionId;
-    }
-
     /**
      * Sets the connection ID of this connection. On startup this connection ID
      * is set to an incrementing counter; once the client registers it is set to
      * the clientID of the JMS client.
      */
     public void setConnectionId(String connectionId) throws IOException {
-        this.connectionId = connectionId;
-        unregisterMBean();
-        name = createObjectName();
-        registerMBean();
     }
 
     public Response processAddConnection(ConnectionInfo info) throws Exception {
         Response answer = super.processAddConnection(info);
         String clientId = info.getClientId();
         if (clientId != null) {
-            // lets update the MBean name
-            setConnectionId(clientId);
+            if(byClientIdName==null) {
+    	        byClientIdName = createByClientIdObjectName(clientId);
+    	        registerMBean(byClientIdName);
+            }
         }
         return answer;
     }
 
     // Implementation methods
     // -------------------------------------------------------------------------
-    protected void registerMBean() throws IOException {
-        try {
-            server.registerMBean(mbean, name);
-        }
-        catch (Throwable e) {
-            throw IOExceptionSupport.create(e);
-        }
-
+    protected void registerMBean(ObjectName name) {
+    	if( name!=null ) {
+	        try {
+	            server.registerMBean(mbean, name);
+	        } catch (Throwable e) {
+	            log.warn("Failed to register MBean: "+name);
+	            log.debug("Failure reason: "+e,e);
+	        }
+    	}
     }
 
-    protected void unregisterMBean() {
+    protected void unregisterMBean(ObjectName name) {
         if (name != null) {
             try {
                 server.unregisterMBean(name);
             }
             catch (Throwable e) {
                 log.warn("Failed to unregister mbean: " + name);
+                log.debug("Failure reason: "+e,e);
             }
         }
     }
 
-    protected ObjectName createObjectName() throws IOException {
+    protected ObjectName createByAddressObjectName(String type, String value) throws IOException {
+        // Build the object name for the destination
+        Hashtable map = connectorName.getKeyPropertyList();
+        try {
+            return new ObjectName(
+            		connectorName.getDomain()+":"+
+            		"BrokerName="+JMXSupport.encodeObjectNamePart((String) map.get("BrokerName"))+","+
+            		"Type=Connection,"+
+                    "ConnectorName="+JMXSupport.encodeObjectNamePart((String) map.get("ConnectorName"))+","+
+            		"ViewType="+JMXSupport.encodeObjectNamePart(type)+","+
+            		"Name="+JMXSupport.encodeObjectNamePart(value)
+            		);
+        }
+        catch (Throwable e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+    
+    protected ObjectName createByClientIdObjectName(String value) throws IOException {
         // Build the object name for the destination
         Hashtable map = connectorName.getKeyPropertyList();
         try {
@@ -125,11 +146,13 @@
             		"BrokerName="+JMXSupport.encodeObjectNamePart((String) map.get("BrokerName"))+","+
             		"Type=Connection,"+
                     "ConnectorName="+JMXSupport.encodeObjectNamePart((String) map.get("ConnectorName"))+","+
-            		"Connection="+JMXSupport.encodeObjectNamePart(connectionId)
+                	"Connection="+JMXSupport.encodeObjectNamePart(value)
             		);
         }
         catch (Throwable e) {
             throw IOExceptionSupport.create(e);
         }
     }
+
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java?view=diff&rev=471837&r1=471836&r2=471837
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java Mon Nov  6 11:35:59 2006
@@ -53,8 +53,7 @@
     }
 
     protected Connection createConnection(Transport transport) throws IOException {
-        String connectionId = "" + getNextConnectionId();
-        return new ManagedTransportConnection(this, transport, getBroker(), isDisableAsyncDispatch() ? null : getTaskRunnerFactory(), mbeanServer, connectorName, connectionId);
+        return new ManagedTransportConnection(this, transport, getBroker(), isDisableAsyncDispatch() ? null : getTaskRunnerFactory(), mbeanServer, connectorName); // task runner factory is null when async dispatch is disabled; a connection id is no longer passed in
     }
 
     protected static synchronized long getNextConnectionId() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=471837&r1=471836&r2=471837
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Nov  6 11:35:59 2006
@@ -20,12 +20,12 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedList;
+
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
-import org.apache.activemq.ActiveMQConnectionFactory;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
@@ -250,7 +250,7 @@
         if( isSlaveBroker() ) {
         	throw new JMSException("Slave broker out of sync with master: Acknowledgment ("+ack+") was not in the dispatch list: "+dispatched);
         } else {
-        	throw new JMSException("Invalid acknowledgment: "+ack);
+        	log.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "+ack);
         }
     }
 
@@ -415,7 +415,7 @@
             try{
                 dispatchMatched();
             }catch(IOException e){
-                context.getConnection().serviceException(e);
+                context.getConnection().serviceExceptionAsync(e);
             }
         }
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?view=diff&rev=471837&r1=471836&r2=471837
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Mon Nov  6 11:35:59 2006
@@ -38,7 +38,7 @@
     
     private WireFormatInfo localWireFormatInfo;
     private WireFormatInfo remoteWireFormatInfo;
-    private boolean monitorStarted=false;
+    private final AtomicBoolean monitorStarted= new AtomicBoolean(false);
 
     private final AtomicBoolean commandSent=new AtomicBoolean(false);
     private final AtomicBoolean inSend=new AtomicBoolean(false);
@@ -145,13 +145,15 @@
     }
     
     public void onException(IOException error) {
-        stopMonitorThreads();
-        transportListener.onException(error);
+    	if( monitorStarted.get() ) { // errors that arrive while the monitor is not started are not forwarded
+	        stopMonitorThreads(); // NOTE(review): the get() above and the compareAndSet in stopMonitorThreads() are separate steps, so two concurrent callers could both reach the listener - confirm
+	        getTransportListener().onException(error);
+    	}
     }
     
     
     synchronized private void startMonitorThreads() throws IOException {
-        if( monitorStarted ) 
+        if( monitorStarted.get() ) 
             return;
         if( localWireFormatInfo == null )
             return;
@@ -160,9 +162,9 @@
         
         long l = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
         if( l > 0 ) {
+            monitorStarted.set(true);        
             Scheduler.executePeriodically(writeChecker, l/2);
             Scheduler.executePeriodically(readChecker, l);
-            monitorStarted=true;        
         }
     }
     
@@ -170,10 +172,9 @@
      * 
      */
     synchronized private void stopMonitorThreads() {
-        if( monitorStarted ) {
+        if( monitorStarted.compareAndSet(true, false) ) { // only the call that flips the flag from true to false cancels the checkers; later calls do nothing
             Scheduler.cancel(readChecker);
             Scheduler.cancel(writeChecker);
-            monitorStarted=false;
         }
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java?view=diff&rev=471837&r1=471836&r2=471837
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java Mon Nov  6 11:35:59 2006
@@ -44,9 +44,9 @@
     synchronized public void setTransportListener(TransportListener channelListener) {
         this.transportListener = channelListener;
         if (channelListener == null)
-            next.setTransportListener(null);
+            getNext().setTransportListener(null); // clearing the listener here also clears it on the next transport
         else
-            next.setTransportListener(this);
+            getNext().setTransportListener(this); // this transport registers itself as the next transport's listener
     }
 
 
@@ -55,26 +55,26 @@
      * @throws IOException if the next channel has not been set.
      */
     public void start() throws Exception {
-        if( next == null )
+        if( getNext() == null ) // both a next transport and a listener must be set before starting
             throw new IOException("The next channel has not been set.");
         if( transportListener == null )
             throw new IOException("The command listener has not been set.");
-        next.start();
+        getNext().start(); // starting this transport only starts the next one
     }
 
     /**
      * @see org.apache.activemq.Service#stop()
      */
     public void stop() throws Exception {
-        next.stop();
+        getNext().stop(); // NOTE(review): unlike start(), there is no null check on getNext() here
     }    
 
-    synchronized public void onCommand(Object command) {
-        transportListener.onCommand(command);
+    public void onCommand(Object command) { // method is no longer synchronized on this transport
+        getTransportListener().onCommand(command); // incoming command is handed straight to the registered listener
     }
 
     /**
-     * @return Returns the next.
+     * @return Returns the next transport.
      */
     synchronized public Transport getNext() {
         return next;
@@ -87,49 +87,49 @@
         return transportListener;
     }
     
-    synchronized public String toString() {
-        return next.toString();
+    public String toString() {
+        return getNext().toString(); // string form is whatever the next transport reports
     }
 
-    synchronized public void oneway(Object command) throws IOException {
-        next.oneway(command);
+    public void oneway(Object command) throws IOException { // method is no longer synchronized on this transport
+        getNext().oneway(command); // command is passed unchanged to the next transport
     }
 
-    synchronized public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
-        return next.asyncRequest(command, null);
+    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
+        return getNext().asyncRequest(command, null); // NOTE(review): responseCallback is not forwarded, null is passed instead - confirm this is intended
     }
 
-    synchronized public Object request(Object command) throws IOException {
-        return next.request(command);
+    public Object request(Object command) throws IOException { // method is no longer synchronized on this transport
+        return getNext().request(command); // request and its result pass through the next transport unchanged
     }
     
     public Object request(Object command,int timeout) throws IOException {
-        return next.request(command, timeout);
+        return getNext().request(command, timeout); // command and timeout are passed unchanged to the next transport
     }
 
-    synchronized public void onException(IOException error) {
-        transportListener.onException(error);
+    public void onException(IOException error) { // method is no longer synchronized on this transport
+        getTransportListener().onException(error); // error is handed straight to the registered listener
     }
 
-    synchronized public Object narrow(Class target) {
+    public Object narrow(Class target) { // returns this transport when its class fits target, otherwise asks the next transport
         if( target.isAssignableFrom(getClass()) ) {
             return this;
         }
-        return next.narrow(target);
+        return getNext().narrow(target); // no match here, so the lookup continues in the next transport
     }
 
     synchronized public void setNext(Transport next) {
         this.next = next;
     }
 
-    synchronized public void install(TransportFilter filter) {
+    public void install(TransportFilter filter) { // wiring: filter reports to this, the current next reports to filter, then filter becomes next. NOTE(review): the three steps are no longer under one lock - confirm callers do not race
         filter.setTransportListener(this);
         getNext().setTransportListener(filter);
         setNext(filter);
     }
 
 	public String getRemoteAddress() {
-		return next.getRemoteAddress();
+		return getNext().getRemoteAddress(); // remote address is whatever the next transport reports
 	}  
     
-}
\ No newline at end of file
+}

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java?view=auto&rev=471837
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java Mon Nov  6 11:35:59 2006
@@ -0,0 +1,224 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.mock.MockTransport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Starts WORKER_COUNT failover clients against an embedded broker, repeatedly injects simulated transport errors, and checks that every worker keeps completing send/receive iterations.
+ * @version $Revision: 1.1 $
+ */
+public class ReconnectTest extends TestCase {
+    
+    protected static final Log log = LogFactory.getLog(ReconnectTest.class);
+	public static final int MESSAGES_PER_ITTERATION = 10; // messages each worker sends, then receives, per loop pass
+	public static final int WORKER_COUNT = 10; // number of client connections created in setUp()
+	private BrokerService bs;
+	private URI tcpUri; // connect URI read back from the broker's connector in setUp()
+	private AtomicInteger interruptedCount = new AtomicInteger(); // transportInterupted() callbacks across all workers since the last reset
+	private Worker[] workers;
+
+	class Worker implements Runnable, ExceptionListener { // one client connection plus the thread that drives it
+		
+		private ActiveMQConnection connection;
+		private AtomicBoolean stop=new AtomicBoolean(false); // set by stop(); checked by run() between iterations
+		public AtomicInteger iterations = new AtomicInteger(); // completed send/receive passes; reset by the test between rounds
+		public CountDownLatch stopped = new CountDownLatch(1); // released when run() exits
+		private Throwable error; // most recent error; accessed only through the synchronized methods below
+		
+		public Worker() throws URISyntaxException, JMSException {
+			URI uri = new URI("failover://(mock://("+tcpUri+"))"); // mock layer sits under failover so failConnection() can find a MockTransport via narrow()
+			ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
+			connection = (ActiveMQConnection)factory.createConnection();
+			connection.setExceptionListener(this);
+			connection.addTransportListener(new TransportListener() {
+				public void onCommand(Object command) {
+				}
+				public void onException(IOException error) {
+					setError(error);
+				}
+				public void transportInterupted() {
+					interruptedCount.incrementAndGet(); // the test waits on this count after failing the connections
+				}
+				public void transportResumed() {
+				}});
+			connection.start();
+		}
+		
+	    public void failConnection() { // reports an IOException at the mock layer to simulate a transport failure
+			MockTransport mockTransport = (MockTransport)connection.getTransportChannel().narrow(MockTransport.class);
+	    	mockTransport.onException(new IOException("Simulated error"));
+	    }
+	    
+	    public void start() {
+	    	new Thread(this).start();
+	    }
+	    public void stop() { // asks run() to exit and closes the connection on either path
+	    	stop.set(true);
+	    	try {
+				if( !stopped.await(5, TimeUnit.SECONDS) ) { // run() has not exited within 5 seconds
+					connection.close(); // presumably unblocks a pending send/receive in run() - confirm
+					stopped.await();
+				} else {
+					connection.close();
+				}
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+	    }
+	    
+	    public void run() {
+	    	try {
+	    		ActiveMQQueue queue = new ActiveMQQueue("FOO"); // every worker uses the same queue name
+				Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+				MessageConsumer consumer = session.createConsumer(queue);
+				MessageProducer producer = session.createProducer(queue);
+				producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+				while( !stop.get() ) {
+					for( int i=0; i < MESSAGES_PER_ITTERATION; i++) {
+						producer.send(session.createTextMessage("TEST:"+i));
+					}
+					for( int i=0; i < MESSAGES_PER_ITTERATION; i++) {
+						consumer.receive(); // called without a timeout argument
+					}
+					iterations.incrementAndGet();
+				}
+				session.close();
+			} catch (JMSException e) {
+				setError(e);
+			} finally {
+				stopped.countDown(); // always signal stop(), whether the loop ended normally or with an error
+			}
+	    }
+
+		public void onException(JMSException error) {
+			setError(error);
+			stop(); // NOTE(review): stop() can block in stopped.await(); confirm which thread delivers this callback
+		}
+
+
+		public synchronized Throwable getError() {
+			return error;
+		}
+		public synchronized void setError(Throwable error) {
+			this.error = error; // overwrites any earlier error
+		}
+
+		public synchronized void assertNoErrors() { // fails the test if any error has been recorded for this worker
+			if( error !=null ) {
+				error.printStackTrace();
+				fail("Got Exception: "+error);
+			}
+		}
+		
+	}
+	
+    public void testReconnects() throws Exception {
+    	
+    	for( int k=1; k < 5; k++ ) { // four fail-and-reconnect rounds
+    		
+        	System.out.println("Test run: "+k);
+    		
+    		// Wait for at least one iteration to occur...
+        	for (int i=0; i < WORKER_COUNT; i++) {
+        		for( int j=0; workers[i].iterations.get() == 0 && j < 5; j++ ) { // poll up to 5 times, one second apart
+        			workers[i].assertNoErrors();
+        			System.out.println("Waiting for worker "+i+" to finish an iteration.");
+        			Thread.sleep(1000);
+        		}
+        		assertTrue("Worker "+i+" never completed an interation.", workers[i].iterations.get()!=0);
+    			workers[i].assertNoErrors();
+        	}
+    		
+        	System.out.println("Simulating transport error to cause reconnect.");
+        	
+        	// Simulate a transport failure.
+        	for (int i=0; i < WORKER_COUNT; i++) {
+        		workers[i].failConnection();
+        	}
+        	
+    		// Wait for the connections to get interrupted...
+        	while ( interruptedCount.get() < WORKER_COUNT ) { // NOTE(review): no upper bound on this wait; the loop never ends if the count stays below WORKER_COUNT
+    			System.out.println("Waiting for connections to get interrupted.. at: "+interruptedCount.get());
+    			Thread.sleep(1000);
+        	}
+
+        	// let things stabilize..
+			System.out.println("Pausing before starting next iterations...");
+        	Thread.sleep(1000);
+
+        	// Reset the counters..
+        	interruptedCount.set(0);
+        	for (int i=0; i < WORKER_COUNT; i++) {
+        		workers[i].iterations.set(0); // the next round must observe fresh iterations after the reconnect
+        	}
+
+    	}
+    	
+    }
+
+    protected void setUp() throws Exception {
+    	bs = new BrokerService();
+    	bs.setPersistent(false);
+    	bs.setUseJmx(true);
+    	TransportConnector connector = bs.addConnector("tcp://localhost:0"); // port 0 - presumably lets the OS pick a free port; the actual URI is read back below
+    	bs.start();
+    	tcpUri = connector.getConnectUri();
+    	
+    	workers = new Worker[WORKER_COUNT];
+    	for (int i=0; i < WORKER_COUNT; i++) {
+    		workers[i] = new Worker();
+    		workers[i].start();
+    	}
+    	    	
+    }
+
+    protected void tearDown() throws Exception {
+    	for (int i=0; i < WORKER_COUNT; i++) {
+    		workers[i].stop(); // workers are stopped before the broker is shut down
+    	}
+        new ServiceStopper().stop(bs);
+    }
+
+}



Mime
View raw message