cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ashaki...@apache.org
Subject cxf git commit: [CXF-6702]: Support external executor for JMS Containers
Date Sun, 06 Dec 2015 15:26:43 GMT
Repository: cxf
Updated Branches:
  refs/heads/master aaabd57fb -> 0336a2399


[CXF-6702]: Support external executor for JMS Containers


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/0336a239
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/0336a239
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/0336a239

Branch: refs/heads/master
Commit: 0336a2399b1980e04372c651368f8718f75a4f14
Parents: aaabd57
Author: Andrei Shakirin <andrei.shakirin@gmail.com>
Authored: Sun Dec 6 16:26:34 2015 +0100
Committer: Andrei Shakirin <andrei.shakirin@gmail.com>
Committed: Sun Dec 6 16:26:34 2015 +0100

----------------------------------------------------------------------
 .../apache/cxf/transport/jms/JMSConduit.java    |  6 ++--
 .../cxf/transport/jms/JMSDestination.java       |  6 ++--
 .../apache/cxf/transport/jms/JMSFactory.java    |  5 ++-
 .../util/AbstractMessageListenerContainer.java  | 37 ++++++++++++++++++--
 .../util/PollingMessageListenerContainer.java   | 24 ++-----------
 5 files changed, 50 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/0336a239/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
index 1a8afc9..677dcf6 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
@@ -178,8 +178,10 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me
                                                                                       staticReplyDestination,

                                                                                       this);
                     container.setMessageSelector(messageSelector);
-                    Executor executor = JMSFactory.createExecutor(bus, "jms-conduit");
-                    container.setExecutor(executor);
+                    Object executor = bus.getProperty(JMSFactory.JMS_CONDUIT_EXECUTOR);
+                    if (executor instanceof Executor) {
+                        container.setExecutor((Executor) executor);
+                    }
                     container.start();
                     jmsListener = container;
                     addBusListener();

http://git-wip-us.apache.org/repos/asf/cxf/blob/0336a239/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index 9f0fcbc..113a7d2 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -140,8 +140,10 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess
             container.setTransacted(jmsConfig.isSessionTransacted());
             container.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName());
 
-            Executor executor = JMSFactory.createExecutor(bus, "jms-destination");
-            container.setExecutor(executor);
+            Object executor = bus.getProperty(JMSFactory.JMS_DESTINATION_EXECUTOR);
+            if (executor instanceof Executor) {
+                container.setExecutor((Executor) executor);
+            }
             container.start();
             suspendedContinuations.setListenerContainer(container);
             connection.start();

http://git-wip-us.apache.org/repos/asf/cxf/blob/0336a239/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
index 4eaf083..5d617c8 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
@@ -37,6 +37,9 @@ import org.apache.cxf.workqueue.WorkQueueManager;
  * Factory to create jms helper objects from configuration and context information
  */
 public final class JMSFactory {
+    public static final String JMS_DESTINATION_EXECUTOR = "org.apache.cxf.extensions.jms.destination.executor";
+    public static final String JMS_CONDUIT_EXECUTOR = "org.apache.cxf.extensions.jms.conduit.executor";
+
     static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpointFactory";
     static final String MDB_TRANSACTED_METHOD = "MDBTransactedMethod";
 
@@ -116,7 +119,7 @@ public final class JMSFactory {
      * @param name
      * @return
      */
-    public static Executor createExecutor(Bus bus, String name) {
+    public static Executor createWorkQueueExecutor(Bus bus, String name) {
         WorkQueueManager manager = bus.getExtension(WorkQueueManager.class);
         if (manager != null) {
             AutomaticWorkQueue workQueue1 = manager.getNamedWorkQueue(name);

http://git-wip-us.apache.org/repos/asf/cxf/blob/0336a239/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java
index f5affe4..65d6c4c 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java
@@ -19,7 +19,9 @@
 package org.apache.cxf.transport.jms.util;
 
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
 
 import javax.jms.Connection;
@@ -40,11 +42,14 @@ public abstract class AbstractMessageListenerContainer implements JMSListenerCon
     protected int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
     protected String messageSelector;
     protected boolean running;
-    protected Executor executor;
     protected String durableSubscriptionName;
     protected boolean pubSubNoLocal;
     protected TransactionManager transactionManager;
 
+    private Executor executor;
+    private int concurrentConsumers = 1;
+    private boolean internalExecutor;
+
     public AbstractMessageListenerContainer() {
         super();
     }
@@ -70,7 +75,8 @@ public abstract class AbstractMessageListenerContainer implements JMSListenerCon
 
     protected Executor getExecutor() {
         if (executor == null) {
-            executor = Executors.newFixedThreadPool(10);
+            executor = Executors.newFixedThreadPool(concurrentConsumers);
+            internalExecutor = true;
         }
         return executor;
     }
@@ -79,6 +85,25 @@ public abstract class AbstractMessageListenerContainer implements JMSListenerCon
         this.executor = executor;
     }
 
+    @Override
+    public void stop() {
+        // In case of using external executor, don't shutdown it
+        if ((executor == null) || !internalExecutor) {
+            return;
+        }
+        
+        ExecutorService executorService = (ExecutorService)executor;
+        executorService.shutdown();
+        try {
+            executorService.awaitTermination(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            // Ignore
+        }
+        executorService.shutdownNow();
+        executor = null;
+        internalExecutor = false;
+    }
+    
     public void setDurableSubscriptionName(String durableSubscriptionName) {
         this.durableSubscriptionName = durableSubscriptionName;
     }
@@ -96,4 +121,12 @@ public abstract class AbstractMessageListenerContainer implements JMSListenerCon
         this.transactionManager = transactionManager;
     }
 
+    public void setConcurrentConsumers(int concurrentConsumers) {
+        this.concurrentConsumers = concurrentConsumers;
+    }
+
+    public int getConcurrentConsumers() {
+        return concurrentConsumers;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/0336a239/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index f79ab09..0acd40f 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -18,9 +18,6 @@
  */
 package org.apache.cxf.transport.jms.util;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -40,10 +37,6 @@ import org.apache.cxf.common.logging.LogUtils;
 public class PollingMessageListenerContainer extends AbstractMessageListenerContainer {
     private static final Logger LOG = LogUtils.getL7dLogger(PollingMessageListenerContainer.class);
 
-    private ExecutorService pollers;
-
-    private int concurrentConsumers = 1;
-
     public PollingMessageListenerContainer(Connection connection, Destination destination,
                                            MessageListener listenerHandler) {
         this.connection = connection;
@@ -164,10 +157,9 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
             return;
         }
         running = true;
-        pollers = Executors.newFixedThreadPool(concurrentConsumers);
-        for (int c = 0; c < concurrentConsumers; c++) {
+        for (int c = 0; c < getConcurrentConsumers(); c++) {
             Runnable poller = (transactionManager != null) ? new XAPoller() : new Poller();

-            pollers.execute(poller);
+            getExecutor().execute(poller);
         }
     }
 
@@ -178,14 +170,7 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
             return;
         }
         running = false;
-        pollers.shutdown();
-        try {
-            pollers.awaitTermination(10, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            // Ignore
-        }
-        pollers.shutdownNow();
-        pollers = null;
+        super.stop();        
     }
 
     @Override
@@ -193,7 +178,4 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
         stop();
     }
 
-    public void setConcurrentConsumers(int concurrentConsumers) {
-        this.concurrentConsumers = concurrentConsumers;
-    }
 }


Mime
View raw message