activemq-issues mailing list archives

From "Piotr Klimczak (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (AMQ-5854) Duplicate messages when failover is done during prepare phase of two phase commit.
Date Wed, 16 Sep 2015 13:40:46 GMT

    [ https://issues.apache.org/jira/browse/AMQ-5854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14768939#comment-14768939 ]

Piotr Klimczak edited comment on AMQ-5854 at 9/16/15 1:40 PM:
--------------------------------------------------------------

hakim.acharifi,

Your finding means that ActiveMQ has problems not only with consuming messages from queues
but also with producing to them.
In other words, simple JMSMessageID verification is not enough to prevent duplicates: in
the case of "duplicated production", each produced message gets a different JMSMessageID
anyway.

So the takeaway is that nobody should work around this issue by implementing an idempotent
consumer based on JMSMessageID.
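A minimal sketch of the point above, in plain Java so it runs without a broker. It assumes the producer stamps every logical message with an application-level business key; the class `DedupSketch`, its methods, the key `order-42` and the message IDs are all hypothetical. When the same logical message is produced twice, the two copies carry different JMSMessageIDs, so only the business-key check recognises the second copy.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration: JMSMessageID-based vs. business-key-based
// duplicate detection. In-memory sets are used only to keep it self-contained.
class DedupSketch {
    private final Set<String> seenMessageIds = new HashSet<>();
    private final Set<String> seenBusinessKeys = new HashSet<>();

    // Returns true if the message should be processed (ID not seen before).
    boolean acceptByMessageId(String jmsMessageId) {
        return seenMessageIds.add(jmsMessageId);
    }

    // Returns true if the message should be processed (key not seen before).
    boolean acceptByBusinessKey(String businessKey) {
        return seenBusinessKeys.add(businessKey);
    }

    public static void main(String[] args) {
        DedupSketch dedup = new DedupSketch();
        // The same logical message produced twice: same key, different IDs.
        System.out.println(dedup.acceptByMessageId("ID:host-1:1:1:1:1")); // true
        System.out.println(dedup.acceptByMessageId("ID:host-1:1:1:1:2")); // true (duplicate missed)
        System.out.println(dedup.acceptByBusinessKey("order-42"));        // true
        System.out.println(dedup.acceptByBusinessKey("order-42"));        // false (duplicate caught)
    }
}
```

In a real consumer the set of seen keys would need to be durable and updated in the same transaction as the database insert; otherwise a failure between the two reopens the window for duplicates.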

Cheers


was (Author: nannou9):
hakim.acharifi,

Your finding means that ActiveMQ has problems not only with consuming messages from queues
but also with producing to them.
In other words, simple JMSMessageID verification is not enough to prevent duplicates: in
the case of "duplicated production", each produced message gets a different JMSMessageID
anyway.

So then nobody should work around this issue by implementing an idempotent consumer based on JMSMessageID.

Cheers

> Duplicate messages when failover is done during prepare phase of two phase commit.
> ----------------------------------------------------------------------------------
>
>                 Key: AMQ-5854
>                 URL: https://issues.apache.org/jira/browse/AMQ-5854
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, JMS client
>    Affects Versions: 5.9.1, 5.10.2, 5.11.1
>         Environment: Ubuntu or AIX
> ActiveMQ 5.9.1 (problem is reproduced in 5.10.2 and 5.11.1)
> Spring DMLC
> XA transactions with Atomikos 3.7.0 (problem is also reproduced with 3.9.15)
> Persistent messages
> Multithreading (this problem occurs when there are at least 2 consumers on a queue)
>            Reporter: Michael
>         Attachments: ActiveMQMessageConsumer-5.10.2-ModifyWithThreadSleep.java, ActiveMQMessageConsumer-5.11.1-ModifyWithThreadSleep.java, amq5854.tar.gz
>
>
> Use case:
>                 With Spring DMLC, read a JMS message from a queue, produce a JMS message to an output queue, and write data to a database.
> Problem description:
>                 Due to high CPU usage, the inactivity monitor closed the connections between the clients and the broker while 16 messages were being processed.
> {noformat}
> 2015-06-01 04:39:01,130 | WARN  | Transport Connection to: tcp://*** failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>30000) long: tcp://*** | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ InactivityMonitor Worker
> {noformat}
>                 15 messages were rolled back and redelivered to another consumer.
>                 In the log we got 15 warnings:
> {noformat}
> ActiveMQMessageConsumer   |WARN |jmsContainer-173|rolling back transaction (XID:***) post failover recovery. 1 previously delivered message(s) not replayed to consumer: ***
> {noformat}
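The 15 rolled-back transactions come from a client-side check performed after failover recovery. A simplified, hypothetical model of that check (the class `FailoverRecoveryModel` and its methods are illustrative, not the ActiveMQ API): when the transport is interrupted, the consumer remembers which messages it had delivered inside the open transaction; if any of them is not redelivered to it on the new connection, the transaction must be rolled back rather than committed.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of the "previously delivered message(s) not
// replayed" check. Not ActiveMQ code; names are illustrative.
class FailoverRecoveryModel {
    private final Set<String> previouslyDelivered = new HashSet<>();
    private final Set<String> replayed = new HashSet<>();

    // Transport interrupted: remember what was delivered in the open transaction.
    void onTransportInterrupted(String... deliveredInTx) {
        previouslyDelivered.clear();
        replayed.clear();
        for (String id : deliveredInTx) {
            previouslyDelivered.add(id);
        }
    }

    // A message is delivered again to this consumer on the new connection.
    void onRedelivery(String messageId) {
        if (previouslyDelivered.contains(messageId)) {
            replayed.add(messageId);
        }
    }

    // How many previously delivered messages never came back to this consumer.
    int notReplayedCount() {
        int missing = 0;
        for (String id : previouslyDelivered) {
            if (!replayed.contains(id)) {
                missing++;
            }
        }
        return missing;
    }

    // Commit is only safe if every previously delivered message was replayed.
    boolean mustRollback() {
        return notReplayedCount() > 0;
    }
}
```

For each of the 15 transactions in the report, the message had been redelivered to another consumer, which in this model gives `notReplayedCount() == 1`, matching the "1 previously delivered message(s) not replayed" warning.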
>                 But one message was not rolled back (its transaction committed) and was also redelivered to another consumer. So it was processed twice by two different consumers (two inserts in the database and two output JMS messages generated) and was not deduplicated.
>                 In the ActiveMQ log we got the message:
> {noformat}
> WARN  | Async error occurred:  | org.apache.activemq.broker.TransportConnection.Service | ActiveMQ Transport: tcp:///***
>                        javax.jms.JMSException: Unmatched acknowledge: MessageAck {commandId = 6665, responseRequired = false, ackType = 2, consumerId = ID:***, firstMessageId = ID:***-50800-1433109620591-1:2:31356:1:1, lastMessageId = ID:***-50800-1433109620591-1:2:31356:1:1, destination = queue://***, transactionId = XID:[1096044365,globalId=47524f55505f3030303038736572766963657472616974656d656e7431363536373030343133,branchId=47524f55505f3030303038736572766963657472616974656d656e743137343737], messageCount = 1, poisonCause = null}; Could not find Message-ID ID:***-50800-1433109620591-1:2:31356:1:1 in dispatched-list (start of ack)
> {noformat}
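One plausible reading of this broker warning, sketched as a simplified and hypothetical model (`DispatchedListModel` is illustrative, not broker code): the broker tracks, per consumer, the messages dispatched but not yet acknowledged. Once the old connection is gone, those messages are handed back for redelivery, so a transactional ack that arrives later for the original consumer finds nothing to match, while the message itself is already on its way to another consumer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of a per-consumer dispatched list.
// An ack whose message ID is not in the consumer's list is "unmatched".
class DispatchedListModel {
    private final Map<String, List<String>> dispatched = new HashMap<>();

    void dispatch(String consumerId, String messageId) {
        dispatched.computeIfAbsent(consumerId, k -> new ArrayList<>()).add(messageId);
    }

    // Connection lost: the consumer goes away and its unacknowledged
    // messages are returned so they can be redelivered elsewhere.
    List<String> removeConsumer(String consumerId) {
        List<String> returned = dispatched.remove(consumerId);
        return returned == null ? new ArrayList<>() : returned;
    }

    // Removes the acknowledged message, or fails if it cannot be found.
    void acknowledge(String consumerId, String messageId) {
        List<String> list = dispatched.get(consumerId);
        if (list == null || !list.remove(messageId)) {
            throw new IllegalStateException("Unmatched acknowledge: could not find "
                    + messageId + " in dispatched-list of " + consumerId);
        }
    }
}
```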
>                 For this duplicated message, the failover occurred during the prepare phase of the commit:
> {noformat}
> [{2015/06/01 04:39:50,322 |FailoverTransport         |WARN |jmsContainer-152|Transport (tcp://***) failed, reason:  , attempting to automatically reconnect}]
> org.apache.activemq.transport.InactivityIOException: Cannot send, channel has already failed: ***
>                 at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:297)
>                 at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:286)
>                 at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85)
>                 at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
>                 at org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:658)
>                 at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
>                 at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
>                 at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1321)
>                 at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1315)
>                 at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1933)
>                 at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2099)
>                 at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2094)
>                 at org.apache.activemq.ActiveMQMessageConsumer.acknowledge(ActiveMQMessageConsumer.java:1083)
>                 at org.apache.activemq.ActiveMQMessageConsumer$5.beforeEnd(ActiveMQMessageConsumer.java:1041)
>                 at org.apache.activemq.TransactionContext.beforeEnd(TransactionContext.java:202)
>                 at org.apache.activemq.TransactionContext.end(TransactionContext.java:409)
>                 at com.atomikos.datasource.xa.XAResourceTransaction.suspend(XAResourceTransaction.java:457)
>                 at com.atomikos.datasource.xa.XAResourceTransaction.prepare(XAResourceTransaction.java:608)
>                 at com.atomikos.icatch.imp.PrepareMessage.send(PrepareMessage.java:61)
>                 at com.atomikos.icatch.imp.PropagationMessage.submit(PropagationMessage.java:111)
>                 at com.atomikos.icatch.imp.Propagator$PropagatorThread.run(Propagator.java:87)
>                 at com.atomikos.icatch.imp.Propagator.submitPropagationMessage(Propagator.java:66)
>                 at com.atomikos.icatch.imp.ActiveStateHandler.prepare(ActiveStateHandler.java:173)
>                 at com.atomikos.icatch.imp.CoordinatorImp.prepare(CoordinatorImp.java:832)
>                 at com.atomikos.icatch.imp.CoordinatorImp.terminate(CoordinatorImp.java:1159)
>                 at com.atomikos.icatch.imp.CompositeTerminatorImp.commit(CompositeTerminatorImp.java:92)
>                 at com.atomikos.icatch.jta.TransactionImp.commit(TransactionImp.java:236)
>                 at com.atomikos.icatch.jta.TransactionManagerImp.commit(TransactionManagerImp.java:498)
>                 at com.atomikos.icatch.jta.UserTransactionImp.commit(UserTransactionImp.java:129)
>                 at org.springframework.transaction.jta.JtaTransactionManager.doCommit(JtaTransactionManager.java:1011)
>                 at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:755)
>                 at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:724)
>                 at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
>                 at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1101)
>                 at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:995)
>                 at java.lang.Thread.run(Thread.java:761)
> {noformat}
> Our analysis:
>                 We think the duplicate message is caused by the failover during the prepare phase of the commit, so we modified the source code to reproduce the case.
>                 Our configuration changes to provoke failovers:
>                                broker : transport.useKeepAlive=false
>                                client : wireFormat.maxInactivityDuration=5000
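Roughly why these two settings provoke failovers, as a simplified and hypothetical model (`InactivityModel` is illustrative; the real InactivityMonitor checks on a timer and exchanges keep-alives): the connection is declared dead when nothing has been received for longer than `maxInactivityDuration`. The sketch assumes that, with keep-alives disabled on the broker side, the broker sends nothing to the client while the consumer thread is stalled, so the 20-second sleep described next exceeds a 5000 ms window but not the default 30000 ms one seen in the production log.

```java
// Simplified, hypothetical model of an inactivity read check: the channel is
// considered dead when nothing was received for longer than maxInactivityMillis.
class InactivityModel {
    private final long maxInactivityMillis;
    private long lastReceivedMillis;

    InactivityModel(long maxInactivityMillis, long startMillis) {
        this.maxInactivityMillis = maxInactivityMillis;
        this.lastReceivedMillis = startMillis;
    }

    // Any inbound command (message, ack response, keep-alive) resets the clock.
    void onCommandReceived(long nowMillis) {
        lastReceivedMillis = nowMillis;
    }

    boolean isInactive(long nowMillis) {
        return nowMillis - lastReceivedMillis > maxInactivityMillis;
    }

    public static void main(String[] args) {
        // Reproduction setting: 5000 ms; a 20-second quiet period trips it.
        InactivityModel repro = new InactivityModel(5000, 0);
        System.out.println(repro.isInactive(20_000)); // true
        // Default 30000 ms window: the same 20 seconds does not.
        InactivityModel defaults = new InactivityModel(30_000, 0);
        System.out.println(defaults.isInactive(20_000)); // false
    }
}
```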
>                 We added a Thread.sleep to the source code of org.apache.activemq.ActiveMQMessageConsumer to force the failover to happen exactly where we think it causes the problem:
> {code:title=org.apache.activemq.ActiveMQMessageConsumer#acknowledge()|borderStyle=solid}
>     public void acknowledge() throws JMSException {
>         clearDeliveredList();
>         waitForRedeliveries();
>         synchronized(deliveredMessages) {
>             // BEGIN MODIFIED CODE
>             LOG.warn("start sleeping 20 seconds to test failover");
>             try {
>                 Thread.sleep(1000 * 20);
>             } catch (InterruptedException e) {
>                 LOG.error("Exception :", e);
>             }
>             LOG.warn("end sleeping 20 seconds to test failover");
>             // END MODIFIED CODE
>             // Acknowledge all messages so far.
>             MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>             if (ack == null)
>                 return; // no msgs
>             if (session.getTransacted()) {
>                 rollbackOnFailedRecoveryRedelivery();
>                 session.doStartTransaction();
>                 ack.setTransactionId(session.getTransactionContext().getTransactionId());
>             }
>             pendingAck = null;
>             session.sendAck(ack);
>             // Adjust the counters
>             deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
>             additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
>             if (!session.getTransacted()) {
>                 deliveredMessages.clear();
>             }
>         }
>     }
> {code}
>                 
>                 With these configuration and code changes, the problem is easily reproduced.
>                 We also tried with transactedIndividualAck=true, adding a Thread.sleep in this code:
> {code:title=org.apache.activemq.ActiveMQMessageConsumer#registerSync()|borderStyle=solid}
>     private void registerSync() throws JMSException {
>         session.doStartTransaction();
>         if (!synchronizationRegistered) {
>             synchronizationRegistered = true;
>             session.getTransactionContext().addSynchronization(new Synchronization() {
>                 @Override
>                 public void beforeEnd() throws Exception {
>                     if (transactedIndividualAck) {
>                         clearDeliveredList();
>                         waitForRedeliveries();
>                         synchronized(deliveredMessages) {
>                             // BEGIN MODIFIED CODE
>                             LOG.warn("start sleeping 20 seconds to test failover");
>                             try {
>                                 Thread.sleep(1000 * 20);
>                             } catch (InterruptedException e) {
>                                 LOG.error("Exception :", e);
>                             }
>                             LOG.warn("end sleeping 20 seconds to test failover");
>                             // END MODIFIED CODE
>                             rollbackOnFailedRecoveryRedelivery();
>                         }
>                     } else {
>                         acknowledge();
>                     }
>                     synchronizationRegistered = false;
>                 }
>                 @Override
>                 public void afterCommit() throws Exception {
>                     commit();
>                     synchronizationRegistered = false;
>                 }
>                 @Override
>                 public void afterRollback() throws Exception {
>                     rollback();
>                     synchronizationRegistered = false;
>                 }
>             });
>         }
>     }
> {code}
>                 With these modifications, we still get duplicate messages.
>                 We think the problem is that the synchronized(deliveredMessages) block prevents another ActiveMQConnection thread from calling clearDeliveredList(), which clears the messages in progress.
>                 By adding logs, we observed a thread waiting for the deliveredMessages lock in the clearDeliveredList() method.
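The suspected interleaving can be reproduced in isolation with plain Java threads. This is a hypothetical, simplified sketch of the reporters' hypothesis (the class name, message ID and 200 ms sleep are illustrative, not ActiveMQ code): one thread holds the `deliveredMessages` monitor while it builds the ack, as `acknowledge()` does, and a second thread standing in for the transport-interruption path blocks on the same monitor, so the list is only cleared after the ack has already been sent.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical reproduction of the suspected lock ordering between the
// consumer (acking) thread and the failover (clearing) thread.
class LockContentionSketch {
    static List<String> run() {
        final List<String> deliveredMessages = new ArrayList<>();
        deliveredMessages.add("msg-1");
        final List<String> events = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch lockHeld = new CountDownLatch(1);

        Thread consumer = new Thread(() -> {
            synchronized (deliveredMessages) {
                lockHeld.countDown();
                try {
                    Thread.sleep(200); // stands in for the injected Thread.sleep
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // The ack is built from the list as it is now, before failover handling runs.
                events.add("ack sent for " + deliveredMessages);
            }
        });

        Thread transport = new Thread(() -> {
            try {
                lockHeld.await(); // ensure the consumer owns the monitor first
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            events.add("failover detected");
            synchronized (deliveredMessages) { // blocks until the consumer releases it
                deliveredMessages.clear();
                events.add("delivered list cleared");
            }
        });

        consumer.start();
        transport.start();
        try {
            consumer.join();
            transport.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for threads", e);
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Whatever the exact timing, the clear can only happen once the monitor is released, i.e. after the ack for `msg-1` has gone out.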
>                 
> Question:
>
>                 We tried the fixes described in https://issues.apache.org/jira/browse/AMQ-5068 and https://issues.apache.org/jira/browse/AMQ-3519, but they do not solve our problem.
>                 Is there a workaround or a configuration parameter that can help prevent this problem?
>                 
>                 We are working on a fix on our side. One option may be to force a rollback of the transaction in ConnectionStateTracker.restoreTransactions() if a failover occurs during the prepare phase of the commit.
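The proposed option could look roughly like the following. This is a hypothetical sketch only: `RestoreTransactionsSketch`, its `Phase` enum and its methods are illustrative names and do not reflect the real `ConnectionStateTracker` API. It assumes the client can record that a transaction's prepare phase had begun (per the stack trace, prepare starts with the `end()` that sends the ack) before the connection failed, and that such transactions are then rolled back instead of being replayed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, NOT the real ConnectionStateTracker API: track the phase
// of each transaction and force a rollback, on restore after failover, for any
// transaction that was interrupted during the prepare phase.
class RestoreTransactionsSketch {
    enum Phase { ACTIVE, PREPARING }

    private final Map<String, Phase> tracked = new LinkedHashMap<>();

    void begin(String xid) { tracked.put(xid, Phase.ACTIVE); }

    void prepareStarted(String xid) { tracked.put(xid, Phase.PREPARING); }

    void completed(String xid) { tracked.remove(xid); }

    // Returns the XIDs to roll back; ACTIVE ones would be replayed as before.
    List<String> restoreTransactions() {
        List<String> forcedRollback = new ArrayList<>();
        for (Map.Entry<String, Phase> entry : tracked.entrySet()) {
            if (entry.getValue() == Phase.PREPARING) {
                forcedRollback.add(entry.getKey());
            }
        }
        return forcedRollback;
    }
}
```

The choice is deliberately conservative: a transaction whose outcome is uncertain is rolled back and its message redelivered, rather than letting a commit go through after the broker may already have handed the message to another consumer.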



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
