activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r418600 - in /incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq: broker/AbstractConnection.java state/ConnectionState.java state/SessionState.java
Date Sun, 02 Jul 2006 14:31:48 GMT
Author: chirino
Date: Sun Jul  2 07:31:48 2006
New Revision: 418600

URL: http://svn.apache.org/viewvc?rev=418600&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQ-724

Modified:
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?rev=418600&r1=418599&r2=418600&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
(original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
Sun Jul  2 07:31:48 2006
@@ -421,7 +421,11 @@
         if( ss == null )
             throw new IllegalStateException("Cannot add a producer to a session that had
not been registered: "+sessionId);
         broker.addProducer(cs.getContext(), info);
-        ss.addProducer(info);
+        try {
+            ss.addProducer(info);
+		} catch (IllegalStateException e) {
+			broker.removeProducer(cs.getContext(), info);
+		}
         return null;
     }
     
@@ -451,7 +455,12 @@
             throw new IllegalStateException("Cannot add a consumer to a session that had
not been registered: "+sessionId);
 
         broker.addConsumer(cs.getContext(), info);
-        ss.addConsumer(info);
+        try {
+			ss.addConsumer(info);
+		} catch (IllegalStateException e) {
+			broker.removeConsumer(cs.getContext(), info);
+		}
+        
         return null;
     }
     
@@ -476,8 +485,12 @@
         ConnectionId connectionId = info.getSessionId().getParentId();
         
         ConnectionState cs = lookupConnectionState(connectionId);
-        broker.addSession(cs.getContext(), info);
-        cs.addSession(info);
+    	broker.addSession(cs.getContext(), info);
+        try {
+            cs.addSession(info);
+		} catch (IllegalStateException e) {
+			broker.removeSession(cs.getContext(), info);
+		}
         return null;
     }
     
@@ -487,6 +500,10 @@
         
         ConnectionState cs = lookupConnectionState(connectionId);
         SessionState session = cs.getSessionState(id);
+        
+        // Don't let new consumers or producers get added while we are closing this down.
+        session.shutdown();
+        
         if( session == null )
             throw new IllegalStateException("Cannot remove session that had not been registered:
"+id);
         
@@ -543,6 +560,9 @@
     public Response processRemoveConnection(ConnectionId id)  {
         
         ConnectionState cs = lookupConnectionState(id);
+        
+        // Don't allow things to be added to the connection state while we are shutting down.
+        cs.shutdown();
         
         // Cascade the connection stop to the sessions.
         for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java?rev=418600&r1=418599&r2=418600&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
(original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
Sun Jul  2 07:31:48 2006
@@ -23,6 +23,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
@@ -37,6 +38,7 @@
     final ConnectionInfo info;
     private final ConcurrentHashMap sessions = new ConcurrentHashMap();
     private final List tempDestinations = Collections.synchronizedList(new ArrayList());
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
     
     public ConnectionState(ConnectionInfo info) {
         this.info = info;
@@ -49,10 +51,11 @@
     }
 
     public void addTempDestination(DestinationInfo info) {
+    	checkShutdown();
         tempDestinations.add(info);
     }
 
-    public void removeTempDestination(ActiveMQDestination destination) {
+	public void removeTempDestination(ActiveMQDestination destination) {
         for (Iterator iter = tempDestinations.iterator(); iter.hasNext();) {
             DestinationInfo di = (DestinationInfo) iter.next();
             if( di.getDestination().equals(destination) ) {
@@ -62,6 +65,7 @@
     }
 
     public void addSession(SessionInfo info) {
+    	checkShutdown();
         sessions.put(info.getSessionId(), new SessionState(info));            
     }        
     public SessionState removeSession(SessionId id) {
@@ -85,5 +89,19 @@
 
     public Collection getSessionStates() {
         return sessions.values();
-    }        
+    }
+    
+    private void checkShutdown() { // guard invoked by addTempDestination() and addSession() before they mutate state
+		if( shutdown.get() )
+			throw new IllegalStateException("Disposed"); // the shutdown flag has already been set
+	}
+    
+    public void shutdown() { // flips the shutdown flag and forwards the call to every registered session
+    	if( shutdown.compareAndSet(false, true) ) { // only the call that flips false -> true runs the cascade
+    		for (Iterator iter = sessions.values().iterator(); iter.hasNext();) {
+				SessionState ss = (SessionState) iter.next();
+				ss.shutdown(); // forward the shutdown to this session's own state object
+			}
+    	}
+    }
 }

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java?rev=418600&r1=418599&r2=418600&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
(original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
Sun Jul  2 07:31:48 2006
@@ -19,6 +19,7 @@
 
 import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
@@ -33,6 +34,7 @@
     
     public final ConcurrentHashMap producers = new ConcurrentHashMap();
     public final ConcurrentHashMap consumers = new ConcurrentHashMap();
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
     
     public SessionState(SessionInfo info) {
         this.info = info;
@@ -42,6 +44,7 @@
     }
     
     public void addProducer(ProducerInfo info) {
+    	checkShutdown();
         producers.put(info.getProducerId(), new ProducerState(info));            
     }        
     public ProducerState removeProducer(ProducerId id) {
@@ -49,6 +52,7 @@
     }
     
     public void addConsumer(ConsumerInfo info) {
+    	checkShutdown();
         consumers.put(info.getConsumerId(), new ConsumerState(info));            
     }        
     public ConsumerState removeConsumer(ConsumerId id) {
@@ -72,5 +76,15 @@
     
     public Collection getConsumerStates() {
         return consumers.values();
-    }        
+    }
+    
+    private void checkShutdown() { // guard invoked by addProducer() and addConsumer() before they mutate state
+		if( shutdown.get() )
+			throw new IllegalStateException("Disposed"); // the shutdown flag has already been set
+	}
+    
+    public void shutdown() { // marks this session as shut down; takes no arguments and returns nothing
+    	shutdown.set(true); // must be true: checkShutdown() throws IllegalStateException only once the flag is set
+    }
+
 }



Mime
View raw message