activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/2] activemq git commit: Adding a configuration option to PolicyEntry to enable setting the maximum number of created destinations by policy on the broker.
Date Mon, 08 Jun 2015 21:10:33 GMT
Repository: activemq
Updated Branches:
  refs/heads/master eaf5c1215 -> 062b0c2c0


Adding a configuration option to PolicyEntry to enable setting the maximum number of created
destinations by policy on the broker.

This resolves https://issues.apache.org/jira/browse/AMQ-5751


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/886e2d4d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/886e2d4d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/886e2d4d

Branch: refs/heads/master
Commit: 886e2d4d97555e2f10276616389a5d1f915bad18
Parents: eaf5c12
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Fri May 1 19:02:18 2015 +0000
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Jun 8 16:00:08 2015 -0400

----------------------------------------------------------------------
 .../activemq/broker/region/AbstractRegion.java  |  82 +++++-
 .../broker/region/RegionStatistics.java         |  89 ++++++
 .../broker/region/policy/PolicyEntry.java       |  17 ++
 .../policy/MaxDestinationsPolicyTest.java       | 271 +++++++++++++++++++
 4 files changed, 457 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/886e2d4d/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
index 4f487bf..ab47ea4 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
@@ -25,7 +25,10 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
+
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ConsumerBrokerExchange;
 import org.apache.activemq.DestinationDoesNotExistException;
@@ -64,6 +67,7 @@ public abstract class AbstractRegion implements Region {
     protected final SystemUsage usageManager;
     protected final DestinationFactory destinationFactory;
     protected final DestinationStatistics destinationStatistics;
+    protected final RegionStatistics regionStatistics = new RegionStatistics();
     protected final RegionBroker broker;
     protected boolean autoCreateDestinations = true;
     protected final TaskRunnerFactory taskRunnerFactory;
@@ -120,7 +124,16 @@ public abstract class AbstractRegion implements Region {
         } finally {
             destinationsLock.readLock().unlock();
         }
-        destinations.clear();
+
+        destinationsLock.writeLock().lock();
+        try {
+            destinations.clear();
+            regionStatistics.getAdvisoryDestinations().reset();
+            regionStatistics.getDestinations().reset();
+            regionStatistics.getAllDestinations().reset();
+        } finally {
+            destinationsLock.writeLock().unlock();
+        }
     }
 
     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
@@ -131,6 +144,10 @@ public abstract class AbstractRegion implements Region {
             Destination dest = destinations.get(destination);
             if (dest == null) {
                 if (destination.isTemporary() == false || createIfTemporary) {
+                    // Limit the number of destinations that can be created if
+                    // maxDestinations has been set on a policy
+                    validateMaxDestinations(destination);
+
                     LOG.debug("{} adding destination: {}", broker.getBrokerName(), destination);
                     dest = createDestination(context, destination);
                     // intercept if there is a valid interceptor defined
@@ -140,6 +157,7 @@ public abstract class AbstractRegion implements Region {
                     }
                     dest.start();
                     destinations.put(destination, dest);
+                    updateRegionDestCounts(destination, 1);
                     destinationMap.put(destination, dest);
                     addSubscriptionsForDestination(context, dest);
                 }
@@ -157,6 +175,61 @@ public abstract class AbstractRegion implements Region {
         return subscriptions;
     }
 
+
+    /**
+     * Updates the counts in RegionStatistics based on whether or not the destination
+     * is an Advisory Destination or not
+     *
+     * @param destination the destination being used to determine which counters to update
+     * @param count the count to add to the counters
+     */
+    protected void updateRegionDestCounts(ActiveMQDestination destination, int count) {
+        if (destination != null) {
+            if (AdvisorySupport.isAdvisoryTopic(destination)) {
+                regionStatistics.getAdvisoryDestinations().add(count);
+            } else {
+                regionStatistics.getDestinations().add(count);
+            }
+            regionStatistics.getAllDestinations().add(count);
+        }
+    }
+
+    /**
+     * This method checks whether or not the destination can be created based on
+     * {@link PolicyEntry#getMaxDestinations}, if it has been set. Advisory
+     * topics are ignored.
+     *
+     * @param destination
+     * @throws Exception
+     */
+    protected void validateMaxDestinations(ActiveMQDestination destination)
+            throws Exception {
+        if (broker.getDestinationPolicy() != null) {
+            PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
+            // Make sure the destination is not an advisory topic
+            if (entry != null && entry.getMaxDestinations() >= 0
+                    && !AdvisorySupport.isAdvisoryTopic(destination)) {
+                // If there is an entry for this destination, look up the set of
+                // destinations associated with this policy
+                // If a destination isn't specified, then just count up
+                // non-advisory destinations (ie count all destinations)
+                int destinationSize = (int) (entry.getDestination() != null ?
+                        destinationMap.get(entry.getDestination()).size() : regionStatistics.getDestinations().getCount());
+                if (destinationSize >= entry.getMaxDestinations()) {
+                    if (entry.getDestination() != null) {
+                        throw new IllegalStateException(
+                                "The maxmimum number of destinations allowed ("+ entry.getMaxDestinations()
+
+                                ") for the policy " + entry.getDestination() + " has already
been reached.");
+                    // No destination has been set (default policy)
+                    } else {
+                        throw new IllegalStateException("The maxmimum number of destinations
allowed ("
+                                        + entry.getMaxDestinations() + ") has already been
reached.");
+                    }
+                }
+            }
+        }
+    }
+
     protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context,
Destination dest)
             throws Exception {
 
@@ -210,6 +283,8 @@ public abstract class AbstractRegion implements Region {
         try {
             Destination dest = destinations.remove(destination);
             if (dest != null) {
+                updateRegionDestCounts(destination, -1);
+
                 // timeout<0 or we timed out, we now force any remaining
                 // subscriptions to un-subscribe.
                 for (Iterator<Subscription> iter = subscriptions.values().iterator();
iter.hasNext();) {
@@ -620,7 +695,10 @@ public abstract class AbstractRegion implements Region {
                     destination = destinationInterceptor.intercept(destination);
                 }
                 getDestinationMap().put(key, destination);
-                destinations.put(key, destination);
+                Destination prev = destinations.put(key, destination);
+                if (prev == null) {
+                    updateRegionDestCounts(key, 1);
+                }
             }
         } finally {
             destinationsLock.writeLock().unlock();

http://git-wip-us.apache.org/repos/asf/activemq/blob/886e2d4d/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionStatistics.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionStatistics.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionStatistics.java
new file mode 100644
index 0000000..d39a4f1
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionStatistics.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region;
+
+import org.apache.activemq.management.CountStatisticImpl;
+import org.apache.activemq.management.StatsImpl;
+
+/**
+ * The J2EE Statistics for the Connection.
+ *
+ *
+ */
+public class RegionStatistics extends StatsImpl {
+
+    private CountStatisticImpl advisoryDestinations;
+    private CountStatisticImpl destinations;
+    private CountStatisticImpl allDestinations;
+
+    public RegionStatistics() {
+        this(true);
+    }
+
+    public RegionStatistics(boolean enabled) {
+
+        advisoryDestinations = new CountStatisticImpl("advisoryTopics", "The number of advisory
destinations in the region");
+        destinations = new CountStatisticImpl("destinations", "The number of regular (non-adivsory)
destinations in the region");
+        allDestinations = new CountStatisticImpl("allDestinations", "The total number of
destinations, including advisory destinations, in the region");
+
+        addStatistic("advisoryDestinations", advisoryDestinations);
+        addStatistic("destinations", destinations);
+        addStatistic("allDestinations", allDestinations);
+
+        this.setEnabled(enabled);
+    }
+
+    public CountStatisticImpl getAdvisoryDestinations() {
+        return advisoryDestinations;
+    }
+
+    public CountStatisticImpl getDestinations() {
+        return destinations;
+    }
+
+    public CountStatisticImpl getAllDestinations() {
+        return allDestinations;
+    }
+
+    public void reset() {
+        super.reset();
+        advisoryDestinations.reset();
+        destinations.reset();
+        allDestinations.reset();
+    }
+
+    public void setEnabled(boolean enabled) {
+        super.setEnabled(enabled);
+        advisoryDestinations.setEnabled(enabled);
+        destinations.setEnabled(enabled);
+        allDestinations.setEnabled(enabled);
+    }
+
+    public void setParent(RegionStatistics parent) {
+        if (parent != null) {
+            advisoryDestinations.setParent(parent.getAdvisoryDestinations());
+            destinations.setParent(parent.getDestinations());
+            allDestinations.setParent(parent.getAllDestinations());
+        } else {
+            advisoryDestinations.setParent(null);
+            destinations.setParent(null);
+            allDestinations.setParent(null);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/886e2d4d/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index 41b77b2..26cfa6b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -99,6 +99,8 @@ public class PolicyEntry extends DestinationMapEntry {
     private boolean reduceMemoryFootprint;
     private NetworkBridgeFilterFactory networkBridgeFilterFactory;
     private boolean doOptimzeMessageStorage = true;
+    private int maxDestinations = -1;
+
     /*
      * percentage of in-flight messages above which optimize message store is disabled
      */
@@ -962,4 +964,19 @@ public class PolicyEntry extends DestinationMapEntry {
     public boolean isPersistJMSRedelivered() {
         return persistJMSRedelivered;
     }
+
+    public int getMaxDestinations() {
+        return maxDestinations;
+    }
+
+    /**
+     * Sets the maximum number of destinations that can be created
+     *
+     * @param maxDestinations
+     *            maximum number of destinations
+     */
+    public void setMaxDestinations(int maxDestinations) {
+        this.maxDestinations = maxDestinations;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/886e2d4d/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MaxDestinationsPolicyTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MaxDestinationsPolicyTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MaxDestinationsPolicyTest.java
new file mode 100644
index 0000000..714da18
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MaxDestinationsPolicyTest.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * This unit test is to test that setting the property "maxDestinations" on
+ * PolicyEntry works correctly. If this property is set, it will limit the
+ * number of destinations that can be created. Advisory topics will be ignored
+ * during calculations.
+ *
+ */
+public class MaxDestinationsPolicyTest {
+    BrokerService broker;
+    ConnectionFactory factory;
+    Connection connection;
+    Session session;
+    MessageProducer producer;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+
+        File testDataDir = new File("target/activemq-data/AMQ-5751");
+        broker.setDataDirectoryFile(testDataDir);
+        broker.setUseJmx(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.getSystemUsage().getMemoryUsage().setLimit(1024l * 1024 * 64);
+        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
+        persistenceAdapter.setDirectory(new File(testDataDir, "kahadb"));
+        broker.setPersistenceAdapter(persistenceAdapter);
+        broker.addConnector("tcp://localhost:0");
+        broker.start();
+        factory = new ActiveMQConnectionFactory(broker.getTransportConnectors()
+                .get(0).getConnectUri().toString());
+        connection = factory.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        session.close();
+        connection.stop();
+        connection.close();
+        broker.stop();
+    }
+
+    /**
+     * Test that 10 queues can be created when default policy allows it.
+     */
+    @Test
+    public void testMaxDestinationDefaultPolicySuccess() throws Exception {
+        applyDefaultMaximumDestinationPolicy(10);
+
+        for (int i = 0; i < 10; i++) {
+            createQueue("queue." + i);
+        }
+    }
+
+    /**
+     * Test that default policy prevents going beyond max
+     */
+    @Test(expected = javax.jms.IllegalStateException.class)
+    public void testMaxDestinationDefaultPolicyFail() throws Exception {
+        applyDefaultMaximumDestinationPolicy(10);
+
+        for (int i = 0; i < 11; i++) {
+            createQueue("queue." + i);
+        }
+    }
+
+    /**
+     * Test that a queue policy overrides the default policy
+     */
+    @Test(expected = javax.jms.IllegalStateException.class)
+    public void testMaxDestinationOnQueuePolicy() throws Exception {
+        PolicyMap policyMap = applyDefaultMaximumDestinationPolicy(10);
+        applyMaximumDestinationPolicy(policyMap, new ActiveMQQueue("queue.>"),
+                5);
+
+        // This should fail even though the default policy is set to a limit of
+        // 10 because the
+        // queue policy overrides it
+        for (int i = 0; i < 6; i++) {
+            createQueue("queue." + i);
+        }
+    }
+
+    /**
+     * Test that 10 topics can be created when default policy allows it.
+     */
+    @Test
+    public void testTopicMaxDestinationDefaultPolicySuccess() throws Exception {
+        applyDefaultMaximumDestinationPolicy(10);
+
+        for (int i = 0; i < 10; i++) {
+            createTopic("topic." + i);
+        }
+    }
+
+    /**
+     * Test that topic creation will faill when exceeding the limit
+     */
+    @Test(expected = javax.jms.IllegalStateException.class)
+    public void testTopicMaxDestinationDefaultPolicyFail() throws Exception {
+        applyDefaultMaximumDestinationPolicy(20);
+
+        for (int i = 0; i < 21; i++) {
+            createTopic("topic." + i);
+        }
+    }
+
+    /**
+     * Test that no limit is enforced
+     */
+    @Test
+    public void testTopicDefaultPolicyNoMaxDestinations() throws Exception {
+        // -1 is the default and signals no max destinations
+        applyDefaultMaximumDestinationPolicy(-1);
+        for (int i = 0; i < 100; i++) {
+            createTopic("topic." + i);
+        }
+    }
+
+    /**
+     * Test a mixture of queue and topic policies
+     */
+    @Test
+    public void testComplexMaxDestinationPolicy() throws Exception {
+        PolicyMap policyMap = applyMaximumDestinationPolicy(new PolicyMap(),
+                new ActiveMQQueue("queue.>"), 5);
+        applyMaximumDestinationPolicy(policyMap, new ActiveMQTopic("topic.>"),
+                10);
+
+        for (int i = 0; i < 5; i++) {
+            createQueue("queue." + i);
+        }
+
+        for (int i = 0; i < 10; i++) {
+            createTopic("topic." + i);
+        }
+
+        // Make sure that adding one more of either a topic or a queue fails
+        boolean fail = false;
+        try {
+            createTopic("topic.test");
+        } catch (javax.jms.IllegalStateException e) {
+            fail = true;
+        }
+        assertTrue(fail);
+
+        fail = false;
+        try {
+            createQueue("queue.test");
+        } catch (javax.jms.IllegalStateException e) {
+            fail = true;
+        }
+        assertTrue(fail);
+    }
+
+    /**
+     * Test child destinations of a policy
+     */
+    @Test
+    public void testMaxDestinationPolicyOnChildDests() throws Exception {
+        applyMaximumDestinationPolicy(new PolicyMap(), new ActiveMQTopic(
+                "topic.>"), 10);
+
+        for (int i = 0; i < 10; i++) {
+            createTopic("topic.test" + i);
+        }
+
+        // Make sure that adding one more fails
+        boolean fail = false;
+        try {
+            createTopic("topic.abc.test");
+        } catch (javax.jms.IllegalStateException e) {
+            fail = true;
+        }
+        assertTrue(fail);
+
+    }
+
+    /**
+     * Test a topic policy overrides the default
+     */
+    @Test(expected = javax.jms.IllegalStateException.class)
+    public void testMaxDestinationOnTopicPolicy() throws Exception {
+        PolicyMap policyMap = applyDefaultMaximumDestinationPolicy(10);
+        applyMaximumDestinationPolicy(policyMap, new ActiveMQTopic("topic.>"),
+                5);
+
+        // This should fail even though the default policy is set to a limit of
+        // 10 because the
+        // queue policy overrides it
+        for (int i = 0; i < 6; i++) {
+            createTopic("topic." + i);
+        }
+    }
+
+    private PolicyMap applyMaximumDestinationPolicy(PolicyMap policyMap,
+            ActiveMQDestination destination, int maxDestinations) {
+        PolicyEntry entry = new PolicyEntry();
+        entry.setDestination(destination);
+        entry.setMaxDestinations(maxDestinations);
+        policyMap.setPolicyEntries(Lists.newArrayList(entry));
+        broker.setDestinationPolicy(policyMap);
+        return policyMap;
+    }
+
+    private PolicyMap applyDefaultMaximumDestinationPolicy(int maxDestinations) {
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        if (maxDestinations >= 0) {
+            defaultEntry.setMaxDestinations(maxDestinations);
+        }
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
+        return policyMap;
+    }
+
+    private void createQueue(String queueName) throws Exception {
+        Queue queue = session.createQueue(queueName);
+        producer = session.createProducer(queue);
+    }
+
+    private void createTopic(String topicName) throws Exception {
+        Topic topic = session.createTopic(topicName);
+        producer = session.createProducer(topic);
+    }
+
+}


Mime
View raw message