activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r511089 - in /activemq/branches/activemq-4.1: ./ activemq-core/src/main/java/org/apache/activemq/broker/ activemq-core/src/main/java/org/apache/activemq/broker/region/ activemq-core/src/main/java/org/apache/activemq/memory/
Date Fri, 23 Feb 2007 20:25:31 GMT
Author: chirino
Date: Fri Feb 23 12:25:30 2007
New Revision: 511089

URL: http://svn.apache.org/viewvc?view=rev&rev=511089
Log:
 r245@34:  chirino | 2007-02-23 14:49:44 -0500
 We can now shut down a connection that is blocked on send due to queue limits.
 Stats tracking code added recently was causing an NPE at times.  Added a guard against the NPE.
 
 

Modified:
    activemq/branches/activemq-4.1/   (props changed)
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java

Propchange: activemq/branches/activemq-4.1/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Fri Feb 23 12:25:30 2007
@@ -1 +1 @@
-635f1f41-eb29-0410-ac9d-be9e2c357fdd:/local/amq-4.1-port:244
+635f1f41-eb29-0410-ac9d-be9e2c357fdd:/local/amq-4.1-port:245

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=511089&r1=511088&r2=511089
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java (original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java Fri Feb 23 12:25:30 2007
@@ -18,6 +18,7 @@
 package org.apache.activemq.broker;
 
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.broker.region.MessageReference;
@@ -56,6 +57,7 @@
     private AtomicInteger referenceCounter = new AtomicInteger();
     private boolean dontSendReponse;
     private boolean networkConnection;
+    private final AtomicBoolean stopping = new AtomicBoolean();
     
     private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
     
@@ -262,6 +264,10 @@
 
 	public synchronized void setNetworkConnection(boolean networkConnection) {
 		this.networkConnection = networkConnection;
+	}
+
+	public AtomicBoolean getStopping() {
+		return stopping;
 	}
 
 }

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=511089&r1=511088&r2=511089
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Fri Feb 23 12:25:30 2007
@@ -920,6 +920,16 @@
 	
 	        if(disposed.compareAndSet(false, true)) {
 		        
+
+               // Let all the connection contexts know we are shutting down
+               // so that in progress operations can notice and unblock.
+                ArrayList l=new ArrayList(localConnectionStates.values());
+                for(Iterator iter=l.iterator();iter.hasNext();){
+                    ConnectionState cs=(ConnectionState) iter.next();
+                    cs.getContext().getStopping().set(true);
+                }
+
+
                 taskRunner.wakeup();
                 dispatchStoppedLatch.await();
 
@@ -943,7 +953,7 @@
 		        // Remove all logical connection associated with this connection
 		        // from the broker.
 		        if(!broker.isStopped()){
-		            ArrayList l=new ArrayList(localConnectionStates.keySet());
+		            l=new ArrayList(localConnectionStates.keySet());
 		            for(Iterator iter=l.iterator();iter.hasNext();){
 		                ConnectionId connectionId=(ConnectionId) iter.next();
 		                try{

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=511089&r1=511088&r2=511089
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Feb 23 12:25:30 2007
@@ -315,7 +315,10 @@
                 if (usageManager.isSendFailIfNoSpace() ) {
                     throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
                 } else {
-                    usageManager.waitForSpace();
+                    while( !usageManager.waitForSpace(1000) ) {
+                        if( context.getStopping().get() )
+                            throw new IOException("Connection closed, send aborted.");
+                    }
                 }
             }        	
         }

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=511089&r1=511088&r2=511089
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Feb 23 12:25:30 2007
@@ -236,6 +236,10 @@
             if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
                 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
             } else {
+                while( !usageManager.waitForSpace(1000) ) {
+                    if( context.getStopping().get() )
+                        throw new IOException("Connection closed, send aborted.");
+                }
                 usageManager.waitForSpace();
             }    
         }

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=511089&r1=511088&r2=511089
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Fri Feb 23 12:25:30 2007
@@ -162,7 +162,7 @@
                 context.getTransaction().addSynchronization(new Synchronization(){
                     public void afterCommit() throws Exception{
                     	synchronized( TopicSubscription.this ) {
-	                    	if( singleDestination ) {
+	                    	if( singleDestination && destination!=null) {
 	                    		destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
 	                    	}
                     	}                    
@@ -172,7 +172,7 @@
                 });
             }else{
             	
-            	if( singleDestination ) {
+            	if( singleDestination && destination!=null ) {
             		destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
             	}
             	            

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java?view=diff&rev=511089&r1=511088&r2=511089
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java (original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java Fri Feb 23 12:25:30 2007
@@ -95,6 +95,19 @@
         }
     }
 
+    public boolean waitForSpace(long timeout) throws InterruptedException {
+        if(parent!=null) {
+            if( !parent.waitForSpace(timeout) )
+            	return false;
+        }
+        synchronized (usageMutex) {
+            if( percentUsage >= 100 ) {
+                usageMutex.wait(timeout);
+            }
+            return percentUsage < 100;
+        }
+    }
+
     /**
      * @param callback
      * @return true if the UsageManager was full.  The callback will only be called if this method returns true.



Mime
View raw message