Date: Fri, 10 Aug 2012 10:23:28 +0000 (UTC)
From: "Torsten Mielke (JIRA)"
To: dev@activemq.apache.org
Subject: [jira] [Comment Edited] (AMQ-3965) Expired msgs not getting acked to broker causing consumer to fill up its prefetch and not getting more msgs.

[ https://issues.apache.org/jira/browse/AMQ-3965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13432683#comment-13432683 ]

Torsten Mielke edited comment on AMQ-3965 at 8/10/12 10:21 AM:
---------------------------------------------------------------

Proposing the following fix:

{code:title=ActiveMQMessageConsumer.java}
private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
    [...]
    if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) {
        session.sendAck(pendingAck);
        pendingAck = null;
        deliveredCounter = 0;
        additionalWindowSize = 0;
    }
    [...]

private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
    [...]
    synchronized (deliveredMessages) {
        if (!deliveredMessages.isEmpty()) {
            if (optimizeAcknowledge) {
                ackCounter++;

                // AMQ-3965 evaluate both expired and normal msgs as
                // otherwise consumer may get stalled
                if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65)
                        || (optimizeAcknowledgeTimeOut > 0
                            && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                    if (ack != null) {
                        deliveredMessages.clear();
                        ackCounter = 0;
                        session.sendAck(ack);
                        optimizeAckTimestamp = System.currentTimeMillis();
                    }
                    // AMQ-3965 - as a further optimization send an
                    // ack for expired msgs when there are any.
                    // This resets the deliveredCounter to 0 so that
                    // we won't send standard acks with every msg just
                    // because the deliveredCounter is just below
                    // 0.5 * prefetch as used in ackLater()
                    if (pendingAck != null && deliveredCounter > 0) {
                        session.sendAck(pendingAck);
                        pendingAck = null;
                        deliveredCounter = 0;
                    }
                }
            }
{code}

Essentially, both methods, ackLater() and afterMessageIsConsumed(), now evaluate ackCounter + deliveredCounter.
This avoids a stalled consumer and fixes the bug.

However, an additional optimization is necessary for the following case:
Suppose prefetch=100. The consumer receives 49 msgs that expire before being dispatched, so deliveredCounter=49 and ackCounter=0. No ack is sent as the count is below the threshold of 50.
Next, the consumer only processes non-expired msgs. As deliveredCounter=49, it will dispatch 17 msgs and then send a standard ack, because after the 17th msg deliveredCounter=49 and ackCounter=17, which exceeds 0.65*prefetch.
The ack resets the ackCounter but not the deliveredCounter, which remains at 49. So we would process only another 17 non-expired msgs before sending the next standard ack to the broker.
The problem is that the deliveredCounter does not change unless we receive another expired msg. If we only receive non-expired msgs from then on, the deliveredCounter never gets reset to 0 and we ack after every 17 msgs instead of after 0.65*prefetch=65 msgs.

To avoid this situation, I propose to send a deliveredAck right after the standard ack. This resets the deliveredCounter to 0, so we can process the next 65 non-expired msgs without having to send back an ack (as opposed to processing only 17 msgs before sending an ack).
If there are no expired msgs to be acked, no deliveredAck is sent. So this should only trigger in the case where some expired msgs have accumulated but have not been acked yet.

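
The counter arithmetic in the case above can be replayed with a small standalone model. This is only a sketch, not ActiveMQ code: the class `AckCadenceSketch`, the method `standardAckPoints` and the `resetDelivered` flag are hypothetical names, additionalWindowSize is assumed to be 0, and optimizeAcknowledgeTimeOut is ignored. Note that in this model the standard ack already fires on the 16th non-expired msg rather than the 17th, because 49 + 16 satisfies the `>=` comparison; the repeated short cadence is the same effect as described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of the two ack counters; not ActiveMQ code. */
public class AckCadenceSketch {

    /**
     * Feeds `expired` expired msgs followed by `normal` non-expired msgs through
     * the combined thresholds and returns the 1-based positions (among the
     * non-expired msgs) at which a standard ack would be sent.
     * Assumes additionalWindowSize == 0 and no optimizeAcknowledgeTimeOut.
     */
    public static List<Integer> standardAckPoints(int prefetch, int expired, int normal,
                                                  boolean resetDelivered) {
        int deliveredCounter = 0; // expired msgs not yet acked
        int ackCounter = 0;       // consumed msgs not yet acked
        List<Integer> points = new ArrayList<>();

        for (int i = 0; i < expired; i++) {
            deliveredCounter++;
            // models the 50% check in ackLater()
            if (0.5 * prefetch <= deliveredCounter + ackCounter) {
                deliveredCounter = 0;
            }
        }
        for (int i = 1; i <= normal; i++) {
            ackCounter++;
            // models the 65% check in afterMessageIsConsumed()
            if (ackCounter + deliveredCounter >= prefetch * 0.65) {
                ackCounter = 0;
                points.add(i);
                if (resetDelivered && deliveredCounter > 0) {
                    deliveredCounter = 0; // models the extra delivered ack
                }
            }
        }
        return points;
    }

    public static void main(String[] args) {
        // without the extra delivered ack: a standard ack every 16 msgs
        System.out.println(standardAckPoints(100, 49, 130, false));
        // with the extra delivered ack: one early ack, then every 65 msgs
        System.out.println(standardAckPoints(100, 49, 130, true));
    }
}
```

With prefetch=100, 49 expired msgs and 130 non-expired msgs, the first call yields ack points at 16, 32, 48 and so on, while the second yields only 16 and 81.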
> Expired msgs not getting acked to broker causing consumer to fill up its prefetch and not getting more msgs.
> ------------------------------------------------------------------------------------------------------------
>
> Key: AMQ-3965
> URL: https://issues.apache.org/jira/browse/AMQ-3965
> Project: ActiveMQ
> Issue Type: Bug
> Components: JMS client
> Affects Versions: 5.6.0
> Reporter: Torsten Mielke
> Labels: optimizeDispatch
> Attachments: OptimizeAcknowledgeWithExpiredMsgsTest.java, testcase.tgz
>
>
> It is possible to get a consumer stalled and not receiving any more messages when using optimizeAcknowledge.
> Let me illustrate in an example (JUnit test attached).
> Suppose a consumer with optimizeAcknowledge and a prefetch of 100 msgs.
> The broker's queue contains 105 msgs. The first 45 msgs have a very low expiry time, the remaining ones don't expire.
> So the first 100 msgs get dispatched to the consumer (due to prefetch=100). Out of these, the first 45 msgs do not get dispatched to consumer code because their expiry has elapsed by the time they are handled in the client.
> {code:title=ActiveMQMessageConsumer.java}
> public void dispatch(MessageDispatch md) {
>     MessageListener listener = this.messageListener.get();
>     try {
>         [...]
>         synchronized (unconsumedMessages.getMutex()) {
>             if (!unconsumedMessages.isClosed()) {
>                 if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
>                     if (listener != null && unconsumedMessages.isRunning()) {
>                         ActiveMQMessage message = createActiveMQMessage(md);
>                         beforeMessageIsConsumed(md);
>                         try {
>                             boolean expired = message.isExpired();
>                             if (!expired) {
>                                 listener.onMessage(message);
>                             }
>                             afterMessageIsConsumed(md, expired);
> {code}
> listener.onMessage() above is not called as the msg has expired.
> However, the code still calls into afterMessageIsConsumed()
> {code:title=ActiveMQMessageConsumer.java}
> private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
>     [...]
>     if (messageExpired) {
>         synchronized (deliveredMessages) {
>             deliveredMessages.remove(md);
>         }
>         stats.getExpiredMessageCount().increment();
>         ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
> {code}
> and removes the expired msg from the deliveredMessages list. It then calls into ackLater().
> However, ackLater() only fires an ack back to the broker when the number of unsent acks has reached 50% of the prefetch value.
> {code:title=ActiveMQMessageConsumer.java}
> private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
>     [...]
>     if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
>         session.sendAck(pendingAck);
> {code}
> In our example it has not reached that mark (only 45 expired msgs, i.e. 45%).
> So the first 45 msgs, which expired before being dispatched, did not cause an ack to be sent to the broker.
> Now the next 55 messages get processed. These don't have an expiry, so they get dispatched to consumer code.
> After dispatching each msg to the registered application code, we call into afterMessageIsConsumed(), but this time a different branch is executed as the msgs are not expired
> {code:title=ActiveMQMessageConsumer.java}
> private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
>     [...]
>     else if (isAutoAcknowledgeEach()) {
>         if (deliveryingAcknowledgements.compareAndSet(false, true)) {
>             synchronized (deliveredMessages) {
>                 if (!deliveredMessages.isEmpty()) {
>                     if (optimizeAcknowledge) {
>                         ackCounter++;
>                         if (ackCounter >= (info.getPrefetchSize() * .65)
>                                 || (optimizeAcknowledgeTimeOut > 0
>                                     && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
>                             MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                             if (ack != null) {
>                                 deliveredMessages.clear();
>                                 ackCounter = 0;
>                                 session.sendAck(ack);
>                                 optimizeAckTimestamp = System.currentTimeMillis();
>                             }
>                         }
> {code}
> With optimizeAcknowledge=true we only send an ack back to the broker if either optimizeAcknowledgeTimeOut has elapsed or the ackCounter has reached 65% of the prefetch (100).
> The timeout will not have kicked in. The ackCounter will be at 55 after processing the last of the 100 prefetched messages, which is less than 65% of 100. So processing the last prefetched msg does not generate an ack back to the broker.
> As a result, the client has processed all prefetched messages and will not get any new messages dispatched from the broker. The broker has another 5 msgs on the queue, but since it never received an ack from the client, it won't dispatch any further messages.
> The client is stalled.
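
The stall described in the issue can be reproduced with a small standalone model of the two thresholds. This is only a sketch under stated assumptions, not ActiveMQ code: the class `StallScenarioSketch`, the method `acksSent` and the `combinedCounters` flag are hypothetical names, additionalWindowSize is assumed to be 0, and optimizeAcknowledgeTimeOut is assumed not to fire. With `combinedCounters=false` it models the original checks quoted in the issue; with `combinedCounters=true` it models the checks on ackCounter + deliveredCounter from the fix proposed in the comment.

```java
/** Simplified, hypothetical model of the stall scenario; not ActiveMQ code. */
public class StallScenarioSketch {

    /**
     * Returns how many acks (delivered or standard) the consumer would send
     * while handling `expired` expired msgs followed by `normal` non-expired
     * msgs. Assumes additionalWindowSize == 0 and no timeout-triggered ack.
     */
    public static int acksSent(int prefetch, int expired, int normal,
                               boolean combinedCounters) {
        int deliveredCounter = 0; // expired msgs not yet acked
        int ackCounter = 0;       // consumed msgs not yet acked
        int acks = 0;

        for (int i = 0; i < expired; i++) {
            deliveredCounter++;
            int count = combinedCounters ? deliveredCounter + ackCounter : deliveredCounter;
            if (0.5 * prefetch <= count) { // models the check in ackLater()
                acks++;
                deliveredCounter = 0;
            }
        }
        for (int i = 0; i < normal; i++) {
            ackCounter++;
            int count = combinedCounters ? ackCounter + deliveredCounter : ackCounter;
            if (count >= prefetch * 0.65) { // models the check in afterMessageIsConsumed()
                acks++;
                ackCounter = 0;
                if (combinedCounters && deliveredCounter > 0) {
                    acks++; // models the additional delivered ack
                    deliveredCounter = 0;
                }
            }
        }
        return acks;
    }

    public static void main(String[] args) {
        // original checks: 45 < 50 and 55 < 65, so no ack is ever sent
        System.out.println(acksSent(100, 45, 55, false)); // prints 0
        // combined checks: one standard ack plus one delivered ack
        System.out.println(acksSent(100, 45, 55, true));  // prints 2
    }
}
```

Zero acks for a full prefetch window is exactly the condition under which the broker stops dispatching, which is why the remaining 5 msgs never reach the client.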