Author: tabish
Date: Fri Mar 18 20:36:13 2011
New Revision: 1083055
URL: http://svn.apache.org/viewvc?rev=1083055&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQNET-321
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs?rev=1083055&r1=1083054&r2=1083055&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs Fri Mar 18 20:36:13 2011
@@ -16,6 +16,7 @@
*/
using System;
+using System.Transactions;
using Apache.NMS.ActiveMQ.Transport;
using Apache.NMS.ActiveMQ.Util;
@@ -42,6 +43,13 @@ namespace Apache.NMS.ActiveMQ
return (INetTxSession) CreateSession(AcknowledgementMode.Transactional);
}
+ public INetTxSession CreateNetTxSession(Transaction tx)
+ {
+ NetTxSession session = (NetTxSession)CreateSession(AcknowledgementMode.Transactional);
+ session.Enlist(tx);
+ return session;
+ }
+
protected override Session CreateAtiveMQSession(AcknowledgementMode ackMode)
{
CheckConnected();
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs?rev=1083055&r1=1083054&r2=1083055&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs Fri Mar 18 20:36:13 2011
@@ -17,7 +17,6 @@
using System;
using System.Transactions;
-using Apache.NMS;
using Apache.NMS.ActiveMQ.Commands;
namespace Apache.NMS.ActiveMQ
@@ -31,6 +30,23 @@ namespace Apache.NMS.ActiveMQ
}
/// <summary>
+ /// Manually Enlists in the given Transaction. This can be used when the
+ /// client is using the Session in Asynchronous listener mode since the Session
+ /// cannot automatically join in this case as there is no Ambient transaction in
+ /// the Message Dispatch thread. This also allows for clients to use the explicit
+ /// exception model when necessary.
+ /// </summary>
+ public void Enlist(Transaction tx)
+ {
+ if(tx == null)
+ {
+ throw new NullReferenceException("Specified Transaction cannot be null");
+ }
+
+ this.EnrollInSpecifiedTransaction(tx);
+ }
+
+ /// <summary>
/// Reports Transacted whenever there is an Ambient Transaction or the internal
/// TransactionContext is still involed in a .NET Transaction beyond the lifetime
/// of an ambient transaction (can happen during a scoped transaction disposing
@@ -61,20 +77,38 @@ namespace Apache.NMS.ActiveMQ
internal override void DoStartTransaction()
{
- if(!TransactionContext.InNetTransaction)
+ if(!TransactionContext.InNetTransaction && Transaction.Current != null)
{
- if(Transaction.Current != null)
- {
- Tracer.Debug("NetTxSession detected Ambient Transaction, start new TX with broker");
-
- // Start a new .NET style transaction, this could be distributed
- // or it could just be a Local transaction that could become
- // distributed later.
- TransactionContext.Begin(Transaction.Current);
- }
+ Tracer.Debug("NetTxSession detected Ambient Transaction, start new TX with broker");
+
+ EnrollInSpecifiedTransaction(Transaction.Current);
}
}
+ private void EnrollInSpecifiedTransaction(Transaction tx)
+ {
+ // If an Async DTC operation is in progress such as Commit or Rollback
+ // we need to let it complete before deciding if the Session is in a TX
+ // otherwise we might error out for no reason.
+ TransactionContext.DtcWaitHandle.WaitOne(TimeSpan.FromMilliseconds(1000), true);
+
+ if(TransactionContext.InNetTransaction)
+ {
+ Tracer.Warn("Enlist attempted while a Net TX was Active.");
+ throw new InvalidOperationException("Session is Already enlisted in a Transaction");
+ }
+
+ if(Transaction.Current != null && !Transaction.Current.Equals(tx))
+ {
+ Tracer.Warn("Enlist attempted with a TX that doesn't match the Ambient TX.");
+ throw new ArgumentException("Specified TX must match the ambient TX if set.");
+ }
+
+ // Start a new .NET style transaction, this could be distributed
+ // or it could just be a Local transaction that could become
+ // distributed later.
+ TransactionContext.Begin(tx);
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs?rev=1083055&r1=1083054&r2=1083055&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs Fri Mar 18 20:36:13 2011
@@ -316,22 +316,33 @@ namespace Apache.NMS.ActiveMQ.Test
INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
using (INetTxConnection connection = factory.CreateNetTxConnection())
- using (INetTxSession session = connection.CreateNetTxSession())
+ using (NetTxSession session = connection.CreateNetTxSession() as NetTxSession)
{
IQueue queue = session.GetQueue(testQueueName);
IMessageConsumer consumer = session.CreateConsumer(queue);
consumer.Listener += AsyncTxAwareOnMessage;
+ // Be careful, messages are dispatched once this is done, so you could receive
+ // a Message outside a TX. We use the awaitBatchProcessingStart event here to
+ // gate the OnMessage callback, once that method returns the Message is ack'd and
+ // no longer has a chance to participate in a TX.
connection.Start();
for (int i = 0; i < BATCH_COUNT; ++i)
{
using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
{
+ session.Enlist(Transaction.Current);
+
batchTxControl = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
awaitBatchProcessingStart.Set();
scoped.Complete();
- }
+ }
+
+ // Re-enlisting too fast seems to annoy the DTC. Also since DTC operations are
+ // async we need to allow a little time for lag so that the last TX actually
+ // completes before we start a new one.
+ Thread.Sleep(250);
}
}
|