activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r745953 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ActiveMQMessageConsumer.java main/java/org/apache/activemq/broker/region/Queue.java test/java/org/apache/activemq/JMSConsumerTest.java
Date Thu, 19 Feb 2009 18:16:03 GMT
Author: gtully
Date: Thu Feb 19 18:15:59 2009
New Revision: 745953

URL: http://svn.apache.org/viewvc?rev=745953&view=rev
Log:
tidy up redispatch logic a little more, resolve: AMQ-2128, deliver acks on dispose in auto_ack
mode. also get some closure on: AMQ-2075

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=745953&r1=745952&r2=745953&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Feb 19 18:15:59 2009
@@ -630,7 +630,7 @@
     void deliverAcks() {
         MessageAck ack = null;
         if (deliveryingAcknowledgements.compareAndSet(false, true)) {
-            if (this.optimizeAcknowledge) {
+            if (session.isAutoAcknowledge()) {
             	synchronized(deliveredMessages) {
             		ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
             		if (ack != null) {
@@ -775,14 +775,12 @@
             if (session.getTransacted()) {
                 // Do nothing.
             } else if (session.isAutoAcknowledge()) {
-                synchronized (deliveredMessages) {
-                    if (!deliveredMessages.isEmpty()) {
-                        if (optimizeAcknowledge) {
-                            if (deliveryingAcknowledgements.compareAndSet(
-                                    false, true)) {
+                if (deliveryingAcknowledgements.compareAndSet(false, true)) {
+                    synchronized (deliveredMessages) {
+                        if (!deliveredMessages.isEmpty()) {
+                            if (optimizeAcknowledge) {
                                 ackCounter++;
-                                if (ackCounter >= (info
-                                        .getCurrentPrefetchSize() * .65)) {
+                                if (ackCounter >= (info.getCurrentPrefetchSize() * .65)) {
                                 	MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                 	if (ack != null) {
                             		    deliveredMessages.clear();
@@ -790,16 +788,16 @@
                             		    session.sendAck(ack);
                                 	}
                                 }
-                                deliveryingAcknowledgements.set(false);
-                            }
-                        } else {
-                            MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
-                            if (ack!=null) {
-                            	deliveredMessages.clear();
-                            	session.sendAck(ack);
+                            } else {
+                                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+                                if (ack!=null) {
+                                    deliveredMessages.clear();
+                                    session.sendAck(ack);
+                                }
                             }
                         }
                     }
+                    deliveryingAcknowledgements.set(false);
                 }
             } else if (session.isDupsOkAcknowledge()) {
                 ackLater(md, MessageAck.STANDARD_ACK_TYPE);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=745953&r1=745952&r2=745953&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Feb 19 18:15:59 2009
@@ -336,8 +336,7 @@
                     }
                 }
                 ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
-                MessageGroupSet ownedGroups = getMessageGroupOwners()
-                        .removeConsumer(consumerId);
+                getMessageGroupOwners().removeConsumer(consumerId);
                 
                 // redeliver inflight messages
                 List<QueueMessageReference> list = new ArrayList<QueueMessageReference>();
@@ -353,19 +352,10 @@
                     list.add(qmr);
                 }
                 
-                if (!list.isEmpty() && !consumers.isEmpty()) {
+                if (!list.isEmpty()) {
                     doDispatch(list);
                 }
             }
-            //if it is a last consumer (and not a browser) dispatch all pagedIn messages
-            if (consumers.isEmpty() && !(sub instanceof QueueBrowserSubscription)) {
-            		List<QueueMessageReference> list = new ArrayList<QueueMessageReference>();
-            		for (QueueMessageReference ref : pagedInMessages.values()) {
-            			list.add(ref);
-            		}
-            		pagedInPendingDispatch.clear();
-            		doDispatch(list);
-            }
             if (!(this.optimizedDispatch || isSlave())) {
                 wakeup();
             }
@@ -1068,7 +1058,7 @@
 	        }
 	        
 	        synchronized (messages) {
-	            pageInMoreMessages = !messages.isEmpty();
+	            pageInMoreMessages |= !messages.isEmpty();
 	        }               
 	        
 	        // Kinda ugly.. but I think dispatchLock is the only mutex protecting the 
@@ -1333,14 +1323,18 @@
      *         were not full.
      */
     private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception {
-        List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
-        Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
         List<Subscription> consumers;
         
         synchronized (this.consumers) {
+            if (this.consumers.isEmpty()) {
+                return list;
+            }
             consumers = new ArrayList<Subscription>(this.consumers);
         }
 
+        List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
+        Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
+        
         for (MessageReference node : list) {
             Subscription target = null;
             int interestCount=0;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=745953&r1=745952&r2=745953&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Thu Feb 19 18:15:59 2009
@@ -358,15 +358,99 @@
         assertEquals(4, counter.get());
     }
 
-    public void initCombosForTestMessageListenerUnackedWithPrefetch1StayInQueue() {
+    public void initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() {
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
-                                                      Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
+        addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE)});
         addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)});
     }
 
-    public void testMessageListenerUnackedWithPrefetch1StayInQueue() throws Exception {
+    public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception {
+    
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch sendDone = new CountDownLatch(1);
+        final CountDownLatch got2Done = new CountDownLatch(1);
+
+        // Set prefetch to 1
+        connection.getPrefetchPolicy().setAll(1);
+        // This test case does not work if optimized message dispatch is used as
+        // the main thread send block until the consumer receives the
+        // message. This test depends on thread decoupling so that the main
+        // thread can stop the consumer thread.
+        connection.setOptimizedMessageDispatch(false);
+        connection.start();
+
+        // Use all the ack modes
+        Session session = connection.createSession(false, ackMode);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                try {
+                    TextMessage tm = (TextMessage)m;
+                    LOG.info("Got in first listener: " + tm.getText());
+                    assertEquals("" + counter.get(), tm.getText());
+                    counter.incrementAndGet();
+                    if (counter.get() == 2) {
+                        sendDone.await();
+                        connection.close();
+                        got2Done.countDown();
+                    }
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+        sendDone.countDown();
 
+        // Wait for first 2 messages to arrive.
+        assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
+
+        // Re-start connection.
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+
+        connection.getPrefetchPolicy().setAll(1);
+        connection.start();
+
+        // Pickup the remaining messages.
+        final CountDownLatch done2 = new CountDownLatch(1);
+        session = connection.createSession(false, ackMode);
+        consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                try {
+                    TextMessage tm = (TextMessage)m;
+                    LOG.info("Got in second listener: " + tm.getText());
+                    // order is not guaranteed as the connection is started before the listener is set.
+                    // assertEquals("" + counter.get(), tm.getText());
+                    counter.incrementAndGet();
+                    if (counter.get() == 4) {
+                        done2.countDown();
+                    }
+                } catch (Throwable e) {
+                    LOG.error("unexpected ex onMessage: ", e);
+                }
+            }
+        });
+
+        assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
+        Thread.sleep(200);
+
+        // assert msg 2 was redelivered as close() from onMessages() will only ack in auto_ack mode
+        assertEquals(5, counter.get());      
+    }
+
+    public void initCombosForTestMessageListenerAutoAckOnCloseWithPrefetch1() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
+        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)});
+    }
+
+    public void testMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception {
+    
         final AtomicInteger counter = new AtomicInteger(0);
         final CountDownLatch sendDone = new CountDownLatch(1);
         final CountDownLatch got2Done = new CountDownLatch(1);
@@ -426,13 +510,12 @@
                 try {
                     TextMessage tm = (TextMessage)m;
                     LOG.info("Got in second listener: " + tm.getText());
-                    assertEquals("" + counter.get(), tm.getText());
                     counter.incrementAndGet();
                     if (counter.get() == 4) {
                         done2.countDown();
                     }
                 } catch (Throwable e) {
-                    LOG.info("unexpected ex onMessage: ", e);
+                    LOG.error("unexpected ex onMessage: ", e);
                 }
             }
         });
@@ -440,9 +523,9 @@
         assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
         Thread.sleep(200);
 
+        // close from onMessage with Auto_ack will ack
         // Make sure only 4 messages were delivered.
         assertEquals(4, counter.get());
-
     }
 
     public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {



Mime
View raw message