activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1083055 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/NetTxConnection.cs main/csharp/NetTxSession.cs test/csharp/DtcConsumerTransactionsTest.cs
Date Fri, 18 Mar 2011 20:36:13 GMT
Author: tabish
Date: Fri Mar 18 20:36:13 2011
New Revision: 1083055

URL: http://svn.apache.org/viewvc?rev=1083055&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQNET-321

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs?rev=1083055&r1=1083054&r2=1083055&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
Fri Mar 18 20:36:13 2011
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.Transactions;
 using Apache.NMS.ActiveMQ.Transport;
 using Apache.NMS.ActiveMQ.Util;
 
@@ -42,6 +43,13 @@ namespace Apache.NMS.ActiveMQ
             return (INetTxSession) CreateSession(AcknowledgementMode.Transactional);
         }
 
+        public INetTxSession CreateNetTxSession(Transaction tx)
+        {
+            NetTxSession session = (NetTxSession)CreateSession(AcknowledgementMode.Transactional);
+            session.Enlist(tx);
+            return session;
+        }
+
         protected override Session CreateAtiveMQSession(AcknowledgementMode ackMode)
         {
             CheckConnected();

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs?rev=1083055&r1=1083054&r2=1083055&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs Fri
Mar 18 20:36:13 2011
@@ -17,7 +17,6 @@
 
 using System;
 using System.Transactions;
-using Apache.NMS;
 using Apache.NMS.ActiveMQ.Commands;
 
 namespace Apache.NMS.ActiveMQ
@@ -31,6 +30,23 @@ namespace Apache.NMS.ActiveMQ
         }
 
         /// <summary>
+        /// Manually Enlists in the given Transaction.  This can be used when the
+        /// client is using the Session in Asynchronous listener mode since the Session
+        /// cannot automatically join in this case as there is no Ambient transaction in
+        /// the Message Dispatch thread.  This also allows for clients to use the explicit
+        /// exception model when necessary.  
+        /// </summary>
+        public void Enlist(Transaction tx)
+        {
+            if(tx == null)
+            {
+                throw new NullReferenceException("Specified Transaction cannot be null");
+            }
+
+            this.EnrollInSpecifiedTransaction(tx);
+        }
+
+        /// <summary>
         /// Reports Transacted whenever there is an Ambient Transaction or the internal
         /// TransactionContext is still involved in a .NET Transaction beyond the lifetime
         /// of an ambient transaction (can happen during a scoped transaction disposing
@@ -61,20 +77,38 @@ namespace Apache.NMS.ActiveMQ
 
         internal override void DoStartTransaction()
         {
-            if(!TransactionContext.InNetTransaction)
+            if(!TransactionContext.InNetTransaction && Transaction.Current != null)
             {
-                if(Transaction.Current != null)
-                {
-                    Tracer.Debug("NetTxSession detected Ambient Transaction, start new TX with broker");
-
-                    // Start a new .NET style transaction, this could be distributed
-                    // or it could just be a Local transaction that could become
-                    // distributed later.
-                    TransactionContext.Begin(Transaction.Current);
-                }
+                Tracer.Debug("NetTxSession detected Ambient Transaction, start new TX with broker");
+
+                EnrollInSpecifiedTransaction(Transaction.Current);
             }
         }
 
+        private void EnrollInSpecifiedTransaction(Transaction tx)
+        {
+            // If an Async DTC operation is in progress such as Commit or Rollback
+            // we need to let it complete before deciding if the Session is in a TX
+            // otherwise we might error out for no reason.
+            TransactionContext.DtcWaitHandle.WaitOne(TimeSpan.FromMilliseconds(1000), true);
+
+            if(TransactionContext.InNetTransaction)
+            {
+                Tracer.Warn("Enlist attempted while a Net TX was Active.");
+                throw new InvalidOperationException("Session is Already enlisted in a Transaction");
+            }
+
+            if(Transaction.Current != null && !Transaction.Current.Equals(tx))
+            {
+                Tracer.Warn("Enlist attempted with a TX that doesn't match the Ambient TX.");
+                throw new ArgumentException("Specified TX must match the ambient TX if set.");
+            }
+            
+            // Start a new .NET style transaction, this could be distributed
+            // or it could just be a Local transaction that could become
+            // distributed later.
+            TransactionContext.Begin(tx);
+        }
     }
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs?rev=1083055&r1=1083054&r2=1083055&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
Fri Mar 18 20:36:13 2011
@@ -316,22 +316,33 @@ namespace Apache.NMS.ActiveMQ.Test
             INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
 
             using (INetTxConnection connection = factory.CreateNetTxConnection())
-            using (INetTxSession session = connection.CreateNetTxSession())
+            using (NetTxSession session = connection.CreateNetTxSession() as NetTxSession)
             {
                 IQueue queue = session.GetQueue(testQueueName);
                 IMessageConsumer consumer = session.CreateConsumer(queue);
                 consumer.Listener += AsyncTxAwareOnMessage;
 
+                // Be careful, messages are dispatched once this is done, so you could receive
+                // a Message outside a TX.  We use the awaitBatchProcessingStart event here to
+                // gate the OnMessage callback; once that method returns the Message is ack'd and
+                // no longer has a chance to participate in a TX.
                 connection.Start();
 
                 for (int i = 0; i < BATCH_COUNT; ++i)
                 {
                     using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
                     {
+                        session.Enlist(Transaction.Current);
+
                         batchTxControl = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
                         awaitBatchProcessingStart.Set();
                         scoped.Complete();
-                    }                    
+                    }
+
+                    // Re-enlisting too fast seems to annoy the DTC.  Also since DTC operations are
+                    // async we need to allow a little time for lag so that the last TX actually
+                    // completes before we start a new one.
+                    Thread.Sleep(250);
                 }
             }
 



Mime
View raw message