From commits-return-7927-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Mon Jan 21 10:29:26 2008 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 33542 invoked from network); 21 Jan 2008 10:29:25 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 21 Jan 2008 10:29:25 -0000 Received: (qmail 34731 invoked by uid 500); 21 Jan 2008 10:29:16 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 34712 invoked by uid 500); 21 Jan 2008 10:29:16 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 34703 invoked by uid 99); 21 Jan 2008 10:29:15 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Jan 2008 02:29:15 -0800 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Jan 2008 10:29:09 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 1E3781A9832; Mon, 21 Jan 2008 02:29:01 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r613829 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Date: Mon, 21 Jan 2008 10:29:00 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080121102901.1E3781A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Mon Jan 21 02:28:59 2008 New Revision: 613829 URL: http://svn.apache.org/viewvc?rev=613829&view=rev Log: Fix for https://issues.apache.org/activemq/browse/AMQ-1556 Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=613829&r1=613828&r2=613829&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Mon Jan 21 02:28:59 2008 @@ -595,11 +595,13 @@ MessageAck ack = null; if (deliveryingAcknowledgements.compareAndSet(false, true)) { if (this.optimizeAcknowledge) { - if (!deliveredMessages.isEmpty()) { - MessageDispatch md = deliveredMessages.getFirst(); - ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size()); - deliveredMessages.clear(); - ackCounter = 0; + synchronized(deliveredMessages) { + if (!deliveredMessages.isEmpty()) { + MessageDispatch md = deliveredMessages.getFirst(); + ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size()); + deliveredMessages.clear(); + ackCounter = 0; + } } } if (ack != null) { @@ -712,7 +714,9 @@ private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException { md.setDeliverySequenceId(session.getNextDeliveryId()); if (!session.isDupsOkAcknowledge()) { - deliveredMessages.addFirst(md); + synchronized(deliveredMessages) { + deliveredMessages.addFirst(md); + } if (session.isTransacted()) { ackLater(md, MessageAck.DELIVERED_ACK_TYPE); } @@ -730,24 +734,26 @@ if (session.isTransacted()) { // Do nothing. } else if (session.isAutoAcknowledge()) { - if (!deliveredMessages.isEmpty()) { - if (optimizeAcknowledge) { - if (deliveryingAcknowledgements.compareAndSet(false, true)) { - ackCounter++; - if (ackCounter >= (info.getCurrentPrefetchSize() * .65)) { - MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, - deliveredMessages.size()); - session.asyncSendPacket(ack); - ackCounter = 0; - deliveredMessages.clear(); + synchronized (deliveredMessages) { + if (!deliveredMessages.isEmpty()) { + if (optimizeAcknowledge) { + if (deliveryingAcknowledgements.compareAndSet( + false, true)) { + ackCounter++; + if (ackCounter >= (info + .getCurrentPrefetchSize() * .65)) { + MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size()); + session.asyncSendPacket(ack); + ackCounter = 0; + deliveredMessages.clear(); + } + deliveryingAcknowledgements.set(false); } - deliveryingAcknowledgements.set(false); + } else { + MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size()); + session.asyncSendPacket(ack); + deliveredMessages.clear(); } - } else { - MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages - .size()); - session.asyncSendPacket(ack); - deliveredMessages.clear(); } } } else if (session.isDupsOkAcknowledge()) { @@ -812,30 +818,34 @@ * @throws JMSException */ public void acknowledge() throws JMSException { - if (deliveredMessages.isEmpty()) { - return; - } - - // Acknowledge the last message. - MessageDispatch lastMd = deliveredMessages.get(0); - MessageAck ack = new MessageAck(lastMd, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size()); - if (session.isTransacted()) { - session.doStartTransaction(); - ack.setTransactionId(session.getTransactionContext().getTransactionId()); - } - session.asyncSendPacket(ack); - - // Adjust the counters - deliveredCounter -= deliveredMessages.size(); - additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); - - if (!session.isTransacted()) { - deliveredMessages.clear(); + synchronized(deliveredMessages) { + if (deliveredMessages.isEmpty()) { + return; + } + + // Acknowledge the last message. + MessageDispatch lastMd = deliveredMessages.get(0); + MessageAck ack = new MessageAck(lastMd, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size()); + if (session.isTransacted()) { + session.doStartTransaction(); + ack.setTransactionId(session.getTransactionContext().getTransactionId()); + } + session.asyncSendPacket(ack); + + // Adjust the counters + deliveredCounter -= deliveredMessages.size(); + additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); + + if (!session.isTransacted()) { + deliveredMessages.clear(); + } } } public void commit() throws JMSException { - deliveredMessages.clear(); + synchronized (deliveredMessages) { + deliveredMessages.clear(); + } redeliveryDelay = 0; } @@ -845,74 +855,78 @@ // remove messages read but not acked at the broker yet through // optimizeAcknowledge if (!this.info.isBrowser()) { - for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) { - // ensure we don't filter this as a duplicate - MessageDispatch md = deliveredMessages.removeLast(); - session.connection.rollbackDuplicate(this, md.getMessage()); + synchronized(deliveredMessages) { + for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) { + // ensure we don't filter this as a duplicate + MessageDispatch md = deliveredMessages.removeLast(); + session.connection.rollbackDuplicate(this, md.getMessage()); + } } } } - if (deliveredMessages.isEmpty()) { - return; - } - - // Only increase the redlivery delay after the first redelivery.. - MessageDispatch lastMd = deliveredMessages.getFirst(); - if (lastMd.getMessage().getRedeliveryCounter() > 0) { - redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); - } - - for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) { - MessageDispatch md = (MessageDispatch)iter.next(); - md.getMessage().onMessageRolledBack(); - } - - if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES - && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) { - // We need to NACK the messages so that they get sent to the - // DLQ. - // Acknowledge the last message. - - MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size()); - session.asyncSendPacket(ack); - // ensure we don't filter this as a duplicate - session.connection.rollbackDuplicate(this, lastMd.getMessage()); - // Adjust the window size. - additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); - redeliveryDelay = 0; - } else { - - MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size()); - session.asyncSendPacket(ack); - - // stop the delivery of messages. - unconsumedMessages.stop(); - + synchronized(deliveredMessages) { + if (deliveredMessages.isEmpty()) { + return; + } + + // Only increase the redlivery delay after the first redelivery.. + MessageDispatch lastMd = deliveredMessages.getFirst(); + if (lastMd.getMessage().getRedeliveryCounter() > 0) { + redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); + } + for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) { MessageDispatch md = (MessageDispatch)iter.next(); - unconsumedMessages.enqueueFirst(md); + md.getMessage().onMessageRolledBack(); } - - if (redeliveryDelay > 0) { - // Start up the delivery again a little later. - Scheduler.executeAfterDelay(new Runnable() { - public void run() { - try { - if (started.get()) { - start(); + + if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES + && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) { + // We need to NACK the messages so that they get sent to the + // DLQ. + // Acknowledge the last message. + + MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size()); + session.asyncSendPacket(ack); + // ensure we don't filter this as a duplicate + session.connection.rollbackDuplicate(this, lastMd.getMessage()); + // Adjust the window size. + additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); + redeliveryDelay = 0; + } else { + + MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size()); + session.asyncSendPacket(ack); + + // stop the delivery of messages. + unconsumedMessages.stop(); + + for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) { + MessageDispatch md = (MessageDispatch)iter.next(); + unconsumedMessages.enqueueFirst(md); + } + + if (redeliveryDelay > 0) { + // Start up the delivery again a little later. + Scheduler.executeAfterDelay(new Runnable() { + public void run() { + try { + if (started.get()) { + start(); + } + } catch (JMSException e) { + session.connection.onAsyncException(e); } - } catch (JMSException e) { - session.connection.onAsyncException(e); } - } - }, redeliveryDelay); - } else { - start(); + }, redeliveryDelay); + } else { + start(); + } + } - + deliveredCounter -= deliveredMessages.size(); + deliveredMessages.clear(); } - deliveredCounter -= deliveredMessages.size(); - deliveredMessages.clear(); } if (messageListener != null) { session.redispatch(this, unconsumedMessages);