Author: chirino
Date: Fri Feb 23 12:25:14 2007
New Revision: 511088
URL: http://svn.apache.org/viewvc?view=rev&rev=511088
Log:
r244@34: chirino | 2007-02-23 14:49:32 -0500
Fix for: Memory limits for topics were not returning to normal after their consumers were disconnected
Modified:
activemq/branches/activemq-4.1/ (props changed)
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Propchange: activemq/branches/activemq-4.1/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Fri Feb 23 12:25:14 2007
@@ -1 +1 @@
-635f1f41-eb29-0410-ac9d-be9e2c357fdd:/local/amq-4.1-port:243
+635f1f41-eb29-0410-ac9d-be9e2c357fdd:/local/amq-4.1-port:244
Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=511088&r1=511087&r2=511088
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Fri Feb 23 12:25:14 2007
@@ -77,6 +77,7 @@
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicReference;
/**
* @version $Revision: 1.8 $
@@ -105,7 +106,7 @@
// Used to do async dispatch.. this should perhaps be pushed down into the transport layer..
protected final List dispatchQueue = Collections.synchronizedList(new LinkedList());
protected final TaskRunner taskRunner;
- protected IOException transportException;
+ protected final AtomicReference transportException = new AtomicReference();
private boolean inServiceException=false;
private ConnectionStatistics statistics = new ConnectionStatistics();
@@ -126,7 +127,8 @@
protected final AtomicBoolean asyncException = new AtomicBoolean(false);
private ConnectionContext context;
private boolean networkConnection;
- private CountDownLatch dispatchStopped = new CountDownLatch(1);
+ private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
+ protected AtomicBoolean dispatchStopped=new AtomicBoolean(false);
static class ConnectionState extends org.apache.activemq.state.ConnectionState {
private final ConnectionContext context;
@@ -180,7 +182,7 @@
Command command = (Command) o;
Response response = service(command);
if (response != null) {
- dispatch(response);
+ dispatchSync(response);
}
}
@@ -206,7 +208,7 @@
public void serviceTransportException(IOException e) {
if( !disposed.get() ) {
- transportException = e;
+ transportException.set(e);
if( transportLog.isDebugEnabled() )
transportLog.debug("Transport failed: "+e,e);
ServiceSupport.dispose(this);
@@ -766,26 +768,46 @@
public void dispatchSync(Command message) {
getStatistics().getEnqueues().increment();
- processDispatch(message);
+ try {
+ processDispatch(message);
+ } catch (IOException e) {
+ serviceExceptionAsync(e);
+ }
}
public void dispatchAsync(Command message) {
- getStatistics().getEnqueues().increment();
- if( taskRunner==null ) {
- dispatchSync( message );
- } else {
- dispatchQueue.add(message);
- try {
- taskRunner.wakeup();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ if( !disposed.get() ) {
+ getStatistics().getEnqueues().increment();
+ if( taskRunner==null ) {
+ dispatchSync( message );
+ } else {
+ dispatchQueue.add(message);
+ try {
+ taskRunner.wakeup();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
- }
+ } else {
+ if(message.isMessageDispatch()) {
+ MessageDispatch md=(MessageDispatch) message;
+ Runnable sub=(Runnable) md.getConsumer();
+ broker.processDispatch(md);
+ if(sub!=null){
+ sub.run();
+ }
+ }
+ }
}
- protected void processDispatch(Command command){
+ protected void processDispatch(Command command) throws IOException {
try {
+ if( !disposed.get() ) {
+ dispatch(command);
+ }
+ } finally {
+
if(command.isMessageDispatch()){
MessageDispatch md=(MessageDispatch) command;
Runnable sub=(Runnable) md.getConsumer();
@@ -793,25 +815,43 @@
if(sub!=null){
sub.run();
}
- dispatch(command);
- } else if( command.isShutdownInfo() ) {
- dispatch(command);
- dispatchStopped.countDown();
- } else {
- dispatch(command);
}
- } finally {
+
getStatistics().getDequeues().increment();
}
}
public boolean iterate() {
- if( dispatchQueue.isEmpty() || broker.isStopped()) {
- return false;
- } else {
- Command command = (Command) dispatchQueue.remove(0);
- processDispatch( command );
- return true;
+ try {
+ if( disposed.get() ) {
+ if( dispatchStopped.compareAndSet(false, true)) {
+ if( transportException.get()==null ) {
+ dispatch(new ShutdownInfo());
+ }
+ dispatchStoppedLatch.countDown();
+ }
+ return false;
+ }
+
+ if( !dispatchStopped.get() ) {
+
+ if( dispatchQueue.isEmpty() ) {
+ return false;
+ } else {
+ Command command = (Command) dispatchQueue.remove(0);
+ processDispatch( command );
+ return true;
+ }
+ } else {
+ return false;
+ }
+
+ } catch (IOException e) {
+ if( dispatchStopped.compareAndSet(false, true)) {
+ dispatchStoppedLatch.countDown();
+ }
+ serviceExceptionAsync(e);
+ return false;
}
}
@@ -880,22 +920,25 @@
if(disposed.compareAndSet(false, true)) {
- // Clear out what's on the queue so that we can send the Shutdown command quicker.
- dispatchQueue.clear();
- if( transportException==null ) {
- // Wait up to 10 seconds for the shutdown command to be sent to
- // the client.
- dispatchAsync(new ShutdownInfo());
- dispatchStopped.await(10, TimeUnit.SECONDS);
- }
+ taskRunner.wakeup();
+ dispatchStoppedLatch.await();
if( taskRunner!=null )
- taskRunner.shutdownNoWait();
-
- // Clear out the dispatch queue to release any memory that
- // is being held on to.
- dispatchQueue.clear();
+ taskRunner.shutdown();
+ // Run the MessageDispatch callbacks so that message references get cleaned up.
+ for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) {
+ Command command = (Command) iter.next();
+ if(command.isMessageDispatch()) {
+ MessageDispatch md=(MessageDispatch) command;
+ Runnable sub=(Runnable) md.getConsumer();
+ broker.processDispatch(md);
+ if(sub!=null){
+ sub.run();
+ }
+ }
+ }
+
//
// Remove all logical connection associated with this connection
// from the broker.
@@ -1077,12 +1120,10 @@
return null;
}
- protected void dispatch(Command command) {
+ protected void dispatch(Command command) throws IOException {
try {
setMarkedCandidate(true);
transport.oneway(command);
- } catch(IOException e){
- serviceExceptionAsync(e);
} finally{
setMarkedCandidate(false);
}
|