activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r613829 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Date Mon, 21 Jan 2008 10:29:00 GMT
Author: rajdavies
Date: Mon Jan 21 02:28:59 2008
New Revision: 613829

URL: http://svn.apache.org/viewvc?rev=613829&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1556

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=613829&r1=613828&r2=613829&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Mon Jan 21 02:28:59 2008
@@ -595,11 +595,13 @@
         MessageAck ack = null;
         if (deliveryingAcknowledgements.compareAndSet(false, true)) {
             if (this.optimizeAcknowledge) {
-                if (!deliveredMessages.isEmpty()) {
-                    MessageDispatch md = deliveredMessages.getFirst();
-                    ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
-                    deliveredMessages.clear();
-                    ackCounter = 0;
+                synchronized(deliveredMessages) {
+                    if (!deliveredMessages.isEmpty()) {
+                        MessageDispatch md = deliveredMessages.getFirst();
+                        ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
+                        deliveredMessages.clear();
+                        ackCounter = 0;
+                    }
                 }
             }
             if (ack != null) {
@@ -712,7 +714,9 @@
     private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
         md.setDeliverySequenceId(session.getNextDeliveryId());
         if (!session.isDupsOkAcknowledge()) {
-            deliveredMessages.addFirst(md);
+            synchronized(deliveredMessages) {
+                deliveredMessages.addFirst(md);
+            }
             if (session.isTransacted()) {
                 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
             }
@@ -730,24 +734,26 @@
             if (session.isTransacted()) {
                 // Do nothing.
             } else if (session.isAutoAcknowledge()) {
-                if (!deliveredMessages.isEmpty()) {
-                    if (optimizeAcknowledge) {
-                        if (deliveryingAcknowledgements.compareAndSet(false, true)) {
-                            ackCounter++;
-                            if (ackCounter >= (info.getCurrentPrefetchSize() * .65)) {
-                                MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE,
-                                                                deliveredMessages.size());
-                                session.asyncSendPacket(ack);
-                                ackCounter = 0;
-                                deliveredMessages.clear();
+                synchronized (deliveredMessages) {
+                    if (!deliveredMessages.isEmpty()) {
+                        if (optimizeAcknowledge) {
+                            if (deliveryingAcknowledgements.compareAndSet(
+                                    false, true)) {
+                                ackCounter++;
+                                if (ackCounter >= (info
+                                        .getCurrentPrefetchSize() * .65)) {
+                                    MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
+                                    session.asyncSendPacket(ack);
+                                    ackCounter = 0;
+                                    deliveredMessages.clear();
+                                }
+                                deliveryingAcknowledgements.set(false);
                             }
-                            deliveryingAcknowledgements.set(false);
+                        } else {
+                            MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
+                            session.asyncSendPacket(ack);
+                            deliveredMessages.clear();
                         }
-                    } else {
-                        MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages
-                            .size());
-                        session.asyncSendPacket(ack);
-                        deliveredMessages.clear();
                     }
                 }
             } else if (session.isDupsOkAcknowledge()) {
@@ -812,30 +818,34 @@
      * @throws JMSException
      */
     public void acknowledge() throws JMSException {
-        if (deliveredMessages.isEmpty()) {
-            return;
-        }
-
-        // Acknowledge the last message.
-        MessageDispatch lastMd = deliveredMessages.get(0);
-        MessageAck ack = new MessageAck(lastMd, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
-        if (session.isTransacted()) {
-            session.doStartTransaction();
-            ack.setTransactionId(session.getTransactionContext().getTransactionId());
-        }
-        session.asyncSendPacket(ack);
-
-        // Adjust the counters
-        deliveredCounter -= deliveredMessages.size();
-        additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
-
-        if (!session.isTransacted()) {
-            deliveredMessages.clear();
+        synchronized(deliveredMessages) {
+            if (deliveredMessages.isEmpty()) {
+                return;
+            }
+    
+            // Acknowledge the last message.
+            MessageDispatch lastMd = deliveredMessages.get(0);
+            MessageAck ack = new MessageAck(lastMd, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
+            if (session.isTransacted()) {
+                session.doStartTransaction();
+                ack.setTransactionId(session.getTransactionContext().getTransactionId());
+            }
+            session.asyncSendPacket(ack);
+    
+            // Adjust the counters
+            deliveredCounter -= deliveredMessages.size();
+            additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
+    
+            if (!session.isTransacted()) {
+                deliveredMessages.clear();
+            }
         }
     }
 
     public void commit() throws JMSException {
-        deliveredMessages.clear();
+        synchronized (deliveredMessages) {
+            deliveredMessages.clear();
+        }
         redeliveryDelay = 0;
     }
 
@@ -845,74 +855,78 @@
                 // remove messages read but not acked at the broker yet through
                 // optimizeAcknowledge
                 if (!this.info.isBrowser()) {
-                    for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
-                        // ensure we don't filter this as a duplicate
-                        MessageDispatch md = deliveredMessages.removeLast();
-                        session.connection.rollbackDuplicate(this, md.getMessage());
+                    synchronized(deliveredMessages) {
+                        for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
+                            // ensure we don't filter this as a duplicate
+                            MessageDispatch md = deliveredMessages.removeLast();
+                            session.connection.rollbackDuplicate(this, md.getMessage());
+                        }
                     }
                 }
             }
-            if (deliveredMessages.isEmpty()) {
-                return;
-            }
-
-            // Only increase the redlivery delay after the first redelivery..
-            MessageDispatch lastMd = deliveredMessages.getFirst();
-            if (lastMd.getMessage().getRedeliveryCounter() > 0) {
-                redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
-            }
-
-            for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
-                MessageDispatch md = (MessageDispatch)iter.next();
-                md.getMessage().onMessageRolledBack();
-            }
-
-            if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
-                && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
-                // We need to NACK the messages so that they get sent to the
-                // DLQ.
-                // Acknowledge the last message.
-                
-                MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
-                session.asyncSendPacket(ack);
-                // ensure we don't filter this as a duplicate
-                session.connection.rollbackDuplicate(this, lastMd.getMessage());
-                // Adjust the window size.
-                additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
-                redeliveryDelay = 0;
-            } else {
-                
-                MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
-                session.asyncSendPacket(ack);
-
-                // stop the delivery of messages.
-                unconsumedMessages.stop();
-
+            synchronized(deliveredMessages) {
+                if (deliveredMessages.isEmpty()) {
+                    return;
+                }
+    
+                // Only increase the redlivery delay after the first redelivery..
+                MessageDispatch lastMd = deliveredMessages.getFirst();
+                if (lastMd.getMessage().getRedeliveryCounter() > 0) {
+                    redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
+                }
+    
                 for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
                     MessageDispatch md = (MessageDispatch)iter.next();
-                    unconsumedMessages.enqueueFirst(md);
+                    md.getMessage().onMessageRolledBack();
                 }
-
-                if (redeliveryDelay > 0) {
-                    // Start up the delivery again a little later.
-                    Scheduler.executeAfterDelay(new Runnable() {
-                        public void run() {
-                            try {
-                                if (started.get()) {
-                                    start();
+    
+                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
+                    && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
+                    // We need to NACK the messages so that they get sent to the
+                    // DLQ.
+                    // Acknowledge the last message.
+                    
+                    MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
+                    session.asyncSendPacket(ack);
+                    // ensure we don't filter this as a duplicate
+                    session.connection.rollbackDuplicate(this, lastMd.getMessage());
+                    // Adjust the window size.
+                    additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
+                    redeliveryDelay = 0;
+                } else {
+                    
+                    MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
+                    session.asyncSendPacket(ack);
+    
+                    // stop the delivery of messages.
+                    unconsumedMessages.stop();
+    
+                    for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
+                        MessageDispatch md = (MessageDispatch)iter.next();
+                        unconsumedMessages.enqueueFirst(md);
+                    }
+    
+                    if (redeliveryDelay > 0) {
+                        // Start up the delivery again a little later.
+                        Scheduler.executeAfterDelay(new Runnable() {
+                            public void run() {
+                                try {
+                                    if (started.get()) {
+                                        start();
+                                    }
+                                } catch (JMSException e) {
+                                    session.connection.onAsyncException(e);
                                 }
-                            } catch (JMSException e) {
-                                session.connection.onAsyncException(e);
                             }
-                        }
-                    }, redeliveryDelay);
-                } else {
-                    start();
+                        }, redeliveryDelay);
+                    } else {
+                        start();
+                    }
+    
                 }
-
+                deliveredCounter -= deliveredMessages.size();
+                deliveredMessages.clear();
             }
-            deliveredCounter -= deliveredMessages.size();
-            deliveredMessages.clear();
         }
         if (messageListener != null) {
             session.redispatch(this, unconsumedMessages);



Mime
View raw message