activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r511088 - in /activemq/branches/activemq-4.1: ./ activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Date Fri, 23 Feb 2007 20:25:14 GMT
Author: chirino
Date: Fri Feb 23 12:25:14 2007
New Revision: 511088

URL: http://svn.apache.org/viewvc?view=rev&rev=511088
Log:
 r244@34:  chirino | 2007-02-23 14:49:32 -0500
 Fix for Memory limits for topics was not returning to normal after it's consumers are disconnected
 
 

Modified:
    activemq/branches/activemq-4.1/   (props changed)
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java

Propchange: activemq/branches/activemq-4.1/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Fri Feb 23 12:25:14 2007
@@ -1 +1 @@
-635f1f41-eb29-0410-ac9d-be9e2c357fdd:/local/amq-4.1-port:243
+635f1f41-eb29-0410-ac9d-be9e2c357fdd:/local/amq-4.1-port:244

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=511088&r1=511087&r2=511088
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Fri Feb 23 12:25:14 2007
@@ -77,6 +77,7 @@
 import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicReference;
 
 /**
  * @version $Revision: 1.8 $
@@ -105,7 +106,7 @@
     // Used to do async dispatch..  this should perhaps be pushed down into the transport
layer..
     protected final List dispatchQueue = Collections.synchronizedList(new LinkedList());
     protected final TaskRunner taskRunner;
-    protected IOException transportException;        
+    protected final AtomicReference transportException = new AtomicReference();
     private boolean inServiceException=false;
 
     private ConnectionStatistics statistics = new ConnectionStatistics();
@@ -126,7 +127,8 @@
     protected final AtomicBoolean asyncException = new AtomicBoolean(false);
     private ConnectionContext context;
     private boolean networkConnection;
-    private CountDownLatch dispatchStopped = new CountDownLatch(1);
+    private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
+    protected AtomicBoolean dispatchStopped=new AtomicBoolean(false);
     
     static class ConnectionState extends org.apache.activemq.state.ConnectionState {
         private final ConnectionContext context;
@@ -180,7 +182,7 @@
             	Command command = (Command) o;
                 Response response = service(command);
                 if (response != null) {
-                    dispatch(response);
+                    dispatchSync(response);
                 }
             }
 
@@ -206,7 +208,7 @@
 
     public void serviceTransportException(IOException e) {
         if( !disposed.get() ) {
-            transportException = e; 
+            transportException.set(e); 
             if( transportLog.isDebugEnabled() )
                 transportLog.debug("Transport failed: "+e,e);
             ServiceSupport.dispose(this);
@@ -766,26 +768,46 @@
 
     public void dispatchSync(Command message) {
         getStatistics().getEnqueues().increment();
-        processDispatch(message);
+        try {
+            processDispatch(message);
+        } catch (IOException e) {
+            serviceExceptionAsync(e);
+        }
     }
     
     
     public void dispatchAsync(Command message) {
-        getStatistics().getEnqueues().increment();
-        if( taskRunner==null ) {
-            dispatchSync( message );
-        } else {
-            dispatchQueue.add(message);
-            try {
-                taskRunner.wakeup();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
+        if( !disposed.get() ) {
+            getStatistics().getEnqueues().increment();
+            if( taskRunner==null ) {
+                dispatchSync( message );
+            } else {
+                dispatchQueue.add(message);
+                try {
+                    taskRunner.wakeup();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
             }
-        }        
+        } else {
+            if(message.isMessageDispatch()) {
+                MessageDispatch md=(MessageDispatch) message;
+                Runnable sub=(Runnable) md.getConsumer();
+                broker.processDispatch(md);
+                if(sub!=null){
+                    sub.run();
+                }
+             }
+        }
     }
     
-    protected void processDispatch(Command command){
+    protected void processDispatch(Command command) throws IOException {
         try {
+            if( !disposed.get() ) {
+                 dispatch(command);
+            }
+       } finally {
+
             if(command.isMessageDispatch()){
                 MessageDispatch md=(MessageDispatch) command;
                 Runnable sub=(Runnable) md.getConsumer();
@@ -793,25 +815,43 @@
                 if(sub!=null){
                     sub.run();
                 }
-                dispatch(command);
-            } else if( command.isShutdownInfo() ) {
-                dispatch(command);
-                dispatchStopped.countDown();
-            } else {
-                dispatch(command);
             }
-        } finally {
+
             getStatistics().getDequeues().increment();
         }
     }       
     
     public boolean iterate() {
-        if( dispatchQueue.isEmpty() || broker.isStopped()) {
-            return false;
-        } else {
-            Command command = (Command) dispatchQueue.remove(0);
-            processDispatch( command );
-            return true;
+        try {
+           if( disposed.get() ) {
+                if( dispatchStopped.compareAndSet(false, true)) {                       
                                     
+                    if( transportException.get()==null ) {
+                        dispatch(new ShutdownInfo());
+                    }
+                    dispatchStoppedLatch.countDown();
+                }
+                return false;                           
+            } 
+
+            if( !dispatchStopped.get() )  {
+
+                if( dispatchQueue.isEmpty() ) {
+                    return false;
+                } else {
+                    Command command = (Command) dispatchQueue.remove(0);
+                    processDispatch( command );
+                    return true;
+                }
+            } else {
+                return false;
+            }
+
+        } catch (IOException e) {
+            if( dispatchStopped.compareAndSet(false, true)) {                           
                                         
+                dispatchStoppedLatch.countDown();
+            }
+            serviceExceptionAsync(e);
+            return false;                           
         }
     }    
             
@@ -880,22 +920,25 @@
 	
 	        if(disposed.compareAndSet(false, true)) {
 		        
-               // Clear out what's on the queue so that we can send the Shutdown command
quicker.
-               dispatchQueue.clear();
-               if( transportException==null ) {
-                    // Wait up to 10 seconds for the shutdown command to be sent to 
-                    // the client.
-                    dispatchAsync(new ShutdownInfo());
-                    dispatchStopped.await(10, TimeUnit.SECONDS);
-               }
+                taskRunner.wakeup();
+                dispatchStoppedLatch.await();
 
 		        if( taskRunner!=null )
-		            taskRunner.shutdownNoWait();
-		        
-		        // Clear out the dispatch queue to release any memory that
-		        // is being held on to.
-		        dispatchQueue.clear();
+		            taskRunner.shutdown();
 		        
+                // Run the MessageDispatch callbacks so that message references get cleaned
up.
+                for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) {
+                    Command command = (Command) iter.next();
+                    if(command.isMessageDispatch()) {
+                        MessageDispatch md=(MessageDispatch) command;
+                        Runnable sub=(Runnable) md.getConsumer();
+                        broker.processDispatch(md);
+                        if(sub!=null){
+                            sub.run();
+                        }
+                    }
+                } 
+
 		        //
 		        // Remove all logical connection associated with this connection
 		        // from the broker.
@@ -1077,12 +1120,10 @@
         return null;
     }
 
-    protected void dispatch(Command command) {
+    protected void dispatch(Command command) throws IOException {
         try {
             setMarkedCandidate(true);
             transport.oneway(command);
-        } catch(IOException e){
-            serviceExceptionAsync(e);
         } finally{
             setMarkedCandidate(false);
         }



Mime
View raw message