Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 46054 invoked from network); 20 Apr 2010 15:14:02 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 20 Apr 2010 15:14:02 -0000 Received: (qmail 72646 invoked by uid 500); 20 Apr 2010 15:14:02 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 72585 invoked by uid 500); 20 Apr 2010 15:14:02 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 72578 invoked by uid 99); 20 Apr 2010 15:14:02 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Apr 2010 15:14:02 +0000 X-ASF-Spam-Status: No, hits=-1112.3 required=10.0 tests=ALL_TRUSTED,AWL X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Apr 2010 15:14:00 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 91D2623888CD; Tue, 20 Apr 2010 15:13:18 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r935954 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ test/java/org/apache/activemq/ Date: Tue, 20 Apr 2010 15:13:18 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100420151318.91D2623888CD@eris.apache.org> Author: gtully Date: Tue Apr 20 15:13:18 2010 New Revision: 935954 URL: http://svn.apache.org/viewvc?rev=935954&view=rev Log: resolve https://issues.apache.org/activemq/browse/AMQ-2651 - modified patch applied with thanks, did not change the default as there are a bunch of tests and user applications that depend on the current default Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=935954&r1=935953&r2=935954&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Apr 20 15:13:18 2010 @@ -60,6 +60,7 @@ public abstract class PrefetchSubscripti protected PendingMessageCursor pending; protected final List dispatched = new CopyOnWriteArrayList(); protected int prefetchExtension; + protected boolean usePrefetchExtension = true; protected long enqueueCounter; protected long dispatchCounter; protected long dequeueCounter; @@ -257,7 +258,7 @@ public abstract class PrefetchSubscripti // contract prefetch if dispatch required a pull if (getPrefetchSize() == 0) { prefetchExtension = Math.max(0, prefetchExtension - index); - } else if (context.isInTransaction()) { + } else if (usePrefetchExtension && context.isInTransaction()) { // extend prefetch window only if not a pulling consumer prefetchExtension = Math.max(prefetchExtension, index); } @@ -307,7 +308,9 @@ public abstract class PrefetchSubscripti node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); } if (ack.getLastMessageId().equals(node.getMessageId())) { - prefetchExtension = Math.max(prefetchExtension, index + 1); + if (usePrefetchExtension) { + prefetchExtension = Math.max(prefetchExtension, index + 1); + } destination = node.getRegionDestination(); callDispatchMatched = true; break; @@ -746,4 +749,12 @@ public abstract class PrefetchSubscripti public void setMaxAuditDepth(int maxAuditDepth) { this.maxAuditDepth = maxAuditDepth; } + + public boolean isUsePrefetchExtension() { + return usePrefetchExtension; + } + + public void setUsePrefetchExtension(boolean usePrefetchExtension) { + this.usePrefetchExtension = usePrefetchExtension; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=935954&r1=935953&r2=935954&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Tue Apr 20 15:13:18 2010 @@ -83,7 +83,8 @@ public class PolicyEntry extends Destina private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH; private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH; private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH; - private int cursorMemoryHighWaterMark=70; + private boolean usePrefetchExtension = true; + private int cursorMemoryHighWaterMark = 70; private int storeUsageHighWaterMark = 100; @@ -195,7 +196,7 @@ public class PolicyEntry extends Destina } sub.setMaxAuditDepth(getMaxAuditDepth()); sub.setMaxProducersToAudit(getMaxProducersToAudit()); - + sub.setUsePrefetchExtension(isUsePrefetchExtension()); } public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) { @@ -207,6 +208,7 @@ public class PolicyEntry extends Destina sub.setPrefetchSize(getQueueBrowserPrefetch()); } sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); + sub.setUsePrefetchExtension(isUsePrefetchExtension()); } public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) { @@ -218,6 +220,7 @@ public class PolicyEntry extends Destina sub.setPrefetchSize(getQueuePrefetch()); } sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); + sub.setUsePrefetchExtension(isUsePrefetchExtension()); } // Properties @@ -692,12 +695,20 @@ public class PolicyEntry extends Destina this.durableTopicPrefetch = durableTopicPrefetch; } + public boolean isUsePrefetchExtension() { + return this.usePrefetchExtension; + } + + public void setUsePrefetchExtension(boolean usePrefetchExtension) { + this.usePrefetchExtension = usePrefetchExtension; + } + public int getCursorMemoryHighWaterMark() { - return this.cursorMemoryHighWaterMark; - } + return this.cursorMemoryHighWaterMark; + } - public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { - this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; + public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { + this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; } public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java?rev=935954&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java Tue Apr 20 15:13:18 2010 @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionConsumer; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.ServerSession; +import javax.jms.ServerSessionPool; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +// see: https://issues.apache.org/activemq/browse/AMQ-2651 +public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport { + private static final Log LOG = LogFactory.getLog(OnePrefetchAsyncConsumerTest.class); + + private TestMutex testMutex; + protected Connection connection; + protected ConnectionConsumer connectionConsumer; + protected Queue queue; + protected CountDownLatch messageTwoDelay = new CountDownLatch(1); + + public void testPrefetchExtension() throws Exception { + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + + // when Msg1 is acked, the PrefetchSubscription will (incorrectly?) increment its prefetchExtension + producer.send(session.createTextMessage("Msg1")); + + // Msg2 will exhaust the ServerSessionPool (since it only has 1 ServerSession) + producer.send(session.createTextMessage("Msg2")); + + // Msg3 will cause the test to fail as it will attempt to retrieve an additional ServerSession from + // an exhausted ServerSessionPool due to the (incorrectly?) incremented prefetchExtension in the PrefetchSubscription + producer.send(session.createTextMessage("Msg3")); + + session.commit(); + + // wait for test to complete and the test result to get set + // this happens asynchronously since the messages are delivered asynchronously + synchronized (testMutex) { + while (!testMutex.testCompleted) { + testMutex.wait(); + } + } + + //test completed, result is ready + assertTrue("Attempted to retrieve more than one ServerSession at a time", testMutex.testSuccessful); + } + + protected void setUp() throws Exception { + bindAddress = "tcp://localhost:61616"; + super.setUp(); + + testMutex = new TestMutex(); + connection = createConnection(); + queue = createQueue(); + // note the last arg of 1, this becomes the prefetchSize in PrefetchSubscription + connectionConsumer = connection.createConnectionConsumer( + queue, null, new TestServerSessionPool(connection), 1); + connection.start(); + } + + protected void tearDown() throws Exception { + connectionConsumer.close(); + connection.close(); + super.tearDown(); + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = super.createBroker(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + // ensure prefetch is exact. only delivery next when current is acked + defaultEntry.setUsePrefetchExtension(false); + policyMap.setDefaultEntry(defaultEntry); + answer.setDestinationPolicy(policyMap); + return answer; + } + + protected Queue createQueue() { + return new ActiveMQQueue(getDestinationString()); + } + + // simulates a ServerSessionPool with only 1 ServerSession + private class TestServerSessionPool implements ServerSessionPool { + Connection connection; + TestServerSession serverSession; + boolean serverSessionInUse = false; + + public TestServerSessionPool(Connection connection) throws JMSException { + this.connection = connection; + serverSession = new TestServerSession(this); + } + + public ServerSession getServerSession() throws JMSException { + synchronized (this) { + if (serverSessionInUse) { + LOG.info("asked for session while in use, not serialised delivery"); + synchronized (testMutex) { + testMutex.testSuccessful = false; + testMutex.testCompleted = true; + } + } + serverSessionInUse = true; + return serverSession; + } + } + } + + private class TestServerSession implements ServerSession { + TestServerSessionPool pool; + Session session; + + public TestServerSession(TestServerSessionPool pool) throws JMSException { + this.pool = pool; + session = pool.connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + session.setMessageListener(new TestMessageListener()); + } + + public Session getSession() throws JMSException { + return session; + } + + public void start() throws JMSException { + // use a separate thread to process the message asynchronously + new Thread() { + public void run() { + // let the session deliver the message + session.run(); + + // commit the tx + try { + session.commit(); + } + catch (JMSException e) { + } + + // return ServerSession to pool + synchronized (pool) { + pool.serverSessionInUse = false; + } + + // let the test check if the test was completed + synchronized (testMutex) { + testMutex.notify(); + } + } + }.start(); + } + } + + private class TestMessageListener implements MessageListener { + public void onMessage(Message message) { + try { + String text = ((TextMessage)message).getText(); + LOG.info("got message: " + text); + if (text.equals("Msg3")) { + // if we get here, Exception in getServerSession() was not thrown, test is successful + // this obviously doesn't happen now, + // need to fix prefetchExtension computation logic in PrefetchSubscription to get here + synchronized (testMutex) { + if (!testMutex.testCompleted) { + testMutex.testSuccessful = true; + testMutex.testCompleted = true; + } + } + } + else if (text.equals("Msg2")) { + // simulate long message processing so that Msg3 comes when Msg2 is still being processed + // and thus the single ServerSession is in use + TimeUnit.SECONDS.sleep(4); + } + } + catch (JMSException e) { + } + catch (InterruptedException e) { + } + } + } + + private class TestMutex { + boolean testCompleted = false; + boolean testSuccessful = true; + } +} \ No newline at end of file Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date