activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5163
Date Mon, 28 Apr 2014 18:03:52 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk d60022ec6 -> a88e19e7c


https://issues.apache.org/jira/browse/AMQ-5163

Allow for durable topic consumers to use individual ack mode.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a88e19e7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a88e19e7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a88e19e7

Branch: refs/heads/trunk
Commit: a88e19e7cda1f21de119db2672d1727453f552d3
Parents: d60022e
Author: Timothy Bish <tabish121@gmai.com>
Authored: Mon Apr 28 14:03:43 2014 -0400
Committer: Timothy Bish <tabish121@gmai.com>
Committed: Mon Apr 28 14:03:43 2014 -0400

----------------------------------------------------------------------
 .../org/apache/activemq/ActiveMQSession.java    | 47 ++++++++++--
 .../apache/activemq/JMSIndividualAckTest.java   | 21 +-----
 .../DurableSubscriptionTestSupport.java         | 76 ++++++++++++++++----
 3 files changed, 107 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a88e19e7/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 1dc197f..e327ef1 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -90,7 +90,6 @@ import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.util.Callback;
-import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -291,6 +290,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      *
      * @see org.apache.activemq.management.StatsCapable#getStats()
      */
+    @Override
     public StatsImpl getStats() {
         return stats;
     }
@@ -313,6 +313,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws JMSException if the JMS provider fails to create this message due
      *                 to some internal error.
      */
+    @Override
     public BytesMessage createBytesMessage() throws JMSException {
         ActiveMQBytesMessage message = new ActiveMQBytesMessage();
         configureMessage(message);
@@ -329,6 +330,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws JMSException if the JMS provider fails to create this message due
      *                 to some internal error.
      */
+    @Override
     public MapMessage createMapMessage() throws JMSException {
         ActiveMQMapMessage message = new ActiveMQMapMessage();
         configureMessage(message);
@@ -346,6 +348,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws JMSException if the JMS provider fails to create this message due
      *                 to some internal error.
      */
+    @Override
     public Message createMessage() throws JMSException {
         ActiveMQMessage message = new ActiveMQMessage();
         configureMessage(message);
@@ -361,6 +364,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws JMSException if the JMS provider fails to create this message due
      *                 to some internal error.
      */
+    @Override
     public ObjectMessage createObjectMessage() throws JMSException {
         ActiveMQObjectMessage message = new ActiveMQObjectMessage();
         configureMessage(message);
@@ -377,6 +381,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws JMSException if the JMS provider fails to create this message due
      *                 to some internal error.
      */
+    @Override
     public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
         ActiveMQObjectMessage message = new ActiveMQObjectMessage();
         configureMessage(message);
@@ -393,6 +398,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws JMSException if the JMS provider fails to create this message due
      *                 to some internal error.
      */
+    @Override
     public StreamMessage createStreamMessage() throws JMSException {
         ActiveMQStreamMessage message = new ActiveMQStreamMessage();
         configureMessage(message);
@@ -408,6 +414,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws JMSException if the JMS provider fails to create this message due
      *                 to some internal error.
      */
+    @Override
     public TextMessage createTextMessage() throws JMSException {
         ActiveMQTextMessage message = new ActiveMQTextMessage();
         configureMessage(message);
@@ -424,6 +431,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws JMSException if the JMS provider fails to create this message due
      *                 to some internal error.
      */
+    @Override
     public TextMessage createTextMessage(String text) throws JMSException {
         ActiveMQTextMessage message = new ActiveMQTextMessage();
         message.setText(text);
@@ -519,6 +527,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @return true if the session is in transacted mode
      * @throws JMSException if there is some internal error.
      */
+    @Override
     public boolean getTransacted() throws JMSException {
         checkClosed();
         return isTransacted();
@@ -536,6 +545,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @see javax.jms.Connection#createSession(boolean,int)
      * @since 1.1 exception JMSException if there is some internal error.
      */
+    @Override
     public int getAcknowledgeMode() throws JMSException {
         checkClosed();
         return this.acknowledgementMode;
@@ -552,6 +562,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws javax.jms.IllegalStateException if the method is not called by a
      *                 transacted session.
      */
+    @Override
     public void commit() throws JMSException {
         checkClosed();
         if (!getTransacted()) {
@@ -572,6 +583,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws javax.jms.IllegalStateException if the method is not called by a
      *                 transacted session.
      */
+    @Override
     public void rollback() throws JMSException {
         checkClosed();
         if (!getTransacted()) {
@@ -611,6 +623,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws JMSException if the JMS provider fails to close the session due
      *                 to some internal error.
      */
+    @Override
     public void close() throws JMSException {
         if (!closed) {
             if (getTransactionContext().isInXATransaction()) {
@@ -787,6 +800,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws IllegalStateException if the method is called by a transacted
      *                 session.
      */
+    @Override
     public void recover() throws JMSException {
 
         checkClosed();
@@ -811,6 +825,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @see javax.jms.ServerSessionPool
      * @see javax.jms.ServerSession
      */
+    @Override
     public MessageListener getMessageListener() throws JMSException {
         checkClosed();
         return this.messageListener;
@@ -837,6 +852,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @see javax.jms.ServerSessionPool
      * @see javax.jms.ServerSession
      */
+    @Override
     public void setMessageListener(MessageListener listener) throws JMSException {
         // only check for closed if we set a new listener, as we allow to clear
         // the listener, such as when an application is shutting down, and is
@@ -857,6 +873,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      *
      * @see javax.jms.ServerSession
      */
+    @Override
     public void run() {
         MessageDispatch messageDispatch;
         while ((messageDispatch = executor.dequeueNoWait()) != null) {
@@ -885,6 +902,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
 
             if (isClientAcknowledge()||isIndividualAcknowledge()) {
                 message.setAcknowledgeCallback(new Callback() {
+                    @Override
                     public void execute() throws Exception {
                     }
                 });
@@ -905,6 +923,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
                     getTransactionContext().addSynchronization(new Synchronization() {
 
                         final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE
? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());
+                        @Override
                         public void beforeEnd() throws Exception {
                             // validate our consumer so we don't push stale acks that get
ignored
                             if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId()))
{
@@ -961,6 +980,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
                                 }
                                 connection.getScheduler().executeAfterDelay(new Runnable()
{
 
+                                    @Override
                                     public void run() {
                                         ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
                                     }
@@ -1017,6 +1037,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      *                 specified.
      * @since 1.1
      */
+    @Override
     public MessageProducer createProducer(Destination destination) throws JMSException {
         checkClosed();
         if (destination instanceof CustomDestination) {
@@ -1041,6 +1062,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      *                 specified.
      * @since 1.1
      */
+    @Override
     public MessageConsumer createConsumer(Destination destination) throws JMSException {
         return createConsumer(destination, (String) null);
     }
@@ -1068,6 +1090,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws InvalidSelectorException if the message selector is invalid.
      * @since 1.1
      */
+    @Override
     public MessageConsumer createConsumer(Destination destination, String messageSelector)
throws JMSException {
         return createConsumer(destination, messageSelector, false);
     }
@@ -1155,6 +1178,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws InvalidSelectorException if the message selector is invalid.
      * @since 1.1
      */
+    @Override
     public MessageConsumer createConsumer(Destination destination, String messageSelector,
boolean noLocal) throws JMSException {
         return createConsumer(destination, messageSelector, noLocal, null);
     }
@@ -1236,6 +1260,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      *                 internal error.
      * @since 1.1
      */
+    @Override
     public Queue createQueue(String queueName) throws JMSException {
         checkClosed();
         if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
@@ -1264,6 +1289,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      *                 internal error.
      * @since 1.1
      */
+    @Override
     public Topic createTopic(String topicName) throws JMSException {
         checkClosed();
         if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
@@ -1315,6 +1341,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws InvalidDestinationException if an invalid topic is specified.
      * @since 1.1
      */
+    @Override
     public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
         checkClosed();
         return createDurableSubscriber(topic, name, null, false);
@@ -1360,6 +1387,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws InvalidSelectorException if the message selector is invalid.
      * @since 1.1
      */
+    @Override
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector,
boolean noLocal) throws JMSException {
         checkClosed();
 
@@ -1367,11 +1395,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
             throw new InvalidDestinationException("Topic cannot be null");
         }
 
-        if (isIndividualAcknowledge()) {
-            throw JMSExceptionSupport.create("Cannot create a durable consumer for a Session
in "+
-                                             "INDIVIDUAL_ACKNOWLEDGE mode.", null);
-        }
-
         if (topic instanceof CustomDestination) {
             CustomDestination customDestination = (CustomDestination)topic;
             return customDestination.createDurableSubscriber(this, name, messageSelector,
noLocal);
@@ -1397,6 +1420,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      *                 specified
      * @since 1.1
      */
+    @Override
     public QueueBrowser createBrowser(Queue queue) throws JMSException {
         checkClosed();
         return createBrowser(queue, null);
@@ -1419,6 +1443,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws InvalidSelectorException if the message selector is invalid.
      * @since 1.1
      */
+    @Override
     public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
{
         checkClosed();
         return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue),
messageSelector, asyncDispatch);
@@ -1433,6 +1458,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      *                 to some internal error.
      * @since 1.1
      */
+    @Override
     public TemporaryQueue createTemporaryQueue() throws JMSException {
         checkClosed();
         return (TemporaryQueue)connection.createTempDestination(false);
@@ -1447,6 +1473,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      *                 to some internal error.
      * @since 1.1
      */
+    @Override
     public TemporaryTopic createTemporaryTopic() throws JMSException {
         checkClosed();
         return (TemporaryTopic)connection.createTempDestination(true);
@@ -1463,6 +1490,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws JMSException
      * @throws InvalidDestinationException if an invalid queue is specified.
      */
+    @Override
     public QueueReceiver createReceiver(Queue queue) throws JMSException {
         checkClosed();
         return createReceiver(queue, null);
@@ -1483,6 +1511,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws InvalidDestinationException if an invalid queue is specified.
      * @throws InvalidSelectorException if the message selector is invalid.
      */
+    @Override
     public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
{
         checkClosed();
 
@@ -1507,6 +1536,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      *                 internal error.
      * @throws InvalidDestinationException if an invalid queue is specified.
      */
+    @Override
     public QueueSender createSender(Queue queue) throws JMSException {
         checkClosed();
         if (queue instanceof CustomDestination) {
@@ -1537,6 +1567,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      *                 some internal error.
      * @throws InvalidDestinationException if an invalid topic is specified.
      */
+    @Override
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
         checkClosed();
         return createSubscriber(topic, null, false);
@@ -1575,6 +1606,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      * @throws InvalidDestinationException if an invalid topic is specified.
      * @throws InvalidSelectorException if the message selector is invalid.
      */
+    @Override
     public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean
noLocal) throws JMSException {
         checkClosed();
 
@@ -1603,6 +1635,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      *                 some internal error.
      * @throws InvalidDestinationException if an invalid topic is specified.
      */
+    @Override
     public TopicPublisher createPublisher(Topic topic) throws JMSException {
         checkClosed();
 
@@ -1633,11 +1666,13 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
      *                 specified.
      * @since 1.1
      */
+    @Override
     public void unsubscribe(String name) throws JMSException {
         checkClosed();
         connection.unsubscribe(name);
     }
 
+    @Override
     public void dispatch(MessageDispatch messageDispatch) {
         try {
             executor.execute(messageDispatch);

http://git-wip-us.apache.org/repos/asf/activemq/blob/a88e19e7/activemq-unit-tests/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
index 90a7dee..ec3c153 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
@@ -24,7 +24,6 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import javax.jms.Topic;
 
 /**
  *
@@ -33,6 +32,7 @@ public class JMSIndividualAckTest extends TestSupport {
 
     private Connection connection;
 
+    @Override
     protected void setUp() throws Exception {
         super.setUp();
         connection = createConnection();
@@ -41,6 +41,7 @@ public class JMSIndividualAckTest extends TestSupport {
     /**
      * @see junit.framework.TestCase#tearDown()
      */
+    @Override
     protected void tearDown() throws Exception {
         if (connection != null) {
             connection.close();
@@ -154,25 +155,7 @@ public class JMSIndividualAckTest extends TestSupport {
         session.close();
     }
 
-    /**
-     * Tests that a durable consumer cannot be created for Individual Ack mode.
-     *
-     * @throws JMSException
-     */
-    public void testCreateDurableConsumerFails() throws JMSException {
-        connection.start();
-        Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
-        Topic dest = session.createTopic(getName());
-
-        try {
-            session.createDurableSubscriber(dest, getName());
-            fail("Should not be able to create duable subscriber.");
-        } catch(Exception e) {
-        }
-    }
-
     protected String getQueueName() {
         return getClass().getName() + "." + getName();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a88e19e7/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java
index 8017ffe..b610f8c 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java
@@ -29,12 +29,13 @@ import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.PersistenceAdapter;
 
 /**
- * 
+ *
  */
 public abstract class DurableSubscriptionTestSupport extends TestSupport {
 
@@ -44,21 +45,25 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport
{
     private MessageProducer producer;
     private BrokerService broker;
 
+    @Override
     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
         return new ActiveMQConnectionFactory("vm://durable-broker");
     }
 
+    @Override
     protected Connection createConnection() throws Exception {
         Connection rc = super.createConnection();
         rc.setClientID(getName());
         return rc;
     }
 
+    @Override
     protected void setUp() throws Exception {
         createBroker();
         super.setUp();
     }
 
+    @Override
     protected void tearDown() throws Exception {
         super.tearDown();
         destroyBroker();
@@ -104,7 +109,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport
{
     }
 
     protected abstract PersistenceAdapter createPersistenceAdapter() throws Exception;
-    
+
     public void testMessageExpire() throws Exception {
         session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
         Topic topic = session.createTopic("TestTopic");
@@ -117,12 +122,12 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport
{
         // Make sure it works when the durable sub is active.
         producer.send(session.createTextMessage("Msg:1"));
         assertTextMessageEquals("Msg:1", consumer.receive(1000));
-        
+
         consumer.close();
-        
+
         producer.send(session.createTextMessage("Msg:2"));
         producer.send(session.createTextMessage("Msg:3"));
-        
+
         consumer = session.createDurableSubscriber(topic, "sub1");
 
         // Try to get the message.
@@ -225,7 +230,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport
{
         assertTextMessageEquals("Msg:2", consumer.receive(5000));
         assertNull(consumer.receive(5000));
     }
-    
+
     public void testDurableSubscriptionBrokerRestart() throws Exception {
 
         // Create the durable sub.
@@ -235,12 +240,12 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport
{
         // Ensure that consumer will receive messages sent before it was created
         Topic topic = session.createTopic("TestTopic?consumer.retroactive=true");
         consumer = session.createDurableSubscriber(topic, "sub1");
-        
+
         producer = session.createProducer(topic);
         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
         producer.send(session.createTextMessage("Msg:1"));
         assertTextMessageEquals("Msg:1", consumer.receive(5000));
-        
+
         // Make sure cleanup kicks in
         Thread.sleep(1000);
 
@@ -428,8 +433,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport
{
         consumer = session.createDurableSubscriber(topic, "sub1");
         Message msg = consumer.receive(1000);
         assertNotNull(msg);
-        assertEquals("Message 1", ((TextMessage)msg).getText());
-
+        assertEquals("Message 1", ((TextMessage) msg).getText());
     }
 
     public void testDurableSubWorksInNewConnection() throws Exception {
@@ -459,8 +463,56 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport
{
         consumer = session.createDurableSubscriber(topic, "sub1");
         Message msg = consumer.receive(1000);
         assertNotNull(msg);
-        assertEquals("Message 1", ((TextMessage)msg).getText());
+        assertEquals("Message 1", ((TextMessage) msg).getText());
+    }
+
+    public void testIndividualAckWithDurableSubs() throws Exception {
+        // Create the consumer.
+        connection.start();
+
+        Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        Topic topic = session.createTopic("topic-" + getName());
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");
+        // Drain any messages that may allready be in the sub
+        while (consumer.receive(1000) != null) {
+        }
+        consumer.close();
 
+        MessageProducer producer = session.createProducer(topic);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        producer.send(session.createTextMessage("Message 1"));
+        producer.send(session.createTextMessage("Message 2"));
+        producer.send(session.createTextMessage("Message 3"));
+        producer.close();
+
+        connection.close();
+        connection = createConnection();
+        connection.start();
+
+        session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        consumer = session.createDurableSubscriber(topic, "sub1");
+
+        Message message = null;
+        for (int i = 0; i < 3; ++i) {
+            message = consumer.receive(5000);
+            assertNotNull(message);
+            assertEquals("Message " + (i + 1), ((TextMessage) message).getText());
+        }
+
+        message.acknowledge();
+
+        connection.close();
+        connection = createConnection();
+        connection.start();
+
+        session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        consumer = session.createDurableSubscriber(topic, "sub1");
+
+        for (int i = 0; i < 2; ++i) {
+            message = consumer.receive(5000);
+            assertNotNull(message);
+            assertEquals("Message " + (i + 1), ((TextMessage) message).getText());
+        }
     }
 
     private MessageProducer createProducer(Session session, Destination queue) throws JMSException
{
@@ -476,7 +528,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport
{
     private void assertTextMessageEquals(String string, Message message) throws JMSException
{
         assertNotNull("Message was null", message);
         assertTrue("Message is not a TextMessage", message instanceof TextMessage);
-        assertEquals(string, ((TextMessage)message).getText());
+        assertEquals(string, ((TextMessage) message).getText());
     }
 
 }


Mime
View raw message