activemq-dev mailing list archives

From "Timothy Bish (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AMQ-3664) Not all messages will be acknowledged when optimizeAcknowledge is true
Date Tue, 18 Sep 2012 10:17:07 GMT

    [ https://issues.apache.org/jira/browse/AMQ-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13457723#comment-13457723
] 

Timothy Bish commented on AMQ-3664:
-----------------------------------

Added the option optimizedAckScheduledAckInterval to Connection and MessageConsumer to allow
a configurable asynchronous ack of outstanding messages. The option defaults to zero, which
means no asynchronous acks are sent. Users who want to ensure that all messages are eventually
acked can enable it by setting a value greater than zero.
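
To illustrate the intent of the option, here is a minimal stand-alone sketch. It is a
simplified model and not ActiveMQ code: the class ScheduledAckSketch, its methods, the
threshold of 650 and the millisecond clock values are all assumptions made for the example.
It only shows why a timer-driven flush acks the tail of a batch that never reaches the
threshold.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Simplified stand-alone model (NOT ActiveMQ code) of a consumer that batches
 * acks and can optionally flush outstanding acks on a timer.
 */
final class ScheduledAckSketch {
    private final int ackThreshold;            // hypothetical: stands in for 65% of the prefetch size
    private final long scheduledAckIntervalMs; // models optimizedAckScheduledAckInterval; 0 = disabled
    private final Deque<Integer> delivered = new ArrayDeque<>(); // consumed but not yet acked
    private int acked;
    private long lastAckMs;

    ScheduledAckSketch(int ackThreshold, long scheduledAckIntervalMs) {
        this.ackThreshold = ackThreshold;
        this.scheduledAckIntervalMs = scheduledAckIntervalMs;
    }

    /** Consumer path: an ack is sent only once enough messages have piled up. */
    void onMessageConsumed(int messageId, long nowMs) {
        delivered.add(messageId);
        if (delivered.size() >= ackThreshold) {
            ackAll(nowMs);
        }
    }

    /** Timer path: in this model the caller plays the role of a scheduler thread. */
    void onTimerTick(long nowMs) {
        boolean enabled = scheduledAckIntervalMs > 0;
        if (enabled && !delivered.isEmpty() && nowMs - lastAckMs >= scheduledAckIntervalMs) {
            ackAll(nowMs);
        }
    }

    private void ackAll(long nowMs) {
        acked += delivered.size();
        delivered.clear();
        lastAckMs = nowMs;
    }

    int ackedCount() { return acked; }

    int pendingCount() { return delivered.size(); }

    /** Consumes 700 messages at t=0, then lets the timer fire once at t=10s. */
    static ScheduledAckSketch simulate(long scheduledAckIntervalMs) {
        ScheduledAckSketch consumer = new ScheduledAckSketch(650, scheduledAckIntervalMs);
        for (int id = 1; id <= 700; id++) {
            consumer.onMessageConsumed(id, 0L);
        }
        consumer.onTimerTick(10_000L);
        return consumer;
    }

    public static void main(String[] args) {
        ScheduledAckSketch off = simulate(0L);
        ScheduledAckSketch on = simulate(5_000L);
        System.out.println("interval=0:    acked=" + off.ackedCount() + " pending=" + off.pendingCount());
        System.out.println("interval=5000: acked=" + on.ackedCount() + " pending=" + on.pendingCount());
    }
}
```

With the interval at zero the model leaves 50 messages unacked after the producer stops.
With a positive interval the timer tick acks them.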
                
> Not all messages will be acknowledged when optimizeAcknowledge is true
> ----------------------------------------------------------------------
>
>                 Key: AMQ-3664
>                 URL: https://issues.apache.org/jira/browse/AMQ-3664
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.5.1
>         Environment: Windows 7 and Linux Debian with JRE 1.6.24 or JRE 1.6.27
>            Reporter: Matthias Wessel
>            Priority: Critical
>             Fix For: 5.7.0
>
>
> I am running performance tests with ActiveMQ. When I set optimizeAcknowledge = true I get a
> dramatic performance improvement, but when I shut down the producer the consumer does not
> acknowledge all messages! If I stop the consumer and start it a second time, the consumer
> receives messages again, and again not all messages in the queue are acknowledged.
> I am using camel 2.9.0 to produce and consume the messages.
> I am using the producer template with asyncSendBody.
> The following route is configured in the camelContext:
> {noformat}
>     <camel:camelContext id="camelContext">
>     	<camel:template id="producerTemplate"/>
>     	<camel:consumerTemplate id="consumerTemplate"/>
>     	<camel:route>
>     		<camel:from uri="jms:queue0?concurrentConsumers=3&amp;maxConcurrentConsumers=10&amp;asyncConsumer=true"/>
>     		<camel:to uri="beanConsumer"/>
>     	</camel:route>
>     </camel:camelContext>
> {noformat}
> The config for the ActiveMQComponent:
> {noformat}
>     <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
> 		<property name="connectionFactory">		
> 			<bean class="org.apache.activemq.pool.PooledConnectionFactory">
>    				<property name="connectionFactory">
>   					<bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
>    						<property name="optimizeAcknowledge" value="true"/>
>    						<property name="dispatchAsync" value="true"/>
>   						<property name="sendAcksAsync" value="true"/>
>   						<property name="useAsyncSend" value="true"/>
> 				 		<property name="brokerURL" value="nio://138-ham-de:61616"/>				 		
> 				 		<property name="useDedicatedTaskRunner" value="false"/> 
>     				</bean>	
>       			</property>
>       		</bean>
>       	</property>
>     </bean>
> {noformat}
> I think, the problem is here:
> Class ActiveMQMessageConsumer:
> {noformat}
>     private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
>         if (unconsumedMessages.isClosed()) {
>             return;
>         }
>         if (messageExpired) {
>             synchronized (deliveredMessages) {
>                 deliveredMessages.remove(md);
>             }
>             stats.getExpiredMessageCount().increment();
>             ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>         } else {
>             stats.onMessage();
>             if (session.getTransacted()) {
>                 // Do nothing.
>             } else if (isAutoAcknowledgeEach()) {
>                 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
>                     synchronized (deliveredMessages) {
>                         if (!deliveredMessages.isEmpty()) {
>                             if (optimizeAcknowledge) {
>                                 ackCounter++;
>                                 if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
>                                     MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                     if (ack != null) {
>                                         deliveredMessages.clear();
>                                         ackCounter = 0;
>                                         session.sendAck(ack);
>                                         optimizeAckTimestamp = System.currentTimeMillis();
>                                     }
>                                 }
>                             } else {
>                                 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                 if (ack!=null) {
>                                     deliveredMessages.clear();
>                                     session.sendAck(ack);
>                                 }
>                             }
>                         }
>                     }
>                     deliveryingAcknowledgements.set(false);
>                 }
>             } else if (isAutoAcknowledgeBatch()) {
>                 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
>             } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
>                 boolean messageUnackedByConsumer = false;
>                 synchronized (deliveredMessages) {
>                     messageUnackedByConsumer = deliveredMessages.contains(md);
>                 }
>                 if (messageUnackedByConsumer) {
>                     ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>                 }
>             } 
>             else {
>                 throw new IllegalStateException("Invalid session state.");
>             }
>         }
>     }
> {noformat}
> What happens when no producer sends further messages to this queue, so that no message
> passes through this method? When will the deliveredMessages be acked?
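
The reporter's question can be made concrete with a small stand-alone model of the
optimizeAcknowledge branch quoted above. This is a sketch under stated assumptions and not
the ActiveMQ implementation: the class AckThresholdModel, the plain counters and the values
used (prefetch 1000, timeout 300 ms) are hypothetical. It only mirrors the quoted condition,
which is evaluated when a message is consumed and at no other time.

```java
/**
 * Simplified stand-alone model (NOT ActiveMQ code) of the optimizeAcknowledge
 * branch: the ack condition is checked only while a message is being consumed.
 */
final class AckThresholdModel {
    private final int prefetchSize;
    private final long optimizeAckTimeoutMs;
    private long optimizeAckTimestampMs; // time of the last batched ack
    private int ackCounter;              // messages consumed since the last ack
    private int delivered;               // consumed but not yet acked
    private int acked;

    AckThresholdModel(int prefetchSize, long optimizeAckTimeoutMs) {
        this.prefetchSize = prefetchSize;
        this.optimizeAckTimeoutMs = optimizeAckTimeoutMs;
    }

    /** Mirrors the quoted condition: 65% of prefetch reached, or timeout elapsed. */
    void consume(long nowMs) {
        delivered++;
        ackCounter++;
        if (ackCounter >= prefetchSize * 0.65
                || nowMs >= optimizeAckTimestampMs + optimizeAckTimeoutMs) {
            acked += delivered;
            delivered = 0;
            ackCounter = 0;
            optimizeAckTimestampMs = nowMs;
        }
    }

    int ackedCount() { return acked; }

    int pendingCount() { return delivered; }

    public static void main(String[] args) {
        AckThresholdModel consumer = new AckThresholdModel(1000, 300L);

        // The producer sends 1500 messages in a burst (all at t=0), then stops.
        for (int i = 0; i < 1500; i++) {
            consumer.consume(0L);
        }
        // Acks went out after message 650 and message 1300; the last 200 wait.
        System.out.println("acked=" + consumer.ackedCount() + " pending=" + consumer.pendingCount());

        // Time passing alone changes nothing, because nothing calls consume().
        // Only a later message (here at t=10s) triggers the timeout branch.
        consumer.consume(10_000L);
        System.out.println("acked=" + consumer.ackedCount() + " pending=" + consumer.pendingCount());
    }
}
```

In this model the timeout check does not help once traffic stops, because it sits on the
consume path. The 200 outstanding messages are acked only when another message arrives.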

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
