activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r637879 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Date Mon, 17 Mar 2008 13:30:54 GMT
Author: rajdavies
Date: Mon Mar 17 06:30:38 2008
New Revision: 637879

URL: http://svn.apache.org/viewvc?rev=637879&view=rev
Log:
send shutdown to transports asynchronously - as they may be blocked

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=637879&r1=637878&r2=637879&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Mon Mar 17 06:30:38 2008
@@ -21,6 +21,10 @@
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -75,7 +79,7 @@
 public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
     
     private static final Log LOG = LogFactory.getLog(DemandForwardingBridge.class);
-    
+    private static final ThreadPoolExecutor STOP_TASKS;
     protected final Transport localBroker;
     protected final Transport remoteBroker;
     protected final IdGenerator idGenerator = new IdGenerator();
@@ -113,6 +117,7 @@
     private boolean createdByDuplex;
     private BrokerInfo localBrokerInfo;
     private BrokerInfo remoteBrokerInfo;
+    
 
     private AtomicBoolean started = new AtomicBoolean();
 
@@ -331,10 +336,23 @@
                 try {
                     disposed = true;
                     remoteBridgeStarted.set(false);
-                    localBroker.oneway(new ShutdownInfo());
-                    remoteBroker.oneway(new ShutdownInfo());
-                } catch (IOException e) {
-                    LOG.debug("Caught exception stopping", e);
+                    final CountDownLatch sendShutdown = new CountDownLatch(1);
+                    STOP_TASKS.execute(new Runnable() {
+                        public void run() {
+                            try {
+                                localBroker.oneway(new ShutdownInfo());
+                                remoteBroker.oneway(new ShutdownInfo());
+                            } catch (Throwable e) {
+                                LOG.debug("Caught exception sending shutdown", e);
+                            }finally {
+                                sendShutdown.countDown();
+                            }
+                            
+                        }
+                    });
+                    if( !sendShutdown.await(100, TimeUnit.MILLISECONDS) ) {
+                        LOG.debug("Network Could not shutdown in a timely manner");
+                    }
                 } finally {
                     ServiceStopper ss = new ServiceStopper();
                     ss.stop(localBroker);
@@ -636,7 +654,7 @@
                     }
                 }
             } catch (Throwable e) {
-            	e.printStackTrace();
+                LOG.warn("Caught an exception processing local command",e);
                 serviceLocalException(e);
             }
         }
@@ -949,6 +967,16 @@
     
     protected boolean isDuplex() {
         return configuration.isDuplex() || createdByDuplex;
+    }
+    
+    static {
+        STOP_TASKS =   new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable, "NetworkBridge: "+runnable);
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
     }
 
 }



Mime
View raw message