activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r472345 - in /incubator/activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/ src/main/java/org/apache/activemq/broker/region/ src/test/java/org/apache/activemq/ src/test/java/org/apache/activemq/test/rollback/
Date Wed, 08 Nov 2006 00:14:57 GMT
Author: chirino
Date: Tue Nov  7 16:14:56 2006
New Revision: 472345

URL: http://svn.apache.org/viewvc?view=rev&rev=472345
Log:
All message dispatching should occur from the session's executor.  Also, we should dispatch
any messages in the consumers queue before dispatching messages in the session's queues.

http://issues.apache.org/activemq/browse/AMQ-1031
http://issues.apache.org/activemq/browse/AMQ-1032


Modified:
    incubator/activemq/trunk/activemq-core/pom.xml
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java

Modified: incubator/activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/pom.xml?view=diff&rev=472345&r1=472344&r2=472345
==============================================================================
--- incubator/activemq/trunk/activemq-core/pom.xml (original)
+++ incubator/activemq/trunk/activemq-core/pom.xml Tue Nov  7 16:14:56 2006
@@ -254,8 +254,6 @@
             <!-- TODO need to get the JUnit test configured to create SSL sockets nicely
via system properties -->
             <exclude>**/StompSslTest.*</exclude>
 
-            <!-- TODO reproduces a bad ack bug -->
-            <exclude>**/RollbacksWhileConsumingLargeQueueTest.*</exclude>
           </excludes>
         </configuration>
       </plugin>

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?view=diff&rev=472345&r1=472344&r2=472345
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Tue Nov  7 16:14:56 2006
@@ -776,6 +776,11 @@
             }
             if(deliveredMessages.isEmpty())
                 return;
+
+            // Only increase the redlivery delay after the first redelivery..
+            if( rollbackCounter > 0 )
+            	redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
+
             rollbackCounter++;
             if(rollbackCounter>redeliveryPolicy.getMaximumRedeliveries()){
                 // We need to NACK the messages so that they get sent to the
@@ -791,23 +796,29 @@
             }else{
                 // stop the delivery of messages.
                 unconsumedMessages.stop();
-                // Start up the delivery again a little later.
-                redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
-                Scheduler.executeAfterDelay(new Runnable(){
-                    public void run(){
-                        try{
-                            if(started.get())
-                                start();
-                        }catch(JMSException e){
-                            session.connection.onAsyncException(e);
-                        }
-                    }
-                },redeliveryDelay);
+                                
                 for(Iterator iter=deliveredMessages.iterator();iter.hasNext();){
                     MessageDispatch md=(MessageDispatch) iter.next();
                     md.getMessage().onMessageRolledBack();
                     unconsumedMessages.enqueueFirst(md);
                 }
+                                
+                if( redeliveryDelay > 0 ) {
+                    // Start up the delivery again a little later.
+	                Scheduler.executeAfterDelay(new Runnable(){
+	                    public void run(){
+	                        try{
+	                            if(started.get())
+	                                start();
+	                        }catch(JMSException e){
+	                            session.connection.onAsyncException(e);
+	                        }
+	                    }
+	                },redeliveryDelay);
+                } else {
+                	start();
+                }
+
             }
             deliveredCounter-=deliveredMessages.size();
             deliveredMessages.clear();
@@ -820,31 +831,33 @@
     public void dispatch(MessageDispatch md) {
         MessageListener listener = this.messageListener;
         try {
-            if (!unconsumedMessages.isClosed()) {
-                if (listener != null && unconsumedMessages.isRunning() ) {
-                    ActiveMQMessage message = createActiveMQMessage(md);
-                    beforeMessageIsConsumed(md);
-                    try {
-                        listener.onMessage(message);
-                        afterMessageIsConsumed(md, false);
-                    } catch (RuntimeException e) {
-                        if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge()
) {
-                            // Redeliver the message
-                        } else {
-                            // Transacted or Client ack: Deliver the next message.
-                            afterMessageIsConsumed(md, false);
-                        }
-                        log.warn("Exception while processing message: " + e, e);
-                    }
-                } else {
-                    unconsumedMessages.enqueue(md);
-                    if (availableListener != null) {
-                        availableListener.onMessageAvailable(this);
-                    }
-                }
+            synchronized(unconsumedMessages.getMutex()){
+	            if (!unconsumedMessages.isClosed()) {
+	                if (listener != null && unconsumedMessages.isRunning() ) {
+	                    ActiveMQMessage message = createActiveMQMessage(md);
+	                    beforeMessageIsConsumed(md);
+	                    try {
+	                        listener.onMessage(message);
+	                        afterMessageIsConsumed(md, false);
+	                    } catch (RuntimeException e) {
+	                        if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge()
) {
+	                            // Redeliver the message
+	                        } else {
+	                            // Transacted or Client ack: Deliver the next message.
+	                            afterMessageIsConsumed(md, false);
+	                        }
+	                        log.warn("Exception while processing message: " + e, e);
+	                    }
+	                } else {
+	                    unconsumedMessages.enqueue(md);
+	                    if (availableListener != null) {
+	                        availableListener.onMessageAvailable(this);
+	                    }
+	                }
+	            }
             }
         } catch (Exception e) {
-            log.warn("could not process message: " + md, e);
+        	session.connection.onAsyncException(e);
         }
     }
 
@@ -853,18 +866,12 @@
     }
 
     public void start() throws JMSException {
+    	if (unconsumedMessages.isClosed()) {
+    		return;
+    	}    	
         started.set(true);
         unconsumedMessages.start();
-        MessageListener listener = this.messageListener;
-        if( listener!=null ) {
-            MessageDispatch md;
-            while( (md = unconsumedMessages.dequeueNoWait())!=null ) {
-                ActiveMQMessage message = createActiveMQMessage(md);
-                beforeMessageIsConsumed(md);
-                listener.onMessage(message);
-                afterMessageIsConsumed(md, false);
-            }
-        }
+        session.executor.wakeup();
     }
 
     public void stop() {
@@ -875,5 +882,29 @@
     public String toString() {
         return "ActiveMQMessageConsumer { value=" +info.getConsumerId()+", started=" +started.get()+"
}";
     }
+
+    /**
+     * Delivers a message to the message listener.
+     * @return
+     * @throws JMSException 
+     */
+	public boolean iterate() {
+		MessageListener listener = this.messageListener;
+		if( listener!=null ) {
+		    MessageDispatch md = unconsumedMessages.dequeueNoWait();
+		    if( md!=null ) {
+		        try {
+			        ActiveMQMessage message = createActiveMQMessage(md);
+			        beforeMessageIsConsumed(md);
+			        listener.onMessage(message);
+			        afterMessageIsConsumed(md, false);
+				} catch (JMSException e) {
+		        	session.connection.onAsyncException(e);
+				}
+		        return true;
+		    }
+		}
+    	return false;
+	}
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?view=diff&rev=472345&r1=472344&r2=472345
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Tue Nov  7 16:14:56 2006
@@ -61,8 +61,8 @@
         }
     }
 
-    private void wakeup() {
-        if( !dispatchedBySessionPool && hasUncomsumedMessages() ) {
+    public void wakeup() {
+        if( !dispatchedBySessionPool ) {
             if( taskRunner!=null ) {
                 try {
                     taskRunner.wakeup();
@@ -148,6 +148,16 @@
     }
 
     public boolean iterate() {
+
+    	// Deliver any messages queued on the consumer to their listeners.
+    	for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
+            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
+        	if( consumer.iterate() ) {
+        		return true;
+        	}
+        }
+    	
+    	// No messages left queued on the listeners.. so now dispatch messages queued on the
session
         MessageDispatch message = messageQueue.dequeueNoWait();
         if( message==null ) {
             return false;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java?view=diff&rev=472345&r1=472344&r2=472345
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
Tue Nov  7 16:14:56 2006
@@ -32,7 +32,7 @@
 
     // +/-15% for a 30% spread -cgs
     protected double collisionAvoidanceFactor = 0.15d;
-    protected int maximumRedeliveries = 5;
+    protected int maximumRedeliveries = 6;
     protected long initialRedeliveryDelay = 1000L;
     protected static Random randomNumberGenerator;
     protected boolean useCollisionAvoidance = false;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=472345&r1=472344&r2=472345
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Tue Nov  7 16:14:56 2006
@@ -183,6 +183,10 @@
                                     prefetchExtension--;
                                 }
                             }
+
+                            public void afterRollback() throws Exception {
+                            	super.afterRollback();
+                            }
                         });
                     }
                     index++;

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java?view=diff&rev=472345&r1=472344&r2=472345
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
Tue Nov  7 16:14:56 2006
@@ -57,7 +57,7 @@
     protected RedeliveryPolicy getRedeliveryPolicy() {
         RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
         redeliveryPolicy.setInitialRedeliveryDelay(1000);
-        redeliveryPolicy.setMaximumRedeliveries(2);
+        redeliveryPolicy.setMaximumRedeliveries(3);
         redeliveryPolicy.setBackOffMultiplier((short) 2);
         redeliveryPolicy.setUseExponentialBackOff(true);
         return redeliveryPolicy;
@@ -82,7 +82,7 @@
             try {
                 log.info("Message Received: " + message);
                 counter++;
-                if (counter <= 3) {
+                if (counter <= 4) {
                     log.info("Message Rollback.");
                     session.rollback();
                 } else {
@@ -119,24 +119,26 @@
         } catch (InterruptedException e) {
 
         }
-        // first try
-        assertEquals(1, listener.counter);
+        
+        // first try.. should get 2 since there is no delay on the 
+        // first redeliver..
+        assertEquals(2, listener.counter);
 
         try {
             Thread.sleep(1000);
         } catch (InterruptedException e) {
 
         }
-        // second try (redelivery after 1 sec)
-        assertEquals(2, listener.counter);
+        // 2nd redeliver (redelivery after 1 sec)
+        assertEquals(3, listener.counter);
 
         try {
             Thread.sleep(2000);
         } catch (InterruptedException e) {
 
         }
-        // third try (redelivery after 2 seconds) - it should give up after that
-        assertEquals(3, listener.counter);
+        // 3rd redeliver (redelivery after 2 seconds) - it should give up after that
+        assertEquals(4, listener.counter);
 
         // create new message
         producer.send(createTextMessage(session));
@@ -148,7 +150,7 @@
             // ignore
         }
         // it should be committed, so no redelivery
-        assertEquals(4, listener.counter);
+        assertEquals(5, listener.counter);
 
         try {
             Thread.sleep(1500);
@@ -156,7 +158,7 @@
             // ignore
         }
         // no redelivery, counter should still be 4
-        assertEquals(4, listener.counter);
+        assertEquals(5, listener.counter);
 
         session.close();
     }
@@ -184,8 +186,8 @@
         } catch (InterruptedException e) {
 
         }
-        // first try
-        assertEquals(1, listener.counter);
+        // first try 
+        assertEquals(2, listener.counter);
 
         try {
             Thread.sleep(1000);
@@ -193,7 +195,7 @@
 
         }
         // second try (redelivery after 1 sec)
-        assertEquals(2, listener.counter);
+        assertEquals(3, listener.counter);
 
         try {
             Thread.sleep(2000);
@@ -201,7 +203,7 @@
 
         }
         // third try (redelivery after 2 seconds) - it should give up after that
-        assertEquals(3, listener.counter);
+        assertEquals(4, listener.counter);
 
         // create new message
         producer.send(createTextMessage(session));
@@ -213,7 +215,7 @@
             // ignore
         }
         // it should be committed, so no redelivery
-        assertEquals(4, listener.counter);
+        assertEquals(5, listener.counter);
 
         try {
             Thread.sleep(1500);
@@ -221,7 +223,7 @@
             // ignore
         }
         // no redelivery, counter should still be 4
-        assertEquals(4, listener.counter);
+        assertEquals(5, listener.counter);
 
         session.close();
     }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java?view=diff&rev=472345&r1=472344&r2=472345
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
Tue Nov  7 16:14:56 2006
@@ -70,9 +70,15 @@
         assertEquals("1st", m.getText());        
         session.rollback();
 
-        // Show re-delivery delay is incrementing.
+        // No delay on first rollback..
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        session.rollback();
+        
+        // Show subsequent re-delivery delay is incrementing.
         m = (TextMessage)consumer.receive(100);
         assertNull(m);
+        
         m = (TextMessage)consumer.receive(500);
         assertNotNull(m);
         assertEquals("1st", m.getText());        
@@ -117,7 +123,12 @@
         assertEquals("1st", m.getText());        
         session.rollback();
 
-        // Show re-delivery delay is incrementing.
+        // No delay on first rollback..
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        session.rollback();
+        
+        // Show subsequent re-delivery delay is incrementing.
         m = (TextMessage)consumer.receive(100);
         assertNull(m);
         m = (TextMessage)consumer.receive(500);

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java?view=diff&rev=472345&r1=472344&r2=472345
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java
Tue Nov  7 16:14:56 2006
@@ -45,7 +45,7 @@
 	private CountDownLatch latch;
 	private Throwable failure;
 
-	public void xtestWithReciever() throws Throwable {
+	public void testWithReciever() throws Throwable {
 		latch = new CountDownLatch(numberOfMessagesOnQueue);
 		Session session = connection.createSession(true, 0);
 		MessageConsumer consumer = session.createConsumer(destination);
@@ -148,11 +148,11 @@
 
 		int value = deliveryCounter.incrementAndGet();
 		if (value % 2 == 0) {
-			log.info("Rolling Back message: " + value + " id: " + msgId + ", content: " + msgText);
+			log.info("Rolling Back message: " + ackCounter.get() + " id: " + msgId + ", content: "
+ msgText);
 			throw new RuntimeException("Dummy exception on message: " + value);
 		}
 
-		log.info("Received message: " + value + " id: " + msgId + ", content: " + msgText);
+		log.info("Received message: " + ackCounter.get() + " id: " + msgId + ", content: " + msgText);
 		ackCounter.incrementAndGet();
 		latch.countDown();
 	}



Mime
View raw message