activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Remo Gloor (JIRA)" <jira+amq...@apache.org>
Subject [jira] [Updated] (AMQNET-412) Messages are enlisted to the wrong transaction
Date Thu, 14 Feb 2013 17:45:13 GMT

     [ https://issues.apache.org/jira/browse/AMQNET-412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Remo Gloor updated AMQNET-412:
------------------------------

    Description: 
Under load active mq enlists a message to a previous transactions. This leads to very strange
behaviors:
- Database is updated and message is rolled back
- Message is completed but database rolledback

All this results in an invalid system state making. DTC is not usable this way.

Analysis of the source code have shown that the problem is in NetTxSession.DoStartTransaction
There it checks if a .NET Transaction in the TransactionContext. In this case it adds the
message to that transaction. But this can be the previous transaction because DTC 2-PhaseCommit
is asyncronous. It needs to check if the Current Transaction is the same as one before and
wait if they do not match.

The following applacation demonstrates the problem when enough messages are processed E.g.
enqueue 100 msg in foo.bar. It is basically TestRedeliveredCase3 but with half of the messages
failing. 

Whenever a SinglePhaseCommit occurs in the TestSinglePhaseCommit this means the database would
be commited in an own transaction. 

    class Program
    {
        private static INetTxSession activeMqSession;
        private static IMessageConsumer consumer;
        private static INetTxConnection connection;

        static void Main(string[] args)
        {
            using (connection = CreateActiveMqConnection())
            using (activeMqSession = connection.CreateNetTxSession())
            using (consumer = activeMqSession.CreateConsumer(SessionUtil.GetQueue(activeMqSession,
"queue://foo.bar")))
            {
                connection.Start();

                while (true)
                {
                    try
                    {
                        using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
                        {
                            IMessage msg = null;
                            while (msg == null)
                            {
                                msg = consumer.ReceiveNoWait();
                            }

                            OnMessage(msg);
                            scoped.Complete();
                        }
                    }
                    catch(Exception exception) {}
                }
            }
        }

        private static INetTxConnection CreateActiveMqConnection()
        {
            var connectionFactory = new Apache.NMS.ActiveMQ.NetTxConnectionFactory("activemq:tcp://localhost:61616")
            {
                AcknowledgementMode = AcknowledgementMode.Transactional
            };

            return connectionFactory.CreateNetTxConnection();
        }

        private static void OnMessage(IMessage message)
        {
            var x = new TestSinglePhaseCommit();

            var session2 = activeMqSession;
            {
                Transaction.Current.EnlistDurable(Guid.NewGuid(), x, EnlistmentOptions.None);

                using (var producer = session2.CreateProducer(SessionUtil.GetQueue(session2,
"queue://foo.baz")))
                {
                    producer.Send(new ActiveMQTextMessage("foo"));
                }

                if (new Random().Next(2) == 0) throw new Exception();
            }
        }
    }

    internal class TestSinglePhaseCommit : ISinglePhaseNotification
    {
        public void Prepare(PreparingEnlistment preparingEnlistment)
        {
            Console.WriteLine("Tx Prepare");
            preparingEnlistment.Prepared();
        }
        public void Commit(Enlistment enlistment)
        {
            Console.WriteLine("Tx Commit");
            enlistment.Done();
        }

        public void Rollback(Enlistment enlistment)
        {
            Console.WriteLine("Tx Rollback");
            enlistment.Done();
        }
        public void InDoubt(Enlistment enlistment)
        {
            Console.WriteLine("Tx InDoubt");
            enlistment.Done();
        }
        public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
        {
            Console.WriteLine("Tx SinglePhaseCommit");
            singlePhaseEnlistment.Committed();
        }
    }



  was:
Under load active mq enlists a message to a previous transactions. This leads to very strange
behaviors:
- Database is updated and message is rolled back
- Message is completed but database rolledback

All this results in an invalid system state making. DTC is not usable this way.

Analysis of the source code have shown that the problem is in NetTxSession.DoStartTransaction
There it checks if a .NET Transaction in the TransactionContext. In this case it adds the
message to that transaction. But this can be the previous transaction because DTC 2-PhaseCommit
is asyncronous. It needs to check if the Current Transaction is the same as one before and
wait if they do not match.

The following applacation demonstrates the problem when enough messages are processed E.g.
enqueue 100 msg in foo.bar. It is basically TestRedeliveredCase3 but with half of the messages
failing. 

Whenever a SinglePhaseCommit occurs in the TestSinglePhaseCommit this means the database would
be commited in an own transaction. 

    class Program
    {
        private static INetTxSession activeMqSession;
        private static IMessageConsumer consumer;
        private static INetTxConnection connection;

        static void Main(string[] args)
        {
            using (connection = CreateActiveMqConnection())
            using (activeMqSession = connection.CreateNetTxSession())
            using (consumer = activeMqSession.CreateConsumer(SessionUtil.GetQueue(activeMqSession,
"queue://foo.bar")))
            {
                connection.Start();

                while (true)
                {
                    try
                    {
                        using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
                        {
                            IMessage msg = null;
                            while (msg == null)
                            {
                                msg = consumer.ReceiveNoWait();
                            }

                            OnMessage(msg);
                            scoped.Complete();
                        }
                    }
                    catch(Exception exception) {}
                }
            }
        }

        private static INetTxConnection CreateActiveMqConnection()
        {
            var connectionFactory = new Apache.NMS.ActiveMQ.NetTxConnectionFactory("activemq:tcp://localhost:61616")
            {
                AcknowledgementMode = AcknowledgementMode.Transactional
            };

            return connectionFactory.CreateNetTxConnection();
        }

        private static void OnMessage(IMessage message)
        {
            var x = new TestSinglePhaseCommit();

            var session2 = activeMqSession;
            {
                Transaction.Current.EnlistDurable(Guid.NewGuid(), x, EnlistmentOptions.None);

                // The proble occurs only if a message is sent using the same session like
the receiver
                using (var producer = session2.CreateProducer(SessionUtil.GetQueue(session2,
"queue://foo.baz")))
                {
                    producer.Send(new ActiveMQTextMessage("foo"));
                }

                if (new Random().Next(2) == 0) throw new Exception();
            }
        }
    }

    internal class TestSinglePhaseCommit : ISinglePhaseNotification
    {
        public void Prepare(PreparingEnlistment preparingEnlistment)
        {
            Console.WriteLine("Tx Prepare");
            preparingEnlistment.Prepared();
        }
        public void Commit(Enlistment enlistment)
        {
            Console.WriteLine("Tx Commit");
            enlistment.Done();
        }

        public void Rollback(Enlistment enlistment)
        {
            Console.WriteLine("Tx Rollback");
            enlistment.Done();
        }
        public void InDoubt(Enlistment enlistment)
        {
            Console.WriteLine("Tx InDoubt");
            enlistment.Done();
        }
        public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
        {
            Console.WriteLine("Tx SinglePhaseCommit");
            singlePhaseEnlistment.Committed();
        }
    }



    
> Messages are enlisted to the wrong transaction
> ----------------------------------------------
>
>                 Key: AMQNET-412
>                 URL: https://issues.apache.org/jira/browse/AMQNET-412
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>         Environment: Apache.NMS.ActiveMq 1.5.7
>            Reporter: Remo Gloor
>            Assignee: Jim Gomes
>            Priority: Critical
>
> Under load active mq enlists a message to a previous transactions. This leads to very
strange behaviors:
> - Database is updated and message is rolled back
> - Message is completed but database rolledback
> All this results in an invalid system state making. DTC is not usable this way.
> Analysis of the source code have shown that the problem is in NetTxSession.DoStartTransaction
There it checks if a .NET Transaction in the TransactionContext. In this case it adds the
message to that transaction. But this can be the previous transaction because DTC 2-PhaseCommit
is asyncronous. It needs to check if the Current Transaction is the same as one before and
wait if they do not match.
> The following applacation demonstrates the problem when enough messages are processed
E.g. enqueue 100 msg in foo.bar. It is basically TestRedeliveredCase3 but with half of the
messages failing. 
> Whenever a SinglePhaseCommit occurs in the TestSinglePhaseCommit this means the database
would be commited in an own transaction. 
>     class Program
>     {
>         private static INetTxSession activeMqSession;
>         private static IMessageConsumer consumer;
>         private static INetTxConnection connection;
>         static void Main(string[] args)
>         {
>             using (connection = CreateActiveMqConnection())
>             using (activeMqSession = connection.CreateNetTxSession())
>             using (consumer = activeMqSession.CreateConsumer(SessionUtil.GetQueue(activeMqSession,
"queue://foo.bar")))
>             {
>                 connection.Start();
>                 while (true)
>                 {
>                     try
>                     {
>                         using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
>                         {
>                             IMessage msg = null;
>                             while (msg == null)
>                             {
>                                 msg = consumer.ReceiveNoWait();
>                             }
>                             OnMessage(msg);
>                             scoped.Complete();
>                         }
>                     }
>                     catch(Exception exception) {}
>                 }
>             }
>         }
>         private static INetTxConnection CreateActiveMqConnection()
>         {
>             var connectionFactory = new Apache.NMS.ActiveMQ.NetTxConnectionFactory("activemq:tcp://localhost:61616")
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional
>             };
>             return connectionFactory.CreateNetTxConnection();
>         }
>         private static void OnMessage(IMessage message)
>         {
>             var x = new TestSinglePhaseCommit();
>             var session2 = activeMqSession;
>             {
>                 Transaction.Current.EnlistDurable(Guid.NewGuid(), x, EnlistmentOptions.None);
>                 using (var producer = session2.CreateProducer(SessionUtil.GetQueue(session2,
"queue://foo.baz")))
>                 {
>                     producer.Send(new ActiveMQTextMessage("foo"));
>                 }
>                 if (new Random().Next(2) == 0) throw new Exception();
>             }
>         }
>     }
>     internal class TestSinglePhaseCommit : ISinglePhaseNotification
>     {
>         public void Prepare(PreparingEnlistment preparingEnlistment)
>         {
>             Console.WriteLine("Tx Prepare");
>             preparingEnlistment.Prepared();
>         }
>         public void Commit(Enlistment enlistment)
>         {
>             Console.WriteLine("Tx Commit");
>             enlistment.Done();
>         }
>         public void Rollback(Enlistment enlistment)
>         {
>             Console.WriteLine("Tx Rollback");
>             enlistment.Done();
>         }
>         public void InDoubt(Enlistment enlistment)
>         {
>             Console.WriteLine("Tx InDoubt");
>             enlistment.Done();
>         }
>         public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
>         {
>             Console.WriteLine("Tx SinglePhaseCommit");
>             singlePhaseEnlistment.Committed();
>         }
>     }

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Mime
View raw message