Author: jgomes
Date: Thu Oct 20 22:44:28 2011
New Revision: 1187115
URL: http://svn.apache.org/viewvc?rev=1187115&view=rev
Log:
Merged revision(s) 1186568 from activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:
Add locking around iteration over recoveringPullConsumers. Add a check for a pre-existing consumer
ID to avoid throwing an exception while recovering.
........
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/ (props changed)
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 20 22:44:28 2011
@@ -1,3 +1,3 @@
-/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:1082291,1135831,1137081,1171843,1171874,1177390,1177395
+/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:1082291,1135831,1137081,1171843,1171874,1177390,1177395,1186568
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.0.0:692591,693525
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.1.0:788230,788233,790183
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs?rev=1187115&r1=1187114&r2=1187115&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs Thu Oct 20 22:44:28 2011
@@ -31,8 +31,7 @@ namespace Apache.NMS.ActiveMQ.State
private readonly AtomicCollection<DestinationInfo> tempDestinations = new AtomicCollection<DestinationInfo>();
private readonly Atomic<bool> _shutdown = new Atomic<bool>(false);
private bool connectionInterruptProcessingComplete = true;
- private readonly Dictionary<ConsumerId, ConsumerInfo> recoveringPullConsumers =
- new Dictionary<ConsumerId, ConsumerInfo>();
+ private readonly Dictionary<ConsumerId, ConsumerInfo> recoveringPullConsumers = new Dictionary<ConsumerId, ConsumerInfo>();
public ConnectionState(ConnectionInfo info)
{
@@ -179,7 +178,7 @@ namespace Apache.NMS.ActiveMQ.State
throw new ApplicationException("Disposed");
}
}
-
+
public Dictionary<ConsumerId, ConsumerInfo> RecoveringPullConsumers
{
get { return this.recoveringPullConsumers; }
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=1187115&r1=1187114&r2=1187115&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs Thu Oct 20 22:44:28 2011
@@ -19,6 +19,7 @@ using System;
using System.Collections.Generic;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Transport;
+using System.Collections;
namespace Apache.NMS.ActiveMQ.State
{
@@ -198,7 +199,13 @@ namespace Apache.NMS.ActiveMQ.State
if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5)
{
infoToSend = consumerState.Info.Clone() as ConsumerInfo;
- connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
+ lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
+ {
+ if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId))
+ {
+ connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
+ }
+ }
infoToSend.PrefetchSize = 0;
if(Tracer.IsDebugEnabled)
{
@@ -710,32 +717,34 @@ namespace Apache.NMS.ActiveMQ.State
{
connectionState.ConnectionInterruptProcessingComplete = true;
- Dictionary<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.RecoveringPullConsumers;
- foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in stalledConsumers)
- {
- ConsumerControl control = new ConsumerControl();
- control.ConsumerId = entry.Key;
- control.Prefetch = entry.Value.PrefetchSize;
- control.Destination = entry.Value.Destination;
- try
- {
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
- " with: " + control.Prefetch);
- }
- transport.Oneway(control);
- }
- catch(Exception ex)
- {
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
- " with: " + control.Prefetch + "Error: " + ex.Message);
- }
- }
- }
- stalledConsumers.Clear();
+ lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
+ {
+ foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in connectionState.RecoveringPullConsumers)
+ {
+ ConsumerControl control = new ConsumerControl();
+ control.ConsumerId = entry.Key;
+ control.Prefetch = entry.Value.PrefetchSize;
+ control.Destination = entry.Value.Destination;
+ try
+ {
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
+ " with: " + control.Prefetch);
+ }
+ transport.Oneway(control);
+ }
+ catch(Exception ex)
+ {
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
+ " with: " + control.Prefetch + "Error: " + ex.Message);
+ }
+ }
+ }
+ connectionState.RecoveringPullConsumers.Clear();
+ }
}
}
|