activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1030680 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: MessageConsumer.cs NetTxConnection.cs NetTxConnectionFactory.cs NetTxSession.cs Session.cs TransactionContext.cs
Date Wed, 03 Nov 2010 20:59:54 GMT
Author: tabish
Date: Wed Nov  3 20:59:54 2010
New Revision: 1030680

URL: http://svn.apache.org/viewvc?rev=1030680&view=rev
Log:
fix for: https://issues.apache.org/activemq/browse/AMQNET-290

Creates a .NET System.Transactions Session that will join in on the current Ambient Transaction
whenever it detects one.

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
  (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs
  (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
  (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1030680&r1=1030679&r2=1030680&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Wed Nov  3 20:59:54 2010
@@ -504,6 +504,7 @@ namespace Apache.NMS.ActiveMQ
 						ack = MakeAckForAllDeliveredMessages(AckType.DeliveredAck);
 						if(ack != null)
 						{
+                            Tracer.Debug("Consumer - DeliverAcks clearing the Dispatch list");
 							this.dispatchedMessages.Clear();
 						}
 						else
@@ -853,6 +854,8 @@ namespace Apache.NMS.ActiveMQ
 
 				if(!synchronizationRegistered)
 				{
+                    Tracer.DebugFormat("Consumer {0} Registering new MessageConsumerSynchronization",
+                                       this.info.ConsumerId);
 					this.synchronizationRegistered = true;
 					this.session.TransactionContext.AddSynchronization(new MessageConsumerSynchronization(this));
 				}
@@ -963,6 +966,8 @@ namespace Apache.NMS.ActiveMQ
 				{
 					if(this.dispatchedMessages.Count == 0)
 					{
+                        Tracer.DebugFormat("Consumer {0} Rolled Back, no dispatched Messages",
+                                           this.info.ConsumerId);
 						return;
 					}
 
@@ -1020,9 +1025,15 @@ namespace Apache.NMS.ActiveMQ
 						// stop the delivery of messages.
 						this.unconsumedMessages.Stop();
 
+                        if(Tracer.IsDebugEnabled)
+                        {
+                            Tracer.DebugFormat("Consumer {0} Rolled Back, Re-enque {1} messages",
+                                               this.info.ConsumerId, this.dispatchedMessages.Count);
+                        }
+
 						foreach(MessageDispatch dispatch in this.dispatchedMessages)
 						{
-							this.unconsumedMessages.EnqueueFirst(dispatch);
+                            this.unconsumedMessages.EnqueueFirst(dispatch);
 						}
 
 						if(redeliveryDelay > 0 && !this.unconsumedMessages.Closed)
@@ -1155,18 +1166,24 @@ namespace Apache.NMS.ActiveMQ
 
 			public void BeforeEnd()
 			{
+                Tracer.DebugFormat("MessageConsumerSynchronization - BeforeEnd Called for
Consumer {0}.",
+                                   this.consumer.ConsumerId);
 				this.consumer.Acknowledge();
 				this.consumer.synchronizationRegistered = false;
 			}
 
 			public void AfterCommit()
 			{
+                Tracer.DebugFormat("MessageConsumerSynchronization - AfterCommit Called for
Consumer {0}.",
+                                   this.consumer.ConsumerId);
 				this.consumer.Commit();
 				this.consumer.synchronizationRegistered = false;
 			}
 
 			public void AfterRollback()
 			{
+                Tracer.DebugFormat("MessageConsumerSynchronization - AfterRollback Called
for Consumer {0}.",
+                                   this.consumer.ConsumerId);
 				this.consumer.Rollback();
 				this.consumer.synchronizationRegistered = false;
 			}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs?rev=1030680&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
Wed Nov  3 20:59:54 2010
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS;
+using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.ActiveMQ.Util;
+
+namespace Apache.NMS.ActiveMQ
+{
+    public class NetTxConnection : Connection, INetTxConnection
+    {
+        public NetTxConnection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
+            : base(connectionUri, transport, clientIdGenerator)
+        {
+        }
+
+        public INetTxSession CreateNetTxSession()
+        {
+            return (INetTxSession) CreateSession(AcknowledgementMode.Transactional);
+        }
+
+        protected override Session CreateAtiveMQSession(AcknowledgementMode ackMode)
+        {
+            CheckConnected();
+            return new NetTxSession(this, NextSessionId);
+        }
+
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs?rev=1030680&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs
Wed Nov  3 20:59:54 2010
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.ActiveMQ.Transport;
+
+namespace Apache.NMS.ActiveMQ
+{
+    public class NetTxConnectionFactory : ConnectionFactory, INetTxConnectionFactory
+    {
+        public NetTxConnectionFactory() : base(GetDefaultBrokerUrl())
+        {
+        }
+
+        public NetTxConnectionFactory(string brokerUri) : base(brokerUri, null)
+        {
+        }
+
+        public NetTxConnectionFactory(string brokerUri, string clientID)
+            : base(brokerUri, clientID)
+        {
+        }
+
+        public NetTxConnectionFactory(Uri brokerUri)
+            : base(brokerUri, null)
+        {
+        }
+
+        public NetTxConnectionFactory(Uri brokerUri, string clientID)
+            : base(brokerUri, clientID)
+        {
+        }
+
+        public INetTxConnection CreateNetTxConnection()
+        {
+            return (INetTxConnection) base.CreateActiveMQConnection();
+        }
+
+        public INetTxConnection CreateNetTxConnection(string userName, string password)
+        {
+            return (INetTxConnection) base.CreateActiveMQConnection(userName, password);
+        }
+
+        protected override Connection CreateActiveMQConnection(ITransport transport)
+        {
+            return new NetTxConnection(this.BrokerUri, transport, this.ClientIdGenerator);
+        }
+
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs?rev=1030680&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs Wed
Nov  3 20:59:54 2010
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Transactions;
+using Apache.NMS;
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ
+{
+    public class NetTxSession : Session, INetTxSession
+    {
+        public NetTxSession(Connection connection, SessionId id)
+            : base(connection, id, AcknowledgementMode.AutoAcknowledge)
+        {
+        }
+
+        /// <summary>
+        /// Reports Transacted whenever there is an Ambient Transaction or the internal
+        /// TransactionContext is still involed in a .NET Transaction beyond the lifetime
+        /// of an ambient transaction (can happen during a scoped transaction disposing
+        /// without Complete being called and a Rollback is in progress.)
+        /// </summary>
+        public override bool IsTransacted
+        {
+            get { return Transaction.Current != null || TransactionContext.InNetTransaction;
}
+        }
+
+        public override bool IsAutoAcknowledge
+        {
+            // When not in a .NET Transaction we assume Auto Ack.
+            get { return true; }
+        }
+
+        internal override void DoRollback()
+        {
+            // Only the Transaction Manager can do this when in a .NET Transaction.
+            throw new TransactionInProgressException("Cannot Rollback() inside an NetTxSession");
+        }
+
+        internal override void DoCommit()
+        {
+            // Only the Transaction Manager can do this when in a .NET Transaction.
+            throw new TransactionInProgressException("Cannot Commit() inside an NetTxSession");
+        }
+
+        internal override void DoStartTransaction()
+        {
+            if(!TransactionContext.InNetTransaction)
+            {
+                if(Transaction.Current != null)
+                {
+                    Tracer.Debug("NetTxSession detected Ambient Transaction, start new TX
with broker");
+
+                    // Start a new .NET style transaction, this could be distributed
+                    // or it could just be a Local transaction that could become
+                    // distributed later.
+                    TransactionContext.Begin(Transaction.Current);
+                }
+            }
+        }
+
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1030680&r1=1030679&r2=1030680&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Wed Nov
 3 20:59:54 2010
@@ -71,11 +71,7 @@ namespace Apache.NMS.ActiveMQ
 			this.acknowledgementMode = acknowledgementMode;
 			this.requestTimeout = connection.RequestTimeout;
 			this.dispatchAsync = connection.DispatchAsync;
-
-			if(acknowledgementMode == AcknowledgementMode.Transactional)
-			{
-				this.transactionContext = new TransactionContext(this);
-			}
+		    this.transactionContext = new TransactionContext(this);
 
             Uri brokerUri = connection.BrokerUri;
 
@@ -186,39 +182,39 @@ namespace Apache.NMS.ActiveMQ
 			set { this.requestTimeout = value; }
 		}
 
-		public bool Transacted
-		{
-			get { return this.AcknowledgementMode == AcknowledgementMode.Transactional; }
-		}
+        public bool Transacted
+        {
+            get { return this.IsTransacted; }
+        }
 
-		public AcknowledgementMode AcknowledgementMode
+		public virtual AcknowledgementMode AcknowledgementMode
 		{
 			get { return this.acknowledgementMode; }
 		}
 
-		public bool IsClientAcknowledge
+		public virtual bool IsClientAcknowledge
 		{
 			get { return this.acknowledgementMode == AcknowledgementMode.ClientAcknowledge; }
 		}
 
-		public bool IsAutoAcknowledge
+		public virtual bool IsAutoAcknowledge
 		{
 			get { return this.acknowledgementMode == AcknowledgementMode.AutoAcknowledge; }
 		}
 
-		public bool IsDupsOkAcknowledge
+		public virtual bool IsDupsOkAcknowledge
 		{
 			get { return this.acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge; }
 		}
 
-		public bool IsIndividualAcknowledge
+		public virtual bool IsIndividualAcknowledge
 		{
 			get { return this.acknowledgementMode == AcknowledgementMode.IndividualAcknowledge; }
 		}
 
-		public bool IsTransacted
+		public virtual bool IsTransacted
 		{
-			get { return this.acknowledgementMode == AcknowledgementMode.Transactional; }
+			get{ return this.acknowledgementMode == AcknowledgementMode.Transactional; }
 		}
 
 		public SessionExecutor Executor
@@ -315,25 +311,21 @@ namespace Apache.NMS.ActiveMQ
 
 				try
 				{
-					Tracer.InfoFormat("Closing The Session with Id {0}", this.info.SessionId.ToString());
-					DoClose();
-					Tracer.InfoFormat("Closed The Session with Id {0}", this.info.SessionId.ToString());
+                    if(transactionContext.InNetTransaction)
+                    {
+                        this.transactionContext.AddSynchronization(new SessionCloseSynchronization(this));
+                    }
+                    else
+                    {
+    					Tracer.InfoFormat("Closing The Session with Id {0}", this.info.SessionId.ToString());
+    					DoClose();
+    					Tracer.InfoFormat("Closed The Session with Id {0}", this.info.SessionId.ToString());
+                    }
 				}
 				catch(Exception ex)
 				{
 					Tracer.ErrorFormat("Error during session close: {0}", ex);
 				}
-				finally
-				{
-					// Make sure we attempt to inform the broker this Session is done.
-					RemoveInfo info = new RemoveInfo();
-					info.ObjectId = this.info.SessionId;
-					info.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
-					this.connection.Oneway(info);
-					this.connection = null;
-					this.closed = true;
-					this.closing = false;
-				}
 			}
 		}
 
@@ -393,8 +385,14 @@ namespace Apache.NMS.ActiveMQ
 				}
 				finally
 				{
-					this.closed = true;
-					this.closing = false;
+                    // Make sure we attempt to inform the broker this Session is done.
+                    RemoveInfo info = new RemoveInfo();
+                    info.ObjectId = this.info.SessionId;
+                    info.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
+                    this.connection.Oneway(info);
+                    this.connection = null;
+                    this.closed = true;
+                    this.closing = false;
 				}
 			}
 		}
@@ -657,35 +655,21 @@ namespace Apache.NMS.ActiveMQ
 
 		public void Commit()
 		{
-			if(!Transacted)
-			{
-				throw new InvalidOperationException(
-						"You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is:
"
-						+ this.AcknowledgementMode);
-			}
-
-			this.TransactionContext.Commit();
+            this.DoCommit();
 		}
 
 		public void Rollback()
 		{
-			if(!Transacted)
-			{
-				throw new InvalidOperationException(
-						"You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is:
"
-						+ this.AcknowledgementMode);
-			}
-
-			this.TransactionContext.Rollback();
+            this.DoRollback();
 		}
 
 		#endregion
 
-		public void DoSend( ActiveMQMessage message, MessageProducer producer, MemoryUsage producerWindow,
TimeSpan sendTimeout )
+		internal void DoSend( ActiveMQMessage message, MessageProducer producer, MemoryUsage producerWindow,
TimeSpan sendTimeout )
 		{
 			ActiveMQMessage msg = message;
 
-			if(Transacted)
+			if(IsTransacted)
 			{
 				DoStartTransaction();
 				msg.TransactionId = TransactionContext.TransactionId;
@@ -729,12 +713,36 @@ namespace Apache.NMS.ActiveMQ
 			}
 		}
 
+        internal virtual void DoCommit()
+        {
+            if(!IsTransacted)
+            {
+                throw new InvalidOperationException(
+                        "You cannot perform a Commit() on a non-transacted session. Acknowlegement
mode is: "
+                        + this.AcknowledgementMode);
+            }
+
+            this.TransactionContext.Commit();
+        }
+
+        internal virtual void DoRollback()
+        {
+            if(!IsTransacted)
+            {
+                throw new InvalidOperationException(
+                        "You cannot perform a Commit() on a non-transacted session. Acknowlegement
mode is: "
+                        + this.AcknowledgementMode);
+            }
+
+            this.TransactionContext.Rollback();
+        }
+
 		/// <summary>
 		/// Ensures that a transaction is started
 		/// </summary>
-		public void DoStartTransaction()
+		internal virtual void DoStartTransaction()
 		{
-			if(Transacted)
+			if(IsTransacted)
 			{
 				this.TransactionContext.Begin();
 			}
@@ -929,5 +937,29 @@ namespace Apache.NMS.ActiveMQ
 		{
 		}
 
+        class SessionCloseSynchronization : ISynchronization
+        {
+            private readonly Session session;
+
+            public SessionCloseSynchronization(Session session)
+            {
+                this.session = session;
+            }
+
+            public void BeforeEnd()
+            {
+            }
+
+            public void AfterCommit()
+            {
+                this.session.DoClose();
+            }
+
+            public void AfterRollback()
+            {
+                this.session.DoClose();
+            }
+        }
+
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs?rev=1030680&r1=1030679&r2=1030680&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
Wed Nov  3 20:59:54 2010
@@ -14,6 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+using System;
+using System.Text;
+using System.Net;
+using System.Transactions;
 using System.Collections;
 using Apache.NMS.ActiveMQ.Commands;
 
@@ -27,15 +32,21 @@ namespace Apache.NMS.ActiveMQ
 
 namespace Apache.NMS.ActiveMQ
 {
-	public class TransactionContext
+	public class TransactionContext : IEnlistmentNotification
     {
+        private const int XA_OK = 0;
+        private const int XA_READONLY = 3;
+
         private TransactionId transactionId;
         private readonly Session session;
+        private readonly Connection connection;
         private readonly ArrayList synchronizations = ArrayList.Synchronized(new ArrayList());
-        
+        private Enlistment currentEnlistment;
+
         public TransactionContext(Session session)
 		{
             this.session = session;
+            this.connection = session.Connection;
         }
 
         public bool InTransaction
@@ -103,7 +114,6 @@ namespace Apache.NMS.ActiveMQ
                 this.session.Connection.SyncRequest(info);
     
                 this.AfterRollback();
-                this.synchronizations.Clear();
             }
         }
         
@@ -129,7 +139,6 @@ namespace Apache.NMS.ActiveMQ
                 this.session.Connection.SyncRequest(info);
                 
                 this.AfterCommit();
-                this.synchronizations.Clear();
             }
         }
 
@@ -146,24 +155,293 @@ namespace Apache.NMS.ActiveMQ
 
         internal void AfterCommit()
         {
-            lock(this.synchronizations.SyncRoot)
+            try
             {
-                foreach(ISynchronization synchronization in this.synchronizations)
+                lock(this.synchronizations.SyncRoot)
                 {
-                    synchronization.AfterCommit();
+                    foreach(ISynchronization synchronization in this.synchronizations)
+                    {
+                        synchronization.AfterCommit();
+                    }
                 }
             }
+            finally
+            {
+                synchronizations.Clear();
+            }
         }
 
         internal void AfterRollback()
         {
-            lock(this.synchronizations.SyncRoot)
+            try
             {
-                foreach(ISynchronization synchronization in this.synchronizations)
+                lock(this.synchronizations.SyncRoot)
+                {
+                    foreach(ISynchronization synchronization in this.synchronizations)
+                    {
+                        synchronization.AfterRollback();
+                    }
+                }
+            }
+            finally
+            {
+                synchronizations.Clear();
+            }
+        }
+
+        #region Transaction Members used when dealing with .NET System Transactions.
+
+        public bool InNetTransaction
+        {
+            get{ return this.currentEnlistment != null; }
+        }
+
+        public void Begin(Transaction transaction)
+        {
+            Tracer.Debug("Begin notification received");
+
+            if(InNetTransaction)
+            {
+                throw new TransactionInProgressException("A Transaction is already in Progress");
+            }
+
+            // Enlist this object in the transaction.
+            this.currentEnlistment =
+                transaction.EnlistVolatile(this, EnlistmentOptions.None);
+
+            System.Transactions.TransactionInformation txInfo = transaction.TransactionInformation;
+
+            XATransactionId xaId = new XATransactionId();
+            this.transactionId = xaId;
+
+            if(txInfo.DistributedIdentifier != Guid.Empty)
+            {
+                xaId.GlobalTransactionId = txInfo.DistributedIdentifier.ToByteArray();
+                xaId.BranchQualifier = Encoding.UTF8.GetBytes(txInfo.LocalIdentifier);
+            }
+            else
+            {
+                xaId.GlobalTransactionId = Encoding.UTF8.GetBytes(txInfo.LocalIdentifier);
+                xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
+            }
+
+            // Now notify the broker that a new XA'ish transaction has started.
+            TransactionInfo info = new TransactionInfo();
+            info.ConnectionId = this.session.Connection.ConnectionId;
+            info.TransactionId = this.transactionId;
+            info.Type = (int) TransactionType.Begin;
+
+            this.session.Connection.Oneway(info);
+
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("Begin XA'ish Transaction:" + xaId.GlobalTransactionId.ToString());
+            }
+        }
+
+        public void Prepare(PreparingEnlistment preparingEnlistment)
+        {
+            Tracer.Debug("Prepare notification received");
+
+            // Now notify the broker that a new XA'ish transaction has started.
+            TransactionInfo info = new TransactionInfo();
+            info.ConnectionId = this.session.Connection.ConnectionId;
+            info.TransactionId = this.transactionId;
+
+            try
+            {
+                BeforeEnd();
+
+                // End the current branch
+                info.Type = (int) TransactionType.End;
+                this.connection.SyncRequest(info);
+
+                // Prepare the Transaction for commit.
+                info.Type = (int) TransactionType.Prepare;
+                IntegerResponse response = (IntegerResponse) this.connection.SyncRequest(info);
+                if(response.Result == XA_READONLY)
+                {
+                    Tracer.Debug("Transaction Prepare Reports Done: ");
+
+                    // if server responds that nothing needs to be done, then reply prepared
+                    // but clear the current state data so we appear done to the commit method.
+                    preparingEnlistment.Prepared();
+
+                    this.transactionId = null;
+                    this.currentEnlistment = null;
+
+                    // Done so commit won't be called.
+                    AfterCommit();
+                }
+                else
+                {
+                    Tracer.Debug("Transaction Prepare finished Successfully: ");
+
+                    // If work finished correctly, reply prepared
+                    preparingEnlistment.Prepared();
+                }
+            }
+            catch(Exception ex)
+            {
+                Tracer.Debug("Transaction Prepare failed with error: " + ex.Message);
+                AfterRollback();
+                preparingEnlistment.ForceRollback();
+            }
+        }
+
+        public void Commit(Enlistment enlistment)
+        {
+            Tracer.Debug("Commit notification received");
+
+            try
+            {
+                if(this.transactionId != null)
                 {
-                    synchronization.AfterRollback();
+                    // Now notify the broker that a new XA'ish transaction has started.
+                    TransactionInfo info = new TransactionInfo();
+                    info.ConnectionId = this.session.Connection.ConnectionId;
+                    info.TransactionId = this.transactionId;
+                    info.Type = (int) TransactionType.CommitOnePhase;
+
+                    this.connection.CheckConnected();
+                    this.connection.SyncRequest(info);
+
+                    Tracer.Debug("Transaction Commit Reports Done: ");
+
+                    // if server responds that nothing needs to be done, then reply done.
+                    enlistment.Done();
+
+                    AfterCommit();
                 }
             }
-        }        
+            catch(Exception ex)
+            {
+                Tracer.Debug("Transaction Commit failed with error: " + ex.Message);
+                AfterRollback();
+                enlistment.Done();
+                throw;
+            }
+            finally
+            {
+                this.currentEnlistment = null;
+                this.transactionId = null;
+            }
+        }
+
+        public void Rollback(Enlistment enlistment)
+        {
+            Tracer.Debug("Rollback notification received");
+
+            // Now notify the broker that a new XA'ish transaction has started.
+            TransactionInfo info = new TransactionInfo();
+            info.ConnectionId = this.session.Connection.ConnectionId;
+            info.TransactionId = this.transactionId;
+
+            try
+            {
+                BeforeEnd();
+
+                info.Type = (int) TransactionType.End;
+                this.connection.SyncRequest(info);
+
+                info.Type = (int) TransactionType.Rollback;
+                this.connection.CheckConnected();
+                this.connection.SyncRequest(info);
+
+                Tracer.Debug("Transaction Rollback Reports Done: ");
+
+                // if server responds that nothing needs to be done, then reply done.
+                enlistment.Done();
+
+                AfterRollback();
+            }
+            catch(Exception ex)
+            {
+                Tracer.Debug("Transaction Rollback failed with error: " + ex.Message);
+                AfterRollback();
+                enlistment.Done();
+                throw;
+            }
+            finally
+            {
+                this.currentEnlistment = null;
+                this.transactionId = null;
+            }
+        }
+
+        public void InDoubt(Enlistment enlistment)
+        {
+            Tracer.Debug("In doubt notification received");
+
+            try
+            {
+                // Now notify the broker that it should forget this TX.
+                TransactionInfo info = new TransactionInfo();
+                info.ConnectionId = this.session.Connection.ConnectionId;
+                info.TransactionId = this.transactionId;
+                info.Type = (int) TransactionType.Forget;
+    
+                //Declare done on the enlistment
+                enlistment.Done();
+            }
+            finally
+            {
+                this.currentEnlistment = null;
+                this.transactionId = null;
+            }
+        }
+
+        #endregion
+
+        private Guid GuidFromId(string id)
+        {
+            // Remove the ID: prefix, that's non-unique to be sure
+            string resId = id.TrimStart("ID:".ToCharArray());
+
+            // Remaing parts should be host-port-timestamp-instance:sequence
+            string[] parts = resId.Split(":-".ToCharArray());
+
+            // We don't use the hostname here, just the remaining bits.
+            int a = Int32.Parse(parts[1]);
+            short b = Int16.Parse(parts[3]);
+            short c = Int16.Parse(parts[4]);
+            byte[] d = System.BitConverter.GetBytes(Int64.Parse(parts[2]));
+
+            return new Guid(a, b, c, d);
+        }
+
+        private string IdFromGuid(Guid guid)
+        {
+            byte[] bytes = guid.ToByteArray();
+
+            int port = System.BitConverter.ToInt32(bytes, 0);
+            int instance = System.BitConverter.ToInt16(bytes, 4);
+            int sequence = System.BitConverter.ToInt16(bytes, 6);
+            long timestamp = System.BitConverter.ToInt64(bytes, 8);
+
+            StringBuilder builder = new StringBuilder("ID:");
+
+            string hostname = "localhost";
+
+            try
+            {
+                hostname = Dns.GetHostName();
+            }
+            catch
+            {
+            }
+
+            builder.Append(hostname);
+            builder.Append("-");
+            builder.Append(port);
+            builder.Append("-");
+            builder.Append(timestamp);
+            builder.Append("-");
+            builder.Append(instance);
+            builder.Append(":");
+            builder.Append(sequence);
+
+            return builder.ToString();
+        }
     }
 }



Mime
View raw message