activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1483064 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: MessageConsumer.cs NetTxSession.cs TransactionContext.cs
Date Wed, 15 May 2013 20:13:29 GMT
Author: tabish
Date: Wed May 15 20:13:28 2013
New Revision: 1483064

URL: http://svn.apache.org/r1483064
Log:
apply fix for: https://issues.apache.org/jira/browse/AMQNET-421

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1483064&r1=1483063&r2=1483064&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Wed May 15 20:13:28 2013
@@ -39,11 +39,11 @@ namespace Apache.NMS.ActiveMQ
 	/// </summary>
 	public class MessageConsumer : IMessageConsumer, IDispatcher
 	{
-		private readonly MessageTransformation messageTransformation;
-		private readonly MessageDispatchChannel unconsumedMessages;
-		private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
-		private readonly ConsumerInfo info;
-		private readonly Session session;
+        private readonly MessageTransformation messageTransformation;
+        private readonly MessageDispatchChannel unconsumedMessages;
+        private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
+        private readonly ConsumerInfo info;
+        private readonly Session session;
 
 		private MessageAck pendingAck = null;
 
@@ -434,7 +434,7 @@ namespace Apache.NMS.ActiveMQ
 			disposed = true;
 		}
 
-		public void Close()
+		public virtual void Close()
 		{
 			if(!this.unconsumedMessages.Closed)
 			{
@@ -1029,25 +1029,25 @@ namespace Apache.NMS.ActiveMQ
 		{
 			this.lastDeliveredSequenceId = dispatch.Message.MessageId.BrokerSequenceId;
 
-			if(!IsAutoAcknowledgeBatch)
+			if (!IsAutoAcknowledgeBatch)
 			{
                 if (this.session.IsTransacted)
                 {
-                    this.session.TransactionContext.SyncRoot.WaitOne();
-
-                    // In the case where the consumer is operating in concert with a
-                    // distributed TX manager we need to wait whenever the TX is being
-                    // controlled by the DTC as it completes all operations async and
-                    // we cannot start consumption again until all its tasks have completed.)
-                    if (this.session.TransactionContext.InNetTransaction && 
-                        this.session.TransactionContext.NetTxState == TransactionContext.TxState.Pending)
+                    bool waitForDtcWaitHandle = false;
+                    lock (this.session.TransactionContext.SyncRoot)
                     {
-                        this.session.TransactionContext.SyncRoot.ReleaseMutex();
-                        this.session.TransactionContext.DtcWaitHandle.WaitOne();        
               
+                        // In the case where the consumer is operating in concert with a
+                        // distributed TX manager we need to wait whenever the TX is being
+                        // controlled by the DTC as it completes all operations async and
+                        // we cannot start consumption again until all its tasks have completed.)
+                        waitForDtcWaitHandle = this.session.TransactionContext.InNetTransaction &&
+                                               this.session.TransactionContext.NetTxState ==
+                                               TransactionContext.TxState.Pending;
                     }
-                    else
+
+                    if (waitForDtcWaitHandle)
                     {
-                        this.session.TransactionContext.SyncRoot.ReleaseMutex();
+                        this.session.TransactionContext.DtcWaitHandle.WaitOne();
                     }
                 }                
 
@@ -1623,6 +1623,11 @@ namespace Apache.NMS.ActiveMQ
             return this.info.Destination.Equals(dest);
         }
 
+	    internal bool Closed
+	    {
+            get { return this.unconsumedMessages.Closed; }
+	    }
+
 	    private void DoOptimizedAck(object state)
 		{
 			if (this.optimizeAcknowledge && !this.unconsumedMessages.Closed)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs?rev=1483064&r1=1483063&r2=1483064&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs Wed May 15 20:13:28 2013
@@ -75,18 +75,17 @@ namespace Apache.NMS.ActiveMQ
             {
                 if (TransactionContext.InNetTransaction)
                 {
-                    TransactionContext.SyncRoot.WaitOne();
-
-                    if (TransactionContext.InNetTransaction)
+                    lock (TransactionContext.SyncRoot)
                     {
-                        // Must wait for all the DTC operations to complete before
-                        // moving on from this close call.
-                        TransactionContext.SyncRoot.ReleaseMutex();
-                        this.TransactionContext.DtcWaitHandle.WaitOne();
-                        TransactionContext.SyncRoot.WaitOne();
+                        if (TransactionContext.InNetTransaction)
+                        {
+                            // Must wait for all the DTC operations to complete before
+                            // moving on from this close call.
+                            Monitor.Exit(TransactionContext.SyncRoot);
+                            this.TransactionContext.DtcWaitHandle.WaitOne();
+                            Monitor.Enter(TransactionContext.SyncRoot);
+                        }
                     }
-
-                    TransactionContext.SyncRoot.ReleaseMutex();
                 }
 
                 base.Close();
@@ -111,24 +110,23 @@ namespace Apache.NMS.ActiveMQ
 
         internal override void DoStartTransaction()
         {
-            TransactionContext.SyncRoot.WaitOne();
-
-            if (TransactionContext.InNetTransaction && TransactionContext.NetTxState == TransactionContext.TxState.Pending)
-            {
-                // To late to participate in this TX, we have to wait for it to complete then
-                // we can create a new TX and start from there.
-                TransactionContext.SyncRoot.ReleaseMutex();
-                TransactionContext.DtcWaitHandle.WaitOne();
-                TransactionContext.SyncRoot.WaitOne();
-            }
-
-            if (!TransactionContext.InNetTransaction && Transaction.Current != null)
+            lock (TransactionContext.SyncRoot)
             {
-                Tracer.Debug("NetTxSession detected Ambient Transaction, start new TX with broker");
-                EnrollInSpecifiedTransaction(Transaction.Current);
+                if (TransactionContext.InNetTransaction && TransactionContext.NetTxState == TransactionContext.TxState.Pending)
+                {
+                    // To late to participate in this TX, we have to wait for it to complete then
+                    // we can create a new TX and start from there.
+                    Monitor.Exit(TransactionContext.SyncRoot);
+                    TransactionContext.DtcWaitHandle.WaitOne();
+                    Monitor.Enter(TransactionContext.SyncRoot);
+                }
+ 
+                if (!TransactionContext.InNetTransaction && Transaction.Current != null)
+                {
+                    Tracer.Debug("NetTxSession detected Ambient Transaction, start new TX with broker");
+                    EnrollInSpecifiedTransaction(Transaction.Current);
+                }
             }
-
-            TransactionContext.SyncRoot.ReleaseMutex();
         }
 
         private void EnrollInSpecifiedTransaction(Transaction tx)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs?rev=1483064&r1=1483063&r2=1483064&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs Wed May 15 20:13:28 2013
@@ -229,7 +229,7 @@ namespace Apache.NMS.ActiveMQ
         // Once the DTC calls prepare we lock this object and don't unlock it again until
         // the TX has either completed or terminated, the users of this class should use
         // this sync point when the TX is a DTC version as opposed to a local one.
-        private readonly Mutex syncObject = new Mutex();
+        private readonly object syncObject = new Mutex();
 
 	    public enum TxState
 	    {
@@ -238,7 +238,7 @@ namespace Apache.NMS.ActiveMQ
 
 	    private TxState netTxState = TxState.None;
 
-        public Mutex SyncRoot
+        public object SyncRoot
 	    {
             get { return this.syncObject; }
 	    }



Mime
View raw message