activemq-users mailing list archives

From Diego Rodríguez Martín <drodrig...@altiria.com>
Subject Redeliveries, DLQ, AsycError
Date Thu, 30 Jul 2009 12:52:35 GMT
Hi,

    I'm trying to set up a configuration where message redelivery is 
important. Every message read from the queue is processed by sending an 
HTTP request to another host, which can fail for many reasons, and I 
want the system to retry the POST later.
   
    I have run some tests and this is what is happening:
   
    In this test there are 100 messages in the queue. 2 messages make 
the HTTP request to a nonexistent host and exit with a RuntimeException 
to force a rollback ("bad messages"). The other 98 messages succeed 
("good messages"). This is configurable because the host is looked up in 
a database using a message attribute as the key. What I expected at the 
end of the test is 98 good messages processed and the 2 bad messages 
remaining in the queue, waiting to be retried until the maximum number 
of redeliveries in the RedeliveryPolicy is reached, and then sent to the 
DLQ. This should take some minutes because the redelivery policy is 
initialRedeliveryDelay 10000, maximumRedeliveries 6, 
useExponentialBackOff true, backOffMultiplier 3.
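
    For reference, here is a rough sketch of the delay schedule these 
policy values would produce. It assumes each delay is simply the 
previous one multiplied by backOffMultiplier, with no upper cap and no 
collision avoidance; that is an assumption about the client, not 
something I have verified. RedeliverySchedule, delays and total are 
hypothetical names of my own, not part of ActiveMQ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not an ActiveMQ class): models exponential back-off
// under the assumption that delay(n) = delay(n-1) * multiplier, uncapped.
class RedeliverySchedule {

    // Returns the assumed delay, in milliseconds, before each redelivery attempt.
    static List<Long> delays(long initialDelayMs, int multiplier, int maxRedeliveries) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelayMs;
        for (int i = 0; i < maxRedeliveries; i++) {
            result.add(delay);
            delay *= multiplier;
        }
        return result;
    }

    // Sums the delays to estimate how long a message waits before reaching the DLQ.
    static long total(List<Long> delays) {
        long sum = 0;
        for (long d : delays) {
            sum += d;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Values taken from the RedeliveryPolicy in this message.
        List<Long> schedule = delays(10000, 3, 6);
        System.out.println(schedule);
        System.out.println(total(schedule) + " ms in total");
    }
}
```

    If the assumption holds, the six delays are 10000, 30000, 90000, 
270000, 810000 and 2430000 ms, which adds up to 3640000 ms (about an 
hour), so the exact timing depends on how the client really applies the 
policy.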
   
    What happens instead is that, after a while with no activity, I can 
still see 10 messages in the original queue, which means that 8 good 
messages are stuck in the queue. The 2 bad messages are then retried 
until maximumRedeliveries is reached some minutes later and sent to the 
DLQ, and only then are the 8 good messages processed successfully.
   
    This means something is holding on to the rolled-back messages and 
preventing other messages in the queue from being processed.
   
    First I thought of the prefetchPolicy, as the messages could be held 
in the consumer's internal prefetch queue, but setting queuePrefetch to 
1 didn't make any difference. Then I thought of 
DefaultMessageListenerContainer, because its processing threads could be 
blocked by the rollback.
   
    Also, when there are messages waiting to be redelivered, if I 
uninstall the DefaultMessageListenerContainer consumers, I get an 
exception in the ActiveMQ broker, probably related to the connection 
that is holding the rolled-back messages:
   
30 jul 2009 14:17:45,140 ERROR [ActiveMQ Transport: tcp:///127.0.0.1:3656] org.apache.activemq.broker.TransportConnection.Service  - sync error occurred:
javax.jms.JMSException: Transaction 'TX:ID:mymachine-4579-1248953104078-0:5207:52' has not been started.
javax.jms.JMSException: Transaction 'TX:ID:mymachine-4579-1248953104078-0:5207:52' has not been started.
        at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:270)
        at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:190)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
        at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
        at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
        at java.lang.Thread.run(Thread.java:595)
   
   
    I don't know how to determine which one is causing this problem, 
DefaultMessageListenerContainer or ActiveMQ. Maybe it is the 
DefaultMessageListenerContainer consumer threads, but the exception in 
the broker suggests ActiveMQ could also be the problem.
   
    Can anyone help me narrow down the problem?
   
    I am using ActiveMQ 5.2 on a Windows box, and Spring 
DefaultMessageListenerContainer for the consumers with the config below. 
If you need more config data, please ask.
   
   
    <bean id="jmsConnectionFactory" 
class="org.apache.activemq.spring.ActiveMQConnectionFactory" 
lazy-init="true">
        <property name="brokerURL" 
value="failover:(tcp://localhost:6275?soTimeout=60000&amp;connectionTimeout=30000)?initialReconnectDelay=20000&amp;randomize=false&amp;maxReconnectAttempts=-1&amp;useExponentialBackOff=false&amp;reconnectDelayExponent=2"/>
        <property name="redeliveryPolicy">
            <bean class="org.apache.activemq.RedeliveryPolicy">
                <property name="initialRedeliveryDelay" value="10000" />
                <property name="maximumRedeliveries" value="6" />
                <property name="useExponentialBackOff" value="true" />
                <property name="backOffMultiplier" value="3" />
            </bean>
        </property>
        <property name="prefetchPolicy">
            <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
                <property name="queuePrefetch" value="1" />
            </bean>
        </property>
        <property name="copyMessageOnSend" value="false"/>            
           
        <property name="userName" value="amquser"/>
        <property name="password" value="amquserpassword"/>
    </bean>
   
    <bean id="inputQueueListenerContainer" 
class="org.springframework.jms.listener.DefaultMessageListenerContainer" 
destroy-method="shutdown"  lazy-init="true">
        <property name="connectionFactory">
            <ref bean="jmsConnectionFactory" />
        </property>
        <property name="destination">
            <ref bean="inputQueue" />
        </property>
        <property name="messageListener">
            <ref bean="inputQueueMessageListener" />
        </property>
        <property name="sessionTransacted" value="true" />
        <property name="concurrentConsumers" value="2" />
        <property name="maxConcurrentConsumers" value="4" />
    </bean>

-- 
-------------------------------------------------------------
http://www.programacionenjava.com
-------------------------------------------------------------

