activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1484579 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/NetTxSession.cs main/csharp/NetTxTransactionContext.cs test/csharp/DtcConsumerTransactionsTest.cs
Date Mon, 20 May 2013 20:18:32 GMT
Author: tabish
Date: Mon May 20 20:18:32 2013
New Revision: 1484579

URL: http://svn.apache.org/r1484579
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-412

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs?rev=1484579&r1=1484578&r2=1484579&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs Mon
May 20 20:18:32 2013
@@ -25,6 +25,7 @@ namespace Apache.NMS.ActiveMQ
     public sealed class NetTxSession : Session, INetTxSession
     {
         private readonly NetTxTransactionContext transactionContext;
+        private string currentTransactionId;
 
         public NetTxSession(Connection connection, SessionId id)
             : base(connection, id, AcknowledgementMode.AutoAcknowledge)
@@ -128,8 +129,15 @@ namespace Apache.NMS.ActiveMQ
         {
             lock (transactionContext.SyncRoot)
             {
-                if (transactionContext.InNetTransaction && transactionContext.NetTxState
== NetTxTransactionContext.TxState.Pending)
+                while (transactionContext.InNetTransaction &&
+                       (transactionContext.NetTxState == NetTxTransactionContext.TxState.Pending
||
+                       (Transaction.Current != null && 
+                        this.currentTransactionId != Transaction.Current.TransactionInformation.LocalIdentifier)))
                 {
+                    if (Tracer.IsDebugEnabled)
+                    {
+                        Tracer.DebugFormat("NetTxSession awaiting completion of TX:{0}",
transactionContext.TransactionId);
+                    }
                     // Too late to participate in this TX, we have to wait for it to complete then
                     // we can create a new TX and start from there.
                     Monitor.Exit(transactionContext.SyncRoot);
@@ -162,6 +170,7 @@ namespace Apache.NMS.ActiveMQ
             // Start a new .NET style transaction, this could be distributed
             // or it could just be a Local transaction that could become
             // distributed later.
+            this.currentTransactionId = tx.TransactionInformation.LocalIdentifier; 
             transactionContext.Begin(tx);
         }
     }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs?rev=1484579&r1=1484578&r2=1484579&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs
Mon May 20 20:18:32 2013
@@ -102,7 +102,6 @@ namespace Apache.NMS.ActiveMQ
         {
             lock (syncObject)
             {
-                this.netTxState = TxState.Active;
                 dtcControlEvent.Reset();
 
                 Tracer.Debug("Begin notification received");
@@ -120,6 +119,10 @@ namespace Apache.NMS.ActiveMQ
                     this.currentEnlistment =
                         transaction.EnlistDurable(rmId, this, EnlistmentOptions.None);
 
+                    // In case of an exception in the current method the transaction will be rolled back.
+                    // Until Begin Transaction has completed we consider ourselves to be in a rollback scenario.
+                    this.netTxState = TxState.Pending;
+
                     Tracer.Debug("Enlisted in Durable Transaction with RM Id: " + rmId);
 
                     TransactionInformation txInfo = transaction.TransactionInformation;
@@ -146,6 +149,9 @@ namespace Apache.NMS.ActiveMQ
 
                     this.session.Connection.Oneway(info);
 
+                    // Begin Transaction has completed successfully. Change to the transaction active state now.
+                    this.netTxState = TxState.Active;
+
                     SignalTransactionStarted();
 
                     if (Tracer.IsDebugEnabled)
@@ -155,7 +161,12 @@ namespace Apache.NMS.ActiveMQ
                 }
                 catch (Exception)
                 {
-                    dtcControlEvent.Set();
+                    // When in the pending state the rollback will signal that a new transaction can be started. Otherwise do it here.
+                    if (netTxState != TxState.Pending)
+                    {
+                        netTxState = TxState.None;
+                        dtcControlEvent.Set();
+                    }
                     throw;
                 }
             }
@@ -201,9 +212,11 @@ namespace Apache.NMS.ActiveMQ
                         // change on the broker.
                         RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
 
-                        // if server responds that nothing needs to be done, then reply prepared
-                        // but clear the current state data so we appear done to the commit
method.
-                        preparingEnlistment.Prepared();
+                        // if server responds that nothing needs to be done, then reply done.
+                        // otherwise the DTC will call Commit or Rollback but another transaction
+                        // can already be in progress and this one would be committed or rolled back
+                        // immediately.
+                        preparingEnlistment.Done();
 
                         // Done so commit won't be called.
                         AfterCommit();

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs?rev=1484579&r1=1484578&r2=1484579&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
Mon May 20 20:18:32 2013
@@ -571,6 +571,86 @@ namespace Apache.NMS.ActiveMQ.Test
             VerifyBrokerQueueCount();
         }
 
+        [Test]
+        public void MessageShouldEnlistToTheCorrectTransaction()
+        {
+            const int messageCount = 100;
+            const int receiveCount = 100;
+
+            // enqueue several messages
+            PurgeDatabase();
+            PurgeAndFillQueue(messageCount);
+
+            var enlistment = new TestSinglePhaseCommit();
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.Start();
+
+                // receive the messages one at a time, each inside its own new transaction scope
+                using (INetTxSession session = connection.CreateNetTxSession())
+                {
+                    IQueue queue = session.GetQueue(testQueueName);
+                    using (IMessageConsumer consumer = session.CreateConsumer(queue))
+                    {
+                        for (int i = 0; i < receiveCount; i++)
+                        {
+                            try
+                            {
+                                using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
+                                {
+                                    ITextMessage message = consumer.Receive(TimeSpan.FromMilliseconds(10000))
as ITextMessage;
+
+                                    Transaction.Current.EnlistDurable(Guid.NewGuid(), enlistment,
EnlistmentOptions.None);
+                                    if (new Random().Next(2) == 0)
+                                    {
+                                        Tracer.InfoFormat("Throwing random Exception for
Message {0}", message.NMSMessageId);
+                                        throw new Exception();
+                                    }
+
+                                    scoped.Complete();
+                                }
+                            }
+                            catch
+                            {
+                            }
+
+                            Assert.False(enlistment.singlePhaseCommit, "No single phase commit
should happen.");
+                        }
+                    }
+                }
+            }
+        }
+
+        internal class TestSinglePhaseCommit : ISinglePhaseNotification
+        {
+            public bool singlePhaseCommit = false;
+
+            public void Prepare(PreparingEnlistment preparingEnlistment)
+            {
+                preparingEnlistment.Prepared();
+            }
+            public void Commit(Enlistment enlistment)
+            {
+                enlistment.Done();
+            }
+
+            public void Rollback(Enlistment enlistment)
+            {
+                enlistment.Done();
+            }
+            public void InDoubt(Enlistment enlistment)
+            {
+                enlistment.Done();
+            }
+            public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
+            {
+                Tracer.Info("Performing invalid single phase commit.");
+                singlePhaseCommit = true;
+                singlePhaseEnlistment.Committed();
+            }
+        }
+
         #region Asynchronous Consumer Inside of a Transaction Test / Example
 
         private const int BATCH_COUNT = 5;



Mime
View raw message