activemq-dev mailing list archives

From "Remo Gloor (JIRA)" <jira+amq...@apache.org>
Subject [jira] [Commented] (AMQNET-412) Messages are enlisted to the wrong transaction
Date Thu, 14 Feb 2013 20:01:13 GMT

    [ https://issues.apache.org/jira/browse/AMQNET-412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13578620#comment-13578620 ]

Remo Gloor commented on AMQNET-412:
-----------------------------------

Unit testing is impossible because it is not deterministic. It would fail sometimes and be
successful in other cases.
                
> Messages are enlisted to the wrong transaction
> ----------------------------------------------
>
>                 Key: AMQNET-412
>                 URL: https://issues.apache.org/jira/browse/AMQNET-412
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>         Environment: Apache.NMS.ActiveMq 1.5.7
>            Reporter: Remo Gloor
>            Assignee: Jim Gomes
>            Priority: Critical
>
> Under load, ActiveMQ enlists a message in a previous transaction. This leads to very strange behaviors:
> - Database is updated and message is rolled back
> - Message is completed but database is rolled back
> All this results in an invalid system state. DTC is not usable this way.
> Analysis of the source code has shown that the problem is in NetTxSession.DoStartTransaction. There it checks whether there is a .NET Transaction in the TransactionContext. In this case it adds the message to that transaction. But this can be the previous transaction because the DTC two-phase commit is asynchronous. It needs to check whether the current transaction is the same as the one before and wait if they do not match.
> The following application demonstrates the problem when enough messages are processed, e.g. enqueue 100 messages in foo.bar. It is basically TestRedeliveredCase3 but with half of the messages failing.
> Whenever a SinglePhaseCommit occurs in TestSinglePhaseCommit, this means the database would be committed in its own transaction.
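>     using System;
>     using System.Transactions;
>     using Apache.NMS;
>     using Apache.NMS.ActiveMQ.Commands;
>     using Apache.NMS.Util;
>
>     class Program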
>     {
>         private static INetTxSession activeMqSession;
>         private static IMessageConsumer consumer;
>         private static INetTxConnection connection;
>         static void Main(string[] args)
>         {
>             using (connection = CreateActiveMqConnection())
>             using (activeMqSession = connection.CreateNetTxSession())
>             using (consumer = activeMqSession.CreateConsumer(SessionUtil.GetQueue(activeMqSession, "queue://foo.bar")))
>             {
>                 connection.Start();
>                 while (true)
>                 {
>                     try
>                     {
>                         using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
>                         {
>                             IMessage msg = null;
>                             while (msg == null)
>                             {
>                                 msg = consumer.ReceiveNoWait();
>                             }
>                             OnMessage(msg);
>                             scoped.Complete();
>                         }
>                     }
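>                     catch (Exception)
>                     {
>                         // Deliberately swallowed: OnMessage throws for about half of the
>                         // messages so that the surrounding TransactionScope rolls back.
>                     }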
>                 }
>             }
>         }
>         private static INetTxConnection CreateActiveMqConnection()
>         {
>             var connectionFactory = new Apache.NMS.ActiveMQ.NetTxConnectionFactory("activemq:tcp://localhost:61616")
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional
>             };
>             return connectionFactory.CreateNetTxConnection();
>         }
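>         // Shared instance: a new Random per call can repeat values when created in quick succession.
>         private static readonly Random random = new Random();
>         private static void OnMessage(IMessage message)
>         {
>             var x = new TestSinglePhaseCommit();
>             var session2 = activeMqSession;
>             {
>                 // Stand-in for a database: a second durable resource in the same transaction.
>                 Transaction.Current.EnlistDurable(Guid.NewGuid(), x, EnlistmentOptions.None);
>                 using (var producer = session2.CreateProducer(SessionUtil.GetQueue(session2, "queue://foo.baz")))
>                 {
>                     producer.Send(new ActiveMQTextMessage("foo"));
>                 }
>                 // Fail roughly half of the messages to force rollbacks.
>                 if (random.Next(2) == 0) throw new Exception();
>             }
>         }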
>     }
>     internal class TestSinglePhaseCommit : ISinglePhaseNotification
>     {
>         public void Prepare(PreparingEnlistment preparingEnlistment)
>         {
>             Console.WriteLine("Tx Prepare");
>             preparingEnlistment.Prepared();
>         }
>         public void Commit(Enlistment enlistment)
>         {
>             Console.WriteLine("Tx Commit");
>             enlistment.Done();
>         }
>         public void Rollback(Enlistment enlistment)
>         {
>             Console.WriteLine("Tx Rollback");
>             enlistment.Done();
>         }
>         public void InDoubt(Enlistment enlistment)
>         {
>             Console.WriteLine("Tx InDoubt");
>             enlistment.Done();
>         }
>         public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
>         {
>             Console.WriteLine("Tx SinglePhaseCommit");
>             singlePhaseEnlistment.Committed();
>         }
>     }
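
The check proposed in the description ("check if the current transaction is the same as the one before and wait if they do not match") could be sketched roughly as follows. This is only an illustration under assumptions, not the code in NetTxSession: TransactionGate, Enter and Leave are hypothetical names, and the sketch assumes the session calls Leave() from its own enlistment callbacks once it has finished Commit, Rollback, InDoubt or SinglePhaseCommit.

```csharp
using System;
using System.Threading;
using System.Transactions;

// Hypothetical helper, NOT part of Apache.NMS.ActiveMQ.
// It remembers which System.Transactions transaction the session is
// currently enlisted in and blocks a different transaction until the
// previous one has been fully completed by the session.
internal sealed class TransactionGate
{
    private readonly object syncRoot = new object();
    private Transaction owner; // transaction the session is enlisted in, or null

    // Returns true if the caller has to enlist the session in 'current'
    // (first use within this transaction), false if it is already enlisted.
    // Throws TimeoutException if the previous transaction does not finish in time.
    public bool Enter(Transaction current, TimeSpan timeout)
    {
        if (current == null) throw new ArgumentNullException("current");

        lock (syncRoot)
        {
            if (owner != null && owner.Equals(current))
            {
                return false; // same transaction as before, nothing to wait for
            }

            DateTime deadline = DateTime.UtcNow + timeout;
            while (owner != null)
            {
                // A previous transaction is still completing asynchronously.
                TimeSpan remaining = deadline - DateTime.UtcNow;
                if (remaining <= TimeSpan.Zero)
                {
                    throw new TimeoutException("Previous transaction did not complete in time.");
                }
                Monitor.Wait(syncRoot, remaining);
            }

            owner = current;
            return true;
        }
    }

    // Assumed to be called by the session after its own Commit, Rollback,
    // InDoubt or SinglePhaseCommit handling has finished.
    public void Leave()
    {
        lock (syncRoot)
        {
            owner = null;
            Monitor.PulseAll(syncRoot); // wake up callers waiting in Enter
        }
    }
}
```

With something like this in place, the start-of-transaction path would call Enter(Transaction.Current, timeout) before enlisting a message, so a message received in a new TransactionScope cannot be attached to a transaction whose second commit phase is still running.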

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
