activemq-dev mailing list archives

From "Phillip Henry (JIRA)" <j...@apache.org>
Subject [jira] Created: (AMQ-2580) Durable subscribers receives nothing when reconnecting with a prefetch size less than the number of messages that don't match a message selector
Date Fri, 22 Jan 2010 14:47:43 GMT
Durable subscribers receives nothing when reconnecting with a prefetch size less than the number
of messages that don't match a message selector
------------------------------------------------------------------------------------------------------------------------------------------------

                 Key: AMQ-2580
                 URL: https://issues.apache.org/activemq/browse/AMQ-2580
             Project: ActiveMQ
          Issue Type: Bug
          Components: Message Store
    Affects Versions: 5.3.0
         Environment: phillip:~ henryp$ uname -a
Darwin phillip.fritz.box 9.8.0 Darwin Kernel Version 9.8.0: Wed Jul 15 16:55:01 PDT 2009;
root:xnu-1228.15.4~1/RELEASE_I386 i386 i386
phillip:~ henryp$ java -version
java version "1.5.0_20"
Java(TM) 2 Runtime Environment, Standard Edition (build 1.5.0_20-b02-315)
Java HotSpot(TM) Client VM (build 1.5.0_20-141, mixed mode, sharing)
            Reporter: Phillip Henry


1. Create a connection factory with a durable-topic prefetch size of PREFETCH_SIZE.
2. Create a durable subscriber on a Topic with the message selector "a='X'".
3. Disconnect the subscriber.
4. Publish more than PREFETCH_SIZE messages to the Topic, each with the string property a set to "Y".
5. Publish exactly one message to the Topic with the string property a set to "X".
6. Reconnect the durable subscriber. It does not receive the message whose property a is "X".
In fact, it receives nothing.

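It appears that, on reconnect, the message selector is not respected when messages are
recovered from storage: messages that fail the selector still seem to count towards the
prefetch limit, so recovery stops before it reaches the one message that matches.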
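
To make the suspected mechanism concrete, here is a minimal in-memory sketch. It is only a model of what I believe is happening, not ActiveMQ code: the class SelectorRecoveryModel, its StoredMessage type and both recover methods are hypothetical names invented for illustration, and the real store iterates over persisted message references rather than a List.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of message recovery for a durable subscriber.
// Nothing here is an ActiveMQ class; it only imitates the counting logic.
class SelectorRecoveryModel {

    /** A stored message reduced to its text and the string property "a". */
    static final class StoredMessage {
        final String text;
        final String a;

        StoredMessage(String text, String a) {
            this.text = text;
            this.a = a;
        }
    }

    /** Suspected current behaviour: every stored message uses up a prefetch slot. */
    static List<String> recoverCountingEverything(List<StoredMessage> store, String wanted, int prefetch) {
        List<String> delivered = new ArrayList<String>();
        int count = 0;
        for (StoredMessage m : store) {
            if (count >= prefetch) {
                break; // batch is "full" even though nothing may have been delivered
            }
            count++; // counted whether or not the selector accepts the message
            if (wanted.equals(m.a)) {
                delivered.add(m.text);
            }
        }
        return delivered;
    }

    /** Intended behaviour: only messages the selector accepts use up a slot. */
    static List<String> recoverCountingMatches(List<StoredMessage> store, String wanted, int prefetch) {
        List<String> delivered = new ArrayList<String>();
        for (StoredMessage m : store) {
            if (delivered.size() >= prefetch) {
                break;
            }
            if (wanted.equals(m.a)) {
                delivered.add(m.text); // non-matching messages are skipped, not counted
            }
        }
        return delivered;
    }

    /** Five messages with a = "Y" followed by one with a = "X", as in the steps above. */
    static List<StoredMessage> sampleStore() {
        List<StoredMessage> store = new ArrayList<StoredMessage>();
        for (int i = 0; i < 5; i++) {
            store.add(new StoredMessage("Msg_" + i, "Y"));
        }
        store.add(new StoredMessage("good_message", "X"));
        return store;
    }

    public static void main(String[] args) {
        int prefetch = 2;
        System.out.println(recoverCountingEverything(sampleStore(), "X", prefetch)); // prints []
        System.out.println(recoverCountingMatches(sampleStore(), "X", prefetch));    // prints [good_message]
    }
}
```

With a prefetch of 2 and five non-matching messages ahead of the matching one, the first variant gives up after two messages and delivers nothing, which is the symptom reported here. The second variant skips the rejected messages and reaches the matching one.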

The patch below contains a unit test that demonstrates the problem, plus a proposed fix.

{code}
### Eclipse Workspace Patch 1.0
#P activemq
Index: activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
===================================================================
--- activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java       (revision 900353)
+++ activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java       (working copy)
@@ -306,7 +306,7 @@
                                 count++;
                                 container.setBatchEntry(msg.getMessageId(), entry);
                             } else {
-                                break;
+                                //break;
                             }
                         } else {
                             container.reset();
Index: activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
===================================================================
--- activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java      (revision 900353)
+++ activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java      (working copy)
@@ -46,10 +46,11 @@
     
     public boolean recoverMessage(Message message) throws Exception {
         if (listener.hasSpace()) {
-            listener.recoverMessage(message);
-            lastRecovered = message.getMessageId();
-            count++;
-            return true;
+            if (listener.recoverMessage(message)) {
+                lastRecovered = message.getMessageId();
+                count++;
+                return true;
+            }
         }
         return false;
     }
Index: activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java
===================================================================
--- activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java      (revision 0)
+++ activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java      (revision 0)
@@ -0,0 +1,174 @@
+package org.apache.activemq.pool;
+
+import java.io.File;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.PersistenceAdapter;
+
+public class PrefetchTest extends TestCase {
+
+       private static final String TOPIC_NAME = "topicName";
+       private static final String CLIENT_ID = "client_id";
+       private static final String textOfSelectedMsg = "good_message";
+
+       protected TopicConnection connection;
+
+       private Topic topic;
+       private Session session;
+       private MessageProducer producer;
+       private PooledConnectionFactory connectionFactory;
+       private TopicConnection topicConnection;
+       private String bindAddress;
+       private BrokerService service;
+
+       protected void setUp() throws Exception {
+               bindAddress = "tcp://localhost:61616";
+               super.setUp();
+               initDurableBroker();
+               initConnectionFactory();
+               initTopic();
+
+       }
+
+       protected void tearDown() throws Exception {
+               shutdownClient();
+               connectionFactory.stop();
+               service.stop();
+               super.tearDown();
+       }
+
+       private void initConnection() throws JMSException {
+               System.out.println("Initializing connection");
+               connection = (TopicConnection) connectionFactory.createConnection();
+               connection.start();
+       }
+
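+       public void testTopicIsDurableSmokeTest() throws Exception {
+               // Register the durable subscription; nothing has been published yet.
+               initClient();
+               MessageConsumer consumer = createMessageConsumer();
+               System.out.println("Consuming message");
+               assertNull(consumer.receive(1));
+               consumer.close();
+               shutdownClient();
+               // While the subscriber is offline, publish 5 non-matching messages, then 1 matching one.
+               sendMessages();
+               shutdownClient();
+               // Reconnect with the same client ID and subscription name.
+               initClient();
+               consumer = createMessageConsumer();
+               // The matching message should arrive even though the prefetch size is only 2.
+               System.out.println("Consuming message");
+               TextMessage answer1 = (TextMessage)consumer.receive(1000);
+               assertNotNull(answer1);
+
+               consumer.close();
+       }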
+    
+    private MessageConsumer createMessageConsumer() throws JMSException {
+        System.out.println("creating durable subscriber");
+       return session.createDurableSubscriber(topic, 
+               TOPIC_NAME, 
+               "name='value'", 
+               false);
+    }
+
+       private void initClient() throws JMSException {
+           System.out.println("Initializing client");
+           
+               initConnection();
+       initSession();
+       }
+
+       private void shutdownClient()
+                       throws JMSException {
+           System.out.println("Closing session and connection");
+        session.close();
+        connection.close();
+        session = null;
+        connection = null;
+       }
+
+       private void sendMessages()
+                       throws JMSException {
+           initConnection();
+
+               initSession();
+
+               System.out.println("Creating producer");
+               producer = session.createProducer(topic);
+
+               sendMessageThatFailsSelection();
+        
+               sendMessage(textOfSelectedMsg, "value");
+       }
+
+       private void initSession() throws JMSException {
+           System.out.println("Initializing session");
+               session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+       }
+
+       private void sendMessageThatFailsSelection() throws JMSException {
+               for (int i = 0 ; i < 5 ; i++) {
+                       String textOfNotSelectedMsg = "Msg_" + i;
+                       sendMessage(textOfNotSelectedMsg, "not_value");
+                       System.out.println("#");
+               }
+       }
+
+       private void sendMessage(
+                       String msgText,
+                       String propertyValue) throws JMSException {
+           System.out.println("Creating message: " + msgText);
+               TextMessage messageToSelect = session.createTextMessage(msgText);
+        messageToSelect.setStringProperty("name", propertyValue);
+        System.out.println("Sending message");
+               producer.send(messageToSelect);
+       }
+
+       protected void initConnectionFactory() {
+               ActiveMQConnectionFactory activeMqConnectionFactory = createActiveMqConnectionFactory();
+               connectionFactory = new PooledConnectionFactory(activeMqConnectionFactory);
+       }
+
+
+       private ActiveMQConnectionFactory createActiveMqConnectionFactory() {
+               ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory();
+        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+        prefetchPolicy.setDurableTopicPrefetch(2);
+               activeMqConnectionFactory.setPrefetchPolicy(prefetchPolicy );
+        activeMqConnectionFactory.setClientID(CLIENT_ID);
+               return activeMqConnectionFactory;
+       }
+
+       private void initDurableBroker() throws Exception {
+               service = new BrokerService();
+               PersistenceAdapter persistenceAdaptor = service.getPersistenceAdapter();
+               File file = new File("phills_durable_dir");
+               persistenceAdaptor.setDirectory(file);
+               service.setTransportConnectorURIs(new String[] { bindAddress } );
+               service.setPersistent(true);
+               service.setUseJmx(true);
+               service.start();
+
+       }
+
+       private void initTopic() throws JMSException {
+               topicConnection = (TopicConnection) connectionFactory.createConnection();
+               TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+               topic = topicSession.createTopic(TOPIC_NAME);
+       }
+}
{code}

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

