activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r581747 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQMessageConsumer.java ActiveMQSession.java broker/region/PrefetchSubscription.java command/MessageAck.java
Date Wed, 03 Oct 2007 23:32:30 GMT
Author: chirino
Date: Wed Oct  3 16:32:29 2007
New Revision: 581747

URL: http://svn.apache.org/viewvc?rev=581747&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1038
 We now send REDELIVERY acks to the broker when a message is redelivered.  This allows the
broker to update the message with the number of times redelivery has occured so that if the
message is delivered to another consumer it can DQL the message when max redeliveries have
occured across consumers.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=581747&r1=581746&r2=581747&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Wed Oct  3 16:32:29 2007
@@ -104,7 +104,6 @@
     private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
     private int deliveredCounter;
     private int additionalWindowSize;
-    private int rollbackCounter;
     private long redeliveryDelay;
     private int ackCounter;
     private int dispatchedCount;
@@ -627,6 +626,14 @@
 
     public void dispose() throws JMSException {
         if (!unconsumedMessages.isClosed()) {
+            
+//            if ( !deliveredMessages.isEmpty() ) {
+//                // We need to let the broker know how many times that message
+//                // was rolled back.
+//                rollbackCounter++;
+//                MessageDispatch lastMd = deliveredMessages.get(0);
+//            }
+
             // Do we have any acks we need to send out before closing?
             // Ack any delivered messages now. (session may still
             // commit/rollback the acks).
@@ -829,7 +836,6 @@
 
     public void commit() throws JMSException {
         deliveredMessages.clear();
-        rollbackCounter = 0;
         redeliveryDelay = 0;
     }
 
@@ -851,31 +857,39 @@
             }
 
             // Only increase the redlivery delay after the first redelivery..
-            if (rollbackCounter > 0) {
+            MessageDispatch lastMd = deliveredMessages.getFirst();
+            if (lastMd.getMessage().getRedeliveryCounter() > 0) {
                 redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
             }
-            rollbackCounter++;
+
+            for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
+                MessageDispatch md = (MessageDispatch)iter.next();
+                md.getMessage().onMessageRolledBack();
+            }
+
             if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
-                && rollbackCounter > redeliveryPolicy.getMaximumRedeliveries())
{
+                && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries())
{
                 // We need to NACK the messages so that they get sent to the
                 // DLQ.
                 // Acknowledge the last message.
-                MessageDispatch lastMd = deliveredMessages.get(0);
+                
                 MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
                 session.asyncSendPacket(ack);
                 // ensure we don't filter this as a duplicate
                 session.connection.rollbackDuplicate(this, lastMd.getMessage());
                 // Adjust the window size.
                 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
-                rollbackCounter = 0;
                 redeliveryDelay = 0;
             } else {
+                
+                MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE,
deliveredMessages.size());
+                session.asyncSendPacket(ack);
+
                 // stop the delivery of messages.
                 unconsumedMessages.stop();
 
                 for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
                     MessageDispatch md = (MessageDispatch)iter.next();
-                    md.getMessage().onMessageRolledBack();
                     unconsumedMessages.enqueueFirst(md);
                 }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=581747&r1=581746&r2=581747&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Wed
Oct  3 16:32:29 2007
@@ -755,6 +755,11 @@
                                 ack.setFirstMessageId(md.getMessage().getMessageId());
                                 asyncSendPacket(ack);
                             } else {
+                                
+                                MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE,
1);
+                                ack.setFirstMessageId(md.getMessage().getMessageId());
+                                asyncSendPacket(ack);
+
                                 // Figure out how long we should wait to resend
                                 // this message.
                                 long redeliveryDelay = 0;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=581747&r1=581746&r2=581747&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Wed Oct  3 16:32:29 2007
@@ -230,6 +230,28 @@
             if (!callDispatchMatched) {
                 throw new JMSException("Could not correlate acknowledgment with dispatched
message: " + ack);
             }
+        } else if (ack.isRedeliveredAck() ) {
+            // Message was re-delivered but it was not yet considered to be a DLQ message.
+            // Acknowledge all dispatched messages up till the message id of the
+            // acknowledgment.
+            boolean inAckRange = false;
+            for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();)
{
+                final MessageReference node = iter.next();
+                MessageId messageId = node.getMessageId();
+                if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId))
{
+                    inAckRange = true;
+                }
+                if (inAckRange) {
+                    node.incrementRedeliveryCounter();
+                    if (ack.getLastMessageId().equals(messageId)) {
+                        callDispatchMatched = true;
+                        break;
+                    }
+                }
+            }
+            if (!callDispatchMatched) {
+                throw new JMSException("Could not correlate acknowledgment with dispatched
message: " + ack);
+            }
         } else if (ack.isPoisonAck()) {
             // TODO: what if the message is already in a DLQ???
             // Handle the poison ACK case: we need to send the message to a DLQ

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java?rev=581747&r1=581746&r2=581747&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
Wed Oct  3 16:32:29 2007
@@ -46,6 +46,14 @@
      */
     public static final byte POSION_ACK_TYPE = 1;
 
+    /**
+     * In case the client want's to explicitly let the broker know that a
+     * message was not processed and it was re-delivered to the consumer
+     * but it was not yet considered to be a poison message.  The messageCount 
+     * field will hold the number of times the message was re-delivered. 
+     */
+    public static final byte REDELIVERED_ACK_TYPE = 3;
+    
     protected byte ackType;
     protected ConsumerId consumerId;
     protected MessageId firstMessageId;
@@ -95,6 +103,10 @@
 
     public boolean isDeliveredAck() {
         return ackType == DELIVERED_ACK_TYPE;
+    }
+    
+    public boolean isRedeliveredAck() {
+        return ackType == REDELIVERED_ACK_TYPE;
     }
 
     /**



Mime
View raw message