qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject qpid-jms git commit: QPIDJMS-237 Ensure that executor threads shutdown quickly
Date Tue, 13 Dec 2016 23:35:07 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/master affc435d0 -> a26b7791b


QPIDJMS-237 Ensure that executor threads shutdown quickly

Ensure that the scheduled executors shutdown in a timely manner when the
connections are closed.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/a26b7791
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/a26b7791
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/a26b7791

Branch: refs/heads/master
Commit: a26b7791b85c277dcd1f6d63679e96db5e03e4d3
Parents: affc435
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue Dec 13 18:34:57 2016 -0500
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Dec 13 18:34:57 2016 -0500

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpProvider.java    | 33 +++++++++++++++-----
 .../jms/provider/failover/FailoverProvider.java | 17 ++++++----
 .../qpid/jms/provider/mock/MockProvider.java    | 13 +++++---
 3 files changed, 45 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a26b7791/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 10f98cd..b59859b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -19,10 +19,10 @@ package org.apache.qpid.jms.provider.amqp;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -101,12 +101,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     private static final AtomicInteger PROVIDER_SEQUENCE = new AtomicInteger();
     private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult();
 
-    private ProviderListener listener;
+    private volatile ProviderListener listener;
     private AmqpConnection connection;
-    private volatile AmqpSaslAuthenticator authenticator;
-    private org.apache.qpid.jms.transports.Transport transport;
+    private AmqpSaslAuthenticator authenticator;
+    private volatile org.apache.qpid.jms.transports.Transport transport;
     private String transportType = AmqpProviderFactory.DEFAULT_TRANSPORT_TYPE;
     private String vhost;
+    private String threadToken;
     private boolean traceFrames;
     private boolean traceBytes;
     private boolean saslLayer = true;
@@ -120,7 +121,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     private final URI remoteURI;
     private final AtomicBoolean closed = new AtomicBoolean();
-    private final ScheduledExecutorService serializer;
+    private ScheduledThreadPoolExecutor serializer;
     private final Transport protonTransport = Transport.Factory.create();
     private final Collector protonCollector = new CollectorImpl();
     private final Connection protonConnection = Connection.Factory.create();
@@ -136,7 +137,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
      */
     public AmqpProvider(URI remoteURI) {
         this.remoteURI = remoteURI;
-        this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+
+        serializer = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
 
             @Override
             public Thread newThread(Runnable runner) {
@@ -149,6 +151,9 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
             }
         });
 
+        serializer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        serializer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+
         updateTracer();
     }
 
@@ -290,7 +295,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                         }
                     }
                 } finally {
-                    ThreadPoolUtils.shutdown(serializer);
+                    ThreadPoolUtils.shutdownGraceful(serializer);
                 }
             }
         }
@@ -1218,6 +1223,20 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     }
 
     /**
+     * @return the threadToken
+     */
+    public String getThreadToken() {
+        return threadToken;
+    }
+
+    /**
+     * @param threadToken the threadToken to set
+     */
+    public void setThreadToken(String threadToken) {
+        this.threadToken = threadToken;
+    }
+
+    /**
      * Allows a resource to request that its parent resource schedule a future
      * cancellation of a request and return it a {@link Future} instance that
      * can be used to cancel the scheduled automatic failure of the request.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a26b7791/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index 7a76c7e..ec3b1fb 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -23,9 +23,8 @@ import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -89,8 +88,8 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     private final FailoverUriPool uris;
     private ScheduledFuture<?> requestTimeoutTask;
 
-    private final ScheduledExecutorService serializer;
-    private final ScheduledExecutorService connectionHub;
+    private final ScheduledThreadPoolExecutor serializer;
+    private final ScheduledThreadPoolExecutor connectionHub;
     private final AtomicBoolean closed = new AtomicBoolean();
     private final AtomicBoolean failed = new AtomicBoolean();
     private final AtomicBoolean closingConnection = new AtomicBoolean(false);
@@ -134,7 +133,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     public FailoverProvider(List<URI> uris, Map<String, String> nestedOptions) {
         this.uris = new FailoverUriPool(uris, nestedOptions);
 
-        this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+        serializer = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
 
             @Override
             public Thread newThread(Runnable runner) {
@@ -145,10 +144,13 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             }
         });
 
+        serializer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        serializer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+
         // All Connection attempts happen in this schedulers thread.  Once a connection
         // is established it will hand the open connection back to the serializer thread
         // for state recovery.
-        this.connectionHub = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+        connectionHub = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
 
             @Override
             public Thread newThread(Runnable runner) {
@@ -158,6 +160,9 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
                 return serial;
             }
         });
+
+        connectionHub.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        connectionHub.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a26b7791/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
index e822239..95b0699 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
@@ -19,8 +19,7 @@ package org.apache.qpid.jms.provider.mock;
 import java.io.IOException;
 import java.net.URI;
 import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -44,6 +43,7 @@ import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.provider.ProviderListener;
 import org.apache.qpid.jms.provider.amqp.AmqpProvider;
+import org.apache.qpid.jms.util.ThreadPoolUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,7 +61,7 @@ public class MockProvider implements Provider {
     private final MockProviderStats stats;
     private final URI remoteURI;
     private final MockProviderConfiguration configuration;
-    private final ScheduledExecutorService serializer;
+    private final ScheduledThreadPoolExecutor serializer;
     private final AtomicBoolean closed = new AtomicBoolean();
     private final MockRemotePeer context;
 
@@ -77,7 +77,7 @@ public class MockProvider implements Provider {
         this.context = context;
         this.stats = new MockProviderStats(context != null ? context.getContextStats() : null);
 
-        this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+        serializer = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
 
             @Override
             public Thread newThread(Runnable runner) {
@@ -89,6 +89,9 @@ public class MockProvider implements Provider {
                 return serial;
             }
         });
+
+        serializer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        serializer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
     }
 
     @Override
@@ -156,7 +159,7 @@ public class MockProvider implements Provider {
             } catch (IOException e) {
                 LOG.warn("Error caught while closing Provider: ", e.getMessage());
             } finally {
-                serializer.shutdown();
+                ThreadPoolUtils.shutdownGraceful(serializer);
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message