activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1484526 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk: ./ src/main/csharp/
Date Mon, 20 May 2013 16:57:23 GMT
Author: tabish
Date: Mon May 20 16:57:22 2013
New Revision: 1484526

URL: http://svn.apache.org/r1484526
Log:
continue to partition the DTC code into its own domain.   

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxMessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1484526&r1=1484525&r2=1484526&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Mon May 20 16:57:22 2013
@@ -1025,38 +1025,18 @@ namespace Apache.NMS.ActiveMQ
 			}
 		}
 
-		public void BeforeMessageIsConsumed(MessageDispatch dispatch)
+		public virtual void BeforeMessageIsConsumed(MessageDispatch dispatch)
 		{
 			this.lastDeliveredSequenceId = dispatch.Message.MessageId.BrokerSequenceId;
 
 			if (!IsAutoAcknowledgeBatch)
 			{
-                if (this.session.IsTransacted)
-                {
-                    bool waitForDtcWaitHandle = false;
-                    lock (this.session.TransactionContext.SyncRoot)
-                    {
-                        // In the case where the consumer is operating in concert with a
-                        // distributed TX manager we need to wait whenever the TX is being
-                        // controlled by the DTC as it completes all operations async and
-                        // we cannot start consumption again until all its tasks have completed.)
-                        waitForDtcWaitHandle = this.session.TransactionContext.InNetTransaction &&
-                                               this.session.TransactionContext.NetTxState ==
-                                               TransactionContext.TxState.Pending;
-                    }
-
-                    if (waitForDtcWaitHandle)
-                    {
-                        this.session.TransactionContext.DtcWaitHandle.WaitOne();
-                    }
-                }                
-
 			    lock(this.dispatchedMessages)
 				{
 					this.dispatchedMessages.AddFirst(dispatch);
 				}
 
-				if(this.session.IsTransacted)
+				if (this.session.IsTransacted)
 				{
                 	if (this.transactedIndividualAck) 
 					{
@@ -1092,7 +1072,7 @@ namespace Apache.NMS.ActiveMQ
 			return false;
 		}
 
-		public void AfterMessageIsConsumed(MessageDispatch dispatch, bool expired)
+		public virtual void AfterMessageIsConsumed(MessageDispatch dispatch, bool expired)
 		{
 			if(this.unconsumedMessages.Closed)
 			{
@@ -1603,7 +1583,7 @@ namespace Apache.NMS.ActiveMQ
 			}
 		}
 
-		private bool IsAutoAcknowledgeBatch
+	    protected bool IsAutoAcknowledgeBatch
 		{
 			get { return this.session.IsDupsOkAcknowledge && !this.info.Destination.IsQueue; }
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs?rev=1484526&r1=1484525&r2=1484526&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs Mon May 20 16:57:22 2013
@@ -29,7 +29,7 @@ namespace Apache.NMS.ActiveMQ
     /// The default Session creation methods of Connection are overriden here
     /// to always return a TX capable session instance.
     /// </summary>
-    public class NetTxConnection : Connection, INetTxConnection
+    public sealed class NetTxConnection : Connection, INetTxConnection
     {
         private NetTxRecoveryPolicy recoveryPolicy = new NetTxRecoveryPolicy();
         private Guid configuredResourceManagerId = Guid.Empty;

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs?rev=1484526&r1=1484525&r2=1484526&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs Mon May 20 16:57:22 2013
@@ -23,7 +23,7 @@ using Apache.NMS.ActiveMQ.Transport;
 
 namespace Apache.NMS.ActiveMQ
 {
-    public class NetTxConnectionFactory : ConnectionFactory, INetTxConnectionFactory
+    public sealed class NetTxConnectionFactory : ConnectionFactory, INetTxConnectionFactory
     {
         private NetTxRecoveryPolicy recoveryPolicy = new NetTxRecoveryPolicy();
         private Guid configuredResourceManagerId = Guid.Empty;

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxMessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxMessageConsumer.cs?rev=1484526&r1=1484525&r2=1484526&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxMessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxMessageConsumer.cs Mon May 20 16:57:22 2013
@@ -22,9 +22,10 @@ using Apache.NMS.ActiveMQ.Commands;
 
 namespace Apache.NMS.ActiveMQ
 {
-    class NetTxMessageConsumer : MessageConsumer
+    public sealed class NetTxMessageConsumer : MessageConsumer
     {
         private readonly NetTxSession session;
+        private readonly NetTxTransactionContext transactionContext;
 
         internal NetTxMessageConsumer(Session session, ConsumerId id, ActiveMQDestination destination, 
                                       string name, string selector, int prefetch, int maxPendingMessageCount,
@@ -33,6 +34,7 @@ namespace Apache.NMS.ActiveMQ
                  maxPendingMessageCount, noLocal, browser, dispatchAsync)
         {
             this.session = session as NetTxSession;
+            this.transactionContext = session.TransactionContext as NetTxTransactionContext;
         }
 
         public override void Close()
@@ -42,7 +44,7 @@ namespace Apache.NMS.ActiveMQ
                 return;
             }
 
-            lock (this.session.TransactionContext.SyncRoot)
+            lock (this.transactionContext.SyncRoot)
             {
                 if (this.session.IsTransacted || this.session.TransactionContext.InTransaction)
                 {
@@ -59,5 +61,34 @@ namespace Apache.NMS.ActiveMQ
                 }
             }
         }
+
+        /// <summary>
+        /// Holds the dispatch back while the session's .NET transaction is in the
+        /// Pending state, then passes it on to the base consumer implementation.
+        /// </summary>
+        /// <param name="dispatch">The message dispatch about to be consumed.</param>
+        public override void BeforeMessageIsConsumed(MessageDispatch dispatch)
+        {
+            // Same short-circuit order as a nested test: batch-ack check first,
+            // transacted check second.
+            bool mustCheckDtcState = !IsAutoAcknowledgeBatch && this.session.IsTransacted;
+
+            if (mustCheckDtcState)
+            {
+                bool txIsPending;
+
+                // The state is sampled under the context lock; the wait itself
+                // happens after the lock has been released.
+                lock (this.transactionContext.SyncRoot)
+                {
+                    txIsPending = this.transactionContext.InNetTransaction &&
+                                  this.transactionContext.NetTxState ==
+                                  NetTxTransactionContext.TxState.Pending;
+                }
+
+                if (txIsPending)
+                {
+                    // The DTC completes its work asynchronously; consumption resumes
+                    // once the context signals its wait handle.
+                    this.transactionContext.DtcWaitHandle.WaitOne();
+                }
+            }
+
+            base.BeforeMessageIsConsumed(dispatch);
+        }
+
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs?rev=1484526&r1=1484525&r2=1484526&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs Mon May 20 16:57:22 2013
@@ -22,12 +22,15 @@ using Apache.NMS.ActiveMQ.Commands;
 
 namespace Apache.NMS.ActiveMQ
 {
-    public class NetTxSession : Session, INetTxSession
+    public sealed class NetTxSession : Session, INetTxSession
     {
+        private readonly NetTxTransactionContext transactionContext;
+
         public NetTxSession(Connection connection, SessionId id)
             : base(connection, id, AcknowledgementMode.AutoAcknowledge)
         {
-            TransactionContext.InitializeDtcTxContext();
+            this.transactionContext = TransactionContext as NetTxTransactionContext;
+            this.transactionContext.InitializeDtcTxContext();
         }
 
         /// <summary>
@@ -55,7 +58,7 @@ namespace Apache.NMS.ActiveMQ
         /// </summary>
         public override bool IsTransacted
         {
-            get { return Transaction.Current != null || TransactionContext.InNetTransaction; }
+            get { return Transaction.Current != null || transactionContext.InNetTransaction; }
         }
 
         public override bool IsAutoAcknowledge
@@ -73,17 +76,17 @@ namespace Apache.NMS.ActiveMQ
 
             try
             {
-                if (TransactionContext.InNetTransaction)
+                if (transactionContext.InNetTransaction)
                 {
-                    lock (TransactionContext.SyncRoot)
+                    lock (transactionContext.SyncRoot)
                     {
-                        if (TransactionContext.InNetTransaction)
+                        if (transactionContext.InNetTransaction)
                         {
                             // Must wait for all the DTC operations to complete before
                             // moving on from this close call.
-                            Monitor.Exit(TransactionContext.SyncRoot);
-                            this.TransactionContext.DtcWaitHandle.WaitOne();
-                            Monitor.Enter(TransactionContext.SyncRoot);
+                            Monitor.Exit(transactionContext.SyncRoot);
+                            this.transactionContext.DtcWaitHandle.WaitOne();
+                            Monitor.Enter(transactionContext.SyncRoot);
                         }
                     }
                 }
@@ -104,6 +107,11 @@ namespace Apache.NMS.ActiveMQ
                                             maxPending, noLocal, false, this.DispatchAsync);
         }
 
+        /// <summary>
+        /// Supplies the transaction context for this session: a
+        /// <see cref="NetTxTransactionContext"/> bound to this session instance.
+        /// </summary>
+        /// <returns>A newly created NetTxTransactionContext.</returns>
+        protected override TransactionContext CreateTransactionContext()
+        {
+            TransactionContext netTxContext = new NetTxTransactionContext(this);
+            return netTxContext;
+        }
+
         internal override void DoRollback()
         {
             // Only the Transaction Manager can do this when in a .NET Transaction.
@@ -118,18 +126,18 @@ namespace Apache.NMS.ActiveMQ
 
         internal override void DoStartTransaction()
         {
-            lock (TransactionContext.SyncRoot)
+            lock (transactionContext.SyncRoot)
             {
-                if (TransactionContext.InNetTransaction && TransactionContext.NetTxState == TransactionContext.TxState.Pending)
+                if (transactionContext.InNetTransaction && transactionContext.NetTxState == NetTxTransactionContext.TxState.Pending)
                 {
                     // To late to participate in this TX, we have to wait for it to complete then
                     // we can create a new TX and start from there.
-                    Monitor.Exit(TransactionContext.SyncRoot);
-                    TransactionContext.DtcWaitHandle.WaitOne();
-                    Monitor.Enter(TransactionContext.SyncRoot);
+                    Monitor.Exit(transactionContext.SyncRoot);
+                    transactionContext.DtcWaitHandle.WaitOne();
+                    Monitor.Enter(transactionContext.SyncRoot);
                 }
  
-                if (!TransactionContext.InNetTransaction && Transaction.Current != null)
+                if (!transactionContext.InNetTransaction && Transaction.Current != null)
                 {
                     Tracer.Debug("NetTxSession detected Ambient Transaction, start new TX with broker");
                     EnrollInSpecifiedTransaction(Transaction.Current);
@@ -139,7 +147,7 @@ namespace Apache.NMS.ActiveMQ
 
         private void EnrollInSpecifiedTransaction(Transaction tx)
         {
-            if(TransactionContext.InNetTransaction)
+            if(transactionContext.InNetTransaction)
             {
                 Tracer.Warn("Enlist attempted while a Net TX was Active.");
                 throw new InvalidOperationException("Session is Already enlisted in a Transaction");
@@ -154,7 +162,7 @@ namespace Apache.NMS.ActiveMQ
             // Start a new .NET style transaction, this could be distributed
             // or it could just be a Local transaction that could become
             // distributed later.
-            TransactionContext.Begin(tx);
+            transactionContext.Begin(tx);
         }
     }
 }

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs?rev=1484526&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs Mon May 20 16:57:22 2013
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Transactions;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.Transactions;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ
+{
+    public sealed class NetTxTransactionContext : TransactionContext, ISinglePhaseNotification
+    {
+        private const int XA_OK = 0;
+        private const int XA_READONLY = 3;
+
+        private Enlistment currentEnlistment;
+
+        /// <summary>
+        /// Creates the context for the given session; all initialization is
+        /// delegated to the base TransactionContext constructor.
+        /// </summary>
+        /// <param name="session">The session this transaction context belongs to.</param>
+        public NetTxTransactionContext(Session session)
+            : base(session)
+        {
+        }
+
+        /// <summary>
+        /// True when a transaction id is set but no .NET enlistment is held.
+        /// </summary>
+        public override bool InLocalTransaction
+        {
+            get
+            {
+                bool hasTransactionId = this.transactionId != null;
+                bool hasEnlistment = this.currentEnlistment != null;
+
+                return hasTransactionId && !hasEnlistment;
+            }
+        }
+
+        // Message shared by the three local-transaction entry points below, all of
+        // which are rejected by this context.
+        private const string LocalTransactionsUnsupported =
+            "Local Transactions not supported in NetTx resources";
+
+        /// <summary>Always throws; local transactions cannot be started here.</summary>
+        /// <exception cref="IllegalStateException">Thrown on every call.</exception>
+        public override void Begin()
+        {
+            throw new IllegalStateException(LocalTransactionsUnsupported);
+        }
+
+        /// <summary>Always throws; local transactions cannot be committed here.</summary>
+        /// <exception cref="IllegalStateException">Thrown on every call.</exception>
+        public override void Commit()
+        {
+            throw new IllegalStateException(LocalTransactionsUnsupported);
+        }
+
+        /// <summary>Always throws; local transactions cannot be rolled back here.</summary>
+        /// <exception cref="IllegalStateException">Thrown on every call.</exception>
+        public override void Rollback()
+        {
+            throw new IllegalStateException(LocalTransactionsUnsupported);
+        }
+
+        #region Transaction Members used when dealing with .NET System Transactions.
+
+        // When DTC calls prepare we must then wait for either the TX to commit, rollback or
+        // be canceled because its in doubt.
+        private readonly ManualResetEvent dtcControlEvent = new ManualResetEvent(true);
+
+        // Monitor lock guarding the transaction state of this class: every transaction
+        // method below (Begin, Prepare, Commit, SinglePhaseCommit, Rollback, InDoubt)
+        // runs under it and callers reach it through SyncRoot. A plain object is enough.
+        private readonly object syncObject = new object();
+
+        /// <summary>
+        /// State of the .NET transaction as tracked by this context: None initially
+        /// and after a transaction completes, Active once Begin(Transaction) runs,
+        /// Pending once Prepare runs.
+        /// </summary>
+        public enum TxState
+        {
+            None = 0,
+            Active = 1,
+            Pending = 2
+        }
+
+        private TxState netTxState = TxState.None;
+
+        /// <summary>
+        /// The object this context locks on in its transaction methods; exposed so
+        /// that callers can synchronize with them.
+        /// </summary>
+        public object SyncRoot
+        {
+            get
+            {
+                return this.syncObject;
+            }
+        }
+
+        /// <summary>
+        /// True when the current transaction id is an <see cref="XATransactionId"/>.
+        /// </summary>
+        public bool InNetTransaction
+        {
+            get
+            {
+                // The 'is' operator already yields false for a null id.
+                return this.transactionId is XATransactionId;
+            }
+        }
+
+        /// <summary>
+        /// The current <see cref="TxState"/> of this context.
+        /// </summary>
+        public TxState NetTxState
+        {
+            get { return this.netTxState; }
+        }
+
+        /// <summary>
+        /// Wait handle that is reset when Begin(Transaction) starts a transaction
+        /// and set again when that transaction finishes or fails to start.
+        /// </summary>
+        public WaitHandle DtcWaitHandle
+        {
+            get
+            {
+                return this.dtcControlEvent;
+            }
+        }
+
+        /// <summary>
+        /// Enlists this context as a durable resource in the given .NET transaction
+        /// and notifies the broker that a matching XA-style transaction has begun.
+        /// </summary>
+        /// <param name="transaction">The System.Transactions transaction to enlist in.</param>
+        /// <exception cref="TransactionInProgressException">
+        /// Thrown when this context already holds an XA transaction id; in that case
+        /// no state of the running transaction is modified.
+        /// </exception>
+        public void Begin(Transaction transaction)
+        {
+            lock (syncObject)
+            {
+                Tracer.Debug("Begin notification received");
+
+                // Reject a second Begin before touching any state, otherwise the state
+                // and wait handle of the transaction already in flight are clobbered.
+                if (InNetTransaction)
+                {
+                    throw new TransactionInProgressException("A Transaction is already in Progress");
+                }
+
+                this.netTxState = TxState.Active;
+                dtcControlEvent.Reset();
+
+                try
+                {
+                    Guid rmId = ResourceManagerGuid;
+
+                    // Enlist this object in the transaction.
+                    this.currentEnlistment =
+                        transaction.EnlistDurable(rmId, this, EnlistmentOptions.None);
+
+                    Tracer.Debug("Enlisted in Durable Transaction with RM Id: " + rmId);
+
+                    TransactionInformation txInfo = transaction.TransactionInformation;
+
+                    XATransactionId xaId = new XATransactionId();
+                    this.transactionId = xaId;
+
+                    // Use the distributed identifier when the transaction has one,
+                    // otherwise fall back to the local identifier.
+                    if (txInfo.DistributedIdentifier != Guid.Empty)
+                    {
+                        xaId.GlobalTransactionId = txInfo.DistributedIdentifier.ToByteArray();
+                        xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
+                    }
+                    else
+                    {
+                        xaId.GlobalTransactionId = Encoding.UTF8.GetBytes(txInfo.LocalIdentifier);
+                        xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
+                    }
+
+                    // Now notify the broker that a new XA'ish transaction has started.
+                    TransactionInfo info = new TransactionInfo();
+                    info.ConnectionId = this.connection.ConnectionId;
+                    info.TransactionId = this.transactionId;
+                    info.Type = (int)TransactionType.Begin;
+
+                    this.session.Connection.Oneway(info);
+
+                    SignalTransactionStarted();
+
+                    if (Tracer.IsDebugEnabled)
+                    {
+                        // Render the id bytes as hex; concatenating the array itself
+                        // would only print its type name.
+                        Tracer.Debug("Began XA'ish Transaction:" +
+                                     BitConverter.ToString(xaId.GlobalTransactionId));
+                    }
+                }
+                catch (Exception)
+                {
+                    // Release anyone waiting on the handle before propagating the failure.
+                    dtcControlEvent.Set();
+                    throw;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Phase one of the two-phase commit: stores the recovery information, ends
+        /// the XA branch on the broker and asks the broker to prepare it.
+        /// </summary>
+        /// <remarks>
+        /// A read-only answer from the broker finishes the transaction inside this
+        /// call. Any failure forces the enlistment to roll back, is reported to the
+        /// connection, and resets this context's transaction state.
+        /// </remarks>
+        /// <param name="preparingEnlistment">Enlistment used to answer the prepare request.</param>
+        public void Prepare(PreparingEnlistment preparingEnlistment)
+        {
+            lock (this.syncObject)
+            {
+                this.netTxState = TxState.Pending;
+
+                try
+                {
+                    Tracer.Debug("Prepare notification received for TX id: " + this.transactionId);
+
+                    BeforeEnd();
+
+                    // Before sending the request to the broker, log the recovery bits, if
+                    // this fails we can't prepare and the TX should be rolled back.
+                    RecoveryLogger.LogRecoveryInfo(this.transactionId as XATransactionId,
+                                                   preparingEnlistment.RecoveryInformation());
+
+                    // Inform the broker that work on the XA'sh TX Branch is complete.
+                    TransactionInfo info = new TransactionInfo();
+                    info.ConnectionId = this.connection.ConnectionId;
+                    info.TransactionId = this.transactionId;
+                    info.Type = (int)TransactionType.End;
+
+                    this.connection.CheckConnected();
+                    this.connection.SyncRequest(info);
+
+                    // Prepare the Transaction for commit.
+                    info.Type = (int)TransactionType.Prepare;
+                    IntegerResponse response = (IntegerResponse)this.connection.SyncRequest(info);
+                    if (response.Result == XA_READONLY)
+                    {
+                        Tracer.Debug("Transaction Prepare done and doesn't need a commit, TX id: " + this.transactionId);
+
+                        // Read Only means there's nothing to recover because there was no
+                        // change on the broker. This must run while the id is still set,
+                        // otherwise the logger is handed null.
+                        RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+
+                        // Clear the current state data so we appear done to the commit
+                        // method, matching the reset done by the other completion paths.
+                        this.transactionId = null;
+                        this.currentEnlistment = null;
+                        this.netTxState = TxState.None;
+
+                        // if server responds that nothing needs to be done, then reply prepared.
+                        preparingEnlistment.Prepared();
+
+                        // Done so commit won't be called.
+                        AfterCommit();
+
+                        // A Read-Only TX is considered closed at this point, DTC won't call us again.
+                        this.dtcControlEvent.Set();
+                    }
+                    else
+                    {
+                        Tracer.Debug("Transaction Prepare succeeded TX id: " + this.transactionId);
+
+                        // If work finished correctly, reply prepared
+                        preparingEnlistment.Prepared();
+                    }
+                }
+                catch (Exception ex)
+                {
+                    Tracer.DebugFormat("Transaction[{0}] Prepare failed with error: {1}",
+                                       this.transactionId, ex.Message);
+
+                    AfterRollback();
+                    preparingEnlistment.ForceRollback();
+                    try
+                    {
+                        this.connection.OnException(ex);
+                    }
+                    catch (Exception error)
+                    {
+                        // A failing exception handler must not mask the state reset below.
+                        Tracer.Error(error.ToString());
+                    }
+
+                    this.currentEnlistment = null;
+                    this.transactionId = null;
+                    this.netTxState = TxState.None;
+                    this.dtcControlEvent.Set();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Phase two of the two-phase commit: asks the broker to commit the
+        /// transaction and then clears this context's transaction state.
+        /// </summary>
+        /// <remarks>
+        /// Failures are traced and handed to the connection's exception handler
+        /// instead of being rethrown. The state reset, the recovery latch count-down
+        /// and the wait handle signal run in every case.
+        /// </remarks>
+        /// <param name="enlistment">Enlistment marked Done after a successful commit.</param>
+        public void Commit(Enlistment enlistment)
+        {
+            lock (this.syncObject)
+            {
+                try
+                {
+                    Tracer.Debug("Commit notification received for TX id: " + this.transactionId);
+
+                    // Nothing to commit; the finally block still runs on the way out.
+                    if (this.transactionId == null)
+                    {
+                        return;
+                    }
+
+                    // Ask the broker to complete the two-phase commit of this transaction.
+                    TransactionInfo commitRequest = new TransactionInfo();
+                    commitRequest.ConnectionId = this.connection.ConnectionId;
+                    commitRequest.TransactionId = this.transactionId;
+                    commitRequest.Type = (int)TransactionType.CommitTwoPhase;
+
+                    this.connection.CheckConnected();
+                    this.connection.SyncRequest(commitRequest);
+
+                    Tracer.Debug("Transaction Commit Done TX id: " + this.transactionId);
+
+                    // Mark the transaction as recovered in the recovery log.
+                    RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+
+                    enlistment.Done();
+
+                    AfterCommit();
+                }
+                catch (Exception failure)
+                {
+                    Tracer.DebugFormat("Transaction[{0}] Commit failed with error: {1}",
+                                       this.transactionId, failure.Message);
+                    try
+                    {
+                        this.connection.OnException(failure);
+                    }
+                    catch (Exception secondary)
+                    {
+                        Tracer.Error(secondary.ToString());
+                    }
+                }
+                finally
+                {
+                    this.currentEnlistment = null;
+                    this.transactionId = null;
+                    this.netTxState = TxState.None;
+
+                    // Read the volatile field once so the null test and the call
+                    // operate on the same latch instance.
+                    CountDownLatch recoveryLatch = this.recoveryComplete;
+                    if (recoveryLatch != null)
+                    {
+                        recoveryLatch.countDown();
+                    }
+
+                    this.dtcControlEvent.Set();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Single-phase commit: asks the broker to commit the transaction in one
+        /// step and then clears this context's transaction state.
+        /// </summary>
+        /// <remarks>
+        /// On failure the local state is rolled back via AfterRollback, the
+        /// enlistment is still marked Done, and the error goes to the connection's
+        /// exception handler. The state reset and wait handle signal always run.
+        /// NOTE(review): System.Transactions documents Committed(), Aborted() and
+        /// InDoubt() as the outcomes for a SinglePhaseEnlistment; confirm that
+        /// Done() reports the failure case to the transaction manager as intended.
+        /// </remarks>
+        /// <param name="enlistment">Enlistment used to answer the commit request.</param>
+        public void SinglePhaseCommit(SinglePhaseEnlistment enlistment)
+        {
+            lock (this.syncObject)
+            {
+                try
+                {
+                    Tracer.Debug("Single Phase Commit notification received for TX id: " + this.transactionId);
+
+                    // Nothing to commit; the finally block still runs on the way out.
+                    if (this.transactionId == null)
+                    {
+                        return;
+                    }
+
+                    BeforeEnd();
+
+                    // Ask the broker to commit the transaction in a single phase.
+                    TransactionInfo commitRequest = new TransactionInfo();
+                    commitRequest.ConnectionId = this.connection.ConnectionId;
+                    commitRequest.TransactionId = this.transactionId;
+                    commitRequest.Type = (int)TransactionType.CommitOnePhase;
+
+                    this.connection.CheckConnected();
+                    this.connection.SyncRequest(commitRequest);
+
+                    Tracer.Debug("Transaction Single Phase Commit Done TX id: " + this.transactionId);
+
+                    enlistment.Done();
+
+                    AfterCommit();
+                }
+                catch (Exception failure)
+                {
+                    Tracer.DebugFormat("Transaction[{0}] Single Phase Commit failed with error: {1}",
+                                       this.transactionId, failure.Message);
+                    AfterRollback();
+                    enlistment.Done();
+                    try
+                    {
+                        this.connection.OnException(failure);
+                    }
+                    catch (Exception secondary)
+                    {
+                        Tracer.Error(secondary.ToString());
+                    }
+                }
+                finally
+                {
+                    this.currentEnlistment = null;
+                    this.transactionId = null;
+                    this.netTxState = TxState.None;
+
+                    this.dtcControlEvent.Set();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Rolls the transaction back on the broker (End followed by Rollback) and
+        /// then clears this context's transaction state.
+        /// </summary>
+        /// <remarks>
+        /// Failures trigger AfterRollback and are handed to the connection's
+        /// exception handler instead of being rethrown. The state reset, the recovery
+        /// latch count-down and the wait handle signal run in every case.
+        /// </remarks>
+        /// <param name="enlistment">Enlistment marked Done after a successful rollback.</param>
+        public void Rollback(Enlistment enlistment)
+        {
+            lock (this.syncObject)
+            {
+                try
+                {
+                    Tracer.Debug("Rollback notification received for TX id: " + this.transactionId);
+
+                    // Nothing to roll back; the finally block still runs on the way out.
+                    if (this.transactionId == null)
+                    {
+                        return;
+                    }
+
+                    BeforeEnd();
+
+                    // First tell the broker that work on this transaction has ended...
+                    TransactionInfo request = new TransactionInfo();
+                    request.ConnectionId = this.connection.ConnectionId;
+                    request.TransactionId = this.transactionId;
+                    request.Type = (int)TransactionType.End;
+
+                    this.connection.CheckConnected();
+                    this.connection.SyncRequest(request);
+
+                    // ...then reuse the same command object to request the rollback.
+                    request.Type = (int)TransactionType.Rollback;
+                    this.connection.CheckConnected();
+                    this.connection.SyncRequest(request);
+
+                    Tracer.Debug("Transaction Rollback Done TX id: " + this.transactionId);
+
+                    // Mark the transaction as recovered in the recovery log.
+                    RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+
+                    enlistment.Done();
+
+                    AfterRollback();
+                }
+                catch (Exception failure)
+                {
+                    Tracer.DebugFormat("Transaction[{0}] Rollback failed with error: {1}",
+                                       this.transactionId, failure.Message);
+                    AfterRollback();
+                    try
+                    {
+                        this.connection.OnException(failure);
+                    }
+                    catch (Exception secondary)
+                    {
+                        Tracer.Error(secondary.ToString());
+                    }
+                }
+                finally
+                {
+                    this.currentEnlistment = null;
+                    this.transactionId = null;
+                    this.netTxState = TxState.None;
+
+                    // Read the volatile field once so the null test and the call
+                    // operate on the same latch instance.
+                    CountDownLatch recoveryLatch = this.recoveryComplete;
+                    if (recoveryLatch != null)
+                    {
+                        recoveryLatch.countDown();
+                    }
+
+                    this.dtcControlEvent.Set();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Handles an in-doubt notification by rolling the transaction back on the
+        /// broker (End followed by Rollback) and clearing this context's state.
+        /// </summary>
+        /// <remarks>
+        /// Follows the same pattern as Rollback(Enlistment): nothing is sent when no
+        /// transaction id is set, and failures trigger AfterRollback and are handed
+        /// to the connection's exception handler instead of escaping the callback.
+        /// The state reset, the recovery latch count-down and the wait handle signal
+        /// run in every case.
+        /// </remarks>
+        /// <param name="enlistment">Enlistment marked Done after a successful rollback.</param>
+        public void InDoubt(Enlistment enlistment)
+        {
+            lock (syncObject)
+            {
+                try
+                {
+                    Tracer.Debug("In Doubt notification received for TX id: " + this.transactionId);
+
+                    // Without a transaction id there is nothing to roll back on the broker.
+                    if (this.transactionId != null)
+                    {
+                        BeforeEnd();
+
+                        // Now notify the broker that Rollback should be performed.
+                        TransactionInfo info = new TransactionInfo();
+                        info.ConnectionId = this.connection.ConnectionId;
+                        info.TransactionId = this.transactionId;
+                        info.Type = (int)TransactionType.End;
+
+                        this.connection.CheckConnected();
+                        this.connection.SyncRequest(info);
+
+                        info.Type = (int)TransactionType.Rollback;
+                        this.connection.CheckConnected();
+                        this.connection.SyncRequest(info);
+
+                        Tracer.Debug("InDoubt Transaction Rollback Done TX id: " + this.transactionId);
+
+                        RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+
+                        // if server responds that nothing needs to be done, then reply done.
+                        enlistment.Done();
+
+                        AfterRollback();
+                    }
+                }
+                catch (Exception ex)
+                {
+                    Tracer.DebugFormat("Transaction[{0}] InDoubt Rollback failed with error: {1}",
+                                       this.transactionId, ex.Message);
+                    AfterRollback();
+                    try
+                    {
+                        this.connection.OnException(ex);
+                    }
+                    catch (Exception error)
+                    {
+                        Tracer.Error(error.ToString());
+                    }
+                }
+                finally
+                {
+                    this.currentEnlistment = null;
+                    this.transactionId = null;
+                    this.netTxState = TxState.None;
+
+                    // Read the volatile field once so the null test and the call
+                    // operate on the same latch instance.
+                    CountDownLatch latch = this.recoveryComplete;
+                    if (latch != null)
+                    {
+                        latch.countDown();
+                    }
+
+                    this.dtcControlEvent.Set();
+                }
+            }
+        }
+
+        #endregion
+
+        #region Distributed Transaction Recovery Bits
+
+        private volatile CountDownLatch recoveryComplete;
+
+        /// <summary>
+        /// Recovery entry point, meant to be called by NetTxSession when it is
+        /// created.  Compares the transaction records held by the recovery logger
+        /// with the XA transaction ids the Broker reports as recoverable.  Every
+        /// stored record whose id is also known to the Broker is re-enlisted with
+        /// the TransactionManager, and this method then waits on the recovery latch
+        /// before signalling that recovery is complete.  When nothing matches, the
+        /// stored records are stale and are purged.
+        /// </summary>
+        public void InitializeDtcTxContext()
+        {
+            // The recovery logger is initialized with this client's Resource Manager Id.
+            RecoveryLogger.Initialize(ResourceManagerId);
+
+            KeyValuePair<XATransactionId, byte[]>[] storedRecords = RecoveryLogger.GetRecoverables();
+            if (storedRecords.Length == 0)
+            {
+                // Without local recovery data nothing held by the Broker can be
+                // recovered from here.
+                Tracer.Debug("Did not detect any open DTC transaction records on disk.");
+                return;
+            }
+
+            XATransactionId[] brokerIds = TryRecoverBrokerTXIds();
+            if (brokerIds.Length == 0)
+            {
+                // The Broker reports nothing recoverable, so the old recovery log
+                // is stale and can be deleted.
+                Tracer.Debug("Did not detect any recoverable transactions at Broker.");
+                RecoveryLogger.Purge();
+                return;
+            }
+
+            // Collect the stored records whose id the Broker also reports.
+            List<KeyValuePair<XATransactionId, byte[]>> reenlistable = new List<KeyValuePair<XATransactionId, byte[]>>();
+
+            foreach (XATransactionId brokerId in brokerIds)
+            {
+                foreach (KeyValuePair<XATransactionId, byte[]> stored in storedRecords)
+                {
+                    if (!stored.Key.Equals(brokerId))
+                    {
+                        continue;
+                    }
+
+                    Tracer.DebugFormat("Found a matching TX on Broker to stored Id: {0} reenlisting.", stored.Key);
+                    reenlistable.Add(stored);
+                }
+            }
+
+            if (reenlistable.Count == 0)
+            {
+                // None of the stored records match what the Broker holds, so the
+                // old recovery information is stale and is discarded.
+                RecoveryLogger.Purge();
+                return;
+            }
+
+            // One latch count per re-enlisted TX; the enlistment notification
+            // handlers count the latch down in their finally blocks.
+            this.recoveryComplete = new CountDownLatch(reenlistable.Count);
+
+            foreach (KeyValuePair<XATransactionId, byte[]> match in reenlistable)
+            {
+                this.transactionId = match.Key;
+                Tracer.Info("Reenlisting recovered TX with Id: " + this.transactionId);
+                this.currentEnlistment =
+                    TransactionManager.Reenlist(ResourceManagerGuid, match.Value, this);
+            }
+
+            this.recoveryComplete.await();
+            Tracer.Debug("All Recovered TX enlistments Reports complete, Recovery Complete.");
+            TransactionManager.RecoveryComplete(ResourceManagerGuid);
+        }
+
+        /// <summary>
+        /// Sends a Recover request to the Broker over the current connection and
+        /// extracts the XA transaction ids from the reply.
+        /// </summary>
+        /// <returns>
+        /// The XATransactionId entries found in the reply data, or an empty array
+        /// when the reply is not a DataArrayResponse or carries no data.
+        /// </returns>
+        private XATransactionId[] TryRecoverBrokerTXIds()
+        {
+            Tracer.Debug("Checking for Recoverable Transactions on Broker.");
+
+            TransactionInfo recoverRequest = new TransactionInfo
+            {
+                ConnectionId = this.session.Connection.ConnectionId,
+                Type = (int)TransactionType.Recover
+            };
+
+            this.connection.CheckConnected();
+            DataArrayResponse reply = this.connection.SyncRequest(recoverRequest) as DataArrayResponse;
+
+            // Unexpected reply type or no data at all: nothing to recover.
+            if (reply == null || reply.Data.Length == 0)
+            {
+                return new XATransactionId[0];
+            }
+
+            Tracer.DebugFormat("Broker reports there are {0} recoverable XA Transactions", reply.Data.Length);
+
+            List<XATransactionId> brokerIds = new List<XATransactionId>();
+
+            foreach (DataStructure element in reply.Data)
+            {
+                // Entries that are not XA transaction ids are skipped.
+                XATransactionId candidate = element as XATransactionId;
+                if (candidate == null)
+                {
+                    continue;
+                }
+
+                brokerIds.Add(candidate);
+            }
+
+            return brokerIds.ToArray();
+        }
+
+        #endregion
+
+        /// <summary>
+        /// Gets the recovery logger configured on the recovery policy of the
+        /// connection this context belongs to.
+        /// </summary>
+        /// <exception cref="System.InvalidCastException">
+        /// The connection is not a NetTxConnection.
+        /// </exception>
+        internal IRecoveryLogger RecoveryLogger
+        {
+            // Direct cast instead of 'as': a connection of the wrong type now fails
+            // with an InvalidCastException rather than a NullReferenceException.
+            get { return ((NetTxConnection)this.connection).RecoveryPolicy.RecoveryLogger; }
+        }
+
+        /// <summary>
+        /// Gets the string form of the Resource Manager Guid of the connection this
+        /// context belongs to.
+        /// </summary>
+        /// <exception cref="System.InvalidCastException">
+        /// The connection is not a NetTxConnection.
+        /// </exception>
+        internal string ResourceManagerId
+        {
+            // Direct cast instead of 'as': a connection of the wrong type now fails
+            // with an InvalidCastException rather than a NullReferenceException.
+            get { return ((NetTxConnection)this.connection).ResourceManagerGuid.ToString(); }
+        }
+
+        /// <summary>
+        /// Gets the Resource Manager Guid of the connection this context belongs to.
+        /// </summary>
+        /// <exception cref="System.InvalidCastException">
+        /// The connection is not a NetTxConnection.
+        /// </exception>
+        internal Guid ResourceManagerGuid
+        {
+            // Direct cast instead of 'as': a connection of the wrong type now fails
+            // with an InvalidCastException rather than a NullReferenceException.
+            get { return ((NetTxConnection)this.connection).ResourceManagerGuid; }
+        }
+
+    }
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1484526&r1=1484525&r2=1484526&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Mon May 20 16:57:22 2013
@@ -304,11 +304,6 @@ namespace Apache.NMS.ActiveMQ
                 return;
             }
 
-            if(disposing)
-            {
-                // Dispose managed code here.
-            }
-
             try
             {
                 // Force a Stop when we are Disposing vs a Normal Close.
@@ -344,10 +339,10 @@ namespace Apache.NMS.ActiveMQ
         internal void DoClose()
         {
 			Shutdown();
-            RemoveInfo info = new RemoveInfo();
-            info.ObjectId = this.info.SessionId;
-            info.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
-            this.connection.Oneway(info);
+            RemoveInfo removeInfo = new RemoveInfo();
+            removeInfo.ObjectId = this.info.SessionId;
+            removeInfo.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
+            this.connection.Oneway(removeInfo);
 		}
 		
         internal void Shutdown()

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs?rev=1484526&r1=1484525&r2=1484526&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs Mon May 20 16:57:22 2013
@@ -15,15 +15,8 @@
  * limitations under the License.
  */
 
-using System;
-using System.Text;
-using System.Threading;
-using System.Transactions;
 using System.Collections;
-using System.Collections.Generic;
-using Apache.NMS.Util;
 using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.ActiveMQ.Transactions;
 
 namespace Apache.NMS.ActiveMQ
 {
@@ -35,16 +28,12 @@ namespace Apache.NMS.ActiveMQ
 
 namespace Apache.NMS.ActiveMQ
 {
-	public class TransactionContext : ISinglePhaseNotification
+	public class TransactionContext
     {
-        private const int XA_OK = 0;
-        private const int XA_READONLY = 3;
-
-        private TransactionId transactionId;
-        private readonly Session session;
-        private readonly Connection connection;
-        private readonly ArrayList synchronizations = ArrayList.Synchronized(new ArrayList());
-        private Enlistment currentEnlistment;
+        protected TransactionId transactionId;
+        protected readonly Session session;
+        protected readonly Connection connection;
+        protected readonly ArrayList synchronizations = ArrayList.Synchronized(new ArrayList());
 
         public TransactionContext(Session session)
 		{
@@ -57,9 +46,9 @@ namespace Apache.NMS.ActiveMQ
             get{ return this.transactionId != null; }
         }
 
-        public bool InLocalTransaction
+        public virtual bool InLocalTransaction
         {
-            get{ return this.transactionId != null && this.currentEnlistment == null; }
+            get{ return this.transactionId != null; }
         }
 
         public TransactionId TransactionId
@@ -67,9 +56,6 @@ namespace Apache.NMS.ActiveMQ
             get { return transactionId; }
         }
         
-        /// <summary>
-        /// Method AddSynchronization
-        /// </summary>
         public void AddSynchronization(ISynchronization synchronization)
         {
             synchronizations.Add(synchronization);
@@ -80,7 +66,7 @@ namespace Apache.NMS.ActiveMQ
             synchronizations.Remove(synchronization);
         }
         
-        public void Begin()
+        public virtual void Begin()
         {
             if(!InTransaction)
             {
@@ -93,19 +79,16 @@ namespace Apache.NMS.ActiveMQ
                 
                 this.session.Connection.Oneway(info);
 
-                if(this.TransactionStartedListener != null)
-                {
-                    this.TransactionStartedListener(this.session);
-                }
+                SignalTransactionStarted();
 
                 if(Tracer.IsDebugEnabled)
                 {
-                    Tracer.Debug("Begin:" + this.transactionId.ToString());
+                    Tracer.Debug("Begin:" + this.transactionId);
                 }
             }
         }
         
-        public void Rollback()
+        public virtual void Rollback()
         {
             if(InTransaction)
             {
@@ -130,7 +113,7 @@ namespace Apache.NMS.ActiveMQ
             }
         }
         
-        public void Commit()
+        public virtual void Commit()
         {
             if(InTransaction)
             {
@@ -177,10 +160,7 @@ namespace Apache.NMS.ActiveMQ
                         synchronization.AfterCommit();
                     }
 
-                    if(this.TransactionCommittedListener != null)
-                    {
-                        this.TransactionCommittedListener(this.session);
-                    }
+                    SignalTransactionCommitted();
                 }
             }
             finally
@@ -200,10 +180,7 @@ namespace Apache.NMS.ActiveMQ
                         synchronization.AfterRollback();
                     }
 
-                    if(this.TransactionRolledBackListener != null)
-                    {
-                        this.TransactionRolledBackListener(this.session);
-                    }
+                    SignalTransactionRolledBack();
                 }
             }
             finally
@@ -218,547 +195,31 @@ namespace Apache.NMS.ActiveMQ
         public event SessionTxEventDelegate TransactionCommittedListener;
         public event SessionTxEventDelegate TransactionRolledBackListener;
 
-        #endregion
-
-        #region Transaction Members used when dealing with .NET System Transactions.
-
-        // When DTC calls prepare we must then wait for either the TX to commit, rollback or
-        // be canceled because its in doubt.
-        private readonly ManualResetEvent dtcControlEvent = new ManualResetEvent(true);
-
-        // Once the DTC calls prepare we lock this object and don't unlock it again until
-        // the TX has either completed or terminated, the users of this class should use
-        // this sync point when the TX is a DTC version as opposed to a local one.
-        private readonly object syncObject = new Mutex();
-
-	    public enum TxState
-	    {
-	        None = 0, Active = 1, Pending = 2
-	    }
-
-	    private TxState netTxState = TxState.None;
-
-        public object SyncRoot
-	    {
-            get { return this.syncObject; }
-	    }
-
-        public bool InNetTransaction
-        {
-            get{ return this.transactionId != null && this.transactionId is XATransactionId; }
-        }
-
-        public TxState NetTxState
+        protected void SignalTransactionStarted()
         {
-            get
+            if (this.TransactionStartedListener != null)
             {
-                return this.netTxState; 
+                this.TransactionStartedListener(this.session);
             }
         }
 
-	    public WaitHandle DtcWaitHandle
-	    {
-            get { return dtcControlEvent; }
-	    }
-
-        public void Begin(Transaction transaction)
+        protected void SignalTransactionCommitted()
         {
-            lock (syncObject)
+            if (this.TransactionCommittedListener != null)
             {
-                this.netTxState = TxState.Active;
-                dtcControlEvent.Reset();
-
-                Tracer.Debug("Begin notification received");
-
-                if (InNetTransaction)
-                {
-                    throw new TransactionInProgressException("A Transaction is already in Progress");
-                }
-
-                try
-                {
-                    Guid rmId = ResourceManagerGuid;
-
-                    // Enlist this object in the transaction.
-                    this.currentEnlistment =
-                        transaction.EnlistDurable(rmId, this, EnlistmentOptions.None);
-
-                    Tracer.Debug("Enlisted in Durable Transaction with RM Id: " + rmId);
-
-                    TransactionInformation txInfo = transaction.TransactionInformation;
-
-                    XATransactionId xaId = new XATransactionId();
-                    this.transactionId = xaId;
-
-                    if (txInfo.DistributedIdentifier != Guid.Empty)
-                    {
-                        xaId.GlobalTransactionId = txInfo.DistributedIdentifier.ToByteArray();
-                        xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
-                    }
-                    else
-                    {
-                        xaId.GlobalTransactionId = Encoding.UTF8.GetBytes(txInfo.LocalIdentifier);
-                        xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
-                    }
-
-                    // Now notify the broker that a new XA'ish transaction has started.
-                    TransactionInfo info = new TransactionInfo();
-                    info.ConnectionId = this.connection.ConnectionId;
-                    info.TransactionId = this.transactionId;
-                    info.Type = (int) TransactionType.Begin;
-
-                    this.session.Connection.Oneway(info);
-
-                    if (this.TransactionStartedListener != null)
-                    {
-                        this.TransactionStartedListener(this.session);
-                    }
-
-                    if (Tracer.IsDebugEnabled)
-                    {
-                        Tracer.Debug("Began XA'ish Transaction:" + xaId.GlobalTransactionId.ToString());
-                    }
-                }
-                catch (Exception)
-                {
-                    dtcControlEvent.Set();
-                    throw;
-                }
+                this.TransactionCommittedListener(this.session);
             }
         }
 
-        public void Prepare(PreparingEnlistment preparingEnlistment)
+        protected void SignalTransactionRolledBack()
         {
-            lock (this.syncObject)
+            if (this.TransactionRolledBackListener != null)
             {
-                this.netTxState = TxState.Pending;
-
-                try
-                {
-                    Tracer.Debug("Prepare notification received for TX id: " + this.transactionId);
-
-                    BeforeEnd();
-
-                    // Before sending the request to the broker, log the recovery bits, if
-                    // this fails we can't prepare and the TX should be rolled back.
-                    RecoveryLogger.LogRecoveryInfo(this.transactionId as XATransactionId,
-                                                   preparingEnlistment.RecoveryInformation());
-
-                    // Inform the broker that work on the XA'sh TX Branch is complete.
-                    TransactionInfo info = new TransactionInfo();
-                    info.ConnectionId = this.connection.ConnectionId;
-                    info.TransactionId = this.transactionId;
-                    info.Type = (int) TransactionType.End;
-
-                    this.connection.CheckConnected();
-                    this.connection.SyncRequest(info);
-
-                    // Prepare the Transaction for commit.
-                    info.Type = (int) TransactionType.Prepare;
-                    IntegerResponse response = (IntegerResponse) this.connection.SyncRequest(info);
-                    if (response.Result == XA_READONLY)
-                    {
-                        Tracer.Debug("Transaction Prepare done and doesn't need a commit, TX id: " + this.transactionId);
-
-                        this.transactionId = null;
-                        this.currentEnlistment = null;
-
-                        // Read Only means there's nothing to recover because there was no
-                        // change on the broker.
-                        RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
-
-                        // if server responds that nothing needs to be done, then reply prepared
-                        // but clear the current state data so we appear done to the commit method.
-                        preparingEnlistment.Prepared();
-
-                        // Done so commit won't be called.
-                        AfterCommit();
-
-                        // A Read-Only TX is considered closed at this point, DTC won't call us again.
-                        this.dtcControlEvent.Set();
-                    }
-                    else
-                    {
-                        Tracer.Debug("Transaction Prepare succeeded TX id: " + this.transactionId);
-
-                        // If work finished correctly, reply prepared
-                        preparingEnlistment.Prepared();
-                    }
-                }
-                catch (Exception ex)
-                {
-                    Tracer.DebugFormat("Transaction[{0}] Prepare failed with error: {1}",
-                                       this.transactionId, ex.Message);
-
-                    AfterRollback();
-                    preparingEnlistment.ForceRollback();
-                    try
-                    {
-                        this.connection.OnException(ex);
-                    }
-                    catch (Exception error)
-                    {
-                        Tracer.Error(error.ToString());
-                    }
-
-                    this.currentEnlistment = null;
-                    this.transactionId = null;
-                    this.netTxState = TxState.None;
-                    this.dtcControlEvent.Set();
-                }
+                this.TransactionRolledBackListener(this.session);
             }
         }
 
-        public void Commit(Enlistment enlistment)
-        {
-            lock (this.syncObject)
-            {
-                try
-                {
-                    Tracer.Debug("Commit notification received for TX id: " + this.transactionId);
-
-                    if (this.transactionId != null)
-                    {
-                        // Now notify the broker that a new XA'ish transaction has completed.
-                        TransactionInfo info = new TransactionInfo();
-                        info.ConnectionId = this.connection.ConnectionId;
-                        info.TransactionId = this.transactionId;
-                        info.Type = (int) TransactionType.CommitTwoPhase;
-
-                        this.connection.CheckConnected();
-                        this.connection.SyncRequest(info);
-
-                        Tracer.Debug("Transaction Commit Done TX id: " + this.transactionId);
-
-                        RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
-
-                        // if server responds that nothing needs to be done, then reply done.
-                        enlistment.Done();
-
-                        AfterCommit();
-                    }
-                }
-                catch (Exception ex)
-                {
-                    Tracer.DebugFormat("Transaction[{0}] Commit failed with error: {1}",
-                                       this.transactionId, ex.Message);
-                    try
-                    {
-                        this.connection.OnException(ex);
-                    }
-                    catch (Exception error)
-                    {
-                        Tracer.Error(error.ToString());
-                    }
-                }
-                finally
-                {
-                    this.currentEnlistment = null;
-                    this.transactionId = null;
-                    this.netTxState = TxState.None;
-
-                    CountDownLatch latch = this.recoveryComplete;
-                    if (latch != null)
-                    {
-                        latch.countDown();
-                    }
-
-                    this.dtcControlEvent.Set();
-                }
-            }
-        }
-
-        public void SinglePhaseCommit(SinglePhaseEnlistment enlistment)
-        {
-            lock (this.syncObject)
-            {
-                try
-                {
-                    Tracer.Debug("Single Phase Commit notification received for TX id: " + this.transactionId);
-
-                    if (this.transactionId != null)
-                    {
-                        BeforeEnd();
-
-                        // Now notify the broker that a new XA'ish transaction has completed.
-                        TransactionInfo info = new TransactionInfo();
-                        info.ConnectionId = this.connection.ConnectionId;
-                        info.TransactionId = this.transactionId;
-                        info.Type = (int) TransactionType.CommitOnePhase;
-
-                        this.connection.CheckConnected();
-                        this.connection.SyncRequest(info);
-
-                        Tracer.Debug("Transaction Single Phase Commit Done TX id: " + this.transactionId);
-
-                        // if server responds that nothing needs to be done, then reply done.
-                        enlistment.Done();
-
-                        AfterCommit();
-                    }
-                }
-                catch (Exception ex)
-                {
-                    Tracer.DebugFormat("Transaction[{0}] Single Phase Commit failed with error: {1}",
-                                       this.transactionId, ex.Message);
-                    AfterRollback();
-                    enlistment.Done();
-                    try
-                    {
-                        this.connection.OnException(ex);
-                    }
-                    catch (Exception error)
-                    {
-                        Tracer.Error(error.ToString());
-                    }
-                }
-                finally
-                {
-                    this.currentEnlistment = null;
-                    this.transactionId = null;
-                    this.netTxState = TxState.None;
-
-                    this.dtcControlEvent.Set();
-                }
-            }
-        }
-		
-        public void Rollback(Enlistment enlistment)
-        {
-            lock (this.syncObject)
-            {
-                try
-                {
-                    Tracer.Debug("Rollback notification received for TX id: " + this.transactionId);
-
-                    if (this.transactionId != null)
-                    {
-                        BeforeEnd();
-
-                        // Now notify the broker that a new XA'ish transaction has started.
-                        TransactionInfo info = new TransactionInfo();
-                        info.ConnectionId = this.connection.ConnectionId;
-                        info.TransactionId = this.transactionId;
-                        info.Type = (int) TransactionType.End;
-
-                        this.connection.CheckConnected();
-                        this.connection.SyncRequest(info);
-
-                        info.Type = (int) TransactionType.Rollback;
-                        this.connection.CheckConnected();
-                        this.connection.SyncRequest(info);
-
-                        Tracer.Debug("Transaction Rollback Done TX id: " + this.transactionId);
-
-                        RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
-
-                        // if server responds that nothing needs to be done, then reply done.
-                        enlistment.Done();
-
-                        AfterRollback();
-                    }
-                }
-                catch (Exception ex)
-                {
-                    Tracer.DebugFormat("Transaction[{0}] Rollback failed with error: {1}",
-                                       this.transactionId, ex.Message);
-                    AfterRollback();
-                    try
-                    {
-                        this.connection.OnException(ex);
-                    }
-                    catch (Exception error)
-                    {
-                        Tracer.Error(error.ToString());
-                    }
-                }
-                finally
-                {
-                    this.currentEnlistment = null;
-                    this.transactionId = null;
-                    this.netTxState = TxState.None;
-
-                    CountDownLatch latch = this.recoveryComplete;
-                    if (latch != null)
-                    {
-                        latch.countDown();
-                    }
-
-                    this.dtcControlEvent.Set();
-                }
-            }
-        }
-
-        public void InDoubt(Enlistment enlistment)
-        {
-            lock (syncObject)
-            {
-                try
-                {
-                    Tracer.Debug("In Doubt notification received for TX id: " + this.transactionId);
-
-                    BeforeEnd();
-
-                    // Now notify the broker that Rollback should be performed.
-                    TransactionInfo info = new TransactionInfo();
-                    info.ConnectionId = this.connection.ConnectionId;
-                    info.TransactionId = this.transactionId;
-                    info.Type = (int) TransactionType.End;
-
-                    this.connection.CheckConnected();
-                    this.connection.SyncRequest(info);
-
-                    info.Type = (int) TransactionType.Rollback;
-                    this.connection.CheckConnected();
-                    this.connection.SyncRequest(info);
-
-                    Tracer.Debug("InDoubt Transaction Rollback Done TX id: " + this.transactionId);
-
-                    RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
-
-                    // if server responds that nothing needs to be done, then reply done.
-                    enlistment.Done();
-
-                    AfterRollback();
-                }
-                finally
-                {
-                    this.currentEnlistment = null;
-                    this.transactionId = null;
-                    this.netTxState = TxState.None;
-
-                    CountDownLatch latch = this.recoveryComplete;
-                    if (latch != null)
-                    {
-                        latch.countDown();
-                    }
-
-                    this.dtcControlEvent.Set();
-                }
-            }
-        }
-
-        #endregion
-
-        #region Distributed Transaction Recovery Bits
-
-	    private volatile CountDownLatch recoveryComplete = null;
-
-        /// <summary>
-        /// Should be called from NetTxSession when created to check if any TX
-        /// data is stored for recovery and whether the Broker has matching info
-        /// stored.  If an Transaction is found that belongs to this client and is
-        /// still alive on the Broker it will be recovered, otherwise the stored 
-        /// data should be cleared.
-        /// </summary>
-        public void InitializeDtcTxContext()
-        {
-            // initialize the logger with the current Resource Manager Id
-            RecoveryLogger.Initialize(ResourceManagerId);
-
-            KeyValuePair<XATransactionId, byte[]>[] localRecoverables = RecoveryLogger.GetRecoverables();
-            if (localRecoverables.Length == 0)
-            {
-                Tracer.Debug("Did not detect any open DTC transaction records on disk.");
-                // No local data so anything stored on the broker can't be recovered here.
-                return;
-            }
-
-            XATransactionId[] recoverables = TryRecoverBrokerTXIds();
-            if (recoverables.Length == 0)
-            {
-                Tracer.Debug("Did not detect any recoverable transactions at Broker.");
-                // Broker has no recoverable data so nothing to do here, delete the 
-                // old recovery log as its stale.
-                RecoveryLogger.Purge();
-                return;
-            }
-
-            List<KeyValuePair<XATransactionId, byte[]>> matches = new List<KeyValuePair<XATransactionId, byte[]>>();
-
-            foreach(XATransactionId recoverable in recoverables)
-            {
-                foreach(KeyValuePair<XATransactionId, byte[]> entry in localRecoverables)
-                {
-                    if(entry.Key.Equals(recoverable))
-                    {
-                        Tracer.DebugFormat("Found a matching TX on Broker to stored Id: {0} reenlisting.", entry.Key);
-                        matches.Add(entry);
-                    }
-                }
-            }
-
-            if (matches.Count != 0)
-            {
-                this.recoveryComplete = new CountDownLatch(matches.Count);
-
-                foreach (KeyValuePair<XATransactionId, byte[]> recoverable in matches)
-                {
-                    this.transactionId = recoverable.Key;
-                    Tracer.Info("Reenlisting recovered TX with Id: " + this.transactionId);
-                    this.currentEnlistment = 
-                        TransactionManager.Reenlist(ResourceManagerGuid, recoverable.Value, this);
-                }
-
-                this.recoveryComplete.await();
-                Tracer.Debug("All Recovered TX enlistments Reports complete, Recovery Complete.");
-                TransactionManager.RecoveryComplete(ResourceManagerGuid);
-                return;
-            }
-
-            // The old recovery information doesn't match what's on the broker so we
-            // should discard it as its stale now.
-            RecoveryLogger.Purge();
-        }
-
-        private XATransactionId[] TryRecoverBrokerTXIds()
-        {
-            Tracer.Debug("Checking for Recoverable Transactions on Broker.");
-
-            TransactionInfo info = new TransactionInfo();
-            info.ConnectionId = this.session.Connection.ConnectionId;
-            info.Type = (int)TransactionType.Recover;
-
-            this.connection.CheckConnected();
-            DataArrayResponse response = this.connection.SyncRequest(info) as DataArrayResponse;
-
-            if (response != null && response.Data.Length > 0)
-            {
-                Tracer.DebugFormat("Broker reports there are {0} recoverable XA Transactions", response.Data.Length);
-
-                List<XATransactionId> recovered = new List<XATransactionId>();
-
-                foreach (DataStructure ds in response.Data)
-                {
-                    XATransactionId xid = ds as XATransactionId;
-                    if (xid != null)
-                    {
-                        recovered.Add(xid);
-                    }
-                }
-
-                return recovered.ToArray();
-            }
-
-            return new XATransactionId[0];
-        }
-
-        #endregion
-
-        internal IRecoveryLogger RecoveryLogger
-        {
-            get { return (this.connection as NetTxConnection).RecoveryPolicy.RecoveryLogger; }
-        }
-
-        internal string ResourceManagerId
-        {
-            get { return (this.connection as NetTxConnection).ResourceManagerGuid.ToString(); }
-        }
-
-        internal Guid ResourceManagerGuid
-        {
-            get { return (this.connection as NetTxConnection).ResourceManagerGuid; }
-        }
+	    #endregion
 
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj?rev=1484526&r1=1484525&r2=1484526&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj Mon May 20 16:57:22 2013
@@ -318,6 +318,7 @@
     <Compile Include="src\main\csharp\NetTxMessageConsumer.cs" />
     <Compile Include="src\main\csharp\NetTxRecoveryPolicy.cs" />
     <Compile Include="src\main\csharp\NetTxSession.cs" />
+    <Compile Include="src\main\csharp\NetTxTransactionContext.cs" />
     <Compile Include="src\main\csharp\OpenWire\BaseDataStreamMarshaller.cs">
       <SubType>Code</SubType>
     </Compile>



Mime
View raw message