Return-Path: Delivered-To: apmail-activemq-dev-archive@www.apache.org Received: (qmail 64463 invoked from network); 23 Jul 2010 15:40:21 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 23 Jul 2010 15:40:21 -0000 Received: (qmail 76755 invoked by uid 500); 23 Jul 2010 15:40:21 -0000 Delivered-To: apmail-activemq-dev-archive@activemq.apache.org Received: (qmail 76027 invoked by uid 500); 23 Jul 2010 15:40:20 -0000 Mailing-List: contact dev-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list dev@activemq.apache.org Received: (qmail 76019 invoked by uid 99); 23 Jul 2010 15:40:20 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Jul 2010 15:40:20 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.22] (HELO thor.apache.org) (140.211.11.22) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Jul 2010 15:40:16 +0000 Received: from thor (localhost [127.0.0.1]) by thor.apache.org (8.13.8+Sun/8.13.8) with ESMTP id o6NFdtRM016549 for ; Fri, 23 Jul 2010 15:39:55 GMT Message-ID: <9816574.50791279899595067.JavaMail.jira@thor> Date: Fri, 23 Jul 2010 11:39:55 -0400 (EDT) From: "Gary Tully (JIRA)" To: dev@activemq.apache.org Subject: [jira] Resolved: (AMQ-2580) Durable subscribers receives nothing when reconnecting with a prefetch size less than the number of messages that don't match a message selector In-Reply-To: <58557183.8801264171663473.JavaMail.jira@brutus.apache.org> MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: ae95407df07c98740808b2ef9da0087c X-Virus-Checked: Checked by ClamAV on apache.org [ https://issues.apache.org/activemq/browse/AMQ-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Tully resolved AMQ-2580. ----------------------------- Resolution: Fixed resolved in 967134 - patch was great, test case was great, thanks. completed the impl for JDBC and kahaDB > Durable subscribers receives nothing when reconnecting with a prefetch size less than the number of messages that don't match a message selector > ------------------------------------------------------------------------------------------------------------------------------------------------ > > Key: AMQ-2580 > URL: https://issues.apache.org/activemq/browse/AMQ-2580 > Project: ActiveMQ > Issue Type: Bug > Components: Selector > Affects Versions: 5.3.0 > Environment: phillip:~ henryp$ uname -a > Darwin phillip.fritz.box 9.8.0 Darwin Kernel Version 9.8.0: Wed Jul 15 16:55:01 PDT 2009; root:xnu-1228.15.4~1/RELEASE_I386 i386 i386 > phillip:~ henryp$ java -version > java version "1.5.0_20" > Java(TM) 2 Runtime Environment, Standard Edition (build 1.5.0_20-b02-315) > Java HotSpot(TM) Client VM (build 1.5.0_20-141, mixed mode, sharing) > Reporter: Phillip Henry > Assignee: Gary Tully > Fix For: 5.4.0 > > Attachments: selector_patch_and_test.zip > > > 1. Create a connection factory with a message prefetch size of PREFETCH_SIZE. > 2. Create a durable subscriber to a Topic with a message selector of "a=X". > 3. Disconnect. > 4. More than PREFETCH_SIZE messages are then put onto the Topic with a string property "a=Y". > 5. Just one message is put onto the Topic with string property "a=X". > 6. The durable subscriber connects again but it does not get the message with string property "a=X". In fact, it gets nothing. > It appears that upon reconnecting, the message selector is not respected when retrieving the message from storage. > I've got a unit test to demonstrate this plus a proposed fix. > {code} > ### Eclipse Workspace Patch 1.0 > #P activemq > Index: activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java > =================================================================== > --- activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (revision 900353) > +++ activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (working copy) > @@ -306,7 +306,7 @@ > count++; > container.setBatchEntry(msg.getMessageId(), entry); > } else { > - break; > + //break; > } > } else { > container.reset(); > Index: activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java > =================================================================== > --- activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java (revision 900353) > +++ activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java (working copy) > @@ -46,10 +46,11 @@ > > public boolean recoverMessage(Message message) throws Exception { > if (listener.hasSpace()) { > - listener.recoverMessage(message); > - lastRecovered = message.getMessageId(); > - count++; > - return true; > + if (listener.recoverMessage(message)) { > + lastRecovered = message.getMessageId(); > + count++; > + return true; > + } > } > return false; > } > Index: activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java > =================================================================== > --- activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java (revision 0) > +++ activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java (revision 0) > @@ -0,0 +1,174 @@ > +package org.apache.activemq.pool; > + > +import java.io.File; > + > +import javax.jms.JMSException; > +import javax.jms.MessageConsumer; > +import javax.jms.MessageProducer; > +import javax.jms.Session; > +import javax.jms.TextMessage; > +import javax.jms.Topic; > +import javax.jms.TopicConnection; > +import javax.jms.TopicSession; > + > +import junit.framework.TestCase; > + > +import org.apache.activemq.ActiveMQConnectionFactory; > +import org.apache.activemq.ActiveMQPrefetchPolicy; > +import org.apache.activemq.broker.BrokerService; > +import org.apache.activemq.store.PersistenceAdapter; > + > +public class PrefetchTest extends TestCase { > + > + private static final String TOPIC_NAME = "topicName"; > + private static final String CLIENT_ID = "client_id"; > + private static final String textOfSelectedMsg = "good_message"; > + > + protected TopicConnection connection; > + > + private Topic topic; > + private Session session; > + private MessageProducer producer; > + private PooledConnectionFactory connectionFactory; > + private TopicConnection topicConnection; > + private String bindAddress; > + private BrokerService service; > + > + protected void setUp() throws Exception { > + bindAddress = "tcp://localhost:61616"; > + super.setUp(); > + initDurableBroker(); > + initConnectionFactory(); > + initTopic(); > + > + } > + > + protected void tearDown() throws Exception { > + shutdownClient(); > + connectionFactory.stop(); > + service.stop(); > + super.tearDown(); > + } > + > + private void initConnection() throws JMSException { > + System.out.println("Initializing connection"); > + connection = (TopicConnection) connectionFactory.createConnection(); > + connection.start(); > + } > + > + public void testTopicIsDurableSmokeTest() throws Exception { > + > + initClient(); > + MessageConsumer consumer = createMessageConsumer(); > + System.out.println("Consuming message"); > + assertNull(consumer.receive(1)); > + shutdownClient(); > + consumer.close(); > + > + sendMessages(); > + shutdownClient(); > + > + initClient(); > + consumer = createMessageConsumer(); > + > + System.out.println("Consuming message"); > + TextMessage answer1 = (TextMessage)consumer.receive(1000); > + assertNotNull(answer1); > + > + consumer.close(); > + } > + > + private MessageConsumer createMessageConsumer() throws JMSException { > + System.out.println("creating durable subscriber"); > + return session.createDurableSubscriber(topic, > + TOPIC_NAME, > + "name='value'", > + false); > + } > + > + private void initClient() throws JMSException { > + System.out.println("Initializing client"); > + > + initConnection(); > + initSession(); > + } > + > + private void shutdownClient() > + throws JMSException { > + System.out.println("Closing session and connection"); > + session.close(); > + connection.close(); > + session = null; > + connection = null; > + } > + > + private void sendMessages() > + throws JMSException { > + initConnection(); > + > + initSession(); > + > + System.out.println("Creating producer"); > + producer = session.createProducer(topic); > + > + sendMessageThatFailsSelection(); > + > + sendMessage(textOfSelectedMsg, "value"); > + } > + > + private void initSession() throws JMSException { > + System.out.println("Initializing session"); > + session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); > + } > + > + private void sendMessageThatFailsSelection() throws JMSException { > + for (int i = 0 ; i < 5 ; i++) { > + String textOfNotSelectedMsg = "Msg_" + i; > + sendMessage(textOfNotSelectedMsg, "not_value"); > + System.out.println("#"); > + } > + } > + > + private void sendMessage( > + String msgText, > + String propertyValue) throws JMSException { > + System.out.println("Creating message: " + msgText); > + TextMessage messageToSelect = session.createTextMessage(msgText); > + messageToSelect.setStringProperty("name", propertyValue); > + System.out.println("Sending message"); > + producer.send(messageToSelect); > + } > + > + protected void initConnectionFactory() { > + ActiveMQConnectionFactory activeMqConnectionFactory = createActiveMqConnectionFactory(); > + connectionFactory = new PooledConnectionFactory(activeMqConnectionFactory); > + } > + > + > + private ActiveMQConnectionFactory createActiveMqConnectionFactory() { > + ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory(); > + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); > + prefetchPolicy.setDurableTopicPrefetch(2); > + activeMqConnectionFactory.setPrefetchPolicy(prefetchPolicy ); > + activeMqConnectionFactory.setClientID(CLIENT_ID); > + return activeMqConnectionFactory; > + } > + > + private void initDurableBroker() throws Exception { > + service = new BrokerService(); > + PersistenceAdapter persistenceAdaptor = service.getPersistenceAdapter(); > + File file = new File("phills_durable_dir"); > + persistenceAdaptor.setDirectory(file); > + service.setTransportConnectorURIs(new String[] { bindAddress } ); > + service.setPersistent(true); > + service.setUseJmx(true); > + service.start(); > + > + } > + > + private void initTopic() throws JMSException { > + topicConnection = (TopicConnection) connectionFactory.createConnection(); > + TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); > + topic = topicSession.createTopic(TOPIC_NAME); > + } > +} > {code} -- This message is automatically generated by JIRA. - You can reply to this email to add a comment to the issue online.