activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r1187115 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk: ./ src/main/csharp/State/ConnectionState.cs src/main/csharp/State/ConnectionStateTracker.cs
Date Thu, 20 Oct 2011 22:44:28 GMT
Author: jgomes
Date: Thu Oct 20 22:44:28 2011
New Revision: 1187115

URL: http://svn.apache.org/viewvc?rev=1187115&view=rev
Log:
Merged revision(s) 1186568 from activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:
Add locking around iteration over recoveringPullConsumers.  Add a check for a pre-existing
consumer ID to avoid throwing an exception while recovering.
........

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/   (props changed)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 20 22:44:28 2011
@@ -1,3 +1,3 @@
-/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:1082291,1135831,1137081,1171843,1171874,1177390,1177395
+/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:1082291,1135831,1137081,1171843,1171874,1177390,1177395,1186568
 /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.0.0:692591,693525
 /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.1.0:788230,788233,790183

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs?rev=1187115&r1=1187114&r2=1187115&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs Thu Oct 20 22:44:28 2011
@@ -31,8 +31,7 @@ namespace Apache.NMS.ActiveMQ.State
 		private readonly AtomicCollection<DestinationInfo> tempDestinations = new AtomicCollection<DestinationInfo>();
 		private readonly Atomic<bool> _shutdown = new Atomic<bool>(false);
 	    private bool connectionInterruptProcessingComplete = true;
-		private readonly Dictionary<ConsumerId, ConsumerInfo> recoveringPullConsumers = 
-			new Dictionary<ConsumerId, ConsumerInfo>();
+		private readonly Dictionary<ConsumerId, ConsumerInfo> recoveringPullConsumers = new Dictionary<ConsumerId, ConsumerInfo>();
 
 		public ConnectionState(ConnectionInfo info)
 		{
@@ -179,7 +178,7 @@ namespace Apache.NMS.ActiveMQ.State
 				throw new ApplicationException("Disposed");
 			}
 		}
-		
+
 		public Dictionary<ConsumerId, ConsumerInfo> RecoveringPullConsumers
 		{
 			get { return this.recoveringPullConsumers; }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=1187115&r1=1187114&r2=1187115&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs Thu Oct 20 22:44:28 2011
@@ -19,6 +19,7 @@ using System;
 using System.Collections.Generic;
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.ActiveMQ.Transport;
+using System.Collections;
 
 namespace Apache.NMS.ActiveMQ.State
 {
@@ -198,7 +199,13 @@ namespace Apache.NMS.ActiveMQ.State
                 if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5)
                 {
                     infoToSend = consumerState.Info.Clone() as ConsumerInfo;
-					connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
+					lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
+					{
+						if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId))
+						{
+							connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
+						}
+					}
                     infoToSend.PrefetchSize = 0;
                     if(Tracer.IsDebugEnabled)
                     {
@@ -710,32 +717,34 @@ namespace Apache.NMS.ActiveMQ.State
             {
                 connectionState.ConnectionInterruptProcessingComplete = true;
 
-                Dictionary<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.RecoveringPullConsumers;
-                foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in stalledConsumers)
-                {
-                    ConsumerControl control = new ConsumerControl();
-                    control.ConsumerId = entry.Key;
-                    control.Prefetch = entry.Value.PrefetchSize;
-                    control.Destination = entry.Value.Destination;
-                    try
-                    {
-                        if(Tracer.IsDebugEnabled)
-                        {
-                            Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
-                                         " with: " + control.Prefetch);
-                        }
-                        transport.Oneway(control);
-                    }
-                    catch(Exception ex)
-                    {
-                        if(Tracer.IsDebugEnabled)
-                        {
-                            Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
-                                         " with: " + control.Prefetch + "Error: " + ex.Message);
-                        }
-                    }
-                }
-                stalledConsumers.Clear();
+				lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
+				{
+					foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in connectionState.RecoveringPullConsumers)
+					{
+						ConsumerControl control = new ConsumerControl();
+						control.ConsumerId = entry.Key;
+						control.Prefetch = entry.Value.PrefetchSize;
+						control.Destination = entry.Value.Destination;
+						try
+						{
+							if(Tracer.IsDebugEnabled)
+							{
+								Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
+											 " with: " + control.Prefetch);
+							}
+							transport.Oneway(control);
+						}
+						catch(Exception ex)
+						{
+							if(Tracer.IsDebugEnabled)
+							{
+								Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
+											 " with: " + control.Prefetch + "Error: " + ex.Message);
+							}
+						}
+					}
+					connectionState.RecoveringPullConsumers.Clear();
+				}
             }
         }
 



Mime
View raw message