activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5486 - allow selector manager to reject tasks - org.apache.activemq.transport.nio.SelectorManager.rejectWork leaving the default to caller runs policy. This allows a broker to implement qos f
Date Wed, 28 Sep 2016 11:16:18 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 08695ab30 -> 634b42016


https://issues.apache.org/jira/browse/AMQ-5486 - allow the selector manager to reject tasks.
Setting the system property org.apache.activemq.transport.nio.SelectorManager.rejectWork to
true makes the selector thread pool reject work it cannot accept; the default remains the
caller-runs policy. This allows a broker to implement QoS for existing connections by forcing
others away.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/634b4201
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/634b4201
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/634b4201

Branch: refs/heads/master
Commit: 634b42016a4c347217129d49a4175afaba9666ed
Parents: 08695ab
Author: gtully <gary.tully@gmail.com>
Authored: Wed Sep 28 12:15:50 2016 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Wed Sep 28 12:15:50 2016 +0100

----------------------------------------------------------------------
 .../activemq/transport/nio/SelectorManager.java |  11 +-
 .../org/apache/activemq/bugs/AMQ5486Test.java   | 135 +++++++++++++++++++
 .../transport/nio/NIOAsyncSendWithPFCTest.java  |   4 -
 3 files changed, 145 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/634b4201/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
index b6b1f50..28d2559 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
@@ -23,6 +23,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -56,16 +57,24 @@ public final class SelectorManager {
                     t.setDaemon(true);
                     return t;
                 }
-            }, new ThreadPoolExecutor.CallerRunsPolicy());
+            }, newRejectionHandler());
 
         return rc;
     }
 
+    private RejectedExecutionHandler newRejectionHandler() {
+        return canRejectWork() ? new ThreadPoolExecutor.AbortPolicy() : new ThreadPoolExecutor.CallerRunsPolicy();
+    }
+
     private BlockingQueue<Runnable> newWorkQueue() {
         final int workQueueCapicity = getDefaultWorkQueueCapacity();
         return workQueueCapicity > 0 ? new LinkedBlockingQueue<Runnable>(workQueueCapicity) : new SynchronousQueue<Runnable>();
     }
 
+    private static boolean canRejectWork() {
+        return Boolean.getBoolean("org.apache.activemq.transport.nio.SelectorManager.rejectWork");
+    }
+
     private static int getDefaultWorkQueueCapacity() {
         return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.workQueueCapacity", 0);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/634b4201/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5486Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5486Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5486Test.java
new file mode 100644
index 0000000..675d660
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5486Test.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jms.support.JmsUtils;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import java.util.LinkedList;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AMQ5486Test {
+
+    private static final int maxConnections = 100;
+    private static final int maxPoolSize = 10;
+
+    private final ExecutorService executor = Executors.newCachedThreadPool();
+    private String connectionUri;
+    private BrokerService service;
+    private TransportConnector connector;
+    final ConcurrentLinkedQueue<Connection> connections = new ConcurrentLinkedQueue<Connection>();
+
+    @Before
+    public void setUp() throws Exception {
+
+        // max out the pool and reject work
+        System.setProperty("org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize", String.valueOf(maxPoolSize));
+        System.setProperty("org.apache.activemq.transport.nio.SelectorManager.workQueueCapacity", "0");
+        System.setProperty("org.apache.activemq.transport.nio.SelectorManager.rejectWork", "true");
+        service = new BrokerService();
+        service.setPersistent(false);
+        service.setUseJmx(false);
+        connector = service.addConnector("nio://0.0.0.0:0");
+        connectionUri = connector.getPublishableConnectString();
+        service.start();
+        service.waitUntilStarted();
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    @Test
+    public void testFailureOnSelectorThreadPoolExhaustion() throws Exception {
+        final ConnectionFactory cf = createConnectionFactory();
+        final CountDownLatch startupLatch = new CountDownLatch(1);
+        final LinkedList<Exception> exceptions = new LinkedList<Exception>();
+        for(int i = 0; i < maxConnections; i++) {
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    ActiveMQConnection conn = null;
+                    try {
+                        startupLatch.await();
+                        conn = (ActiveMQConnection) cf.createConnection();
+                        conn.start();
+                        //conn.syncSendPacket(new TransactionInfo(conn.getConnectionInfo().getConnectionId(), null, TransactionInfo.END));
+                        connections.add(conn);
+                    } catch (Exception e) {
+                        exceptions.add(e);
+                        JmsUtils.closeConnection(conn);
+                    }
+                }
+            });
+        }
+
+        // No connections at first
+        assertEquals(0, connector.getConnections().size());
+        // Release the latch to set up connections in parallel
+        startupLatch.countDown();
+
+        final TransportConnector connector = this.connector;
+
+
+        // Expect some connection attempts to fail once the selector thread pool is exhausted
+        assertTrue("Expected some exceptions",
+            Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return !exceptions.isEmpty();
+                }
+            })
+        );
+
+        assertTrue("Expected: more than " + (maxPoolSize - 1) + " connections, found: " + connector.getConnections().size(),
+                Wait.waitFor(new Wait.Condition() {
+                    @Override
+                    public boolean isSatisified() throws Exception {
+                        // selector thread will take one thread from the pool
+                        return connector.getConnections().size() >= maxPoolSize - 1;
+                    }
+                })
+        );
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        executor.shutdownNow();
+
+        for (Connection connection : connections) {
+            JmsUtils.closeConnection(connection);
+        }
+
+        service.stop();
+        service.waitUntilStopped();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/634b4201/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java
index 0b7b7c3..a9cc901 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java
@@ -146,13 +146,9 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
 
             }
 
-
             //wait till producer flow control kicks in
             waitForProducerFlowControl(broker, queueView);
 
-
-            TestSupport.dumpAllThreads("Blocked");
-
             try {
                 Session producerSession = exisitngConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             } catch (Exception ex) {


Mime
View raw message