activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r394729 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Date Mon, 17 Apr 2006 17:05:00 GMT
Author: chirino
Date: Mon Apr 17 10:04:59 2006
New Revision: 394729

URL: http://svn.apache.org/viewcvs?rev=394729&view=rev
Log:
An async error could cause a deadlock when using the VM transport since all it's operations
are sync.  The error handling is now done in an async thread to avoid the deadlock.


Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=394729&r1=394728&r2=394729&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Mon Apr 17 10:04:59 2006
@@ -88,12 +88,19 @@
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
 import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingDeque;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 
 public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection,
StatsCapable, Closeable,  StreamConnection, TransportListener {
 
     public static final TaskRunnerFactory SESSION_TASK_RUNNER = new TaskRunnerFactory("session
Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
+    private final Executor asyncConnectionThread;
 
     private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
     private static final IdGenerator connectionIdGenerator = new IdGenerator();
@@ -165,6 +172,14 @@
      */
     protected ActiveMQConnection(Transport transport, JMSStatsImpl factoryStats)
             throws Exception {
+       
+	// Configure a single threaded executor who's core thread can timeout if idle
+        asyncConnectionThread = new ThreadPoolExecutor(1,1,5,TimeUnit.SECONDS, new LinkedBlockingQueue(),
new ThreadFactory() {
+            public Thread newThread(Runnable r) {
+                return new Thread(r, "Connection task");
+            }});
+        asyncConnectionThread.allowCoreThreadTimeOut(true);
+        
         this.info = new ConnectionInfo(new ConnectionId(connectionIdGenerator.generateId()));
         this.info.setManageable(true);
         this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
@@ -1388,7 +1403,7 @@
     /**
      * @param command - the command to consume
      */
-    public void onCommand(Command command) {
+    public void onCommand(final Command command) {
         if (!closed.get() && command != null) {
             if (command.isMessageDispatch()) {
                 MessageDispatch md = (MessageDispatch) command;
@@ -1416,7 +1431,13 @@
                 onControlCommand((ControlCommand) command);
             }
             else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE)
{
-                onAsyncException(((ConnectionError)command).getException());
+                asyncConnectionThread.execute(new Runnable(){
+                    public void run() {
+                        onAsyncException(((ConnectionError)command).getException());
+                    }
+                });
+                new Thread("Async error worker") {
+                }.start();
             }else if (command instanceof ConnectionControl){
                 onConnectionControl((ConnectionControl) command);
             }else if (command instanceof ConsumerControl){
@@ -1437,25 +1458,37 @@
     public void onAsyncException(Throwable error) {
         if (!closed.get() && !closing.get()) {
             if (this.exceptionListener != null) {
+                
                 if (!(error instanceof JMSException))
                     error = JMSExceptionSupport.create(error);
-                this.exceptionListener.onException((JMSException) error);
+                final JMSException e = (JMSException) error;
+                
+                asyncConnectionThread.execute(new Runnable(){
+                    public void run() {
+                        ActiveMQConnection.this.exceptionListener.onException(e);
+                    }
+                });
+                
             } else {
                 log.warn("Async exception with no exception listener: " + error, error);
             }
         }
     }
     
-    public void onException(IOException error) {
+    public void onException(final IOException error) {
         onAsyncException(error);
-        transportFailed(error);
-        ServiceSupport.dispose(this.transport);
-        brokerInfoReceived.countDown();
-
-        for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
-            TransportListener listener = (TransportListener) iter.next();
-            listener.onException(error);
-        }
+        asyncConnectionThread.execute(new Runnable(){
+            public void run() {
+                transportFailed(error);
+                ServiceSupport.dispose(ActiveMQConnection.this.transport);
+                brokerInfoReceived.countDown();
+        
+                for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
+                    TransportListener listener = (TransportListener) iter.next();
+                    listener.onException(error);
+                }
+            }
+        });
     }
     
     public void transportInterupted() {
@@ -1781,4 +1814,4 @@
 
 
     
-}
\ No newline at end of file
+}



Mime
View raw message