Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5C7C110D1C for ; Wed, 5 Jun 2013 17:35:15 +0000 (UTC) Received: (qmail 63674 invoked by uid 500); 5 Jun 2013 17:35:15 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 63598 invoked by uid 500); 5 Jun 2013 17:35:13 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 63587 invoked by uid 99); 5 Jun 2013 17:35:12 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Jun 2013 17:35:12 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Jun 2013 17:35:10 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 702342388847; Wed, 5 Jun 2013 17:34:51 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1489978 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/ activemq-unit-tests/src/test/java/org/apache/activemq/bugs/ Date: Wed, 05 Jun 2013 17:34:51 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130605173451.702342388847@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Wed Jun 5 17:34:50 2013 New Revision: 1489978 URL: http://svn.apache.org/r1489978 Log: fix for: https://issues.apache.org/jira/browse/AMQ-4487 and https://issues.apache.org/jira/browse/AMQ-4372 Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=1489978&r1=1489977&r2=1489978&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Wed Jun 5 17:34:50 2013 @@ -19,12 +19,11 @@ package org.apache.activemq.broker.regio import java.io.IOException; import java.util.ArrayList; import java.util.List; -import javax.jms.InvalidSelectorException; + import javax.jms.JMSException; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.MessageAck; import org.apache.activemq.filter.MessageEvaluationContext; @@ -36,19 +35,20 @@ public class QueueBrowserSubscription ex boolean browseDone; boolean destinationsAdded; - public QueueBrowserSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) - throws JMSException { - super(broker,usageManager, context, info); + public QueueBrowserSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException { + super(broker, usageManager, context, info); } + @Override protected boolean canDispatch(MessageReference node) { - return !((QueueMessageReference)node).isAcked(); + return !((QueueMessageReference) node).isAcked(); } + @Override public synchronized String toString() { - return "QueueBrowserSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" - + destinations.size() + ", dispatched=" + dispatched.size() + ", delivered=" - + this.prefetchExtension + ", pending=" + getPendingQueueSize(); + return "QueueBrowserSubscription:" + " consumer=" + info.getConsumerId() + + ", destinations=" + destinations.size() + ", dispatched=" + dispatched.size() + + ", delivered=" + this.prefetchExtension + ", pending=" + getPendingQueueSize(); } synchronized public void destinationsAdded() throws Exception { @@ -57,12 +57,13 @@ public class QueueBrowserSubscription ex } private void checkDone() throws Exception { - if( !browseDone && queueRefs == 0 && destinationsAdded) { - browseDone=true; + if (!browseDone && queueRefs == 0 && destinationsAdded) { + browseDone = true; add(QueueMessageReference.NULL_MESSAGE); } } + @Override public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException { return !browseDone && super.matches(node, context); } @@ -70,15 +71,15 @@ public class QueueBrowserSubscription ex /** * Since we are a browser we don't really remove the message from the queue. */ - protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n) - throws IOException { - if (info.isNetworkSubscription()) { - super.acknowledge(context, ack, n); - } + @Override + protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException { + if (info.isNetworkSubscription()) { + super.acknowledge(context, ack, n); + } } synchronized public void incrementQueueRef() { - queueRefs++; + queueRefs++; } synchronized public void decrementQueueRef() throws Exception { @@ -88,7 +89,6 @@ public class QueueBrowserSubscription ex checkDone(); } - @Override public List remove(ConnectionContext context, Destination destination) throws Exception { super.remove(context, destination); Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1489978&r1=1489977&r2=1489978&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Wed Jun 5 17:34:50 2013 @@ -238,7 +238,13 @@ public class PolicyEntry extends Destina configurePrefetch(sub); sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); sub.setUsePrefetchExtension(isUsePrefetchExtension()); - sub.setMaxProducersToAudit(getMaxProducersToAudit()); + + // TODO + // We currently need an infinite audit because of the way that browser dispatch + // is done. We should refactor the browsers to better handle message dispatch so + // we can remove this and perform a more efficient dispatch. + sub.setMaxProducersToAudit(Integer.MAX_VALUE); + sub.setMaxAuditDepth(Integer.MAX_VALUE); } public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) { Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java?rev=1489978&r1=1489977&r2=1489978&view=diff ============================================================================== --- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java (original) +++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java Wed Jun 5 17:34:50 2013 @@ -19,7 +19,6 @@ package org.apache.activemq.bugs; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import java.net.URI; import java.util.Enumeration; import javax.jms.Connection; @@ -31,7 +30,6 @@ import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.junit.After; @@ -46,14 +44,14 @@ public class AMQ4487Test { private final String destinationName = "TEST.QUEUE"; private BrokerService broker; - private URI connectUri; private ActiveMQConnectionFactory factory; @Before public void startBroker() throws Exception { broker = new BrokerService(); - TransportConnector connector = broker.addConnector("tcp://0.0.0.0:0"); broker.deleteAllMessages(); + broker.setUseJmx(false); + broker.setAdvisorySupport(false); PolicyEntry policy = new PolicyEntry(); policy.setQueue(">"); @@ -64,8 +62,7 @@ public class AMQ4487Test { broker.start(); broker.waitUntilStarted(); - connectUri = connector.getConnectUri(); - factory = new ActiveMQConnectionFactory(connectUri); + factory = new ActiveMQConnectionFactory("vm://localhost"); } @After @@ -101,7 +98,7 @@ public class AMQ4487Test { @Test public void testBrowsingWithMoreThanMaxAuditDepth() throws Exception { - doTestBrowsing(76); + doTestBrowsing(300); } @SuppressWarnings("rawtypes") @@ -124,7 +121,6 @@ public class AMQ4487Test { if (LOG.isDebugEnabled()) { LOG.debug("Browsed Message: {}", m.getJMSMessageID()); } - LOG.info("Browsed Message: {}", m.getJMSMessageID()); received++; if (received > messagesToSend) {