Return-Path:
X-Original-To: apmail-activemq-dev-archive@www.apache.org
Delivered-To: apmail-activemq-dev-archive@www.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 637BED391 for ; Sat, 8 Sep 2012 10:20:11 +0000 (UTC)
Received: (qmail 47950 invoked by uid 500); 8 Sep 2012 10:20:11 -0000
Delivered-To: apmail-activemq-dev-archive@activemq.apache.org
Received: (qmail 47636 invoked by uid 500); 8 Sep 2012 10:20:08 -0000
Mailing-List: contact dev-help@activemq.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@activemq.apache.org
Delivered-To: mailing list dev@activemq.apache.org
Received: (qmail 47602 invoked by uid 99); 8 Sep 2012 10:20:07 -0000
Received: from arcas.apache.org (HELO arcas.apache.org) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 08 Sep 2012 10:20:07 +0000
Date: Sat, 8 Sep 2012 21:20:07 +1100 (NCT)
From: "Claus Ibsen (JIRA)"
To: dev@activemq.apache.org
Message-ID: <1359717962.54039.1347099607445.JavaMail.jiratomcat@arcas>
In-Reply-To: <1966239247.47957.1326793120162.JavaMail.tomcat@hel.zones.apache.org>
Subject: [jira] [Commented] (AMQ-3664) Not all messages will be acknowledged when optimizeAcknowledge is true
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: 7bit
X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394

    [ https://issues.apache.org/jira/browse/AMQ-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13451306#comment-13451306 ]

Claus Ibsen commented on AMQ-3664:
----------------------------------

Gary,

Yeah, maybe a multiplexed background thread could run and acknowledge the messages if the consumer has been inactive for a period. I wonder if we can use a single thread for all optimized-ack consumers; I would assume a thread per consumer would be too much?

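To make the single-thread idea concrete, here is a rough sketch of what one shared background thread for all optimized-ack consumers might look like. It is not ActiveMQ code: the names IdleAckFlusher, PendingAcks, lastAckTimeMillis, hasUnackedMessages and flushAcks are all hypothetical, and a real consumer would still have to guard its own state (for example by synchronizing on deliveredMessages) inside flushAcks.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: one shared daemon thread flushes pending acks of idle consumers. */
final class IdleAckFlusher implements AutoCloseable {

    /** Hypothetical view of a consumer; this is NOT an ActiveMQ interface. */
    interface PendingAcks {
        long lastAckTimeMillis();      // when the consumer last sent an ack
        boolean hasUnackedMessages();  // true if delivered messages still await an ack
        void flushAcks();              // send one ack covering everything delivered so far
    }

    private final Set<PendingAcks> consumers = ConcurrentHashMap.newKeySet();
    private final long idleTimeoutMillis;

    // A single scheduler thread is shared by every registered consumer.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "optimized-ack-flusher");
                t.setDaemon(true); // must not keep the JVM alive
                return t;
            });

    IdleAckFlusher(long idleTimeoutMillis, long checkPeriodMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        scheduler.scheduleWithFixedDelay(
                this::sweep, checkPeriodMillis, checkPeriodMillis, TimeUnit.MILLISECONDS);
    }

    void register(PendingAcks consumer) {
        consumers.add(consumer);
    }

    void unregister(PendingAcks consumer) {
        consumers.remove(consumer);
    }

    /** Flushes every consumer that has unacked messages and has been idle too long. */
    void sweep() {
        long now = System.currentTimeMillis();
        for (PendingAcks consumer : consumers) {
            try {
                if (consumer.hasUnackedMessages()
                        && now - consumer.lastAckTimeMillis() >= idleTimeoutMillis) {
                    consumer.flushAcks();
                }
            } catch (RuntimeException e) {
                // Swallowed on purpose: an uncaught exception would cancel the periodic task.
            }
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

With something like this, each optimized-ack consumer would register itself when it is created and unregister when it is closed, so the cost is one thread per JVM (or per connection) rather than one per consumer.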
> Not all messages will be acknowledged when optimizeAcknowledge is true
> ----------------------------------------------------------------------
>
>                 Key: AMQ-3664
>                 URL: https://issues.apache.org/jira/browse/AMQ-3664
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.5.1
>         Environment: Windows 7 and Linux Debian with JRE 1.6.24 or JRE 1.6.27
>            Reporter: Matthias Wessel
>            Priority: Critical
>
> I am running performance tests with ActiveMQ. When I set optimizeAcknowledge = true I get a dramatic performance improvement, but when I shut down the producer, the consumer does not acknowledge all messages! If I stop the consumer and then start it a second time, the consumer receives messages again, and again not all messages are acknowledged in the queue.
> I am using Camel 2.9.0 to produce and consume the messages.
> I am using the consumer Template with asyncSendBody.
> The following route is configured in the camelContext:
> {noformat}
>
> The config for the ActiveMQComponent:
>
> {noformat}
> I think the problem is here:
> Class ActiveMQMessageConsumer:
> {noformat}
>     private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
>         if (unconsumedMessages.isClosed()) {
>             return;
>         }
>         if (messageExpired) {
>             synchronized (deliveredMessages) {
>                 deliveredMessages.remove(md);
>             }
>             stats.getExpiredMessageCount().increment();
>             ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>         } else {
>             stats.onMessage();
>             if (session.getTransacted()) {
>                 // Do nothing.
>             } else if (isAutoAcknowledgeEach()) {
>                 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
>                     synchronized (deliveredMessages) {
>                         if (!deliveredMessages.isEmpty()) {
>                             if (optimizeAcknowledge) {
>                                 ackCounter++;
>                                 if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
>                                     MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                     if (ack != null) {
>                                         deliveredMessages.clear();
>                                         ackCounter = 0;
>                                         session.sendAck(ack);
>                                         optimizeAckTimestamp = System.currentTimeMillis();
>                                     }
>                                 }
>                             } else {
>                                 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                 if (ack != null) {
>                                     deliveredMessages.clear();
>                                     session.sendAck(ack);
>                                 }
>                             }
>                         }
>                     }
>                     deliveryingAcknowledgements.set(false);
>                 }
>             } else if (isAutoAcknowledgeBatch()) {
>                 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
>             } else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
>                 boolean messageUnackedByConsumer = false;
>                 synchronized (deliveredMessages) {
>                     messageUnackedByConsumer = deliveredMessages.contains(md);
>                 }
>                 if (messageUnackedByConsumer) {
>                     ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>                 }
>             } else {
>                 throw new IllegalStateException("Invalid session state.");
>             }
>         }
>     }
> {noformat}
> What happens when no producer sends a message to this queue, so that no further message passes through this method? When will the deliveredMessages be acked?

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira