activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r830743 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: MessageConsumer.cs Util/MessageDispatchChannel.cs
Date Wed, 28 Oct 2009 19:50:20 GMT
Author: tabish
Date: Wed Oct 28 19:50:20 2009
New Revision: 830743

URL: http://svn.apache.org/viewvc?rev=830743&view=rev
Log:
* MessageConsumer.cs:
* Util/MessageDispatchChannel.cs: 

Fix a race on Close where an ack could be missed when an async listener is processing a message

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=830743&r1=830742&r2=830743&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Wed Oct 28 19:50:20 2009
@@ -38,8 +38,6 @@
 	/// </summary>
 	public class MessageConsumer : IMessageConsumer, IDispatcher
 	{
-        private object closedLock = new object();
-
         private readonly MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
         private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
         private readonly ConsumerInfo info;
@@ -50,7 +48,6 @@
         private Atomic<bool> started = new Atomic<bool>();
         private Atomic<bool> deliveringAcks = new Atomic<bool>();
 
-        private bool closed = false;
 		private int maximumRedeliveryCount = 10;
 		private int redeliveryTimeout = 500;
 		protected bool disposed = false;
@@ -263,40 +260,37 @@
 
         internal void DoClose()
         {
-            lock(this.closedLock)
-            {
-                if(!this.unconsumedMessages.Closed)
-                {                    
-                    // Do we have any acks we need to send out before closing?
-                    // Ack any delivered messages now.
-                    if(!this.session.IsTransacted) 
+            if(!this.unconsumedMessages.Closed)
+            {                    
+                // Do we have any acks we need to send out before closing?
+                // Ack any delivered messages now.
+                if(!this.session.IsTransacted) 
+                {
+                    DeliverAcks();
+                    if(this.IsAutoAcknowledgeBatch)
                     {
-                        DeliverAcks();
-                        if(this.IsAutoAcknowledgeBatch)
-                        {
-                            Acknowledge();
-                        }
+                        Acknowledge();
                     }
-                    
-                    if(!this.session.IsTransacted)
+                }
+                
+                if(!this.session.IsTransacted)
+                {
+                    lock(this.dispatchedMessages)
                     {
-                        lock(this.dispatchedMessages)
-                        {
-                            dispatchedMessages.Clear();
-                        }
+                        dispatchedMessages.Clear();
                     }
-                    
-                    this.unconsumedMessages.Close();
-                    this.session.DisposeOf(this.info.ConsumerId, this.lastDeliveredSequenceId);
-
-                    RemoveInfo removeCommand = new RemoveInfo();
-                    removeCommand.ObjectId = this.info.ConsumerId;
-                    removeCommand.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
-                    
-                    this.session.Connection.Oneway(removeCommand);
-                    this.session = null;
                 }
-            }            
+                
+                this.unconsumedMessages.Close();
+                this.session.DisposeOf(this.info.ConsumerId, this.lastDeliveredSequenceId);
+
+                RemoveInfo removeCommand = new RemoveInfo();
+                removeCommand.ObjectId = this.info.ConsumerId;
+                removeCommand.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
+                
+                this.session.Connection.Oneway(removeCommand);
+                this.session = null;
+            }
         }
 
 		#endregion
@@ -312,15 +306,7 @@
 				messagePull.ResponseRequired = false;
 
 				Tracer.Debug("Sending MessagePull: " + messagePull);
-				lock(closedLock)
-				{
-					if(closed)
-					{
-						throw new ConnectionClosedException();
-					}
-
-					session.Connection.Oneway(messagePull);
-				}
+			    session.Connection.Oneway(messagePull);
 			}
 		}
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs?rev=830743&r1=830742&r2=830743&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
Wed Oct 28 19:50:20 2009
@@ -28,14 +28,10 @@
     public class MessageDispatchChannel
     {
         private readonly Mutex mutex = new Mutex();
-        private Atomic<bool> closed = new Atomic<bool>();
-        private Atomic<bool> running = new Atomic<bool>();
+        private bool closed;
+        private bool running;
         private LinkedList<MessageDispatch> channel = new LinkedList<MessageDispatch>();
         
-        public MessageDispatchChannel()
-        {
-        }
-
         #region Properties
 
         public object SyncRoot
@@ -45,14 +41,40 @@
         
         public bool Closed
         {
-            get{ return this.closed.Value; }
-            set{ this.closed.Value = value; }
+            get 
+            {
+                lock(this.mutex)
+                {
+                    return this.closed; 
+                }
+            }
+            
+            set 
+            {
+                lock(this.mutex)
+                {
+                    this.closed = value;
+                }
+            }
         }
 
         public bool Running
         {
-            get{ return this.running.Value; }
-            set{ this.running.Value = value; }
+            get
+            {
+                lock(this.mutex)
+                {
+                    return this.running;
+                }
+            }
+            
+            set
+            {
+                lock(this.mutex)
+                {
+                    this.running = value;
+                }
+            }
         }
 
         public bool Empty
@@ -85,7 +107,7 @@
             {
                 if(!Closed)
                 {
-                    this.running.Value = true;
+                    this.running = true;
                     Monitor.PulseAll(this.mutex);
                 }
             }
@@ -95,7 +117,7 @@
         {
             lock(mutex)
             {
-                this.running.Value = false;
+                this.running = false;
                 Monitor.PulseAll(this.mutex);
             }
         }
@@ -106,8 +128,8 @@
             {
                 if(!Closed)
                 {
-                    this.running.Value = false;
-                    this.closed.Value = true;
+                    this.running = false;
+                    this.closed = true;
                 }          
 
                 Monitor.PulseAll(this.mutex);



Mime
View raw message