activemq-dev mailing list archives

From "Matthias Wessel (Issue Comment Edited) (JIRA)" <j...@apache.org>
Subject [jira] [Issue Comment Edited] (AMQ-3664) Not all messages will be acknowledged when optimizeAcknowledge is true
Date Wed, 18 Jan 2012 12:15:39 GMT

    [ https://issues.apache.org/jira/browse/AMQ-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13188421#comment-13188421 ]

Matthias Wessel edited comment on AMQ-3664 at 1/18/12 12:15 PM:
----------------------------------------------------------------

When I switch to a cache level other than CACHE_AUTO or CACHE_CONSUMER, performance deteriorates
dramatically. In that case I am faster without optimizeAcknowledge = true. This cannot
be the goal of this setting!
The other problem is that I do not want to close the consumer. Yes, I want to close the producer,
but the consumer should still consume all existing messages in the queue. The consumer should not
care how many producers are producing messages to the queue.
                
      was (Author: matw):
    When I switch to a cache level other than CACHE_AUTO or CACHE_CONSUMER, performance deteriorates
dramatically. In that case I am faster without optimizeAcknowledge = true. This cannot
be the goal of this setting!
                  
> Not all messages will be acknowledged when optimizeAcknowledge is true
> ----------------------------------------------------------------------
>
>                 Key: AMQ-3664
>                 URL: https://issues.apache.org/jira/browse/AMQ-3664
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.5.1
>         Environment: Windows 7 and Linux Debian with JRE 1.6.24 or JRE 1.6.27
>            Reporter: Matthias Wessel
>            Priority: Critical
>
> I am running performance tests with ActiveMQ. When I set optimizeAcknowledge = true I get a dramatic performance improvement, but when I shut down the producer the consumer does not acknowledge all messages! If I stop the consumer and then start it a second time, the consumer receives messages again, and again not all messages in the queue are acknowledged.
> I am using camel 2.9.0 to produce and consume the messages.
> I am using the consumer Template with asyncSendBody.
> The following route is configured in the camelContext:
>     <camel:camelContext id="camelContext">
>     	<camel:template id="producerTemplate"/>
>     	<camel:consumerTemplate id="consumerTemplate"/>
>     	<camel:route>
>     		<camel:from uri="jms:queue0?concurrentConsumers=3&amp;maxConcurrentConsumers=10&amp;asyncConsumer=true"/>
>     		<camel:to uri="beanConsumer"/>
>     	</camel:route>
>     </camel:camelContext>
> The config for the ActiveMQComponent:
>     <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
>         <property name="connectionFactory">
>             <bean class="org.apache.activemq.pool.PooledConnectionFactory">
>                 <property name="connectionFactory">
>                     <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
>                         <property name="optimizeAcknowledge" value="true"/>
>                         <property name="dispatchAsync" value="true"/>
>                         <property name="sendAcksAsync" value="true"/>
>                         <property name="useAsyncSend" value="true"/>
>                         <property name="brokerURL" value="nio://138-ham-de:61616"/>
>                         <property name="useDedicatedTaskRunner" value="false"/>
>                     </bean>
>                 </property>
>             </bean>
>         </property>
>     </bean>
> I think, the problem is here:
> Class ActiveMQMessageConsumer:
>     private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
>         if (unconsumedMessages.isClosed()) {
>             return;
>         }
>         if (messageExpired) {
>             synchronized (deliveredMessages) {
>                 deliveredMessages.remove(md);
>             }
>             stats.getExpiredMessageCount().increment();
>             ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>         } else {
>             stats.onMessage();
>             if (session.getTransacted()) {
>                 // Do nothing.
>             } else if (isAutoAcknowledgeEach()) {
>                 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
>                     synchronized (deliveredMessages) {
>                         if (!deliveredMessages.isEmpty()) {
>                             if (optimizeAcknowledge) {
>                                 ackCounter++;
>                                 if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
>                                     MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                     if (ack != null) {
>                                         deliveredMessages.clear();
>                                         ackCounter = 0;
>                                         session.sendAck(ack);
>                                         optimizeAckTimestamp = System.currentTimeMillis();
>                                     }
>                                 }
>                                 }
>                             } else {
>                                 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                 if (ack!=null) {
>                                     deliveredMessages.clear();
>                                     session.sendAck(ack);
>                                 }
>                             }
>                         }
>                     }
>                     deliveryingAcknowledgements.set(false);
>                 }
>             } else if (isAutoAcknowledgeBatch()) {
>                 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
>             } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
>                 boolean messageUnackedByConsumer = false;
>                 synchronized (deliveredMessages) {
>                     messageUnackedByConsumer = deliveredMessages.contains(md);
>                 }
>                 if (messageUnackedByConsumer) {
>                     ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>                 }
>             } else {
>                 throw new IllegalStateException("Invalid session state.");
>             }
>         }
>     }
> What happens when no producer sends any more messages to this queue, so that no further message passes through this method? When will the remaining deliveredMessages be acked?
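
The question above can be illustrated with a small stand-alone model. This is only a sketch of the optimizeAcknowledge branch quoted in the issue, not the ActiveMQ implementation: the class OptimizeAckModel, its methods, the prefetch size of 1000 and the timeout of 300 ms are all assumptions made for illustration, and time is passed in explicitly instead of calling System.currentTimeMillis(). It shows that once the last message has been consumed, whatever is left below the 65% threshold stays unacknowledged until another message happens to pass through the method.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the optimizeAcknowledge batching shown above. */
public class OptimizeAckModel {
    private final int prefetchSize;
    private final long optimizeAckTimeoutMillis;
    private final List<Integer> deliveredMessages = new ArrayList<>();
    private int ackCounter = 0;
    private long optimizeAckTimestamp;
    private int ackedTotal = 0;

    public OptimizeAckModel(int prefetchSize, long optimizeAckTimeoutMillis, long nowMillis) {
        this.prefetchSize = prefetchSize;
        this.optimizeAckTimeoutMillis = optimizeAckTimeoutMillis;
        this.optimizeAckTimestamp = nowMillis;
    }

    /** Mirrors the batching condition: acks are only ever sent from inside this call. */
    public void afterMessageIsConsumed(int messageId, long nowMillis) {
        deliveredMessages.add(messageId);
        ackCounter++;
        if (ackCounter >= prefetchSize * 0.65
                || nowMillis >= optimizeAckTimestamp + optimizeAckTimeoutMillis) {
            // Stand-in for makeAckForAllDeliveredMessages + session.sendAck.
            ackedTotal += deliveredMessages.size();
            deliveredMessages.clear();
            ackCounter = 0;
            optimizeAckTimestamp = nowMillis;
        }
    }

    /** Messages consumed but not yet acknowledged. */
    public int pendingCount() {
        return deliveredMessages.size();
    }

    /** Messages acknowledged so far. */
    public int ackedCount() {
        return ackedTotal;
    }

    public static void main(String[] args) {
        OptimizeAckModel consumer = new OptimizeAckModel(1000, 300L, 0L);

        // The producer sends 700 messages in a burst (all at time 0), then shuts down.
        for (int i = 0; i < 700; i++) {
            consumer.afterMessageIsConsumed(i, 0L);
        }
        System.out.println("after burst: acked=" + consumer.ackedCount()
                + " pending=" + consumer.pendingCount());

        // Nothing else calls afterMessageIsConsumed, so the timeout is never checked.
        // Only a further message, arriving here at time 1000, flushes the remainder.
        consumer.afterMessageIsConsumed(700, 1000L);
        System.out.println("after late message: acked=" + consumer.ackedCount()
                + " pending=" + consumer.pendingCount());
    }
}
```

In this model the timeout check only runs when a message is consumed, so elapsed time alone never triggers an ack. That matches the behaviour described in the issue, under the assumption that no other code path flushes deliveredMessages.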
