activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [25/50] [abbrv] activemq-nms-stomp git commit: Refactor the CheckConnected function to handle multiple threads attempting to check connection status against an offline broker. Guard against unwanted exceptions being thrown when indexing into a connectio
Date Mon, 06 Mar 2017 23:29:49 GMT
Refactor the CheckConnected function to handle multiple threads attempting to check connection
status against an offline broker.  Guard against unwanted exceptions being thrown when indexing
into a connection state array that has not been fully set up because the broker is offline.

Fixes [AMQNET-331]. (See https://issues.apache.org/jira/browse/AMQNET-331)



Project: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/commit/3d2d37af
Tree: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/tree/3d2d37af
Diff: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/diff/3d2d37af

Branch: refs/heads/1.5.x
Commit: 3d2d37af432882cbc44713d46c1acf4cc8208a01
Parents: 67916c2
Author: Jim Gomes <jgomes@apache.org>
Authored: Fri Jun 17 23:58:06 2011 +0000
Committer: Jim Gomes <jgomes@apache.org>
Committed: Fri Jun 17 23:58:06 2011 +0000

----------------------------------------------------------------------
 src/main/csharp/Connection.cs                   | 71 +++++++++++++++-----
 src/main/csharp/State/ConnectionState.cs        | 38 +++++------
 src/main/csharp/State/ConnectionStateTracker.cs | 23 ++++---
 src/main/csharp/State/SynchronizedObjects.cs    |  8 +++
 src/main/csharp/Transport/MutexTransport.cs     |  6 +-
 5 files changed, 99 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/3d2d37af/src/main/csharp/Connection.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Connection.cs b/src/main/csharp/Connection.cs
index ffaf180..4ca6aa5 100755
--- a/src/main/csharp/Connection.cs
+++ b/src/main/csharp/Connection.cs
@@ -501,14 +501,7 @@ namespace Apache.NMS.Stomp
 
         public Response SyncRequest(Command command)
         {
-            try
-            {
-                return SyncRequest(command, this.RequestTimeout);
-            }
-            catch(Exception ex)
-            {
-                throw NMSExceptionSupport.Create(ex);
-            }
+            return SyncRequest(command, this.RequestTimeout);
         }
 
         public Response SyncRequest(Command command, TimeSpan requestTimeout)
@@ -546,7 +539,13 @@ namespace Apache.NMS.Stomp
             }
         }
 
-        protected void CheckConnected()
+        private object checkConnectedLock = new object();
+
+        /// <summary>
+        /// Check and ensure that the connection object is connected.  If it is not
+        /// connected or is closed, a ConnectionClosedException is thrown.
+        /// </summary>
+        internal void CheckConnected()
         {
             if(closed.Value)
             {
@@ -555,17 +554,57 @@ namespace Apache.NMS.Stomp
 
             if(!connected.Value)
             {
-                if(!this.userSpecifiedClientID)
+                DateTime timeoutTime = DateTime.Now + this.RequestTimeout;
+                int waitCount = 1;
+
+                while(true)
                 {
-                    this.info.ClientId = this.clientIdGenerator.GenerateId();
+                    if(Monitor.TryEnter(checkConnectedLock))
+                    {
+                        try
+                        {
+                            if(!connected.Value)
+                            {
+                                if(!this.userSpecifiedClientID)
+                                {
+                                    this.info.ClientId = this.clientIdGenerator.GenerateId();
+                                }
+
+                                try
+                                {
+                                    if(null != transport)
+                                    {
+                                        // Send the connection and see if an ack/nak is returned.
+                                        Response response = transport.Request(this.info, this.RequestTimeout);
+                                        if(!(response is ExceptionResponse))
+                                        {
+                                            connected.Value = true;
+                                        }
+                                    }
+                                }
+                                catch
+                                {
+                                }
+                            }
+                        }
+                        finally
+                        {
+                            Monitor.Exit(checkConnectedLock);
+                        }
+                    }
+
+                    if(connected.Value || DateTime.Now > timeoutTime)
+                    {
+                        break;
+                    }
+
+                    // Back off from being overly aggressive.  Having too many threads
+                    // aggressively trying to connect to a down broker pegs the CPU.
+                    Thread.Sleep(5 * (waitCount++));
                 }
 
-                connected.Value = true;
-                // now lets send the connection and see if we get an ack/nak
-                if(null == SyncRequest(info))
+                if(!connected.Value)
                 {
-                    closed.Value = true;
-                    connected.Value = false;
                     throw new ConnectionClosedException();
                 }
             }

http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/3d2d37af/src/main/csharp/State/ConnectionState.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/State/ConnectionState.cs b/src/main/csharp/State/ConnectionState.cs
index d568a47..c39d67f 100644
--- a/src/main/csharp/State/ConnectionState.cs
+++ b/src/main/csharp/State/ConnectionState.cs
@@ -25,8 +25,7 @@ namespace Apache.NMS.Stomp.State
 	{
 
 		ConnectionInfo info;
-		private readonly AtomicDictionary<ConsumerId, ConsumerState> consumers =
-            new AtomicDictionary<ConsumerId, ConsumerState>();
+		private readonly AtomicDictionary<ConsumerId, ConsumerState> consumers = new AtomicDictionary<ConsumerId, ConsumerState>();
 		private readonly Atomic<bool> _shutdown = new Atomic<bool>(false);
 
 		public ConnectionState(ConnectionInfo info)
@@ -49,26 +48,25 @@ namespace Apache.NMS.Stomp.State
 		{
 			get
 			{
-				#if DEBUG
-				try
+				ConsumerState consumerState;
+				
+				if(consumers.TryGetValue(id, out consumerState))
 				{
-				#endif
-					return consumers[id];
-				#if DEBUG
+					return consumerState;
 				}
-				catch(System.Collections.Generic.KeyNotFoundException ex)
+				
+#if DEBUG
+				// Useful for dignosing missing consumer ids
+				string consumerList = string.Empty;
+				foreach(ConsumerId consumerId in consumers.Keys)
 				{
-					// Useful for dignosing missing consumer ids
-					string consumerList = string.Empty;
-					foreach(ConsumerId consumerId in consumers.Keys)
-					{
-						consumerList += consumerId.ToString() + "\n";
-					}
-					System.Diagnostics.Debug.Assert(false,
-						string.Format("Consumer '{0}' did not exist in the consumers collection.\n\nConsumers:-\n{1}", id, consumerList));
-					throw ex;
+					consumerList += consumerId.ToString() + "\n";
 				}
-				#endif
+				
+				System.Diagnostics.Debug.Assert(false,
+					string.Format("Consumer '{0}' did not exist in the consumers collection.\n\nConsumers:-\n{1}", id, consumerList));
+#endif
+				return null;
 			}
 		}
 
@@ -80,7 +78,9 @@ namespace Apache.NMS.Stomp.State
 
 		public ConsumerState removeConsumer(ConsumerId id)
 		{
-			ConsumerState ret = consumers[id];
+			ConsumerState ret = null;
+			
+			consumers.TryGetValue(id, out ret);
 			consumers.Remove(id);
 			return ret;
 		}

http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/3d2d37af/src/main/csharp/State/ConnectionStateTracker.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/State/ConnectionStateTracker.cs b/src/main/csharp/State/ConnectionStateTracker.cs
index d9dbc9d..ccf76a8 100644
--- a/src/main/csharp/State/ConnectionStateTracker.cs
+++ b/src/main/csharp/State/ConnectionStateTracker.cs
@@ -31,8 +31,7 @@ namespace Apache.NMS.Stomp.State
 	{
 		private static readonly Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
 
-		protected Dictionary<ConnectionId, ConnectionState> connectionStates =
-            new Dictionary<ConnectionId, ConnectionState>();
+		protected Dictionary<ConnectionId, ConnectionState> connectionStates = new Dictionary<ConnectionId, ConnectionState>();
 
 		private bool _restoreConsumers = true;
 
@@ -67,10 +66,10 @@ namespace Apache.NMS.Stomp.State
 			{
 				transport.Oneway(connectionState.Info);
 
-                if(RestoreConsumers)
-                {
-                    DoRestoreConsumers(transport, connectionState);
-                }
+				if(RestoreConsumers)
+				{
+					DoRestoreConsumers(transport, connectionState);
+				}
 			}
 		}
 
@@ -97,10 +96,11 @@ namespace Apache.NMS.Stomp.State
 					ConnectionId connectionId = sessionId.ParentId;
 					if(connectionId != null)
 					{
-						ConnectionState cs = connectionStates[connectionId];
-						if(cs != null)
+						ConnectionState cs = null;
+						
+						if(connectionStates.TryGetValue(connectionId, out cs))
 						{
-						    cs.addConsumer(info);
+							cs.addConsumer(info);
 						}
 					}
 				}
@@ -118,8 +118,9 @@ namespace Apache.NMS.Stomp.State
 					ConnectionId connectionId = sessionId.ParentId;
 					if(connectionId != null)
 					{
-						ConnectionState cs = connectionStates[connectionId];
-						if(cs != null)
+						ConnectionState cs = null;
+						
+						if(connectionStates.TryGetValue(connectionId, out cs))
 						{
 							cs.removeConsumer(id);
 						}

http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/3d2d37af/src/main/csharp/State/SynchronizedObjects.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/State/SynchronizedObjects.cs b/src/main/csharp/State/SynchronizedObjects.cs
index 24a23a8..2795503 100644
--- a/src/main/csharp/State/SynchronizedObjects.cs
+++ b/src/main/csharp/State/SynchronizedObjects.cs
@@ -177,6 +177,14 @@ namespace Apache.NMS.Stomp.State
 			}
 		}
 
+		public bool TryGetValue(TKey key, out TValue val)
+		{
+			lock(((ICollection) _dictionary).SyncRoot)
+			{
+				return _dictionary.TryGetValue(key, out val);
+			}
+		}
+
 		public AtomicCollection<TKey> Keys
 		{
 			get

http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/3d2d37af/src/main/csharp/Transport/MutexTransport.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Transport/MutexTransport.cs b/src/main/csharp/Transport/MutexTransport.cs
index c5cbc31..540addd 100644
--- a/src/main/csharp/Transport/MutexTransport.cs
+++ b/src/main/csharp/Transport/MutexTransport.cs
@@ -20,6 +20,7 @@ using Apache.NMS.Stomp.Commands;
 
 namespace Apache.NMS.Stomp.Transport
 {
+	/// <summary>
 	/// A Transport which guards access to the next transport using a mutex.
 	/// </summary>
 	public class MutexTransport : TransportFilter
@@ -31,6 +32,7 @@ namespace Apache.NMS.Stomp.Transport
 			if(timeout > 0)
 			{
 				DateTime timeoutTime = DateTime.Now + TimeSpan.FromMilliseconds(timeout);
+				int waitCount = 1;
 
 				while(true)
 				{
@@ -44,7 +46,9 @@ namespace Apache.NMS.Stomp.Transport
 						throw new IOException(string.Format("Oneway timed out after {0} milliseconds.", timeout));
 					}
 
-					Thread.Sleep(10);
+					// Back off from being overly aggressive.  Having too many threads
+					// aggressively trying to get the lock pegs the CPU.
+					Thread.Sleep(3 * (waitCount++));
 				}
 			}
 			else


Mime
View raw message