activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r358579 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/activemq/ActiveMQMessageConsumer.java test/java/org/activemq/JMSConsumerTest.java
Date Thu, 22 Dec 2005 15:43:47 GMT
Author: chirino
Date: Thu Dec 22 07:43:42 2005
New Revision: 358579

URL: http://svn.apache.org/viewcvs?rev=358579&view=rev
Log:
Fixed and added test cases for the consumer start() and stop() methods.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageConsumer.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/JMSConsumerTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageConsumer.java?rev=358579&r1=358578&r2=358579&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageConsumer.java Thu Dec 22 07:43:42 2005
@@ -245,10 +245,7 @@
      * @return true if this is a durable topic subscriber
      */
     public boolean isDurableSubscriber() {
-        // TODO Add ActiveMQTopicSubscriber
-        return false;
-        // return this instanceof ActiveMQTopicSubscriber && consumerName !=
-        // null && consumerName.length() > 0;
+        return info.getSubcriptionName()!=null && info.getDestination().isTopic();
     }
 
     /**
@@ -671,8 +668,12 @@
 
                 Scheduler.executeAfterDelay(new Runnable() {
                     public void run() {
-                        if (started.get())
-                            unconsumedMessages.start();
+                        try {
+                            if (started.get())
+                                start();
+                        } catch (JMSException e) {
+                            session.connection.onAsyncException(e);
+                        }
                     }
                 }, redeliveryDelay);
                 
@@ -695,7 +696,7 @@
         MessageListener listener = this.messageListener;
         try {
             if (!unconsumedMessages.isClosed()) {
-                if (listener != null) {
+                if (listener != null && started.get()) {
                     ActiveMQMessage message = createActiveMQMessage(md);
                     beforeMessageIsConsumed(md);
                     listener.onMessage(message);
@@ -716,9 +717,19 @@
         return unconsumedMessages.size();
     }
 
-    public void start() {
+    public void start() throws JMSException {
         started.set(true);
         unconsumedMessages.start();
+        MessageListener listener = this.messageListener;
+        if( listener!=null ) {
+            MessageDispatch md;
+            while( (md = unconsumedMessages.dequeueNoWait())!=null ) {
+                ActiveMQMessage message = createActiveMQMessage(md);
+                beforeMessageIsConsumed(md);
+                listener.onMessage(message);
+                afterMessageIsConsumed(md, false);
+            }
+        }
     }
 
     public void stop() {

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/JMSConsumerTest.java?rev=358579&r1=358578&r2=358579&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/JMSConsumerTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/JMSConsumerTest.java Thu Dec 22 07:43:42 2005
@@ -59,6 +59,55 @@
     public byte destinationType;
     public boolean durableConsumer;
     
+    public void initCombosForTestMessageListenerWithConsumerCanBeStopped() {
+        addCombinationValues("deliveryMode", new Object[] { 
+                new Integer(DeliveryMode.NON_PERSISTENT),
+                new Integer(DeliveryMode.PERSISTENT) });
+        addCombinationValues("destinationType", new Object[] { 
+                new Byte(ActiveMQDestination.QUEUE_TYPE),
+                new Byte(ActiveMQDestination.TOPIC_TYPE), 
+                new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
+                new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE) });
+    }
+    public void testMessageListenerWithConsumerCanBeStopped() throws Throwable {
+
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch done1 = new CountDownLatch(1);
+        final CountDownLatch done2 = new CountDownLatch(1);
+        
+        // Receive a message with the JMS API
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                counter.incrementAndGet();
+                if( counter.get()==1 )
+                    done1.countDown();
+                if( counter.get()==2 )
+                    done2.countDown();
+            }
+        });
+
+        // Send a first message to make sure that the consumer dispatcher is running
+        sendMessages(session, destination, 1);
+        assertTrue(done1.await(1, TimeUnit.SECONDS));
+        assertEquals(1, counter.get());
+
+        // Stop the consumer.
+        consumer.stop();
+
+        // Send a message, but should not get delivered.
+        sendMessages(session, destination, 1);
+        assertFalse(done2.await(1, TimeUnit.SECONDS));
+        assertEquals(1, counter.get());
+        
+        // Start the consumer, and the message should now get delivered.
+        consumer.start();
+        assertTrue(done2.await(1, TimeUnit.SECONDS));
+        assertEquals(2, counter.get());
+    }
     
     public void initCombosForTestMutiReceiveWithPrefetch1() {
         addCombinationValues("deliveryMode", new Object[] { 



Mime
View raw message