activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r634505 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Date Fri, 07 Mar 2008 02:33:39 GMT
Author: chirino
Date: Thu Mar  6 18:33:32 2008
New Revision: 634505

URL: http://svn.apache.org/viewvc?rev=634505&view=rev
Log:
Added a ReentrantReadWriteLock to guard against a service call executing while a connection
is being shut down.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=634505&r1=634504&r2=634505&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Thu Mar  6 18:33:32 2008
@@ -18,7 +18,6 @@
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -32,8 +31,8 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.activemq.Service;
 import org.apache.activemq.broker.ft.MasterBroker;
 import org.apache.activemq.broker.region.ConnectionStatistics;
 import org.apache.activemq.broker.region.RegionBroker;
@@ -90,7 +89,6 @@
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.MarshallingSupport;
-import org.apache.activemq.util.ServiceSupport;
 import org.apache.activemq.util.URISupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -98,7 +96,7 @@
 /**
  * @version $Revision: 1.8 $
  */
-public class TransportConnection implements Service, Connection, Task, CommandVisitor {
+public class TransportConnection implements Connection, Task, CommandVisitor {
 
     private static final Log LOG = LogFactory.getLog(TransportConnection.class);
     private static final Log TRANSPORTLOG = LogFactory.getLog(TransportConnection.class.getName()
@@ -137,9 +135,8 @@
     private boolean starting;
     private boolean pendingStop;
     private long timeStamp;
-    private final AtomicBoolean stopped = new AtomicBoolean(false);
-    private final AtomicBoolean transportDisposed = new AtomicBoolean();
-    private CountDownLatch stopLatch = new CountDownLatch(1);
+    private final AtomicBoolean stopping = new AtomicBoolean(false);
+    private CountDownLatch stopped = new CountDownLatch(1);
     private final AtomicBoolean asyncException = new AtomicBoolean(false);
     private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId,
ProducerBrokerExchange>();
     private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId,
ConsumerBrokerExchange>();
@@ -151,7 +148,10 @@
     private DemandForwardingBridge duplexBridge;
     private final TaskRunnerFactory taskRunnerFactory;
     private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
-
+    
+    private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
+    
+    
     /**
      * @param connector
      * @param transport
@@ -175,15 +175,25 @@
         this.transport.setTransportListener(new DefaultTransportListener() {
 
             public void onCommand(Object o) {
-                Command command = (Command)o;
-                Response response = service(command);
-                if (response != null) {
-                    dispatchSync(response);
+                serviceLock.readLock().lock();
+                try {
+                    Command command = (Command)o;
+                    Response response = service(command);
+                    if (response != null) {
+                        dispatchSync(response);
+                    }
+                } finally {
+                    serviceLock.readLock().unlock();
                 }
             }
 
             public void onException(IOException exception) {
-                serviceTransportException(exception);
+                serviceLock.readLock().lock();
+                try {
+                    serviceTransportException(exception);
+                } finally {
+                    serviceLock.readLock().unlock();
+                }
             }
         });
         connected = true;
@@ -199,12 +209,12 @@
     }
 
     public void serviceTransportException(IOException e) {
-        if (!stopped.get()) {
+        if (!stopping.get()) {
             transportException.set(e);
             if (TRANSPORTLOG.isDebugEnabled()) {
                 TRANSPORTLOG.debug("Transport failed: " + e, e);
             }
-            ServiceSupport.dispose(this);
+            stopAsync();
         }
     }
 
@@ -218,7 +228,6 @@
     public void serviceExceptionAsync(final IOException e) {
         if (asyncException.compareAndSet(false, true)) {
             new Thread("Async Exception Handler") {
-
                 public void run() {
                     serviceException(e);
                 }
@@ -241,7 +250,7 @@
             // Handle the case where the broker is stopped
             // But the client is still connected.
 
-            if (!stopped.get()) {
+            if (!stopping.get()) {
                 if (SERVICELOG.isDebugEnabled()) {
                     SERVICELOG
                         .debug("Broker has been stopped.  Notifying client and closing his
connection.");
@@ -259,9 +268,9 @@
                 }
                 // Worst case is we just kill the connection before the
                 // notification gets to him.
-                ServiceSupport.dispose(this);
+                stopAsync();
             }
-        } else if (!stopped.get() && !inServiceException) {
+        } else if (!stopping.get() && !inServiceException) {
             inServiceException = true;
             try {
                 SERVICELOG.error("Async error occurred: " + e, e);
@@ -324,15 +333,7 @@
     }
 
     public Response processShutdown(ShutdownInfo info) throws Exception {
-        new Thread("Async Exception Handler") {
-            public void run() {
-                try {
-                    TransportConnection.this.stop();
-                } catch (Exception e) {
-                    serviceException(e);
-                }
-            }
-        }.start();
+        stopAsync();
         return null;
     }
 
@@ -735,7 +736,7 @@
     }
 
     public void dispatchAsync(Command message) {
-        if (!stopped.get()) {
+        if (!stopping.get()) {
             //getStatistics().getEnqueues().increment();
             if (taskRunner == null) {
                 dispatchSync(message);
@@ -763,7 +764,7 @@
         final MessageDispatch messageDispatch = (MessageDispatch)(command.isMessageDispatch()
             ? command : null);
         try {
-            if (!stopped.get()) {
+            if (!stopping.get()) {
                 if (messageDispatch != null) {
                     broker.preProcessDispatch(messageDispatch);
                 }
@@ -783,7 +784,7 @@
 
     public boolean iterate() {
         try {
-            if (stopped.get()) {
+            if (stopping.get()) {
                 if (dispatchStopped.compareAndSet(false, true)) {
                     if (transportException.get() == null) {
                         try {
@@ -865,8 +866,14 @@
             }
         }
     }
-
     public void stop() throws Exception {
+        stopAsync();
+        if( !stopped.await(10, TimeUnit.SECONDS) ) {
+            LOG.info("Could not shutdown the connection to '" + transport.getRemoteAddress()+
"' in a timely manner.");
+        }
+    }
+    
+    public void stopAsync() {
         // If we're in the middle of starting
         // then go no further... for now.
         synchronized (this) {
@@ -876,14 +883,31 @@
                 return;
             }
         }
-        if (stopped.compareAndSet(false, true)) {
-            doStop();
-            stopLatch.countDown();
-        } else {
-            stopLatch.await(1, TimeUnit.SECONDS);
+        if (stopping.compareAndSet(false, true)) {
+            new Thread("ActiveMQ Transport Stopper: "+ transport.getRemoteAddress()) {
+                @Override
+                public void run() {
+                    // make sure we are not servicing client requests while we are shutting
down.
+                    serviceLock.writeLock().lock();
+                    try {
+                        doStop();
+                    } catch (Throwable e) {
+                        LOG.info("Error occured while shutting down a connection to '" +
transport.getRemoteAddress()+ "': "+e);
+                        LOG.debug("Error occured while shutting down a connection to '" +
transport.getRemoteAddress()+ "': ", e);
+                    } finally {
+                        stopped.countDown();
+                        serviceLock.writeLock().unlock();
+                    }
+                }
+            }.start();
         }
     }
 
+    @Override
+    public String toString() {
+        return  "Transport Connection to: "+transport.getRemoteAddress();
+    }
+    
     protected void doStop() throws Exception, InterruptedException {
         LOG.debug("Stopping connection: " + transport.getRemoteAddress());
         connector.onStopped(this);



Mime
View raw message