activemq-dev mailing list archives

From "Remo Gloor (JIRA)" <jira+amq...@apache.org>
Subject [jira] [Created] (AMQNET-412) Messages are enlisted to the wrong transaction
Date Thu, 14 Feb 2013 17:43:12 GMT
Remo Gloor created AMQNET-412:
---------------------------------

             Summary: Messages are enlisted to the wrong transaction
                 Key: AMQNET-412
                 URL: https://issues.apache.org/jira/browse/AMQNET-412
             Project: ActiveMQ .Net
          Issue Type: Bug
         Environment: Apache.NMS.ActiveMq 1.5.7
            Reporter: Remo Gloor
            Assignee: Jim Gomes
            Priority: Critical


Under load, ActiveMQ enlists a message in a previous transaction. This leads to very strange
behavior:
- Database is updated and message is rolled back
- Message is completed but database is rolled back

All of this leaves the system in an invalid state, which makes DTC unusable.

Analysis of the source code has shown that the problem is in NetTxSession.DoStartTransaction.
There it checks whether a .NET Transaction is already present in the TransactionContext. If so,
it adds the message to that transaction. But this can be the previous transaction, because the
DTC two-phase commit is asynchronous. It needs to check whether the current transaction is the
same as the one before, and wait if they do not match.
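
The check described above could be sketched roughly as follows. This is only an illustration
under assumptions, not the actual NMS code or a proposed patch: EnlistmentGate is a hypothetical
helper that is not part of Apache.NMS.ActiveMQ, and it assumes the session calls Release() itself
once its own Commit, Rollback or InDoubt handling for the previous transaction has finished.

```csharp
using System;
using System.Threading;
using System.Transactions;

// Hypothetical helper, not part of Apache.NMS.ActiveMQ.
internal sealed class EnlistmentGate
{
    private readonly object sync = new object();
    private string enlistedId; // LocalIdentifier of the transaction holding the gate, or null

    public bool IsHeld
    {
        get { lock (sync) { return enlistedId != null; } }
    }

    // Blocks while a different transaction still holds the gate, then claims it.
    public void Enter(Transaction current)
    {
        if (ReferenceEquals(current, null)) throw new ArgumentNullException("current");
        string id = current.TransactionInformation.LocalIdentifier;
        lock (sync)
        {
            while (enlistedId != null && enlistedId != id)
            {
                Monitor.Wait(sync);
            }
            enlistedId = id;
        }
    }

    // Assumed to be called once the previous transaction is fully completed.
    public void Release()
    {
        lock (sync)
        {
            enlistedId = null;
            Monitor.PulseAll(sync);
        }
    }
}
```

With something like this, DoStartTransaction would call Enter(Transaction.Current) before
enlisting, so a message received in a new TransactionScope cannot join the previous transaction
while that one is still completing. The thread that waits in Enter must not be the thread that
delivers the completion callback, otherwise it would block forever.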

The following application demonstrates the problem when enough messages are processed, e.g.
enqueue 100 messages in foo.bar. It is basically TestRedeliveredCase3 but with half of the
messages failing.

Whenever a SinglePhaseCommit occurs in TestSinglePhaseCommit, this means the database would
be committed in its own transaction.

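    using System;
    using System.Transactions;
    using Apache.NMS;
    using Apache.NMS.ActiveMQ.Commands;
    using Apache.NMS.Util;

    class Program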
    {
        private static INetTxSession activeMqSession;
        private static IMessageConsumer consumer;
        private static INetTxConnection connection;

        static void Main(string[] args)
        {
            using (connection = CreateActiveMqConnection())
            using (activeMqSession = connection.CreateNetTxSession())
            using (consumer = activeMqSession.CreateConsumer(SessionUtil.GetQueue(activeMqSession, "queue://foo.bar")))
            {
                connection.Start();

                while (true)
                {
                    try
                    {
                        using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
                        {
                            IMessage msg = null;
                            while (msg == null)
                            {
                                msg = consumer.ReceiveNoWait();
                            }

                            OnMessage(msg);
                            scoped.Complete();
                        }
                    }
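                    catch (Exception)
                    {
                        // Expected: OnMessage throws for about half of the messages,
                        // so the scope is disposed without Complete() and rolls back.
                    }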
                }
            }
        }

        private static INetTxConnection CreateActiveMqConnection()
        {
            var connectionFactory = new Apache.NMS.ActiveMQ.NetTxConnectionFactory("activemq:tcp://localhost:61616")
            {
                AcknowledgementMode = AcknowledgementMode.Transactional
            };

            return connectionFactory.CreateNetTxConnection();
        }

        private static void OnMessage(IMessage message)
        {
            var x = new TestSinglePhaseCommit();

            var session2 = activeMqSession;
            {
                Transaction.Current.EnlistDurable(Guid.NewGuid(), x, EnlistmentOptions.None);

                // The problem occurs only if a message is sent using the same session as the receiver
                using (var producer = session2.CreateProducer(SessionUtil.GetQueue(session2, "queue://foo.baz")))
                {
                    producer.Send(new ActiveMQTextMessage("foo"));
                }

                if (new Random().Next(2) == 0) throw new Exception();
            }
        }
    }

    internal class TestSinglePhaseCommit : ISinglePhaseNotification
    {
        public void Prepare(PreparingEnlistment preparingEnlistment)
        {
            Console.WriteLine("Tx Prepare");
            preparingEnlistment.Prepared();
        }
        public void Commit(Enlistment enlistment)
        {
            Console.WriteLine("Tx Commit");
            enlistment.Done();
        }

        public void Rollback(Enlistment enlistment)
        {
            Console.WriteLine("Tx Rollback");
            enlistment.Done();
        }
        public void InDoubt(Enlistment enlistment)
        {
            Console.WriteLine("Tx InDoubt");
            enlistment.Done();
        }
        public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
        {
            Console.WriteLine("Tx SinglePhaseCommit");
            singlePhaseEnlistment.Committed();
        }
    }



--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
