Return-Path: Delivered-To: apmail-activemq-dev-archive@www.apache.org Received: (qmail 18714 invoked from network); 22 Jan 2010 14:48:08 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 22 Jan 2010 14:48:08 -0000 Received: (qmail 93322 invoked by uid 500); 22 Jan 2010 14:48:07 -0000 Delivered-To: apmail-activemq-dev-archive@activemq.apache.org Received: (qmail 93287 invoked by uid 500); 22 Jan 2010 14:48:07 -0000 Mailing-List: contact dev-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list dev@activemq.apache.org Received: (qmail 93277 invoked by uid 99); 22 Jan 2010 14:48:07 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Jan 2010 14:48:07 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED,FB_WORD1_END_DOLLAR X-Spam-Check-By: apache.org Received: from [140.211.11.140] (HELO brutus.apache.org) (140.211.11.140) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Jan 2010 14:48:04 +0000 Received: from brutus.apache.org (localhost [127.0.0.1]) by brutus.apache.org (Postfix) with ESMTP id 751FA29A0011 for ; Fri, 22 Jan 2010 06:47:43 -0800 (PST) Message-ID: <58557183.8801264171663473.JavaMail.jira@brutus.apache.org> Date: Fri, 22 Jan 2010 14:47:43 +0000 (UTC) From: "Phillip Henry (JIRA)" To: dev@activemq.apache.org Subject: [jira] Created: (AMQ-2580) Durable subscribers receives nothing when reconnecting with a prefetch size less than the number of messages that don't match a message selector MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: ae95407df07c98740808b2ef9da0087c X-Virus-Checked: Checked by ClamAV on apache.org Durable subscribers receives nothing when reconnecting with a prefetch size less than the number of messages that don't match a message selector ------------------------------------------------------------------------------------------------------------------------------------------------ Key: AMQ-2580 URL: https://issues.apache.org/activemq/browse/AMQ-2580 Project: ActiveMQ Issue Type: Bug Components: Message Store Affects Versions: 5.3.0 Environment: phillip:~ henryp$ uname -a Darwin phillip.fritz.box 9.8.0 Darwin Kernel Version 9.8.0: Wed Jul 15 16:55:01 PDT 2009; root:xnu-1228.15.4~1/RELEASE_I386 i386 i386 phillip:~ henryp$ java -version java version "1.5.0_20" Java(TM) 2 Runtime Environment, Standard Edition (build 1.5.0_20-b02-315) Java HotSpot(TM) Client VM (build 1.5.0_20-141, mixed mode, sharing) Reporter: Phillip Henry 1. Create a connection factory with a message prefetch size of PREFETCH_SIZE. 2. Create a durable subscriber to a Topic with a message selector of "a=X". 3. Disconnect. 4. More than PREFETCH_SIZE messages are then put onto the Topic with a string property "a=Y". 5. Just one message is put onto the Topic with string property "a=X". 6. The durable subscriber connects again but it does not get the message with string property "a=X". In fact, it gets nothing. It appears that upon reconnecting, the message selector is not respected when retrieving the message from storage. I've got a unit test to demonstrate this plus a proposed fix. {code} ### Eclipse Workspace Patch 1.0 #P activemq Index: activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java =================================================================== --- activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (revision 900353) +++ activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (working copy) @@ -306,7 +306,7 @@ count++; container.setBatchEntry(msg.getMessageId(), entry); } else { - break; + //break; } } else { container.reset(); Index: activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java =================================================================== --- activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java (revision 900353) +++ activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java (working copy) @@ -46,10 +46,11 @@ public boolean recoverMessage(Message message) throws Exception { if (listener.hasSpace()) { - listener.recoverMessage(message); - lastRecovered = message.getMessageId(); - count++; - return true; + if (listener.recoverMessage(message)) { + lastRecovered = message.getMessageId(); + count++; + return true; + } } return false; } Index: activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java =================================================================== --- activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java (revision 0) +++ activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java (revision 0) @@ -0,0 +1,174 @@ +package org.apache.activemq.pool; + +import java.io.File; + +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.PersistenceAdapter; + +public class PrefetchTest extends TestCase { + + private static final String TOPIC_NAME = "topicName"; + private static final String CLIENT_ID = "client_id"; + private static final String textOfSelectedMsg = "good_message"; + + protected TopicConnection connection; + + private Topic topic; + private Session session; + private MessageProducer producer; + private PooledConnectionFactory connectionFactory; + private TopicConnection topicConnection; + private String bindAddress; + private BrokerService service; + + protected void setUp() throws Exception { + bindAddress = "tcp://localhost:61616"; + super.setUp(); + initDurableBroker(); + initConnectionFactory(); + initTopic(); + + } + + protected void tearDown() throws Exception { + shutdownClient(); + connectionFactory.stop(); + service.stop(); + super.tearDown(); + } + + private void initConnection() throws JMSException { + System.out.println("Initializing connection"); + connection = (TopicConnection) connectionFactory.createConnection(); + connection.start(); + } + + public void testTopicIsDurableSmokeTest() throws Exception { + + initClient(); + MessageConsumer consumer = createMessageConsumer(); + System.out.println("Consuming message"); + assertNull(consumer.receive(1)); + shutdownClient(); + consumer.close(); + + sendMessages(); + shutdownClient(); + + initClient(); + consumer = createMessageConsumer(); + + System.out.println("Consuming message"); + TextMessage answer1 = (TextMessage)consumer.receive(1000); + assertNotNull(answer1); + + consumer.close(); + } + + private MessageConsumer createMessageConsumer() throws JMSException { + System.out.println("creating durable subscriber"); + return session.createDurableSubscriber(topic, + TOPIC_NAME, + "name='value'", + false); + } + + private void initClient() throws JMSException { + System.out.println("Initializing client"); + + initConnection(); + initSession(); + } + + private void shutdownClient() + throws JMSException { + System.out.println("Closing session and connection"); + session.close(); + connection.close(); + session = null; + connection = null; + } + + private void sendMessages() + throws JMSException { + initConnection(); + + initSession(); + + System.out.println("Creating producer"); + producer = session.createProducer(topic); + + sendMessageThatFailsSelection(); + + sendMessage(textOfSelectedMsg, "value"); + } + + private void initSession() throws JMSException { + System.out.println("Initializing session"); + session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + } + + private void sendMessageThatFailsSelection() throws JMSException { + for (int i = 0 ; i < 5 ; i++) { + String textOfNotSelectedMsg = "Msg_" + i; + sendMessage(textOfNotSelectedMsg, "not_value"); + System.out.println("#"); + } + } + + private void sendMessage( + String msgText, + String propertyValue) throws JMSException { + System.out.println("Creating message: " + msgText); + TextMessage messageToSelect = session.createTextMessage(msgText); + messageToSelect.setStringProperty("name", propertyValue); + System.out.println("Sending message"); + producer.send(messageToSelect); + } + + protected void initConnectionFactory() { + ActiveMQConnectionFactory activeMqConnectionFactory = createActiveMqConnectionFactory(); + connectionFactory = new PooledConnectionFactory(activeMqConnectionFactory); + } + + + private ActiveMQConnectionFactory createActiveMqConnectionFactory() { + ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory(); + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setDurableTopicPrefetch(2); + activeMqConnectionFactory.setPrefetchPolicy(prefetchPolicy ); + activeMqConnectionFactory.setClientID(CLIENT_ID); + return activeMqConnectionFactory; + } + + private void initDurableBroker() throws Exception { + service = new BrokerService(); + PersistenceAdapter persistenceAdaptor = service.getPersistenceAdapter(); + File file = new File("phills_durable_dir"); + persistenceAdaptor.setDirectory(file); + service.setTransportConnectorURIs(new String[] { bindAddress } ); + service.setPersistent(true); + service.setUseJmx(true); + service.start(); + + } + + private void initTopic() throws JMSException { + topicConnection = (TopicConnection) connectionFactory.createConnection(); + TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + topic = topicSession.createTopic(TOPIC_NAME); + } +} {code} -- This message is automatically generated by JIRA. - You can reply to this email to add a comment to the issue online.