Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 28133 invoked from network); 10 Aug 2006 18:49:04 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 10 Aug 2006 18:49:04 -0000 Received: (qmail 39434 invoked by uid 500); 10 Aug 2006 18:48:57 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 38937 invoked by uid 500); 10 Aug 2006 18:48:55 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 38922 invoked by uid 99); 10 Aug 2006 18:48:55 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 Aug 2006 11:48:55 -0700 X-ASF-Spam-Status: No, hits=-8.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 Aug 2006 11:48:52 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id ACCA41A981A; Thu, 10 Aug 2006 11:48:26 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r430476 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/command/ test/java/org/apache/activemq/ Date: Thu, 10 Aug 2006 18:48:25 -0000 To: activemq-commits@geronimo.apache.org From: jstrachan@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20060810184826.ACCA41A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: jstrachan Date: Thu Aug 10 11:48:24 2006 New Revision: 430476 URL: http://svn.apache.org/viewvc?rev=430476&view=rev Log: added the client side and a test case to show prefetch of zero working for pull-only queues with synchronous consumers to fix AMQ-855 Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java (with props) Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=430476&r1=430475&r2=430476&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Aug 10 11:48:24 2006 @@ -34,6 +34,7 @@ import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessagePull; import org.apache.activemq.management.JMSConsumerStatsImpl; import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsImpl; @@ -323,6 +324,9 @@ */ public void setMessageListener(MessageListener listener) throws JMSException { checkClosed(); + if (info.getPrefetchSize() == 0) { + throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1"); + } this.messageListener = listener; if (listener != null) { boolean wasRunning = session.isRunning(); @@ -411,6 +415,7 @@ * this message consumer is concurrently closed */ public Message receive() throws JMSException { + sendPullCommand(); checkClosed(); checkMessageListener(); MessageDispatch md = dequeue(-1); @@ -455,6 +460,7 @@ * closed */ public Message receive(long timeout) throws JMSException { + sendPullCommand(); checkClosed(); checkMessageListener(); if (timeout == 0) { @@ -584,6 +590,19 @@ protected void checkClosed() throws IllegalStateException { if (unconsumedMessages.isClosed()) { throw new IllegalStateException("The Consumer is closed"); + } + } + + /** + * If we have a zero prefetch specified then send a pull command to the broker to pull a message + * we are about to receive + * + */ + protected void sendPullCommand() throws JMSException { + if (info.getPrefetchSize() == 0) { + MessagePull messagePull = new MessagePull(); + messagePull.configure(info); + session.asyncSendPacket(messagePull); } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java?rev=430476&r1=430475&r2=430476&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java Thu Aug 10 11:48:24 2006 @@ -163,6 +163,8 @@ } public String toString() { - return list.toString(); + synchronized(mutex) { + return list.toString(); + } } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java?rev=430476&r1=430475&r2=430476&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java Thu Aug 10 11:48:24 2006 @@ -43,6 +43,14 @@ } /** + * Configures a message pull from the consumer information + */ + public void configure(ConsumerInfo info) { + setConsumerId(info.getConsumerId()); + setDestination(info.getDestination()); + } + + /** * @openwire:property version=1 cache=true */ public ConsumerId getConsumerId() { Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java?rev=430476&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java (added) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java Thu Aug 10 11:48:24 2006 @@ -0,0 +1,88 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.spring.SpringConsumer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +/** + * + * @version $Revision$ + */ +public class ZeroPrefetchConsumerTest extends TestSupport { + + private static final Log log = LogFactory.getLog(ZeroPrefetchConsumerTest.class); + + protected Connection connection; + protected Queue queue; + + public void testCannotUseMessageListener() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + + MessageListener listener = new SpringConsumer(); + try { + consumer.setMessageListener(listener); + fail("Should have thrown JMSException as we cannot use MessageListener with zero prefetch"); + } + catch (JMSException e) { + log.info("Received expected exception : " + e); + } + } + + public void testPullConsumerWorks() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello World!")); + + // now lets receive it + MessageConsumer consumer = session.createConsumer(queue); + Message answer = consumer.receive(5000); + assertNotNull("Should have received a message!", answer); + } + + protected void setUp() throws Exception { + topic = false; + super.setUp(); + + connection = createConnection(); + connection.start(); + queue = createQueue(); + } + + protected void tearDown() throws Exception { + connection.close(); + super.tearDown(); + } + + protected Queue createQueue() { + return new ActiveMQQueue(getClass().getName() + "." + getName() + "?consumer.prefetchSize=0"); + } + +} Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java ------------------------------------------------------------------------------ svn:mime-type = text/plain