activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hadr...@apache.org
Subject [09/10] git commit: modify fix for https://issues.apache.org/jira/browse/AMQ-4181 - apply maxBrowsePageSize, when > 0 it will limit a browser dispatch; related to https://issues.apache.org/jira/browse/AMQ-4487 https://issues.apache.org/jira/browse/AMQ-43
Date Tue, 11 Mar 2014 23:29:35 GMT
modify fix for https://issues.apache.org/jira/browse/AMQ-4181 - apply maxBrowsePageSize, when
> 0 it will limit a browser dispatch; related to https://issues.apache.org/jira/browse/AMQ-4487
https://issues.apache.org/jira/browse/AMQ-4372 https://issues.apache.org/jira/browse/AMQ-4595


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/52dc82b0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/52dc82b0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/52dc82b0

Branch: refs/heads/activemq-5.9
Commit: 52dc82b000aa7838aab5a9dae2952a9315e6dead
Parents: be78a36
Author: gtully <gary.tully@gmail.com>
Authored: Tue Oct 22 15:22:36 2013 +0100
Committer: Hadrian Zbarcea <hadrian@apache.org>
Committed: Tue Mar 11 17:06:12 2014 -0400

----------------------------------------------------------------------
 .../activemq/broker/region/BaseDestination.java |   2 +-
 .../apache/activemq/broker/region/Queue.java    |   4 +-
 .../broker/region/QueueBrowserSubscription.java |   9 ++
 .../broker/region/policy/PolicyEntry.java       |   3 +
 .../apache/activemq/JmsQueueBrowserTest.java    |   1 +
 .../org/apache/activemq/bugs/AMQ4595Test.java   |  15 +--
 .../usecases/QueueBrowsingLimitTest.java        | 113 +++++++++++++++++++
 7 files changed, 137 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/52dc82b0/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 5edbdf3..f350098 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -286,7 +286,7 @@ public abstract class BaseDestination implements Destination {
     }
 
     public int getMaxBrowsePageSize() {
-        return this.maxBrowsePageSize;
+        return this.maxBrowsePageSize > 0 ? this.maxBrowsePageSize : getMaxPageSize();
     }
 
     public void setMaxBrowsePageSize(int maxPageSize) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/52dc82b0/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 7713d71..5b50f1c 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1605,7 +1605,7 @@ public class Queue extends BaseDestination implements Task, UsageListener
{
                         LOG.debug("dispatch to browser: {}, already dispatched/paged count:
{}", browser, alreadyDispatchedMessages.size());
                         boolean added = false;
                         for (QueueMessageReference node : alreadyDispatchedMessages) {
-                            if (!node.isAcked() && !browser.isDuplicate(node.getMessageId()))
{
+                            if (!node.isAcked() && !browser.isDuplicate(node.getMessageId())
&& !browser.atMax()) {
                                 msgContext.setMessageReference(node);
                                 if (browser.matches(node, msgContext)) {
                                     browser.add(node);
@@ -1614,7 +1614,7 @@ public class Queue extends BaseDestination implements Task, UsageListener
{
                             }
                         }
                         // are we done browsing? no new messages paged
-                        if (!added) {
+                        if (!added || browser.atMax()) {
                             browser.decrementQueueRef();
                             browserDispatches.remove(browserDispatch);
                         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/52dc82b0/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
index 9bc3c1d..97de921 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
@@ -43,6 +43,7 @@ public class QueueBrowserSubscription extends QueueSubscription {
     boolean destinationsAdded;
 
     private final Map<MessageId, Object> audit = new HashMap<MessageId, Object>();
+    private long maxMessages;
 
     public QueueBrowserSubscription(Broker broker, SystemUsage usageManager, ConnectionContext
context, ConsumerInfo info) throws JMSException {
         super(broker, usageManager, context, info);
@@ -115,4 +116,12 @@ public class QueueBrowserSubscription extends QueueSubscription {
         // in case of browser
         return new ArrayList<MessageReference>();
     }
+
+    public boolean atMax() {
+        return maxMessages > 0 && getEnqueueCounter() >= maxMessages;
+    }
+
+    public void setMaxMessages(long max) {
+        maxMessages = max;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/52dc82b0/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index c219b19..9e1b006 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -275,6 +275,9 @@ public class PolicyEntry extends DestinationMapEntry {
         // we can remove this and perform a more efficient dispatch.
         sub.setMaxProducersToAudit(Integer.MAX_VALUE);
         sub.setMaxAuditDepth(Short.MAX_VALUE);
+
+        // part solution - dispatching to browsers needs to be restricted
+        sub.setMaxMessages(getMaxBrowsePageSize());
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub)
{

http://git-wip-us.apache.org/repos/asf/activemq/blob/52dc82b0/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
index b98a461..c063e24 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
@@ -440,6 +440,7 @@ public class JmsQueueBrowserTest extends JmsTestSupport {
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry policyEntry = new PolicyEntry();
         policyEntry.setUseCache(isUseCache);
+        policyEntry.setMaxBrowsePageSize(4096);
         policyMap.setDefaultEntry(policyEntry);
         brokerService.setDestinationPolicy(policyMap);
         return brokerService;

http://git-wip-us.apache.org/repos/asf/activemq/blob/52dc82b0/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
index 04d3620..0baf5c3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
@@ -34,6 +34,8 @@ import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.junit.After;
 import org.junit.Before;
@@ -55,13 +57,12 @@ public class AMQ4595Test {
         TransportConnector connector = broker.addConnector("vm://localhost");
         broker.deleteAllMessages();
 
-//        PolicyEntry policy = new PolicyEntry();
-//        policy.setQueue(">");
-//        policy.setMaxAuditDepth(16384);
-//        policy.setCursorMemoryHighWaterMark(95);  // More breathing room.
-//        PolicyMap pMap = new PolicyMap();
-//        pMap.setDefaultEntry(policy);
-//        broker.setDestinationPolicy(pMap);
+        //PolicyMap pMap = new PolicyMap();
+        //PolicyEntry policyEntry = new PolicyEntry();
+        //policyEntry.setMaxBrowsePageSize(10000);
+        //pMap.put(new ActiveMQQueue(">"), policyEntry);
+        // when no policy match, browserSub has maxMessages==0
+        //broker.setDestinationPolicy(pMap);
 
         broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024);
         broker.start();

http://git-wip-us.apache.org/repos/asf/activemq/blob/52dc82b0/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingLimitTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingLimitTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingLimitTest.java
new file mode 100644
index 0000000..15df78c
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingLimitTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Enumeration;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+
+public class QueueBrowsingLimitTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(QueueBrowsingLimitTest.class);
+
+    private BrokerService broker;
+    private URI connectUri;
+    private ActiveMQConnectionFactory factory;
+    private final int browserLimit = 300;
+
+
+    @Before
+    public void startBroker() throws Exception {
+        broker = createBroker();
+        TransportConnector connector = broker.addConnector("tcp://0.0.0.0:0");
+        broker.deleteAllMessages();
+        broker.start();
+        broker.waitUntilStarted();
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setMaxBrowsePageSize(browserLimit);
+        broker.setDestinationPolicy(new PolicyMap());
+        broker.getDestinationPolicy().setDefaultEntry(policy);
+
+        connectUri = connector.getConnectUri();
+        factory = new ActiveMQConnectionFactory(connectUri);
+
+    }
+
+    public BrokerService createBroker() throws IOException {
+        return new BrokerService();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    @Test
+    public void testBrowsingLimited() throws Exception {
+
+        int messageToSend = 470;
+
+        ActiveMQQueue queue = new ActiveMQQueue("TEST");
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(queue);
+
+        String data = "";
+        for( int i=0; i < 1024*2; i++ ) {
+            data += "x";
+        }
+
+        for( int i=0; i < messageToSend; i++ ) {
+            producer.send(session.createTextMessage(data));
+        }
+
+        QueueBrowser browser = session.createBrowser(queue);
+        Enumeration<?> enumeration = browser.getEnumeration();
+        int received = 0;
+        while (enumeration.hasMoreElements()) {
+            Message m = (Message) enumeration.nextElement();
+            received++;
+            LOG.info("Browsed message " + received + ": " + m.getJMSMessageID());
+        }
+
+        browser.close();
+
+        assertEquals(browserLimit, received);
+    }
+}


Mime
View raw message